Flush Clickhouse buffers faster by encoding early (#3623)

* encode early

* rename: CLICKHOUSE_MAX_BUFFER_SIZE -> CLICKHOUSE_MAX_BUFFER_SIZE_BYTES

* deprecate CLICKHOUSE_MAX_BUFFER_SIZE

* cleanup
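
In outline, "encode early" means events and sessions are serialized to ClickHouse RowBinary at insert time, so the buffer holds ready-to-send iodata and the flush threshold becomes a byte budget instead of a row count. A minimal sketch of the idea (simplified names, not the exact code in this diff):

    # Sketch only: buffer pre-encoded binaries and track the size in bytes.
    defmodule Sketch.ByteBuffer do
      def insert(state, row_binary) when is_binary(row_binary) do
        state = %{
          state
          | buffer: [state.buffer | row_binary],
            buffer_size: state.buffer_size + byte_size(row_binary)
        }

        if state.buffer_size >= state.max_buffer_size, do: flush(state), else: state
      end

      defp flush(state) do
        # the INSERT ... FORMAT RowBinaryWithNamesAndTypes statement is built
        # once; at flush time the already-encoded iodata is sent as-is
        do_insert(state.insert_sql, [state.header | state.buffer])
        %{state | buffer: [], buffer_size: 0}
      end

      defp do_insert(_sql, _iodata), do: :ok
    end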

---------

Co-authored-by: hq1 <hq@mtod.org>
Authored by ruslandoga on 2023-12-14 21:21:27 +08:00; committed by GitHub
parent 17ba44e8db
commit 6639d6af63
8 changed files with 221 additions and 144 deletions


@@ -16,16 +16,19 @@ All notable changes to this project will be documented in this file.
 - Add support for JSON logger, via LOG_FORMAT=json environment variable
 - Add support for 2FA authentication
 - Add 'browser_versions.csv' to CSV export
+- Add `CLICKHOUSE_MAX_BUFFER_SIZE_BYTES` env var which defaults to `100000` (100KB)

 ### Removed
 - Removed the nested custom event property breakdown UI when filtering by a goal in Goal Conversions
 - Removed the `prop-breakdown.csv` file from CSV export
+- Deprecated `CLICKHOUSE_MAX_BUFFER_SIZE`

 ### Changed
 - Limit the number of Goal Conversions shown on the dashboard and render a "Details" link when there are more entries to show
 - Show Outbound Links / File Downloads / 404 Pages / Cloaked Links instead of Goal Conversions when filtering by the corresponding goal
 - Require custom properties to be explicitly added from Site Settings > Custom Properties in order for them to show up on the dashboard
 - GA/SC sections moved to new settings: Integrations
+- Replace `CLICKHOUSE_MAX_BUFFER_SIZE` with `CLICKHOUSE_MAX_BUFFER_SIZE_BYTES`

 ### Fixed
 - Only return `(none)` values in custom property breakdown for the first page (pagination) of results


@@ -140,9 +140,15 @@ ch_db_url =
   |> get_var_from_path_or_env("CLICKHOUSE_FLUSH_INTERVAL_MS", "5000")
   |> Integer.parse()

+if get_var_from_path_or_env(config_dir, "CLICKHOUSE_MAX_BUFFER_SIZE") do
+  Logger.warning(
+    "CLICKHOUSE_MAX_BUFFER_SIZE is deprecated, please use CLICKHOUSE_MAX_BUFFER_SIZE_BYTES instead"
+  )
+end
+
 {ch_max_buffer_size, ""} =
   config_dir
-  |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE", "10000")
+  |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000")
   |> Integer.parse()

 # Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from
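
The `{value, ""} = Integer.parse(...)` pattern doubles as boot-time validation: `Integer.parse/1` returns the parsed integer plus the unparsed remainder, and matching the remainder against `""` raises if the variable is not a plain integer. Illustrative values, not project code:

    {100_000, ""} = Integer.parse("100000")
    # Integer.parse("100KB") returns {100, "KB"}, so the {_, ""} match
    # raises MatchError and the node fails fast on a malformed setting.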


@@ -17,8 +17,8 @@ defmodule Plausible.Application do
       {Finch, name: Plausible.Finch, pools: finch_pool_config()},
       {Phoenix.PubSub, name: Plausible.PubSub},
       Plausible.Session.Salts,
-      Plausible.Event.WriteBuffer,
-      Plausible.Session.WriteBuffer,
+      Supervisor.child_spec(Plausible.Event.WriteBuffer, id: Plausible.Event.WriteBuffer),
+      Supervisor.child_spec(Plausible.Session.WriteBuffer, id: Plausible.Session.WriteBuffer),
       ReferrerBlocklist,
       Supervisor.child_spec({Cachex, name: :user_agents, limit: 10_000, stats: true},
         id: :cachex_user_agents
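
Both wrapper modules now delegate `child_spec/1` to `Plausible.Ingestion.WriteBuffer`, so their default child specs would presumably resolve to the same id and the supervisor would reject the second child at startup. `Supervisor.child_spec/2` pins a distinct `:id` per child; a minimal standalone sketch of the pattern:

    # Sketch: two children backed by the same module need distinct ids.
    children = [
      Supervisor.child_spec(Plausible.Event.WriteBuffer, id: Plausible.Event.WriteBuffer),
      Supervisor.child_spec(Plausible.Session.WriteBuffer, id: Plausible.Session.WriteBuffer)
    ]

    Supervisor.start_link(children, strategy: :one_for_one)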


@@ -1,79 +1,38 @@
 defmodule Plausible.Event.WriteBuffer do
-  use GenServer
-  require Logger
-  alias Plausible.IngestRepo
+  @moduledoc false
+
+  %{
+    header: header,
+    insert_sql: insert_sql,
+    insert_opts: insert_opts,
+    fields: fields,
+    encoding_types: encoding_types
+  } =
+    Plausible.Ingestion.WriteBuffer.compile_time_prepare(Plausible.ClickhouseEventV2)

-  def start_link(_opts) do
-    GenServer.start_link(__MODULE__, [], name: __MODULE__)
-  end
-
-  def init(buffer) do
-    Process.flag(:trap_exit, true)
-    timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:ok, %{buffer: buffer, timer: timer}}
+  def child_spec(opts) do
+    opts =
+      Keyword.merge(opts,
+        name: __MODULE__,
+        header: unquote(header),
+        insert_sql: unquote(insert_sql),
+        insert_opts: unquote(insert_opts)
+      )
+
+    Plausible.Ingestion.WriteBuffer.child_spec(opts)
   end

   def insert(event) do
-    GenServer.cast(__MODULE__, {:insert, event})
+    row_binary =
+      [Enum.map(unquote(fields), fn field -> Map.fetch!(event, field) end)]
+      |> Ch.RowBinary._encode_rows(unquote(encoding_types))
+      |> IO.iodata_to_binary()
+
+    :ok = Plausible.Ingestion.WriteBuffer.insert(__MODULE__, row_binary)
     {:ok, event}
   end

-  def flush() do
-    GenServer.call(__MODULE__, :flush, :infinity)
-    :ok
-  end
-
-  def handle_cast({:insert, event}, %{buffer: buffer} = state) do
-    new_buffer = [event | buffer]
-
-    if length(new_buffer) >= max_buffer_size() do
-      Logger.info("Buffer full, flushing to disk")
-      Process.cancel_timer(state[:timer])
-      do_flush(new_buffer)
-      new_timer = Process.send_after(self(), :tick, flush_interval_ms())
-      {:noreply, %{buffer: [], timer: new_timer}}
-    else
-      {:noreply, %{state | buffer: new_buffer}}
-    end
-  end
-
-  def handle_info(:tick, %{buffer: buffer}) do
-    do_flush(buffer)
-    timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:noreply, %{buffer: [], timer: timer}}
-  end
-
-  def handle_call(:flush, _from, %{buffer: buffer} = state) do
-    Process.cancel_timer(state[:timer])
-    do_flush(buffer)
-    new_timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:reply, nil, %{buffer: [], timer: new_timer}}
-  end
-
-  def terminate(_reason, %{buffer: buffer}) do
-    Logger.info("Flushing event buffer before shutdown...")
-    do_flush(buffer)
-  end
-
-  defp do_flush(buffer) do
-    case buffer do
-      [] ->
-        nil
-
-      events ->
-        Logger.info("Flushing #{length(events)} events")
-        events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
-        IngestRepo.insert_all(Plausible.ClickhouseEventV2, events)
-    end
-  end
-
-  defp flush_interval_ms() do
-    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms)
-  end
-
-  defp max_buffer_size() do
-    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size)
+  def flush do
+    Plausible.Ingestion.WriteBuffer.flush(__MODULE__)
   end
 end
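
The wrapper leans on Elixir's "unquote fragments": values bound in the module body are injected into function bodies as compile-time literals, so the field list, encoding types, header, and SQL are computed once at compilation rather than on every insert. A standalone illustration of the mechanism (toy module, not project code):

    defmodule UnquoteFragmentExample do
      # bound while the module body is compiled
      fields = [:name, :url]

      def project(map) do
        # unquote/1 splices the list in as a literal at compile time
        Enum.map(unquote(fields), fn field -> Map.fetch!(map, field) end)
      end
    end

    # UnquoteFragmentExample.project(%{name: "a", url: "b"}) #=> ["a", "b"]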


@@ -0,0 +1,149 @@
+defmodule Plausible.Ingestion.WriteBuffer do
+  @moduledoc false
+  use GenServer
+  require Logger
+
+  alias Plausible.IngestRepo
+
+  def start_link(opts) do
+    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
+  end
+
+  def insert(server, row_binary) do
+    GenServer.cast(server, {:insert, row_binary})
+  end
+
+  def flush(server) do
+    GenServer.call(server, :flush, :infinity)
+  end
+
+  @impl true
+  def init(opts) do
+    buffer = opts[:buffer] || []
+    max_buffer_size = opts[:max_buffer_size] || default_max_buffer_size()
+    flush_interval_ms = opts[:flush_interval_ms] || default_flush_interval_ms()
+
+    Process.flag(:trap_exit, true)
+    timer = Process.send_after(self(), :tick, flush_interval_ms)
+
+    {:ok,
+     %{
+       buffer: buffer,
+       timer: timer,
+       name: Keyword.fetch!(opts, :name),
+       insert_sql: Keyword.fetch!(opts, :insert_sql),
+       insert_opts: Keyword.fetch!(opts, :insert_opts),
+       header: Keyword.fetch!(opts, :header),
+       buffer_size: IO.iodata_length(buffer),
+       max_buffer_size: max_buffer_size,
+       flush_interval_ms: flush_interval_ms
+     }}
+  end
+
+  @impl true
+  def handle_cast({:insert, row_binary}, state) do
+    state = %{
+      state
+      | buffer: [state.buffer | row_binary],
+        buffer_size: state.buffer_size + IO.iodata_length(row_binary)
+    }
+
+    if state.buffer_size >= state.max_buffer_size do
+      Logger.info("#{state.name} buffer full, flushing to ClickHouse")
+      Process.cancel_timer(state.timer)
+      do_flush(state)
+      new_timer = Process.send_after(self(), :tick, state.flush_interval_ms)
+      {:noreply, %{state | buffer: [], timer: new_timer, buffer_size: 0}}
+    else
+      {:noreply, state}
+    end
+  end
+
+  @impl true
+  def handle_info(:tick, state) do
+    do_flush(state)
+    timer = Process.send_after(self(), :tick, state.flush_interval_ms)
+    {:noreply, %{state | buffer: [], buffer_size: 0, timer: timer}}
+  end
+
+  @impl true
+  def handle_call(:flush, _from, state) do
+    %{timer: timer, flush_interval_ms: flush_interval_ms} = state
+    Process.cancel_timer(timer)
+    do_flush(state)
+    new_timer = Process.send_after(self(), :tick, flush_interval_ms)
+    {:reply, :ok, %{state | buffer: [], buffer_size: 0, timer: new_timer}}
+  end
+
+  @impl true
+  def terminate(_reason, %{name: name} = state) do
+    Logger.info("Flushing #{name} buffer before shutdown...")
+    do_flush(state)
+  end
+
+  defp do_flush(state) do
+    %{
+      buffer: buffer,
+      buffer_size: buffer_size,
+      insert_opts: insert_opts,
+      insert_sql: insert_sql,
+      header: header,
+      name: name
+    } = state
+
+    case buffer do
+      [] ->
+        nil
+
+      _not_empty ->
+        Logger.info("Flushing #{buffer_size} byte(s) RowBinary from #{name}")
+        IngestRepo.query!(insert_sql, [header | buffer], insert_opts)
+    end
+  end
+
+  defp default_flush_interval_ms do
+    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms)
+  end
+
+  defp default_max_buffer_size do
+    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size)
+  end
+
+  @doc false
+  def compile_time_prepare(schema) do
+    fields = schema.__schema__(:fields)
+
+    types =
+      Enum.map(fields, fn field ->
+        type = schema.__schema__(:type, field) || raise "missing type for #{field}"
+
+        type
+        |> Ecto.Type.type()
+        |> Ecto.Adapters.ClickHouse.Schema.remap_type(schema, field)
+      end)
+
+    encoding_types = Ch.RowBinary.encoding_types(types)
+
+    header =
+      fields
+      |> Enum.map(&to_string/1)
+      |> Ch.RowBinary.encode_names_and_types(types)
+      |> IO.iodata_to_binary()
+
+    insert_sql = "INSERT INTO #{schema.__schema__(:source)} FORMAT RowBinaryWithNamesAndTypes"
+
+    %{
+      fields: fields,
+      types: types,
+      encoding_types: encoding_types,
+      header: header,
+      insert_sql: insert_sql,
+      insert_opts: [
+        command: :insert,
+        encode: false,
+        source: schema.__schema__(:source),
+        cast_params: []
+      ]
+    }
+  end
+end
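
Hypothetical wiring of the shared buffer for some other schema (`MyApp.Click`, `MyClickBuffer`, and the option values are assumptions for illustration, not part of this diff):

    prepared = Plausible.Ingestion.WriteBuffer.compile_time_prepare(MyApp.Click)

    {:ok, _pid} =
      Plausible.Ingestion.WriteBuffer.start_link(
        name: MyClickBuffer,
        header: prepared.header,
        insert_sql: prepared.insert_sql,
        insert_opts: prepared.insert_opts,
        # flush after ~50 KB of RowBinary, or at least once a second
        max_buffer_size: 50_000,
        flush_interval_ms: 1_000
      )

    # callers hand over pre-encoded RowBinary rather than structs
    row_binary = <<>> # produced elsewhere, e.g. via Ch.RowBinary encoding
    :ok = Plausible.Ingestion.WriteBuffer.insert(MyClickBuffer, row_binary)
    :ok = Plausible.Ingestion.WriteBuffer.flush(MyClickBuffer)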


@@ -8,7 +8,7 @@ defmodule Plausible.Session.CacheStore do
     session =
       if found_session do
         updated_session = update_session(found_session, event)
-        buffer.insert([%{updated_session | sign: 1}, %{found_session | sign: -1}])
+        buffer.insert([%{found_session | sign: -1}, %{updated_session | sign: 1}])
         persist_session(updated_session)
       else
         new_session = new_session_from_event(event)
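
The argument order flips because rows are now encoded and appended to the buffer in call order, whereas the old list-based buffer prepended and reversed at flush time. To keep the written order the same (presumably what the `sign` column's collapsing semantics expect), the cancel row has to be passed first:

    # the -1 row cancels the previously written session row; the +1 row
    # carries the updated totals, so it must land after its cancel row
    buffer.insert([%{found_session | sign: -1}, %{updated_session | sign: 1}])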


@@ -1,83 +1,43 @@
 defmodule Plausible.Session.WriteBuffer do
-  use GenServer
-  require Logger
-  alias Plausible.IngestRepo
+  @moduledoc false
+
+  %{
+    header: header,
+    insert_sql: insert_sql,
+    insert_opts: insert_opts,
+    fields: fields,
+    encoding_types: encoding_types
+  } =
+    Plausible.Ingestion.WriteBuffer.compile_time_prepare(Plausible.ClickhouseSessionV2)

-  def start_link(_opts) do
-    GenServer.start_link(__MODULE__, [], name: __MODULE__)
-  end
-
-  def init(buffer) do
-    Process.flag(:trap_exit, true)
-    timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:ok, %{buffer: buffer, timer: timer}}
+  def child_spec(opts) do
+    opts =
+      Keyword.merge(opts,
+        name: __MODULE__,
+        header: unquote(header),
+        insert_sql: unquote(insert_sql),
+        insert_opts: unquote(insert_opts)
+      )
+
+    Plausible.Ingestion.WriteBuffer.child_spec(opts)
   end

   def insert(sessions) do
-    GenServer.cast(__MODULE__, {:insert, sessions})
+    row_binary =
+      sessions
+      |> Enum.map(fn %{is_bounce: is_bounce} = session ->
+        {:ok, is_bounce} = Plausible.ClickhouseSessionV2.BoolUInt8.dump(is_bounce)
+        session = %{session | is_bounce: is_bounce}
+        Enum.map(unquote(fields), fn field -> Map.fetch!(session, field) end)
+      end)
+      |> Ch.RowBinary._encode_rows(unquote(encoding_types))
+      |> IO.iodata_to_binary()
+
+    :ok = Plausible.Ingestion.WriteBuffer.insert(__MODULE__, row_binary)
     {:ok, sessions}
   end

-  def flush() do
-    GenServer.call(__MODULE__, :flush, :infinity)
-    :ok
-  end
-
-  def handle_cast({:insert, sessions}, %{buffer: buffer} = state) do
-    new_buffer = sessions ++ buffer
-
-    if length(new_buffer) >= max_buffer_size() do
-      Logger.info("Buffer full, flushing to disk")
-      Process.cancel_timer(state[:timer])
-      do_flush(new_buffer)
-      new_timer = Process.send_after(self(), :tick, flush_interval_ms())
-      {:noreply, %{buffer: [], timer: new_timer}}
-    else
-      {:noreply, %{state | buffer: new_buffer}}
-    end
-  end
-
-  def handle_info(:tick, %{buffer: buffer}) do
-    do_flush(buffer)
-    timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:noreply, %{buffer: [], timer: timer}}
-  end
-
-  def handle_call(:flush, _from, %{buffer: buffer} = state) do
-    Process.cancel_timer(state[:timer])
-    do_flush(buffer)
-    new_timer = Process.send_after(self(), :tick, flush_interval_ms())
-    {:reply, nil, %{buffer: [], timer: new_timer}}
-  end
-
-  def terminate(_reason, %{buffer: buffer}) do
-    Logger.info("Flushing session buffer before shutdown...")
-    do_flush(buffer)
-  end
-
-  defp do_flush(buffer) do
-    case buffer do
-      [] ->
-        nil
-
-      sessions ->
-        Logger.info("Flushing #{length(sessions)} sessions")
-
-        sessions =
-          sessions
-          |> Enum.map(&(Map.from_struct(&1) |> Map.delete(:__meta__)))
-          |> Enum.reverse()
-
-        IngestRepo.insert_all(Plausible.ClickhouseSessionV2, sessions)
-    end
-  end
-
-  defp flush_interval_ms() do
-    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms)
-  end
-
-  defp max_buffer_size() do
-    Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size)
+  def flush do
+    Plausible.Ingestion.WriteBuffer.flush(__MODULE__)
   end
 end
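
One session-specific wrinkle handled in `insert/1` above: `is_bounce` uses the custom `Plausible.ClickhouseSessionV2.BoolUInt8` Ecto type, so each value is dumped to its ClickHouse representation before RowBinary encoding. Assumed behavior of `dump/1` (illustrative, not verified against the type's source):

    {:ok, 1} = Plausible.ClickhouseSessionV2.BoolUInt8.dump(true)
    {:ok, 0} = Plausible.ClickhouseSessionV2.BoolUInt8.dump(false)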


@@ -88,7 +88,7 @@ defmodule Plausible.Session.CacheStoreTest do
     CacheStore.on_event(event1, nil, buffer)
     CacheStore.on_event(event2, nil, buffer)

-    assert_receive({WriteBuffer, :insert, [[session, _negative_record]]})
+    assert_receive({WriteBuffer, :insert, [[_negative_record, session]]})
     assert session.is_bounce == false
     assert session.duration == 10
     assert session.pageviews == 2
@@ -113,7 +113,7 @@ defmodule Plausible.Session.CacheStoreTest do
     CacheStore.on_event(event1, nil, buffer)
     CacheStore.on_event(event2, nil, buffer)

-    assert_receive({WriteBuffer, :insert, [[session, _negative_record]]})
+    assert_receive({WriteBuffer, :insert, [[_negative_record, session]]})
     assert session.duration == 10
   end