Extract Universal Analytics import logic (#3700)

* Move imported table schemas to separate modules outside the Google ns

* Move buffer for imports to Imported ns

* Fix schema newlines

* Extract UA import processing and persistence

* Decouple analytics worker implementation from UA

* Rename env variable for import buffer size

* Preserve old import queue until release
Adrian Gruntkowski 2024-01-23 10:24:08 +01:00 committed by GitHub
parent 11c5d3b251
commit 822483c37c
31 changed files with 854 additions and 714 deletions

@@ -360,8 +360,10 @@ config :plausible, :google,
client_id: google_cid,
client_secret: google_secret,
api_url: "https://www.googleapis.com",
reporting_api_url: "https://analyticsreporting.googleapis.com",
max_buffer_size: get_int_from_path_or_env(config_dir, "GOOGLE_MAX_BUFFER_SIZE", 10_000)
reporting_api_url: "https://analyticsreporting.googleapis.com"
config :plausible, :imported,
max_buffer_size: get_int_from_path_or_env(config_dir, "IMPORTED_MAX_BUFFER_SIZE", 10_000)
maybe_ch_ipv6 =
get_var_from_path_or_env(config_dir, "ECTO_CH_IPV6", "false")
@@ -521,7 +523,9 @@ base_queues = [
check_stats_emails: 1,
site_setup_emails: 1,
clean_invitations: 1,
# NOTE: to be removed once #3700 is released
google_analytics_imports: 1,
analytics_imports: 1,
domain_change_transition: 1,
check_accept_traffic_until: 1
]
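
The buffer size knob moves from GOOGLE_MAX_BUFFER_SIZE to IMPORTED_MAX_BUFFER_SIZE accordingly. A minimal sketch of reading the new setting back at runtime, mirroring the config block above:

# Reads the :imported config added in this commit; defaults to 10_000
# unless IMPORTED_MAX_BUFFER_SIZE overrides it.
max_size =
  :plausible
  |> Application.fetch_env!(:imported)
  |> Keyword.fetch!(:max_buffer_size)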

@@ -22,26 +22,30 @@ defmodule ObanErrorReporter do
Sentry.capture_exception(meta.reason, stacktrace: meta.stacktrace, extra: extra)
end
# NOTE: To be cleaned up once #3700 is released
@analytics_queues ["analytics_imports", "google_analytics_imports"]
defp on_job_exception(%Oban.Job{
queue: "google_analytics_imports",
args: %{"site_id" => site_id},
queue: queue,
args: %{"site_id" => site_id, "source" => source},
state: "executing",
attempt: attempt,
max_attempts: max_attempts
})
when attempt >= max_attempts do
when queue in @analytics_queues and attempt >= max_attempts do
site = Plausible.Repo.get(Plausible.Site, site_id)
if site do
Plausible.Workers.ImportGoogleAnalytics.import_failed(site)
Plausible.Workers.ImportAnalytics.import_failed(source, site)
end
end
defp on_job_exception(%Oban.Job{
queue: "google_analytics_imports",
queue: queue,
args: %{"site_id" => site_id},
state: "executing"
}) do
})
when queue in @analytics_queues do
site = Plausible.Repo.get(Plausible.Site, site_id)
Plausible.Purge.delete_imported_stats!(site)
end

@@ -110,18 +110,17 @@ defmodule Plausible.Google.Api do
@per_page 7_500
@backoff_factor :timer.seconds(10)
@max_attempts 5
@spec import_analytics(Plausible.Site.t(), Date.Range.t(), String.t(), import_auth()) ::
@spec import_analytics(Date.Range.t(), String.t(), import_auth(), (String.t(), [map()] -> :ok)) ::
:ok | {:error, term()}
@doc """
Imports stats from a Google Analytics UA view to a Plausible site.
This function fetches Google Analytics reports in batches of #{@per_page} per
request. The batches are then buffered to Clickhouse by the
`Plausible.Google.Buffer` process.
request. The batches are then passed to the persist callback.
Requests to Google Analytics can fail, and are retried at most
#{@max_attempts} times with an exponential backoff. Returns `:ok` when
importing has finished or `{:error, term()}` when a request to GA failed too
many times.
Useful links:
@@ -131,61 +130,43 @@ defmodule Plausible.Google.Api do
- [GA Dimensions reference](https://ga-dev-tools.web.app/dimensions-metrics-explorer)
"""
def import_analytics(site, date_range, view_id, auth) do
with {:ok, access_token} <- maybe_refresh_token(auth),
:ok <- do_import_analytics(site, date_range, view_id, access_token) do
:ok
else
{:error, cause} ->
Sentry.capture_message("Failed to import from Google Analytics",
extra: %{site: site.domain, error: inspect(cause)}
)
{:error, cause}
def import_analytics(date_range, view_id, auth, persist_fn) do
with {:ok, access_token} <- maybe_refresh_token(auth) do
do_import_analytics(date_range, view_id, access_token, persist_fn)
end
end
defp do_import_analytics(site, date_range, view_id, access_token) do
{:ok, buffer} = Plausible.Google.Buffer.start_link()
defp do_import_analytics(date_range, view_id, access_token, persist_fn) do
Enum.reduce_while(ReportRequest.full_report(), :ok, fn report_request, :ok ->
report_request = %ReportRequest{
report_request
| date_range: date_range,
view_id: view_id,
access_token: access_token,
page_token: nil,
page_size: @per_page
}
result =
Enum.reduce_while(ReportRequest.full_report(), :ok, fn report_request, :ok ->
report_request = %ReportRequest{
report_request
| date_range: date_range,
view_id: view_id,
access_token: access_token,
page_token: nil,
page_size: @per_page
}
case fetch_and_persist(site, report_request, buffer: buffer) do
:ok -> {:cont, :ok}
{:error, _} = error -> {:halt, error}
end
end)
Plausible.Google.Buffer.flush(buffer)
Plausible.Google.Buffer.stop(buffer)
result
case fetch_and_persist(report_request, persist_fn: persist_fn) do
:ok -> {:cont, :ok}
{:error, _} = error -> {:halt, error}
end
end)
end
@spec fetch_and_persist(Plausible.Site.t(), ReportRequest.t(), Keyword.t()) ::
@spec fetch_and_persist(ReportRequest.t(), Keyword.t()) ::
:ok | {:error, term()}
def fetch_and_persist(site, %ReportRequest{} = report_request, opts \\ []) do
buffer_pid = Keyword.get(opts, :buffer)
def fetch_and_persist(%ReportRequest{} = report_request, opts \\ []) do
persist_fn = Keyword.fetch!(opts, :persist_fn)
attempt = Keyword.get(opts, :attempt, 1)
sleep_time = Keyword.get(opts, :sleep_time, @backoff_factor)
case HTTP.get_report(report_request) do
{:ok, {rows, next_page_token}} ->
records = Plausible.Imported.from_google_analytics(rows, site.id, report_request.dataset)
:ok = Plausible.Google.Buffer.insert_many(buffer_pid, report_request.dataset, records)
:ok = persist_fn.(report_request.dataset, rows)
if next_page_token do
fetch_and_persist(
site,
%ReportRequest{report_request | page_token: next_page_token},
opts
)
@@ -198,7 +179,7 @@ defmodule Plausible.Google.Api do
{:error, cause}
else
Process.sleep(attempt * sleep_time)
fetch_and_persist(site, report_request, Keyword.merge(opts, attempt: attempt + 1))
fetch_and_persist(report_request, Keyword.merge(opts, attempt: attempt + 1))
end
end
end
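
The new persist_fn argument is what decouples fetching from storage here. A minimal sketch of a callback satisfying the (String.t(), [map()] -> :ok) contract from the spec above (the inspection logic is illustrative, not from the commit):

# Receives the dataset/table name and the raw report rows for one batch.
persist_fn = fn dataset, rows ->
  IO.puts("#{dataset}: received #{length(rows)} rows")
  :ok
end

:ok = Plausible.Google.Api.import_analytics(date_range, view_id, auth, persist_fn)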

@@ -1,148 +0,0 @@
defmodule Plausible.Google.ImportedVisitor do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_visitors" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :visitors, Ch, type: "UInt64"
field :pageviews, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
end
end
defmodule Plausible.Google.ImportedSource do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_sources" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :source, :string
field :utm_medium, :string
field :utm_campaign, :string
field :utm_content, :string
field :utm_term, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
defmodule Plausible.Google.ImportedPage do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :hostname, :string
field :page, :string
field :visitors, Ch, type: "UInt64"
field :pageviews, Ch, type: "UInt64"
field :exits, Ch, type: "UInt64"
field :time_on_page, Ch, type: "UInt64"
end
end
defmodule Plausible.Google.ImportedEntryPage do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_entry_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :entry_page, :string
field :visitors, Ch, type: "UInt64"
field :entrances, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
defmodule Plausible.Google.ImportedExitPage do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_exit_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :exit_page, :string
field :visitors, Ch, type: "UInt64"
field :exits, Ch, type: "UInt64"
end
end
defmodule Plausible.Google.ImportedLocation do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_locations" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :country, :string
field :region, :string
field :city, Ch, type: "UInt64"
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
defmodule Plausible.Google.ImportedDevice do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_devices" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :device, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
defmodule Plausible.Google.ImportedBrowser do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_browsers" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :browser, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
defmodule Plausible.Google.ImportedOperatingSystem do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_operating_systems" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :operating_system, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,15 @@
defmodule Plausible.Imported.Browser do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_browsers" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :browser, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end
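
Each extracted schema is a primary-key-less mapping over a ClickHouse table, so records can be handed straight to Plausible.IngestRepo.insert_all/2, as the buffer does below. A hypothetical direct write for illustration (values made up):

Plausible.IngestRepo.insert_all(Plausible.Imported.Browser, [
  %{
    site_id: 1,
    date: ~D[2024-01-23],
    browser: "Chrome",
    visitors: 2,
    visits: 3,
    visit_duration: 10,
    bounces: 1
  }
])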

@@ -1,4 +1,4 @@
defmodule Plausible.Google.Buffer do
defmodule Plausible.Imported.Buffer do
@moduledoc """
This GenServer inserts records into Clickhouse `imported_*` tables. Multiple buffers are
automatically created for each table. Records are flushed when the table buffer reaches the
@@ -81,7 +81,7 @@ defmodule Plausible.Google.Buffer do
defp max_buffer_size do
:plausible
|> Application.get_env(:google)
|> Application.fetch_env!(:imported)
|> Keyword.fetch!(:max_buffer_size)
end
@@ -101,13 +101,13 @@ defmodule Plausible.Google.Buffer do
Plausible.IngestRepo.insert_all(schema, records)
end
defp table_schema("imported_visitors"), do: Plausible.Google.ImportedVisitor
defp table_schema("imported_sources"), do: Plausible.Google.ImportedSource
defp table_schema("imported_pages"), do: Plausible.Google.ImportedPage
defp table_schema("imported_entry_pages"), do: Plausible.Google.ImportedEntryPage
defp table_schema("imported_exit_pages"), do: Plausible.Google.ImportedExitPage
defp table_schema("imported_locations"), do: Plausible.Google.ImportedLocation
defp table_schema("imported_devices"), do: Plausible.Google.ImportedDevice
defp table_schema("imported_browsers"), do: Plausible.Google.ImportedBrowser
defp table_schema("imported_operating_systems"), do: Plausible.Google.ImportedOperatingSystem
defp table_schema("imported_visitors"), do: Plausible.Imported.Visitor
defp table_schema("imported_sources"), do: Plausible.Imported.Source
defp table_schema("imported_pages"), do: Plausible.Imported.Page
defp table_schema("imported_entry_pages"), do: Plausible.Imported.EntryPage
defp table_schema("imported_exit_pages"), do: Plausible.Imported.ExitPage
defp table_schema("imported_locations"), do: Plausible.Imported.Location
defp table_schema("imported_devices"), do: Plausible.Imported.Device
defp table_schema("imported_browsers"), do: Plausible.Imported.Browser
defp table_schema("imported_operating_systems"), do: Plausible.Imported.OperatingSystem
end
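
Only the module's namespace and config source change; the lifecycle is the same as before. A usage sketch assembled from the calls visible in this diff (records is assumed to be a list of maps matching the table's schema):

{:ok, buffer} = Plausible.Imported.Buffer.start_link()
# Buffered writes are flushed once a table buffer reaches max_buffer_size.
:ok = Plausible.Imported.Buffer.insert_many(buffer, "imported_browsers", records)
# Force out any remainder before shutting the process down.
Plausible.Imported.Buffer.flush(buffer)
Plausible.Imported.Buffer.stop(buffer)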

@@ -0,0 +1,15 @@
defmodule Plausible.Imported.Device do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_devices" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :device, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,15 @@
defmodule Plausible.Imported.EntryPage do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_entry_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :entry_page, :string
field :visitors, Ch, type: "UInt64"
field :entrances, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,13 @@
defmodule Plausible.Imported.ExitPage do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_exit_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :exit_page, :string
field :visitors, Ch, type: "UInt64"
field :exits, Ch, type: "UInt64"
end
end

@@ -0,0 +1,16 @@
defmodule Plausible.Imported.ImportSources do
@moduledoc """
Definitions of import sources.
"""
@sources [
Plausible.Imported.UniversalAnalytics,
Plausible.Imported.NoopImporter
]
@sources_map Map.new(@sources, &{&1.name(), &1})
def by_name(name) do
Map.fetch!(@sources_map, name)
end
end
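
Importers are looked up by their human-readable name, which is also what gets stored in job args. For example:

# Resolves to Plausible.Imported.UniversalAnalytics, which still reports
# the legacy "Google Analytics" name (see the NOTE in that module below).
import_api = Plausible.Imported.ImportSources.by_name("Google Analytics")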

@@ -0,0 +1,17 @@
defmodule Plausible.Imported.Location do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_locations" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :country, :string
field :region, :string
field :city, Ch, type: "UInt64"
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,22 @@
defmodule Plausible.Imported.NoopImporter do
@moduledoc """
Stub import implementation.
"""
@name "Noop"
def name(), do: @name
def create_job(site, opts) do
Plausible.Workers.ImportAnalytics.new(%{
"source" => @name,
"site_id" => site.id,
"error" => opts[:error]
})
end
def parse_args(opts), do: opts
def import(_site, %{"error" => true}), do: {:error, "Something went wrong"}
def import(_site, _opts), do: :ok
end
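
The stub lets tests drive the generic worker without touching any external API; passing error: true forces the failure path. A sketch of enqueueing it:

# create_job/2 builds an Oban changeset for the shared
# analytics_imports queue; Oban.insert/1 schedules it.
site
|> Plausible.Imported.NoopImporter.create_job(error: true)
|> Oban.insert()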

@@ -0,0 +1,15 @@
defmodule Plausible.Imported.OperatingSystem do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_operating_systems" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :operating_system, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,16 @@
defmodule Plausible.Imported.Page do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_pages" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :hostname, :string
field :page, :string
field :visitors, Ch, type: "UInt64"
field :pageviews, Ch, type: "UInt64"
field :exits, Ch, type: "UInt64"
field :time_on_page, Ch, type: "UInt64"
end
end

@@ -1,10 +1,4 @@
defmodule Plausible.Imported do
use Plausible.ClickhouseRepo
use Timex
require Logger
@missing_values ["(none)", "(not set)", "(not provided)", "(other)"]
@tables ~w(
imported_visitors imported_sources imported_pages imported_entry_pages
imported_exit_pages imported_locations imported_devices imported_browsers
@@ -16,179 +10,4 @@ defmodule Plausible.Imported do
def forget(site) do
Plausible.Purge.delete_imported_stats!(site)
end
def from_google_analytics(nil, _site_id, _metric), do: nil
def from_google_analytics(data, site_id, table) do
Enum.reduce(data, [], fn row, acc ->
if Map.get(row.dimensions, "ga:date") in @missing_values do
acc
else
[new_from_google_analytics(site_id, table, row) | acc]
end
end)
end
defp parse_number(nr) do
{float, ""} = Float.parse(nr)
round(float)
end
defp new_from_google_analytics(site_id, "imported_visitors", row) do
%{
site_id: site_id,
date: get_date(row),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_sources", row) do
%{
site_id: site_id,
date: get_date(row),
source: row.dimensions |> Map.fetch!("ga:source") |> parse_referrer(),
utm_medium: row.dimensions |> Map.fetch!("ga:medium") |> default_if_missing(),
utm_campaign: row.dimensions |> Map.fetch!("ga:campaign") |> default_if_missing(),
utm_content: row.dimensions |> Map.fetch!("ga:adContent") |> default_if_missing(),
utm_term: row.dimensions |> Map.fetch!("ga:keyword") |> default_if_missing(),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_pages", row) do
%{
site_id: site_id,
date: get_date(row),
hostname: row.dimensions |> Map.fetch!("ga:hostname") |> String.replace_prefix("www.", ""),
page: row.dimensions |> Map.fetch!("ga:pagePath") |> URI.parse() |> Map.get(:path),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(),
exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number(),
time_on_page: row.metrics |> Map.fetch!("ga:timeOnPage") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_entry_pages", row) do
%{
site_id: site_id,
date: get_date(row),
entry_page: row.dimensions |> Map.fetch!("ga:landingPagePath"),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
entrances: row.metrics |> Map.fetch!("ga:entrances") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_exit_pages", row) do
%{
site_id: site_id,
date: get_date(row),
exit_page: Map.fetch!(row.dimensions, "ga:exitPagePath"),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_locations", row) do
country_code = row.dimensions |> Map.fetch!("ga:countryIsoCode") |> default_if_missing("")
city_name = row.dimensions |> Map.fetch!("ga:city") |> default_if_missing("")
city_data = Location.get_city(city_name, country_code)
%{
site_id: site_id,
date: get_date(row),
country: country_code,
region: row.dimensions |> Map.fetch!("ga:regionIsoCode") |> default_if_missing(""),
city: city_data && city_data.id,
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_google_analytics(site_id, "imported_devices", row) do
%{
site_id: site_id,
date: get_date(row),
device: row.dimensions |> Map.fetch!("ga:deviceCategory") |> String.capitalize(),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
@browser_google_to_plausible %{
"User-Agent:Opera" => "Opera",
"Mozilla Compatible Agent" => "Mobile App",
"Android Webview" => "Mobile App",
"Android Browser" => "Mobile App",
"Safari (in-app)" => "Mobile App",
"User-Agent: Mozilla" => "Firefox",
"(not set)" => ""
}
defp new_from_google_analytics(site_id, "imported_browsers", row) do
browser = Map.fetch!(row.dimensions, "ga:browser")
%{
site_id: site_id,
date: get_date(row),
browser: Map.get(@browser_google_to_plausible, browser, browser),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
@os_google_to_plausible %{
"Macintosh" => "Mac",
"Linux" => "GNU/Linux",
"(not set)" => ""
}
defp new_from_google_analytics(site_id, "imported_operating_systems", row) do
os = Map.fetch!(row.dimensions, "ga:operatingSystem")
%{
site_id: site_id,
date: get_date(row),
operating_system: Map.get(@os_google_to_plausible, os, os),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp get_date(%{dimensions: %{"ga:date" => date}}) do
date
|> Timex.parse!("%Y%m%d", :strftime)
|> NaiveDateTime.to_date()
end
defp default_if_missing(value, default \\ nil)
defp default_if_missing(value, default) when value in @missing_values, do: default
defp default_if_missing(value, _default), do: value
defp parse_referrer(nil), do: nil
defp parse_referrer("(direct)"), do: nil
defp parse_referrer("google"), do: "Google"
defp parse_referrer("bing"), do: "Bing"
defp parse_referrer("duckduckgo"), do: "DuckDuckGo"
defp parse_referrer(ref) do
RefInspector.parse("https://" <> ref)
|> PlausibleWeb.RefInspector.parse()
end
end

@@ -0,0 +1,19 @@
defmodule Plausible.Imported.Source do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_sources" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :source, :string
field :utm_medium, :string
field :utm_campaign, :string
field :utm_content, :string
field :utm_term, :string
field :visitors, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt32"
end
end

@@ -0,0 +1,271 @@
defmodule Plausible.Imported.UniversalAnalytics do
@moduledoc """
Import implementation for Universal Analytics.
"""
use Plausible.ClickhouseRepo
alias Plausible.Site
@missing_values ["(none)", "(not set)", "(not provided)", "(other)"]
@type job_opt() ::
{:view_id, non_neg_integer()}
| {:start_date | :end_date | :access_token | :refresh_token | :token_expires_at,
String.t()}
@type import_opt() ::
{:view_id, non_neg_integer()}
| {:date_range, Date.Range.t()}
| {:auth, {String.t(), String.t(), String.t()}}
# NOTE: we have to use the old name for now
@name "Google Analytics"
@spec name() :: String.t()
def name(), do: @name
@spec create_job(Site.t(), [job_opt()]) :: Ecto.Changeset.t()
def create_job(site, opts) do
view_id = Keyword.fetch!(opts, :view_id)
start_date = Keyword.fetch!(opts, :start_date)
end_date = Keyword.fetch!(opts, :end_date)
access_token = Keyword.fetch!(opts, :access_token)
refresh_token = Keyword.fetch!(opts, :refresh_token)
token_expires_at = Keyword.fetch!(opts, :token_expires_at)
Plausible.Workers.ImportAnalytics.new(%{
"source" => @name,
"site_id" => site.id,
"view_id" => view_id,
"start_date" => start_date,
"end_date" => end_date,
"access_token" => access_token,
"refresh_token" => refresh_token,
"token_expires_at" => token_expires_at
})
end
@spec parse_args(map()) :: [import_opt()]
def parse_args(
%{"view_id" => view_id, "start_date" => start_date, "end_date" => end_date} = args
) do
start_date = Date.from_iso8601!(start_date)
end_date = Date.from_iso8601!(end_date)
date_range = Date.range(start_date, end_date)
auth = {
Map.fetch!(args, "access_token"),
Map.fetch!(args, "refresh_token"),
Map.fetch!(args, "token_expires_at")
}
[
view_id: view_id,
date_range: date_range,
auth: auth
]
end
@doc """
Imports stats from a Google Analytics UA view to a Plausible site.
This function fetches Google Analytics reports which are then passed in batches
to Clickhouse by the `Plausible.Imported.Buffer` process.
"""
@spec import(Site.t(), [import_opt()]) :: :ok | {:error, any()}
def import(site, opts) do
date_range = Keyword.fetch!(opts, :date_range)
view_id = Keyword.fetch!(opts, :view_id)
auth = Keyword.fetch!(opts, :auth)
{:ok, buffer} = Plausible.Imported.Buffer.start_link()
persist_fn = fn table, rows ->
records = from_report(rows, site.id, table)
Plausible.Imported.Buffer.insert_many(buffer, table, records)
end
try do
Plausible.Google.Api.import_analytics(date_range, view_id, auth, persist_fn)
after
Plausible.Imported.Buffer.flush(buffer)
Plausible.Imported.Buffer.stop(buffer)
end
end
def from_report(nil, _site_id, _metric), do: nil
def from_report(data, site_id, table) do
Enum.reduce(data, [], fn row, acc ->
if Map.get(row.dimensions, "ga:date") in @missing_values do
acc
else
[new_from_report(site_id, table, row) | acc]
end
end)
end
defp parse_number(nr) do
{float, ""} = Float.parse(nr)
round(float)
end
defp new_from_report(site_id, "imported_visitors", row) do
%{
site_id: site_id,
date: get_date(row),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_report(site_id, "imported_sources", row) do
%{
site_id: site_id,
date: get_date(row),
source: row.dimensions |> Map.fetch!("ga:source") |> parse_referrer(),
utm_medium: row.dimensions |> Map.fetch!("ga:medium") |> default_if_missing(),
utm_campaign: row.dimensions |> Map.fetch!("ga:campaign") |> default_if_missing(),
utm_content: row.dimensions |> Map.fetch!("ga:adContent") |> default_if_missing(),
utm_term: row.dimensions |> Map.fetch!("ga:keyword") |> default_if_missing(),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_report(site_id, "imported_pages", row) do
%{
site_id: site_id,
date: get_date(row),
hostname: row.dimensions |> Map.fetch!("ga:hostname") |> String.replace_prefix("www.", ""),
page: row.dimensions |> Map.fetch!("ga:pagePath") |> URI.parse() |> Map.get(:path),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(),
exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number(),
time_on_page: row.metrics |> Map.fetch!("ga:timeOnPage") |> parse_number()
}
end
defp new_from_report(site_id, "imported_entry_pages", row) do
%{
site_id: site_id,
date: get_date(row),
entry_page: row.dimensions |> Map.fetch!("ga:landingPagePath"),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
entrances: row.metrics |> Map.fetch!("ga:entrances") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number()
}
end
defp new_from_report(site_id, "imported_exit_pages", row) do
%{
site_id: site_id,
date: get_date(row),
exit_page: Map.fetch!(row.dimensions, "ga:exitPagePath"),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number()
}
end
defp new_from_report(site_id, "imported_locations", row) do
country_code = row.dimensions |> Map.fetch!("ga:countryIsoCode") |> default_if_missing("")
city_name = row.dimensions |> Map.fetch!("ga:city") |> default_if_missing("")
city_data = Location.get_city(city_name, country_code)
%{
site_id: site_id,
date: get_date(row),
country: country_code,
region: row.dimensions |> Map.fetch!("ga:regionIsoCode") |> default_if_missing(""),
city: city_data && city_data.id,
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp new_from_report(site_id, "imported_devices", row) do
%{
site_id: site_id,
date: get_date(row),
device: row.dimensions |> Map.fetch!("ga:deviceCategory") |> String.capitalize(),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
@browser_google_to_plausible %{
"User-Agent:Opera" => "Opera",
"Mozilla Compatible Agent" => "Mobile App",
"Android Webview" => "Mobile App",
"Android Browser" => "Mobile App",
"Safari (in-app)" => "Mobile App",
"User-Agent: Mozilla" => "Firefox",
"(not set)" => ""
}
defp new_from_report(site_id, "imported_browsers", row) do
browser = Map.fetch!(row.dimensions, "ga:browser")
%{
site_id: site_id,
date: get_date(row),
browser: Map.get(@browser_google_to_plausible, browser, browser),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
@os_google_to_plausible %{
"Macintosh" => "Mac",
"Linux" => "GNU/Linux",
"(not set)" => ""
}
defp new_from_report(site_id, "imported_operating_systems", row) do
os = Map.fetch!(row.dimensions, "ga:operatingSystem")
%{
site_id: site_id,
date: get_date(row),
operating_system: Map.get(@os_google_to_plausible, os, os),
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(),
visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(),
bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(),
visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number()
}
end
defp get_date(%{dimensions: %{"ga:date" => date}}) do
date
|> Timex.parse!("%Y%m%d", :strftime)
|> NaiveDateTime.to_date()
end
defp default_if_missing(value, default \\ nil)
defp default_if_missing(value, default) when value in @missing_values, do: default
defp default_if_missing(value, _default), do: value
defp parse_referrer(nil), do: nil
defp parse_referrer("(direct)"), do: nil
defp parse_referrer("google"), do: "Google"
defp parse_referrer("bing"), do: "Bing"
defp parse_referrer("duckduckgo"), do: "DuckDuckGo"
defp parse_referrer(ref) do
RefInspector.parse("https://" <> ref)
|> PlausibleWeb.RefInspector.parse()
end
end
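
Job args round-trip through JSON, so parse_args/1 rebuilds richer terms (the Date.Range and the auth tuple) from plain strings. A sketch with hypothetical values, matching the test added in this commit:

args = %{
  "view_id" => 123,
  "start_date" => "2023-10-01",
  "end_date" => "2024-01-02",
  "access_token" => "access123",
  "refresh_token" => "refresh123",
  "token_expires_at" => "2024-01-02T00:00:00"
}

opts = Plausible.Imported.UniversalAnalytics.parse_args(args)
opts[:date_range] #=> Date.range(~D[2023-10-01], ~D[2024-01-02])
opts[:auth] #=> {"access123", "refresh123", "2024-01-02T00:00:00"}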

@@ -0,0 +1,15 @@
defmodule Plausible.Imported.Visitor do
@moduledoc false
use Ecto.Schema
@primary_key false
schema "imported_visitors" do
field :site_id, Ch, type: "UInt64"
field :date, :date
field :visitors, Ch, type: "UInt64"
field :pageviews, Ch, type: "UInt64"
field :bounces, Ch, type: "UInt64"
field :visits, Ch, type: "UInt64"
field :visit_duration, Ch, type: "UInt64"
end
end

@@ -1,6 +1,6 @@
defmodule Plausible.Site.ImportedData do
@moduledoc """
Embedded schema for Google Analytics imports
Embedded schema for analytics imports
"""
use Ecto.Schema

@@ -751,23 +751,24 @@ defmodule PlausibleWeb.SiteController do
"refresh_token" => refresh_token,
"expires_at" => expires_at
}) do
site = conn.assigns[:site]
site = conn.assigns.site
source = "Google Analytics"
job =
Plausible.Workers.ImportGoogleAnalytics.new(%{
"site_id" => site.id,
"view_id" => view_id,
"start_date" => start_date,
"end_date" => end_date,
"access_token" => access_token,
"refresh_token" => refresh_token,
"token_expires_at" => expires_at
})
Plausible.Imported.UniversalAnalytics.create_job(
site,
view_id: view_id,
start_date: start_date,
end_date: end_date,
access_token: access_token,
refresh_token: refresh_token,
token_expires_at: expires_at
)
Ecto.Multi.new()
|> Ecto.Multi.update(
:update_site,
Plausible.Site.start_import(site, start_date, end_date, "Google Analytics")
Plausible.Site.start_import(site, start_date, end_date, source)
)
|> Oban.insert(:oban_job, job)
|> Repo.transaction()
@@ -777,6 +778,9 @@ defmodule PlausibleWeb.SiteController do
|> redirect(external: Routes.site_path(conn, :settings_integrations, site.domain))
end
# NOTE: To be cleaned up once #3700 is released
@analytics_queues ["analytics_imports", "google_analytics_imports"]
def forget_imported(conn, _params) do
site = conn.assigns[:site]
@@ -785,7 +789,7 @@ defmodule PlausibleWeb.SiteController do
Oban.cancel_all_jobs(
from j in Oban.Job,
where:
j.queue == "google_analytics_imports" and
j.queue in @analytics_queues and
fragment("(? ->> 'site_id')::int", j.args) == ^site.id
)

@@ -313,11 +313,12 @@ defmodule PlausibleWeb.Email do
)
end
def import_success(user, site) do
# NOTE: make email different depending on import source
def import_success(source, user, site) do
priority_email()
|> to(user)
|> tag("import-success-email")
|> subject("Google Analytics data imported for #{site.domain}")
|> subject("#{source} data imported for #{site.domain}")
|> render("google_analytics_import.html", %{
site: site,
link: PlausibleWeb.Endpoint.url() <> "/" <> URI.encode_www_form(site.domain),
@@ -326,11 +327,12 @@ defmodule PlausibleWeb.Email do
})
end
def import_failure(user, site) do
# NOTE: make email different depending on import source
def import_failure(source, user, site) do
priority_email()
|> to(user)
|> tag("import-failure-email")
|> subject("Google Analytics import failed for #{site.domain}")
|> subject("#{source} import failed for #{site.domain}")
|> render("google_analytics_import.html", %{
user: user,
site: site,

@@ -0,0 +1,79 @@
defmodule Plausible.Workers.ImportAnalytics do
@moduledoc """
Worker for running analytics import jobs.
"""
use Plausible.Repo
require Logger
use Oban.Worker,
queue: :analytics_imports,
max_attempts: 3,
unique: [fields: [:args], period: 60]
alias Plausible.Imported.ImportSources
@impl Oban.Worker
def perform(%Oban.Job{
args: %{"site_id" => site_id, "source" => source} = args
}) do
import_api = ImportSources.by_name(source)
import_opts = import_api.parse_args(args)
site = Repo.get!(Plausible.Site, site_id)
case import_api.import(site, import_opts) do
:ok ->
import_success(source, site)
:ok
{:error, error} ->
Sentry.capture_message("Failed to import from #{source}",
extra: %{site: site.domain, error: inspect(error)}
)
import_failed(source, site)
{:error, error}
end
end
@impl Oban.Worker
def backoff(_job) do
# 5 minutes
300
end
def import_success(source, site) do
site = Repo.preload(site, memberships: :user)
site
|> Plausible.Site.import_success()
|> Repo.update!()
Enum.each(site.memberships, fn membership ->
if membership.role in [:owner, :admin] do
PlausibleWeb.Email.import_success(source, membership.user, site)
|> Plausible.Mailer.send()
end
end)
end
def import_failed(source, site) do
site = Repo.preload(site, memberships: :user)
site
|> Plausible.Site.import_failure()
|> Repo.update!()
Plausible.Purge.delete_imported_stats!(site)
Enum.each(site.memberships, fn membership ->
if membership.role in [:owner, :admin] do
PlausibleWeb.Email.import_failure(source, membership.user, site)
|> Plausible.Mailer.send()
end
end)
end
end
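
A job reaches this worker through whichever importer built it; the worker only needs "source" and "site_id" to dispatch. A sketch of the full path for UA (argument values hypothetical):

{:ok, _job} =
  site
  |> Plausible.Imported.UniversalAnalytics.create_job(
    view_id: 123,
    start_date: "2023-10-01",
    end_date: "2024-01-02",
    access_token: "access123",
    refresh_token: "refresh123",
    token_expires_at: "2024-01-02T00:00:00"
  )
  |> Oban.insert()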

@@ -1,71 +0,0 @@
defmodule Plausible.Workers.ImportGoogleAnalytics do
use Plausible.Repo
require Logger
use Oban.Worker,
queue: :google_analytics_imports,
max_attempts: 3,
unique: [fields: [:args], period: 60]
@impl Oban.Worker
def perform(
%Oban.Job{
args:
%{
"site_id" => site_id,
"view_id" => view_id,
"start_date" => start_date,
"end_date" => end_date
} = args
},
google_api \\ Plausible.Google.Api
) do
site = Repo.get(Plausible.Site, site_id) |> Repo.preload([[memberships: :user]])
start_date = Date.from_iso8601!(start_date)
end_date = Date.from_iso8601!(end_date)
date_range = Date.range(start_date, end_date)
auth = {args["access_token"], args["refresh_token"], args["token_expires_at"]}
case google_api.import_analytics(site, date_range, view_id, auth) do
:ok ->
Plausible.Site.import_success(site)
|> Repo.update!()
Enum.each(site.memberships, fn membership ->
if membership.role in [:owner, :admin] do
PlausibleWeb.Email.import_success(membership.user, site)
|> Plausible.Mailer.send()
end
end)
:ok
{:error, error} ->
Logger.error("Import: Failed to import from GA. Reason: #{inspect(error)}")
import_failed(site)
{:error, error}
end
end
@impl Oban.Worker
def backoff(_job) do
# 5 minutes
300
end
def import_failed(site) do
site = Repo.preload(site, memberships: :user)
Plausible.Site.import_failure(site) |> Repo.update!()
Plausible.Purge.delete_imported_stats!(site)
Enum.each(site.memberships, fn membership ->
if membership.role in [:owner, :admin] do
PlausibleWeb.Email.import_failure(membership.user, site)
|> Plausible.Mailer.send()
end
end)
end
end

@@ -3,6 +3,7 @@ defmodule Plausible.Google.ApiTest do
use Plausible.Test.Support.HTTPMocker
alias Plausible.Google.Api
alias Plausible.Imported.UniversalAnalytics
import ExUnit.CaptureLog
import Mox
@@ -36,7 +37,18 @@ defmodule Plausible.Google.ApiTest do
future = DateTime.utc_now() |> DateTime.add(3600, :second) |> DateTime.to_iso8601()
auth = {"***", "refresh_token", future}
assert :ok == Plausible.Google.Api.import_analytics(site, date_range, view_id, auth)
{:ok, buffer} = Plausible.Imported.Buffer.start_link()
persist_fn = fn table, rows ->
records = UniversalAnalytics.from_report(rows, site.id, table)
Plausible.Imported.Buffer.insert_many(buffer, table, records)
end
assert :ok == Plausible.Google.Api.import_analytics(date_range, view_id, auth, persist_fn)
Plausible.Imported.Buffer.flush(buffer)
Plausible.Imported.Buffer.stop(buffer)
assert 1_495_150 == Plausible.Stats.Clickhouse.imported_pageview_count(site)
end
@@ -67,20 +79,25 @@ defmodule Plausible.Google.ApiTest do
end)
end
assert :ok == Plausible.Google.Api.import_analytics(site, range, "123551", auth)
{:ok, buffer} = Plausible.Imported.Buffer.start_link()
persist_fn = fn table, rows ->
records = UniversalAnalytics.from_report(rows, site.id, table)
Plausible.Imported.Buffer.insert_many(buffer, table, records)
end
assert :ok == Plausible.Google.Api.import_analytics(range, "123551", auth, persist_fn)
Plausible.Imported.Buffer.flush(buffer)
Plausible.Imported.Buffer.stop(buffer)
end
describe "fetch_and_persist/4" do
@ok_response Jason.decode!(File.read!("fixture/ga_batch_report.json"))
@no_report_response Jason.decode!(File.read!("fixture/ga_report_empty_rows.json"))
setup do
{:ok, pid} = Plausible.Google.Buffer.start_link()
{:ok, buffer: pid}
end
@tag :slow
test "will fetch and persist import data from Google Analytics", %{site: site, buffer: buffer} do
test "will fetch and persist import data from Google Analytics" do
request = %Plausible.Google.ReportRequest{
dataset: "imported_exit_pages",
view_id: "123",
@@ -121,24 +138,19 @@ defmodule Plausible.Google.ApiTest do
end
)
Api.fetch_and_persist(site, request,
sleep_time: 0,
buffer: buffer
)
assert :ok =
Api.fetch_and_persist(request,
sleep_time: 0,
persist_fn: fn dataset, row ->
assert dataset == "imported_exit_pages"
assert length(row) == 1479
Plausible.Google.Buffer.flush(buffer)
assert 1479 ==
Plausible.ClickhouseRepo.aggregate(
from(iex in "imported_exit_pages", where: iex.site_id == ^site.id),
:count
:ok
end
)
end
test "retries HTTP request up to 5 times before raising the last error", %{
site: site,
buffer: buffer
} do
test "retries HTTP request up to 5 times before raising the last error" do
expect(
Plausible.HTTPClient.Mock,
:post,
@@ -166,13 +178,13 @@ defmodule Plausible.Google.ApiTest do
}
assert {:error, :request_failed} =
Api.fetch_and_persist(site, request,
Api.fetch_and_persist(request,
sleep_time: 0,
buffer: buffer
persist_fn: fn _dataset, _rows -> :ok end
)
end
test "does not fail when report does not have rows key", %{site: site, buffer: buffer} do
test "does not fail when report does not have rows key" do
expect(
Plausible.HTTPClient.Mock,
:post,
@@ -197,9 +209,14 @@ defmodule Plausible.Google.ApiTest do
}
assert :ok ==
Api.fetch_and_persist(site, request,
Api.fetch_and_persist(request,
sleep_time: 0,
buffer: buffer
persist_fn: fn dataset, rows ->
assert dataset == "imported_exit_pages"
assert rows == []
:ok
end
)
end
end

@@ -1,14 +1,14 @@
defmodule Plausible.Google.BufferTest do
defmodule Plausible.Imported.BufferTest do
use Plausible.DataCase, async: false
import Ecto.Query
alias Plausible.Google.Buffer
alias Plausible.Imported.Buffer
setup [:create_user, :create_new_site, :set_buffer_size]
defp set_buffer_size(_setup_args) do
google_setting = Application.get_env(:plausible, :google)
patch_env(:google, Keyword.put(google_setting, :max_buffer_size, 10))
imported_setting = Application.fetch_env!(:plausible, :imported)
patch_env(:imported, Keyword.put(imported_setting, :max_buffer_size, 10))
:ok
end

@@ -6,8 +6,8 @@ defmodule Plausible.ImportedTest do
defp import_data(ga_data, site_id, table_name) do
ga_data
|> Plausible.Imported.from_google_analytics(site_id, table_name)
|> then(&Plausible.Google.Buffer.insert_all(table_name, &1))
|> Plausible.Imported.UniversalAnalytics.from_report(site_id, table_name)
|> then(&Plausible.Imported.Buffer.insert_all(table_name, &1))
end
describe "Parse and import third party data fetched from Google Analytics" do

@@ -0,0 +1,70 @@
defmodule Plausible.Imported.UniversalAnalyticsTest do
use Plausible.DataCase, async: true
use Plausible.Test.Support.HTTPMocker
alias Plausible.Imported.UniversalAnalytics
setup [:create_user, :create_new_site]
describe "create_job/2 and parse_args/1" do
test "parses job args properly" do
site = insert(:site)
site_id = site.id
expires_at = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now())
job =
UniversalAnalytics.create_job(site,
view_id: 123,
start_date: "2023-10-01",
end_date: "2024-01-02",
access_token: "access123",
refresh_token: "refresh123",
token_expires_at: expires_at
)
assert %Ecto.Changeset{
data: %Oban.Job{},
changes: %{
args:
%{
"site_id" => ^site_id,
"view_id" => 123,
"start_date" => "2023-10-01",
"end_date" => "2024-01-02",
"access_token" => "access123",
"refresh_token" => "refresh123",
"token_expires_at" => ^expires_at
} = args
}
} = job
assert opts = [_ | _] = UniversalAnalytics.parse_args(args)
assert opts[:view_id] == 123
assert opts[:date_range] == Date.range(~D[2023-10-01], ~D[2024-01-02])
assert opts[:auth] == {"access123", "refresh123", expires_at}
end
end
describe "import/2" do
@tag :slow
test "imports page views from Google Analytics", %{site: site} do
mock_http_with("google_analytics_import#1.json")
view_id = "54297898"
date_range = Date.range(~D[2011-01-01], ~D[2022-07-19])
future = DateTime.utc_now() |> DateTime.add(3600, :second) |> DateTime.to_iso8601()
auth = {"***", "refresh_token", future}
assert :ok ==
UniversalAnalytics.import(site,
date_range: date_range,
view_id: view_id,
auth: auth
)
assert 1_495_150 == Plausible.Stats.Clickhouse.imported_pageview_count(site)
end
end
end

@@ -1289,8 +1289,9 @@ defmodule PlausibleWeb.SiteControllerTest do
})
assert_enqueued(
worker: Plausible.Workers.ImportGoogleAnalytics,
worker: Plausible.Workers.ImportAnalytics,
args: %{
"source" => "Google Analytics",
"site_id" => site.id,
"view_id" => "123",
"start_date" => "2018-03-01",
@@ -1330,7 +1331,8 @@ defmodule PlausibleWeb.SiteControllerTest do
test "cancels Oban job if it exists", %{conn: conn, site: site} do
{:ok, job} =
Plausible.Workers.ImportGoogleAnalytics.new(%{
Plausible.Workers.ImportAnalytics.new(%{
"source" => "Google Analytics",
"site_id" => site.id,
"view_id" => "123",
"start_date" => "2022-01-01",

@@ -196,7 +196,7 @@ defmodule Plausible.TestUtils do
defp populate_imported_stats(events) do
Enum.group_by(events, &Map.fetch!(&1, :table), &Map.delete(&1, :table))
|> Enum.map(fn {table, events} -> Plausible.Google.Buffer.insert_all(table, events) end)
|> Enum.map(fn {table, events} -> Plausible.Imported.Buffer.insert_all(table, events) end)
end
def relative_time(shifts) do

@@ -0,0 +1,125 @@
defmodule Plausible.Workers.ImportAnalyticsTest do
use Plausible.DataCase
use Bamboo.Test
alias Plausible.Workers.ImportAnalytics
@moduletag capture_log: true
describe "Universal Analytics" do
setup do
%{
imported_data: %Plausible.Site.ImportedData{
start_date: Timex.today() |> Timex.shift(days: -7),
end_date: Timex.today(),
source: "Noop",
status: "importing"
}
}
end
test "updates the imported_data field for the site after successful import", %{
imported_data: imported_data
} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id
}
})
assert Repo.reload!(site).imported_data.status == "ok"
end
test "updates the stats_start_date field for the site after successful import", %{
imported_data: imported_data
} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id
}
})
assert Repo.reload!(site).stats_start_date == imported_data.start_date
end
test "sends email to owner after successful import", %{imported_data: imported_data} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id
}
})
assert_email_delivered_with(
to: [user],
subject: "Noop data imported for #{site.domain}"
)
end
test "updates site record after failed import", %{imported_data: imported_data} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id,
"error" => true
}
})
assert Repo.reload!(site).imported_data.status == "error"
end
test "clears any orphaned data during import", %{imported_data: imported_data} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
populate_stats(site, [
build(:imported_visitors, pageviews: 10)
])
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id,
"error" => true
}
})
assert eventually(fn ->
count = Plausible.Stats.Clickhouse.imported_pageview_count(site)
{count == 0, count}
end)
end
test "sends email to owner after failed import", %{imported_data: imported_data} do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: imported_data)
ImportAnalytics.perform(%Oban.Job{
args: %{
"source" => "Noop",
"site_id" => site.id,
"error" => true
}
})
assert_email_delivered_with(
to: [user],
subject: "Noop import failed for #{site.domain}"
)
end
end
end

@@ -1,197 +0,0 @@
defmodule Plausible.Workers.ImportGoogleAnalyticsTest do
use Plausible.DataCase
use Bamboo.Test
import Double
alias Plausible.Workers.ImportGoogleAnalytics
@moduletag capture_log: true
@imported_data %Plausible.Site.ImportedData{
start_date: Timex.today() |> Timex.shift(days: -7),
end_date: Timex.today(),
source: "Google Analytics",
status: "importing"
}
test "updates the imported_data field for the site after successful import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
:ok
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert Repo.reload!(site).imported_data.status == "ok"
end
test "updates the stats_start_date field for the site after successful import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
:ok
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert Repo.reload!(site).stats_start_date == @imported_data.start_date
end
test "sends email to owner after successful import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
:ok
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert_email_delivered_with(
to: [user],
subject: "Google Analytics data imported for #{site.domain}"
)
end
test "updates site record after failed import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
{:error, "Something went wrong"}
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert Repo.reload!(site).imported_data.status == "error"
end
test "clears any orphaned data during import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
populate_stats(site, [
build(:imported_visitors, pageviews: 10)
])
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
{:error, "Something went wrong"}
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert eventually(fn ->
count = Plausible.Stats.Clickhouse.imported_pageview_count(site)
{count == 0, count}
end)
end
test "sends email to owner after failed import" do
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user], imported_data: @imported_data)
api_stub =
stub(Plausible.Google.Api, :import_analytics, fn _site,
_date_range,
_view_id,
_access_token ->
{:error, "Something went wrong"}
end)
ImportGoogleAnalytics.perform(
%Oban.Job{
args: %{
"site_id" => site.id,
"view_id" => "view_id",
"start_date" => "2020-01-01",
"end_date" => "2022-01-01",
"access_token" => "token"
}
},
api_stub
)
assert_email_delivered_with(
to: [user],
subject: "Google Analytics import failed for #{site.domain}"
)
end
end