From 4aa7dec301614d4418b8f213914828e17a5ad22d Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 12 Nov 2024 08:41:34 +0200 Subject: [PATCH] Channels: Migration to add materialized column, backfill code (#4798) * Channels: Migration to add column, backfill code This change adds `acquisition_channel` columns to events_v2 and sessions_v2 tables. These columns are materialized - we don't ingest into them directly. Instead they're calculated based on other columns. The data migration changes now also allow backfilling the column. Tested the ability to change definitions by changing the function definitions and re-running the migration with backfill. Confirmed that the underlying data changed as expected. * quiet option * Exclude data migrations from validation * Migration consistency --- .github/workflows/migrations-validation.yml | 6 +- .../data_migration/acquisition_channel.ex | 89 ++++++++++++++++--- ...on_channel_add_materialized_column.sql.eex | 4 + ...annel_backfill_materialized_column.sql.eex | 2 + .../sql/acquisition_channel_functions.sql.eex | 4 +- ...channel_update_materialized_column.sql.eex | 4 + ...4056_create_acquisition_channel_column.exs | 11 +++ 7 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex create mode 100644 priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex create mode 100644 priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex create mode 100644 priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs diff --git a/.github/workflows/migrations-validation.yml b/.github/workflows/migrations-validation.yml index 2d20350b8..2046dc0d8 100644 --- a/.github/workflows/migrations-validation.yml +++ b/.github/workflows/migrations-validation.yml @@ -16,13 +16,17 @@ jobs: id: changes with: list-files: json + 
predicate-quantifier: 'every' filters: | lib: - 'lib/**' + - '!lib/plausible/data_migration/**' + extra: - 'extra/**' + config: - 'config/**' - - if: steps.changes.outputs.lib == 'true' + - if: steps.changes.outputs.lib == 'true' || steps.changes.outputs.extra == 'true' || steps.changes.outputs.config == 'true' run: | echo "::error file=${{ fromJSON(steps.changes.outputs.lib_files)[0] }}::Code and migrations shouldn't be changed at the same time" exit 1 diff --git a/lib/plausible/data_migration/acquisition_channel.ex b/lib/plausible/data_migration/acquisition_channel.ex index 6366aaeb3..34dbe2c5e 100644 --- a/lib/plausible/data_migration/acquisition_channel.ex +++ b/lib/plausible/data_migration/acquisition_channel.ex @@ -2,6 +2,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do @moduledoc """ Creates dictionaries and functions to calculate acquisition channel in ClickHouse + Creates `acquisition_channel` columns in `events_v2` and `sessions_v2` tables. + Run via `Plausible.DataMigration.AcquisitionChannel.run(options)` + + Options: + - `add_column` - creates the materialized column. Already done in a migration + - `update_column` - Updates the column definition to use new function definitions. Defaults to true. + Note that historical data is only updated if `backfill` is set to true or if it was never materialized. + - `backfill` - backfills the data for the column. Speeds up calculations on historical data. + SQL files available at: priv/data_migrations/AcquisitionChannel/sql """ use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo @@ -11,19 +20,71 @@ defmodule Plausible.DataMigration.AcquisitionChannel do # In distributed environments, wait for insert to all temporary tables. 
insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2") - run_sql_multi( - "acquisition_channel_functions", - [ - on_cluster_statement: on_cluster_statement, - dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(), - insert_quorum: insert_quorum - ], - params: %{ - "source_categories" => - Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(), - "paid_sources" => Plausible.Ingestion.Source.paid_sources() - }, - quiet: Keyword.get(opts, :quiet, false) - ) + :ok = + run_sql_multi( + "acquisition_channel_functions", + [ + on_cluster_statement: on_cluster_statement, + dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(), + insert_quorum: insert_quorum + ], + params: %{ + "source_categories" => + Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(), + "paid_sources" => Plausible.Ingestion.Source.paid_sources() + }, + quiet: Keyword.get(opts, :quiet, false) + ) + + cond do + Keyword.get(opts, :add_column) -> + alter_data_tables( + "acquisition_channel_add_materialized_column", + on_cluster_statement, + opts + ) + + Keyword.get(opts, :update_column, true) -> + alter_data_tables( + "acquisition_channel_update_materialized_column", + on_cluster_statement, + opts + ) + + true -> + nil + end + + if Keyword.get(opts, :backfill) do + alter_data_tables( + "acquisition_channel_backfill_materialized_column", + on_cluster_statement, + opts + ) + end + + :ok + end + + defp alter_data_tables(sql_name, on_cluster_statement, opts) do + {:ok, _} = + run_sql( + sql_name, + [ + table: "events_v2", + on_cluster_statement: on_cluster_statement + ], + quiet: Keyword.get(opts, :quiet, false) + ) + + {:ok, _} = + run_sql( + sql_name, + [ + table: "sessions_v2", + on_cluster_statement: on_cluster_statement + ], + quiet: Keyword.get(opts, :quiet, false) + ) end end diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex 
b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex new file mode 100644 index 000000000..47688db5f --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex @@ -0,0 +1,4 @@ +ALTER TABLE <%= @table %> +<%= @on_cluster_statement %> +ADD COLUMN IF NOT EXISTS acquisition_channel LowCardinality(String) +MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex new file mode 100644 index 000000000..2b087cdc9 --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex @@ -0,0 +1,2 @@ +ALTER TABLE <%= @table %> +MATERIALIZE COLUMN acquisition_channel diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex index f324edf4d..72e97def4 100644 --- a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex @@ -11,7 +11,7 @@ ENGINE = MergeTree() <% end %> ORDER BY referrer_source; -TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2; +TRUNCATE TABLE acquisition_channel_source_category <%= @on_cluster_statement %>; INSERT INTO acquisition_channel_source_category(referrer_source, category) SELECT t.1 AS referrer_source, t.2 AS category @@ -43,7 +43,7 @@ ENGINE = MergeTree() <% end %> ORDER BY referrer_source; -TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2; +TRUNCATE TABLE acquisition_channel_paid_sources <%= @on_cluster_statement %>; INSERT INTO 
acquisition_channel_paid_sources(referrer_source) SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex new file mode 100644 index 000000000..1fd8027f6 --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex @@ -0,0 +1,4 @@ +ALTER TABLE <%= @table %> +<%= @on_cluster_statement %> +MODIFY COLUMN acquisition_channel LowCardinality(String) +MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) diff --git a/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs b/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs new file mode 100644 index 000000000..4e3bdc017 --- /dev/null +++ b/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs @@ -0,0 +1,11 @@ +defmodule Plausible.IngestRepo.Migrations.CreateAcquisitionChannelColumn do + use Ecto.Migration + + def up do + Plausible.DataMigration.AcquisitionChannel.run(add_column: true, backfill: false) + end + + def down do + raise "irreversible" + end +end