mirror of
https://github.com/plausible/analytics.git
synced 2024-12-24 01:54:34 +03:00
Parse event URL and domain in Plausible.Ingestion.Request (#2351)
* Parse event URL in Plausible.Ingestion.Request * Parse event domain in Plausible.Ingestion.Request * Rework ingestion pipeline processing (#2462) * Rework ingestion pipeline processing So that Request can have multiple domains and based on that each event is processed uniformly. The build_and_buffer/1 function now returns an accumulator with all the dropped/buffered events for further inspection. * Reduce function complexity * Don't chain struct fields to check for an empty host * Separate referrer and utm tags * Fix up `with` clause, credo was right cc @vinibrsl Co-authored-by: Adam Rutkowski <hq@mtod.org>
This commit is contained in:
parent
8b5ae9baaa
commit
994e7d09de
@ -1,39 +1,266 @@
|
||||
defmodule Plausible.Ingestion.Event do
|
||||
alias Plausible.Ingestion.{Request, CityOverrides}
|
||||
|
||||
@spec build_and_buffer(Request.t()) :: :ok | :skip | {:error, Ecto.Changeset.t()}
|
||||
@doc """
|
||||
Builds events from %Plausible.Ingestion.Request{} and adds them to Plausible.Event.WriteBuffer.
|
||||
This function reads geolocation data and parses the user agent string. Returns :skip if the
|
||||
request is identified as spam, or blocked.
|
||||
@moduledoc """
|
||||
This module exposes the `build_and_buffer/1` function capable of
|
||||
turning %Plausible.Ingestion.Request{} into a series of events that in turn
|
||||
are uniformly either buffered in batches (to Clickhouse) or dropped
|
||||
(e.g. due to spam blocklist) from the processing pipeline.
|
||||
"""
|
||||
def build_and_buffer(%Request{} = request) do
|
||||
with :ok <- spam_or_blocked?(request),
|
||||
salts <- Plausible.Session.Salts.fetch(),
|
||||
event <- Map.new(),
|
||||
%{} = event <- put_user_agent(event, request),
|
||||
%{} = event <- put_basic_info(event, request),
|
||||
%{} = event <- put_referrer(event, request),
|
||||
%{} = event <- put_geolocation(event, request),
|
||||
%{} = event <- put_screen_size(event, request),
|
||||
%{} = event <- put_props(event, request),
|
||||
events when is_list(events) <- map_domains(event, request),
|
||||
events when is_list(events) <- put_user_id(events, request, salts),
|
||||
{:ok, events} <- validate_events(events),
|
||||
events when is_list(events) <- register_session(events, request, salts) do
|
||||
Enum.each(events, &Plausible.Event.WriteBuffer.insert/1)
|
||||
alias Plausible.Ingestion.{Request, CityOverrides}
|
||||
alias Plausible.ClickhouseEvent
|
||||
|
||||
defstruct domain: nil,
|
||||
clickhouse_event_attrs: %{},
|
||||
clickhouse_event: nil,
|
||||
dropped?: false,
|
||||
drop_reason: nil,
|
||||
request: nil,
|
||||
salts: nil
|
||||
|
||||
@type drop_reason() ::
|
||||
:bot
|
||||
| :domain_blocked
|
||||
| :spam_referrer
|
||||
| {:error, Ecto.Changeset.t()}
|
||||
|
||||
@type t() :: %__MODULE__{
|
||||
domain: String.t() | nil,
|
||||
clickhouse_event_attrs: map(),
|
||||
clickhouse_event: %ClickhouseEvent{} | nil,
|
||||
dropped?: boolean(),
|
||||
drop_reason: drop_reason(),
|
||||
request: Request.t(),
|
||||
salts: map()
|
||||
}
|
||||
|
||||
@spec build_and_buffer(Request.t()) ::
|
||||
{:ok, %{dropped: [t()], buffered: [t()]}}
|
||||
def build_and_buffer(%Request{domains: domains} = request) do
|
||||
processed_events =
|
||||
if spam_referrer?(request) do
|
||||
for domain <- domains, do: drop(new(domain, request), :spam_referrer)
|
||||
else
|
||||
Enum.reduce(domains, [], fn domain, acc ->
|
||||
if domain_blocked?(domain) do
|
||||
[drop(new(domain, request), :domain_blocked) | acc]
|
||||
else
|
||||
processed =
|
||||
domain
|
||||
|> new(request)
|
||||
|> process_unless_dropped(pipeline())
|
||||
|
||||
[processed | acc]
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp put_basic_info(%{} = event, %Request{} = request) do
|
||||
uri = request.url && URI.parse(request.url)
|
||||
host = if uri && uri.host == "", do: "(none)", else: uri && uri.host
|
||||
{dropped, buffered} = Enum.split_with(processed_events, & &1.dropped?)
|
||||
{:ok, %{dropped: dropped, buffered: buffered}}
|
||||
end
|
||||
|
||||
defp pipeline() do
|
||||
[
|
||||
&put_user_agent/1,
|
||||
&put_basic_info/1,
|
||||
&put_referrer/1,
|
||||
&put_utm_tags/1,
|
||||
&put_geolocation/1,
|
||||
&put_screen_size/1,
|
||||
&put_props/1,
|
||||
&put_salts/1,
|
||||
&put_user_id/1,
|
||||
&validate_clickhouse_event/1,
|
||||
®ister_session/1,
|
||||
&write_to_buffer/1
|
||||
]
|
||||
end
|
||||
|
||||
defp process_unless_dropped(%__MODULE__{} = initial_event, pipeline) do
|
||||
Enum.reduce_while(pipeline, initial_event, fn pipeline_step, acc_event ->
|
||||
case pipeline_step.(acc_event) do
|
||||
%__MODULE__{dropped?: true} = dropped -> {:halt, dropped}
|
||||
%__MODULE__{dropped?: false} = event -> {:cont, event}
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp new(domain, request) do
|
||||
%__MODULE__{domain: domain, request: request}
|
||||
end
|
||||
|
||||
defp drop(%__MODULE__{} = event, reason) do
|
||||
%{event | dropped?: true, drop_reason: reason}
|
||||
end
|
||||
|
||||
defp update_attrs(%__MODULE__{} = event, %{} = attrs) do
|
||||
%{event | clickhouse_event_attrs: Map.merge(event.clickhouse_event_attrs, attrs)}
|
||||
end
|
||||
|
||||
defp put_user_agent(%__MODULE__{} = event) do
|
||||
case parse_user_agent(event.request) do
|
||||
%UAInspector.Result{client: %UAInspector.Result.Client{name: "Headless Chrome"}} ->
|
||||
drop(event, :bot)
|
||||
|
||||
%UAInspector.Result.Bot{} ->
|
||||
drop(event, :bot)
|
||||
|
||||
%UAInspector.Result{} = user_agent ->
|
||||
update_attrs(event, %{
|
||||
operating_system: os_name(user_agent),
|
||||
operating_system_version: os_version(user_agent),
|
||||
browser: browser_name(user_agent),
|
||||
browser_version: browser_version(user_agent)
|
||||
})
|
||||
|
||||
_any ->
|
||||
event
|
||||
end
|
||||
end
|
||||
|
||||
defp put_basic_info(%__MODULE__{} = event) do
|
||||
host =
|
||||
case event.request.uri do
|
||||
%{host: ""} -> "(none)"
|
||||
%{host: host} when is_binary(host) -> host
|
||||
_ -> nil
|
||||
end
|
||||
|
||||
update_attrs(event, %{
|
||||
domain: event.domain,
|
||||
timestamp: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
|
||||
name: event.request.event_name,
|
||||
hostname: Request.sanitize_hostname(host),
|
||||
pathname: get_pathname(event.request.uri, event.request.hash_mode)
|
||||
})
|
||||
end
|
||||
|
||||
defp put_referrer(%__MODULE__{} = event) do
|
||||
ref = parse_referrer(event.request.uri, event.request.referrer)
|
||||
|
||||
update_attrs(event, %{
|
||||
referrer_source: get_referrer_source(event.request, ref),
|
||||
referrer: clean_referrer(ref)
|
||||
})
|
||||
end
|
||||
|
||||
defp put_utm_tags(%__MODULE__{} = event) do
|
||||
query_params = event.request.query_params
|
||||
|
||||
update_attrs(event, %{
|
||||
utm_medium: query_params["utm_medium"],
|
||||
utm_source: query_params["utm_source"],
|
||||
utm_campaign: query_params["utm_campaign"],
|
||||
utm_content: query_params["utm_content"],
|
||||
utm_term: query_params["utm_term"]
|
||||
})
|
||||
end
|
||||
|
||||
defp put_geolocation(%__MODULE__{} = event) do
|
||||
result = Geolix.lookup(event.request.remote_ip, where: :geolocation)
|
||||
|
||||
country_code =
|
||||
get_in(result, [:country, :iso_code])
|
||||
|> ignore_unknown_country()
|
||||
|
||||
city_geoname_id = get_in(result, [:city, :geoname_id])
|
||||
city_geoname_id = Map.get(CityOverrides.get(), city_geoname_id, city_geoname_id)
|
||||
|
||||
subdivision1_code =
|
||||
case result do
|
||||
%{subdivisions: [%{iso_code: iso_code} | _rest]} ->
|
||||
country_code <> "-" <> iso_code
|
||||
|
||||
_ ->
|
||||
""
|
||||
end
|
||||
|
||||
subdivision2_code =
|
||||
case result do
|
||||
%{subdivisions: [_first, %{iso_code: iso_code} | _rest]} ->
|
||||
country_code <> "-" <> iso_code
|
||||
|
||||
_ ->
|
||||
""
|
||||
end
|
||||
|
||||
update_attrs(event, %{
|
||||
country_code: country_code,
|
||||
subdivision1_code: subdivision1_code,
|
||||
subdivision2_code: subdivision2_code,
|
||||
city_geoname_id: city_geoname_id
|
||||
})
|
||||
end
|
||||
|
||||
defp put_screen_size(%__MODULE__{} = event) do
|
||||
screen_size =
|
||||
case event.request.screen_width do
|
||||
nil -> nil
|
||||
width when width < 576 -> "Mobile"
|
||||
width when width < 992 -> "Tablet"
|
||||
width when width < 1440 -> "Laptop"
|
||||
width when width >= 1440 -> "Desktop"
|
||||
end
|
||||
|
||||
update_attrs(event, %{screen_size: screen_size})
|
||||
end
|
||||
|
||||
defp put_props(%__MODULE__{request: %{props: %{} = props}} = event) do
|
||||
update_attrs(event, %{
|
||||
"meta.key": Map.keys(props),
|
||||
"meta.value": Enum.map(props, fn {_, v} -> to_string(v) end)
|
||||
})
|
||||
end
|
||||
|
||||
defp put_props(%__MODULE__{} = event), do: event
|
||||
|
||||
defp put_salts(%__MODULE__{} = event) do
|
||||
%{event | salts: Plausible.Session.Salts.fetch()}
|
||||
end
|
||||
|
||||
defp put_user_id(%__MODULE__{} = event) do
|
||||
update_attrs(event, %{
|
||||
user_id:
|
||||
generate_user_id(
|
||||
event.request,
|
||||
event.domain,
|
||||
event.clickhouse_event_attrs.hostname,
|
||||
event.salts.current
|
||||
)
|
||||
})
|
||||
end
|
||||
|
||||
defp validate_clickhouse_event(%__MODULE__{} = event) do
|
||||
clickhouse_event =
|
||||
event
|
||||
|> Map.fetch!(:clickhouse_event_attrs)
|
||||
|> ClickhouseEvent.new()
|
||||
|
||||
case Ecto.Changeset.apply_action(clickhouse_event, nil) do
|
||||
{:ok, valid_clickhouse_event} ->
|
||||
%{event | clickhouse_event: valid_clickhouse_event}
|
||||
|
||||
{:error, changeset} ->
|
||||
drop(event, {:error, changeset})
|
||||
end
|
||||
end
|
||||
|
||||
defp register_session(%__MODULE__{} = event) do
|
||||
previous_user_id =
|
||||
generate_user_id(
|
||||
event.request,
|
||||
event.domain,
|
||||
event.clickhouse_event.hostname,
|
||||
event.salts.previous
|
||||
)
|
||||
|
||||
session_id = Plausible.Session.CacheStore.on_event(event.clickhouse_event, previous_user_id)
|
||||
|
||||
clickhouse_event = Map.put(event.clickhouse_event, :session_id, session_id)
|
||||
%{event | clickhouse_event: clickhouse_event}
|
||||
end
|
||||
|
||||
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event) do
|
||||
{:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event)
|
||||
event
|
||||
|> Map.put(:timestamp, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second))
|
||||
|> Map.put(:name, request.event_name)
|
||||
|> Map.put(:hostname, strip_www(host))
|
||||
|> Map.put(:pathname, get_pathname(uri, request.hash_mode))
|
||||
end
|
||||
|
||||
defp get_pathname(_uri = nil, _hash_mode), do: "/"
|
||||
@ -51,36 +278,13 @@ defmodule Plausible.Ingestion.Event do
|
||||
end
|
||||
end
|
||||
|
||||
defp put_props(%{} = event, %Request{} = request) do
|
||||
if is_map(request.props) do
|
||||
event
|
||||
|> Map.put(:"meta.key", Map.keys(request.props))
|
||||
|> Map.put(:"meta.value", Map.values(request.props) |> Enum.map(&to_string/1))
|
||||
else
|
||||
event
|
||||
end
|
||||
end
|
||||
|
||||
defp put_referrer(%{} = event, %Request{} = request) do
|
||||
uri = request.url && URI.parse(request.url)
|
||||
ref = parse_referrer(uri, request.referrer)
|
||||
|
||||
event
|
||||
|> Map.put(:utm_medium, request.query_params["utm_medium"])
|
||||
|> Map.put(:utm_source, request.query_params["utm_source"])
|
||||
|> Map.put(:utm_campaign, request.query_params["utm_campaign"])
|
||||
|> Map.put(:utm_content, request.query_params["utm_content"])
|
||||
|> Map.put(:utm_term, request.query_params["utm_term"])
|
||||
|> Map.put(:referrer_source, get_referrer_source(request, ref))
|
||||
|> Map.put(:referrer, clean_referrer(ref))
|
||||
end
|
||||
|
||||
defp parse_referrer(_uri, _referrer_str = nil), do: nil
|
||||
|
||||
defp parse_referrer(uri, referrer_str) do
|
||||
referrer_uri = URI.parse(referrer_str)
|
||||
|
||||
if strip_www(referrer_uri.host) !== strip_www(uri.host) && referrer_uri.host !== "localhost" do
|
||||
if Request.sanitize_hostname(referrer_uri.host) !== Request.sanitize_hostname(uri.host) &&
|
||||
referrer_uri.host !== "localhost" do
|
||||
RefInspector.parse(referrer_str)
|
||||
end
|
||||
end
|
||||
@ -106,26 +310,6 @@ defmodule Plausible.Ingestion.Event do
|
||||
end
|
||||
end
|
||||
|
||||
defp put_user_agent(%{} = event, %Request{} = request) do
|
||||
case parse_user_agent(request) do
|
||||
%UAInspector.Result{client: %UAInspector.Result.Client{name: "Headless Chrome"}} ->
|
||||
:skip
|
||||
|
||||
%UAInspector.Result.Bot{} ->
|
||||
:skip
|
||||
|
||||
%UAInspector.Result{} = user_agent ->
|
||||
event
|
||||
|> Map.put(:operating_system, os_name(user_agent))
|
||||
|> Map.put(:operating_system_version, os_version(user_agent))
|
||||
|> Map.put(:browser, browser_name(user_agent))
|
||||
|> Map.put(:browser_version, browser_version(user_agent))
|
||||
|
||||
_any ->
|
||||
event
|
||||
end
|
||||
end
|
||||
|
||||
defp parse_user_agent(%Request{user_agent: user_agent}) when is_binary(user_agent) do
|
||||
case Cachex.fetch(:user_agents, user_agent, &UAInspector.parse/1) do
|
||||
{:ok, user_agent} -> user_agent
|
||||
@ -189,88 +373,9 @@ defmodule Plausible.Ingestion.Event do
|
||||
end
|
||||
end
|
||||
|
||||
defp put_screen_size(%{} = event, %Request{} = request) do
|
||||
screen_width =
|
||||
case request.screen_width do
|
||||
nil -> nil
|
||||
width when width < 576 -> "Mobile"
|
||||
width when width < 992 -> "Tablet"
|
||||
width when width < 1440 -> "Laptop"
|
||||
width when width >= 1440 -> "Desktop"
|
||||
end
|
||||
|
||||
Map.put(event, :screen_size, screen_width)
|
||||
end
|
||||
|
||||
defp put_geolocation(%{} = event, %Request{} = request) do
|
||||
result = Geolix.lookup(request.remote_ip, where: :geolocation)
|
||||
|
||||
country_code =
|
||||
get_in(result, [:country, :iso_code])
|
||||
|> ignore_unknown_country()
|
||||
|
||||
city_geoname_id = get_in(result, [:city, :geoname_id])
|
||||
city_geoname_id = Map.get(CityOverrides.get(), city_geoname_id, city_geoname_id)
|
||||
|
||||
subdivision1_code =
|
||||
case result do
|
||||
%{subdivisions: [%{iso_code: iso_code} | _rest]} ->
|
||||
country_code <> "-" <> iso_code
|
||||
|
||||
_ ->
|
||||
""
|
||||
end
|
||||
|
||||
subdivision2_code =
|
||||
case result do
|
||||
%{subdivisions: [_first, %{iso_code: iso_code} | _rest]} ->
|
||||
country_code <> "-" <> iso_code
|
||||
|
||||
_ ->
|
||||
""
|
||||
end
|
||||
|
||||
event
|
||||
|> Map.put(:country_code, country_code)
|
||||
|> Map.put(:subdivision1_code, subdivision1_code)
|
||||
|> Map.put(:subdivision2_code, subdivision2_code)
|
||||
|> Map.put(:city_geoname_id, city_geoname_id)
|
||||
end
|
||||
|
||||
defp ignore_unknown_country("ZZ"), do: nil
|
||||
defp ignore_unknown_country(country), do: country
|
||||
|
||||
defp map_domains(%{} = event, %Request{} = request) do
|
||||
domains =
|
||||
if request.domain do
|
||||
String.split(request.domain, ",")
|
||||
|> Enum.map(&String.trim/1)
|
||||
|> Enum.map(&strip_www/1)
|
||||
else
|
||||
uri = request.url && URI.parse(request.url)
|
||||
[strip_www(uri && uri.host)]
|
||||
end
|
||||
|
||||
for domain <- domains, do: Map.put(event, :domain, domain)
|
||||
end
|
||||
|
||||
defp put_user_id(events, %Request{} = request, salts) do
|
||||
for %{} = event <- events do
|
||||
user_id = generate_user_id(request, event.domain, event.hostname, salts.current)
|
||||
Map.put(event, :user_id, user_id)
|
||||
end
|
||||
end
|
||||
|
||||
defp register_session(events, %Request{} = request, salts) do
|
||||
for %Plausible.ClickhouseEvent{} = event <- events do
|
||||
previous_user_id = generate_user_id(request, event.domain, event.hostname, salts.previous)
|
||||
|
||||
session_id = Plausible.Session.CacheStore.on_event(event, previous_user_id)
|
||||
|
||||
Map.put(event, :session_id, session_id)
|
||||
end
|
||||
end
|
||||
|
||||
defp generate_user_id(request, domain, hostname, salt) do
|
||||
cond do
|
||||
is_nil(salt) ->
|
||||
@ -296,40 +401,16 @@ defmodule Plausible.Ingestion.Event do
|
||||
end
|
||||
end
|
||||
|
||||
defp spam_or_blocked?(%Request{} = request) do
|
||||
cond do
|
||||
request.domain in Application.get_env(:plausible, :domain_blacklist) ->
|
||||
:skip
|
||||
|
||||
FunWithFlags.enabled?(:block_event_ingest, for: request.domain) ->
|
||||
:skip
|
||||
|
||||
request.referrer &&
|
||||
URI.parse(request.referrer).host |> strip_www() |> ReferrerBlocklist.is_spammer?() ->
|
||||
:skip
|
||||
|
||||
true ->
|
||||
:ok
|
||||
end
|
||||
defp spam_referrer?(%Request{referrer: referrer}) when is_binary(referrer) do
|
||||
URI.parse(referrer).host
|
||||
|> Request.sanitize_hostname()
|
||||
|> ReferrerBlocklist.is_spammer?()
|
||||
end
|
||||
|
||||
defp validate_events(events) do
|
||||
Enum.reduce_while(events, {:ok, []}, fn %{} = attrs, {:ok, acc} ->
|
||||
attrs
|
||||
|> Plausible.ClickhouseEvent.new()
|
||||
|> Ecto.Changeset.apply_action(nil)
|
||||
|> case do
|
||||
{:ok, event} -> {:cont, {:ok, [event | acc]}}
|
||||
{:error, changeset} -> {:halt, {:error, changeset}}
|
||||
end
|
||||
end)
|
||||
end
|
||||
defp spam_referrer?(_), do: false
|
||||
|
||||
defp strip_www(hostname) do
|
||||
if hostname do
|
||||
String.replace_prefix(hostname, "www.", "")
|
||||
else
|
||||
nil
|
||||
end
|
||||
defp domain_blocked?(domain) do
|
||||
domain in Application.get_env(:plausible, :domain_blacklist) or
|
||||
FunWithFlags.enabled?(:block_event_ingest, for: domain)
|
||||
end
|
||||
end
|
||||
|
@ -7,9 +7,9 @@ defmodule Plausible.Ingestion.Request do
|
||||
:remote_ip,
|
||||
:user_agent,
|
||||
:event_name,
|
||||
:url,
|
||||
:uri,
|
||||
:referrer,
|
||||
:domain,
|
||||
:domains,
|
||||
:screen_width,
|
||||
:hash_mode,
|
||||
props: %{},
|
||||
@ -20,9 +20,9 @@ defmodule Plausible.Ingestion.Request do
|
||||
remote_ip: String.t() | nil,
|
||||
user_agent: String.t() | nil,
|
||||
event_name: term(),
|
||||
url: term(),
|
||||
uri: URI.t() | nil,
|
||||
referrer: term(),
|
||||
domain: term(),
|
||||
domains: list(String.t()),
|
||||
screen_width: term(),
|
||||
hash_mode: term(),
|
||||
props: map(),
|
||||
@ -31,15 +31,17 @@ defmodule Plausible.Ingestion.Request do
|
||||
|
||||
@spec build(Plug.Conn.t()) :: {:ok, t()} | {:error, :invalid_json}
|
||||
@doc """
|
||||
Builds a %Plausible.Ingestion.Request{} struct from %Plug.Conn{}.
|
||||
Builds a list of %Plausible.Ingestion.Request{} struct from %Plug.Conn{}.
|
||||
"""
|
||||
def build(%Plug.Conn{} = conn) do
|
||||
with {:ok, request_body} <- parse_body(conn) do
|
||||
%__MODULE__{}
|
||||
|> Map.put(:remote_ip, PlausibleWeb.RemoteIp.get(conn))
|
||||
|> put_uri(request_body)
|
||||
|> put_user_agent(conn)
|
||||
|> put_request_params(request_body)
|
||||
|> put_query_params()
|
||||
|> map_domains(request_body)
|
||||
|> then(&{:ok, &1})
|
||||
end
|
||||
end
|
||||
@ -63,15 +65,34 @@ defmodule Plausible.Ingestion.Request do
|
||||
%__MODULE__{
|
||||
request
|
||||
| event_name: request_body["n"] || request_body["name"],
|
||||
url: request_body["u"] || request_body["url"],
|
||||
referrer: request_body["r"] || request_body["referrer"],
|
||||
domain: request_body["d"] || request_body["domain"],
|
||||
screen_width: request_body["w"] || request_body["screen_width"],
|
||||
hash_mode: request_body["h"] || request_body["hashMode"],
|
||||
props: parse_props(request_body)
|
||||
}
|
||||
end
|
||||
|
||||
defp map_domains(%__MODULE__{} = request, %{} = request_body) do
|
||||
domains =
|
||||
if raw = request_body["d"] || request_body["domain"] do
|
||||
raw
|
||||
|> String.split(",")
|
||||
|> Enum.map(&sanitize_hostname/1)
|
||||
else
|
||||
[sanitize_hostname(request.uri)]
|
||||
end
|
||||
|
||||
%__MODULE__{request | domains: domains}
|
||||
end
|
||||
|
||||
defp put_uri(%__MODULE__{} = request, %{} = request_body) do
|
||||
if url = request_body["u"] || request_body["url"] do
|
||||
%__MODULE__{request | uri: URI.parse(url)}
|
||||
else
|
||||
request
|
||||
end
|
||||
end
|
||||
|
||||
defp parse_props(%{} = request_body) do
|
||||
raw_props =
|
||||
request_body["m"] || request_body["meta"] || request_body["p"] || request_body["props"]
|
||||
@ -111,13 +132,13 @@ defmodule Plausible.Ingestion.Request do
|
||||
end
|
||||
end
|
||||
|
||||
defp put_query_params(%__MODULE__{url: url} = request) do
|
||||
with url when is_binary(url) <- url,
|
||||
%URI{query: query} when is_binary(query) <- URI.parse(url),
|
||||
%{} = query_params <- URI.decode_query(query) do
|
||||
Map.put(request, :query_params, query_params)
|
||||
else
|
||||
_any -> request
|
||||
defp put_query_params(%__MODULE__{} = request) do
|
||||
case request do
|
||||
%__MODULE__{uri: %URI{query: query}} when is_binary(query) ->
|
||||
%__MODULE__{request | query_params: URI.decode_query(query)}
|
||||
|
||||
_any ->
|
||||
request
|
||||
end
|
||||
end
|
||||
|
||||
@ -129,4 +150,21 @@ defmodule Plausible.Ingestion.Request do
|
||||
|
||||
%__MODULE__{request | user_agent: user_agent}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Removes the "www" part of a hostname.
|
||||
"""
|
||||
def sanitize_hostname(%URI{host: hostname}) do
|
||||
sanitize_hostname(hostname)
|
||||
end
|
||||
|
||||
def sanitize_hostname(hostname) when is_binary(hostname) do
|
||||
hostname
|
||||
|> String.trim()
|
||||
|> String.replace_prefix("www.", "")
|
||||
end
|
||||
|
||||
def sanitize_hostname(nil) do
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
@ -8,29 +8,37 @@ defmodule PlausibleWeb.Api.ExternalController do
|
||||
use PlausibleWeb, :controller
|
||||
require Logger
|
||||
|
||||
def event(conn, _params) do
|
||||
with {:ok, ingestion_request} <- Plausible.Ingestion.Request.build(conn),
|
||||
_ <- Sentry.Context.set_extra_context(%{request: ingestion_request}),
|
||||
:ok <- Plausible.Ingestion.Event.build_and_buffer(ingestion_request) do
|
||||
conn |> put_status(202) |> text("ok")
|
||||
else
|
||||
:skip ->
|
||||
conn |> put_status(202) |> text("ok")
|
||||
alias Plausible.Ingestion
|
||||
|
||||
def event(conn, _params) do
|
||||
with {:ok, request} <- Ingestion.Request.build(conn),
|
||||
_ <- Sentry.Context.set_extra_context(%{request: request}) do
|
||||
case Ingestion.Event.build_and_buffer(request) do
|
||||
{:ok, %{dropped: [], buffered: _buffered}} ->
|
||||
conn
|
||||
|> put_status(202)
|
||||
|> text("ok")
|
||||
|
||||
{:ok, %{dropped: dropped, buffered: _}} ->
|
||||
first_invalid_changeset = find_first_invalid_changeset(dropped)
|
||||
|
||||
if first_invalid_changeset do
|
||||
conn
|
||||
|> put_resp_header("x-plausible-dropped", "#{Enum.count(dropped)}")
|
||||
|> put_status(400)
|
||||
|> json(%{errors: traverse_errors(first_invalid_changeset)})
|
||||
else
|
||||
conn
|
||||
|> put_resp_header("x-plausible-dropped", "#{Enum.count(dropped)}")
|
||||
|> put_status(202)
|
||||
|> text("ok")
|
||||
end
|
||||
end
|
||||
else
|
||||
{:error, :invalid_json} ->
|
||||
conn
|
||||
|> put_status(400)
|
||||
|> json(%{errors: %{request: "Unable to parse request body as json"}})
|
||||
|
||||
{:error, %Ecto.Changeset{} = changeset} ->
|
||||
errors =
|
||||
Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
|
||||
Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
|
||||
opts |> Keyword.get(String.to_existing_atom(key), key) |> to_string()
|
||||
end)
|
||||
end)
|
||||
|
||||
conn |> put_status(400) |> json(%{errors: errors})
|
||||
end
|
||||
end
|
||||
|
||||
@ -86,4 +94,23 @@ defmodule PlausibleWeb.Api.ExternalController do
|
||||
build: build
|
||||
})
|
||||
end
|
||||
|
||||
defp find_first_invalid_changeset(dropped) do
|
||||
Enum.find_value(dropped, nil, fn dropped_event ->
|
||||
case dropped_event.drop_reason do
|
||||
{:error, %Ecto.Changeset{} = changeset} -> changeset
|
||||
_ -> false
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp traverse_errors(changeset) do
|
||||
Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} ->
|
||||
Regex.replace(~r"%{(\w+)}", msg, fn _, key ->
|
||||
opts
|
||||
|> Keyword.get(String.to_existing_atom(key), key)
|
||||
|> to_string()
|
||||
end)
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
@ -1,25 +1,12 @@
|
||||
defmodule Plausible.Ingestion.EventTest do
|
||||
use Plausible.DataCase
|
||||
|
||||
def get_event(domain) do
|
||||
Plausible.TestUtils.eventually(fn ->
|
||||
Plausible.Event.WriteBuffer.flush()
|
||||
|
||||
event =
|
||||
Plausible.ClickhouseRepo.one(
|
||||
from e in Plausible.ClickhouseEvent, where: e.domain == ^domain
|
||||
)
|
||||
|
||||
{!is_nil(event), event}
|
||||
end)
|
||||
end
|
||||
|
||||
@valid_request %Plausible.Ingestion.Request{
|
||||
remote_ip: "2.2.2.2",
|
||||
user_agent:
|
||||
"Mozilla/5.0 (iPad; U; CPU OS 3_2_1 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Mobile/7B405",
|
||||
event_name: "pageview",
|
||||
url: "http://skywalker.test",
|
||||
uri: URI.parse("http://skywalker.test"),
|
||||
referrer: "http://m.facebook.test/",
|
||||
screen_width: 1440,
|
||||
hash_mode: nil,
|
||||
@ -34,10 +21,11 @@ defmodule Plausible.Ingestion.EventTest do
|
||||
}
|
||||
}
|
||||
|
||||
test "build_and_buffer/3 creates an event" do
|
||||
assert :ok ==
|
||||
describe "integration" do
|
||||
test "build_and_buffer/1 creates an event" do
|
||||
assert {:ok, %{buffered: [_], dropped: []}} =
|
||||
@valid_request
|
||||
|> Map.put(:domain, "plausible-ingestion-event-basic.test")
|
||||
|> Map.put(:domains, ["plausible-ingestion-event-basic.test"])
|
||||
|> Plausible.Ingestion.Event.build_and_buffer()
|
||||
|
||||
assert %Plausible.ClickhouseEvent{
|
||||
@ -71,4 +59,54 @@ defmodule Plausible.Ingestion.EventTest do
|
||||
assert is_integer(session_id)
|
||||
assert is_integer(user_id)
|
||||
end
|
||||
|
||||
test "build_and_buffer/1 takes multiple domains" do
|
||||
request = %Plausible.Ingestion.Request{
|
||||
@valid_request
|
||||
| domains: [
|
||||
"plausible-ingestion-event-multiple-1.test",
|
||||
"plausible-ingestion-event-multiple-2.test"
|
||||
]
|
||||
}
|
||||
|
||||
assert {:ok, %{buffered: [_, _], dropped: []}} =
|
||||
Plausible.Ingestion.Event.build_and_buffer(request)
|
||||
|
||||
assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-1.test"} =
|
||||
get_event("plausible-ingestion-event-multiple-1.test")
|
||||
|
||||
assert %Plausible.ClickhouseEvent{domain: "plausible-ingestion-event-multiple-2.test"} =
|
||||
get_event("plausible-ingestion-event-multiple-2.test")
|
||||
end
|
||||
|
||||
test "build_and_buffer/1 drops invalid events" do
|
||||
request = %Plausible.Ingestion.Request{
|
||||
@valid_request
|
||||
| domains: ["plausible-ingestion-event-multiple-with-error-1.test", nil]
|
||||
}
|
||||
|
||||
assert {:ok, %{buffered: [_], dropped: [dropped]}} =
|
||||
Plausible.Ingestion.Event.build_and_buffer(request)
|
||||
|
||||
assert {:error, changeset} = dropped.drop_reason
|
||||
refute changeset.valid?
|
||||
|
||||
assert %Plausible.ClickhouseEvent{
|
||||
domain: "plausible-ingestion-event-multiple-with-error-1.test"
|
||||
} = get_event("plausible-ingestion-event-multiple-with-error-1.test")
|
||||
end
|
||||
|
||||
defp get_event(domain) do
|
||||
Plausible.TestUtils.eventually(fn ->
|
||||
Plausible.Event.WriteBuffer.flush()
|
||||
|
||||
event =
|
||||
Plausible.ClickhouseRepo.one(
|
||||
from e in Plausible.ClickhouseEvent, where: e.domain == ^domain
|
||||
)
|
||||
|
||||
{!is_nil(event), event}
|
||||
end)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user