Outline basic rate-limiting interface (#2452)

* Outline basic rate-limiting interface

* Fixup typespecs

* Address empty cache race condition on refresh (#2457)

Ref https://github.com/plausible/analytics/pull/2444/files#r1023751222

* Fix up test case: make sure the cache is refreshed
This commit is contained in:
Adam Rutkowski 2022-11-21 15:54:47 +01:00 committed by GitHub
parent 135471c32e
commit ed9e03ae14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 366 additions and 7 deletions

View File

@@ -74,8 +74,8 @@ defmodule Plausible.Site.Cache do
end
@spec refresh_all(Keyword.t()) :: :ok
def refresh_all(opts) do
cache_name = Keyword.fetch!(opts, :cache_name)
def refresh_all(opts \\ []) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
measure_duration(telemetry_event_refresh(cache_name, :all), fn ->
sites_by_domain_query =
@@ -87,11 +87,29 @@ defmodule Plausible.Site.Cache do
sites_by_domain = Plausible.Repo.all(sites_by_domain_query)
Cachex.clear!(cache_name)
:ok = merge(sites_by_domain, opts)
end)
if not Enum.empty?(sites_by_domain) do
true = Cachex.put_many!(cache_name, sites_by_domain)
end
:ok
end
@spec merge(new_items :: [Site.t()], opts :: Keyword.t()) :: :ok
def merge(new_items, opts \\ [])
def merge([], _), do: :ok
def merge(new_items, opts) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
{:ok, old_keys} = Cachex.keys(cache_name)
new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end))
old = MapSet.new(old_keys)
true = Cachex.put_many!(cache_name, new_items)
old
|> MapSet.difference(new)
|> Enum.each(fn k ->
Cachex.del(cache_name, k)
end)
:ok

109
lib/plausible/site/cache/rate_limiter.ex vendored Normal file
View File

@@ -0,0 +1,109 @@
defmodule Plausible.Site.RateLimiter do
  # NOTE: these attributes are deliberately defined *before* @moduledoc —
  # they are interpolated into the docs below, so they must exist first.
  @policy_for_non_existing_sites :deny
  @policy_on_rate_limiting_backend_error :allow

  @moduledoc """
  Thin wrapper around Hammer for rate limiting domain-specific events
  during the ingestion phase. Currently there are two policies
  on which the `allow/2` function operates:

    * `:allow`
    * `:deny`

  Rate Limiting buckets are configured per site (externally via the CRM).
  See: `Plausible.Site`

  To look up each site's configuration, the RateLimiter fetches
  a Site by domain using `Plausible.Cache` interface.

  If the Site is not found in Cache, a DB refresh attempt is made.
  The result of that last attempt gets stored in Cache to prevent
  excessive DB queries.

  The module defines two policies outside the regular bucket inspection:

    * when the site does not exist in the database: #{@policy_for_non_existing_sites}
    * when the underlying rate limiting mechanism returns
      an internal error: #{@policy_on_rate_limiting_backend_error}

  Each policy computation emits a single telemetry event.
  See: `policy_telemetry_event/1`
  """

  alias Plausible.Site
  alias Plausible.Site.Cache

  require Logger

  # Returns `true` when an ingestion event for `domain` should be admitted,
  # i.e. when the computed policy is `:allow`.
  @spec allow?(String.t(), Keyword.t()) :: boolean()
  def allow?(domain, opts \\ []) do
    policy(domain, opts) == :allow
  end

  # Builds the rate-limiting bucket key for a domain. Public so that callers
  # (and tests) can reference the exact key passed to Hammer.
  @spec key(String.t()) :: String.t()
  def key(domain) do
    "ingest:site:#{domain}"
  end

  # Name of the telemetry event emitted for a given policy decision.
  @spec policy_telemetry_event(:allow | :deny) :: list(atom())
  def policy_telemetry_event(policy) do
    [:plausible, :ingest, :rate_limit, policy]
  end

  # Computes the policy for `domain`, emitting the matching telemetry event
  # before returning the decision.
  defp policy(domain, opts) do
    result =
      case get_from_cache_or_refresh(domain, Keyword.get(opts, :cache_opts, [])) do
        # Site known to be missing from the DB: apply the "non-existing site"
        # policy (deny).
        %Ecto.NoResultsError{} ->
          @policy_for_non_existing_sites

        %Site{} = site ->
          check_rate_limit(site, opts)

        # Cache refresh failed; fall back to the fail-open policy (allow).
        {:error, _} ->
          @policy_on_rate_limiting_backend_error
      end

    :ok = emit_allowance_telemetry(result)

    result
  end

  # A site with no threshold configured is never rate limited.
  defp check_rate_limit(%Site{ingest_rate_limit_threshold: nil}, _opts) do
    :allow
  end

  # Consults Hammer using the site's per-domain bucket. An explicit `:key`
  # may be injected via `opts`; otherwise `key/1` derives it from the domain.
  defp check_rate_limit(%Site{ingest_rate_limit_threshold: threshold} = site, opts)
       when is_integer(threshold) do
    key = Keyword.get(opts, :key, key(site.domain))
    scale_ms = site.ingest_rate_limit_scale_seconds * 1_000

    case Hammer.check_rate(key, scale_ms, threshold) do
      {:deny, _} ->
        :deny

      {:allow, _} ->
        :allow

      {:error, reason} ->
        # Fail open on rate-limiter backend errors so a broken backend does
        # not block ingestion; log for observability.
        Logger.error(
          "Error checking rate limit for '#{key}': #{inspect(reason)}. Falling back to: #{@policy_on_rate_limiting_backend_error}"
        )

        @policy_on_rate_limiting_backend_error
    end
  end

  # Cache-first lookup. A cached %Ecto.NoResultsError{} marks a site already
  # confirmed missing (avoids repeated DB hits — see @moduledoc). A cache miss
  # (`nil`) triggers a one-off DB refresh; a failed refresh's `{:error, _}`
  # falls through the `with` and is handled by the caller.
  defp get_from_cache_or_refresh(domain, cache_opts) do
    case Cache.get(domain, cache_opts) do
      %Site{} = site ->
        site

      %Ecto.NoResultsError{} = not_found ->
        not_found

      nil ->
        with {:ok, refreshed_item} <- Cache.refresh_one(domain, cache_opts) do
          refreshed_item
        end
    end
  end

  # Fires the telemetry event for the decided policy (empty measurements/metadata).
  defp emit_allowance_telemetry(policy) do
    :telemetry.execute(policy_telemetry_event(policy), %{}, %{})
  end
end

View File

@@ -195,6 +195,69 @@ defmodule Plausible.Site.CacheTest do
end
end
describe "merging the cache" do
  # `force?: true` bypasses the globally configured cache so each test reads
  # from its own per-test cache instance (named after the test).
  test "merging adds new items", %{test: test} do
    {:ok, _} = start_test_cache(test)
    :ok = Cache.merge([{"item1", :item1}], cache_name: test)
    assert :item1 == Cache.get("item1", cache_name: test, force?: true)
  end

  # merge/2 with an empty list is a no-op: existing entries survive.
  test "merging no new items leaves the old cache intact", %{test: test} do
    {:ok, _} = start_test_cache(test)
    :ok = Cache.merge([{"item1", :item1}], cache_name: test)
    :ok = Cache.merge([], cache_name: test)
    assert :item1 == Cache.get("item1", cache_name: test, force?: true)
  end

  # Keys absent from the new item set are deleted from the cache.
  test "merging removes stale items", %{test: test} do
    {:ok, _} = start_test_cache(test)
    :ok = Cache.merge([{"item1", :item1}], cache_name: test)
    :ok = Cache.merge([{"item2", :item2}], cache_name: test)

    refute Cache.get("item1", cache_name: test, force?: true)
    assert Cache.get("item2", cache_name: test, force?: true)
  end

  # Re-merged keys take the new value; untouched keys keep theirs.
  test "merging updates changed items", %{test: test} do
    {:ok, _} = start_test_cache(test)
    :ok = Cache.merge([{"item1", :item1}, {"item2", :item2}], cache_name: test)
    :ok = Cache.merge([{"item1", :changed}, {"item2", :item2}], cache_name: test)

    assert :changed == Cache.get("item1", cache_name: test, force?: true)
    assert :item2 == Cache.get("item2", cache_name: test, force?: true)
  end

  # Large compile-time fixtures: @items2 intentionally overlaps @items1 only
  # partially (random keys over a wider range) to exercise add/update/delete
  # paths in one merge.
  @items1 for i <- 1..200_000, do: {i, :batch1}
  @items2 for _ <- 1..200_000, do: {Enum.random(1..400_000), :batch2}
  @max_seconds 2

  # Coarse performance guard, not a benchmark: merging 200k items (initial,
  # identical re-merge, and a partially-overlapping set) must each finish
  # within @max_seconds. :timer.tc/1 reports microseconds.
  test "merging large sets is expected to be under #{@max_seconds} seconds", %{test: test} do
    {:ok, _} = start_test_cache(test)

    {t1, :ok} =
      :timer.tc(fn ->
        :ok = Cache.merge(@items1, cache_name: test)
      end)

    {t2, :ok} =
      :timer.tc(fn ->
        :ok = Cache.merge(@items1, cache_name: test)
      end)

    {t3, :ok} =
      :timer.tc(fn ->
        :ok = Cache.merge(@items2, cache_name: test)
      end)

    assert t1 / 1_000_000 <= @max_seconds
    assert t2 / 1_000_000 <= @max_seconds
    assert t3 / 1_000_000 <= @max_seconds
  end
end
defp report_back(test_pid) do
fn opts ->
send(test_pid, {:cache_warmed, %{at: System.monotonic_time(), opts: opts}})

View File

@@ -0,0 +1,169 @@
defmodule Plausible.Site.RateLimiterTest do
  use Plausible.DataCase, async: true

  alias Plausible.Site.Cache
  alias Plausible.Site.RateLimiter

  import ExUnit.CaptureLog

  # Each test gets its own cache instance named after the test. `force?: true`
  # makes Cache calls use that instance instead of the global one.
  setup %{test: test} do
    {:ok, _} = start_test_cache(test)
    opts = [cache_opts: [cache_name: test, force?: true]]
    {:ok, %{opts: opts}}
  end

  # Without cache_opts the RateLimiter hits the (disabled-in-test) global
  # cache — currently that raises rather than degrading gracefully.
  test "(for now) throws an exception when cache is disabled" do
    assert_raise(RuntimeError, fn -> RateLimiter.allow?("example.com") end)
  end

  # Unknown domains fall under the :deny policy for non-existing sites.
  test "sites not found in cache/DB are denied", %{opts: opts} do
    refute RateLimiter.allow?("example.com", opts)
  end

  # A nil ingest_rate_limit_threshold means "no rate limiting" — always allow.
  test "site from cache with no ingest_rate_limit_threshold is allowed", %{test: test, opts: opts} do
    domain = "site1.example.com"

    add_site_and_refresh_cache(test, domain: domain)

    assert RateLimiter.allow?(domain, opts)
  end

  # Same as above but via the DB-refresh path (site never cached up front).
  test "site from DB with no ingest_rate_limit_threshold is allowed", %{opts: opts} do
    domain = "site1.example.com"

    insert(:site, domain: domain)

    assert RateLimiter.allow?(domain, opts)
  end

  # threshold: 1 per 60s — first event passes, subsequent ones are denied.
  test "rate limiting works with threshold", %{test: test, opts: opts} do
    domain = "site1.example.com"

    add_site_and_refresh_cache(test,
      domain: domain,
      ingest_rate_limit_threshold: 1,
      ingest_rate_limit_scale_seconds: 60
    )

    assert RateLimiter.allow?(domain, opts)
    refute RateLimiter.allow?(domain, opts)
    refute RateLimiter.allow?(domain, opts)
  end

  # With a 1s window, a denied domain becomes allowed again once the
  # window rolls over. Timing-sensitive: relies on real Process.sleep/1.
  test "rate limiting works with scale window", %{test: test, opts: opts} do
    domain = "site1.example.com"

    add_site_and_refresh_cache(test,
      domain: domain,
      ingest_rate_limit_threshold: 1,
      ingest_rate_limit_scale_seconds: 1
    )

    assert RateLimiter.allow?(domain, opts)
    Process.sleep(1)
    refute RateLimiter.allow?(domain, opts)
    Process.sleep(1_000)
    assert RateLimiter.allow?(domain, opts)
  end

  # A site deleted from the DB is still allowed while its stale cache entry
  # exists; only after refresh_all does the :deny-for-missing policy kick in.
  test "rate limiting prioritises cache lookups", %{test: test, opts: opts} do
    domain = "site1.example.com"

    site =
      add_site_and_refresh_cache(test,
        domain: domain,
        ingest_rate_limit_threshold: 1000,
        ingest_rate_limit_scale_seconds: 600
      )

    {:ok, _} = Plausible.Repo.delete(site)

    # We need some dummy site, otherwise the cache won't refresh in case the DB
    # is completely empty
    insert(:site)

    assert RateLimiter.allow?(domain, opts)
    :ok = Cache.refresh_all(opts[:cache_opts])
    refute RateLimiter.allow?(domain, opts)
  end

  # threshold: 0 denies everything — until the Hammer backend errors, at which
  # point the fail-open policy (:allow) takes over and the error is logged.
  test "rate limiter policy switches to allow when RL backend errors bubble-up", %{
    test: test,
    opts: opts
  } do
    domain = "causingerrors.example.com"

    site =
      add_site_and_refresh_cache(test,
        domain: domain,
        ingest_rate_limit_threshold: 0,
        ingest_rate_limit_scale_seconds: 600
      )

    refute RateLimiter.allow?(domain, opts)
    {:ok, :broken} = break_hammer(site)

    log =
      capture_log(fn ->
        assert RateLimiter.allow?(domain, opts)
      end)

    # Pins the exact log message produced by RateLimiter.check_rate_limit/2.
    assert log =~ "Error checking rate limit for 'ingest:site:causingerrors.example.com'"
    assert log =~ "Falling back to: allow"
  end

  test "telemetry event is emitted on :deny", %{test: test, opts: opts} do
    start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:deny))
    RateLimiter.allow?("example.com", opts)
    assert_receive :telemetry_handled
  end

  test "telemetry event is emitted on :allow", %{test: test, opts: opts} do
    start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:allow))
    domain = "site1.example.com"
    add_site_and_refresh_cache(test, domain: domain)
    RateLimiter.allow?(domain, opts)
    assert_receive :telemetry_handled
  end

  # We need a way to force Hammer to error-out on Hammer.check_rate/3.
  # This is tricky because we don't configure multiple backends,
  # so the easiest (and naive) way to do it, without mocking, is to
  # plant a hand-crafted ets entry that makes it throw an exception
  # when it gets to it. This should not affect any shared state tests
  # because the rogue entry is only stored for a specific key.
  # The drawback of doing this, the test will break if we
  # ever change the underlying Rate Limiting backend/implementation.
  defp break_hammer(site) do
    scale_ms = site.ingest_rate_limit_scale_seconds * 1_000
    rogue_key = site.domain
    our_key = RateLimiter.key(rogue_key)
    {_, key} = Hammer.Utils.stamp_key(our_key, scale_ms)
    # Non-integer counter fields make Hammer's arithmetic blow up for this key.
    true = :ets.insert(:hammer_ets_buckets, {key, 1, "TOTALLY-WRONG", "ABSOLUTELY-BREAKING"})
    {:ok, :broken}
  end

  # Starts a standalone cache process via its child_spec MFA.
  defp start_test_cache(cache_name) do
    %{start: {m, f, a}} = Cache.child_spec(cache_name: cache_name)
    apply(m, f, a)
  end

  # Inserts a site and force-loads it into the per-test cache; returns the site.
  defp add_site_and_refresh_cache(cache_name, site_data) do
    site = insert(:site, site_data)
    Cache.refresh_one(site.domain, cache_name: cache_name, force?: true)
    site
  end

  # Attaches a telemetry handler (unique per test) that pings this test
  # process whenever `event` fires.
  defp start_telemetry_handler(test, event: event) do
    test_pid = self()

    :telemetry.attach(
      "#{test}-telemetry-handler",
      event,
      fn ^event, %{}, %{}, _ ->
        send(test_pid, :telemetry_handled)
      end,
      %{}
    )
  end
end