mirror of
https://github.com/plausible/analytics.git
synced 2024-11-23 03:04:43 +03:00
f8b4d5066a
* Clean up references to no longer active `google_analytics_imports` Oban queue * Stub CSV importer * Add SiteImport schema * Rename `Plausible.Imported` module file to match module name * Add `import_id` column to `Imported.*` CH schemas * Implement Importer behavior and manage imports state using new entities * Implement importer callbacks and maintain site.imported_data for UA * Keep imports in sync when forgetting all imports * Scope imported data queries to completed import IDs * Mark newly imported data with respective import ID * Clean up Importer implementation a bit * Test querying legacy and new imported data * Send Oban notifications on import worker failure too * Fix checking for forgettable imports and remove redundant function * Fix UA integration test * Change site import source to atom enum and add source label * Add typespecs and reduce repetition in `Plausible.Imported` * Improve documentation and typespecs * Add test for purging particular import * Switch email notification templates depending on import source * Document running import synchronously * Fix UA importer args parsing and ensure it's covered by tests * Clear `site.stats_start_date` on complete import to force recalculation * Test Oban notifications (h/t @ruslandoga) * Purge stats on import failure right away to reduce a chance of leaving debris behind * Fix typos Co-authored-by: hq1 <hq@mtod.org> * Fix another typo * Refactor fetching earliest import and earliest stats start date * Use `Date.after?` instead of `Timex.after?` * Cache import data in site virtual fields and limit queried imports to 5 * Ensure always current `stats_start_date` is used * Work around broken typespec in Timex * Make `SiteController.forget_imported` action idempotent * Discard irrecoverably failed import tasks * Use macros for site import statuses There's also a fix ensuring only complete imports are considered where relevant - couldn't isolate it as it was in a common hunk * Use `import_id` as worker job uniqueness criterion * Do not load imported stats data in plugins API context --------- Co-authored-by: hq1 <hq@mtod.org>
264 lines
6.3 KiB
Elixir
264 lines
6.3 KiB
Elixir
defmodule Plausible.TestUtils do
|
|
use Plausible.Repo
|
|
alias Plausible.Factory
|
|
|
|
defmacro __using__(_) do
|
|
quote do
|
|
require Plausible.TestUtils
|
|
import Plausible.TestUtils
|
|
end
|
|
end
|
|
|
|
defmacro patch_env(env_key, value) do
|
|
quote do
|
|
if __MODULE__.__info__(:attributes)[:ex_unit_async] == [true] do
|
|
raise "Patching env is unsafe in asynchronous tests. maybe extract the case elsewhere?"
|
|
end
|
|
|
|
original_env = Application.get_env(:plausible, unquote(env_key))
|
|
Application.put_env(:plausible, unquote(env_key), unquote(value))
|
|
|
|
on_exit(fn ->
|
|
Application.put_env(:plausible, unquote(env_key), original_env)
|
|
end)
|
|
|
|
{:ok, %{patched_env: true}}
|
|
end
|
|
end
|
|
|
|
defmacro setup_patch_env(env_key, value) do
|
|
quote do
|
|
setup do
|
|
patch_env(unquote(env_key), unquote(value))
|
|
end
|
|
end
|
|
end
|
|
|
|
def create_user(_) do
|
|
{:ok, user: Factory.insert(:user)}
|
|
end
|
|
|
|
def create_site(%{user: user}) do
|
|
site =
|
|
Factory.insert(:site,
|
|
members: [user]
|
|
)
|
|
|
|
{:ok, site: site}
|
|
end
|
|
|
|
def add_imported_data(%{site: site}) do
|
|
site =
|
|
site
|
|
|> Plausible.Site.start_import(~D[2005-01-01], Timex.today(), "Google Analytics", "ok")
|
|
|> Repo.update!()
|
|
|
|
{:ok, site: site}
|
|
end
|
|
|
|
def create_new_site(%{user: user}) do
|
|
site = Factory.insert(:site, members: [user])
|
|
{:ok, site: site}
|
|
end
|
|
|
|
def create_api_key(%{user: user}) do
|
|
api_key = Factory.insert(:api_key, user: user)
|
|
|
|
{:ok, api_key: api_key.key}
|
|
end
|
|
|
|
def use_api_key(%{conn: conn, api_key: api_key}) do
|
|
conn = Plug.Conn.put_req_header(conn, "authorization", "Bearer #{api_key}")
|
|
|
|
{:ok, conn: conn}
|
|
end
|
|
|
|
def create_pageviews(pageviews) do
|
|
pageviews =
|
|
Enum.map(pageviews, fn pageview ->
|
|
pageview =
|
|
pageview
|
|
|> Map.delete(:site)
|
|
|> Map.put(:site_id, pageview.site.id)
|
|
|
|
Factory.build(:pageview, pageview)
|
|
|> Map.from_struct()
|
|
|> Map.delete(:__meta__)
|
|
|> update_in([:timestamp], &to_naive_truncate/1)
|
|
end)
|
|
|
|
Plausible.IngestRepo.insert_all(Plausible.ClickhouseEventV2, pageviews)
|
|
end
|
|
|
|
def create_events(events) do
|
|
events =
|
|
Enum.map(events, fn event ->
|
|
Factory.build(:event, event)
|
|
|> Map.from_struct()
|
|
|> Map.delete(:__meta__)
|
|
|> update_in([:timestamp], &to_naive_truncate/1)
|
|
end)
|
|
|
|
Plausible.IngestRepo.insert_all(Plausible.ClickhouseEventV2, events)
|
|
end
|
|
|
|
def create_sessions(sessions) do
|
|
sessions =
|
|
Enum.map(sessions, fn session ->
|
|
Factory.build(:ch_session, session)
|
|
|> Map.from_struct()
|
|
|> Map.delete(:__meta__)
|
|
|> update_in([:timestamp], &to_naive_truncate/1)
|
|
|> update_in([:start], &to_naive_truncate/1)
|
|
end)
|
|
|
|
Plausible.IngestRepo.insert_all(Plausible.ClickhouseSessionV2, sessions)
|
|
end
|
|
|
|
def log_in(%{user: user, conn: conn}) do
|
|
conn =
|
|
init_session(conn)
|
|
|> Plug.Conn.put_session(:current_user_id, user.id)
|
|
|
|
{:ok, conn: conn}
|
|
end
|
|
|
|
def init_session(conn) do
|
|
opts =
|
|
Plug.Session.init(
|
|
store: :cookie,
|
|
key: "foobar",
|
|
encryption_salt: "encrypted cookie salt",
|
|
signing_salt: "signing salt",
|
|
log: false,
|
|
encrypt: false
|
|
)
|
|
|
|
conn
|
|
|> Plug.Session.call(opts)
|
|
|> Plug.Conn.fetch_session()
|
|
end
|
|
|
|
def generate_usage_for(site, i, timestamp \\ NaiveDateTime.utc_now()) do
|
|
events = for _i <- 1..i, do: Factory.build(:pageview, timestamp: timestamp)
|
|
populate_stats(site, events)
|
|
:ok
|
|
end
|
|
|
|
def populate_stats(site, import_id, events) do
|
|
Enum.map(events, fn event ->
|
|
event = Map.put(event, :site_id, site.id)
|
|
|
|
case event do
|
|
%Plausible.ClickhouseEventV2{} ->
|
|
event
|
|
|
|
imported_event ->
|
|
Map.put(imported_event, :import_id, import_id)
|
|
end
|
|
end)
|
|
|> populate_stats
|
|
end
|
|
|
|
def populate_stats(site, events) do
|
|
Enum.map(events, fn event ->
|
|
Map.put(event, :site_id, site.id)
|
|
end)
|
|
|> populate_stats
|
|
end
|
|
|
|
def populate_stats(events) do
|
|
{native, imported} =
|
|
events
|
|
|> Enum.map(fn event ->
|
|
case event do
|
|
%{timestamp: timestamp} ->
|
|
%{event | timestamp: to_naive_truncate(timestamp)}
|
|
|
|
_other ->
|
|
event
|
|
end
|
|
end)
|
|
|> Enum.split_with(fn event ->
|
|
case event do
|
|
%Plausible.ClickhouseEventV2{} ->
|
|
true
|
|
|
|
_ ->
|
|
false
|
|
end
|
|
end)
|
|
|
|
populate_native_stats(native)
|
|
populate_imported_stats(imported)
|
|
end
|
|
|
|
defp populate_native_stats(events) do
|
|
sessions =
|
|
Enum.reduce(events, %{}, fn event, sessions ->
|
|
session_id = Plausible.Session.CacheStore.on_event(event, nil)
|
|
Map.put(sessions, {event.site_id, event.user_id}, session_id)
|
|
end)
|
|
|
|
Enum.each(events, fn event ->
|
|
event = Map.put(event, :session_id, sessions[{event.site_id, event.user_id}])
|
|
Plausible.Event.WriteBuffer.insert(event)
|
|
end)
|
|
|
|
Plausible.Session.WriteBuffer.flush()
|
|
Plausible.Event.WriteBuffer.flush()
|
|
end
|
|
|
|
defp populate_imported_stats(events) do
|
|
Enum.group_by(events, &Map.fetch!(&1, :table), &Map.delete(&1, :table))
|
|
|> Enum.map(fn {table, events} -> Plausible.Imported.Buffer.insert_all(table, events) end)
|
|
end
|
|
|
|
def relative_time(shifts) do
|
|
NaiveDateTime.utc_now()
|
|
|> Timex.shift(shifts)
|
|
|> NaiveDateTime.truncate(:second)
|
|
end
|
|
|
|
def to_naive_truncate(%DateTime{} = dt) do
|
|
to_naive_truncate(DateTime.to_naive(dt))
|
|
end
|
|
|
|
def to_naive_truncate(%NaiveDateTime{} = naive) do
|
|
NaiveDateTime.truncate(naive, :second)
|
|
end
|
|
|
|
def to_naive_truncate(%Date{} = date) do
|
|
NaiveDateTime.new!(date, ~T[00:00:00])
|
|
end
|
|
|
|
def eventually(expectation, wait_time_ms \\ 50, retries \\ 10) do
|
|
Enum.reduce_while(1..retries, nil, fn attempt, _acc ->
|
|
case expectation.() do
|
|
{true, result} ->
|
|
{:halt, result}
|
|
|
|
{false, _} ->
|
|
Process.sleep(wait_time_ms * attempt)
|
|
{:cont, nil}
|
|
end
|
|
end)
|
|
end
|
|
|
|
def await_clickhouse_count(query, expected) do
|
|
eventually(
|
|
fn ->
|
|
count = Plausible.ClickhouseRepo.aggregate(query, :count)
|
|
|
|
{count == expected, count}
|
|
end,
|
|
200,
|
|
10
|
|
)
|
|
end
|
|
|
|
def random_ip() do
|
|
Enum.map_join(1..4, ".", fn _ -> Enum.random(1..254) end)
|
|
end
|
|
end
|