Channels: Migration to add materialized column, backfill code (#4798)

* Channels: Migration to add column, backfill code

This change adds `acqusition_channel` columns to events_v2 and
sessions_v2 tables. These columns are materialized - we don't ingest
into them directly. Instead they're calculated based on other columns.

The data migration changes now allow to also backfill the column.

Tested the ability to change definitions by changing the function
definitions and re-running the migration with backfill. Confirmed that
the underlying data changed as expected.

* quiet option

* Exclude data migrations from validation

* Migration consistency
This commit is contained in:
Karl-Aksel Puulmann 2024-11-12 08:41:34 +02:00 committed by GitHub
parent 3759db9b8c
commit 4aa7dec301
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 103 additions and 17 deletions

View File

@ -16,13 +16,17 @@ jobs:
id: changes
with:
list-files: json
predicate-quantifier: 'every'
filters: |
lib:
- 'lib/**'
- '!lib/plausible/data_migration/**'
extra:
- 'extra/**'
config:
- 'config/**'
- if: steps.changes.outputs.lib == 'true'
- if: steps.changes.outputs.lib == 'true' || steps.changes.outputs.extra == 'true' || steps.changes.outputs.config == 'true'
run: |
echo "::error file=${{ fromJSON(steps.changes.outputs.lib_files)[0] }}::Code and migrations shouldn't be changed at the same time"
exit 1

View File

@ -2,6 +2,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
@moduledoc """
Creates dictionaries and functions to calculate acquisition channel in ClickHouse
Creates `acquisition_channel` columns in `events_v2` and `sessions_v2` tables.
Run via `Plausible.DataMigration.AcquisitionChannel.run(options)`
Options:
- `add_column` - creates the materialized column. Already done in a migration
- `update_column` - Updates the column definition to use new function definitions. Defaults to true.
Note that historical data is only updated if `backfill` is set to true or if it was never materialized.
- `backfill` - backfills the data for the column. Speeds up calculations on historical data.
SQL files available at: priv/data_migrations/AcquisitionChannel/sql
"""
use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo
@ -11,19 +20,71 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
# In distributed environments, wait for insert to all temporary tables.
insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2")
run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
],
params: %{
"source_categories" =>
Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(),
"paid_sources" => Plausible.Ingestion.Source.paid_sources()
},
quiet: Keyword.get(opts, :quiet, false)
)
:ok =
run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
],
params: %{
"source_categories" =>
Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(),
"paid_sources" => Plausible.Ingestion.Source.paid_sources()
},
quiet: Keyword.get(opts, :quiet, false)
)
cond do
Keyword.get(opts, :add_column) ->
alter_data_tables(
"acquisition_channel_add_materialized_column",
on_cluster_statement,
opts
)
Keyword.get(opts, :update_column, true) ->
alter_data_tables(
"acquisition_channel_update_materialized_column",
on_cluster_statement,
opts
)
true ->
nil
end
if Keyword.get(opts, :backfill) do
alter_data_tables(
"acquisition_channel_backfill_materialized_column",
on_cluster_statement,
opts
)
end
:ok
end
defp alter_data_tables(sql_name, on_cluster_statement, opts) do
{:ok, _} =
run_sql(
sql_name,
[
table: "events_v2",
on_cluster_statement: on_cluster_statement
],
quiet: Keyword.get(opts, :quiet, false)
)
{:ok, _} =
run_sql(
sql_name,
[
table: "sessions_v2",
on_cluster_statement: on_cluster_statement
],
quiet: Keyword.get(opts, :quiet, false)
)
end
end

View File

@ -0,0 +1,4 @@
ALTER TABLE <%= @table %>
<%= @on_cluster_statement %>
ADD COLUMN IF NOT EXISTS acquisition_channel LowCardinality(String)
MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param)

View File

@ -0,0 +1,2 @@
ALTER TABLE <%= @table %>
MATERIALIZE COLUMN acquisition_channel

View File

@ -11,7 +11,7 @@ ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;
TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2;
TRUNCATE TABLE acquisition_channel_source_category <%= @on_cluster_statement %>;
INSERT INTO acquisition_channel_source_category(referrer_source, category)
SELECT t.1 AS referrer_source, t.2 AS category
@ -43,7 +43,7 @@ ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;
TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2;
TRUNCATE TABLE acquisition_channel_paid_sources <%= @on_cluster_statement %>;
INSERT INTO acquisition_channel_paid_sources(referrer_source)
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source

View File

@ -0,0 +1,4 @@
ALTER TABLE <%= @table %>
<%= @on_cluster_statement %>
MODIFY COLUMN acquisition_channel LowCardinality(String)
MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param)

View File

@ -0,0 +1,11 @@
defmodule Plausible.IngestRepo.Migrations.CreateAcquisitionChannelColumn do
use Ecto.Migration
def up do
Plausible.DataMigration.AcquisitionChannel.run(add_column: true, backfill: false)
end
def down do
raise "irreversible"
end
end