Channels: Speed up clickhouse calculations (#4789)

* Fix interpolation in data_migration.ex

* Speed up calculating acquisition_channel in clickhouse

The previous `has` queries proved to be problematic and causing a lot of
CPU overhead.

Benchmarked via this query:

```sql
SELECT
  channel,
  count(),
  countIf(acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) = channel) AS matches
FROM events_v2
WHERE timestamp > now() - toIntervalHour(48)
GROUP BY channel
ORDER BY count() desc
```

Before this fix:
```
query_duration_ms:                                                57960
DiskReadElapsedMs:                                                374.712
RealTimeMs:                                                       2891200.667
UserTimeMs:                                                       2704024.783
SystemTimeMs:                                                     1693.265
OSCPUWaitMs:                                                      90.253
OSCPUVirtualTimeMs:                                               2705709.58
```

After this fix:
```
query_duration_ms:                                                4367
DiskReadElapsedMs:                                                454.356
RealTimeMs:                                                       213892.207
UserTimeMs:                                                       199363.485
SystemTimeMs:                                                     1479.364
OSCPUWaitMs:                                                      13.739
OSCPUVirtualTimeMs:                                               200837.37
```

Note that the new tables are not tracked in our schema as usual as
they're pretty much temporary tables to create the dictionary without
needing to upload files to clickhouse servers.

* CREATE OR REPLACE table with SELECT
This commit is contained in:
Karl-Aksel Puulmann 2024-11-11 12:39:51 +02:00 committed by GitHub
parent 98bc3e7554
commit d620432227
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 62 additions and 27 deletions

View File

@ -6,7 +6,18 @@ defmodule Mix.Tasks.CleanClickhouse do
def run(_) do def run(_) do
%{rows: rows} = IngestRepo.query!("show tables") %{rows: rows} = IngestRepo.query!("show tables")
tables = Enum.map(rows, fn [table] -> table end) tables = Enum.map(rows, fn [table] -> table end)
to_truncate = tables -- ["schema_migrations", "location_data", "location_data_dict"]
to_truncate =
tables --
[
"schema_migrations",
"location_data",
"location_data_dict",
"acquisition_channel_source_category",
"acquisition_channel_source_category_dict",
"acquisition_channel_paid_sources",
"acquisition_channel_paid_sources_dict"
]
Enum.each(to_truncate, fn table -> Enum.each(to_truncate, fn table ->
IngestRepo.query!("truncate #{table}") IngestRepo.query!("truncate #{table}")

View File

@ -84,7 +84,7 @@ defmodule Plausible.DataMigration do
|> String.split(";", trim: true) |> String.split(";", trim: true)
|> Enum.with_index(1) |> Enum.with_index(1)
|> Enum.reduce_while(:ok, fn {query, index}, _ -> |> Enum.reduce_while(:ok, fn {query, index}, _ ->
case do_run("name-#{index}", query, options) do case do_run("#{name}-#{index}", query, options) do
{:ok, _} -> {:cont, :ok} {:ok, _} -> {:cont, :ok}
error -> {:halt, error} error -> {:halt, error}
end end

View File

@ -1,40 +1,26 @@
defmodule Plausible.DataMigration.AcquisitionChannel do defmodule Plausible.DataMigration.AcquisitionChannel do
@moduledoc """ @moduledoc """
Creates functions to calculate acquisition channel in ClickHouse Creates dictionaries and functions to calculate acquisition channel in ClickHouse
SQL files available at: priv/data_migrations/AcquisitionChannel/sql SQL files available at: priv/data_migrations/AcquisitionChannel/sql
""" """
use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo
def run(opts \\ []) do def run(opts \\ []) do
source_categories =
Plausible.Ingestion.Acquisition.source_categories()
|> invert_map()
on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2") on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2")
run_sql_multi( run_sql_multi(
"acquisition_channel_functions", "acquisition_channel_functions",
[ [
on_cluster_statement: on_cluster_statement on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params()
], ],
params: %{ params: %{
"source_category_shopping" => source_categories["SOURCE_CATEGORY_SHOPPING"], "source_categories" =>
"source_category_social" => source_categories["SOURCE_CATEGORY_SOCIAL"], Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(),
"source_category_video" => source_categories["SOURCE_CATEGORY_VIDEO"],
"source_category_search" => source_categories["SOURCE_CATEGORY_SEARCH"],
"source_category_email" => source_categories["SOURCE_CATEGORY_EMAIL"],
"paid_sources" => Plausible.Ingestion.Source.paid_sources() "paid_sources" => Plausible.Ingestion.Source.paid_sources()
}, },
quiet: Keyword.get(opts, :quiet, false) quiet: Keyword.get(opts, :quiet, false)
) )
end end
defp invert_map(source_categories) do
source_categories
|> Enum.group_by(
fn {_source, category} -> category end,
fn {source, _category} -> source end
)
end
end end

View File

@ -1,26 +1,64 @@
CREATE OR REPLACE TABLE acquisition_channel_source_category(referrer_source String, category LowCardinality(String))
<%= @on_cluster_statement %>
Engine = MergeTree()
ORDER BY referrer_source
AS
SELECT t.1 AS referrer_source, t.2 AS category
FROM (
SELECT arrayJoin({source_categories:Array(Tuple(String, String))}) AS t
);
CREATE OR REPLACE DICTIONARY acquisition_channel_source_category_dict
<%= @on_cluster_statement %>
(
`referrer_source` String,
`category` String
)
PRIMARY KEY referrer_source
SOURCE(CLICKHOUSE(TABLE acquisition_channel_source_category <%= @dictionary_connection_params %>))
LIFETIME(0)
LAYOUT(hashed());
CREATE OR REPLACE TABLE acquisition_channel_paid_sources(referrer_source String)
<%= @on_cluster_statement %>
Engine = MergeTree()
ORDER BY referrer_source
AS
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source;
CREATE OR REPLACE DICTIONARY acquisition_channel_paid_sources_dict
<%= @on_cluster_statement %>
(
`referrer_source` String
)
PRIMARY KEY referrer_source
SOURCE(CLICKHOUSE(TABLE acquisition_channel_paid_sources <%= @dictionary_connection_params %>))
LIFETIME(0)
LAYOUT(hashed());
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_shopping <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_has_category_shopping <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({source_category_shopping:Array(String)}, referrer_source); dictGet('acquisition_channel_source_category_dict', 'category', referrer_source) = 'SOURCE_CATEGORY_SHOPPING';
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_social <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_has_category_social <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({source_category_social:Array(String)}, referrer_source); dictGet('acquisition_channel_source_category_dict', 'category', referrer_source) = 'SOURCE_CATEGORY_SOCIAL';
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_video <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_has_category_video <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({source_category_video:Array(String)}, referrer_source); dictGet('acquisition_channel_source_category_dict', 'category', referrer_source) = 'SOURCE_CATEGORY_VIDEO';
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_search <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_has_category_search <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({source_category_search:Array(String)}, referrer_source); dictGet('acquisition_channel_source_category_dict', 'category', referrer_source) = 'SOURCE_CATEGORY_SEARCH';
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_email <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_has_category_email <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({source_category_email:Array(String)}, referrer_source); dictGet('acquisition_channel_source_category_dict', 'category', referrer_source) = 'SOURCE_CATEGORY_EMAIL';
CREATE OR REPLACE FUNCTION acquisition_channel_paid_utm_source <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_paid_utm_source <%= @on_cluster_statement %> AS
(referrer_source) -> (referrer_source) ->
has({paid_sources:Array(String)}, referrer_source); dictHas('acquisition_channel_paid_sources_dict', referrer_source);
CREATE OR REPLACE FUNCTION acquisition_channel_cross_network <%= @on_cluster_statement %> AS CREATE OR REPLACE FUNCTION acquisition_channel_cross_network <%= @on_cluster_statement %> AS
(utm_campaign) -> (utm_campaign) ->