From ed9e03ae1412e8c249b1cb70cc2b4e996edae46c Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Mon, 21 Nov 2022 15:54:47 +0100 Subject: [PATCH] Outline basic rate-limiting interface (#2452) * Outline basic rate-limiting interface * Fixup typespecs * Address empty cache race condition on refresh (#2457) Ref https://github.com/plausible/analytics/pull/2444/files#r1023751222 * Fix up test case: make sure the cache is refreshed --- lib/plausible/site/cache.ex | 32 +++- lib/plausible/site/cache/rate_limiter.ex | 109 ++++++++++++++ test/plausible/site/cache_test.exs | 63 ++++++++ test/plausible/site/rate_limiter_test.exs | 169 ++++++++++++++++++++++ 4 files changed, 366 insertions(+), 7 deletions(-) create mode 100644 lib/plausible/site/cache/rate_limiter.ex create mode 100644 test/plausible/site/rate_limiter_test.exs diff --git a/lib/plausible/site/cache.ex b/lib/plausible/site/cache.ex index f0df6e33b..b77943764 100644 --- a/lib/plausible/site/cache.ex +++ b/lib/plausible/site/cache.ex @@ -22,7 +22,7 @@ defmodule Plausible.Site.Cache do There are two modes of refreshing the cache: `:all` and `:single`. * `:all` means querying the database for all Site entries and should be done - periodically (via `Cache.Warmer`). All existing Cache entries all cleared + periodically (via `Cache.Warmer`). All existing Cache entries are cleared prior to writing the new batch. 
* `:single` attempts to re-query a specific site by domain and should be done @@ -74,8 +74,8 @@ defmodule Plausible.Site.Cache do end @spec refresh_all(Keyword.t()) :: :ok - def refresh_all(opts) do - cache_name = Keyword.fetch!(opts, :cache_name) + def refresh_all(opts \\ []) do + cache_name = Keyword.get(opts, :cache_name, @cache_name) measure_duration(telemetry_event_refresh(cache_name, :all), fn -> sites_by_domain_query = @@ -87,11 +87,29 @@ defmodule Plausible.Site.Cache do sites_by_domain = Plausible.Repo.all(sites_by_domain_query) - Cachex.clear!(cache_name) + :ok = merge(sites_by_domain, opts) + end) - if not Enum.empty?(sites_by_domain) do - true = Cachex.put_many!(cache_name, sites_by_domain) - end + :ok + end + + @spec merge(new_items :: [Site.t()], opts :: Keyword.t()) :: :ok + def merge(new_items, opts \\ []) + def merge([], _), do: :ok + + def merge(new_items, opts) do + cache_name = Keyword.get(opts, :cache_name, @cache_name) + {:ok, old_keys} = Cachex.keys(cache_name) + + new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end)) + old = MapSet.new(old_keys) + + true = Cachex.put_many!(cache_name, new_items) + + old + |> MapSet.difference(new) + |> Enum.each(fn k -> + Cachex.del(cache_name, k) end) :ok diff --git a/lib/plausible/site/cache/rate_limiter.ex b/lib/plausible/site/cache/rate_limiter.ex new file mode 100644 index 000000000..dc11feb5e --- /dev/null +++ b/lib/plausible/site/cache/rate_limiter.ex @@ -0,0 +1,109 @@ +defmodule Plausible.Site.RateLimiter do + @policy_for_non_existing_sites :deny + @policy_on_rate_limiting_backend_error :allow + + @moduledoc """ + Thin wrapper around Hammer for rate limiting domain-specific events + during the ingestion phase. Currently there are two policies + on which the `allow/2` function operates: + * `:allow` + * `:deny` + + Rate Limiting buckets are configured per site (externally via the CRM). 
+ See: `Plausible.Site` + + To look up each site's configuration, the RateLimiter fetches + a Site by domain using `Plausible.Cache` interface. + If the Site is not found in Cache, a DB refresh attempt is made. + The result of that last attempt gets stored in Cache to prevent + excessive DB queries. + + The module defines two policies outside the regular bucket inspection: + * when the site does not exist in the database: #{@policy_for_non_existing_sites} + * when the underlying rate limiting mechanism returns + an internal error: #{@policy_on_rate_limiting_backend_error} + + Each policy computation emits a single telemetry event. + See: `policy_telemetry_event/1` + """ + alias Plausible.Site + alias Plausible.Site.Cache + + require Logger + + @spec allow?(String.t(), Keyword.t()) :: boolean() + def allow?(domain, opts \\ []) do + policy(domain, opts) == :allow + end + + @spec key(String.t()) :: String.t() + def key(domain) do + "ingest:site:#{domain}" + end + + @spec policy_telemetry_event(:allow | :deny) :: list(atom()) + def policy_telemetry_event(policy) do + [:plausible, :ingest, :rate_limit, policy] + end + + defp policy(domain, opts) do + result = + case get_from_cache_or_refresh(domain, Keyword.get(opts, :cache_opts, [])) do + %Ecto.NoResultsError{} -> + @policy_for_non_existing_sites + + %Site{} = site -> + check_rate_limit(site, opts) + + {:error, _} -> + @policy_on_rate_limiting_backend_error + end + + :ok = emit_allowance_telemetry(result) + result + end + + defp check_rate_limit(%Site{ingest_rate_limit_threshold: nil}, _opts) do + :allow + end + + defp check_rate_limit(%Site{ingest_rate_limit_threshold: threshold} = site, opts) + when is_integer(threshold) do + key = Keyword.get(opts, :key, key(site.domain)) + scale_ms = site.ingest_rate_limit_scale_seconds * 1_000 + + case Hammer.check_rate(key, scale_ms, threshold) do + {:deny, _} -> + :deny + + {:allow, _} -> + :allow + + {:error, reason} -> + Logger.error( + "Error checking rate limit for '#{key}': 
#{inspect(reason)}. Falling back to: #{@policy_on_rate_limiting_backend_error}" + ) + + @policy_on_rate_limiting_backend_error + end + end + + defp get_from_cache_or_refresh(domain, cache_opts) do + case Cache.get(domain, cache_opts) do + %Site{} = site -> + site + + %Ecto.NoResultsError{} = not_found -> + not_found + + nil -> + with {:ok, refreshed_item} <- Cache.refresh_one(domain, cache_opts) do + refreshed_item + end + end + end + + defp emit_allowance_telemetry(policy) do + :telemetry.execute(policy_telemetry_event(policy), %{}, %{}) + end +end diff --git a/test/plausible/site/cache_test.exs b/test/plausible/site/cache_test.exs index 4b6edb7a5..84648c66a 100644 --- a/test/plausible/site/cache_test.exs +++ b/test/plausible/site/cache_test.exs @@ -195,6 +195,69 @@ defmodule Plausible.Site.CacheTest do end end + describe "merging the cache" do + test "merging adds new items", %{test: test} do + {:ok, _} = start_test_cache(test) + + :ok = Cache.merge([{"item1", :item1}], cache_name: test) + assert :item1 == Cache.get("item1", cache_name: test, force?: true) + end + + test "merging no new items leaves the old cache intact", %{test: test} do + {:ok, _} = start_test_cache(test) + + :ok = Cache.merge([{"item1", :item1}], cache_name: test) + :ok = Cache.merge([], cache_name: test) + assert :item1 == Cache.get("item1", cache_name: test, force?: true) + end + + test "merging removes stale items", %{test: test} do + {:ok, _} = start_test_cache(test) + + :ok = Cache.merge([{"item1", :item1}], cache_name: test) + :ok = Cache.merge([{"item2", :item2}], cache_name: test) + + refute Cache.get("item1", cache_name: test, force?: true) + assert Cache.get("item2", cache_name: test, force?: true) + end + + test "merging updates changed items", %{test: test} do + {:ok, _} = start_test_cache(test) + + :ok = Cache.merge([{"item1", :item1}, {"item2", :item2}], cache_name: test) + :ok = Cache.merge([{"item1", :changed}, {"item2", :item2}], cache_name: test) + + assert :changed == 
Cache.get("item1", cache_name: test, force?: true) + assert :item2 == Cache.get("item2", cache_name: test, force?: true) + end + + @items1 for i <- 1..200_000, do: {i, :batch1} + @items2 for _ <- 1..200_000, do: {Enum.random(1..400_000), :batch2} + @max_seconds 2 + test "merging large sets is expected to be under #{@max_seconds} seconds", %{test: test} do + {:ok, _} = start_test_cache(test) + + {t1, :ok} = + :timer.tc(fn -> + :ok = Cache.merge(@items1, cache_name: test) + end) + + {t2, :ok} = + :timer.tc(fn -> + :ok = Cache.merge(@items1, cache_name: test) + end) + + {t3, :ok} = + :timer.tc(fn -> + :ok = Cache.merge(@items2, cache_name: test) + end) + + assert t1 / 1_000_000 <= @max_seconds + assert t2 / 1_000_000 <= @max_seconds + assert t3 / 1_000_000 <= @max_seconds + end + end + defp report_back(test_pid) do fn opts -> send(test_pid, {:cache_warmed, %{at: System.monotonic_time(), opts: opts}}) diff --git a/test/plausible/site/rate_limiter_test.exs b/test/plausible/site/rate_limiter_test.exs new file mode 100644 index 000000000..74b8d92df --- /dev/null +++ b/test/plausible/site/rate_limiter_test.exs @@ -0,0 +1,169 @@ +defmodule Plausible.Site.RateLimiterTest do + use Plausible.DataCase, async: true + + alias Plausible.Site.Cache + alias Plausible.Site.RateLimiter + + import ExUnit.CaptureLog + + setup %{test: test} do + {:ok, _} = start_test_cache(test) + opts = [cache_opts: [cache_name: test, force?: true]] + {:ok, %{opts: opts}} + end + + test "(for now) throws an exception when cache is disabled" do + assert_raise(RuntimeError, fn -> RateLimiter.allow?("example.com") end) + end + + test "sites not found in cache/DB are denied", %{opts: opts} do + refute RateLimiter.allow?("example.com", opts) + end + + test "site from cache with no ingest_rate_limit_threshold is allowed", %{test: test, opts: opts} do + domain = "site1.example.com" + + add_site_and_refresh_cache(test, domain: domain) + assert RateLimiter.allow?(domain, opts) + end + + test "site from DB with 
no ingest_rate_limit_threshold is allowed", %{opts: opts} do + domain = "site1.example.com" + + insert(:site, domain: domain) + + assert RateLimiter.allow?(domain, opts) + end + + test "rate limiting works with threshold", %{test: test, opts: opts} do + domain = "site1.example.com" + + add_site_and_refresh_cache(test, + domain: domain, + ingest_rate_limit_threshold: 1, + ingest_rate_limit_scale_seconds: 60 + ) + + assert RateLimiter.allow?(domain, opts) + refute RateLimiter.allow?(domain, opts) + refute RateLimiter.allow?(domain, opts) + end + + test "rate limiting works with scale window", %{test: test, opts: opts} do + domain = "site1.example.com" + + add_site_and_refresh_cache(test, + domain: domain, + ingest_rate_limit_threshold: 1, + ingest_rate_limit_scale_seconds: 1 + ) + + assert RateLimiter.allow?(domain, opts) + Process.sleep(1) + refute RateLimiter.allow?(domain, opts) + Process.sleep(1_000) + assert RateLimiter.allow?(domain, opts) + end + + test "rate limiting prioritises cache lookups", %{test: test, opts: opts} do + domain = "site1.example.com" + + site = + add_site_and_refresh_cache(test, + domain: domain, + ingest_rate_limit_threshold: 1000, + ingest_rate_limit_scale_seconds: 600 + ) + + {:ok, _} = Plausible.Repo.delete(site) + # We need some dummy site, otherwise the cache won't refresh in case the DB + # is completely empty + insert(:site) + + assert RateLimiter.allow?(domain, opts) + :ok = Cache.refresh_all(opts[:cache_opts]) + refute RateLimiter.allow?(domain, opts) + end + + test "rate limiter policy switches to allow when RL backend errors bubble-up", %{ + test: test, + opts: opts + } do + domain = "causingerrors.example.com" + + site = + add_site_and_refresh_cache(test, + domain: domain, + ingest_rate_limit_threshold: 0, + ingest_rate_limit_scale_seconds: 600 + ) + + refute RateLimiter.allow?(domain, opts) + {:ok, :broken} = break_hammer(site) + + log = + capture_log(fn -> + assert RateLimiter.allow?(domain, opts) + end) + + assert log =~ 
"Error checking rate limit for 'ingest:site:causingerrors.example.com'" + assert log =~ "Falling back to: allow" + end + + test "telemetry event is emitted on :deny", %{test: test, opts: opts} do + start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:deny)) + RateLimiter.allow?("example.com", opts) + assert_receive :telemetry_handled + end + + test "telemetry event is emitted on :allow", %{test: test, opts: opts} do + start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:allow)) + + domain = "site1.example.com" + add_site_and_refresh_cache(test, domain: domain) + + RateLimiter.allow?(domain, opts) + assert_receive :telemetry_handled + end + + # We need a way to force Hammer to error-out on Hammer.check_rate/3. + # This is tricky because we don't configure multiple backends, + # so the easiest (and naive) way to do it, without mocking, is to + # plant a hand-crafted ets entry that makes it throw an exception + # when it gets to it. This should not affect any shared state tests + # because the rogue entry is only stored for a specific key. + # The drawback of doing this, the test will break if we + # ever change the underlying Rate Limiting backend/implementation. 
+ defp break_hammer(site) do + scale_ms = site.ingest_rate_limit_scale_seconds * 1_000 + rogue_key = site.domain + our_key = RateLimiter.key(rogue_key) + {_, key} = Hammer.Utils.stamp_key(our_key, scale_ms) + true = :ets.insert(:hammer_ets_buckets, {key, 1, "TOTALLY-WRONG", "ABSOLUTELY-BREAKING"}) + {:ok, :broken} + end + + defp start_test_cache(cache_name) do + %{start: {m, f, a}} = Cache.child_spec(cache_name: cache_name) + apply(m, f, a) + end + + defp add_site_and_refresh_cache(cache_name, site_data) do + site = insert(:site, site_data) + Cache.refresh_one(site.domain, cache_name: cache_name, force?: true) + site + end + + defp start_telemetry_handler(test, event: event) do + test_pid = self() + + :telemetry.attach( + "#{test}-telemetry-handler", + event, + fn ^event, %{}, %{}, _ -> + send(test_pid, :telemetry_handled) + end, + %{} + ) + end +end