Gatekeep ingestion pipeline (#2472)

* Update Sites.Cache

So it's now capable of refreshing most recent sites.
Refreshing a single site is no longer wanted.

* Introduce Warmer.RecentlyUpdated

This is a Sites Cache warmer that runs only for the
most recently updated sites, every 30s.

* Validate Request creation early

* Rename RateLimiter to GateKeeper and introduce detailed policies

* Update events API tests - a provisioned site is now required

* Update events ingestion tests

* Make limits visible in CRM Sites index

* Hard-deprecate DOMAIN_BLACKLIST

* Remove unnecessary clause

* Fix typo

* Explicitly delegate Warmer.All

* GateKeeper.allwoance => GateKeeper.check

* Instrument Sites.Cache measurements

* Update send_pageview task to output response headers

* Instrument ingestion pipeline

* Credo

* Make event telemetry test a sync case

* Simplify Request.uri/hostname handling

* Use embedded schema, apply action and rely on get_field
This commit is contained in:
Adam Rutkowski 2022-11-28 15:50:55 +01:00 committed by GitHub
parent e2563acf36
commit 356575ef78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1918 additions and 1602 deletions

View File

@ -158,11 +158,6 @@ log_level =
|> get_var_from_path_or_env("LOG_LEVEL", "warn")
|> String.to_existing_atom()
domain_blacklist =
config_dir
|> get_var_from_path_or_env("DOMAIN_BLACKLIST", "")
|> String.split(",")
is_selfhost =
config_dir
|> get_var_from_path_or_env("SELFHOST", "true")
@ -195,8 +190,7 @@ config :plausible,
site_limit: site_limit,
site_limit_exempt: site_limit_exempt,
is_selfhost: is_selfhost,
custom_script_name: custom_script_name,
domain_blacklist: domain_blacklist
custom_script_name: custom_script_name
config :plausible, :selfhost,
enable_email_verification: enable_email_verification,

View File

@ -50,11 +50,13 @@ defmodule Mix.Tasks.SendPageview do
body = get_body(parsed_opts)
case Plausible.HTTPClient.post(url, headers, body) do
{:ok, _} ->
{:ok, resp} ->
IO.puts(
"✅ Successfully sent #{body[:name]} event to #{url}\n\nip=#{ip}\nuser_agent=#{user_agent}\nbody= #{inspect(body, pretty: true)}"
)
IO.puts("Response headers: " <> inspect(resp.headers, pretty: true))
{:error, e} ->
IO.puts("❌ Could not send event to #{url}. Got the following error: \n\n #{inspect(e)}")
end

View File

@ -21,11 +21,12 @@ defmodule Plausible.Application do
Supervisor.child_spec({Cachex, name: :sessions, limit: nil, stats: true},
id: :cachex_sessions
),
Plausible.PromEx,
{Plausible.Site.Cache, []},
{Plausible.Site.Cache.Warmer, []},
{Plausible.Site.Cache.Warmer.All, []},
{Plausible.Site.Cache.Warmer.RecentlyUpdated, []},
PlausibleWeb.Endpoint,
{Oban, Application.get_env(:plausible, Oban)},
Plausible.PromEx
{Oban, Application.get_env(:plausible, Oban)}
]
opts = [strategy: :one_for_one, name: Plausible.Supervisor]

View File

@ -7,6 +7,7 @@ defmodule Plausible.Ingestion.Event do
"""
alias Plausible.Ingestion.{Request, CityOverrides}
alias Plausible.ClickhouseEvent
alias Plausible.Site.GateKeeper
defstruct domain: nil,
clickhouse_event_attrs: %{},
@ -14,13 +15,14 @@ defmodule Plausible.Ingestion.Event do
dropped?: false,
drop_reason: nil,
request: nil,
salts: nil
salts: nil,
changeset: nil
@type drop_reason() ::
:bot
| :domain_blocked
| :spam_referrer
| {:error, Ecto.Changeset.t()}
| GateKeeper.policy()
| :invalid
@type t() :: %__MODULE__{
domain: String.t() | nil,
@ -29,26 +31,28 @@ defmodule Plausible.Ingestion.Event do
dropped?: boolean(),
drop_reason: drop_reason(),
request: Request.t(),
salts: map()
salts: map(),
changeset: %Ecto.Changeset{}
}
@spec build_and_buffer(Request.t()) ::
{:ok, %{dropped: [t()], buffered: [t()]}}
@spec build_and_buffer(Request.t()) :: {:ok, %{buffered: [t()], dropped: [t()]}}
def build_and_buffer(%Request{domains: domains} = request) do
processed_events =
if spam_referrer?(request) do
for domain <- domains, do: drop(new(domain, request), :spam_referrer)
else
Enum.reduce(domains, [], fn domain, acc ->
if domain_blocked?(domain) do
[drop(new(domain, request), :domain_blocked) | acc]
else
processed =
domain
|> new(request)
|> process_unless_dropped(pipeline())
case GateKeeper.check(domain) do
:allow ->
processed =
domain
|> new(request)
|> process_unless_dropped(pipeline())
[processed | acc]
[processed | acc]
{:deny, reason} ->
[drop(new(domain, request), reason) | acc]
end
end)
end
@ -57,6 +61,16 @@ defmodule Plausible.Ingestion.Event do
{:ok, %{dropped: dropped, buffered: buffered}}
end
@spec telemetry_event_buffered() :: [atom()]
def telemetry_event_buffered() do
[:plausible, :ingest, :event, :buffered]
end
@spec telemetry_event_dropped() :: [atom()]
def telemetry_event_dropped() do
[:plausible, :ingest, :event, :dropped]
end
defp pipeline() do
[
&put_user_agent/1,
@ -84,15 +98,21 @@ defmodule Plausible.Ingestion.Event do
end
defp new(domain, request) do
%__MODULE__{domain: domain, request: request}
struct!(__MODULE__, domain: domain, request: request)
end
defp drop(%__MODULE__{} = event, reason) do
%{event | dropped?: true, drop_reason: reason}
defp drop(%__MODULE__{} = event, reason, attrs \\ []) do
fields =
attrs
|> Keyword.put(:dropped?, true)
|> Keyword.put(:drop_reason, reason)
emit_telemetry_dropped(reason)
struct!(event, fields)
end
defp update_attrs(%__MODULE__{} = event, %{} = attrs) do
%{event | clickhouse_event_attrs: Map.merge(event.clickhouse_event_attrs, attrs)}
struct!(event, clickhouse_event_attrs: Map.merge(event.clickhouse_event_attrs, attrs))
end
defp put_user_agent(%__MODULE__{} = event) do
@ -117,19 +137,12 @@ defmodule Plausible.Ingestion.Event do
end
defp put_basic_info(%__MODULE__{} = event) do
host =
case event.request.uri do
%{host: ""} -> "(none)"
%{host: host} when is_binary(host) -> host
_ -> nil
end
update_attrs(event, %{
domain: event.domain,
timestamp: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
timestamp: event.request.timestamp,
name: event.request.event_name,
hostname: Request.sanitize_hostname(host),
pathname: get_pathname(event.request.uri, event.request.hash_mode)
hostname: event.request.hostname,
pathname: event.request.pathname
})
end
@ -239,7 +252,7 @@ defmodule Plausible.Ingestion.Event do
%{event | clickhouse_event: valid_clickhouse_event}
{:error, changeset} ->
drop(event, {:error, changeset})
drop(event, :invalid, changeset: changeset)
end
end
@ -260,24 +273,10 @@ defmodule Plausible.Ingestion.Event do
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event) do
{:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event)
emit_telemetry_buffered()
event
end
defp get_pathname(_uri = nil, _hash_mode), do: "/"
defp get_pathname(uri, hash_mode) do
pathname =
(uri.path || "/")
|> URI.decode()
|> String.trim_trailing()
if hash_mode == 1 && uri.fragment do
pathname <> "#" <> URI.decode(uri.fragment)
else
pathname
end
end
defp parse_referrer(_uri, _referrer_str = nil), do: nil
defp parse_referrer(uri, referrer_str) do
@ -409,8 +408,11 @@ defmodule Plausible.Ingestion.Event do
defp spam_referrer?(_), do: false
defp domain_blocked?(domain) do
domain in Application.get_env(:plausible, :domain_blacklist) or
FunWithFlags.enabled?(:block_event_ingest, for: domain)
defp emit_telemetry_buffered() do
:telemetry.execute(telemetry_event_buffered(), %{}, %{})
end
defp emit_telemetry_dropped(reason) do
:telemetry.execute(telemetry_event_dropped(), %{}, %{reason: reason})
end
end

View File

@ -1,51 +1,68 @@
defmodule Plausible.Ingestion.Request do
@moduledoc """
The %Plausible.Ingestion.Request{} struct stores all needed fields to create an event downstream.
The %Plausible.Ingestion.Request{} struct stores all needed fields
to create an event downstream. Preliminary validation is made
to detect user errors early.
"""
defstruct [
:remote_ip,
:user_agent,
:event_name,
:uri,
:referrer,
:domains,
:screen_width,
:hash_mode,
props: %{},
query_params: %{}
]
use Ecto.Schema
alias Ecto.Changeset
@type t() :: %__MODULE__{
remote_ip: String.t() | nil,
user_agent: String.t() | nil,
event_name: term(),
uri: URI.t() | nil,
referrer: term(),
domains: list(String.t()),
screen_width: term(),
hash_mode: term(),
props: map(),
query_params: map()
}
embedded_schema do
field :remote_ip, :string
field :user_agent, :string
field :event_name, :string
field :uri, :string
field :hostname, :string
field :referrer, :string
field :domains, {:array, :string}
field :screen_width, :string
field :hash_mode, :string
field :pathname, :string
field :props, :map
field :query_params, :map
@spec build(Plug.Conn.t()) :: {:ok, t()} | {:error, :invalid_json}
field :timestamp, :naive_datetime,
default: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
end
@type t() :: %__MODULE__{}
@spec build(Plug.Conn.t()) :: {:ok, t()} | {:error, Changeset.t()}
@doc """
Builds a list of %Plausible.Ingestion.Request{} struct from %Plug.Conn{}.
Builds and initially validates %Plausible.Ingestion.Request{} struct from %Plug.Conn{}.
"""
def build(%Plug.Conn{} = conn) do
with {:ok, request_body} <- parse_body(conn) do
%__MODULE__{}
|> Map.put(:remote_ip, PlausibleWeb.RemoteIp.get(conn))
|> put_uri(request_body)
|> put_user_agent(conn)
|> put_request_params(request_body)
|> put_query_params()
|> map_domains(request_body)
|> then(&{:ok, &1})
changeset = Changeset.change(%__MODULE__{})
case parse_body(conn) do
{:ok, request_body} ->
changeset
|> put_remote_ip(conn)
|> put_uri(request_body)
|> put_hostname()
|> put_user_agent(conn)
|> put_request_params(request_body)
|> put_pathname()
|> put_query_params()
|> map_domains(request_body)
|> Changeset.validate_required([
:event_name,
:hostname,
:pathname,
:timestamp
])
|> Changeset.apply_action(nil)
{:error, :invalid_json} ->
{:error, Changeset.add_error(changeset, :request, "Unable to parse request body as json")}
end
end
defp put_remote_ip(changeset, conn) do
Changeset.put_change(changeset, :remote_ip, PlausibleWeb.RemoteIp.get(conn))
end
defp parse_body(conn) do
case conn.body_params do
%Plug.Conn.Unfetched{} ->
@ -61,36 +78,74 @@ defmodule Plausible.Ingestion.Request do
end
end
defp put_request_params(%__MODULE__{} = request, %{} = request_body) do
%__MODULE__{
request
| event_name: request_body["n"] || request_body["name"],
referrer: request_body["r"] || request_body["referrer"],
screen_width: request_body["w"] || request_body["screen_width"],
hash_mode: request_body["h"] || request_body["hashMode"],
props: parse_props(request_body)
}
defp put_request_params(changeset, %{} = request_body) do
Changeset.change(
changeset,
event_name: request_body["n"] || request_body["name"],
referrer: request_body["r"] || request_body["referrer"],
screen_width: request_body["w"] || request_body["screen_width"],
hash_mode: request_body["h"] || request_body["hashMode"],
props: parse_props(request_body)
)
end
defp map_domains(%__MODULE__{} = request, %{} = request_body) do
domains =
if raw = request_body["d"] || request_body["domain"] do
raw
|> String.split(",")
|> Enum.map(&sanitize_hostname/1)
else
[sanitize_hostname(request.uri)]
defp put_pathname(changeset) do
uri = Changeset.get_field(changeset, :uri)
hash_mode = Changeset.get_field(changeset, :hash_mode)
pathname = get_pathname(uri, hash_mode)
Changeset.put_change(changeset, :pathname, pathname)
end
defp map_domains(changeset, %{} = request_body) do
raw = request_body["d"] || request_body["domain"]
raw = if is_binary(raw), do: String.trim(raw)
case raw do
"" ->
Changeset.add_error(changeset, :domain, "can't be blank")
raw when is_binary(raw) ->
domains =
raw
|> String.split(",")
|> Enum.map(&sanitize_hostname/1)
Changeset.put_change(changeset, :domains, domains)
nil ->
from_uri = sanitize_hostname(Changeset.get_field(changeset, :uri))
if from_uri do
Changeset.put_change(changeset, :domains, [from_uri])
else
Changeset.add_error(changeset, :domain, "can't be blank")
end
end
end
defp put_uri(changeset, %{} = request_body) do
url = request_body["u"] || request_body["url"]
case url do
nil ->
Changeset.add_error(changeset, :url, "is required")
url when is_binary(url) ->
Changeset.put_change(changeset, :uri, URI.parse(url))
_ ->
Changeset.add_error(changeset, :url, "must be a string")
end
end
defp put_hostname(changeset) do
host =
case Changeset.get_field(changeset, :uri) do
%{host: host} when is_binary(host) and host != "" -> host
_ -> "(none)"
end
%__MODULE__{request | domains: domains}
end
defp put_uri(%__MODULE__{} = request, %{} = request_body) do
if url = request_body["u"] || request_body["url"] do
%__MODULE__{request | uri: URI.parse(url)}
else
request
end
Changeset.put_change(changeset, :hostname, sanitize_hostname(host))
end
defp parse_props(%{} = request_body) do
@ -132,23 +187,38 @@ defmodule Plausible.Ingestion.Request do
end
end
defp put_query_params(%__MODULE__{} = request) do
case request do
%__MODULE__{uri: %URI{query: query}} when is_binary(query) ->
%__MODULE__{request | query_params: URI.decode_query(query)}
defp put_query_params(changeset) do
case Changeset.get_field(changeset, :uri) do
%{query: query} when is_binary(query) ->
Changeset.put_change(changeset, :query_params, URI.decode_query(query))
_any ->
request
changeset
end
end
defp put_user_agent(%__MODULE__{} = request, %Plug.Conn{} = conn) do
defp put_user_agent(changeset, %Plug.Conn{} = conn) do
user_agent =
conn
|> Plug.Conn.get_req_header("user-agent")
|> List.first()
%__MODULE__{request | user_agent: user_agent}
Changeset.put_change(changeset, :user_agent, user_agent)
end
defp get_pathname(nil, _hash_mode), do: "/"
defp get_pathname(uri, hash_mode) do
pathname =
(uri.path || "/")
|> URI.decode()
|> String.trim_trailing()
if hash_mode == 1 && uri.fragment do
pathname <> "#" <> URI.decode(uri.fragment)
else
pathname
end
end
@doc """

View File

@ -40,7 +40,16 @@ defmodule Plausible.SiteAdmin do
timezone: nil,
public: nil,
owner: %{value: &get_owner_email/1},
other_members: %{value: &get_other_members/1}
other_members: %{value: &get_other_members/1},
limits: %{
value: fn site ->
case site.ingest_rate_limit_threshold do
nil -> ""
0 -> "🛑 BLOCKED"
n -> "#{n}/#{site.ingest_rate_limit_scale_seconds}s (per server)"
end
end
}
]
end

View File

@ -19,23 +19,18 @@ defmodule Plausible.Site.Cache do
database counterpart -- to spare bandwidth and query execution time,
only selected database columns are retrieved and cached.
There are two modes of refreshing the cache: `:all` and `:single`.
There are two modes of refreshing the cache: `:all` and `:updated_recently`.
* `:all` means querying the database for all Site entries and should be done
periodically (via `Cache.Warmer`). All existing Cache entries all cleared
prior to writing the new batch.
periodically (via `Cache.Warmer`). All stale Cache entries are cleared
upon persisting the new batch.
* `:single` attempts to re-query a specific site by domain and should be done
only when the initial Cache.get attempt resulted with `nil`. Single refresh will
write `%Ecto.NoResultsError{}` to the cache so that subsequent Cache.get calls
indicate that we already failed to retrieve a Site.
* `:updated_recently` attempts to re-query sites updated within the last
15 minutes only, to account for most recent changes. This refresh
is lighter on the database and is meant to be executed more frequently
(via `Cache.Warmer.RecentlyUpdated`).
This helps in recognising missing/deleted Sites with minimal number of DB lookups
across a disconnected cluster within the periodic refresh window.
Refreshing a single Site emits a telemetry event including `duration` measurement
and meta-data indicating whether the site was found in the DB or is missing still.
The telemetry event is defined with `telemetry_event_refresh/2`.
Refreshing the cache emits telemetry event defined as per `telemetry_event_refresh/2`.
The `@cached_schema_fields` attribute defines the list of DB columns
queried on each cache refresh.
@ -49,6 +44,7 @@ defmodule Plausible.Site.Cache do
alias Plausible.Site
@cache_name :sites_by_domain
@modes [:all, :updated_recently]
@cached_schema_fields ~w(
id
@ -57,8 +53,7 @@ defmodule Plausible.Site.Cache do
ingest_rate_limit_threshold
)a
@type not_found_in_db() :: %Ecto.NoResultsError{}
@type t() :: Site.t() | not_found_in_db()
@type t() :: Site.t()
def name(), do: @cache_name
@ -75,22 +70,36 @@ defmodule Plausible.Site.Cache do
@spec refresh_all(Keyword.t()) :: :ok
def refresh_all(opts \\ []) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
sites_by_domain_query =
from s in Site,
select: {
s.domain,
%{struct(s, ^@cached_schema_fields) | from_cache?: true}
}
measure_duration(telemetry_event_refresh(cache_name, :all), fn ->
sites_by_domain_query =
from s in Site,
select: {
s.domain,
%{struct(s, ^@cached_schema_fields) | from_cache?: true}
}
refresh(
:all,
sites_by_domain_query,
Keyword.put(opts, :delete_stale_items?, true)
)
end
sites_by_domain = Plausible.Repo.all(sites_by_domain_query)
@spec refresh_updated_recently(Keyword.t()) :: :ok
def refresh_updated_recently(opts \\ []) do
recently_updated_sites_query =
from s in Site,
order_by: [asc: s.updated_at],
where: s.updated_at > ago(^15, "minute"),
select: {
s.domain,
%{struct(s, ^@cached_schema_fields) | from_cache?: true}
}
:ok = merge(sites_by_domain, opts)
end)
:ok
refresh(
:updated_recently,
recently_updated_sites_query,
Keyword.put(opts, :delete_stale_items?, false)
)
end
@spec merge(new_items :: [Site.t()], opts :: Keyword.t()) :: :ok
@ -99,18 +108,20 @@ defmodule Plausible.Site.Cache do
def merge(new_items, opts) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
{:ok, old_keys} = Cachex.keys(cache_name)
new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end))
old = MapSet.new(old_keys)
true = Cachex.put_many!(cache_name, new_items)
old
|> MapSet.difference(new)
|> Enum.each(fn k ->
Cachex.del(cache_name, k)
end)
if Keyword.get(opts, :delete_stale_items?, true) do
{:ok, old_keys} = Cachex.keys(cache_name)
new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end))
old = MapSet.new(old_keys)
old
|> MapSet.difference(new)
|> Enum.each(fn k ->
Cachex.del(cache_name, k)
end)
end
:ok
end
@ -152,29 +163,8 @@ defmodule Plausible.Site.Cache do
end
end
@spec refresh_one(String.t(), Keyword.t()) :: {:ok, t()} | {:error, any()}
def refresh_one(domain, opts) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
force? = Keyword.get(opts, :force?, false)
if not enabled?() and not force?, do: raise("Cache: '#{cache_name}' is disabled")
measure_duration_with_metadata(telemetry_event_refresh(cache_name, :one), fn ->
{found_in_db?, item_to_cache} = select_one(domain)
case Cachex.put(cache_name, domain, item_to_cache) do
{:ok, _} ->
result = {:ok, item_to_cache}
{result, with_telemetry_metadata(found_in_db?: found_in_db?)}
{:error, _} = error ->
{error, with_telemetry_metadata(error: true)}
end
end)
end
@spec telemetry_event_refresh(atom(), :all | :one) :: list(atom())
def telemetry_event_refresh(cache_name, mode) when mode in [:all, :one] do
@spec telemetry_event_refresh(atom(), atom()) :: list(atom())
def telemetry_event_refresh(cache_name \\ @cache_name, mode) when mode in @modes do
[:plausible, :cache, cache_name, :refresh, mode]
end
@ -182,26 +172,15 @@ defmodule Plausible.Site.Cache do
Application.fetch_env!(:plausible, :sites_by_domain_cache_enabled) == true
end
defp select_one(domain) do
site_by_domain_query =
from s in Site,
where: s.domain == ^domain,
select: %{struct(s, ^@cached_schema_fields) | from_cache?: true}
defp refresh(mode, query, opts) when mode in @modes do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
case Plausible.Repo.one(site_by_domain_query) do
nil -> {false, %Ecto.NoResultsError{}}
site -> {true, site}
end
end
measure_duration(telemetry_event_refresh(cache_name, mode), fn ->
sites_by_domain = Plausible.Repo.all(query)
:ok = merge(sites_by_domain, opts)
end)
defp with_telemetry_metadata(props) do
Enum.into(props, %{})
end
defp measure_duration_with_metadata(event, fun) when is_function(fun, 0) do
{duration, {result, telemetry_metadata}} = time_it(fun)
:telemetry.execute(event, %{duration: duration}, telemetry_metadata)
result
:ok
end
defp measure_duration(event, fun) when is_function(fun, 0) do

View File

@ -1,109 +0,0 @@
defmodule Plausible.Site.RateLimiter do
  @policy_for_non_existing_sites :deny
  @policy_on_rate_limiting_backend_error :allow
  @moduledoc """
  Thin wrapper around Hammer for rate limiting domain-specific events
  during the ingestion phase. Currently there are two policies
  on which the `allow/2` function operates:
    * `:allow`
    * `:deny`
  Rate Limiting buckets are configured per site (externally via the CRM).
  See: `Plausible.Site`
  To look up each site's configuration, the RateLimiter fetches
  a Site by domain using `Plausible.Cache` interface.
  If the Site is not found in Cache, a DB refresh attempt is made.
  The result of that last attempt gets stored in Cache to prevent
  excessive DB queries.
  The module defines two policies outside the regular bucket inspection:
    * when the site does not exist in the database: #{@policy_for_non_existing_sites}
    * when the underlying rate limiting mechanism returns
      an internal error: #{@policy_on_rate_limiting_backend_error}
  Each policy computation emits a single telemetry event.
  See: `policy_telemetry_event/1`
  """
  alias Plausible.Site
  alias Plausible.Site.Cache
  require Logger

  # Public entry point: true when ingestion for `domain` is permitted.
  @spec allow?(String.t(), Keyword.t()) :: boolean()
  def allow?(domain, opts \\ []) do
    policy(domain, opts) == :allow
  end

  # Hammer bucket key used for rate limiting a given domain.
  @spec key(String.t()) :: String.t()
  def key(domain) do
    "ingest:site:#{domain}"
  end

  # Name of the telemetry event emitted once per policy computation.
  @spec policy_telemetry_event(:allow | :deny) :: list(atom())
  def policy_telemetry_event(policy) do
    [:plausible, :ingest, :rate_limit, policy]
  end

  # Resolves the policy for `domain` and emits the matching telemetry event.
  defp policy(domain, opts) do
    result =
      case get_from_cache_or_refresh(domain, Keyword.get(opts, :cache_opts, [])) do
        # Cached marker for "site not found in DB" — deny outright.
        %Ecto.NoResultsError{} ->
          @policy_for_non_existing_sites

        %Site{} = site ->
          check_rate_limit(site, opts)

        # Cache refresh failed — fail open rather than block ingestion.
        {:error, _} ->
          @policy_on_rate_limiting_backend_error
      end

    :ok = emit_allowance_telemetry(result)

    result
  end

  # No threshold configured: this site is not rate limited at all.
  defp check_rate_limit(%Site{ingest_rate_limit_threshold: nil}, _opts) do
    :allow
  end

  defp check_rate_limit(%Site{ingest_rate_limit_threshold: threshold} = site, opts)
       when is_integer(threshold) do
    # The bucket key may be overridden via opts (useful in tests).
    key = Keyword.get(opts, :key, key(site.domain))
    scale_ms = site.ingest_rate_limit_scale_seconds * 1_000

    case Hammer.check_rate(key, scale_ms, threshold) do
      {:deny, _} ->
        :deny

      {:allow, _} ->
        :allow

      {:error, reason} ->
        # A rate-limiter backend error must not take ingestion down:
        # log it and fall back to the configured open policy.
        Logger.error(
          "Error checking rate limit for '#{key}': #{inspect(reason)}. Falling back to: #{@policy_on_rate_limiting_backend_error}"
        )

        @policy_on_rate_limiting_backend_error
    end
  end

  # Cache lookup with a single-site DB refresh fallback on a cache miss.
  defp get_from_cache_or_refresh(domain, cache_opts) do
    case Cache.get(domain, cache_opts) do
      %Site{} = site ->
        site

      %Ecto.NoResultsError{} = not_found ->
        not_found

      nil ->
        # Miss: refresh this one domain from the DB. On refresh error the
        # `{:error, _}` tuple falls through unchanged to the caller.
        with {:ok, refreshed_item} <- Cache.refresh_one(domain, cache_opts) do
          refreshed_item
        end
    end
  end

  # Executes the policy telemetry event with empty measurements/metadata.
  defp emit_allowance_telemetry(policy) do
    :telemetry.execute(policy_telemetry_event(policy), %{}, %{})
  end
end

View File

@ -30,11 +30,11 @@ defmodule Plausible.Site.Cache.Warmer do
@spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
def child_spec(opts) do
child_name = Keyword.get(opts, :child_name, {:local, __MODULE__})
child_name = Keyword.get(opts, :child_name, __MODULE__)
%{
id: __MODULE__,
start: {:gen_cycle, :start_link, [child_name, __MODULE__, opts]}
id: child_name,
start: {:gen_cycle, :start_link, [{:local, child_name}, __MODULE__, opts]}
}
end

View File

@ -0,0 +1,8 @@
defmodule Plausible.Site.Cache.Warmer.All do
  @moduledoc """
  Supervisor-tree alias for the full Sites Cache refresh.

  Hands `child_spec/1` straight to `Plausible.Site.Cache.Warmer`;
  the dedicated module name exists only to make explicit, in the
  supervision tree, what this warmer refreshes.
  """
  alias Plausible.Site.Cache.Warmer

  @spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
  def child_spec(opts), do: Warmer.child_spec(opts)
end

View File

@ -0,0 +1,20 @@
defmodule Plausible.Site.Cache.Warmer.RecentlyUpdated do
  @moduledoc """
  Cache.Warmer variant that refreshes only the recently updated
  sites, on a fixed 30-second cycle.
  """
  alias Plausible.Site.Cache

  @spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
  def child_spec(opts) do
    # Honour a caller-supplied :child_name; the interval and warmer
    # function are fixed for this adapter.
    warmer_opts = [
      child_name: Keyword.get(opts, :child_name, __MODULE__),
      interval: :timer.seconds(30),
      warmer_fn: &Cache.refresh_updated_recently/1
    ]

    Plausible.Site.Cache.Warmer.child_spec(warmer_opts)
  end
end

View File

@ -0,0 +1,87 @@
defmodule Plausible.Site.GateKeeper do
  @type policy() :: :allow | :not_found | :block | :throttle

  @policy_for_non_existing_sites :not_found
  @policy_on_rate_limiting_backend_error :allow

  @type t() :: :allow | {:deny, policy()}

  @moduledoc """
  Thin wrapper around Hammer for gate keeping domain-specific events
  during the ingestion phase. When the site is allowed, the gate keeping
  check returns `:allow`; otherwise a `:deny` tagged tuple is returned
  with one of the following policy markers:

    * `:not_found` (indicates site not found in cache)
    * `:block` (indicates disabled sites)
    * `:throttle` (indicates rate limiting)

  Rate Limiting buckets are configured per site (externally via the CRM).
  See: `Plausible.Site`

  To look up each site's configuration, the GateKeeper fetches
  a Site by domain using the `Plausible.Site.Cache` interface.

  The module defines two policies outside the regular bucket inspection:

    * when the site is not found in cache: #{@policy_for_non_existing_sites}
    * when the underlying rate limiting mechanism returns
      an internal error: #{@policy_on_rate_limiting_backend_error}
  """
  alias Plausible.Site
  alias Plausible.Site.Cache

  require Logger

  @doc """
  Checks whether events for `domain` should be admitted into the
  ingestion pipeline. Returns `:allow` or a `{:deny, policy()}` tuple.
  """
  @spec check(String.t(), Keyword.t()) :: t()
  def check(domain, opts \\ []) when is_binary(domain) do
    case policy(domain, opts) do
      :allow -> :allow
      other -> {:deny, other}
    end
  end

  @doc "Returns the Hammer bucket key used for rate limiting `domain`."
  @spec key(String.t()) :: String.t()
  def key(domain) do
    "ingest:site:#{domain}"
  end

  # Resolves the policy for a domain: sites missing from the cache are
  # denied with @policy_for_non_existing_sites; cached sites are subject
  # to their configured rate-limit bucket.
  defp policy(domain, opts) when is_binary(domain) do
    case Cache.get(domain, Keyword.get(opts, :cache_opts, [])) do
      nil ->
        @policy_for_non_existing_sites

      %Site{} = site ->
        check_rate_limit(site, opts)
    end
  end

  # No threshold configured: ingestion is unrestricted for this site.
  defp check_rate_limit(%Site{ingest_rate_limit_threshold: nil}, _opts) do
    :allow
  end

  # A zero threshold means the site is explicitly blocked via the CRM.
  defp check_rate_limit(%Site{ingest_rate_limit_threshold: 0}, _opts) do
    :block
  end

  defp check_rate_limit(%Site{ingest_rate_limit_threshold: threshold} = site, opts)
       when is_integer(threshold) do
    # The bucket key may be overridden via opts (useful in tests).
    key = Keyword.get(opts, :key, key(site.domain))
    scale_ms = site.ingest_rate_limit_scale_seconds * 1_000

    case Hammer.check_rate(key, scale_ms, threshold) do
      {:deny, _} ->
        :throttle

      {:allow, _} ->
        :allow

      {:error, reason} ->
        # Fail open: a rate-limiter backend error must not block ingestion.
        Logger.error(
          "Error checking rate limit for '#{key}': #{inspect(reason)}. Falling back to: #{@policy_on_rate_limiting_backend_error}"
        )

        @policy_on_rate_limiting_backend_error
    end
  end
end

View File

@ -3,6 +3,8 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
Custom PromEx plugin for instrumenting code within Plausible app.
"""
use PromEx.Plugin
alias Plausible.Site
alias Plausible.Ingestion
@impl true
def polling_metrics(opts) do
@ -19,6 +21,45 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
]
end
@impl true
def event_metrics(opts) do
otp_app = Keyword.fetch!(opts, :otp_app)
metric_prefix = Keyword.get(opts, :metric_prefix, PromEx.metric_prefix(otp_app, :plausible))
Event.build(
:plausible_internal_telemetry,
[
distribution(
metric_prefix ++ [:cache_warmer, :sites, :refresh, :all],
event_name: Site.Cache.telemetry_event_refresh(:all),
reporter_options: [
buckets: [500, 1000, 2000, 5000, 10_000]
],
unit: {:native, :millisecond},
measurement: :duration
),
distribution(
metric_prefix ++ [:cache_warmer, :sites, :refresh, :updated_recently],
event_name: Site.Cache.telemetry_event_refresh(:updated_recently),
reporter_options: [
buckets: [500, 1000, 2000, 5000, 10_000]
],
unit: {:native, :millisecond},
measurement: :duration
),
counter(
metric_prefix ++ [:ingest, :events, :buffered, :total],
event_name: Ingestion.Event.telemetry_event_buffered()
),
counter(
metric_prefix ++ [:ingest, :events, :dropped, :total],
event_name: Ingestion.Event.telemetry_event_dropped(),
tags: [:reason]
)
]
)
end
@doc """
Add telemetry events for Session and Event write buffers
"""
@ -68,6 +109,11 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
count: sessions_count,
hit_rate: sessions_hit_rate
})
:telemetry.execute([:prom_ex, :plugin, :cachex, :sites], %{
count: Site.Cache.size(),
hit_rate: Site.Cache.hit_rate()
})
end
defp write_buffer_metrics(metric_prefix, poll_rate) do
@ -109,13 +155,21 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
last_value(
metric_prefix ++ [:cache, :user_agents, :hit_ratio],
event_name: [:prom_ex, :plugin, :cachex, :user_agents],
description: "UA cache hit ratio",
measurement: :hit_rate
),
last_value(
metric_prefix ++ [:cache, :sessions, :hit_ratio],
event_name: [:prom_ex, :plugin, :cachex, :sessions],
description: "Sessions cache hit ratio",
measurement: :hit_rate
),
last_value(
metric_prefix ++ [:cache, :sites, :size],
event_name: [:prom_ex, :plugin, :cachex, :sites],
measurement: :count
),
last_value(
metric_prefix ++ [:cache, :sites, :hit_ratio],
event_name: [:prom_ex, :plugin, :cachex, :sites],
measurement: :hit_rate
)
]

View File

@ -35,10 +35,10 @@ defmodule PlausibleWeb.Api.ExternalController do
end
end
else
{:error, :invalid_json} ->
{:error, %Ecto.Changeset{} = changeset} ->
conn
|> put_status(400)
|> json(%{errors: %{request: "Unable to parse request body as json"}})
|> json(%{errors: traverse_errors(changeset)})
end
end

View File

@ -0,0 +1,44 @@
defmodule Plausible.Ingestion.EventTelemetryTest do
  import Phoenix.ConnTest
  alias Plausible.Ingestion.Event
  alias Plausible.Ingestion.Request
  use Plausible.DataCase, async: false

  test "telemetry is emitted for all events", %{test: test} do
    parent = self()
    dropped_event = Event.telemetry_event_dropped()
    buffered_event = Event.telemetry_event_buffered()

    # Forward every matching telemetry event back to the test process.
    :telemetry.attach_many(
      "#{test}-telemetry-handler",
      [
        dropped_event,
        buffered_event
      ],
      fn name, _measurements, metadata, _config ->
        send(parent, {:telemetry_handled, name, metadata})
      end,
      %{}
    )

    site = insert(:site, ingest_rate_limit_threshold: 2)

    payload = %{
      name: "pageview",
      url: "http://dummy.site",
      d: "#{site.domain}"
    }

    conn = build_conn(:post, "/api/events", payload)
    assert {:ok, request} = Request.build(conn)

    # Threshold is 2, so the third event must be dropped with :throttle.
    Enum.each(1..3, fn _ -> Event.build_and_buffer(request) end)

    assert_receive {:telemetry_handled, ^buffered_event, %{}}
    assert_receive {:telemetry_handled, ^buffered_event, %{}}
    assert_receive {:telemetry_handled, ^dropped_event, %{reason: :throttle}}
  end
end

View File

@ -1,112 +1,102 @@
defmodule Plausible.Ingestion.EventTest do
use Plausible.DataCase
use Plausible.DataCase, async: true
@valid_request %Plausible.Ingestion.Request{
remote_ip: "2.2.2.2",
user_agent:
"Mozilla/5.0 (iPad; U; CPU OS 3_2_1 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Mobile/7B405",
event_name: "pageview",
uri: URI.parse("http://skywalker.test"),
referrer: "http://m.facebook.test/",
screen_width: 1440,
hash_mode: nil,
query_params: %{
"utm_medium" => "utm_medium",
"utm_source" => "utm_source",
"utm_campaign" => "utm_campaign",
"utm_content" => "utm_content",
"utm_term" => "utm_term",
"source" => "source",
"ref" => "ref"
import Phoenix.ConnTest
alias Plausible.Ingestion.Request
alias Plausible.Ingestion.Event
test "event pipeline processes a request into an event" do
site = insert(:site)
payload = %{
name: "pageview",
url: "http://#{site.domain}"
}
}
describe "integration" do
test "build_and_buffer/1 creates an event" do
assert {:ok, %{buffered: [_], dropped: []}} =
@valid_request
|> Map.put(:domains, ["plausible-ingestion-event-basic.test"])
|> Plausible.Ingestion.Event.build_and_buffer()
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
assert %Plausible.ClickhouseEvent{
session_id: session_id,
user_id: user_id,
domain: "plausible-ingestion-event-basic.test",
browser: "Safari",
browser_version: "",
city_geoname_id: 2_988_507,
country_code: "FR",
hostname: "skywalker.test",
"meta.key": [],
"meta.value": [],
name: "pageview",
operating_system: "iOS",
operating_system_version: "3.2",
pathname: "/",
referrer: "m.facebook.test",
referrer_source: "utm_source",
screen_size: "Desktop",
subdivision1_code: "FR-IDF",
subdivision2_code: "FR-75",
transferred_from: "",
utm_campaign: "utm_campaign",
utm_content: "utm_content",
utm_medium: "utm_medium",
utm_source: "utm_source",
utm_term: "utm_term"
} = get_event("plausible-ingestion-event-basic.test")
assert {:ok, %{buffered: [_], dropped: []}} = Event.build_and_buffer(request)
end
assert is_integer(session_id)
assert is_integer(user_id)
end
test "event pipeline drops a request when site does not exists" do
payload = %{
name: "pageview",
url: "http://dummy.site"
}
test "build_and_buffer/1 takes multiple domains" do
request = %Plausible.Ingestion.Request{
@valid_request
| domains: [
"plausible-ingestion-event-multiple-1.test",
"plausible-ingestion-event-multiple-2.test"
]
}
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
assert {:ok, %{buffered: [_, _], dropped: []}} =
Plausible.Ingestion.Event.build_and_buffer(request)
assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(request)
assert dropped.drop_reason == :not_found
end
assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-1.test"} =
get_event("plausible-ingestion-event-multiple-1.test")
test "event pipeline drops a request when referrer is spam" do
site = insert(:site)
assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-2.test"} =
get_event("plausible-ingestion-event-multiple-2.test")
end
payload = %{
name: "pageview",
url: "http://dummy.site",
referrer: "https://www.1-best-seo.com",
domain: site.domain
}
test "build_and_buffer/1 drops invalid events" do
request = %Plausible.Ingestion.Request{
@valid_request
| domains: ["plausible-ingestion-event-multiple-with-error-1.test", nil]
}
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
assert {:ok, %{buffered: [_], dropped: [dropped]}} =
Plausible.Ingestion.Event.build_and_buffer(request)
assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(request)
assert dropped.drop_reason == :spam_referrer
end
assert {:error, changeset} = dropped.drop_reason
refute changeset.valid?
test "event pipeline drops a request when referrer is spam for multiple domains" do
site = insert(:site)
assert %Plausible.ClickhouseEvent{
domain: "plausible-ingestion-event-multiple-with-error-1.test"
} = get_event("plausible-ingestion-event-multiple-with-error-1.test")
end
payload = %{
name: "pageview",
url: "http://dummy.site",
referrer: "https://www.1-best-seo.com",
d: "#{site.domain},#{site.domain}"
}
defp get_event(domain) do
Plausible.TestUtils.eventually(fn ->
Plausible.Event.WriteBuffer.flush()
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
event =
Plausible.ClickhouseRepo.one(
from e in Plausible.ClickhouseEvent, where: e.domain == ^domain
)
assert {:ok, %{dropped: [dropped, dropped]}} = Event.build_and_buffer(request)
assert dropped.drop_reason == :spam_referrer
end
{!is_nil(event), event}
end)
end
test "event pipeline selectively drops an event for multiple domains" do
site = insert(:site)
payload = %{
name: "pageview",
url: "http://dummy.site",
d: "#{site.domain},thisdoesnotexist.com"
}
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
assert {:ok, %{buffered: [_], dropped: [dropped]}} = Event.build_and_buffer(request)
assert dropped.drop_reason == :not_found
end
test "event pipeline selectively drops an event when rate-limited" do
site = insert(:site, ingest_rate_limit_threshold: 1)
payload = %{
name: "pageview",
url: "http://dummy.site",
d: "#{site.domain}"
}
conn = build_conn(:post, "/api/events", payload)
assert {:ok, request} = Request.build(conn)
assert {:ok, %{buffered: [_], dropped: []}} = Event.build_and_buffer(request)
assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(request)
assert dropped.drop_reason == :throttle
end
end

View File

@ -0,0 +1,187 @@
defmodule Plausible.Ingestion.RequestTest do
  use Plausible.DataCase, async: true

  import Phoenix.ConnTest
  import Plug.Conn

  alias Plausible.Ingestion.Request

  # Every test exercised the same boilerplate inline:
  # `build_conn(:post, "/api/events", payload) |> Request.build()`.
  # Extracted here so each test states only its payload and assertions.
  defp build_request(payload) do
    :post
    |> build_conn("/api/events", payload)
    |> Request.build()
  end

  test "request cannot be built from conn without payload" do
    assert {:error, changeset} = build_request(%{})

    errors = Keyword.keys(changeset.errors)
    assert :event_name in errors
    assert :domain in errors
    assert :url in errors
  end

  test "request cannot be built from non-json payload" do
    assert {:error, changeset} = build_request("defnotjson")
    assert changeset.errors[:request]
  end

  test "request can be built from URL alone" do
    payload = %{
      name: "pageview",
      url: "http://dummy.site"
    }

    assert {:ok, request} = build_request(payload)

    # The domain list falls back to the URL host when `domain`/`d` is absent.
    assert request.domains == ["dummy.site"]
    assert request.event_name == "pageview"
    assert request.pathname == "/"
    assert request.remote_ip == "127.0.0.1"
    assert %NaiveDateTime{} = request.timestamp
    assert request.user_agent == nil
    assert request.hostname == "dummy.site"
    assert request.uri.host == "dummy.site"
    assert request.uri.scheme == "http"
    assert request.props == %{}
  end

  test "request can be built with domain" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/index"
    }

    assert {:ok, request} = build_request(payload)
    assert request.domains == ["dummy.site"]
    assert request.uri.host == "dummy.site"
  end

  test "request can be built with domain using shorthands" do
    # `n`/`d`/`u` are the short aliases for `name`/`domain`/`url`.
    payload = %{
      n: "pageview",
      d: "dummy.site",
      u: "http://dummy.site/index"
    }

    assert {:ok, request} = build_request(payload)
    assert request.domains == ["dummy.site"]
    assert request.uri.host == "dummy.site"
  end

  test "request can be built for multiple domains" do
    payload = %{
      n: "pageview",
      d: "dummy.site,crash.site",
      u: "http://dummy.site/index"
    }

    assert {:ok, request} = build_request(payload)
    assert request.domains == ["dummy.site", "crash.site"]
    # The URI comes from `u` alone, so only the first domain is the host.
    assert request.uri.host == "dummy.site"
  end

  test "hostname is (none) if host-less uri provided" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "about:config"
    }

    assert {:ok, request} = build_request(payload)
    assert request.hostname == "(none)"
  end

  test "hostname is set" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/index.html"
    }

    assert {:ok, request} = build_request(payload)
    assert request.hostname == "dummy.site"
  end

  test "user agent is set as-is" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/index.html"
    }

    # Needs the raw conn to set a request header, so the helper is bypassed.
    conn =
      :post
      |> build_conn("/api/events", payload)
      |> put_req_header("user-agent", "Mozilla")

    assert {:ok, request} = Request.build(conn)
    assert request.user_agent == "Mozilla"
  end

  test "request params are set" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/index.html",
      referrer: "https://example.com",
      screen_width: 1024,
      hashMode: 1,
      props: %{
        "custom1" => "property1",
        "custom2" => "property2"
      }
    }

    assert {:ok, request} = build_request(payload)
    assert request.referrer == "https://example.com"
    assert request.screen_width == 1024
    assert request.hash_mode == 1
    assert request.props["custom1"] == "property1"
    assert request.props["custom2"] == "property2"
  end

  test "pathname is set" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/pictures/index.html#foo"
    }

    assert {:ok, request} = build_request(payload)
    # Without hash mode the URL fragment is stripped from the pathname.
    assert request.pathname == "/pictures/index.html"
  end

  test "pathname is set with hashMode" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/pictures/index.html#foo",
      hashMode: 1
    }

    assert {:ok, request} = build_request(payload)
    # With hash mode enabled the fragment is kept as part of the pathname.
    assert request.pathname == "/pictures/index.html#foo"
  end

  test "query params are set" do
    payload = %{
      name: "pageview",
      domain: "dummy.site",
      url: "http://dummy.site/pictures/index.html?foo=bar&baz=bam"
    }

    assert {:ok, request} = build_request(payload)
    assert request.query_params["foo"] == "bar"
    assert request.query_params["baz"] == "bam"
  end
end

View File

@ -66,7 +66,7 @@ defmodule Plausible.Site.CacheTest do
assert Cache.hit_rate(test) == 50
end
test "a single cached site can be refreshed", %{test: test} do
test "only recently updated sites can be refreshed", %{test: test} do
{:ok, _} = start_test_cache(test)
domain1 = "site1.example.com"
@ -74,44 +74,18 @@ defmodule Plausible.Site.CacheTest do
cache_opts = [cache_name: test, force?: true]
assert Cache.get(domain1) == nil
yesterday = DateTime.utc_now() |> DateTime.add(-1 * 60 * 60 * 24)
insert(:site, domain: domain1, inserted_at: yesterday, updated_at: yesterday)
insert(:site, domain: domain1)
insert(:site, domain: domain2)
assert {:ok, %{domain: ^domain1}} = Cache.refresh_one(domain1, cache_opts)
assert %Site{domain: ^domain1} = Cache.get(domain1, cache_opts)
assert Cache.get(domain1, cache_opts) == nil
assert Cache.get(domain2, cache_opts) == nil
assert {:ok, %Ecto.NoResultsError{}} = Cache.refresh_one(domain2, cache_opts)
assert %Ecto.NoResultsError{} = Cache.get(domain2, cache_opts)
end
assert :ok = Cache.refresh_updated_recently(cache_opts)
test "refreshing a single site sends a telemetry event indicating record not found in the database",
%{
test: test
} do
:ok =
start_test_cache_with_telemetry_handler(test,
event: Cache.telemetry_event_refresh(test, :one)
)
Cache.refresh_one("missing.example.com", force?: true, cache_name: test)
assert_receive {:telemetry_handled, %{found_in_db?: false}}
end
test "refreshing a single site sends a telemetry event indicating record found in the database",
%{
test: test
} do
domain = "site1.example.com"
insert(:site, domain: domain)
:ok =
start_test_cache_with_telemetry_handler(test,
event: Cache.telemetry_event_refresh(test, :one)
)
Cache.refresh_one(domain, force?: true, cache_name: test)
assert_receive {:telemetry_handled, %{found_in_db?: true}}
refute Cache.get(domain1, cache_opts)
assert %Site{domain: ^domain2} = Cache.get(domain2, cache_opts)
end
test "refreshing all sites sends a telemetry event",
@ -190,8 +164,8 @@ defmodule Plausible.Site.CacheTest do
assert Cache.get(domain2, cache_opts)
refute Cache.get(domain1, cache_opts)
Cache.refresh_one(domain1, cache_opts)
assert Cache.get(domain1, cache_opts) == %Ecto.NoResultsError{}
:ok = Cache.refresh_all(cache_opts)
refute Cache.get(domain1, cache_opts)
end
end
@ -221,6 +195,16 @@ defmodule Plausible.Site.CacheTest do
assert Cache.get("item2", cache_name: test, force?: true)
end
test "merging optionally leaves stale items intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", :item1}], cache_name: test)
:ok = Cache.merge([{"item2", :item2}], cache_name: test, delete_stale_items?: false)
assert Cache.get("item1", cache_name: test, force?: true)
assert Cache.get("item2", cache_name: test, force?: true)
end
test "merging updates changed items", %{test: test} do
{:ok, _} = start_test_cache(test)
@ -271,7 +255,7 @@ defmodule Plausible.Site.CacheTest do
end
defp start_test_warmer(opts) do
child_name_opt = {:child_name, {:local, Keyword.fetch!(opts, :cache_name)}}
child_name_opt = {:child_name, Keyword.fetch!(opts, :cache_name)}
%{start: {m, f, a}} = Cache.Warmer.child_spec([child_name_opt | opts])
apply(m, f, a)
end

View File

@ -1,8 +1,8 @@
defmodule Plausible.Site.RateLimiterTest do
defmodule Plausible.Site.GateKeeperTest do
use Plausible.DataCase, async: true
alias Plausible.Site.Cache
alias Plausible.Site.RateLimiter
alias Plausible.Site.GateKeeper
import ExUnit.CaptureLog
@ -12,27 +12,15 @@ defmodule Plausible.Site.RateLimiterTest do
{:ok, %{opts: opts}}
end
test "(for now) throws an exception when cache is disabled" do
assert_raise(RuntimeError, fn -> RateLimiter.allow?("example.com") end)
end
test "sites not found in cache/DB are denied", %{opts: opts} do
refute RateLimiter.allow?("example.com", opts)
test "sites not found in cache are denied", %{opts: opts} do
assert {:deny, :not_found} = GateKeeper.check("example.com", opts)
end
test "site from cache with no ingest_rate_limit_threshold is allowed", %{test: test, opts: opts} do
domain = "site1.example.com"
add_site_and_refresh_cache(test, domain: domain)
assert RateLimiter.allow?(domain, opts)
end
test "site from DB with no ingest_rate_limit_threshold is allowed", %{opts: opts} do
domain = "site1.example.com"
insert(:site, domain: domain)
assert RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
end
test "rate limiting works with threshold", %{test: test, opts: opts} do
@ -44,9 +32,9 @@ defmodule Plausible.Site.RateLimiterTest do
ingest_rate_limit_scale_seconds: 60
)
assert RateLimiter.allow?(domain, opts)
refute RateLimiter.allow?(domain, opts)
refute RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
assert {:deny, :throttle} = GateKeeper.check(domain, opts)
assert {:deny, :throttle} = GateKeeper.check(domain, opts)
end
test "rate limiting works with scale window", %{test: test, opts: opts} do
@ -58,11 +46,11 @@ defmodule Plausible.Site.RateLimiterTest do
ingest_rate_limit_scale_seconds: 1
)
assert RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
Process.sleep(1)
refute RateLimiter.allow?(domain, opts)
assert {:deny, :throttle} = GateKeeper.check(domain, opts)
Process.sleep(1_000)
assert RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
end
test "rate limiting prioritises cache lookups", %{test: test, opts: opts} do
@ -80,9 +68,9 @@ defmodule Plausible.Site.RateLimiterTest do
# is completely empty
insert(:site)
assert RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
:ok = Cache.refresh_all(opts[:cache_opts])
refute RateLimiter.allow?(domain, opts)
assert {:deny, :not_found} = GateKeeper.check(domain, opts)
end
test "rate limiter policy switches to allow when RL backend errors bubble-up", %{
@ -94,38 +82,24 @@ defmodule Plausible.Site.RateLimiterTest do
site =
add_site_and_refresh_cache(test,
domain: domain,
ingest_rate_limit_threshold: 0,
ingest_rate_limit_threshold: 1,
ingest_rate_limit_scale_seconds: 600
)
refute RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
assert {:deny, :throttle} = GateKeeper.check(domain, opts)
{:ok, :broken} = break_hammer(site)
log =
capture_log(fn ->
assert RateLimiter.allow?(domain, opts)
assert :allow = GateKeeper.check(domain, opts)
end)
assert log =~ "Error checking rate limit for 'ingest:site:causingerrors.example.com'"
assert log =~ "Falling back to: allow"
end
test "telemetry event is emitted on :deny", %{test: test, opts: opts} do
start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:deny))
RateLimiter.allow?("example.com", opts)
assert_receive :telemetry_handled
end
test "telemetry event is emitted on :allow", %{test: test, opts: opts} do
start_telemetry_handler(test, event: RateLimiter.policy_telemetry_event(:allow))
domain = "site1.example.com"
add_site_and_refresh_cache(test, domain: domain)
RateLimiter.allow?(domain, opts)
assert_receive :telemetry_handled
end
# We need a way to force Hammer to error-out on Hammer.check_rate/3.
# This is tricky because we don't configure multiple backends,
# so the easiest (and naive) way to do it, without mocking, is to
@ -137,7 +111,7 @@ defmodule Plausible.Site.RateLimiterTest do
defp break_hammer(site) do
scale_ms = site.ingest_rate_limit_scale_seconds * 1_000
rogue_key = site.domain
our_key = RateLimiter.key(rogue_key)
our_key = GateKeeper.key(rogue_key)
{_, key} = Hammer.Utils.stamp_key(our_key, scale_ms)
true = :ets.insert(:hammer_ets_buckets, {key, 1, "TOTALLY-WRONG", "ABSOLUTELY-BREAKING"})
{:ok, :broken}
@ -150,20 +124,7 @@ defmodule Plausible.Site.RateLimiterTest do
defp add_site_and_refresh_cache(cache_name, site_data) do
site = insert(:site, site_data)
Cache.refresh_one(site.domain, cache_name: cache_name, force?: true)
Cache.refresh_updated_recently(cache_name: cache_name, force?: true)
site
end
defp start_telemetry_handler(test, event: event) do
test_pid = self()
:telemetry.attach(
"#{test}-telemetry-handler",
event,
fn ^event, %{}, %{}, _ ->
send(test_pid, :telemetry_handled)
end,
%{}
)
end
end

File diff suppressed because it is too large Load Diff