Mirror of https://github.com/plausible/analytics.git (synced 2024-09-11 18:07:33 +03:00)
Implement script for backfilling legacy site import entries and adjusting end dates of site imports (#3954)

* Always select and clear import ID 0 when referring to legacy imports
* Implement script for adding site import entries and adjusting end dates
* Log cases where end date computation is using fallback
* Don't log queries when running the migration to reduce noise
This commit is contained in:
parent 5bf59d1d8a
commit 71fe541359
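For context, the migration's entry point is Plausible.DataMigration.SiteImports.run/1 (added below), which defaults to a dry run. A minimal sketch of how it could be invoked from an IEx console on a running node; the deployment-specific way of triggering the script is not part of this diff, so treat this as an assumption:

    # Dry run (the default): only reports what would be inserted/updated.
    Plausible.DataMigration.SiteImports.run()

    # Real run: backfills missing legacy SiteImport entries and adjusts end dates.
    Plausible.DataMigration.SiteImports.run(dry_run?: false)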
lib/plausible/data_migration/site_imports.ex (new file, 169 lines)
@@ -0,0 +1,169 @@
defmodule Plausible.DataMigration.SiteImports do
  @moduledoc """
  Site imports migration backfilling SiteImport entries for old imports
  and adjusting import end dates to match the actual end date of the respective import stats.
  """

  import Ecto.Query

  alias Plausible.ClickhouseRepo
  alias Plausible.Imported
  alias Plausible.Repo
  alias Plausible.Site

  def run(opts \\ []) do
    dry_run? = Keyword.get(opts, :dry_run?, true)

    site_import_query =
      from(i in Imported.SiteImport, where: i.site_id == parent_as(:site).id, select: 1)

    sites_with_imports =
      from(s in Site, as: :site, where: not is_nil(s.imported_data) or exists(site_import_query))
      |> Repo.all(log: false)

    sites_count = length(sites_with_imports)

    IO.puts("Processing #{sites_count} sites with imports (DRY RUN: #{dry_run?})...")

    for {site, idx} <- Enum.with_index(sites_with_imports) do
      site_imports =
        from(i in Imported.SiteImport, where: i.site_id == ^site.id)
        |> Repo.all(log: false)

      IO.puts(
        "Processing site ID #{site.id} (#{idx + 1} / #{sites_count}) (imported_data: #{is_struct(site.imported_data)}, site_imports: #{length(site_imports)})"
      )

      site_imports =
        if site.imported_data && not Enum.any?(site_imports, & &1.legacy) do
          IO.puts("Creating legacy site import entry for site ID #{site.id}")

          # create legacy entry if there's not one yet
          params =
            site.imported_data
            |> Imported.SiteImport.from_legacy()
            |> Map.put(:site_id, site.id)
            |> Map.take([:legacy, :start_date, :end_date, :source, :status, :site_id])

          legacy_site_import =
            %Imported.SiteImport{}
            |> Ecto.Changeset.change(params)
            |> insert!(dry_run?)

          [legacy_site_import | site_imports]
        else
          IO.puts("Legacy site import entry for site ID #{site.id} already exists")

          site_imports
        end

      # adjust end date for each site import
      for site_import <- site_imports do
        IO.puts(
          "Adjusting end date for site import #{site_import.id} (site ID #{site.id}, start date: #{site_import.start_date}, end date: #{site_import.end_date})"
        )

        import_ids =
          if site_import.legacy do
            [0, site_import.id]
          else
            [site_import.id]
          end

        end_date = imported_stats_end_date(site.id, import_ids)

        end_date =
          if !end_date do
            IO.puts(
              "Site import #{site_import.id} (site ID #{site.id}) does not have any recorded stats. Setting end date to minimum."
            )

            Date.add(site_import.start_date, 2)
          else
            end_date
          end

        end_date =
          if Date.compare(end_date, site_import.end_date) in [:lt, :eq] do
            end_date
          else
            IO.puts(
              "Site import #{site_import.id} (site ID #{site.id}) computed end date is later than the current one. Skipping."
            )

            site_import.end_date
          end

        site_import
        |> Ecto.Changeset.change(end_date: end_date)
        |> update!(dry_run?)

        IO.puts(
          "End date of site import #{site_import.id} (site ID #{site.id}) adjusted to #{end_date}"
        )
      end

      IO.puts("Done processing site ID #{site.id}")
    end

    IO.puts("Finished")
  end

  # Exposed for testing purposes
  @doc false
  def imported_stats_end_date(site_id, import_ids) do
    [first_schema | schemas] = Imported.schemas()

    query =
      Enum.reduce(schemas, max_date_query(first_schema, site_id, import_ids), fn schema, query ->
        from(s in subquery(union_all(query, ^max_date_query(schema, site_id, import_ids))))
      end)

    dates = ClickhouseRepo.all(from(q in query, select: q.max_date), log: false)

    if dates != [] do
      case Enum.max(dates, Date) do
        # no stats for this domain yet
        ~D[1970-01-01] ->
          nil

        date ->
          date
      end
    else
      nil
    end
  end

  defp insert!(changeset, false = _dry_run?) do
    Repo.insert!(changeset)
  end

  defp insert!(changeset, true = _dry_run?) do
    if changeset.valid? do
      Ecto.Changeset.apply_changes(changeset)
    else
      raise "Invalid insert: #{inspect(changeset)}"
    end
  end

  defp update!(changeset, false = _dry_run?) do
    Repo.update!(changeset)
  end

  defp update!(changeset, true = _dry_run?) do
    if changeset.valid? do
      Ecto.Changeset.apply_changes(changeset)
    else
      raise "Invalid update: #{inspect(changeset)}"
    end
  end

  defp max_date_query(schema, site_id, import_ids) do
    from(q in schema,
      where: q.site_id == ^site_id,
      where: q.import_id in ^import_ids,
      select: %{max_date: fragment("max(?)", q.date)}
    )
  end
end
@@ -37,6 +37,9 @@ defmodule Plausible.Imported do
   # Maximum number of complete imports to account for when querying stats
   @max_complete_imports 5
 
+  @spec schemas() :: [module()]
+  def schemas, do: @tables
+
   @spec tables() :: [String.t()]
   def tables, do: @table_names
 
@@ -91,13 +94,16 @@
     ids =
       from(i in SiteImport,
         where: i.site_id == ^site.id and i.status == ^SiteImport.completed(),
-        select: i.id,
+        select: {i.legacy, i.id},
         limit: @max_complete_imports
       )
       |> Repo.all()
 
+    has_legacy? = Enum.any?(ids, fn {legacy?, _} -> legacy? end)
+    ids = Enum.map(ids, &elem(&1, 1))
+
     # account for legacy imports as well
-    if site.imported_data && site.imported_data.status == "ok" do
+    if has_legacy? || (site.imported_data && site.imported_data.status == "ok") do
       [0 | ids]
     else
       ids
@@ -666,6 +666,8 @@ defmodule PlausibleWeb.SiteController do
         Plausible.Repo.delete!(site_import)
 
         if site_import.legacy do
+          Plausible.Purge.delete_imported_stats!(site, 0)
+
           site
           |> Plausible.Site.remove_imported_data()
           |> Repo.update!()
test/plausible/data_migration/site_imports_test.exs (new file, 170 lines)
@@ -0,0 +1,170 @@
defmodule Plausible.DataMigration.SiteImportsTest do
  use Plausible.DataCase, async: true

  import ExUnit.CaptureIO

  alias Plausible.DataMigration.SiteImports
  alias Plausible.Imported
  alias Plausible.Repo
  alias Plausible.Site

  describe "run/1" do
    test "runs for empty dataset" do
      dry_run_output =
        capture_io(fn ->
          assert :ok = SiteImports.run()
        end)

      assert dry_run_output =~ "Processing 0 sites"
      assert dry_run_output =~ "DRY RUN: true"

      real_run_output =
        capture_io(fn ->
          assert :ok = SiteImports.run(dry_run?: false)
        end)

      assert real_run_output =~ "Processing 0 sites"
      assert real_run_output =~ "DRY RUN: false"
    end

    test "adds site import entry when it's missing and adjusts end date" do
      site =
        insert(:site)
        |> Site.start_import(~D[2021-01-02], ~D[2021-01-08], "Google Analytics", "ok")
        |> Repo.update!()

      populate_stats(site, 0, [
        build(:imported_visitors, date: ~D[2021-01-07])
      ])

      assert capture_io(fn ->
               assert :ok = SiteImports.run(dry_run?: false)
             end) =~ "Processing 1 sites"

      site = Repo.reload!(site)

      assert [%{id: id, legacy: true} = site_import] = Imported.list_all_imports(site)
      assert id > 0
      assert site_import.start_date == site.imported_data.start_date
      assert site_import.end_date == ~D[2021-01-07]
      assert site_import.source == :universal_analytics
    end

    test "does not set end date to later than the current one" do
      site =
        insert(:site)
        |> Site.start_import(~D[2021-01-02], ~D[2021-01-08], "Google Analytics", "ok")
        |> Repo.update!()

      populate_stats(site, 0, [
        build(:imported_visitors, date: ~D[2021-01-10])
      ])

      assert capture_io(fn ->
               assert :ok = SiteImports.run(dry_run?: false)
             end) =~ "Processing 1 sites"

      site = Repo.reload!(site)

      assert [%{id: id, legacy: true} = site_import] = Imported.list_all_imports(site)
      assert id > 0
      assert site_import.start_date == site.imported_data.start_date
      assert site_import.end_date == ~D[2021-01-08]
      assert site_import.source == :universal_analytics
    end

    test "leaves site and imports unchanged if everything fits" do
      site =
        insert(:site)
        |> Site.start_import(~D[2021-01-02], ~D[2021-01-08], "Google Analytics", "ok")
        |> Repo.update!()

      existing_import =
        insert(:site_import,
          site: site,
          start_date: ~D[2021-01-02],
          end_date: ~D[2021-01-08],
          status: :completed,
          legacy: true
        )

      populate_stats(site, existing_import.id, [
        build(:imported_visitors, date: ~D[2021-01-08])
      ])

      assert capture_io(fn ->
               assert :ok = SiteImports.run(dry_run?: false)
             end) =~ "Processing 1 sites"

      site = Repo.reload!(site)

      assert [%{id: id, legacy: true} = site_import] = Imported.list_all_imports(site)
      assert id == existing_import.id
      assert site_import.start_date == site.imported_data.start_date
      assert site_import.end_date == ~D[2021-01-08]
      assert site_import.source == :universal_analytics
    end
  end

  describe "imported_stats_end_date/2" do
    test "returns nil when there are no stats" do
      site_import = insert(:site_import)

      assert SiteImports.imported_stats_end_date(site_import.site_id, [site_import.id]) == nil
    end

    test "returns date when there are stats recorded in one imported table" do
      site_import = insert(:site_import)

      populate_stats(site_import.site, site_import.id, [
        build(:imported_visitors, date: ~D[2021-01-01])
      ])

      assert SiteImports.imported_stats_end_date(site_import.site_id, [site_import.id]) ==
               ~D[2021-01-01]
    end

    test "returns max date across all imported tables" do
      site_import = insert(:site_import)

      populate_stats(site_import.site, site_import.id, [
        build(:imported_visitors, date: ~D[2021-01-01]),
        build(:imported_visitors, date: ~D[2021-01-07]),
        build(:imported_sources, date: ~D[2021-01-01]),
        build(:imported_sources, date: ~D[2021-01-08]),
        build(:imported_entry_pages, date: ~D[2021-01-02]),
        build(:imported_entry_pages, date: ~D[2021-02-11]),
        build(:imported_exit_pages, date: ~D[2021-01-01]),
        build(:imported_exit_pages, date: ~D[2021-01-08]),
        build(:imported_locations, date: ~D[2021-01-01]),
        build(:imported_locations, date: ~D[2021-01-08]),
        build(:imported_devices, date: ~D[2021-01-01]),
        build(:imported_devices, date: ~D[2021-01-08]),
        build(:imported_browsers, date: ~D[2021-01-01]),
        build(:imported_browsers, date: ~D[2021-01-08]),
        build(:imported_operating_systems, date: ~D[2021-01-01]),
        build(:imported_operating_systems, date: ~D[2021-01-08])
      ])

      assert SiteImports.imported_stats_end_date(site_import.site_id, [site_import.id]) ==
               ~D[2021-02-11]
    end

    test "considers all imported tables" do
      date = ~D[2021-01-11]

      for {table, idx} <- Enum.with_index(Imported.tables()) do
        site_import = insert(:site_import)
        end_date = Date.add(date, idx)

        populate_stats(site_import.site, site_import.id, [
          build(String.to_atom(table), date: date),
          build(String.to_atom(table), date: end_date)
        ])

        assert SiteImports.imported_stats_end_date(site_import.site_id, [site_import.id]) ==
                 end_date
      end
    end
  end
end