Worker to clean site data from ClickHouse (#3959)

* Create a worker to clean deleted sites' data from ClickHouse

The plan is to run this weekly, but we're going to trigger it manually the first few times on cloud (see the scheduling sketch after this list).

* Make count assertions more reliable

* credo

* PR feedback

* Fixes
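
For reference, the weekly schedule mentioned above could be driven by Oban's cron plugin. The snippet below is only a sketch: the queue name matches the worker's `queue: :clickhouse_clean_sites`, but the config keys, queue concurrency, and the `@weekly` crontab entry are assumptions, not part of this commit.

```elixir
import Config

# Hypothetical Oban configuration; the actual schedule is not part of this diff.
config :plausible, Oban,
  repo: Plausible.Repo,
  queues: [clickhouse_clean_sites: 1],
  plugins: [
    # Enqueue the cleanup worker once a week.
    {Oban.Plugins.Cron, crontab: [{"@weekly", Plausible.Workers.ClickhouseCleanSites}]}
  ]
```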
Karl-Aksel Puulmann 2024-04-08 12:26:38 +03:00 committed by GitHub
parent acbbaa9116
commit a6d4786959
2 changed files with 137 additions and 0 deletions


@@ -0,0 +1,68 @@
defmodule Plausible.Workers.ClickhouseCleanSites do
  @moduledoc """
  Cleans deleted site data from ClickHouse asynchronously.

  We batch up data deletions from ClickHouse as deleting a single site is
  just as expensive as deleting many.
  """

  use Plausible.Repo
  use Plausible.ClickhouseRepo
  use Plausible.IngestRepo
  use Oban.Worker, queue: :clickhouse_clean_sites

  import Ecto.Query
  require Logger

  @tables_to_clear [
    "events_v2",
    "sessions_v2",
    "ingest_counters",
    "imported_browsers",
    "imported_devices",
    "imported_entry_pages",
    "imported_exit_pages",
    "imported_locations",
    "imported_operating_systems",
    "imported_pages",
    "imported_sources",
    "imported_visitors"
  ]

  # `mutations_sync: 2` makes ClickHouse wait for the DELETE mutations to
  # finish on all replicas, so tests can assert on the result immediately.
  @settings if Mix.env() in [:test, :small_test], do: [mutations_sync: 2], else: []

  def perform(_job) do
    deleted_sites = get_deleted_sites_with_clickhouse_data()

    if not Enum.empty?(deleted_sites) do
      Logger.info(
        "Clearing ClickHouse data for the following #{length(deleted_sites)} sites which have been deleted: #{inspect(deleted_sites)}"
      )

      # One DELETE mutation per table, batching all deleted sites into a single statement.
      for table <- @tables_to_clear do
        IngestRepo.query!(
          "ALTER TABLE {$0:Identifier} DELETE WHERE site_id IN {$1:Array(UInt64)}",
          [table, deleted_sites],
          settings: @settings
        )
      end
    end

    :ok
  end

  # Site ids that still have data in ClickHouse but no longer exist in Postgres.
  def get_deleted_sites_with_clickhouse_data() do
    pg_sites =
      from(s in Plausible.Site, select: s.id)
      |> Plausible.Repo.all()
      |> MapSet.new()

    ch_sites =
      from(e in "events_v2", group_by: e.site_id, select: e.site_id)
      |> Plausible.ClickhouseRepo.all(timeout: :infinity)
      |> MapSet.new()

    MapSet.difference(ch_sites, pg_sites) |> MapSet.to_list()
  end
end
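
Since the worker is going to be triggered by hand at first, a one-off run from a remote console could look like the sketch below. It assumes Oban is running under its default name in the application; neither line is part of this diff.

```elixir
# Hypothetical manual trigger from an attached IEx session (not part of this commit).
# Enqueue a job on the worker's queue and let Oban execute it:
Plausible.Workers.ClickhouseCleanSites.new(%{})
|> Oban.insert!()

# Or run the cleanup synchronously in the current process, bypassing the queue:
Plausible.Workers.ClickhouseCleanSites.perform(%Oban.Job{})
```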


@@ -0,0 +1,69 @@
defmodule Plausible.Workers.ClickhouseCleanSitesTest do
  use Plausible.DataCase
  use Plausible.TestUtils
  use Plausible

  import Plausible.Factory

  alias Plausible.Workers.ClickhouseCleanSites

  test "deletes data from events and sessions tables" do
    site = insert(:site)
    deleted_site = insert(:site)

    populate_stats(site, [
      build(:pageview)
    ])

    populate_stats(deleted_site, [
      build(:pageview),
      build(:pageview),
      build(:imported_visitors),
      build(:imported_sources),
      build(:imported_pages),
      build(:imported_entry_pages),
      build(:imported_exit_pages),
      build(:imported_locations),
      build(:imported_devices),
      build(:imported_browsers),
      build(:imported_operating_systems)
    ])

    Repo.delete!(deleted_site)

    assert Enum.member?(
             ClickhouseCleanSites.get_deleted_sites_with_clickhouse_data(),
             deleted_site.id
           )

    assert not Enum.member?(
             ClickhouseCleanSites.get_deleted_sites_with_clickhouse_data(),
             site.id
           )

    ClickhouseCleanSites.perform(nil)

    assert_count(deleted_site, "events_v2", 0)
    assert_count(deleted_site, "sessions_v2", 0)
    assert_count(deleted_site, "imported_visitors", 0)
    assert_count(deleted_site, "imported_sources", 0)
    assert_count(deleted_site, "imported_pages", 0)
    assert_count(deleted_site, "imported_entry_pages", 0)
    assert_count(deleted_site, "imported_exit_pages", 0)
    assert_count(deleted_site, "imported_locations", 0)
    assert_count(deleted_site, "imported_devices", 0)
    assert_count(deleted_site, "imported_browsers", 0)
    assert_count(deleted_site, "imported_operating_systems", 0)

    assert_count(site, "events_v2", 1)
    assert_count(site, "sessions_v2", 1)

    assert not Enum.member?(
             ClickhouseCleanSites.get_deleted_sites_with_clickhouse_data(),
             deleted_site.id
           )
  end

  def assert_count(site, table, expected_count) do
    q = from(e in table, select: %{count: fragment("count()")}, where: e.site_id == ^site.id)

    await_clickhouse_count(q, expected_count)
  end
end
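
`await_clickhouse_count/2` comes from the shared test helpers and is not shown in this diff. Conceptually it retries the count query until ClickHouse has applied the DELETE mutations, which is what the "Make count assertions more reliable" commit refers to. A minimal sketch of such a polling helper, with made-up retry limits, might look like this:

```elixir
# Hypothetical polling helper; the real await_clickhouse_count/2 lives in
# Plausible's shared test utilities and may be implemented differently.
defmodule PollingHelpers do
  import ExUnit.Assertions

  def await_clickhouse_count(query, expected, attempts \\ 50) do
    %{count: actual} = Plausible.ClickhouseRepo.one(query)

    cond do
      actual == expected ->
        :ok

      attempts <= 0 ->
        flunk("expected ClickHouse count #{expected}, got #{actual}")

      true ->
        # Mutations are applied asynchronously; wait briefly and retry.
        Process.sleep(100)
        await_clickhouse_count(query, expected, attempts - 1)
    end
  end
end
```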