diff --git a/.github/workflows/migrations-validation.yml b/.github/workflows/migrations-validation.yml index 2d20350b8..2046dc0d8 100644 --- a/.github/workflows/migrations-validation.yml +++ b/.github/workflows/migrations-validation.yml @@ -16,13 +16,17 @@ jobs: id: changes with: list-files: json + predicate-quantifier: 'every' filters: | lib: - 'lib/**' + - '!lib/plausible/data_migration/**' + extra: - 'extra/**' + config: - 'config/**' - - if: steps.changes.outputs.lib == 'true' + - if: steps.changes.outputs.lib == 'true' || steps.changes.outputs.extra == 'true' || steps.changes.outputs.config == 'true' run: | echo "::error file=${{ fromJSON(steps.changes.outputs.lib_files)[0] }}::Code and migrations shouldn't be changed at the same time" exit 1 diff --git a/lib/plausible/data_migration/acquisition_channel.ex b/lib/plausible/data_migration/acquisition_channel.ex index 6366aaeb3..34dbe2c5e 100644 --- a/lib/plausible/data_migration/acquisition_channel.ex +++ b/lib/plausible/data_migration/acquisition_channel.ex @@ -2,6 +2,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do @moduledoc """ Creates dictionaries and functions to calculate acquisition channel in ClickHouse + Creates `acquisition_channel` columns in `events_v2` and `sessions_v2` tables. + Run via `Plausible.DataMigration.AcquisitionChannel.run(options)` + + Options: + - `add_column` - creates the materialized column. Already done in a migration + - `update_column` - Updates the column definition to use new function definitions. Defaults to true. + Note that historical data is only updated if `backfill` is set to true or if it was never materialized. + - `backfill` - backfills the data for the column. Speeds up calculations on historical data. + SQL files available at: priv/data_migrations/AcquisitionChannel/sql """ use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo @@ -11,19 +20,71 @@ defmodule Plausible.DataMigration.AcquisitionChannel do # In distributed environments, wait for insert to all temporary tables. insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2") - run_sql_multi( - "acquisition_channel_functions", - [ - on_cluster_statement: on_cluster_statement, - dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(), - insert_quorum: insert_quorum - ], - params: %{ - "source_categories" => - Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(), - "paid_sources" => Plausible.Ingestion.Source.paid_sources() - }, - quiet: Keyword.get(opts, :quiet, false) - ) + :ok = + run_sql_multi( + "acquisition_channel_functions", + [ + on_cluster_statement: on_cluster_statement, + dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(), + insert_quorum: insert_quorum + ], + params: %{ + "source_categories" => + Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(), + "paid_sources" => Plausible.Ingestion.Source.paid_sources() + }, + quiet: Keyword.get(opts, :quiet, false) + ) + + cond do + Keyword.get(opts, :add_column) -> + alter_data_tables( + "acquisition_channel_add_materialized_column", + on_cluster_statement, + opts + ) + + Keyword.get(opts, :update_column, true) -> + alter_data_tables( + "acquisition_channel_update_materialized_column", + on_cluster_statement, + opts + ) + + true -> + nil + end + + if Keyword.get(opts, :backfill) do + alter_data_tables( + "acquisition_channel_backfill_materialized_column", + on_cluster_statement, + opts + ) + end + + :ok + end + + defp alter_data_tables(sql_name, on_cluster_statement, opts) do + {:ok, _} = + run_sql( + sql_name, + [ + table: "events_v2", + on_cluster_statement: on_cluster_statement + ], + quiet: Keyword.get(opts, :quiet, false) + ) + + {:ok, _} = + run_sql( + sql_name, + [ + table: "sessions_v2", + on_cluster_statement: on_cluster_statement + ], + quiet: Keyword.get(opts, :quiet, false) + ) end end diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex new file mode 100644 index 000000000..47688db5f --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_add_materialized_column.sql.eex @@ -0,0 +1,4 @@ +ALTER TABLE <%= @table %> +<%= @on_cluster_statement %> +ADD COLUMN IF NOT EXISTS acquisition_channel LowCardinality(String) +MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex new file mode 100644 index 000000000..2b087cdc9 --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_backfill_materialized_column.sql.eex @@ -0,0 +1,2 @@ +ALTER TABLE <%= @table %> +MATERIALIZE COLUMN acquisition_channel diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex index f324edf4d..72e97def4 100644 --- a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex @@ -11,7 +11,7 @@ ENGINE = MergeTree() <% end %> ORDER BY referrer_source; -TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2; +TRUNCATE TABLE acquisition_channel_source_category <%= @on_cluster_statement %>; INSERT INTO acquisition_channel_source_category(referrer_source, category) SELECT t.1 AS referrer_source, t.2 AS category @@ -43,7 +43,7 @@ ENGINE = MergeTree() <% end %> ORDER BY referrer_source; -TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2; +TRUNCATE TABLE acquisition_channel_paid_sources <%= @on_cluster_statement %>; INSERT INTO acquisition_channel_paid_sources(referrer_source) SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex new file mode 100644 index 000000000..1fd8027f6 --- /dev/null +++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_update_materialized_column.sql.eex @@ -0,0 +1,4 @@ +ALTER TABLE <%= @table %> +<%= @on_cluster_statement %> +MODIFY COLUMN acquisition_channel LowCardinality(String) +MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) diff --git a/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs b/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs new file mode 100644 index 000000000..4e3bdc017 --- /dev/null +++ b/priv/ingest_repo/migrations/20241111084056_create_acquisition_channel_column.exs @@ -0,0 +1,11 @@ +defmodule Plausible.IngestRepo.Migrations.CreateAcquisitionChannelColumn do + use Ecto.Migration + + def up do + Plausible.DataMigration.AcquisitionChannel.run(add_column: true, backfill: false) + end + + def down do + raise "irreversible" + end +end