Channels: Fix ON CLUSTER behavior (#4801)

* Channels: Fix cluster behavior

CREATE TABLE AS SELECT syntax did not work on cluster.

Instead, let's do a normal insert. For safety and to avoid timing
issues, ensure that INSERT waits for data to be inserted on all active
replicas.

* Proper replicated tables
This commit is contained in:
Karl-Aksel Puulmann 2024-11-11 21:59:16 +02:00 committed by GitHub
parent b22b35793c
commit 3759db9b8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 45 additions and 15 deletions

View File

@ -8,12 +8,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
def run(opts \\ []) do def run(opts \\ []) do
on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2") on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2")
# In distributed environments, wait for insert to all temporary tables.
insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2")
run_sql_multi( run_sql_multi(
"acquisition_channel_functions", "acquisition_channel_functions",
[ [
on_cluster_statement: on_cluster_statement, on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params() dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
], ],
params: %{ params: %{
"source_categories" => "source_categories" =>

View File

@ -16,9 +16,13 @@ defmodule Plausible.IngestRepo do
end end
def clustered_table?(table) do def clustered_table?(table) do
case query("SELECT 1 FROM system.replicas WHERE table = '#{table}'") do replica_count(table) > 1
{:ok, %{rows: []}} -> false end
{:ok, _} -> true
end def replica_count(table) do
{:ok, %{rows: [[count]]}} =
query("SELECT sum(active_replicas) FROM system.replicas WHERE table = '#{table}'")
count
end end
end end

View File

@ -1,12 +1,24 @@
CREATE OR REPLACE TABLE acquisition_channel_source_category(referrer_source String, category LowCardinality(String)) CREATE TABLE IF NOT EXISTS acquisition_channel_source_category
<%= @on_cluster_statement %> <%= @on_cluster_statement %>
Engine = MergeTree() (
ORDER BY referrer_source referrer_source String,
AS category LowCardinality(String)
)
<%= if @on_cluster_statement != "" do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_source_category', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;
TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2;
INSERT INTO acquisition_channel_source_category(referrer_source, category)
SELECT t.1 AS referrer_source, t.2 AS category SELECT t.1 AS referrer_source, t.2 AS category
FROM ( FROM (
SELECT arrayJoin({source_categories:Array(Tuple(String, String))}) AS t SELECT arrayJoin({source_categories:Array(Tuple(String, String))}) AS t
); )
SETTINGS insert_quorum = <%= @insert_quorum %>;
CREATE OR REPLACE DICTIONARY acquisition_channel_source_category_dict CREATE OR REPLACE DICTIONARY acquisition_channel_source_category_dict
<%= @on_cluster_statement %> <%= @on_cluster_statement %>
@ -19,12 +31,23 @@ SOURCE(CLICKHOUSE(TABLE acquisition_channel_source_category <%= @dictionary_conn
LIFETIME(0) LIFETIME(0)
LAYOUT(hashed()); LAYOUT(hashed());
CREATE OR REPLACE TABLE acquisition_channel_paid_sources(referrer_source String) CREATE TABLE IF NOT EXISTS acquisition_channel_paid_sources
<%= @on_cluster_statement %> <%= @on_cluster_statement %>
Engine = MergeTree() (
ORDER BY referrer_source referrer_source String
AS )
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source; <%= if @on_cluster_statement != "" do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_paid_sources', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;
TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2;
INSERT INTO acquisition_channel_paid_sources(referrer_source)
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source
SETTINGS insert_quorum = <%= @insert_quorum %>;
CREATE OR REPLACE DICTIONARY acquisition_channel_paid_sources_dict CREATE OR REPLACE DICTIONARY acquisition_channel_paid_sources_dict
<%= @on_cluster_statement %> <%= @on_cluster_statement %>