Implement ingest counters (#2693)

* Clickhouse migration: add ingest_counters table

* Configure ingest counters per MIX_ENV

* Emit telemetry for ingest events with rich metadata

* Allow building Request.t() with fake now() - for testing purposes

* Use clickhousex branch where session_id is assigned to each connection

* Add helper function for getting site id via cache

* Add Ecto schema for `ingest_counters` table

* Implement metrics buffer

* Implement buffering handler for `Plausible.Ingestion.Event` telemetry

* Implement periodic metrics aggregation

* Update counters docs

* Add toStartOfMinute() to ordering key

* Reset the sync connection state in `after` clause

* Flush counters on app termination

* Use separate Repo with async settings enabled at config level

* Switch to clickhouse_settings repo root config key

* Add AsyncInsertRepo module
This commit is contained in:
Adam Rutkowski 2023-02-23 14:34:24 +01:00 committed by GitHub
parent 7dec454a23
commit 867dad6da7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 676 additions and 17 deletions

View File

@ -52,4 +52,6 @@ config :plausible,
sites_by_domain_cache_refresh_interval_max_jitter: :timer.seconds(5),
sites_by_domain_cache_refresh_interval: :timer.minutes(15)
config :plausible, Plausible.Ingestion.Counters, enabled: true
import_config "#{config_env()}.exs"

View File

@ -290,6 +290,17 @@ config :plausible, Plausible.IngestRepo,
max_buffer_size: ch_max_buffer_size,
pool_size: ingest_pool_size
config :plausible, Plausible.AsyncInsertRepo,
loggers: [Ecto.LogEntry],
queue_target: 500,
queue_interval: 2000,
url: ch_db_url,
pool_size: 1,
clickhouse_settings: [
async_insert: 1,
wait_for_async_insert: 0
]
case mailer_adapter do
"Bamboo.PostmarkAdapter" ->
config :plausible, Plausible.Mailer,

View File

@ -28,3 +28,5 @@ config :plausible,
session_timeout: 0,
http_impl: Plausible.HTTPClient.Mock,
sites_by_domain_cache_enabled: false
config :plausible, Plausible.Ingestion.Counters, enabled: false

View File

@ -10,6 +10,8 @@ defmodule Plausible.Application do
Plausible.Repo,
Plausible.ClickhouseRepo,
Plausible.IngestRepo,
Plausible.AsyncInsertRepo,
Plausible.Ingestion.Counters,
{Finch, name: Plausible.Finch, pools: finch_pool_config()},
{Phoenix.PubSub, name: Plausible.PubSub},
Plausible.Session.Salts,

View File

@ -0,0 +1,17 @@
defmodule Plausible.AsyncInsertRepo do
  @moduledoc """
  Clickhouse access with async inserts enabled.

  A separate Ecto repo (alongside `Plausible.IngestRepo`) whose connection
  is configured at the repo level with `async_insert: 1` and
  `wait_for_async_insert: 0` — see the runtime config — so writes return
  without waiting for Clickhouse to materialize the insert.
  """

  use Ecto.Repo,
    otp_app: :plausible,
    adapter: ClickhouseEcto

  # `use Plausible.AsyncInsertRepo` in a caller module brings the repo alias
  # and the basic Ecto query helpers into scope.
  defmacro __using__(_) do
    quote do
      alias Plausible.AsyncInsertRepo
      import Ecto
      import Ecto.Query, only: [from: 1, from: 2]
    end
  end
end

View File

@ -0,0 +1,125 @@
defmodule Plausible.Ingestion.Counters do
  @moduledoc """
  This is instrumentation necessary for keeping track of per-domain
  internal metrics. Due to metric labels cardinality (domain x metric_name),
  these statistics are not suitable for prometheus/grafana exposure,
  hence an internal storage is used.

  The module installs `Counters.TelemetryHandler` and periodically
  flushes the internal counter aggregates via `Counters.Buffer` interface.

  The underlying database schema is running `SummingMergeTree` engine.
  To take advantage of automatic roll-ups it provides, upon dispatching the
  buffered records to Clickhouse this module transforms each `event_timebucket`
  aggregate into a 1-minute resolution.

  Clickhouse connection is set to insert counters asynchronously every time
  a pool checkout is made. Those properties are reverted once the insert is done
  (or naturally, if the connection crashes).
  """

  @behaviour :gen_cycle

  require Logger

  alias Plausible.Ingestion.Counters.Buffer
  alias Plausible.Ingestion.Counters.Record
  alias Plausible.Ingestion.Counters.TelemetryHandler
  alias Plausible.AsyncInsertRepo

  # Default flush cycle length; overridable via the :interval option (tests
  # use a much shorter one).
  @interval :timer.seconds(10)

  @spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
  def child_spec(opts) do
    # A custom :buffer_name allows running multiple independent instances,
    # e.g. one per test case.
    buffer_name = Keyword.get(opts, :buffer_name, __MODULE__)

    %{
      id: buffer_name,
      start: {:gen_cycle, :start_link, [{:local, buffer_name}, __MODULE__, opts]}
    }
  end

  @spec enabled?() :: boolean()
  def enabled?() do
    Application.fetch_env!(:plausible, __MODULE__)[:enabled] == true
  end

  @impl true
  def init_cycle(opts) do
    # Trap exits so terminate/2 runs on shutdown and can flush whatever
    # is still buffered.
    Process.flag(:trap_exit, true)

    buffer_name = Keyword.get(opts, :buffer_name, __MODULE__)
    # :force_start? lets tests start the cycle even when the feature is
    # disabled in the environment config.
    force_start? = Keyword.get(opts, :force_start?, false)

    if enabled?() or force_start? do
      buffer = Buffer.new(buffer_name, opts)
      :ok = TelemetryHandler.install(buffer)

      interval = Keyword.get(opts, :interval, @interval)
      {:ok, {interval, buffer}}
    else
      :ignore
    end
  end

  @impl true
  # `now` is overridable so terminate/2 can flush "from the future",
  # forcing out even buckets whose window has not closed yet.
  def handle_cycle(buffer, now \\ DateTime.utc_now()) do
    case Buffer.flush(buffer, now) do
      [] ->
        :noop

      records ->
        records =
          Enum.map(records, fn {bucket, metric, domain, value} ->
            %{
              # round down to the start of the minute so the SummingMergeTree
              # engine can roll the 10s aggregates up into 1-minute buckets
              event_timebucket: to_0_minute_datetime(bucket),
              metric: metric,
              # nil when the domain is not present in the sites cache
              site_id: Plausible.Site.Cache.get_site_id(domain),
              domain: domain,
              value: value
            }
          end)

        try do
          {_, _} = AsyncInsertRepo.insert_all(Record, records)
        catch
          # deliberately best-effort: losing a batch of internal metrics
          # must not crash the cycle process
          _, thrown ->
            Logger.error(
              "Caught an error when trying to flush ingest counters: #{inspect(thrown)}"
            )
        end
    end

    {:continue_hibernated, buffer}
  end

  @impl true
  def handle_info(:stop, _state) do
    {:stop, :normal}
  end

  def handle_info(_msg, state) do
    {:continue, state}
  end

  @impl true
  def terminate(_reason, buffer) do
    # we'll travel in time to flush everything regardless of current bucket completion
    future = DateTime.utc_now() |> DateTime.add(60, :second)
    handle_cycle(buffer, future)
    :ok
  end

  @spec stop(pid()) :: :ok
  def stop(pid) do
    send(pid, :stop)
    :ok
  end

  # Converts a unix-timestamp bucket key into a DateTime truncated to the
  # start of its minute (second set to 0).
  defp to_0_minute_datetime(unix_ts) when is_integer(unix_ts) do
    unix_ts
    |> DateTime.from_unix!()
    |> DateTime.truncate(:second)
    |> Map.replace(:second, 0)
  end
end

View File

@ -0,0 +1,106 @@
defmodule Plausible.Ingestion.Counters.Buffer do
  @moduledoc """
  A buffer aggregating counters for internal metrics, within 10 seconds time buckets.

  See `Plausible.Ingestion.Counters` for integration.

  Flushing is by default possible only once the 10s bucket is complete
  (its window has moved). This is to avoid race conditions
  when clearing up the buffer on dequeue - because there is no atomic "get and delete",
  and items are buffered concurrently, there is a gap between get and delete
  in which items written may disappear otherwise.

  `aggregate_bucket_fn` and `flush_boundary_fn` control that semantics and
  are configurable only for test purposes.
  """

  defstruct [:buffer_name, :aggregate_bucket_fn, :flush_boundary_fn]

  @type t() :: %__MODULE__{}
  @type unix_timestamp() :: pos_integer()

  # FIX: flush/2 previously claimed to return `[Record.t()]`, but it returns
  # raw `{bucket, metric, domain, value}` tuples (see the pattern matches in
  # Counters.handle_cycle/2 and the buffer tests).
  @type counter() ::
          {bucket :: unix_timestamp(), metric :: binary(), domain :: binary(),
           value :: non_neg_integer()}

  @type bucket_fn_opt() ::
          {:aggregate_bucket_fn, (NaiveDateTime.t() -> unix_timestamp())}
          | {:flush_boundary_fn, (DateTime.t() -> unix_timestamp())}

  # :public + write_concurrency so producers can bump counters concurrently
  # without serializing through an owner process
  @ets_opts [
    :public,
    :ordered_set,
    :named_table,
    write_concurrency: true
  ]

  @doc """
  Creates the named ETS table backing the buffer and returns the buffer
  handle used by `aggregate/4` and `flush/2`.
  """
  @spec new(atom(), [bucket_fn_opt()]) :: t()
  def new(buffer_name, opts \\ []) do
    ^buffer_name = :ets.new(buffer_name, @ets_opts)

    aggregate_bucket_fn = Keyword.get(opts, :aggregate_bucket_fn, &bucket_10s/1)
    flush_boundary_fn = Keyword.get(opts, :flush_boundary_fn, &previous_10s/1)

    %__MODULE__{
      buffer_name: buffer_name,
      aggregate_bucket_fn: aggregate_bucket_fn,
      flush_boundary_fn: flush_boundary_fn
    }
  end

  @doc """
  Increments the counter keyed by `{bucket, metric, domain}`,
  inserting it with an initial value of 0 first when absent.
  """
  @spec aggregate(t(), binary(), binary(), timestamp :: NaiveDateTime.t()) :: t()
  def aggregate(
        %__MODULE__{buffer_name: buffer_name, aggregate_bucket_fn: bucket_fn} = buffer,
        metric,
        domain,
        timestamp
      ) do
    bucket = bucket_fn.(timestamp)

    :ets.update_counter(
      buffer_name,
      {bucket, metric, domain},
      {2, 1},
      {{bucket, metric, domain}, 0}
    )

    buffer
  end

  @doc """
  Takes out and deletes all counters whose bucket lies at or before the
  flush boundary derived from `now`, returning them as
  `{bucket, metric, domain, value}` tuples.
  """
  @spec flush(t(), now :: DateTime.t()) :: [counter()]
  def flush(
        %__MODULE__{buffer_name: buffer_name, flush_boundary_fn: flush_boundary_fn},
        now \\ DateTime.utc_now()
      ) do
    boundary = flush_boundary_fn.(now)

    # match spec over {bucket, metric, domain} keys with bucket <= boundary
    match = {{:"$1", :"$2", :"$3"}, :"$4"}
    guard = {:"=<", :"$1", boundary}
    select = {{:"$1", :"$2", :"$3", :"$4"}}

    match_specs_read = [{match, [guard], [select]}]
    match_specs_delete = [{match, [guard], [true]}]

    case :ets.select(buffer_name, match_specs_read) do
      [] ->
        []

      data ->
        # non-atomic with the select above; safe because by default only
        # complete (past-boundary) buckets ever match, so no new writes
        # can land in them between the two calls
        :ets.select_delete(buffer_name, match_specs_delete)
        data
    end
  end

  @doc """
  Truncates a naive datetime (assumed UTC) down to its 10-second bucket
  and returns it as a unix timestamp.
  """
  @spec bucket_10s(NaiveDateTime.t()) :: unix_timestamp()
  def bucket_10s(datetime) do
    datetime
    |> DateTime.from_naive!("Etc/UTC")
    |> DateTime.truncate(:second)
    |> Map.replace(:second, div(datetime.second, 10) * 10)
    |> DateTime.to_unix()
  end

  @doc """
  Default flush boundary: 10 seconds before `datetime`, as a unix
  timestamp — so only buckets whose window has closed are flushed.
  """
  @spec previous_10s(DateTime.t()) :: unix_timestamp()
  def previous_10s(datetime) do
    datetime
    |> DateTime.add(-10, :second)
    |> DateTime.to_unix()
  end
end

View File

@ -0,0 +1,17 @@
defmodule Plausible.Ingestion.Counters.Record do
  @moduledoc """
  Clickhouse schema for storing ingest counter metrics
  """

  use Ecto.Schema

  @type t() :: %__MODULE__{}

  @primary_key false
  schema "ingest_counters" do
    # 1-minute resolution bucket the counter belongs to
    field :event_timebucket, :utc_datetime
    # nil when the domain was not found in the sites cache at flush time
    field :site_id, :integer
    field :domain, :string
    # "buffered" or "dropped_<reason>" — see Counters.TelemetryHandler
    field :metric, :string
    field :value, :integer
  end
end

View File

@ -0,0 +1,49 @@
defmodule Plausible.Ingestion.Counters.TelemetryHandler do
  @moduledoc """
  Subscribes to telemetry events emitted by `Plausible.Ingestion.Event`.

  Every time a request derived event is either dispatched to clickhouse or dropped,
  a telemetry event is emitted respectively. That event is captured here,
  its metadata is extracted and sent for internal stats aggregation via
  `Counters.Buffer` interface.
  """

  alias Plausible.Ingestion.Counters
  alias Plausible.Ingestion.Event

  # Event names are resolved at compile time and used both for attaching
  # and for matching in handle_event/4 heads below.
  @event_dropped Event.telemetry_event_dropped()
  @event_buffered Event.telemetry_event_buffered()

  @telemetry_events [@event_dropped, @event_buffered]
  @telemetry_handler &__MODULE__.handle_event/4

  @doc """
  Attaches `handle_event/4` to both ingest telemetry events;
  the buffer itself is passed along as the handler config.
  """
  @spec install(Counters.Buffer.t()) :: :ok
  def install(%Counters.Buffer{buffer_name: buffer_name} = buffer) do
    :ok =
      :telemetry.attach_many(
        # the buffer name makes the handler id unique per buffer instance
        "ingest-counters-#{buffer_name}",
        @telemetry_events,
        @telemetry_handler,
        buffer
      )
  end

  @doc """
  Telemetry callback: bumps a per-domain counter in the buffer, using the
  original request timestamp for bucketing. Dropped events are counted
  under `"dropped_<reason>"`, buffered ones under `"buffered"`.
  """
  @spec handle_event([atom()], any(), map(), Counters.Buffer.t()) :: :ok
  def handle_event(
        @event_dropped,
        _measurements,
        %{domain: domain, reason: reason, request_timestamp: timestamp},
        buffer
      ) do
    Counters.Buffer.aggregate(buffer, "dropped_#{reason}", domain, timestamp)

    :ok
  end

  def handle_event(
        @event_buffered,
        _measurements,
        %{domain: domain, request_timestamp: timestamp},
        buffer
      ) do
    Counters.Buffer.aggregate(buffer, "buffered", domain, timestamp)

    :ok
  end
end

View File

@ -71,6 +71,23 @@ defmodule Plausible.Ingestion.Event do
[:plausible, :ingest, :event, :dropped]
end
@spec emit_telemetry_buffered(t()) :: :ok
@doc """
Emits the buffered-event telemetry signal for an event that was written
to the Clickhouse write buffer, attaching the domain and the original
request timestamp as metadata (consumed by `Counters.TelemetryHandler`).
"""
def emit_telemetry_buffered(event) do
  :telemetry.execute(telemetry_event_buffered(), %{}, %{
    domain: event.domain,
    request_timestamp: event.request.timestamp
  })
end
@spec emit_telemetry_dropped(t(), drop_reason()) :: :ok
@doc """
Emits the dropped-event telemetry signal for an event that did not make
it to the write buffer, attaching the domain, the drop `reason` and the
original request timestamp as metadata.
"""
def emit_telemetry_dropped(event, reason) do
  :telemetry.execute(telemetry_event_dropped(), %{}, %{
    domain: event.domain,
    reason: reason,
    request_timestamp: event.request.timestamp
  })
end
defp pipeline() do
[
&put_user_agent/1,
@ -107,7 +124,7 @@ defmodule Plausible.Ingestion.Event do
|> Keyword.put(:dropped?, true)
|> Keyword.put(:drop_reason, reason)
emit_telemetry_dropped(reason)
emit_telemetry_dropped(event, reason)
struct!(event, fields)
end
@ -243,7 +260,7 @@ defmodule Plausible.Ingestion.Event do
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event) do
{:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event)
emit_telemetry_buffered()
emit_telemetry_buffered(event)
event
end
@ -374,12 +391,4 @@ defmodule Plausible.Ingestion.Event do
end
defp spam_referrer?(_), do: false
defp emit_telemetry_buffered() do
:telemetry.execute(telemetry_event_buffered(), %{}, %{})
end
defp emit_telemetry_dropped(reason) do
:telemetry.execute(telemetry_event_dropped(), %{}, %{reason: reason})
end
end

View File

@ -27,17 +27,17 @@ defmodule Plausible.Ingestion.Request do
@type t() :: %__MODULE__{}
@spec build(Plug.Conn.t()) :: {:ok, t()} | {:error, Changeset.t()}
@spec build(Plug.Conn.t(), NaiveDateTime.t()) :: {:ok, t()} | {:error, Changeset.t()}
@doc """
Builds and initially validates %Plausible.Ingestion.Request{} struct from %Plug.Conn{}.
"""
def build(%Plug.Conn{} = conn) do
def build(%Plug.Conn{} = conn, now \\ NaiveDateTime.utc_now()) do
changeset =
%__MODULE__{}
|> Changeset.change()
|> Changeset.put_change(
:timestamp,
NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
NaiveDateTime.truncate(now, :second)
)
case parse_body(conn) do

View File

@ -180,6 +180,17 @@ defmodule Plausible.Site.Cache do
end
end
@doc """
Looks up a site by `domain` in the cache and returns its numeric id,
or `nil` when the domain is not cached. Options are passed through
to `get/2` unchanged.
"""
@spec get_site_id(String.t(), Keyword.t()) :: pos_integer() | nil
def get_site_id(domain, opts \\ []) do
  case get(domain, opts) do
    nil -> nil
    %{id: id} -> id
  end
end
@spec telemetry_event_refresh(atom(), atom()) :: list(atom())
def telemetry_event_refresh(cache_name \\ @cache_name, mode) when mode in @modes do
[:plausible, :cache, cache_name, :refresh, mode]

View File

@ -63,7 +63,8 @@ defmodule Plausible.MixProject do
{:bypass, "~> 2.1", only: [:dev, :test]},
{:cachex, "~> 3.4"},
{:clickhouse_ecto, git: "https://github.com/plausible/clickhouse_ecto.git"},
{:clickhousex, github: "plausible/clickhousex", branch: "master", override: true},
{:clickhousex,
github: "plausible/clickhousex", branch: "generate-session-id-on-connect", override: true},
{:combination, "~> 0.0.3"},
{:connection, "~> 1.1", override: true},
{:cors_plug, "~> 3.0"},
@ -81,7 +82,7 @@ defmodule Plausible.MixProject do
{:fun_with_flags, "~> 1.9.0"},
{:fun_with_flags_ui, "~> 0.8"},
{:locus, "~> 2.3"},
{:gen_cycle, "~> 1.0"},
{:gen_cycle, github: "aerosol/gen_cycle", branch: "add-terminate-callback"},
{:hackney, "~> 1.8"},
{:hammer, "~> 6.0"},
{:httpoison, "~> 1.4"},

View File

@ -12,7 +12,7 @@
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"chatterbox": {:hex, :ts_chatterbox, "0.13.0", "6f059d97bcaa758b8ea6fffe2b3b81362bd06b639d3ea2bb088335511d691ebf", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "b93d19104d86af0b3f2566c4cba2a57d2e06d103728246ba1ac6c3c0ff010aa7"},
"clickhouse_ecto": {:git, "https://github.com/plausible/clickhouse_ecto.git", "43126c020f07b097c55a81f68a906019fd061f29", []},
"clickhousex": {:git, "https://github.com/plausible/clickhousex.git", "528087fae45cf7aea9a0630b3a4cd7e94543f011", [branch: "master"]},
"clickhousex": {:git, "https://github.com/plausible/clickhousex.git", "ecaab4de2fea0112a4dd6a42729aeaa75256681e", [branch: "generate-session-id-on-connect"]},
"combination": {:hex, :combination, "0.0.3", "746aedca63d833293ec6e835aa1f34974868829b1486b1e1cb0685f0b2ae1f41", [:mix], [], "hexpm", "72b099f463df42ef7dc6371d250c7070b57b6c5902853f69deb894f79eda18ca"},
"combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"},
"comeonin": {:hex, :comeonin, "5.3.3", "2c564dac95a35650e9b6acfe6d2952083d8a08e4a89b93a481acb552b325892e", [:mix], [], "hexpm", "3e38c9c2cb080828116597ca8807bb482618a315bfafd98c90bc22a821cc84df"},
@ -47,7 +47,7 @@
"floki": {:hex, :floki, "0.32.1", "dfe3b8db3b793939c264e6f785bca01753d17318d144bd44b407fb3493acaa87", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm", "d4b91c713e4a784a3f7b1e3cc016eefc619f6b1c3898464222867cafd3c681a3"},
"fun_with_flags": {:hex, :fun_with_flags, "1.9.0", "0be8692727623af0c00e353f7bdcc9934dd6e1f87798ca6bfec9e739d42b63e6", [:mix], [{:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:redix, "~> 1.0", [hex: :redix, repo: "hexpm", optional: true]}], "hexpm", "8145dc009d549389f1b1915a715fb7e357fcd8fd7b91c74f04aae74912eef27a"},
"fun_with_flags_ui": {:hex, :fun_with_flags_ui, "0.8.0", "70587e344ba2035516a639e7bd8cb2ce8d54ee6c5f5dfd3e4e6f6a776e14ac1d", [:mix], [{:cowboy, ">= 2.0.0", [hex: :cowboy, repo: "hexpm", optional: true]}, {:fun_with_flags, "~> 1.8", [hex: :fun_with_flags, repo: "hexpm", optional: false]}, {:plug, "~> 1.12", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 2.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "95e1e5afae77e259a4a43f930f3da97ff3c388a72d4622f7848cb7ee34de6338"},
"gen_cycle": {:hex, :gen_cycle, "1.0.3", "ee0117b9c65814f58827abfe84967d4cb5380db0fbac992358e5b669b3ebcbcb", [:rebar3], [], "hexpm", "fffdadf7fb73de75e3429eb8a6982768e7240803561942602350b689991749e4"},
"gen_cycle": {:git, "https://github.com/aerosol/gen_cycle.git", "06aa70209332acd0a1197ea0f8bacaf5e6f210ac", [branch: "add-terminate-callback"]},
"gen_smtp": {:hex, :gen_smtp, "1.2.0", "9cfc75c72a8821588b9b9fe947ae5ab2aed95a052b81237e0928633a13276fd3", [:rebar3], [{:ranch, ">= 1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "5ee0375680bca8f20c4d85f58c2894441443a743355430ff33a783fe03296779"},
"gettext": {:hex, :gettext, "0.19.1", "564953fd21f29358e68b91634799d9d26989f8d039d7512622efb3c3b1c97892", [:mix], [], "hexpm", "10c656c0912b8299adba9b061c06947511e3f109ab0d18b44a866a4498e77222"},
"gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"},

View File

@ -0,0 +1,79 @@
defmodule Plausible.Ingestion.Counters.BufferTest do
  use Plausible.DataCase, async: true

  alias Plausible.Ingestion.Counters.Buffer

  test "10s buckets are created from input datetime" do
    # time (...) :58 :59 :00 :01 :02 :03 :04 :05 :06 :07 :08 :09 :10 :11 :12 :13 (...)
    # bucket 50 50 00 00 00 00 00 00 00 00 00 00 10 10 10 10
    expectations = [
      {~N[2023-02-14 01:00:00], ~U[2023-02-14 01:00:00Z]},
      {~N[2023-02-14 01:00:02], ~U[2023-02-14 01:00:00Z]},
      {~N[2023-02-14 01:00:05], ~U[2023-02-14 01:00:00Z]},
      {~N[2023-02-14 01:00:09], ~U[2023-02-14 01:00:00Z]},
      {~N[2023-02-14 01:00:09.123456], ~U[2023-02-14 01:00:00Z]},
      {~N[2023-02-14 01:00:10], ~U[2023-02-14 01:00:10Z]},
      {~N[2023-02-14 01:00:59], ~U[2023-02-14 01:00:50Z]},
      {~N[2023-02-14 01:20:09], ~U[2023-02-14 01:20:00Z]}
    ]

    Enum.each(expectations, fn {input, bucket} ->
      result = Buffer.bucket_10s(input)

      assert result == DateTime.to_unix(bucket),
             "#{input} must fall into #{bucket} but got #{DateTime.from_unix!(result)}"
    end)
  end

  test "aggregates metrics every 10 seconds", %{test: test} do
    # time (...) :58 :59 :00 :01 :02 :03 :04 :05 :06 :07 :08 :09 :10 :11 :12 :13 (...)
    # bucket 50 50 00 00 00 00 00 00 00 00 00 00 10 10 10 10
    # metric x x x x
    # value 1 1 2 3
    buffer = Buffer.new(test)

    [
      ~N[2023-02-14 01:00:59],
      ~N[2023-02-14 01:01:01],
      ~N[2023-02-14 01:01:03],
      ~N[2023-02-14 01:01:09]
    ]
    |> Enum.each(fn timestamp ->
      Buffer.aggregate(buffer, "metric", "example.com", timestamp)
    end)

    assert [
             {first_bucket, "metric", "example.com", 1},
             {second_bucket, "metric", "example.com", 3}
           ] = Buffer.flush(buffer)

    assert second_bucket - first_bucket == 10
  end

  test "allows flushing only complete buckets", %{test: test} do
    # time (...) :58 :59 :00 :01 :02 :03 :04 :05 :06 :07 :08 :09 :10 :11 :12 :13 (...)
    # bucket 50 50 00 00 00 00 00 00 00 00 00 00 10 10 10 10
    # metric x x x x
    # aggregate 1 0 1 1 2 2 2 2 2 2 3 3 0
    # flush attempt x x x x
    # flushed count 0 1 0 3
    buffer = Buffer.new(test)

    Enum.each(
      [
        ~N[2023-02-14 01:00:59],
        ~N[2023-02-14 01:01:01],
        ~N[2023-02-14 01:01:03],
        ~N[2023-02-14 01:01:09]
      ],
      &Buffer.aggregate(buffer, "metric", "example.com", &1)
    )

    assert [] = Buffer.flush(buffer, ~U[2023-02-14 01:00:59.999999Z])
    assert [{_, _, _, 1}] = Buffer.flush(buffer, ~U[2023-02-14 01:01:00.999999Z])
    assert [] = Buffer.flush(buffer, ~U[2023-02-14 01:01:05.999999Z])
    assert [{_, _, _, 3}] = Buffer.flush(buffer, ~U[2023-02-14 01:01:11.999999Z])
  end
end

View File

@ -0,0 +1,54 @@
defmodule Plausible.Ingestion.Counters.TelemetryHandlerTest do
  use Plausible.DataCase, async: true

  alias Plausible.Ingestion.Counters.Buffer
  alias Plausible.Ingestion.Counters.TelemetryHandler
  alias Plausible.Ingestion.Event

  test "install/1 attaches a telemetry handler", %{test: test} do
    on_exit(:detach, fn -> :telemetry.detach("ingest-counters-#{test}") end)

    buffer = Buffer.new(test)
    assert :ok = TelemetryHandler.install(buffer)

    attached = :telemetry.list_handlers([:plausible, :ingest, :event])

    # both ingest events must have a handler configured with our buffer
    for expected_event <- [Event.telemetry_event_dropped(), Event.telemetry_event_buffered()] do
      assert Enum.find(attached, fn handler ->
               handler.config == buffer and handler.event_name == expected_event
             end)
    end
  end

  test "handles ingest events by aggregating the counts", %{test: test} do
    on_exit(:detach, fn -> :telemetry.detach("ingest-counters-#{test}") end)

    buffer = Buffer.new(test)
    assert :ok = TelemetryHandler.install(buffer)

    make_event = fn domain ->
      %{domain: domain, request: %{timestamp: NaiveDateTime.utc_now()}}
    end

    invalid_event = make_event.("a.example.com")
    not_found_event = make_event.("b.example.com")
    buffered_event = make_event.("c.example.com")

    :ok = Event.emit_telemetry_dropped(invalid_event, :invalid)
    :ok = Event.emit_telemetry_dropped(not_found_event, :not_found)
    :ok = Event.emit_telemetry_buffered(buffered_event)
    :ok = Event.emit_telemetry_dropped(not_found_event, :not_found)

    # flush from 2 minutes ahead so every bucket counts as complete
    flush_at = DateTime.add(DateTime.utc_now(), 120, :second)

    assert aggregates = Buffer.flush(buffer, flush_at)

    assert Enum.find(aggregates, &match?({_, "dropped_invalid", "a.example.com", 1}, &1))
    assert Enum.find(aggregates, &match?({_, "dropped_not_found", "b.example.com", 2}, &1))
    assert Enum.find(aggregates, &match?({_, "buffered", "c.example.com", 1}, &1))
  end
end

View File

@ -0,0 +1,160 @@
defmodule Plausible.Ingestion.CountersTest do
  use Plausible.DataCase, async: false

  import Ecto.Query

  alias Plausible.Ingestion.Counters
  alias Plausible.Ingestion.Counters.Record
  alias Plausible.Ingestion.Event
  alias Plausible.Ingestion.Request

  import Phoenix.ConnTest

  describe "integration" do
    test "periodically flushes buffer aggregates to the database", %{test: test} do
      on_exit(:detach, fn ->
        :telemetry.detach("ingest-counters-#{test}")
      end)

      # bucket on every wall-clock second and set the flush boundary far in
      # the future, so aggregates flush on every 100ms cycle without waiting
      # for 10s windows to close
      start_counters(
        buffer_name: test,
        interval: 100,
        aggregate_bucket_fn: fn _now ->
          System.os_time(:second)
        end,
        flush_boundary_fn: fn _now ->
          System.os_time(:second) + 1000
        end
      )

      {:ok, dropped} = emit_dropped_request()
      {:ok, _dropped} = emit_dropped_request(domain: dropped.domain)
      {:ok, buffered} = emit_buffered_request()

      # dropped site is never persisted, so its record has no site_id
      verify_record_written(dropped.domain, "dropped_not_found", 2)

      site_id = Plausible.Sites.get_by_domain(buffered.domain).id
      verify_record_written(buffered.domain, "buffered", 1, site_id)
    end

    test "the database eventually sums the records within 1-minute buckets", %{test: test} do
      # Testing if the database works is an unfunny way of integration testing,
      # but on the upside it's quite straightforward way of testing if the
      # 1-minute bucket rollups are applied when dumping the records that are
      # originally aggregated with 10s windows.
      on_exit(:detach, fn ->
        :telemetry.detach("ingest-counters-#{test}")
      end)

      start_counters(
        buffer_name: test,
        interval: 100,
        aggregate_bucket_fn: fn _now ->
          System.os_time(:second)
        end,
        flush_boundary_fn: fn _now ->
          System.os_time(:second) + 1000
        end
      )

      # three events spread across the same minute must end up summed
      # into a single 1-minute bucket with value 3
      event1_at = ~N[2023-02-14 01:00:03]
      event2_at = ~N[2023-02-14 01:00:18]
      event3_at = ~N[2023-02-14 01:00:55]

      {:ok, event1} = emit_dropped_request(at: event1_at)
      {:ok, _} = emit_dropped_request(domain: event1.domain, at: event2_at)
      {:ok, _} = emit_dropped_request(domain: event1.domain, at: event3_at)

      verify_record_written(event1.domain, "dropped_not_found", 3)
    end

    test "dumps the buffer on shutdown", %{test: test} do
      on_exit(:detach, fn ->
        :telemetry.detach("ingest-counters-#{test}")
      end)

      # normal operation, 10s cycle/10s bucket
      {:ok, pid} = start_counters(buffer_name: test)

      # one event in the past, one in a bucket that is still incomplete;
      # both must be flushed by terminate/2 regardless
      event1_at = ~N[2023-02-14 01:00:03]
      event2_at = NaiveDateTime.utc_now() |> NaiveDateTime.add(10, :second)

      {:ok, event1} = emit_dropped_request(at: event1_at)
      {:ok, event2} = emit_dropped_request(at: event2_at)

      assert Process.alive?(pid)
      :ok = Counters.stop(pid)

      assert :down ==
               eventually(fn ->
                 {Process.alive?(pid) == false, :down}
               end)

      verify_record_written(event1.domain, "dropped_not_found", 1)
      verify_record_written(event2.domain, "dropped_not_found", 1)
    end
  end

  # Builds a request for a site that only exists as an unsaved struct
  # (build/2, not insert/2), so ingestion drops it with reason :not_found.
  defp emit_dropped_request(opts \\ []) do
    domain = Keyword.get(opts, :domain, random_domain())
    at = Keyword.get(opts, :at, NaiveDateTime.utc_now())

    site = build(:site, domain: domain)

    payload = %{
      name: "pageview",
      url: "http://#{site.domain}"
    }

    conn = build_conn(:post, "/api/event", payload)
    assert {:ok, request} = Request.build(conn, at)
    assert {:ok, %{dropped: [dropped]}} = Event.build_and_buffer(request)
    {:ok, dropped}
  end

  # Persists the site first, so the resulting event is buffered successfully.
  defp emit_buffered_request(opts \\ []) do
    domain = Keyword.get(opts, :domain, random_domain())
    at = Keyword.get(opts, :at, NaiveDateTime.utc_now())

    site = insert(:site, domain: domain)

    payload = %{
      name: "pageview",
      url: "http://#{site.domain}"
    }

    conn = build_conn(:post, "/api/event", payload)
    assert {:ok, request} = Request.build(conn, at)
    assert {:ok, %{buffered: [buffered]}} = Event.build_and_buffer(request)
    {:ok, buffered}
  end

  # Starts the counters cycle directly via its child spec; :force_start?
  # bypasses the enabled?() config check in test env.
  defp start_counters(opts) do
    opts = Keyword.put(opts, :force_start?, true)

    %{start: {m, f, a}} = Counters.child_spec(opts)
    {:ok, _pid} = apply(m, f, a)
  end

  # Polls clickhouse until exactly one matching ingest_counters row appears.
  defp verify_record_written(domain, metric, value, site_id \\ nil) do
    query =
      from(r in Record,
        where:
          r.domain == ^domain and
            r.metric == ^metric and
            r.value == ^value
      )

    query =
      if site_id do
        query |> where([r], r.site_id == ^site_id)
      else
        query |> where([r], is_nil(r.site_id))
      end

    assert await_clickhouse_count(query, 1)
  end

  # Random domain per emission, so per-domain assertions don't collide
  # across tests sharing the database.
  defp random_domain() do
    (:crypto.strong_rand_bytes(16) |> Base.encode16()) <> ".example.com"
  end
end

View File

@ -121,6 +121,20 @@ defmodule Plausible.Site.CacheTest do
Cache.refresh_all(force?: true, cache_name: test)
assert_receive {:telemetry_handled, %{}}
end
test "get_site_id/2", %{test: test} do
{:ok, _} = start_test_cache(test)
site = insert(:site)
domain1 = site.domain
domain2 = "nonexisting.example.com"
:ok = Cache.refresh_all(cache_name: test)
assert site.id == Cache.get_site_id(domain1, force?: true, cache_name: test)
assert is_nil(Cache.get_site_id(domain2, force?: true, cache_name: test))
end
end
describe "warming the cache" do