Add metrics to ingestion pipeline (#3927)

* Add metrics to ingestion pipeline

* Format

* Format

* Update buckets

* Credo
This commit is contained in:
hq1 2024-03-26 09:42:48 +01:00 committed by GitHub
parent 604bf88451
commit 7523abe93e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 63 additions and 38 deletions

View File

@ -176,26 +176,16 @@ defmodule Plausible.Cache do
defp refresh(mode, query, opts) when mode in @modes do
cache_name = Keyword.get(opts, :cache_name, name())
measure_duration(telemetry_event_refresh(cache_name, mode), fn ->
items = Plausible.Repo.all(query)
:ok = merge_items(items, opts)
end)
Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration(
telemetry_event_refresh(cache_name, mode),
fn ->
items = Plausible.Repo.all(query)
:ok = merge_items(items, opts)
end
)
:ok
end
defp measure_duration(event, fun) when is_function(fun, 0) do
{duration, result} = time_it(fun)
:telemetry.execute(event, %{duration: duration}, %{})
result
end
defp time_it(fun) do
start = System.monotonic_time()
result = fun.()
stop = System.monotonic_time()
{stop - start, result}
end
end
end
end

View File

@ -100,6 +100,10 @@ defmodule Plausible.Ingestion.Event do
[:plausible, :ingest, :event, :dropped]
end
def telemetry_pipeline_step_duration() do
[:plausible, :ingest, :pipeline, :step]
end
@spec emit_telemetry_buffered(t()) :: :ok
def emit_telemetry_buffered(event) do
:telemetry.execute(telemetry_event_buffered(), %{}, %{
@ -119,34 +123,42 @@ defmodule Plausible.Ingestion.Event do
defp pipeline() do
[
&drop_datacenter_ip/1,
&drop_shield_rule_ip/1,
&drop_shield_rule_page/1,
&put_geolocation/1,
&drop_shield_rule_country/1,
&put_user_agent/1,
&put_basic_info/1,
&put_referrer/1,
&put_utm_tags/1,
&put_props/1,
&put_revenue/1,
&put_salts/1,
&put_user_id/1,
&validate_clickhouse_event/1,
&register_session/1,
&write_to_buffer/1
drop_datacenter_ip: &drop_datacenter_ip/1,
drop_shield_rule_page: &drop_shield_rule_page/1,
drop_shield_rule_ip: &drop_shield_rule_ip/1,
put_geolocation: &put_geolocation/1,
drop_shield_rule_country: &drop_shield_rule_country/1,
put_user_agent: &put_user_agent/1,
put_basic_info: &put_basic_info/1,
put_referrer: &put_referrer/1,
put_utm_tags: &put_utm_tags/1,
put_props: &put_props/1,
put_revenue: &put_revenue/1,
put_salts: &put_salts/1,
put_user_id: &put_user_id/1,
validate_clickhouse_event: &validate_clickhouse_event/1,
register_session: &register_session/1,
write_to_buffer: &write_to_buffer/1
]
end
defp process_unless_dropped(%__MODULE__{} = initial_event, pipeline) do
Enum.reduce_while(pipeline, initial_event, fn pipeline_step, acc_event ->
case pipeline_step.(acc_event) do
%__MODULE__{dropped?: true} = dropped -> {:halt, dropped}
%__MODULE__{dropped?: false} = event -> {:cont, event}
end
Enum.reduce_while(pipeline, initial_event, fn {step_name, step_fn}, acc_event ->
Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration(
telemetry_pipeline_step_duration(),
fn -> execute_step(step_fn, acc_event) end,
%{step: "#{step_name}"}
)
end)
end
defp execute_step(step_fn, acc_event) do
case step_fn.(acc_event) do
%__MODULE__{dropped?: true} = dropped -> {:halt, dropped}
%__MODULE__{dropped?: false} = event -> {:cont, event}
end
end
defp new(domain, request) do
struct!(__MODULE__, domain: domain, request: request)
end

View File

@ -47,6 +47,16 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
unit: {:native, :millisecond},
measurement: :duration
),
distribution(
metric_prefix ++ [:ingest, :events, :pipeline, :steps],
event_name: Ingestion.Event.telemetry_pipeline_step_duration(),
reporter_options: [
buckets: [10, 50, 100, 250, 350, 500, 1000, 5000, 10_000, 100_000, 500_000]
],
unit: {:native, :microsecond},
measurement: :duration,
tags: [:step]
),
counter(
metric_prefix ++ [:ingest, :events, :buffered, :total],
event_name: Ingestion.Event.telemetry_event_buffered()
@ -110,6 +120,19 @@ defmodule Plausible.PromEx.Plugins.PlausibleMetrics do
})
end
def measure_duration(event, fun, meta \\ %{}) when is_function(fun, 0) do
{duration, result} = time_it(fun)
:telemetry.execute(event, %{duration: duration}, meta)
result
end
defp time_it(fun) do
start = System.monotonic_time()
result = fun.()
stop = System.monotonic_time()
{stop - start, result}
end
defp write_buffer_metrics(metric_prefix, poll_rate) do
Polling.build(
:write_buffer_metrics,