Mutation to populate event session columns (#3844)

* WIP mutation to populate event session columns

* Remove duplication

* report errors, allow_nondeterministic_updates

* use right columns

* Update existing columns instead of session_* ones

* Make dialyzer happy

* Fix issue with passing pre-existing params in

* Logger -> IO.puts

* Use IngestRepo.config for connection settings

* Make dictionary options configurable

* Move allow_nondeterministic_mutations to within the migration

* Solve credo warning about too deep nesting

* Missed logger call

* Pattern matching in function head
This commit is contained in:
Karl-Aksel Puulmann 2024-03-08 09:27:24 +02:00 committed by GitHub
parent 0cdba7d407
commit 26d41ddbb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 274 additions and 4 deletions

View File

@ -62,13 +62,13 @@ defmodule Plausible.DataMigration do
|> EEx.eval_file(assigns: assigns)
end
def run_sql(name, assigns \\ []) do
def run_sql(name, assigns \\ [], options \\ []) do
query = unwrap(name, assigns)
do_run(name, query)
do_run(name, query, options)
end
defp do_run(name, query) do
case @repo.query(query, [], timeout: :infinity) do
defp do_run(name, query, options \\ []) do
case @repo.query(query, [], [timeout: :infinity] ++ options) do
{:ok, res} ->
IO.puts(" #{IO.ANSI.yellow()}#{name} #{IO.ANSI.green()}Done!#{IO.ANSI.reset()}\n")
IO.puts(String.duplicate("-", 78))

View File

@ -0,0 +1,166 @@
defmodule Plausible.DataMigration.PopulateEventSessionColumns do
  @moduledoc """
  Populates event session columns with data from sessions table.

  Run via: ./bin/plausible rpc "Plausible.DataMigration.PopulateEventSessionColumns.run"
  Kill via: ./bin/plausible rpc "Plausible.DataMigration.PopulateEventSessionColumns.kill"
  Monitor via: ./bin/plausible rpc "Plausible.DataMigration.PopulateEventSessionColumns.report_progress"

  Suggested to run in a screen/tmux session to be able to easily monitor

  SQL files available at: priv/data_migrations/PopulateEventSessionColumns/sql
  """
  use Plausible.DataMigration, dir: "PopulateEventSessionColumns", repo: Plausible.IngestRepo

  require Logger

  # See https://clickhouse.com/docs/en/sql-reference/dictionaries#cache for meaning of these defaults
  @default_dictionary_config %{
    lifetime: 600_000,
    size_in_cells: 1_000_000,
    max_threads_for_updates: 6
  }

  @doc """
  Runs the migration: creates a cached `sessions_dict` dictionary over
  `sessions_v2`, issues one `ALTER TABLE ... UPDATE` mutation per partition
  of `events_v2` (via the `update-table` SQL), waits for all mutations to
  finish, then drops the dictionary.

  Options:
    * `:partition_range` - `[min, max]` partition bounds, defaults to `["0", "999999"]`
    * `:dictionary_config` - overrides merged over `@default_dictionary_config`
    * `:dictionary_connection_string` - overrides the connection clause built
      from `Plausible.IngestRepo.config()`
  """
  def run(opts \\ []) do
    cluster? = Plausible.MigrationUtils.clustered_table?("sessions_v2")

    {:ok, _} =
      run_sql("create-sessions-dictionary",
        cluster?: cluster?,
        dictionary_connection_params:
          Keyword.get(opts, :dictionary_connection_string, dictionary_connection_params()),
        dictionary_config: dictionary_config(opts)
      )

    {partitions, _, _, _} = get_partitions(opts)

    IO.puts("Starting mutation on #{length(partitions)} partition(s)")

    for partition <- partitions do
      # dictGet is not deterministic, so the mutation must be allowed explicitly.
      {:ok, _} =
        run_sql("update-table", [cluster?: cluster?, partition: partition],
          settings: [allow_nondeterministic_mutations: 1]
        )
    end

    wait_until_mutations_complete(opts)

    IO.puts("Mutations seem done, cleaning up!")
    {:ok, _} = run_sql("drop-sessions-dictionary", cluster?: cluster?)
  end

  @doc """
  Reports progress once, then kills all running mutations that reference
  `sessions_dict`. Accepts the same options as `run/1`.
  """
  def kill(opts \\ []) do
    # NOTE(review): `run/1` derives cluster? from "sessions_v2" while this
    # checks "events_v2" — presumably both tables share clustering; confirm.
    cluster? = Plausible.MigrationUtils.clustered_table?("events_v2")

    report_progress(opts)

    IO.puts("Killing running mutations")
    {:ok, _} = run_sql("kill-running-mutations", cluster?: cluster?)
  end

  @doc """
  Polls `report_progress/1` every 5 seconds until no mutations remain running.
  """
  def wait_until_mutations_complete(opts \\ []) do
    Process.sleep(5_000)
    in_progress? = report_progress(opts)

    if in_progress? do
      wait_until_mutations_complete(opts)
    end
  end

  @doc """
  Prints a one-shot progress report: disk usage, mutation progress within the
  configured partition range, and mutation-related merges.

  Returns `true` if any mutations are still running, `false` otherwise.
  """
  def report_progress(opts \\ []) do
    {partitions, parts, min_partition, max_partition} = get_partitions(opts)

    {:ok, %{rows: mutation_results}} =
      run_sql("get-mutations-progress",
        min_partition: min_partition,
        max_partition: max_partition
      )

    # get-mutations-progress returns exactly one row; the last two columns
    # (total/todo byte aliases) are only needed inside the query itself.
    [
      [
        mutations,
        parts_to_do,
        running_for,
        total_size,
        todo_size,
        progress,
        latest_fail_reason,
        _,
        _
      ]
    ] =
      mutation_results

    {:ok, %{rows: [[merges]]}} = run_sql("get-merges-progress")
    {:ok, %{rows: disks}} = run_sql("get-disks")

    IO.puts("\n\n#{Timex.now() |> Timex.format!("{ISO:Extended}")}")

    # List partitions that need to run
    IO.puts(
      "Progress report for partitions #{Enum.min(partitions)}-#{Enum.max(partitions)} (parts: #{length(parts)})"
    )

    IO.puts("Disks overview:")

    for [name, path, full_space, total_space, full_percentage] <- disks do
      IO.puts(
        " #{name} at #{path} is at #{full_space}/#{total_space} (#{full_percentage}% full)"
      )
    end

    IO.puts("Currently #{mutations} mutation(s) are running.")

    if mutations > 0 do
      IO.puts(" To do #{parts_to_do} parts, #{todo_size}")
      IO.puts(" Out of #{length(parts)} parts, #{total_size}")
      IO.puts(" Running for #{format_duration(running_for)}")

      if progress > 0 do
        # `progress` is a percentage (0-100). Expected total runtime is
        # running_for / (progress / 100), so time left is that minus elapsed.
        # (Fixed: was `running_for / progress / 100 - running_for`, which is
        # off by a factor of 10,000 and always produces a negative value.)
        estimated_time_left = round(running_for / progress * 100 - running_for)
        IO.puts(" Estimated #{progress}% done, #{format_duration(estimated_time_left)} left")
      end

      if latest_fail_reason do
        IO.puts(" Some mutations might be failing. ClickHouse report: #{latest_fail_reason}")
      end
    end

    IO.puts("Currently #{merges} merge(s) are running relating to mutations.")

    mutations > 0
  end

  # Merges user-supplied :dictionary_config over the defaults.
  defp dictionary_config(opts) do
    @default_dictionary_config
    |> Map.merge(Keyword.get(opts, :dictionary_config, %{}))
  end

  # See https://clickhouse.com/docs/en/sql-reference/dictionaries#clickhouse for context
  # Builds the `DB/USER/PASSWORD` clause for the dictionary SOURCE from the
  # IngestRepo connection config, ignoring all other config keys.
  defp dictionary_connection_params() do
    Plausible.IngestRepo.config()
    |> Enum.map(fn
      {:database, database} -> "DB '#{database}'"
      {:username, username} -> "USER '#{username}'"
      {:password, password} -> "PASSWORD '#{password}'"
      _ -> nil
    end)
    |> Enum.reject(&is_nil/1)
    |> Enum.join(" ")
  end

  # Returns {partitions, part_names, min_partition, max_partition} for the
  # requested :partition_range (defaults to the full "0".."999999" range).
  defp get_partitions(opts) do
    [min_partition, max_partition] = Keyword.get(opts, :partition_range, ["0", "999999"])

    {:ok, %{rows: [[partitions, parts]]}} =
      run_sql("list-partitions", min_partition: min_partition, max_partition: max_partition)

    {partitions, parts, min_partition, max_partition}
  end

  # Formats a duration in seconds as a human-readable string via Timex.
  defp format_duration(seconds) do
    seconds
    |> Timex.Duration.from_seconds()
    |> Timex.format_duration(Timex.Format.Duration.Formatters.Humanized)
  end
end

View File

@ -0,0 +1,32 @@
-- Creates a cached ClickHouse dictionary exposing session attributes keyed by
-- (site_id, session_id), sourced from sessions_v2. The update-table mutation
-- reads these attributes via dictGet('sessions_dict', ...).
CREATE OR REPLACE DICTIONARY sessions_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
site_id UInt64,
session_id UInt64,
referrer String,
referrer_source String,
utm_medium String,
utm_source String,
utm_campaign String,
utm_content String,
utm_term String,
country_code String,
subdivision1_code String,
subdivision2_code String,
city_geoname_id UInt32,
screen_size String,
operating_system String,
operating_system_version String,
browser String,
browser_version String
)
PRIMARY KEY site_id, session_id
-- Connection clause (DB/USER/PASSWORD) is templated in from repo config.
SOURCE(CLICKHOUSE(TABLE sessions_v2 <%= @dictionary_connection_params %>))
LIFETIME(<%= @dictionary_config.lifetime %>)
-- Cache layout tunables come from @default_dictionary_config (overridable);
-- allow_read_expired_keys lets reads proceed while entries refresh.
LAYOUT(
complex_key_cache(
size_in_cells <%= @dictionary_config.size_in_cells %>
max_threads_for_updates <%= @dictionary_config.max_threads_for_updates %>
allow_read_expired_keys 1
)
)

View File

@ -0,0 +1,2 @@
-- Cleanup: removes the sessions_dict dictionary once mutations are done.
DROP DICTIONARY IF EXISTS sessions_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>

View File

@ -0,0 +1,7 @@
-- Per-disk usage overview, consumed by report_progress/1 as
-- [name, path, full_space, total_space, full_percentage] rows.
SELECT
name,
path,
formatReadableSize(total_space - free_space) AS full_space,
formatReadableSize(total_space),
-- Percentage of the disk currently in use, rounded to 2 decimals.
round((total_space - free_space) / total_space * 100, 2)
FROM system.disks

View File

@ -0,0 +1,4 @@
-- Number of merges currently executing on events_v2 as part of a mutation.
-- NOTE(review): no currentDatabase() filter here, unlike list-partitions —
-- confirm this is acceptable on shared instances.
SELECT count()
FROM system.merges
WHERE table = 'events_v2'
AND is_mutation

View File

@ -0,0 +1,29 @@
-- Progress report for the events_v2 session-column mutations.
-- Joins aggregate mutation state (system.mutations) with the active parts in
-- the given partition range (system.parts) to compute total vs remaining bytes.
-- Always returns exactly one row (aggregation without matching rows still
-- yields one group).
SELECT
total,
length(parts_to_do_names),
running_for_seconds,
formatReadableSize(total_bytes),
formatReadableSize(todo_bytes),
-- Percent complete: 100 minus the share of bytes still to be rewritten.
round(100 - todo_bytes / total_bytes * 100, 2),
latest_fail_reason,
-- These two aliases are referenced by the expressions above; ClickHouse
-- permits using SELECT-level aliases in sibling expressions.
sum(part.bytes_on_disk) as total_bytes,
sumIf(part.bytes_on_disk, has(parts_to_do_names, part.name)) AS todo_bytes
FROM (
-- All unfinished mutations on events_v2 belonging to this migration,
-- identified by the sessions_dict reference in their command text.
SELECT count() AS total,
groupArrayArray(parts_to_do_names) AS parts_to_do_names,
now() - min(create_time) AS running_for_seconds,
anyIf(latest_fail_reason, latest_fail_reason != '') AS latest_fail_reason
FROM system.mutations
WHERE not is_done
AND table = 'events_v2'
AND command ILIKE '%sessions_dict%'
) AS mut
CROSS JOIN (
-- Active parts within the partition range being migrated.
SELECT name, bytes_on_disk
FROM system.parts
WHERE table = 'events_v2'
AND active
AND partition >= '<%= @min_partition %>'
AND partition <= '<%= @max_partition %>'
) AS part
GROUP BY mut.total, mut.parts_to_do_names, mut.running_for_seconds, mut.latest_fail_reason

View File

@ -0,0 +1,2 @@
-- Stops every mutation created by this migration (matched by the
-- sessions_dict reference in the mutation command).
KILL MUTATION <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
WHERE command ILIKE '%sessions_dict%'

View File

@ -0,0 +1,7 @@
-- Returns one row: the sorted distinct partitions and sorted part names of
-- events_v2 within the requested partition range. Consumed by get_partitions/1.
SELECT arraySort(groupUniqArray(partition)), arraySort(groupArray(name))
FROM system.parts
WHERE active
AND database = currentDatabase()
AND table = 'events_v2'
AND partition >= '<%= @min_partition %>'
AND partition <= '<%= @max_partition %>'

View File

@ -0,0 +1,21 @@
-- Per-partition mutation copying session attributes from sessions_dict onto
-- every event row. The caller runs this with allow_nondeterministic_mutations=1
-- because dictGet is not a deterministic function.
ALTER TABLE events_v2
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
UPDATE
referrer = dictGet('sessions_dict', 'referrer', tuple(site_id, session_id)),
referrer_source = dictGet('sessions_dict', 'referrer_source', tuple(site_id, session_id)),
utm_medium = dictGet('sessions_dict', 'utm_medium', tuple(site_id, session_id)),
utm_source = dictGet('sessions_dict', 'utm_source', tuple(site_id, session_id)),
utm_campaign = dictGet('sessions_dict', 'utm_campaign', tuple(site_id, session_id)),
utm_content = dictGet('sessions_dict', 'utm_content', tuple(site_id, session_id)),
utm_term = dictGet('sessions_dict', 'utm_term', tuple(site_id, session_id)),
country_code = dictGet('sessions_dict', 'country_code', tuple(site_id, session_id)),
subdivision1_code = dictGet('sessions_dict', 'subdivision1_code', tuple(site_id, session_id)),
subdivision2_code = dictGet('sessions_dict', 'subdivision2_code', tuple(site_id, session_id)),
city_geoname_id = dictGet('sessions_dict', 'city_geoname_id', tuple(site_id, session_id)),
screen_size = dictGet('sessions_dict', 'screen_size', tuple(site_id, session_id)),
operating_system = dictGet('sessions_dict', 'operating_system', tuple(site_id, session_id)),
operating_system_version = dictGet('sessions_dict', 'operating_system_version', tuple(site_id, session_id)),
browser = dictGet('sessions_dict', 'browser', tuple(site_id, session_id)),
browser_version = dictGet('sessions_dict', 'browser_version', tuple(site_id, session_id))
-- Scope the mutation to a single partition; WHERE 1=1 updates all its rows.
IN PARTITION '<%= @partition %>'
WHERE 1=1