From 994e7d09de3f2cd03877a165a88e89d5b4af42eb Mon Sep 17 00:00:00 2001 From: Vini Brasil Date: Wed, 23 Nov 2022 10:05:44 -0300 Subject: [PATCH] Parse event URL and domain in Plausible.Ingestion.Request (#2351) * Parse event URL in Plausible.Ingestion.Request * Parse event domain in Plausible.Ingestion.Request * Rework ingestion pipeline processing (#2462) * Rework ingestion pipeline processing So that Request can have multiple domains and based on that each event is processed uniformly. The build_and_buffer/1 function now returns an accumulator with all the dropped/buffered events for further inspection. * Reduce function complexity * Don't chain struct fields to check for an empty host * Separate referrer and utm tags * Fix up `with` clause, credo was right cc @vinibrsl Co-authored-by: Adam Rutkowski --- lib/plausible/ingestion/event.ex | 451 +++++++++++------- lib/plausible/ingestion/request.ex | 66 ++- .../controllers/api/external_controller.ex | 63 ++- test/plausible/ingestion/event_test.exs | 134 ++++-- 4 files changed, 449 insertions(+), 265 deletions(-) diff --git a/lib/plausible/ingestion/event.ex b/lib/plausible/ingestion/event.ex index 2a7062335..867cb331d 100644 --- a/lib/plausible/ingestion/event.ex +++ b/lib/plausible/ingestion/event.ex @@ -1,39 +1,266 @@ defmodule Plausible.Ingestion.Event do - alias Plausible.Ingestion.{Request, CityOverrides} - - @spec build_and_buffer(Request.t()) :: :ok | :skip | {:error, Ecto.Changeset.t()} - @doc """ - Builds events from %Plausible.Ingestion.Request{} and adds them to Plausible.Event.WriteBuffer. - This function reads geolocation data and parses the user agent string. Returns :skip if the - request is identified as spam, or blocked. + @moduledoc """ + This module exposes the `build_and_buffer/1` function capable of + turning %Plausible.Ingestion.Request{} into a series of events that in turn + are uniformly either buffered in batches (to Clickhouse) or dropped + (e.g. due to spam blocklist) from the processing pipeline. """ - def build_and_buffer(%Request{} = request) do - with :ok <- spam_or_blocked?(request), - salts <- Plausible.Session.Salts.fetch(), - event <- Map.new(), - %{} = event <- put_user_agent(event, request), - %{} = event <- put_basic_info(event, request), - %{} = event <- put_referrer(event, request), - %{} = event <- put_geolocation(event, request), - %{} = event <- put_screen_size(event, request), - %{} = event <- put_props(event, request), - events when is_list(events) <- map_domains(event, request), - events when is_list(events) <- put_user_id(events, request, salts), - {:ok, events} <- validate_events(events), - events when is_list(events) <- register_session(events, request, salts) do - Enum.each(events, &Plausible.Event.WriteBuffer.insert/1) + alias Plausible.Ingestion.{Request, CityOverrides} + alias Plausible.ClickhouseEvent + + defstruct domain: nil, + clickhouse_event_attrs: %{}, + clickhouse_event: nil, + dropped?: false, + drop_reason: nil, + request: nil, + salts: nil + + @type drop_reason() :: + :bot + | :domain_blocked + | :spam_referrer + | {:error, Ecto.Changeset.t()} + + @type t() :: %__MODULE__{ + domain: String.t() | nil, + clickhouse_event_attrs: map(), + clickhouse_event: %ClickhouseEvent{} | nil, + dropped?: boolean(), + drop_reason: drop_reason(), + request: Request.t(), + salts: map() + } + + @spec build_and_buffer(Request.t()) :: + {:ok, %{dropped: [t()], buffered: [t()]}} + def build_and_buffer(%Request{domains: domains} = request) do + processed_events = + if spam_referrer?(request) do + for domain <- domains, do: drop(new(domain, request), :spam_referrer) + else + Enum.reduce(domains, [], fn domain, acc -> + if domain_blocked?(domain) do + [drop(new(domain, request), :domain_blocked) | acc] + else + processed = + domain + |> new(request) + |> process_unless_dropped(pipeline()) + + [processed | acc] + end + end) + end + + {dropped, buffered} = Enum.split_with(processed_events, & &1.dropped?) + {:ok, %{dropped: dropped, buffered: buffered}} + end + + defp pipeline() do + [ + &put_user_agent/1, + &put_basic_info/1, + &put_referrer/1, + &put_utm_tags/1, + &put_geolocation/1, + &put_screen_size/1, + &put_props/1, + &put_salts/1, + &put_user_id/1, + &validate_clickhouse_event/1, + ®ister_session/1, + &write_to_buffer/1 + ] + end + + defp process_unless_dropped(%__MODULE__{} = initial_event, pipeline) do + Enum.reduce_while(pipeline, initial_event, fn pipeline_step, acc_event -> + case pipeline_step.(acc_event) do + %__MODULE__{dropped?: true} = dropped -> {:halt, dropped} + %__MODULE__{dropped?: false} = event -> {:cont, event} + end + end) + end + + defp new(domain, request) do + %__MODULE__{domain: domain, request: request} + end + + defp drop(%__MODULE__{} = event, reason) do + %{event | dropped?: true, drop_reason: reason} + end + + defp update_attrs(%__MODULE__{} = event, %{} = attrs) do + %{event | clickhouse_event_attrs: Map.merge(event.clickhouse_event_attrs, attrs)} + end + + defp put_user_agent(%__MODULE__{} = event) do + case parse_user_agent(event.request) do + %UAInspector.Result{client: %UAInspector.Result.Client{name: "Headless Chrome"}} -> + drop(event, :bot) + + %UAInspector.Result.Bot{} -> + drop(event, :bot) + + %UAInspector.Result{} = user_agent -> + update_attrs(event, %{ + operating_system: os_name(user_agent), + operating_system_version: os_version(user_agent), + browser: browser_name(user_agent), + browser_version: browser_version(user_agent) + }) + + _any -> + event end end - defp put_basic_info(%{} = event, %Request{} = request) do - uri = request.url && URI.parse(request.url) - host = if uri && uri.host == "", do: "(none)", else: uri && uri.host + defp put_basic_info(%__MODULE__{} = event) do + host = + case event.request.uri do + %{host: ""} -> "(none)" + %{host: host} when is_binary(host) -> host + _ -> nil + end + update_attrs(event, %{ + domain: event.domain, + timestamp: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second), + name: event.request.event_name, + hostname: Request.sanitize_hostname(host), + pathname: get_pathname(event.request.uri, event.request.hash_mode) + }) + end + + defp put_referrer(%__MODULE__{} = event) do + ref = parse_referrer(event.request.uri, event.request.referrer) + + update_attrs(event, %{ + referrer_source: get_referrer_source(event.request, ref), + referrer: clean_referrer(ref) + }) + end + + defp put_utm_tags(%__MODULE__{} = event) do + query_params = event.request.query_params + + update_attrs(event, %{ + utm_medium: query_params["utm_medium"], + utm_source: query_params["utm_source"], + utm_campaign: query_params["utm_campaign"], + utm_content: query_params["utm_content"], + utm_term: query_params["utm_term"] + }) + end + + defp put_geolocation(%__MODULE__{} = event) do + result = Geolix.lookup(event.request.remote_ip, where: :geolocation) + + country_code = + get_in(result, [:country, :iso_code]) + |> ignore_unknown_country() + + city_geoname_id = get_in(result, [:city, :geoname_id]) + city_geoname_id = Map.get(CityOverrides.get(), city_geoname_id, city_geoname_id) + + subdivision1_code = + case result do + %{subdivisions: [%{iso_code: iso_code} | _rest]} -> + country_code <> "-" <> iso_code + + _ -> + "" + end + + subdivision2_code = + case result do + %{subdivisions: [_first, %{iso_code: iso_code} | _rest]} -> + country_code <> "-" <> iso_code + + _ -> + "" + end + + update_attrs(event, %{ + country_code: country_code, + subdivision1_code: subdivision1_code, + subdivision2_code: subdivision2_code, + city_geoname_id: city_geoname_id + }) + end + + defp put_screen_size(%__MODULE__{} = event) do + screen_size = + case event.request.screen_width do + nil -> nil + width when width < 576 -> "Mobile" + width when width < 992 -> "Tablet" + width when width < 1440 -> "Laptop" + width when width >= 1440 -> "Desktop" + end + + update_attrs(event, %{screen_size: screen_size}) + end + + defp put_props(%__MODULE__{request: %{props: %{} = props}} = event) do + update_attrs(event, %{ + "meta.key": Map.keys(props), + "meta.value": Enum.map(props, fn {_, v} -> to_string(v) end) + }) + end + + defp put_props(%__MODULE__{} = event), do: event + + defp put_salts(%__MODULE__{} = event) do + %{event | salts: Plausible.Session.Salts.fetch()} + end + + defp put_user_id(%__MODULE__{} = event) do + update_attrs(event, %{ + user_id: + generate_user_id( + event.request, + event.domain, + event.clickhouse_event_attrs.hostname, + event.salts.current + ) + }) + end + + defp validate_clickhouse_event(%__MODULE__{} = event) do + clickhouse_event = + event + |> Map.fetch!(:clickhouse_event_attrs) + |> ClickhouseEvent.new() + + case Ecto.Changeset.apply_action(clickhouse_event, nil) do + {:ok, valid_clickhouse_event} -> + %{event | clickhouse_event: valid_clickhouse_event} + + {:error, changeset} -> + drop(event, {:error, changeset}) + end + end + + defp register_session(%__MODULE__{} = event) do + previous_user_id = + generate_user_id( + event.request, + event.domain, + event.clickhouse_event.hostname, + event.salts.previous + ) + + session_id = Plausible.Session.CacheStore.on_event(event.clickhouse_event, previous_user_id) + + clickhouse_event = Map.put(event.clickhouse_event, :session_id, session_id) + %{event | clickhouse_event: clickhouse_event} + end + + defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event) do + {:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event) event - |> Map.put(:timestamp, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)) - |> Map.put(:name, request.event_name) - |> Map.put(:hostname, strip_www(host)) - |> Map.put(:pathname, get_pathname(uri, request.hash_mode)) end defp get_pathname(_uri = nil, _hash_mode), do: "/" @@ -51,36 +278,13 @@ defmodule Plausible.Ingestion.Event do end end - defp put_props(%{} = event, %Request{} = request) do - if is_map(request.props) do - event - |> Map.put(:"meta.key", Map.keys(request.props)) - |> Map.put(:"meta.value", Map.values(request.props) |> Enum.map(&to_string/1)) - else - event - end - end - - defp put_referrer(%{} = event, %Request{} = request) do - uri = request.url && URI.parse(request.url) - ref = parse_referrer(uri, request.referrer) - - event - |> Map.put(:utm_medium, request.query_params["utm_medium"]) - |> Map.put(:utm_source, request.query_params["utm_source"]) - |> Map.put(:utm_campaign, request.query_params["utm_campaign"]) - |> Map.put(:utm_content, request.query_params["utm_content"]) - |> Map.put(:utm_term, request.query_params["utm_term"]) - |> Map.put(:referrer_source, get_referrer_source(request, ref)) - |> Map.put(:referrer, clean_referrer(ref)) - end - defp parse_referrer(_uri, _referrer_str = nil), do: nil defp parse_referrer(uri, referrer_str) do referrer_uri = URI.parse(referrer_str) - if strip_www(referrer_uri.host) !== strip_www(uri.host) && referrer_uri.host !== "localhost" do + if Request.sanitize_hostname(referrer_uri.host) !== Request.sanitize_hostname(uri.host) && + referrer_uri.host !== "localhost" do RefInspector.parse(referrer_str) end end @@ -106,26 +310,6 @@ defmodule Plausible.Ingestion.Event do end end - defp put_user_agent(%{} = event, %Request{} = request) do - case parse_user_agent(request) do - %UAInspector.Result{client: %UAInspector.Result.Client{name: "Headless Chrome"}} -> - :skip - - %UAInspector.Result.Bot{} -> - :skip - - %UAInspector.Result{} = user_agent -> - event - |> Map.put(:operating_system, os_name(user_agent)) - |> Map.put(:operating_system_version, os_version(user_agent)) - |> Map.put(:browser, browser_name(user_agent)) - |> Map.put(:browser_version, browser_version(user_agent)) - - _any -> - event - end - end - defp parse_user_agent(%Request{user_agent: user_agent}) when is_binary(user_agent) do case Cachex.fetch(:user_agents, user_agent, &UAInspector.parse/1) do {:ok, user_agent} -> user_agent @@ -189,88 +373,9 @@ defmodule Plausible.Ingestion.Event do end end - defp put_screen_size(%{} = event, %Request{} = request) do - screen_width = - case request.screen_width do - nil -> nil - width when width < 576 -> "Mobile" - width when width < 992 -> "Tablet" - width when width < 1440 -> "Laptop" - width when width >= 1440 -> "Desktop" - end - - Map.put(event, :screen_size, screen_width) - end - - defp put_geolocation(%{} = event, %Request{} = request) do - result = Geolix.lookup(request.remote_ip, where: :geolocation) - - country_code = - get_in(result, [:country, :iso_code]) - |> ignore_unknown_country() - - city_geoname_id = get_in(result, [:city, :geoname_id]) - city_geoname_id = Map.get(CityOverrides.get(), city_geoname_id, city_geoname_id) - - subdivision1_code = - case result do - %{subdivisions: [%{iso_code: iso_code} | _rest]} -> - country_code <> "-" <> iso_code - - _ -> - "" - end - - subdivision2_code = - case result do - %{subdivisions: [_first, %{iso_code: iso_code} | _rest]} -> - country_code <> "-" <> iso_code - - _ -> - "" - end - - event - |> Map.put(:country_code, country_code) - |> Map.put(:subdivision1_code, subdivision1_code) - |> Map.put(:subdivision2_code, subdivision2_code) - |> Map.put(:city_geoname_id, city_geoname_id) - end - defp ignore_unknown_country("ZZ"), do: nil defp ignore_unknown_country(country), do: country - defp map_domains(%{} = event, %Request{} = request) do - domains = - if request.domain do - String.split(request.domain, ",") - |> Enum.map(&String.trim/1) - |> Enum.map(&strip_www/1) - else - uri = request.url && URI.parse(request.url) - [strip_www(uri && uri.host)] - end - - for domain <- domains, do: Map.put(event, :domain, domain) - end - - defp put_user_id(events, %Request{} = request, salts) do - for %{} = event <- events do - user_id = generate_user_id(request, event.domain, event.hostname, salts.current) - Map.put(event, :user_id, user_id) - end - end - - defp register_session(events, %Request{} = request, salts) do - for %Plausible.ClickhouseEvent{} = event <- events do - previous_user_id = generate_user_id(request, event.domain, event.hostname, salts.previous) - - session_id = Plausible.Session.CacheStore.on_event(event, previous_user_id) - - Map.put(event, :session_id, session_id) - end - end - defp generate_user_id(request, domain, hostname, salt) do cond do is_nil(salt) -> @@ -296,40 +401,16 @@ defmodule Plausible.Ingestion.Event do end end - defp spam_or_blocked?(%Request{} = request) do - cond do - request.domain in Application.get_env(:plausible, :domain_blacklist) -> - :skip - - FunWithFlags.enabled?(:block_event_ingest, for: request.domain) -> - :skip - - request.referrer && - URI.parse(request.referrer).host |> strip_www() |> ReferrerBlocklist.is_spammer?() -> - :skip - - true -> - :ok - end + defp spam_referrer?(%Request{referrer: referrer}) when is_binary(referrer) do + URI.parse(referrer).host + |> Request.sanitize_hostname() + |> ReferrerBlocklist.is_spammer?() end - defp validate_events(events) do - Enum.reduce_while(events, {:ok, []}, fn %{} = attrs, {:ok, acc} -> - attrs - |> Plausible.ClickhouseEvent.new() - |> Ecto.Changeset.apply_action(nil) - |> case do - {:ok, event} -> {:cont, {:ok, [event | acc]}} - {:error, changeset} -> {:halt, {:error, changeset}} - end - end) - end + defp spam_referrer?(_), do: false - defp strip_www(hostname) do - if hostname do - String.replace_prefix(hostname, "www.", "") - else - nil - end + defp domain_blocked?(domain) do + domain in Application.get_env(:plausible, :domain_blacklist) or + FunWithFlags.enabled?(:block_event_ingest, for: domain) end end diff --git a/lib/plausible/ingestion/request.ex b/lib/plausible/ingestion/request.ex index 02e69bccc..39838bf94 100644 --- a/lib/plausible/ingestion/request.ex +++ b/lib/plausible/ingestion/request.ex @@ -7,9 +7,9 @@ defmodule Plausible.Ingestion.Request do :remote_ip, :user_agent, :event_name, - :url, + :uri, :referrer, - :domain, + :domains, :screen_width, :hash_mode, props: %{}, @@ -20,9 +20,9 @@ defmodule Plausible.Ingestion.Request do remote_ip: String.t() | nil, user_agent: String.t() | nil, event_name: term(), - url: term(), + uri: URI.t() | nil, referrer: term(), - domain: term(), + domains: list(String.t()), screen_width: term(), hash_mode: term(), props: map(), @@ -31,15 +31,17 @@ defmodule Plausible.Ingestion.Request do @spec build(Plug.Conn.t()) :: {:ok, t()} | {:error, :invalid_json} @doc """ - Builds a %Plausible.Ingestion.Request{} struct from %Plug.Conn{}. + Builds a list of %Plausible.Ingestion.Request{} struct from %Plug.Conn{}. """ def build(%Plug.Conn{} = conn) do with {:ok, request_body} <- parse_body(conn) do %__MODULE__{} |> Map.put(:remote_ip, PlausibleWeb.RemoteIp.get(conn)) + |> put_uri(request_body) |> put_user_agent(conn) |> put_request_params(request_body) |> put_query_params() + |> map_domains(request_body) |> then(&{:ok, &1}) end end @@ -63,15 +65,34 @@ defmodule Plausible.Ingestion.Request do %__MODULE__{ request | event_name: request_body["n"] || request_body["name"], - url: request_body["u"] || request_body["url"], referrer: request_body["r"] || request_body["referrer"], - domain: request_body["d"] || request_body["domain"], screen_width: request_body["w"] || request_body["screen_width"], hash_mode: request_body["h"] || request_body["hashMode"], props: parse_props(request_body) } end + defp map_domains(%__MODULE__{} = request, %{} = request_body) do + domains = + if raw = request_body["d"] || request_body["domain"] do + raw + |> String.split(",") + |> Enum.map(&sanitize_hostname/1) + else + [sanitize_hostname(request.uri)] + end + + %__MODULE__{request | domains: domains} + end + + defp put_uri(%__MODULE__{} = request, %{} = request_body) do + if url = request_body["u"] || request_body["url"] do + %__MODULE__{request | uri: URI.parse(url)} + else + request + end + end + defp parse_props(%{} = request_body) do raw_props = request_body["m"] || request_body["meta"] || request_body["p"] || request_body["props"] @@ -111,13 +132,13 @@ defmodule Plausible.Ingestion.Request do end end - defp put_query_params(%__MODULE__{url: url} = request) do - with url when is_binary(url) <- url, - %URI{query: query} when is_binary(query) <- URI.parse(url), - %{} = query_params <- URI.decode_query(query) do - Map.put(request, :query_params, query_params) - else - _any -> request + defp put_query_params(%__MODULE__{} = request) do + case request do + %__MODULE__{uri: %URI{query: query}} when is_binary(query) -> + %__MODULE__{request | query_params: URI.decode_query(query)} + + _any -> + request end end @@ -129,4 +150,21 @@ defmodule Plausible.Ingestion.Request do %__MODULE__{request | user_agent: user_agent} end + + @doc """ + Removes the "www" part of a hostname. + """ + def sanitize_hostname(%URI{host: hostname}) do + sanitize_hostname(hostname) + end + + def sanitize_hostname(hostname) when is_binary(hostname) do + hostname + |> String.trim() + |> String.replace_prefix("www.", "") + end + + def sanitize_hostname(nil) do + nil + end end diff --git a/lib/plausible_web/controllers/api/external_controller.ex b/lib/plausible_web/controllers/api/external_controller.ex index d4e1f3bf9..82d1281e6 100644 --- a/lib/plausible_web/controllers/api/external_controller.ex +++ b/lib/plausible_web/controllers/api/external_controller.ex @@ -8,29 +8,37 @@ defmodule PlausibleWeb.Api.ExternalController do use PlausibleWeb, :controller require Logger - def event(conn, _params) do - with {:ok, ingestion_request} <- Plausible.Ingestion.Request.build(conn), - _ <- Sentry.Context.set_extra_context(%{request: ingestion_request}), - :ok <- Plausible.Ingestion.Event.build_and_buffer(ingestion_request) do - conn |> put_status(202) |> text("ok") - else - :skip -> - conn |> put_status(202) |> text("ok") + alias Plausible.Ingestion + def event(conn, _params) do + with {:ok, request} <- Ingestion.Request.build(conn), + _ <- Sentry.Context.set_extra_context(%{request: request}) do + case Ingestion.Event.build_and_buffer(request) do + {:ok, %{dropped: [], buffered: _buffered}} -> + conn + |> put_status(202) + |> text("ok") + + {:ok, %{dropped: dropped, buffered: _}} -> + first_invalid_changeset = find_first_invalid_changeset(dropped) + + if first_invalid_changeset do + conn + |> put_resp_header("x-plausible-dropped", "#{Enum.count(dropped)}") + |> put_status(400) + |> json(%{errors: traverse_errors(first_invalid_changeset)}) + else + conn + |> put_resp_header("x-plausible-dropped", "#{Enum.count(dropped)}") + |> put_status(202) + |> text("ok") + end + end + else {:error, :invalid_json} -> conn |> put_status(400) |> json(%{errors: %{request: "Unable to parse request body as json"}}) - - {:error, %Ecto.Changeset{} = changeset} -> - errors = - Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} -> - Regex.replace(~r"%{(\w+)}", msg, fn _, key -> - opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string() - end) - end) - - conn |> put_status(400) |> json(%{errors: errors}) end end @@ -86,4 +94,23 @@ defmodule PlausibleWeb.Api.ExternalController do build: build }) end + + defp find_first_invalid_changeset(dropped) do + Enum.find_value(dropped, nil, fn dropped_event -> + case dropped_event.drop_reason do + {:error, %Ecto.Changeset{} = changeset} -> changeset + _ -> false + end + end) + end + + defp traverse_errors(changeset) do + Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} -> + Regex.replace(~r"%{(\w+)}", msg, fn _, key -> + opts + |> Keyword.get(String.to_existing_atom(key), key) + |> to_string() + end) + end) + end end diff --git a/test/plausible/ingestion/event_test.exs b/test/plausible/ingestion/event_test.exs index e306bba2a..5679efadc 100644 --- a/test/plausible/ingestion/event_test.exs +++ b/test/plausible/ingestion/event_test.exs @@ -1,25 +1,12 @@ defmodule Plausible.Ingestion.EventTest do use Plausible.DataCase - def get_event(domain) do - Plausible.TestUtils.eventually(fn -> - Plausible.Event.WriteBuffer.flush() - - event = - Plausible.ClickhouseRepo.one( - from e in Plausible.ClickhouseEvent, where: e.domain == ^domain - ) - - {!is_nil(event), event} - end) - end - @valid_request %Plausible.Ingestion.Request{ remote_ip: "2.2.2.2", user_agent: "Mozilla/5.0 (iPad; U; CPU OS 3_2_1 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Mobile/7B405", event_name: "pageview", - url: "http://skywalker.test", + uri: URI.parse("http://skywalker.test"), referrer: "http://m.facebook.test/", screen_width: 1440, hash_mode: nil, @@ -34,41 +21,92 @@ defmodule Plausible.Ingestion.EventTest do } } - test "build_and_buffer/3 creates an event" do - assert :ok == - @valid_request - |> Map.put(:domain, "plausible-ingestion-event-basic.test") - |> Plausible.Ingestion.Event.build_and_buffer() + describe "integration" do + test "build_and_buffer/1 creates an event" do + assert {:ok, %{buffered: [_], dropped: []}} = + @valid_request + |> Map.put(:domains, ["plausible-ingestion-event-basic.test"]) + |> Plausible.Ingestion.Event.build_and_buffer() - assert %Plausible.ClickhouseEvent{ - session_id: session_id, - user_id: user_id, - domain: "plausible-ingestion-event-basic.test", - browser: "Safari", - browser_version: "", - city_geoname_id: 2_988_507, - country_code: "FR", - hostname: "skywalker.test", - "meta.key": [], - "meta.value": [], - name: "pageview", - operating_system: "iOS", - operating_system_version: "3.2", - pathname: "/", - referrer: "m.facebook.test", - referrer_source: "utm_source", - screen_size: "Desktop", - subdivision1_code: "FR-IDF", - subdivision2_code: "FR-75", - transferred_from: "", - utm_campaign: "utm_campaign", - utm_content: "utm_content", - utm_medium: "utm_medium", - utm_source: "utm_source", - utm_term: "utm_term" - } = get_event("plausible-ingestion-event-basic.test") + assert %Plausible.ClickhouseEvent{ + session_id: session_id, + user_id: user_id, + domain: "plausible-ingestion-event-basic.test", + browser: "Safari", + browser_version: "", + city_geoname_id: 2_988_507, + country_code: "FR", + hostname: "skywalker.test", + "meta.key": [], + "meta.value": [], + name: "pageview", + operating_system: "iOS", + operating_system_version: "3.2", + pathname: "/", + referrer: "m.facebook.test", + referrer_source: "utm_source", + screen_size: "Desktop", + subdivision1_code: "FR-IDF", + subdivision2_code: "FR-75", + transferred_from: "", + utm_campaign: "utm_campaign", + utm_content: "utm_content", + utm_medium: "utm_medium", + utm_source: "utm_source", + utm_term: "utm_term" + } = get_event("plausible-ingestion-event-basic.test") - assert is_integer(session_id) - assert is_integer(user_id) + assert is_integer(session_id) + assert is_integer(user_id) + end + + test "build_and_buffer/1 takes multiple domains" do + request = %Plausible.Ingestion.Request{ + @valid_request + | domains: [ + "plausible-ingestion-event-multiple-1.test", + "plausible-ingestion-event-multiple-2.test" + ] + } + + assert {:ok, %{buffered: [_, _], dropped: []}} = + Plausible.Ingestion.Event.build_and_buffer(request) + + assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-1.test"} = + get_event("plausible-ingestion-event-multiple-1.test") + + assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-2.test"} = + get_event("plausible-ingestion-event-multiple-2.test") + end + + test "build_and_buffer/1 drops invalid events" do + request = %Plausible.Ingestion.Request{ + @valid_request + | domains: ["plausible-ingestion-event-multiple-with-error-1.test", nil] + } + + assert {:ok, %{buffered: [_], dropped: [dropped]}} = + Plausible.Ingestion.Event.build_and_buffer(request) + + assert {:error, changeset} = dropped.drop_reason + refute changeset.valid? + + assert %Plausible.ClickhouseEvent{ + domain: "plausible-ingestion-event-multiple-with-error-1.test" + } = get_event("plausible-ingestion-event-multiple-with-error-1.test") + end + + defp get_event(domain) do + Plausible.TestUtils.eventually(fn -> + Plausible.Event.WriteBuffer.flush() + + event = + Plausible.ClickhouseRepo.one( + from e in Plausible.ClickhouseEvent, where: e.domain == ^domain + ) + + {!is_nil(event), event} + end) + end end end