mirror of
https://github.com/plausible/analytics.git
synced 2024-11-22 18:52:38 +03:00
Clickhouse (#66)
* Get stats from clickhosue * Pull stats from clickhouse * Use correct Query namespace * Use Clickhouse in unit tests * Use Clickhouse in stats controller tests * Use fixtures for unit tests * Add Clickhouse to travis * Use Clickhouse session store for sessions * Add garbage collection to session store * Reload session state from Clickhouse on server restart * Query from sessions table * Trap exits in event write buffer * Run hydration without starting the whole app * Make session length 30 minutes * Revert changes to fingerprint schema * Remove clickhouse from fingerprint sessions * Flush buffers before shutdown * Use old stats when merging * Remove old session schema * Fix tests with CH sessions * Add has_pageviews? to Stats * Use CH in staging * Update schema * Fix test setup
This commit is contained in:
parent
eeb54c4575
commit
d94a8dbc7e
@ -2,6 +2,7 @@ defmodule Mix.Tasks.HydrateClickhouse do
|
||||
use Mix.Task
|
||||
use Plausible.Repo
|
||||
require Logger
|
||||
@hash_key Keyword.fetch!(Application.get_env(:plausible, PlausibleWeb.Endpoint), :secret_key_base) |> binary_part(0, 16)
|
||||
|
||||
def run(args) do
|
||||
Application.ensure_all_started(:db_connection)
|
||||
@ -21,24 +22,25 @@ defmodule Mix.Tasks.HydrateClickhouse do
|
||||
|
||||
def create_events() do
|
||||
ddl = """
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
CREATE TABLE events (
|
||||
timestamp DateTime,
|
||||
name String,
|
||||
domain String,
|
||||
user_id FixedString(64),
|
||||
user_id UInt64,
|
||||
session_id UInt64,
|
||||
hostname String,
|
||||
pathname String,
|
||||
referrer Nullable(String),
|
||||
referrer_source Nullable(String),
|
||||
initial_referrer Nullable(String),
|
||||
initial_referrer_source Nullable(String),
|
||||
country_code Nullable(FixedString(2)),
|
||||
screen_size Nullable(String),
|
||||
operating_system Nullable(String),
|
||||
browser Nullable(String)
|
||||
referrer String,
|
||||
referrer_source String,
|
||||
initial_referrer String,
|
||||
initial_referrer_source String,
|
||||
country_code LowCardinality(FixedString(2)),
|
||||
screen_size LowCardinality(String),
|
||||
operating_system LowCardinality(String),
|
||||
browser LowCardinality(String)
|
||||
) ENGINE = MergeTree()
|
||||
PARTITION BY toYYYYMM(timestamp)
|
||||
ORDER BY (name, domain, timestamp, user_id)
|
||||
ORDER BY (name, domain, user_id, timestamp)
|
||||
SETTINGS index_granularity = 8192
|
||||
"""
|
||||
|
||||
@ -48,29 +50,29 @@ defmodule Mix.Tasks.HydrateClickhouse do
|
||||
|
||||
def create_sessions() do
|
||||
ddl = """
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
session_id UUID,
|
||||
CREATE TABLE sessions (
|
||||
session_id UInt64,
|
||||
sign Int8,
|
||||
domain String,
|
||||
user_id FixedString(64),
|
||||
user_id UInt64,
|
||||
hostname String,
|
||||
timestamp DateTime,
|
||||
start DateTime,
|
||||
is_bounce UInt8,
|
||||
entry_page Nullable(String),
|
||||
exit_page Nullable(String),
|
||||
entry_page String,
|
||||
exit_page String,
|
||||
pageviews Int32,
|
||||
events Int32,
|
||||
duration UInt32,
|
||||
referrer Nullable(String),
|
||||
referrer_source Nullable(String),
|
||||
country_code Nullable(FixedString(2)),
|
||||
screen_size Nullable(String),
|
||||
operating_system Nullable(String),
|
||||
browser Nullable(String)
|
||||
referrer String,
|
||||
referrer_source String,
|
||||
country_code LowCardinality(FixedString(2)),
|
||||
screen_size LowCardinality(String),
|
||||
operating_system LowCardinality(String),
|
||||
browser LowCardinality(String)
|
||||
) ENGINE = CollapsingMergeTree(sign)
|
||||
PARTITION BY toYYYYMM(start)
|
||||
ORDER BY (domain, start, user_id, session_id)
|
||||
ORDER BY (domain, user_id, session_id, start)
|
||||
SETTINGS index_granularity = 8192
|
||||
"""
|
||||
|
||||
@ -99,27 +101,33 @@ defmodule Mix.Tasks.HydrateClickhouse do
|
||||
event_chunks = from(e in Plausible.Event, where: e.domain == "plausible.io", order_by: e.id) |> chunk_query(10_000, repo)
|
||||
|
||||
Enum.reduce(event_chunks, %{}, fn events, session_cache ->
|
||||
{session_cache, sessions} = Enum.reduce(events, {session_cache, []}, fn event, {session_cache, sessions} ->
|
||||
{session_cache, sessions, events} = Enum.reduce(events, {session_cache, [], []}, fn event, {session_cache, sessions, new_events} ->
|
||||
found_session = session_cache[event.fingerprint]
|
||||
active = is_active?(found_session, event)
|
||||
user_id = SipHash.hash!(@hash_key, event.fingerprint)
|
||||
clickhouse_event = struct(Plausible.ClickhouseEvent, Map.from_struct(event) |> Map.put(:user_id, user_id))
|
||||
|
||||
cond do
|
||||
found_session && active ->
|
||||
new_session = update_session(found_session, event)
|
||||
new_session = update_session(found_session, clickhouse_event)
|
||||
{
|
||||
Map.put(session_cache, event.fingerprint, new_session),
|
||||
[%{new_session | sign: 1}, %{found_session | sign: -1} | sessions]
|
||||
[%{new_session | sign: 1}, %{found_session | sign: -1} | sessions],
|
||||
new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
|
||||
}
|
||||
found_session && !active ->
|
||||
new_session = new_session_from_event(event)
|
||||
new_session = new_session_from_event(clickhouse_event)
|
||||
{
|
||||
Map.put(session_cache, event.fingerprint, new_session),
|
||||
[new_session | sessions]
|
||||
[new_session | sessions],
|
||||
new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
|
||||
}
|
||||
true ->
|
||||
new_session = new_session_from_event(event)
|
||||
new_session = new_session_from_event(clickhouse_event)
|
||||
{
|
||||
Map.put(session_cache, event.fingerprint, new_session),
|
||||
[new_session | sessions]
|
||||
[new_session | sessions],
|
||||
new_events ++ [%{clickhouse_event | session_id: new_session.session_id}]
|
||||
}
|
||||
end
|
||||
end)
|
||||
@ -141,10 +149,10 @@ defmodule Mix.Tasks.HydrateClickhouse do
|
||||
defp new_session_from_event(event) do
|
||||
%Plausible.ClickhouseSession{
|
||||
sign: 1,
|
||||
session_id: UUID.uuid4(),
|
||||
session_id: Plausible.ClickhouseSession.random_uint64(),
|
||||
hostname: event.hostname,
|
||||
domain: event.domain,
|
||||
user_id: event.fingerprint,
|
||||
user_id: event.user_id,
|
||||
entry_page: event.pathname,
|
||||
exit_page: event.pathname,
|
||||
is_bounce: true,
|
||||
|
@ -16,7 +16,7 @@ defmodule Plausible.Clickhouse do
|
||||
""" <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", length(events))
|
||||
|
||||
args = Enum.reduce(events, [], fn event, acc ->
|
||||
[event.name, event.timestamp, event.domain, event.fingerprint, event.hostname, escape_quote(event.pathname), event.referrer, event.referrer_source, event.initial_referrer, event.initial_referrer_source, event.country_code, event.screen_size, event.browser, event.operating_system] ++ acc
|
||||
[event.name, event.timestamp, event.domain, event.user_id, event.hostname, escape_quote(event.pathname), event.referrer || "", event.referrer_source || "", event.initial_referrer || "", event.initial_referrer_source || "", event.country_code || "", event.screen_size || "", event.browser || "", event.operating_system || ""] ++ acc
|
||||
end)
|
||||
|
||||
Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Clickhouse, :log, []})
|
||||
@ -29,7 +29,7 @@ defmodule Plausible.Clickhouse do
|
||||
""" <> String.duplicate(" (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),", Enum.count(sessions))
|
||||
|
||||
args = Enum.reduce(sessions, [], fn session, acc ->
|
||||
[session.sign, session.session_id, session.domain, session.user_id, session.timestamp, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.events, session.pageviews, session.duration, session.referrer, session.referrer_source,session.country_code, session.screen_size, session.browser, session.operating_system] ++ acc
|
||||
[session.sign, session.session_id, session.domain, session.user_id, session.timestamp, session.hostname, session.start, session.is_bounce && 1 || 0, session.entry_page, session.exit_page, session.events, session.pageviews, session.duration, session.referrer || "", session.referrer_source || "", session.country_code || "", session.screen_size || "", session.browser || "", session.operating_system || ""] ++ acc
|
||||
end)
|
||||
|
||||
Clickhousex.query(:clickhouse, insert, args, log: {Plausible.Clickhouse, :log, []})
|
||||
|
30
lib/plausible/event/clickhouse_schema.ex
Normal file
30
lib/plausible/event/clickhouse_schema.ex
Normal file
@ -0,0 +1,30 @@
|
||||
defmodule Plausible.ClickhouseEvent do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
schema "events" do
|
||||
field :name, :string
|
||||
field :domain, :string
|
||||
field :hostname, :string
|
||||
field :pathname, :string
|
||||
field :user_id, :integer
|
||||
field :session_id, :integer
|
||||
|
||||
field :referrer, :string
|
||||
field :referrer_source, :string
|
||||
field :initial_referrer, :string
|
||||
field :initial_referrer_source, :string
|
||||
field :country_code, :string
|
||||
field :screen_size, :string
|
||||
field :operating_system, :string
|
||||
field :browser, :string
|
||||
|
||||
timestamps(inserted_at: :timestamp, updated_at: false)
|
||||
end
|
||||
|
||||
def changeset(pageview, attrs) do
|
||||
pageview
|
||||
|> cast(attrs, [:name, :domain, :hostname, :pathname, :user_id, :operating_system, :browser, :referrer, :referrer_source, :initial_referrer, :initial_referrer_source, :country_code, :screen_size])
|
||||
|> validate_required([:name, :domain, :hostname, :pathname, :user_id])
|
||||
end
|
||||
end
|
@ -2,11 +2,12 @@ defmodule Plausible.ClickhouseSession do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
@primary_key {:session_id, :binary_id, autogenerate: false}
|
||||
@primary_key false
|
||||
schema "sessions" do
|
||||
field :hostname, :string
|
||||
field :domain, :string
|
||||
field :user_id, :string
|
||||
field :user_id, :integer
|
||||
field :session_id, :integer
|
||||
|
||||
field :start, :naive_datetime
|
||||
field :duration, :integer
|
||||
@ -26,6 +27,10 @@ defmodule Plausible.ClickhouseSession do
|
||||
field :timestamp, :naive_datetime
|
||||
end
|
||||
|
||||
def random_uint64() do
|
||||
:crypto.strong_rand_bytes(8) |> :binary.decode_unsigned()
|
||||
end
|
||||
|
||||
def changeset(session, attrs) do
|
||||
session
|
||||
|> cast(attrs, [:hostname, :domain, :entry_page, :exit_page, :referrer, :fingerprint, :start, :length, :is_bounce, :operating_system, :browser, :referrer_source, :country_code, :screen_size])
|
||||
|
@ -155,24 +155,28 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.initial_referrer_source), fragment("min(?) as url", e.initial_referrer), fragment("uniq(user_id) as count")},
|
||||
group_by: e.initial_referrer_source,
|
||||
where: not is_nil(e.initial_referrer_source),
|
||||
where: e.initial_referrer_source != "",
|
||||
order_by: [desc: fragment("count")],
|
||||
limit: ^limit
|
||||
) |> Enum.map(fn ref ->
|
||||
Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end)
|
||||
ref
|
||||
|> Map.update("url", nil, fn url -> url && URI.parse("http://" <> url).host end)
|
||||
|> Map.update("name", nil, fn name -> if name == "", do: "(no referrer)", else: name end)
|
||||
end)
|
||||
end
|
||||
|
||||
def top_referrers(site, query, limit \\ 5, include \\ []) do
|
||||
referrers = Clickhouse.all(
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.referrer_source), fragment("min(?) as url", e.referrer), fragment("uniq(user_id) as count")},
|
||||
select: {fragment("? as name", e.referrer_source), fragment("any(?) as url", e.referrer), fragment("uniq(user_id) as count")},
|
||||
group_by: e.referrer_source,
|
||||
where: not is_nil(e.referrer_source),
|
||||
where: e.referrer_source != "",
|
||||
order_by: [desc: fragment("count")],
|
||||
limit: ^limit
|
||||
) |> Enum.map(fn ref ->
|
||||
Map.update(ref, :url, nil, fn url -> url && URI.parse("http://" <> url).host end)
|
||||
ref
|
||||
|> Map.update("url", nil, fn url -> url && URI.parse("http://" <> url).host end)
|
||||
|> Map.update("name", nil, fn name -> if name == "", do: "(no referrer)", else: name end)
|
||||
end)
|
||||
|
||||
if "bounce_rate" in include do
|
||||
@ -191,7 +195,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
select: {s.referrer_source, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
|
||||
where: s.domain == ^site.domain,
|
||||
where: s.start >= ^first_datetime and s.start < ^last_datetime,
|
||||
where: not is_nil(s.referrer_source),
|
||||
where: s.referrer_source != "",
|
||||
group_by: s.referrer_source,
|
||||
order_by: [desc: fragment("total")],
|
||||
limit: 100
|
||||
@ -269,7 +273,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
select: {s.referrer, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
|
||||
where: s.domain == ^site.domain,
|
||||
where: s.start >= ^first_datetime and s.start < ^last_datetime,
|
||||
where: not is_nil(s.referrer),
|
||||
where: s.referrer != "",
|
||||
group_by: s.referrer,
|
||||
order_by: [desc: fragment("total")],
|
||||
limit: 100
|
||||
@ -302,7 +306,6 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
select: {s.entry_page, fragment("count(*) as total"), fragment("round(sum(is_bounce * sign) / sum(sign) * 100) as bounce_rate")},
|
||||
where: s.domain == ^site.domain,
|
||||
where: s.start >= ^first_datetime and s.start < ^last_datetime,
|
||||
where: not is_nil(s.entry_page),
|
||||
group_by: s.entry_page,
|
||||
order_by: [desc: fragment("total")],
|
||||
limit: 100
|
||||
@ -324,7 +327,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.screen_size), fragment("uniq(user_id) as count")},
|
||||
group_by: e.screen_size,
|
||||
where: not is_nil(e.screen_size)
|
||||
where: e.screen_size != ""
|
||||
)
|
||||
|> Enum.sort(fn %{"name" => screen_size1}, %{"name" => screen_size2} ->
|
||||
index1 = Enum.find_index(@available_screen_sizes, fn s -> s == screen_size1 end)
|
||||
@ -339,7 +342,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.country_code), fragment("uniq(user_id) as count")},
|
||||
group_by: e.country_code,
|
||||
where: not is_nil(e.country_code),
|
||||
where: e.country_code != "",
|
||||
order_by: [desc: fragment("count")]
|
||||
)
|
||||
|> Enum.map(fn stat ->
|
||||
@ -356,7 +359,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.browser), fragment("uniq(user_id) as count")},
|
||||
group_by: e.browser,
|
||||
where: not is_nil(e.browser),
|
||||
where: e.browser != "",
|
||||
order_by: [desc: fragment("count")]
|
||||
)
|
||||
|> add_percentages
|
||||
@ -368,7 +371,7 @@ defmodule Plausible.Stats.Clickhouse do
|
||||
from e in base_query(site, query),
|
||||
select: {fragment("? as name", e.operating_system), fragment("uniq(user_id) as count")},
|
||||
group_by: e.operating_system,
|
||||
where: not is_nil(e.operating_system),
|
||||
where: e.operating_system != "",
|
||||
order_by: [desc: fragment("count")]
|
||||
)
|
||||
|> add_percentages
|
||||
|
@ -1,7 +1,7 @@
|
||||
defmodule PlausibleWeb.Api.StatsController do
|
||||
use PlausibleWeb, :controller
|
||||
use Plausible.Repo
|
||||
alias Plausible.Stats
|
||||
alias Plausible.Stats.Clickhouse, as: Stats
|
||||
alias Plausible.Stats.Query
|
||||
plug PlausibleWeb.AuthorizeStatsPlug
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
defmodule PlausibleWeb.StatsController do
|
||||
use PlausibleWeb, :controller
|
||||
use Plausible.Repo
|
||||
alias Plausible.Stats
|
||||
alias Plausible.Stats.Clickhouse, as: Stats
|
||||
alias Plausible.Stats.Query
|
||||
|
||||
plug PlausibleWeb.AuthorizeStatsPlug when action in [:stats, :csv_export]
|
||||
|
1
mix.exs
1
mix.exs
@ -63,6 +63,7 @@ defmodule Plausible.MixProject do
|
||||
{:csv, "~> 2.3"},
|
||||
{:oauther, "~> 1.1"},
|
||||
{:nanoid, "~> 2.0.2"},
|
||||
{:siphash, "~> 3.2"},
|
||||
{:clickhousex, [git: "https://github.com/atlas-forks/clickhousex.git"]}
|
||||
]
|
||||
end
|
||||
|
1
mix.lock
1
mix.lock
@ -55,6 +55,7 @@
|
||||
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
|
||||
"ref_inspector": {:hex, :ref_inspector, "1.3.0", "a02b89647440d084f2867ecece7a99895bcd4683482397fe086508bb22a165f3", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:yamerl, "~> 0.7", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "d2069ae6b371112ac696a3cd116fd1e08d5726249b8d1357f377e67f0716cc10"},
|
||||
"sentry": {:hex, :sentry, "7.2.4", "b5bc90b594d40c2e653581e797a5fd2fdf994f2568f6bd66b7fa4971598be8d5", [:mix], [{:hackney, "~> 1.8 or 1.6.5", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.3", [hex: :phoenix, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "4ee4d368b5013076afcc8b73ed028bdc8ee9db84ea987e3591101e194c1fc24b"},
|
||||
"siphash": {:hex, :siphash, "3.2.0", "ec03fd4066259218c85e2a4b8eec4bb9663bc02b127ea8a0836db376ba73f2ed", [:make, :mix], [], "hexpm", "ba3810701c6e95637a745e186e8a4899087c3b079ba88fb8f33df054c3b0b7c3"},
|
||||
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
|
||||
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
|
||||
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "f354efb2400dd7a80fd9eb6c8419068c4f632da4ac47f3d8822d6e33f08bc852"},
|
||||
|
@ -1,5 +1,6 @@
|
||||
defmodule Plausible.Factory do
|
||||
use ExMachina.Ecto, repo: Plausible.Repo
|
||||
@hash_key Keyword.fetch!(Application.get_env(:plausible, PlausibleWeb.Endpoint), :secret_key_base) |> binary_part(0, 16)
|
||||
|
||||
def user_factory(attrs) do
|
||||
pw = Map.get(attrs, :password, "password")
|
||||
@ -67,12 +68,12 @@ defmodule Plausible.Factory do
|
||||
def event_factory do
|
||||
hostname = sequence(:domain, &"example-#{&1}.com")
|
||||
|
||||
%Plausible.Event{
|
||||
%Plausible.ClickhouseEvent{
|
||||
hostname: hostname,
|
||||
domain: hostname,
|
||||
pathname: "/",
|
||||
timestamp: Timex.now(),
|
||||
fingerprint: UUID.uuid4()
|
||||
user_id: SipHash.hash!(@hash_key, UUID.uuid4())
|
||||
}
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user