mirror of
https://github.com/plausible/analytics.git
synced 2024-12-23 09:33:19 +03:00
Keep track of native stats start timestamp when retrieving data (#2715)
* Stats boundary/PoC? * Delete stats removal * Drop events check on site creation * Update seeds script * Use native_stats_start_at * Don't rely on native stats pointer in imported stats queries * Reset site * Export reset/1 * Remove unnecessary inserted_at settings * Update seeds * Remove unnecessary inserted_at setting
This commit is contained in:
parent
05e7f93da2
commit
8f86036e57
@ -401,8 +401,7 @@ base_queues = [
|
|||||||
site_setup_emails: 1,
|
site_setup_emails: 1,
|
||||||
clean_email_verification_codes: 1,
|
clean_email_verification_codes: 1,
|
||||||
clean_invitations: 1,
|
clean_invitations: 1,
|
||||||
google_analytics_imports: 1,
|
google_analytics_imports: 1
|
||||||
site_stats_removal: 1
|
|
||||||
]
|
]
|
||||||
|
|
||||||
cloud_queues = [
|
cloud_queues = [
|
||||||
|
@ -27,19 +27,23 @@ defmodule Plausible.Purge do
|
|||||||
|
|
||||||
@spec delete_native_stats!(Plausible.Site.t()) :: :ok
|
@spec delete_native_stats!(Plausible.Site.t()) :: :ok
|
||||||
@doc """
|
@doc """
|
||||||
Deletes native stats for a site, and clears the `stats_start_date` field.
|
Move stats pointers so that no historical stats are available.
|
||||||
"""
|
"""
|
||||||
def delete_native_stats!(site) do
|
def delete_native_stats!(site) do
|
||||||
events_sql = "ALTER TABLE events DELETE WHERE domain = ?"
|
reset!(site)
|
||||||
sessions_sql = "ALTER TABLE sessions DELETE WHERE domain = ?"
|
|
||||||
Ecto.Adapters.SQL.query!(Plausible.ClickhouseRepo, events_sql, [site.domain])
|
|
||||||
Ecto.Adapters.SQL.query!(Plausible.ClickhouseRepo, sessions_sql, [site.domain])
|
|
||||||
|
|
||||||
clear_stats_start_date!(site)
|
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def reset!(site) do
|
||||||
|
site
|
||||||
|
|> Ecto.Changeset.change(
|
||||||
|
native_stats_start_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
|
||||||
|
stats_start_date: nil
|
||||||
|
)
|
||||||
|
|> Plausible.Repo.update!()
|
||||||
|
end
|
||||||
|
|
||||||
defp clear_stats_start_date!(site) do
|
defp clear_stats_start_date!(site) do
|
||||||
site
|
site
|
||||||
|> Ecto.Changeset.change(stats_start_date: nil)
|
|> Ecto.Changeset.change(stats_start_date: nil)
|
||||||
|
@ -16,6 +16,7 @@ defmodule Plausible.Site do
|
|||||||
field :public, :boolean
|
field :public, :boolean
|
||||||
field :locked, :boolean
|
field :locked, :boolean
|
||||||
field :stats_start_date, :date
|
field :stats_start_date, :date
|
||||||
|
field :native_stats_start_at, :naive_datetime
|
||||||
|
|
||||||
field :ingest_rate_limit_scale_seconds, :integer, default: 60
|
field :ingest_rate_limit_scale_seconds, :integer, default: 60
|
||||||
field :ingest_rate_limit_threshold, :integer
|
field :ingest_rate_limit_threshold, :integer
|
||||||
|
@ -1,41 +1,14 @@
|
|||||||
defmodule Plausible.Site.Removal do
|
defmodule Plausible.Site.Removal do
|
||||||
@moduledoc """
|
@moduledoc """
|
||||||
A service responsible for site and its stats deletion.
|
A site deletion service stub.
|
||||||
The site deletion alone is done in postgres and is executed first,
|
|
||||||
the latter deletions (events, sessions and imported tables in clickhouse)
|
|
||||||
are performed asynchronously via `Plausible.Workers.StatsRemoval`.
|
|
||||||
|
|
||||||
This is to avoid a race condition in which the site is deleted, but stats
|
|
||||||
writes are pending (either in the buffers or are about to be buffered, due
|
|
||||||
to Sites.Cache keeping the now obsolete record until refresh is triggered).
|
|
||||||
"""
|
"""
|
||||||
@stats_deletion_delay_seconds 60 * 20
|
|
||||||
|
|
||||||
alias Plausible.Workers.StatsRemoval
|
|
||||||
alias Plausible.Repo
|
alias Plausible.Repo
|
||||||
alias Ecto.Multi
|
|
||||||
|
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
||||||
@spec stats_deletion_delay_seconds() :: pos_integer()
|
|
||||||
def stats_deletion_delay_seconds() do
|
|
||||||
@stats_deletion_delay_seconds
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec run(String.t()) :: {:ok, map()}
|
@spec run(String.t()) :: {:ok, map()}
|
||||||
def run(domain) do
|
def run(domain) do
|
||||||
site_by_domain_q = from s in Plausible.Site, where: s.domain == ^domain
|
result = Repo.delete_all(from(s in Plausible.Site, where: s.domain == ^domain))
|
||||||
|
{:ok, %{delete_all: result}}
|
||||||
Multi.new()
|
|
||||||
|> Multi.run(:site_id, fn _, _ ->
|
|
||||||
{:ok, Repo.one(from s in site_by_domain_q, select: s.id)}
|
|
||||||
end)
|
|
||||||
|> Multi.delete_all(:delete_all, site_by_domain_q)
|
|
||||||
|> Oban.insert(:delayed_metrics_removal, fn %{site_id: site_id} ->
|
|
||||||
StatsRemoval.new(%{domain: domain, site_id: site_id},
|
|
||||||
schedule_in: stats_deletion_delay_seconds()
|
|
||||||
)
|
|
||||||
end)
|
|
||||||
|> Repo.transaction()
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -24,13 +24,6 @@ defmodule Plausible.Sites do
|
|||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|> Ecto.Multi.insert(:site, site_changeset)
|
|> Ecto.Multi.insert(:site, site_changeset)
|
||||||
|> Ecto.Multi.run(:existing_events, fn _, _ ->
|
|
||||||
site_changeset
|
|
||||||
|> Ecto.Changeset.validate_change(:domain, fn :domain, domain ->
|
|
||||||
check_for_existing_events(domain, params)
|
|
||||||
end)
|
|
||||||
|> Ecto.Changeset.apply_action(:insert)
|
|
||||||
end)
|
|
||||||
|> Ecto.Multi.run(:site_membership, fn repo, %{site: site} ->
|
|> Ecto.Multi.run(:site_membership, fn repo, %{site: site} ->
|
||||||
membership_changeset =
|
membership_changeset =
|
||||||
Site.Membership.changeset(%Site.Membership{}, %{
|
Site.Membership.changeset(%Site.Membership{}, %{
|
||||||
@ -183,19 +176,4 @@ defmodule Plausible.Sites do
|
|||||||
where: sm.role == :owner
|
where: sm.role == :owner
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp check_for_existing_events(domain, params) do
|
|
||||||
if has_events?(domain) do
|
|
||||||
Sentry.capture_message("Refused to create a site with existing events",
|
|
||||||
extra: %{params: params}
|
|
||||||
)
|
|
||||||
|
|
||||||
[
|
|
||||||
domain:
|
|
||||||
"This domain cannot be registered. Perhaps one of your colleagues registered it? Or did you recently delete it from your account? The deletion may take up to 48 hours before you can add the same site again. If that's not the case, please contact support@plausible.io"
|
|
||||||
]
|
|
||||||
else
|
|
||||||
[]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
@ -31,7 +31,7 @@ defmodule Plausible.Stats.Base do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def query_events(site, query) do
|
def query_events(site, query) do
|
||||||
{first_datetime, last_datetime} = utc_boundaries(query, site.timezone)
|
{first_datetime, last_datetime} = utc_boundaries(query, site)
|
||||||
|
|
||||||
q =
|
q =
|
||||||
from(
|
from(
|
||||||
@ -145,7 +145,7 @@ defmodule Plausible.Stats.Base do
|
|||||||
}
|
}
|
||||||
|
|
||||||
def query_sessions(site, query) do
|
def query_sessions(site, query) do
|
||||||
{first_datetime, last_datetime} = utc_boundaries(query, site.timezone)
|
{first_datetime, last_datetime} = utc_boundaries(query, site)
|
||||||
|
|
||||||
sessions_q =
|
sessions_q =
|
||||||
from(
|
from(
|
||||||
@ -395,31 +395,48 @@ defmodule Plausible.Stats.Base do
|
|||||||
defp db_prop_val(_, @not_set), do: ""
|
defp db_prop_val(_, @not_set), do: ""
|
||||||
defp db_prop_val(_, val), do: val
|
defp db_prop_val(_, val), do: val
|
||||||
|
|
||||||
def utc_boundaries(%Query{period: "realtime"}, _timezone) do
|
defp beginning_of_time(candidate, native_stats_start_at) do
|
||||||
last_datetime = NaiveDateTime.utc_now() |> Timex.shift(seconds: 5)
|
if Timex.after?(native_stats_start_at, candidate) do
|
||||||
|
native_stats_start_at
|
||||||
|
else
|
||||||
|
candidate
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def utc_boundaries(%Query{period: "realtime"}, site) do
|
||||||
|
last_datetime =
|
||||||
|
NaiveDateTime.utc_now()
|
||||||
|
|> Timex.shift(seconds: 5)
|
||||||
|
|> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
first_datetime = NaiveDateTime.utc_now() |> Timex.shift(minutes: -5)
|
first_datetime = NaiveDateTime.utc_now() |> Timex.shift(minutes: -5)
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
end
|
end
|
||||||
|
|
||||||
def utc_boundaries(%Query{period: "30m"}, _timezone) do
|
def utc_boundaries(%Query{period: "30m"}, site) do
|
||||||
last_datetime = NaiveDateTime.utc_now() |> Timex.shift(seconds: 5)
|
last_datetime =
|
||||||
|
NaiveDateTime.utc_now()
|
||||||
|
|> Timex.shift(seconds: 5)
|
||||||
|
|> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
first_datetime = NaiveDateTime.utc_now() |> Timex.shift(minutes: -30)
|
first_datetime = NaiveDateTime.utc_now() |> Timex.shift(minutes: -30)
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
end
|
end
|
||||||
|
|
||||||
def utc_boundaries(%Query{date_range: date_range}, timezone) do
|
def utc_boundaries(%Query{date_range: date_range}, site) do
|
||||||
{:ok, first} = NaiveDateTime.new(date_range.first, ~T[00:00:00])
|
{:ok, first} = NaiveDateTime.new(date_range.first, ~T[00:00:00])
|
||||||
|
|
||||||
first_datetime =
|
first_datetime =
|
||||||
Timex.to_datetime(first, timezone)
|
Timex.to_datetime(first, site.timezone)
|
||||||
|> Timex.Timezone.convert("UTC")
|
|> Timex.Timezone.convert("UTC")
|
||||||
|
|> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
{:ok, last} = NaiveDateTime.new(date_range.last |> Timex.shift(days: 1), ~T[00:00:00])
|
{:ok, last} = NaiveDateTime.new(date_range.last |> Timex.shift(days: 1), ~T[00:00:00])
|
||||||
|
|
||||||
last_datetime =
|
last_datetime =
|
||||||
Timex.to_datetime(last, timezone)
|
Timex.to_datetime(last, site.timezone)
|
||||||
|> Timex.Timezone.convert("UTC")
|
|> Timex.Timezone.convert("UTC")
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
|
@ -11,7 +11,8 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
ClickhouseRepo.one(
|
ClickhouseRepo.one(
|
||||||
from e in "events",
|
from e in "events",
|
||||||
select: fragment("min(?)", e.timestamp),
|
select: fragment("min(?)", e.timestamp),
|
||||||
where: e.domain == ^site.domain
|
where: e.domain == ^site.domain,
|
||||||
|
where: e.timestamp >= ^site.native_stats_start_at
|
||||||
)
|
)
|
||||||
|
|
||||||
case datetime do
|
case datetime do
|
||||||
@ -155,19 +156,15 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
def has_pageviews?([]), do: false
|
|
||||||
|
|
||||||
def has_pageviews?(domains) when is_list(domains) do
|
|
||||||
ClickhouseRepo.exists?(
|
|
||||||
from e in "events",
|
|
||||||
select: e.timestamp,
|
|
||||||
where: fragment("? IN tuple(?)", e.domain, ^domains)
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
def has_pageviews?(site) do
|
def has_pageviews?(site) do
|
||||||
ClickhouseRepo.exists?(
|
ClickhouseRepo.exists?(
|
||||||
from e in "events", where: e.domain == ^site.domain and e.name == "pageview"
|
from(e in "events",
|
||||||
|
where:
|
||||||
|
e.domain == ^site.domain and
|
||||||
|
e.name == "pageview" and
|
||||||
|
e.timestamp >=
|
||||||
|
^site.native_stats_start_at
|
||||||
|
)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -187,7 +184,7 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp base_session_query(site, query) do
|
defp base_session_query(site, query) do
|
||||||
{first_datetime, last_datetime} = utc_boundaries(query, site.timezone)
|
{first_datetime, last_datetime} = utc_boundaries(query, site)
|
||||||
|
|
||||||
q =
|
q =
|
||||||
from(s in "sessions",
|
from(s in "sessions",
|
||||||
@ -306,7 +303,7 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp base_query_bare(site, query) do
|
defp base_query_bare(site, query) do
|
||||||
{first_datetime, last_datetime} = utc_boundaries(query, site.timezone)
|
{first_datetime, last_datetime} = utc_boundaries(query, site)
|
||||||
|
|
||||||
q =
|
q =
|
||||||
from(e in "events",
|
from(e in "events",
|
||||||
@ -437,31 +434,36 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
base_query_bare(site, query) |> include_goal_conversions(query)
|
base_query_bare(site, query) |> include_goal_conversions(query)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp utc_boundaries(%Query{period: "30m"}, _timezone) do
|
defp utc_boundaries(%Query{period: "30m"}, site) do
|
||||||
last_datetime = NaiveDateTime.utc_now()
|
last_datetime = NaiveDateTime.utc_now()
|
||||||
|
|
||||||
first_datetime = last_datetime |> Timex.shift(minutes: -30)
|
first_datetime =
|
||||||
|
last_datetime |> Timex.shift(minutes: -30) |> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp utc_boundaries(%Query{period: "realtime"}, _timezone) do
|
defp utc_boundaries(%Query{period: "realtime"}, site) do
|
||||||
last_datetime = NaiveDateTime.utc_now()
|
last_datetime = NaiveDateTime.utc_now()
|
||||||
|
|
||||||
first_datetime = last_datetime |> Timex.shift(minutes: -5)
|
first_datetime =
|
||||||
|
last_datetime |> Timex.shift(minutes: -5) |> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp utc_boundaries(%Query{date_range: date_range}, timezone) do
|
defp utc_boundaries(%Query{date_range: date_range}, site) do
|
||||||
{:ok, first} = NaiveDateTime.new(date_range.first, ~T[00:00:00])
|
{:ok, first} = NaiveDateTime.new(date_range.first, ~T[00:00:00])
|
||||||
|
|
||||||
first_datetime =
|
first_datetime =
|
||||||
Timex.to_datetime(first, timezone)
|
Timex.to_datetime(first, site.timezone)
|
||||||
|> Timex.Timezone.convert("UTC")
|
|> Timex.Timezone.convert("UTC")
|
||||||
|
|> beginning_of_time(site.native_stats_start_at)
|
||||||
|
|
||||||
{:ok, last} = NaiveDateTime.new(date_range.last |> Timex.shift(days: 1), ~T[00:00:00])
|
{:ok, last} = NaiveDateTime.new(date_range.last |> Timex.shift(days: 1), ~T[00:00:00])
|
||||||
|
|
||||||
last_datetime =
|
last_datetime =
|
||||||
Timex.to_datetime(last, timezone)
|
Timex.to_datetime(last, site.timezone)
|
||||||
|> Timex.Timezone.convert("UTC")
|
|> Timex.Timezone.convert("UTC")
|
||||||
|
|
||||||
{first_datetime, last_datetime}
|
{first_datetime, last_datetime}
|
||||||
@ -589,4 +591,12 @@ defmodule Plausible.Stats.Clickhouse do
|
|||||||
db_query
|
db_query
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp beginning_of_time(candidate, site_creation_date) do
|
||||||
|
if Timex.after?(site_creation_date, candidate) do
|
||||||
|
site_creation_date
|
||||||
|
else
|
||||||
|
candidate
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
defmodule Plausible.Stats.Imported do
|
defmodule Plausible.Stats.Imported do
|
||||||
use Plausible.ClickhouseRepo
|
use Plausible.ClickhouseRepo
|
||||||
alias Plausible.Stats.Query
|
alias Plausible.Stats.Query
|
||||||
|
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
import Plausible.Stats.Base
|
|
||||||
import Plausible.Stats.Fragments
|
import Plausible.Stats.Fragments
|
||||||
|
|
||||||
@no_ref "Direct / None"
|
@no_ref "Direct / None"
|
||||||
@ -23,7 +23,7 @@ defmodule Plausible.Stats.Imported do
|
|||||||
select: %{}
|
select: %{}
|
||||||
)
|
)
|
||||||
|> select_imported_metrics(metrics)
|
|> select_imported_metrics(metrics)
|
||||||
|> apply_interval(query, site.timezone)
|
|> apply_interval(query, site)
|
||||||
|
|
||||||
from(s in Ecto.Query.subquery(native_q),
|
from(s in Ecto.Query.subquery(native_q),
|
||||||
full_join: i in subquery(imported_q),
|
full_join: i in subquery(imported_q),
|
||||||
@ -33,21 +33,19 @@ defmodule Plausible.Stats.Imported do
|
|||||||
|> select_joined_metrics(metrics)
|
|> select_joined_metrics(metrics)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp apply_interval(imported_q, %Plausible.Stats.Query{interval: "month"}, _timezone) do
|
defp apply_interval(imported_q, %Plausible.Stats.Query{interval: "month"}, _site) do
|
||||||
imported_q
|
imported_q
|
||||||
|> group_by([i], fragment("toStartOfMonth(?)", i.date))
|
|> group_by([i], fragment("toStartOfMonth(?)", i.date))
|
||||||
|> select_merge([i], %{date: fragment("toStartOfMonth(?)", i.date)})
|
|> select_merge([i], %{date: fragment("toStartOfMonth(?)", i.date)})
|
||||||
end
|
end
|
||||||
|
|
||||||
defp apply_interval(imported_q, %Plausible.Stats.Query{interval: "week"} = query, timezone) do
|
defp apply_interval(imported_q, %Plausible.Stats.Query{interval: "week"} = query, _site) do
|
||||||
{first_datetime, _} = utc_boundaries(query, timezone)
|
|
||||||
|
|
||||||
imported_q
|
imported_q
|
||||||
|> group_by([i], weekstart_not_before(i.date, ^first_datetime))
|
|> group_by([i], weekstart_not_before(i.date, ^query.date_range.first))
|
||||||
|> select_merge([i], %{date: weekstart_not_before(i.date, ^first_datetime)})
|
|> select_merge([i], %{date: weekstart_not_before(i.date, ^query.date_range.first)})
|
||||||
end
|
end
|
||||||
|
|
||||||
defp apply_interval(imported_q, _query, _timezone) do
|
defp apply_interval(imported_q, _query, _site) do
|
||||||
imported_q
|
imported_q
|
||||||
|> group_by([i], i.date)
|
|> group_by([i], i.date)
|
||||||
|> select_merge([i], %{date: i.date})
|
|> select_merge([i], %{date: i.date})
|
||||||
|
@ -121,7 +121,7 @@ defmodule Plausible.Stats.Timeseries do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp select_bucket(q, site, %Query{interval: "week"} = query) do
|
defp select_bucket(q, site, %Query{interval: "week"} = query) do
|
||||||
{first_datetime, _} = utc_boundaries(query, site.timezone)
|
{first_datetime, _} = utc_boundaries(query, site)
|
||||||
|
|
||||||
from(
|
from(
|
||||||
e in q,
|
e in q,
|
||||||
|
@ -4,7 +4,9 @@ defmodule PlausibleWeb.Api.InternalController do
|
|||||||
alias Plausible.Stats.Clickhouse, as: Stats
|
alias Plausible.Stats.Clickhouse, as: Stats
|
||||||
|
|
||||||
def domain_status(conn, %{"domain" => domain}) do
|
def domain_status(conn, %{"domain" => domain}) do
|
||||||
if Stats.has_pageviews?(%Plausible.Site{domain: domain}) do
|
site = Plausible.Sites.get_by_domain(domain)
|
||||||
|
|
||||||
|
if Stats.has_pageviews?(site) do
|
||||||
json(conn, "READY")
|
json(conn, "READY")
|
||||||
else
|
else
|
||||||
json(conn, "WAITING")
|
json(conn, "WAITING")
|
||||||
|
@ -349,7 +349,7 @@ defmodule PlausibleWeb.SiteController do
|
|||||||
|
|
||||||
def reset_stats(conn, _params) do
|
def reset_stats(conn, _params) do
|
||||||
site = conn.assigns[:site]
|
site = conn.assigns[:site]
|
||||||
Plausible.Purge.delete_native_stats!(site)
|
Plausible.Purge.reset!(site)
|
||||||
|
|
||||||
conn
|
conn
|
||||||
|> put_flash(:success, "#{site.domain} stats will be reset in a few minutes")
|
|> put_flash(:success, "#{site.domain} stats will be reset in a few minutes")
|
||||||
|
@ -1,59 +0,0 @@
|
|||||||
defmodule Plausible.Workers.StatsRemoval do
|
|
||||||
@moduledoc """
|
|
||||||
Asynchronous worker firing deletion mutations to clickhouse.
|
|
||||||
For now only ALTER TABLE deletions are supported. Experimental
|
|
||||||
DELETE FROM support is going to be introduced once production db
|
|
||||||
is upgraded.
|
|
||||||
|
|
||||||
At most 3 attempts are made, with 15m backoff value.
|
|
||||||
|
|
||||||
Imported stats tables keep site reference through a numeric id, whilst
|
|
||||||
events and sessions store domain as-is - hence two different deletes,
|
|
||||||
one of which cannot be performed anymore once the site identifier is permanently
|
|
||||||
gone from postgres.
|
|
||||||
"""
|
|
||||||
use Plausible.Repo
|
|
||||||
|
|
||||||
use Oban.Worker,
|
|
||||||
queue: :site_stats_removal,
|
|
||||||
max_attempts: 3,
|
|
||||||
unique: [period: :infinity, fields: [:args]]
|
|
||||||
|
|
||||||
@impl Oban.Worker
|
|
||||||
def perform(%{args: args}) do
|
|
||||||
domain = Map.fetch!(args, "domain")
|
|
||||||
site_id = Map.get(args, "site_id")
|
|
||||||
|
|
||||||
imported_result = delete_imported!(site_id)
|
|
||||||
native_result = delete_native!(domain)
|
|
||||||
|
|
||||||
{:ok, Map.merge(imported_result, native_result)}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Oban.Worker
|
|
||||||
def backoff(_job) do
|
|
||||||
15 * 60
|
|
||||||
end
|
|
||||||
|
|
||||||
defp delete_imported!(nil) do
|
|
||||||
%{}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp delete_imported!(id) when is_integer(id) do
|
|
||||||
Enum.map(Plausible.Imported.tables(), fn table ->
|
|
||||||
sql = "ALTER TABLE #{table} DELETE WHERE site_id = ?"
|
|
||||||
{table, Ecto.Adapters.SQL.query!(Plausible.ClickhouseRepo, sql, [id])}
|
|
||||||
end)
|
|
||||||
|> Enum.into(%{})
|
|
||||||
end
|
|
||||||
|
|
||||||
defp delete_native!(domain) do
|
|
||||||
events_sql = "ALTER TABLE events DELETE WHERE domain = ?"
|
|
||||||
sessions_sql = "ALTER TABLE sessions DELETE WHERE domain = ?"
|
|
||||||
|
|
||||||
%{
|
|
||||||
"events" => Ecto.Adapters.SQL.query!(Plausible.ClickhouseRepo, events_sql, [domain]),
|
|
||||||
"sessions" => Ecto.Adapters.SQL.query!(Plausible.ClickhouseRepo, sessions_sql, [domain])
|
|
||||||
}
|
|
||||||
end
|
|
||||||
end
|
|
@ -12,16 +12,29 @@
|
|||||||
|
|
||||||
user = Plausible.Factory.insert(:user, email: "user@plausible.test", password: "plausible")
|
user = Plausible.Factory.insert(:user, email: "user@plausible.test", password: "plausible")
|
||||||
|
|
||||||
site = Plausible.Factory.insert(:site, domain: "dummy.site")
|
beginning_of_time = NaiveDateTime.add(NaiveDateTime.utc_now(), -721, :day)
|
||||||
|
|
||||||
|
site =
|
||||||
|
Plausible.Factory.insert(:site, domain: "dummy.site", native_stats_start_at: beginning_of_time)
|
||||||
|
|
||||||
_membership = Plausible.Factory.insert(:site_membership, user: user, site: site, role: :owner)
|
_membership = Plausible.Factory.insert(:site_membership, user: user, site: site, role: :owner)
|
||||||
|
|
||||||
put_random_time = fn date ->
|
put_random_time = fn
|
||||||
random_time = Time.new!(:rand.uniform(23), :rand.uniform(59), 0)
|
date, 0 ->
|
||||||
|
current_hour = Time.utc_now().hour
|
||||||
|
current_minute = Time.utc_now().minute
|
||||||
|
random_time = Time.new!(:rand.uniform(current_hour), :rand.uniform(current_minute - 1), 0)
|
||||||
|
|
||||||
date
|
date
|
||||||
|> NaiveDateTime.new!(random_time)
|
|> NaiveDateTime.new!(random_time)
|
||||||
|> NaiveDateTime.truncate(:second)
|
|> NaiveDateTime.truncate(:second)
|
||||||
|
|
||||||
|
date, _ ->
|
||||||
|
random_time = Time.new!(:rand.uniform(23), :rand.uniform(59), 0)
|
||||||
|
|
||||||
|
date
|
||||||
|
|> NaiveDateTime.new!(random_time)
|
||||||
|
|> NaiveDateTime.truncate(:second)
|
||||||
end
|
end
|
||||||
|
|
||||||
geolocations = [
|
geolocations = [
|
||||||
@ -74,7 +87,7 @@ Enum.flat_map(-720..0, fn day_index ->
|
|||||||
[
|
[
|
||||||
domain: site.domain,
|
domain: site.domain,
|
||||||
hostname: site.domain,
|
hostname: site.domain,
|
||||||
timestamp: put_random_time.(date),
|
timestamp: put_random_time.(date, day_index),
|
||||||
referrer_source: Enum.random(["", "Facebook", "Twitter", "DuckDuckGo", "Google"]),
|
referrer_source: Enum.random(["", "Facebook", "Twitter", "DuckDuckGo", "Google"]),
|
||||||
browser: Enum.random(["Edge", "Chrome", "Safari", "Firefox", "Vivaldi"]),
|
browser: Enum.random(["Edge", "Chrome", "Safari", "Firefox", "Vivaldi"]),
|
||||||
browser_version: to_string(Enum.random(0..50)),
|
browser_version: to_string(Enum.random(0..50)),
|
||||||
|
@ -39,17 +39,13 @@ defmodule Plausible.PurgeTest do
|
|||||||
assert %Plausible.Site{stats_start_date: nil} = Plausible.Repo.reload(site)
|
assert %Plausible.Site{stats_start_date: nil} = Plausible.Repo.reload(site)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "delete_native_stats!/1 deletes native stats", %{site: site} do
|
test "delete_native_stats!/1 moves the native_stats_start_at pointer", %{site: site} do
|
||||||
events_query = from(s in Plausible.ClickhouseEvent, where: s.domain == ^site.domain)
|
|
||||||
assert await_clickhouse_count(events_query, 1)
|
|
||||||
|
|
||||||
sessions_query = from(s in Plausible.ClickhouseSession, where: s.domain == ^site.domain)
|
|
||||||
assert await_clickhouse_count(sessions_query, 1)
|
|
||||||
|
|
||||||
assert :ok == Plausible.Purge.delete_native_stats!(site)
|
assert :ok == Plausible.Purge.delete_native_stats!(site)
|
||||||
|
|
||||||
assert await_clickhouse_count(events_query, 0)
|
assert %Plausible.Site{native_stats_start_at: native_stats_start_at} =
|
||||||
assert await_clickhouse_count(sessions_query, 0)
|
Plausible.Repo.reload(site)
|
||||||
|
|
||||||
|
assert NaiveDateTime.compare(native_stats_start_at, site.native_stats_start_at) == :gt
|
||||||
end
|
end
|
||||||
|
|
||||||
test "delete_native_stats!/1 resets stats_start_date", %{site: site} do
|
test "delete_native_stats!/1 resets stats_start_date", %{site: site} do
|
||||||
|
@ -4,113 +4,16 @@ defmodule Plausible.Site.SiteRemovalTest do
|
|||||||
|
|
||||||
alias Plausible.Site.Removal
|
alias Plausible.Site.Removal
|
||||||
alias Plausible.Sites
|
alias Plausible.Sites
|
||||||
alias Plausible.Workers.StatsRemoval
|
|
||||||
|
|
||||||
describe "execution and scheduling" do
|
test "site from postgres is immediately deleted" do
|
||||||
test "site from postgres is immediately deleted" do
|
site = insert(:site)
|
||||||
site = insert(:site)
|
assert {:ok, context} = Removal.run(site.domain)
|
||||||
assert {:ok, context} = Removal.run(site.domain)
|
assert context.delete_all == {1, nil}
|
||||||
assert context.delete_all == {1, nil}
|
refute Sites.get_by_domain(site.domain)
|
||||||
assert context.site_id == site.id
|
|
||||||
refute Sites.get_by_domain(site.domain)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "deletion is idempotent" do
|
|
||||||
assert {:ok, context} = Removal.run("some.example.com")
|
|
||||||
assert context.delete_all == {0, nil}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "stats deletion job is scheduled when no site exists in postgres" do
|
|
||||||
assert {:ok, _} = Removal.run("a.domain.example.com")
|
|
||||||
|
|
||||||
assert_enqueued(
|
|
||||||
worker: StatsRemoval,
|
|
||||||
args: %{"domain" => "a.domain.example.com", "site_id" => nil}
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "stats deletion job is scheduled when site exists in postgres" do
|
|
||||||
site = insert(:site)
|
|
||||||
assert {:ok, _} = Removal.run(site.domain)
|
|
||||||
|
|
||||||
assert_enqueued(
|
|
||||||
worker: StatsRemoval,
|
|
||||||
args: %{"domain" => site.domain, "site_id" => site.id}
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "stats deletion is always scheduled ~20m in the future" do
|
|
||||||
assert {:ok, _} = Removal.run("foo.example.com")
|
|
||||||
|
|
||||||
in_20m = DateTime.add(DateTime.utc_now(), 1200, :second)
|
|
||||||
|
|
||||||
assert_enqueued(
|
|
||||||
worker: StatsRemoval,
|
|
||||||
scheduled_at: {in_20m, delta: 5}
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "stats deletion is always scheduled late enough for sites cache to expire" do
|
|
||||||
delay_ms = Removal.stats_deletion_delay_seconds() * 1000
|
|
||||||
assert delay_ms > Plausible.Site.Cache.Warmer.interval()
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "the background worker" do
|
test "deletion is idempotent" do
|
||||||
test "the job runs deletes at clickhouse" do
|
assert {:ok, context} = Removal.run("some.example.com")
|
||||||
assert {:ok, %{"events" => r, "sessions" => r}} =
|
assert context.delete_all == {0, nil}
|
||||||
perform_job(StatsRemoval, %{"domain" => "foo.example.com"})
|
|
||||||
|
|
||||||
assert %Clickhousex.Result{command: :updated} = r
|
|
||||||
|
|
||||||
assert {:ok, %{"events" => r, "sessions" => r, "imported_browsers" => r}} =
|
|
||||||
perform_job(StatsRemoval, %{"domain" => "foo.example.com", "site_id" => 777})
|
|
||||||
|
|
||||||
assert %Clickhousex.Result{command: :updated} = r
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "integration" do
|
|
||||||
setup do
|
|
||||||
site = insert(:site, stats_start_date: ~D[2020-01-01])
|
|
||||||
|
|
||||||
populate_stats(site, [
|
|
||||||
build(:pageview),
|
|
||||||
build(:imported_visitors),
|
|
||||||
build(:imported_sources),
|
|
||||||
build(:imported_pages),
|
|
||||||
build(:imported_entry_pages),
|
|
||||||
build(:imported_exit_pages),
|
|
||||||
build(:imported_locations),
|
|
||||||
build(:imported_devices),
|
|
||||||
build(:imported_browsers),
|
|
||||||
build(:imported_operating_systems)
|
|
||||||
])
|
|
||||||
|
|
||||||
{:ok, %{site: site}}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "the job actually removes stats from clickhouse", %{site: site} do
|
|
||||||
Enum.each(Plausible.Imported.tables(), fn table ->
|
|
||||||
query = from(imported in table, where: imported.site_id == ^site.id)
|
|
||||||
assert await_clickhouse_count(query, 1)
|
|
||||||
end)
|
|
||||||
|
|
||||||
events_query = from(s in Plausible.ClickhouseEvent, where: s.domain == ^site.domain)
|
|
||||||
assert await_clickhouse_count(events_query, 1)
|
|
||||||
|
|
||||||
sessions_query = from(s in Plausible.ClickhouseSession, where: s.domain == ^site.domain)
|
|
||||||
assert await_clickhouse_count(sessions_query, 1)
|
|
||||||
|
|
||||||
perform_job(StatsRemoval, %{"domain" => site.domain, "site_id" => site.id})
|
|
||||||
|
|
||||||
assert await_clickhouse_count(events_query, 0)
|
|
||||||
assert await_clickhouse_count(sessions_query, 0)
|
|
||||||
|
|
||||||
Enum.each(Plausible.Imported.tables(), fn table ->
|
|
||||||
query = from(imported in table, where: imported.site_id == ^site.id)
|
|
||||||
assert await_clickhouse_count(query, 0)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -79,29 +79,6 @@ defmodule PlausibleWeb.Api.ExternalSitesControllerTest do
|
|||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not allow creating a site when external events are present", %{
|
|
||||||
conn: conn
|
|
||||||
} do
|
|
||||||
domain = "events-exist.example.com"
|
|
||||||
|
|
||||||
populate_stats(%{domain: domain}, [
|
|
||||||
build(:pageview)
|
|
||||||
])
|
|
||||||
|
|
||||||
:inserted = eventually(fn -> {Plausible.Sites.has_events?(domain), :inserted} end)
|
|
||||||
|
|
||||||
conn =
|
|
||||||
post(conn, "/api/v1/sites", %{
|
|
||||||
"domain" => domain,
|
|
||||||
"timezone" => "Europe/Tallinn"
|
|
||||||
})
|
|
||||||
|
|
||||||
assert json_response(conn, 400) == %{
|
|
||||||
"error" =>
|
|
||||||
"domain: This domain cannot be registered. Perhaps one of your colleagues registered it? Or did you recently delete it from your account? The deletion may take up to 48 hours before you can add the same site again. If that's not the case, please contact support@plausible.io"
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "cannot access with a bad API key scope", %{conn: conn, user: user} do
|
test "cannot access with a bad API key scope", %{conn: conn, user: user} do
|
||||||
api_key = insert(:api_key, user: user, scopes: ["stats:read:*"])
|
api_key = insert(:api_key, user: user, scopes: ["stats:read:*"])
|
||||||
|
|
||||||
|
@ -41,7 +41,12 @@ defmodule PlausibleWeb.Api.StatsController.MainGraphTest do
|
|||||||
|
|
||||||
test "displays hourly stats in configured timezone", %{conn: conn, user: user} do
|
test "displays hourly stats in configured timezone", %{conn: conn, user: user} do
|
||||||
# UTC+1
|
# UTC+1
|
||||||
site = insert(:site, domain: "tz-test.com", members: [user], timezone: "CET")
|
site =
|
||||||
|
insert(:site,
|
||||||
|
domain: "tz-test.com",
|
||||||
|
members: [user],
|
||||||
|
timezone: "CET"
|
||||||
|
)
|
||||||
|
|
||||||
populate_stats(site, [
|
populate_stats(site, [
|
||||||
build(:pageview, timestamp: ~N[2021-01-01 00:00:00])
|
build(:pageview, timestamp: ~N[2021-01-01 00:00:00])
|
||||||
|
@ -108,29 +108,6 @@ defmodule PlausibleWeb.SiteControllerTest do
|
|||||||
assert Repo.get_by(Plausible.Site, domain: "example.com")
|
assert Repo.get_by(Plausible.Site, domain: "example.com")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "refuses to create the site when events exist (pending deletion)", %{conn: conn} do
|
|
||||||
domain = "events-exist.example.com"
|
|
||||||
|
|
||||||
populate_stats(%{domain: domain}, [
|
|
||||||
build(:pageview)
|
|
||||||
])
|
|
||||||
|
|
||||||
:inserted = eventually(fn -> {Plausible.Sites.has_events?(domain), :inserted} end)
|
|
||||||
|
|
||||||
conn =
|
|
||||||
post(conn, "/sites", %{
|
|
||||||
"site" => %{
|
|
||||||
"domain" => domain,
|
|
||||||
"timezone" => "Europe/London"
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
assert html = html_response(conn, 200)
|
|
||||||
assert html =~ "This domain cannot be registered"
|
|
||||||
assert html =~ "please contact support"
|
|
||||||
refute Repo.get_by(Plausible.Site, domain: domain)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "starts trial if user does not have trial yet", %{conn: conn, user: user} do
|
test "starts trial if user does not have trial yet", %{conn: conn, user: user} do
|
||||||
Plausible.Auth.User.remove_trial_expiry(user) |> Repo.update!()
|
Plausible.Auth.User.remove_trial_expiry(user) |> Repo.update!()
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@ defmodule Plausible.Factory do
|
|||||||
domain = sequence(:domain, &"example-#{&1}.com")
|
domain = sequence(:domain, &"example-#{&1}.com")
|
||||||
|
|
||||||
%Plausible.Site{
|
%Plausible.Site{
|
||||||
|
native_stats_start_at: ~N[2000-01-01 00:00:00],
|
||||||
domain: domain,
|
domain: domain,
|
||||||
timezone: "UTC"
|
timezone: "UTC"
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,12 @@ defmodule Plausible.TestUtils do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def create_site(%{user: user}) do
|
def create_site(%{user: user}) do
|
||||||
site = Factory.insert(:site, domain: "test-site.com", members: [user])
|
site =
|
||||||
|
Factory.insert(:site,
|
||||||
|
domain: "test-site.com",
|
||||||
|
members: [user]
|
||||||
|
)
|
||||||
|
|
||||||
{:ok, site: site}
|
{:ok, site: site}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -24,7 +24,11 @@ defmodule Plausible.Workers.SendEmailReportTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "calculates timezone correctly" do
|
test "calculates timezone correctly" do
|
||||||
site = insert(:site, timezone: "US/Eastern")
|
site =
|
||||||
|
insert(:site,
|
||||||
|
timezone: "US/Eastern"
|
||||||
|
)
|
||||||
|
|
||||||
insert(:weekly_report, site: site, recipients: ["user@email.com"])
|
insert(:weekly_report, site: site, recipients: ["user@email.com"])
|
||||||
|
|
||||||
now = Timex.now(site.timezone)
|
now = Timex.now(site.timezone)
|
||||||
@ -61,9 +65,9 @@ defmodule Plausible.Workers.SendEmailReportTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "includes the correct stats" do
|
test "includes the correct stats" do
|
||||||
site = insert(:site, domain: "test-site.com")
|
|
||||||
insert(:weekly_report, site: site, recipients: ["user@email.com"])
|
|
||||||
now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
|
now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
|
||||||
|
site = insert(:site, domain: "test-site.com", inserted_at: Timex.shift(now, days: -8))
|
||||||
|
insert(:weekly_report, site: site, recipients: ["user@email.com"])
|
||||||
|
|
||||||
populate_stats(site, [
|
populate_stats(site, [
|
||||||
build(:pageview,
|
build(:pageview,
|
||||||
|
@ -40,6 +40,7 @@ defmodule Plausible.Workers.SendSiteSetupEmailsTest do
|
|||||||
describe "when user has managed to set up their site" do
|
describe "when user has managed to set up their site" do
|
||||||
test "sends the setup completed email as soon as possible" do
|
test "sends the setup completed email as soon as possible" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
|
||||||
insert(:site, members: [user], domain: "test-site.com")
|
insert(:site, members: [user], domain: "test-site.com")
|
||||||
|
|
||||||
perform_job(SendSiteSetupEmails, %{})
|
perform_job(SendSiteSetupEmails, %{})
|
||||||
|
Loading…
Reference in New Issue
Block a user