Run migrations in order across repos (#4466)

* order migrations

* migrate in 'streaks'

* cleanup

* docs

* tests

* add comment

* changelog

* continue

* run migrations tests in ci

* add pending_streams cmd

* fix ci

* add docs

* fix test

* simplify test

* only test v2+

* fix test

* return async: true

* cleanup

* add 'no pending migration'

* drop migrate/0 and pending_migrations/0

---------

Co-authored-by: Adrian Gruntkowski <adrian.gruntkowski@gmail.com>
This commit is contained in:
ruslandoga 2024-09-09 16:27:11 +07:00 committed by GitHub
parent 9ba9e2abc5
commit 8ee4827fea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 324 additions and 41 deletions

View File

@ -108,12 +108,12 @@ jobs:
- run: make minio - run: make minio
if: env.MIX_ENV == 'test' if: env.MIX_ENV == 'test'
- run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors - run: mix test --include slow --include minio --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'test' if: env.MIX_ENV == 'test'
env: env:
MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1" MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1"
- run: mix test --include slow --max-failures 1 --warnings-as-errors - run: mix test --include slow --include migrations --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'ce_test' if: env.MIX_ENV == 'ce_test'
static: static:

View File

@ -47,6 +47,7 @@ All notable changes to this project will be documented in this file.
- Don't include imports when showing time series hourly interval. Previously imported data was shown each midnight - Don't include imports when showing time series hourly interval. Previously imported data was shown each midnight
- Fix property filter suggestions 500 error when property hasn't been selected - Fix property filter suggestions 500 error when property hasn't been selected
- Bamboo.Mua: add Date and Message-ID headers if missing plausible/analytics#4474 - Bamboo.Mua: add Date and Message-ID headers if missing plausible/analytics#4474
- Fix migration order across `plausible_db` and `plausible_events_db` databases plausible/analytics#4466
## v2.1.1 - 2024-06-06 ## v2.1.1 - 2024-06-06

View File

@ -19,18 +19,119 @@ defmodule Plausible.Release do
end end
end end
def migrate do @doc """
`interweave_migrate/0` is a migration function that:
- Lists all pending migrations across multiple repositories.
- Sorts these migrations into a single list.
- Groups consecutive migrations by repository into "streaks".
- Executes the migrations in the correct order by processing each streak sequentially.
### Why Use This Approach?
This function resolves dependencies between migrations that span across different repositories.
The default `migrate/0` function migrates each repository independently, which may result in
migrations running in the wrong order when there are cross-repository dependencies.
Consider the following example (adapted from reality, not 100% accurate):
- **Migration 1**: The PostgreSQL (PG) repository creates a table named `site_imports`.
- **Migration 2**: The ClickHouse (CH) repository creates `import_id` columns in `imported_*` tables.
- **Migration 3**: The PG repository runs a data migration that utilizes both PG and CH databases,
reading from the `import_id` column in `imported_*` tables.
The default `migrate/0` would execute these migrations by repository, resulting in the following order:
1. Migration 1 (PG)
2. Migration 3 (PG)
3. Migration 2 (CH)
This sequence would fail at Migration 3, as the `import_id` columns in the CH repository have not been created yet.
`interweave_migrate/0` addresses this issue by consolidating all pending migrations into a single, ordered queue:
1. Migration 1 (PG)
2. Migration 2 (CH)
3. Migration 3 (PG)
This ensures all dependencies are resolved in the correct order.
"""
def interweave_migrate(repos \\ repos()) do
prepare() prepare()
Enum.each(repos(), &run_migrations_for/1)
IO.puts("Migrations successful!") pending = all_pending_migrations(repos)
streaks = migration_streaks(pending)
Enum.each(streaks, fn {repo, up_to_version} ->
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, to: up_to_version))
end)
end end
def pending_migrations do defp migration_streaks(pending_migrations) do
prepare() sorted_migrations =
IO.puts("Pending migrations") pending_migrations
IO.puts("") |> Enum.map(fn {repo, version, _name} -> {repo, version} end)
Enum.each(repos(), &list_pending_migrations_for/1) |> Enum.sort_by(fn {_repo, version} -> version end, :asc)
streaks_reversed =
Enum.reduce(sorted_migrations, [], fn {repo, _version} = latest_migration, streaks_acc ->
case streaks_acc do
# start the streak for repo
[] -> [latest_migration]
# extend the streak
[{^repo, _prev_version} | rest] -> [latest_migration | rest]
# end the streak for prev_repo, start the streak for repo
[{_prev_repo, _prev_version} | _rest] -> [latest_migration | streaks_acc]
end end
end)
:lists.reverse(streaks_reversed)
end
@spec all_pending_migrations([Ecto.Repo.t()]) :: [{Ecto.Repo.t(), integer, String.t()}]
defp all_pending_migrations(repos) do
Enum.flat_map(repos, fn repo ->
# credo:disable-for-lines:6 Credo.Check.Refactor.Nesting
{:ok, pending, _started} =
Ecto.Migrator.with_repo(repo, fn repo ->
Ecto.Migrator.migrations(repo)
|> Enum.filter(fn {status, _version, _name} -> status == :down end)
|> Enum.map(fn {_status, version, name} -> {repo, version, name} end)
end)
pending
end)
end
def pending_streaks(repos \\ repos()) do
prepare()
IO.puts("Collecting pending migrations..")
pending = all_pending_migrations(repos)
if pending == [] do
IO.puts("No pending migrations!")
else
streaks = migration_streaks(pending)
print_migration_streaks(streaks, pending)
end
end
defp print_migration_streaks([{repo, up_to_version} | streaks], pending) do
{streak, pending} =
Enum.split_with(pending, fn {pending_repo, version, _name} ->
pending_repo == repo and version <= up_to_version
end)
IO.puts(
"\n#{inspect(repo)} [#{Path.relative_to_cwd(Ecto.Migrator.migrations_path(repo))}] streak up to version #{up_to_version}:"
)
Enum.each(streak, fn {_repo, version, name} -> IO.puts(" * #{version}_#{name}") end)
print_migration_streaks(streaks, pending)
end
defp print_migration_streaks([], []), do: :ok
def seed do def seed do
prepare() prepare()
@ -123,33 +224,6 @@ defmodule Plausible.Release do
end end
end end
defp run_migrations_for(repo) do
IO.puts("Running migrations for #{repo}")
{:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, :up, all: true))
end
defp list_pending_migrations_for(repo) do
IO.puts("Listing pending migrations for #{repo}")
IO.puts("")
migration_directory = Ecto.Migrator.migrations_path(repo)
pending =
repo
|> Ecto.Migrator.migrations([migration_directory])
|> Enum.filter(fn {status, _version, _migration} -> status == :down end)
if pending == [] do
IO.puts("No pending migrations")
else
Enum.each(pending, fn {_, version, migration} ->
IO.puts("* #{version}_#{migration}")
end)
end
IO.puts("")
end
defp ensure_repo_created(repo) do defp ensure_repo_created(repo) do
IO.puts("create #{inspect(repo)} database if it doesn't exist") IO.puts("create #{inspect(repo)} database if it doesn't exist")

View File

@ -3,4 +3,4 @@
BIN_DIR=$(dirname "$0") BIN_DIR=$(dirname "$0")
"${BIN_DIR}"/bin/plausible eval Plausible.Release.migrate "${BIN_DIR}"/bin/plausible eval Plausible.Release.interweave_migrate

View File

@ -3,4 +3,4 @@
BIN_DIR=$(dirname "$0") BIN_DIR=$(dirname "$0")
"${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_migrations "${BIN_DIR}"/bin/plausible eval Plausible.Release.pending_streaks

View File

@ -40,4 +40,210 @@ defmodule Plausible.ReleaseTest do
assert stdout =~ "Starting repos.." assert stdout =~ "Starting repos.."
assert stdout =~ "Inserted 54 plans" assert stdout =~ "Inserted 54 plans"
end end
test "ecto_repos sanity check" do
# if the repos here are modified, please make sure `interweave_migrate/0` tests below are properly updated as well
assert Application.get_env(:plausible, :ecto_repos) == [Plausible.Repo, Plausible.IngestRepo]
end
describe "pending_streaks/1" do
@describetag :migrations
# this repo is used in place of Plausible.Repo
defmodule PostgreSQL do
use Ecto.Repo, otp_app: :plausible, adapter: Ecto.Adapters.Postgres
end
# this repo is used in place of Plausible.IngestRepo
defmodule ClickHouse do
use Ecto.Repo, otp_app: :plausible, adapter: Ecto.Adapters.ClickHouse
end
setup do
pg_config =
Plausible.Repo.config()
|> Keyword.replace!(:database, "plausible_test_migrations")
# to see priv/repo/migrations from this fake pg repo
|> Keyword.put_new(:priv, "priv/repo")
ch_config =
Plausible.IngestRepo.config()
|> Keyword.replace!(:database, "plausible_test_migrations")
# to see priv/ingest_repo/migrations from this fake ch repo
|> Keyword.put_new(:priv, "priv/ingest_repo")
Application.put_env(:plausible, PostgreSQL, pg_config)
on_exit(fn -> Application.delete_env(:plausible, PostgreSQL) end)
Application.put_env(:plausible, ClickHouse, ch_config)
on_exit(fn -> Application.delete_env(:plausible, ClickHouse) end)
pg_config = PostgreSQL.config()
:ok = Ecto.Adapters.Postgres.storage_up(pg_config)
on_exit(fn -> :ok = Ecto.Adapters.Postgres.storage_down(pg_config) end)
ch_config = ClickHouse.config()
:ok = Ecto.Adapters.ClickHouse.storage_up(ch_config)
on_exit(fn -> :ok = Ecto.Adapters.ClickHouse.storage_down(ch_config) end)
:ok
end
defp last_migration(repo) do
{:ok, {_status, version, name}, _started} =
Ecto.Migrator.with_repo(repo, fn repo ->
repo
|> Ecto.Migrator.migrations()
|> List.last()
end)
"#{version}_#{name}"
end
defp fake_migrate(repo, up_to_migration) do
{up_to_version, _name} = Integer.parse(up_to_migration)
insert_opts =
if repo == ClickHouse do
[types: [version: "Int64", inserted_at: "DateTime"]]
else
[]
end
Ecto.Migrator.with_repo(repo, fn repo ->
schema_versions =
Ecto.Migrator.migrations(repo)
|> Enum.filter(fn {status, version, _name} ->
status == :down and version <= up_to_version
end)
|> Enum.map(fn {_status, version, _name} ->
[version: version, inserted_at: NaiveDateTime.utc_now(:second)]
end)
repo.insert_all("schema_migrations", schema_versions, insert_opts)
end)
end
test "v2.0.0 -> master" do
# pretend to migrate the repos up to v2.0.0
# https://github.com/plausible/analytics/tree/v2.0.0/priv/repo/migrations
fake_migrate(PostgreSQL, _up_to = "20230516131041_add_unique_index_to_api_keys")
# https://github.com/plausible/analytics/tree/v2.0.0/priv/ingest_repo/migrations
fake_migrate(ClickHouse, _up_to = "20230509124919_clean_up_old_tables_after_v2_migration")
pending_streaks = capture_io(fn -> Release.pending_streaks([PostgreSQL, ClickHouse]) end)
pending_streaks =
if Plausible.ce?() do
# just to make the tests pass in CI
String.replace(pending_streaks, "_build/ce_test/lib", "_build/test/lib")
else
pending_streaks
end
assert """
Loading plausible..
Starting dependencies..
Starting repos..
Collecting pending migrations..
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20231011101825:
* 20230530161856_add_enable_feature_fields_for_site
* 20230724131709_change_allowed_event_props_type
* 20230802081520_cascade_delete_user
* 20230914071244_fix_broken_goals
* 20230914071245_goals_unique
* 20230925072840_plugins_api_tokens
* 20231003081927_add_user_previous_email
* 20231010074900_add_unique_index_on_site_memberships_site_id_when_owner
* 20231011101825_add_email_activation_codes
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20231017073642:
* 20231017073642_disable_deduplication_window_for_imports
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240123095646:
* 20231018081657_add_last_used_at_to_plugins_api_tokens
* 20231109090334_add_site_user_preferences
* 20231115131025_add_limits_to_enterprise_plans
* 20231115140646_add_totp_user_fields_and_recovery_codes
* 20231121131602_create_plans_table
* 20231127132321_remove_custom_domains
* 20231129103158_add_allow_next_upgrade_override_to_users
* 20231129161022_add_totp_token_to_users
* 20231204151831_backfill_last_bill_date_to_subscriptions
* 20231208125624_add_data_retention_in_years_to_plans
* 20231211092344_add_accept_traffic_until_to_sites
* 20231219083050_track_accept_traffic_until_notifcations
* 20231220072829_add_accept_traffic_until_to_user
* 20231220101920_backfill_accept_traffic_until
* 20240103090304_upgrade_oban_jobs_to_v12
* 20240123085318_add_ip_block_list_table
* 20240123095646_remove_google_analytics_imports_jobs
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240123142959:
* 20240123142959_add_import_id_to_imported_tables
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240129113531:
* 20240123144308_add_site_imports
* 20240129102900_migrate_accepted_traffic_until
* 20240129113531_backfill_accept_traffic_until_for_users_missing_notifications
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240209085338:
* 20240209085338_minmax_index_session_timestamp
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240214114158:
* 20240214114158_add_legacy_flag_to_site_imports
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240220123656:
* 20240220123656_create_sessions_events_compression_options
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240221122626:
* 20240220144655_cascade_delete_ip_rules
* 20240221122626_shield_country_rules
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240305085310:
* 20240222082911_sessions_v2_versioned_collapsing_merge_tree
* 20240305085310_events_sessions_columns_improved
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240319094940:
* 20240307083402_shield_page_rules
* 20240319094940_add_label_to_site_imports
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240327085855:
* 20240326134840_add_metrics_to_imported_tables
* 20240327085855_hostnames_in_sessions
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240407104659:
* 20240407104659_shield_hostname_rules
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240502115822:
* 20240419133926_add_active_visitors_to_imported_pages
* 20240423094014_add_imported_custom_events
* 20240502115822_alias_api_prop_names
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version 20240708120453:
* 20240528115149_migrate_site_imports
* 20240702055817_traffic_drop_notifications
* 20240708120453_create_help_scout_credentials
Plausible.ReleaseTest.ClickHouse [_build/test/lib/plausible/priv/ingest_repo/migrations] streak up to version 20240709181437:
* 20240709181437_populate_location_data
Plausible.ReleaseTest.PostgreSQL [_build/test/lib/plausible/priv/repo/migrations] streak up to version \
""" <> _future = pending_streaks
fake_migrate(PostgreSQL, last_migration(PostgreSQL))
fake_migrate(ClickHouse, last_migration(ClickHouse))
no_streaks = capture_io(fn -> Release.pending_streaks([PostgreSQL, ClickHouse]) end)
assert no_streaks == """
Loading plausible..
Starting dependencies..
Starting repos..
Collecting pending migrations..
No pending migrations!
"""
end
end
end end

View File

@ -20,10 +20,12 @@ if :minio in Keyword.fetch!(ExUnit.configuration(), :include) do
Plausible.TestUtils.ensure_minio() Plausible.TestUtils.ensure_minio()
end end
default_exclude = [:slow, :minio, :migrations]
if Mix.env() == :ce_test do if Mix.env() == :ce_test do
IO.puts("Test mode: Community Edition") IO.puts("Test mode: Community Edition")
ExUnit.configure(exclude: [:slow, :minio, :ee_only]) ExUnit.configure(exclude: [:ee_only | default_exclude])
else else
IO.puts("Test mode: Enterprise Edition") IO.puts("Test mode: Enterprise Edition")
ExUnit.configure(exclude: [:slow, :minio, :ce_build_only]) ExUnit.configure(exclude: [:ce_build_only | default_exclude])
end end