diff --git a/config/runtime.exs b/config/runtime.exs index 8084e08d8..c7925b414 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -360,8 +360,10 @@ config :plausible, :google, client_id: google_cid, client_secret: google_secret, api_url: "https://www.googleapis.com", - reporting_api_url: "https://analyticsreporting.googleapis.com", - max_buffer_size: get_int_from_path_or_env(config_dir, "GOOGLE_MAX_BUFFER_SIZE", 10_000) + reporting_api_url: "https://analyticsreporting.googleapis.com" + +config :plausible, :imported, + max_buffer_size: get_int_from_path_or_env(config_dir, "IMPORTED_MAX_BUFFER_SIZE", 10_000) maybe_ch_ipv6 = get_var_from_path_or_env(config_dir, "ECTO_CH_IPV6", "false") @@ -521,7 +523,9 @@ base_queues = [ check_stats_emails: 1, site_setup_emails: 1, clean_invitations: 1, + # NOTE: to be removed once #3700 is released google_analytics_imports: 1, + analytics_imports: 1, domain_change_transition: 1, check_accept_traffic_until: 1 ] diff --git a/lib/oban_error_reporter.ex b/lib/oban_error_reporter.ex index 4677fd483..171b7d5f4 100644 --- a/lib/oban_error_reporter.ex +++ b/lib/oban_error_reporter.ex @@ -22,26 +22,30 @@ defmodule ObanErrorReporter do Sentry.capture_exception(meta.reason, stacktrace: meta.stacktrace, extra: extra) end + # NOTE: To be cleaned up once #3700 is released + @analytics_queues ["analytics_imports", "google_analytics_imports"] + defp on_job_exception(%Oban.Job{ - queue: "google_analytics_imports", - args: %{"site_id" => site_id}, + queue: queue, + args: %{"site_id" => site_id, "source" => source}, state: "executing", attempt: attempt, max_attempts: max_attempts }) - when attempt >= max_attempts do + when queue in @analytics_queues and attempt >= max_attempts do site = Plausible.Repo.get(Plausible.Site, site_id) if site do - Plausible.Workers.ImportGoogleAnalytics.import_failed(site) + Plausible.Workers.ImportAnalytics.import_failed(source, site) end end defp on_job_exception(%Oban.Job{ - queue: "google_analytics_imports", + 
queue: queue, args: %{"site_id" => site_id}, state: "executing" - }) do + }) + when queue in @analytics_queues do site = Plausible.Repo.get(Plausible.Site, site_id) Plausible.Purge.delete_imported_stats!(site) end diff --git a/lib/plausible/google/api.ex b/lib/plausible/google/api.ex index b6b38d625..24e6771a3 100644 --- a/lib/plausible/google/api.ex +++ b/lib/plausible/google/api.ex @@ -110,18 +110,17 @@ defmodule Plausible.Google.Api do @per_page 7_500 @backoff_factor :timer.seconds(10) @max_attempts 5 - @spec import_analytics(Plausible.Site.t(), Date.Range.t(), String.t(), import_auth()) :: + @spec import_analytics(Date.Range.t(), String.t(), import_auth(), (String.t(), [map()] -> :ok)) :: :ok | {:error, term()} @doc """ Imports stats from a Google Analytics UA view to a Plausible site. This function fetches Google Analytics reports in batches of #{@per_page} per - request. The batches are then buffered to Clickhouse by the - `Plausible.Google.Buffer` process. + request. The batches are then passed to persist callback. Requests to Google Analytics can fail, and are retried at most #{@max_attempts} times with an exponential backoff. Returns `:ok` when - importing has finished or `{:error, term()}` when a request to GA failed too + importing has finished or `{:error, term()}` when a request to GA failed too many times. 
Useful links: @@ -131,61 +130,43 @@ defmodule Plausible.Google.Api do - [GA Dimensions reference](https://ga-dev-tools.web.app/dimensions-metrics-explorer) """ - def import_analytics(site, date_range, view_id, auth) do - with {:ok, access_token} <- maybe_refresh_token(auth), - :ok <- do_import_analytics(site, date_range, view_id, access_token) do - :ok - else - {:error, cause} -> - Sentry.capture_message("Failed to import from Google Analytics", - extra: %{site: site.domain, error: inspect(cause)} - ) - - {:error, cause} + def import_analytics(date_range, view_id, auth, persist_fn) do + with {:ok, access_token} <- maybe_refresh_token(auth) do + do_import_analytics(date_range, view_id, access_token, persist_fn) end end - defp do_import_analytics(site, date_range, view_id, access_token) do - {:ok, buffer} = Plausible.Google.Buffer.start_link() + defp do_import_analytics(date_range, view_id, access_token, persist_fn) do + Enum.reduce_while(ReportRequest.full_report(), :ok, fn report_request, :ok -> + report_request = %ReportRequest{ + report_request + | date_range: date_range, + view_id: view_id, + access_token: access_token, + page_token: nil, + page_size: @per_page + } - result = - Enum.reduce_while(ReportRequest.full_report(), :ok, fn report_request, :ok -> - report_request = %ReportRequest{ - report_request - | date_range: date_range, - view_id: view_id, - access_token: access_token, - page_token: nil, - page_size: @per_page - } - - case fetch_and_persist(site, report_request, buffer: buffer) do - :ok -> {:cont, :ok} - {:error, _} = error -> {:halt, error} - end - end) - - Plausible.Google.Buffer.flush(buffer) - Plausible.Google.Buffer.stop(buffer) - - result + case fetch_and_persist(report_request, persist_fn: persist_fn) do + :ok -> {:cont, :ok} + {:error, _} = error -> {:halt, error} + end + end) end - @spec fetch_and_persist(Plausible.Site.t(), ReportRequest.t(), Keyword.t()) :: + @spec fetch_and_persist(ReportRequest.t(), Keyword.t()) :: :ok | {:error, 
term()} - def fetch_and_persist(site, %ReportRequest{} = report_request, opts \\ []) do - buffer_pid = Keyword.get(opts, :buffer) + def fetch_and_persist(%ReportRequest{} = report_request, opts \\ []) do + persist_fn = Keyword.fetch!(opts, :persist_fn) attempt = Keyword.get(opts, :attempt, 1) sleep_time = Keyword.get(opts, :sleep_time, @backoff_factor) case HTTP.get_report(report_request) do {:ok, {rows, next_page_token}} -> - records = Plausible.Imported.from_google_analytics(rows, site.id, report_request.dataset) - :ok = Plausible.Google.Buffer.insert_many(buffer_pid, report_request.dataset, records) + :ok = persist_fn.(report_request.dataset, rows) if next_page_token do fetch_and_persist( - site, %ReportRequest{report_request | page_token: next_page_token}, opts ) @@ -198,7 +179,7 @@ defmodule Plausible.Google.Api do {:error, cause} else Process.sleep(attempt * sleep_time) - fetch_and_persist(site, report_request, Keyword.merge(opts, attempt: attempt + 1)) + fetch_and_persist(report_request, Keyword.merge(opts, attempt: attempt + 1)) end end end diff --git a/lib/plausible/google/schemas.ex b/lib/plausible/google/schemas.ex deleted file mode 100644 index 80d52edf0..000000000 --- a/lib/plausible/google/schemas.ex +++ /dev/null @@ -1,148 +0,0 @@ -defmodule Plausible.Google.ImportedVisitor do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_visitors" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :visitors, Ch, type: "UInt64" - field :pageviews, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - end -end - -defmodule Plausible.Google.ImportedSource do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_sources" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :source, :string - field :utm_medium, :string - field :utm_campaign, :string - field :utm_content, :string - field :utm_term, 
:string - field :visitors, Ch, type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end - -defmodule Plausible.Google.ImportedPage do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_pages" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :hostname, :string - field :page, :string - field :visitors, Ch, type: "UInt64" - field :pageviews, Ch, type: "UInt64" - field :exits, Ch, type: "UInt64" - field :time_on_page, Ch, type: "UInt64" - end -end - -defmodule Plausible.Google.ImportedEntryPage do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_entry_pages" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :entry_page, :string - field :visitors, Ch, type: "UInt64" - field :entrances, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end - -defmodule Plausible.Google.ImportedExitPage do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_exit_pages" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :exit_page, :string - field :visitors, Ch, type: "UInt64" - field :exits, Ch, type: "UInt64" - end -end - -defmodule Plausible.Google.ImportedLocation do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_locations" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :country, :string - field :region, :string - field :city, Ch, type: "UInt64" - field :visitors, Ch, type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end - -defmodule Plausible.Google.ImportedDevice do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_devices" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :device, :string - field :visitors, Ch, 
type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end - -defmodule Plausible.Google.ImportedBrowser do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_browsers" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :browser, :string - field :visitors, Ch, type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end - -defmodule Plausible.Google.ImportedOperatingSystem do - @moduledoc false - use Ecto.Schema - - @primary_key false - schema "imported_operating_systems" do - field :site_id, Ch, type: "UInt64" - field :date, :date - field :operating_system, :string - field :visitors, Ch, type: "UInt64" - field :visits, Ch, type: "UInt64" - field :visit_duration, Ch, type: "UInt64" - field :bounces, Ch, type: "UInt32" - end -end diff --git a/lib/plausible/imported/browser.ex b/lib/plausible/imported/browser.ex new file mode 100644 index 000000000..56d2a8772 --- /dev/null +++ b/lib/plausible/imported/browser.ex @@ -0,0 +1,15 @@ +defmodule Plausible.Imported.Browser do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_browsers" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :browser, :string + field :visitors, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/google/buffer.ex b/lib/plausible/imported/buffer.ex similarity index 79% rename from lib/plausible/google/buffer.ex rename to lib/plausible/imported/buffer.ex index a32679fec..554cfaf70 100644 --- a/lib/plausible/google/buffer.ex +++ b/lib/plausible/imported/buffer.ex @@ -1,4 +1,4 @@ -defmodule Plausible.Google.Buffer do +defmodule Plausible.Imported.Buffer do @moduledoc """ This GenServer inserts records into Clickhouse 
`imported_*` tables. Multiple buffers are automatically created for each table. Records are flushed when the table buffer reaches the @@ -81,7 +81,7 @@ defmodule Plausible.Google.Buffer do defp max_buffer_size do :plausible - |> Application.get_env(:google) + |> Application.fetch_env!(:imported) |> Keyword.fetch!(:max_buffer_size) end @@ -101,13 +101,13 @@ defmodule Plausible.Google.Buffer do Plausible.IngestRepo.insert_all(schema, records) end - defp table_schema("imported_visitors"), do: Plausible.Google.ImportedVisitor - defp table_schema("imported_sources"), do: Plausible.Google.ImportedSource - defp table_schema("imported_pages"), do: Plausible.Google.ImportedPage - defp table_schema("imported_entry_pages"), do: Plausible.Google.ImportedEntryPage - defp table_schema("imported_exit_pages"), do: Plausible.Google.ImportedExitPage - defp table_schema("imported_locations"), do: Plausible.Google.ImportedLocation - defp table_schema("imported_devices"), do: Plausible.Google.ImportedDevice - defp table_schema("imported_browsers"), do: Plausible.Google.ImportedBrowser - defp table_schema("imported_operating_systems"), do: Plausible.Google.ImportedOperatingSystem + defp table_schema("imported_visitors"), do: Plausible.Imported.Visitor + defp table_schema("imported_sources"), do: Plausible.Imported.Source + defp table_schema("imported_pages"), do: Plausible.Imported.Page + defp table_schema("imported_entry_pages"), do: Plausible.Imported.EntryPage + defp table_schema("imported_exit_pages"), do: Plausible.Imported.ExitPage + defp table_schema("imported_locations"), do: Plausible.Imported.Location + defp table_schema("imported_devices"), do: Plausible.Imported.Device + defp table_schema("imported_browsers"), do: Plausible.Imported.Browser + defp table_schema("imported_operating_systems"), do: Plausible.Imported.OperatingSystem end diff --git a/lib/plausible/imported/device.ex b/lib/plausible/imported/device.ex new file mode 100644 index 000000000..896196f29 --- /dev/null 
+++ b/lib/plausible/imported/device.ex @@ -0,0 +1,15 @@ +defmodule Plausible.Imported.Device do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_devices" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :device, :string + field :visitors, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/imported/entry_page.ex b/lib/plausible/imported/entry_page.ex new file mode 100644 index 000000000..7d9b7b90d --- /dev/null +++ b/lib/plausible/imported/entry_page.ex @@ -0,0 +1,15 @@ +defmodule Plausible.Imported.EntryPage do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_entry_pages" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :entry_page, :string + field :visitors, Ch, type: "UInt64" + field :entrances, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/imported/exit_page.ex b/lib/plausible/imported/exit_page.ex new file mode 100644 index 000000000..cbea72b84 --- /dev/null +++ b/lib/plausible/imported/exit_page.ex @@ -0,0 +1,13 @@ +defmodule Plausible.Imported.ExitPage do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_exit_pages" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :exit_page, :string + field :visitors, Ch, type: "UInt64" + field :exits, Ch, type: "UInt64" + end +end diff --git a/lib/plausible/imported/import_sources.ex b/lib/plausible/imported/import_sources.ex new file mode 100644 index 000000000..83a980ef7 --- /dev/null +++ b/lib/plausible/imported/import_sources.ex @@ -0,0 +1,16 @@ +defmodule Plausible.Imported.ImportSources do + @moduledoc """ + Definitions of import sources. 
+ """ + + @sources [ + Plausible.Imported.UniversalAnalytics, + Plausible.Imported.NoopImporter + ] + + @sources_map Map.new(@sources, &{&1.name(), &1}) + + def by_name(name) do + Map.fetch!(@sources_map, name) + end +end diff --git a/lib/plausible/imported/location.ex b/lib/plausible/imported/location.ex new file mode 100644 index 000000000..de48e5560 --- /dev/null +++ b/lib/plausible/imported/location.ex @@ -0,0 +1,17 @@ +defmodule Plausible.Imported.Location do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_locations" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :country, :string + field :region, :string + field :city, Ch, type: "UInt64" + field :visitors, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/imported/noop_importer.ex b/lib/plausible/imported/noop_importer.ex new file mode 100644 index 000000000..9f9cf0349 --- /dev/null +++ b/lib/plausible/imported/noop_importer.ex @@ -0,0 +1,22 @@ +defmodule Plausible.Imported.NoopImporter do + @moduledoc """ + Stub import implementation. 
+ """ + + @name "Noop" + + def name(), do: @name + + def create_job(site, opts) do + Plausible.Workers.ImportAnalytics.new(%{ + "source" => @name, + "site_id" => site.id, + "error" => opts[:error] + }) + end + + def parse_args(opts), do: opts + + def import(_site, %{"error" => true}), do: {:error, "Something went wrong"} + def import(_site, _opts), do: :ok +end diff --git a/lib/plausible/imported/operating_system.ex b/lib/plausible/imported/operating_system.ex new file mode 100644 index 000000000..fc5292ddb --- /dev/null +++ b/lib/plausible/imported/operating_system.ex @@ -0,0 +1,15 @@ +defmodule Plausible.Imported.OperatingSystem do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_operating_systems" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :operating_system, :string + field :visitors, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/imported/page.ex b/lib/plausible/imported/page.ex new file mode 100644 index 000000000..7a320ba24 --- /dev/null +++ b/lib/plausible/imported/page.ex @@ -0,0 +1,16 @@ +defmodule Plausible.Imported.Page do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_pages" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :hostname, :string + field :page, :string + field :visitors, Ch, type: "UInt64" + field :pageviews, Ch, type: "UInt64" + field :exits, Ch, type: "UInt64" + field :time_on_page, Ch, type: "UInt64" + end +end diff --git a/lib/plausible/imported/site.ex b/lib/plausible/imported/site.ex index 6e30daaef..e304f1e83 100644 --- a/lib/plausible/imported/site.ex +++ b/lib/plausible/imported/site.ex @@ -1,10 +1,4 @@ defmodule Plausible.Imported do - use Plausible.ClickhouseRepo - use Timex - require Logger - - @missing_values ["(none)", "(not set)", "(not provided)", "(other)"] - @tables ~w( imported_visitors 
imported_sources imported_pages imported_entry_pages imported_exit_pages imported_locations imported_devices imported_browsers @@ -16,179 +10,4 @@ defmodule Plausible.Imported do def forget(site) do Plausible.Purge.delete_imported_stats!(site) end - - def from_google_analytics(nil, _site_id, _metric), do: nil - - def from_google_analytics(data, site_id, table) do - Enum.reduce(data, [], fn row, acc -> - if Map.get(row.dimensions, "ga:date") in @missing_values do - acc - else - [new_from_google_analytics(site_id, table, row) | acc] - end - end) - end - - defp parse_number(nr) do - {float, ""} = Float.parse(nr) - round(float) - end - - defp new_from_google_analytics(site_id, "imported_visitors", row) do - %{ - site_id: site_id, - date: get_date(row), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - defp new_from_google_analytics(site_id, "imported_sources", row) do - %{ - site_id: site_id, - date: get_date(row), - source: row.dimensions |> Map.fetch!("ga:source") |> parse_referrer(), - utm_medium: row.dimensions |> Map.fetch!("ga:medium") |> default_if_missing(), - utm_campaign: row.dimensions |> Map.fetch!("ga:campaign") |> default_if_missing(), - utm_content: row.dimensions |> Map.fetch!("ga:adContent") |> default_if_missing(), - utm_term: row.dimensions |> Map.fetch!("ga:keyword") |> default_if_missing(), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - defp 
new_from_google_analytics(site_id, "imported_pages", row) do - %{ - site_id: site_id, - date: get_date(row), - hostname: row.dimensions |> Map.fetch!("ga:hostname") |> String.replace_prefix("www.", ""), - page: row.dimensions |> Map.fetch!("ga:pagePath") |> URI.parse() |> Map.get(:path), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(), - exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number(), - time_on_page: row.metrics |> Map.fetch!("ga:timeOnPage") |> parse_number() - } - end - - defp new_from_google_analytics(site_id, "imported_entry_pages", row) do - %{ - site_id: site_id, - date: get_date(row), - entry_page: row.dimensions |> Map.fetch!("ga:landingPagePath"), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - entrances: row.metrics |> Map.fetch!("ga:entrances") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number() - } - end - - defp new_from_google_analytics(site_id, "imported_exit_pages", row) do - %{ - site_id: site_id, - date: get_date(row), - exit_page: Map.fetch!(row.dimensions, "ga:exitPagePath"), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number() - } - end - - defp new_from_google_analytics(site_id, "imported_locations", row) do - country_code = row.dimensions |> Map.fetch!("ga:countryIsoCode") |> default_if_missing("") - city_name = row.dimensions |> Map.fetch!("ga:city") |> default_if_missing("") - city_data = Location.get_city(city_name, country_code) - - %{ - site_id: site_id, - date: get_date(row), - country: country_code, - region: row.dimensions |> Map.fetch!("ga:regionIsoCode") |> default_if_missing(""), - city: city_data && city_data.id, - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - visits: row.metrics 
|> Map.fetch!("ga:sessions") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - defp new_from_google_analytics(site_id, "imported_devices", row) do - %{ - site_id: site_id, - date: get_date(row), - device: row.dimensions |> Map.fetch!("ga:deviceCategory") |> String.capitalize(), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - @browser_google_to_plausible %{ - "User-Agent:Opera" => "Opera", - "Mozilla Compatible Agent" => "Mobile App", - "Android Webview" => "Mobile App", - "Android Browser" => "Mobile App", - "Safari (in-app)" => "Mobile App", - "User-Agent: Mozilla" => "Firefox", - "(not set)" => "" - } - - defp new_from_google_analytics(site_id, "imported_browsers", row) do - browser = Map.fetch!(row.dimensions, "ga:browser") - - %{ - site_id: site_id, - date: get_date(row), - browser: Map.get(@browser_google_to_plausible, browser, browser), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - @os_google_to_plausible %{ - "Macintosh" => "Mac", - "Linux" => "GNU/Linux", - "(not set)" => "" - } - - defp new_from_google_analytics(site_id, "imported_operating_systems", row) do - os = Map.fetch!(row.dimensions, "ga:operatingSystem") - - %{ - site_id: site_id, - date: get_date(row), - operating_system: Map.get(@os_google_to_plausible, os, os), - visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), - visits: row.metrics 
|> Map.fetch!("ga:sessions") |> parse_number(), - bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), - visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() - } - end - - defp get_date(%{dimensions: %{"ga:date" => date}}) do - date - |> Timex.parse!("%Y%m%d", :strftime) - |> NaiveDateTime.to_date() - end - - defp default_if_missing(value, default \\ nil) - defp default_if_missing(value, default) when value in @missing_values, do: default - defp default_if_missing(value, _default), do: value - - defp parse_referrer(nil), do: nil - defp parse_referrer("(direct)"), do: nil - defp parse_referrer("google"), do: "Google" - defp parse_referrer("bing"), do: "Bing" - defp parse_referrer("duckduckgo"), do: "DuckDuckGo" - - defp parse_referrer(ref) do - RefInspector.parse("https://" <> ref) - |> PlausibleWeb.RefInspector.parse() - end end diff --git a/lib/plausible/imported/source.ex b/lib/plausible/imported/source.ex new file mode 100644 index 000000000..c19ca07c6 --- /dev/null +++ b/lib/plausible/imported/source.ex @@ -0,0 +1,19 @@ +defmodule Plausible.Imported.Source do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_sources" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :source, :string + field :utm_medium, :string + field :utm_campaign, :string + field :utm_content, :string + field :utm_term, :string + field :visitors, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt32" + end +end diff --git a/lib/plausible/imported/universal_analytics.ex b/lib/plausible/imported/universal_analytics.ex new file mode 100644 index 000000000..243704a7e --- /dev/null +++ b/lib/plausible/imported/universal_analytics.ex @@ -0,0 +1,271 @@ +defmodule Plausible.Imported.UniversalAnalytics do + @moduledoc """ + Import implementation for Universal Analytics. 
+ """ + + use Plausible.ClickhouseRepo + + alias Plausible.Site + + @missing_values ["(none)", "(not set)", "(not provided)", "(other)"] + + @type job_opt() :: + {:view_id, non_neg_integer()} + | {:start_date | :end_date | :access_token | :refresh_token | :token_expires_at, + String.t()} + + @type import_opt() :: + {:view_id, non_neg_integer()} + | {:date_range, Date.Range.t()} + | {:auth, {String.t(), String.t(), String.t()}} + + # NOTE: we have to use old name for now + @name "Google Analytics" + + @spec name() :: String.t() + def name(), do: @name + + @spec create_job(Site.t(), [job_opt()]) :: Ecto.Changeset.t() + def create_job(site, opts) do + view_id = Keyword.fetch!(opts, :view_id) + start_date = Keyword.fetch!(opts, :start_date) + end_date = Keyword.fetch!(opts, :end_date) + access_token = Keyword.fetch!(opts, :access_token) + refresh_token = Keyword.fetch!(opts, :refresh_token) + token_expires_at = Keyword.fetch!(opts, :token_expires_at) + + Plausible.Workers.ImportAnalytics.new(%{ + "source" => @name, + "site_id" => site.id, + "view_id" => view_id, + "start_date" => start_date, + "end_date" => end_date, + "access_token" => access_token, + "refresh_token" => refresh_token, + "token_expires_at" => token_expires_at + }) + end + + @spec parse_args(map()) :: [import_opt()] + def parse_args( + %{"view_id" => view_id, "start_date" => start_date, "end_date" => end_date} = args + ) do + start_date = Date.from_iso8601!(start_date) + end_date = Date.from_iso8601!(end_date) + date_range = Date.range(start_date, end_date) + + auth = { + Map.fetch!(args, "access_token"), + Map.fetch!(args, "refresh_token"), + Map.fetch!(args, "token_expires_at") + } + + [ + view_id: view_id, + date_range: date_range, + auth: auth + ] + end + + @doc """ + Imports stats from a Google Analytics UA view to a Plausible site. + + This function fetches Google Analytics reports which are then passed in batches + to Clickhouse by the `Plausible.Imported.Buffer` process. 
+ """ + @spec import(Site.t(), [import_opt()]) :: :ok | {:error, any()} + def import(site, opts) do + date_range = Keyword.fetch!(opts, :date_range) + view_id = Keyword.fetch!(opts, :view_id) + auth = Keyword.fetch!(opts, :auth) + + {:ok, buffer} = Plausible.Imported.Buffer.start_link() + + persist_fn = fn table, rows -> + records = from_report(rows, site.id, table) + Plausible.Imported.Buffer.insert_many(buffer, table, records) + end + + try do + Plausible.Google.Api.import_analytics(date_range, view_id, auth, persist_fn) + after + Plausible.Imported.Buffer.flush(buffer) + Plausible.Imported.Buffer.stop(buffer) + end + end + + def from_report(nil, _site_id, _metric), do: nil + + def from_report(data, site_id, table) do + Enum.reduce(data, [], fn row, acc -> + if Map.get(row.dimensions, "ga:date") in @missing_values do + acc + else + [new_from_report(site_id, table, row) | acc] + end + end) + end + + defp parse_number(nr) do + {float, ""} = Float.parse(nr) + round(float) + end + + defp new_from_report(site_id, "imported_visitors", row) do + %{ + site_id: site_id, + date: get_date(row), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_sources", row) do + %{ + site_id: site_id, + date: get_date(row), + source: row.dimensions |> Map.fetch!("ga:source") |> parse_referrer(), + utm_medium: row.dimensions |> Map.fetch!("ga:medium") |> default_if_missing(), + utm_campaign: row.dimensions |> Map.fetch!("ga:campaign") |> default_if_missing(), + utm_content: row.dimensions |> Map.fetch!("ga:adContent") |> default_if_missing(), + utm_term: row.dimensions |> Map.fetch!("ga:keyword") |> default_if_missing(), + 
visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_pages", row) do + %{ + site_id: site_id, + date: get_date(row), + hostname: row.dimensions |> Map.fetch!("ga:hostname") |> String.replace_prefix("www.", ""), + page: row.dimensions |> Map.fetch!("ga:pagePath") |> URI.parse() |> Map.get(:path), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + pageviews: row.metrics |> Map.fetch!("ga:pageviews") |> parse_number(), + exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number(), + time_on_page: row.metrics |> Map.fetch!("ga:timeOnPage") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_entry_pages", row) do + %{ + site_id: site_id, + date: get_date(row), + entry_page: row.dimensions |> Map.fetch!("ga:landingPagePath"), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + entrances: row.metrics |> Map.fetch!("ga:entrances") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_exit_pages", row) do + %{ + site_id: site_id, + date: get_date(row), + exit_page: Map.fetch!(row.dimensions, "ga:exitPagePath"), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + exits: row.metrics |> Map.fetch!("ga:exits") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_locations", row) do + country_code = row.dimensions |> Map.fetch!("ga:countryIsoCode") |> default_if_missing("") + city_name = row.dimensions |> Map.fetch!("ga:city") |> default_if_missing("") + city_data = Location.get_city(city_name, country_code) + + %{ + 
site_id: site_id, + date: get_date(row), + country: country_code, + region: row.dimensions |> Map.fetch!("ga:regionIsoCode") |> default_if_missing(""), + city: city_data && city_data.id, + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + defp new_from_report(site_id, "imported_devices", row) do + %{ + site_id: site_id, + date: get_date(row), + device: row.dimensions |> Map.fetch!("ga:deviceCategory") |> String.capitalize(), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + @browser_google_to_plausible %{ + "User-Agent:Opera" => "Opera", + "Mozilla Compatible Agent" => "Mobile App", + "Android Webview" => "Mobile App", + "Android Browser" => "Mobile App", + "Safari (in-app)" => "Mobile App", + "User-Agent: Mozilla" => "Firefox", + "(not set)" => "" + } + + defp new_from_report(site_id, "imported_browsers", row) do + browser = Map.fetch!(row.dimensions, "ga:browser") + + %{ + site_id: site_id, + date: get_date(row), + browser: Map.get(@browser_google_to_plausible, browser, browser), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + @os_google_to_plausible %{ + "Macintosh" => "Mac", + "Linux" => "GNU/Linux", + "(not set)" => "" + } + + defp new_from_report(site_id, "imported_operating_systems", row) do + os = 
Map.fetch!(row.dimensions, "ga:operatingSystem") + + %{ + site_id: site_id, + date: get_date(row), + operating_system: Map.get(@os_google_to_plausible, os, os), + visitors: row.metrics |> Map.fetch!("ga:users") |> parse_number(), + visits: row.metrics |> Map.fetch!("ga:sessions") |> parse_number(), + bounces: row.metrics |> Map.fetch!("ga:bounces") |> parse_number(), + visit_duration: row.metrics |> Map.fetch!("ga:sessionDuration") |> parse_number() + } + end + + defp get_date(%{dimensions: %{"ga:date" => date}}) do + date + |> Timex.parse!("%Y%m%d", :strftime) + |> NaiveDateTime.to_date() + end + + defp default_if_missing(value, default \\ nil) + defp default_if_missing(value, default) when value in @missing_values, do: default + defp default_if_missing(value, _default), do: value + + defp parse_referrer(nil), do: nil + defp parse_referrer("(direct)"), do: nil + defp parse_referrer("google"), do: "Google" + defp parse_referrer("bing"), do: "Bing" + defp parse_referrer("duckduckgo"), do: "DuckDuckGo" + + defp parse_referrer(ref) do + RefInspector.parse("https://" <> ref) + |> PlausibleWeb.RefInspector.parse() + end +end diff --git a/lib/plausible/imported/visitor.ex b/lib/plausible/imported/visitor.ex new file mode 100644 index 000000000..7cad7a196 --- /dev/null +++ b/lib/plausible/imported/visitor.ex @@ -0,0 +1,15 @@ +defmodule Plausible.Imported.Visitor do + @moduledoc false + use Ecto.Schema + + @primary_key false + schema "imported_visitors" do + field :site_id, Ch, type: "UInt64" + field :date, :date + field :visitors, Ch, type: "UInt64" + field :pageviews, Ch, type: "UInt64" + field :bounces, Ch, type: "UInt64" + field :visits, Ch, type: "UInt64" + field :visit_duration, Ch, type: "UInt64" + end +end diff --git a/lib/plausible/site/imported_data.ex b/lib/plausible/site/imported_data.ex index f63d736a7..c1db36cae 100644 --- a/lib/plausible/site/imported_data.ex +++ b/lib/plausible/site/imported_data.ex @@ -1,6 +1,6 @@ defmodule Plausible.Site.ImportedData do 
@moduledoc """ - Embedded schema for Google Analytics imports + Embedded schema for analytics imports """ use Ecto.Schema diff --git a/lib/plausible_web/controllers/site_controller.ex b/lib/plausible_web/controllers/site_controller.ex index 8ab575fc5..d4ddd4fe2 100644 --- a/lib/plausible_web/controllers/site_controller.ex +++ b/lib/plausible_web/controllers/site_controller.ex @@ -751,23 +751,24 @@ defmodule PlausibleWeb.SiteController do "refresh_token" => refresh_token, "expires_at" => expires_at }) do - site = conn.assigns[:site] + site = conn.assigns.site + source = "Google Analytics" job = - Plausible.Workers.ImportGoogleAnalytics.new(%{ - "site_id" => site.id, - "view_id" => view_id, - "start_date" => start_date, - "end_date" => end_date, - "access_token" => access_token, - "refresh_token" => refresh_token, - "token_expires_at" => expires_at - }) + Plausible.Imported.UniversalAnalytics.create_job( + site, + view_id: view_id, + start_date: start_date, + end_date: end_date, + access_token: access_token, + refresh_token: refresh_token, + token_expires_at: expires_at + ) Ecto.Multi.new() |> Ecto.Multi.update( :update_site, - Plausible.Site.start_import(site, start_date, end_date, "Google Analytics") + Plausible.Site.start_import(site, start_date, end_date, source) ) |> Oban.insert(:oban_job, job) |> Repo.transaction() @@ -777,6 +778,9 @@ defmodule PlausibleWeb.SiteController do |> redirect(external: Routes.site_path(conn, :settings_integrations, site.domain)) end + # NOTE: To be cleaned up once #3700 is released + @analytics_queues ["analytics_imports", "google_analytics_imports"] + def forget_imported(conn, _params) do site = conn.assigns[:site] @@ -785,7 +789,7 @@ defmodule PlausibleWeb.SiteController do Oban.cancel_all_jobs( from j in Oban.Job, where: - j.queue == "google_analytics_imports" and + j.queue in @analytics_queues and fragment("(? 
->> 'site_id')::int", j.args) == ^site.id ) diff --git a/lib/plausible_web/email.ex b/lib/plausible_web/email.ex index 81aaff7e9..427f031f1 100644 --- a/lib/plausible_web/email.ex +++ b/lib/plausible_web/email.ex @@ -313,11 +313,12 @@ defmodule PlausibleWeb.Email do ) end - def import_success(user, site) do + # NOTE: make email different depending on import source + def import_success(source, user, site) do priority_email() |> to(user) |> tag("import-success-email") - |> subject("Google Analytics data imported for #{site.domain}") + |> subject("#{source} data imported for #{site.domain}") |> render("google_analytics_import.html", %{ site: site, link: PlausibleWeb.Endpoint.url() <> "/" <> URI.encode_www_form(site.domain), @@ -326,11 +327,12 @@ defmodule PlausibleWeb.Email do }) end - def import_failure(user, site) do + # NOTE: make email different depending on import source + def import_failure(source, user, site) do priority_email() |> to(user) |> tag("import-failure-email") - |> subject("Google Analytics import failed for #{site.domain}") + |> subject("#{source} import failed for #{site.domain}") |> render("google_analytics_import.html", %{ user: user, site: site, diff --git a/lib/workers/import_analytics.ex b/lib/workers/import_analytics.ex new file mode 100644 index 000000000..0be5885bb --- /dev/null +++ b/lib/workers/import_analytics.ex @@ -0,0 +1,79 @@ +defmodule Plausible.Workers.ImportAnalytics do + @moduledoc """ + Worker for running analytics import jobs. 
+ """ + + use Plausible.Repo + require Logger + + use Oban.Worker, + queue: :analytics_imports, + max_attempts: 3, + unique: [fields: [:args], period: 60] + + alias Plausible.Imported.ImportSources + + @impl Oban.Worker + def perform(%Oban.Job{ + args: %{"site_id" => site_id, "source" => source} = args + }) do + import_api = ImportSources.by_name(source) + import_opts = import_api.parse_args(args) + + site = Repo.get!(Plausible.Site, site_id) + + case import_api.import(site, import_opts) do + :ok -> + import_success(source, site) + + :ok + + {:error, error} -> + Sentry.capture_message("Failed to import from #{source}", + extra: %{site: site.domain, error: inspect(error)} + ) + + import_failed(source, site) + + {:error, error} + end + end + + @impl Oban.Worker + def backoff(_job) do + # 5 minutes + 300 + end + + def import_success(source, site) do + site = Repo.preload(site, memberships: :user) + + site + |> Plausible.Site.import_success() + |> Repo.update!() + + Enum.each(site.memberships, fn membership -> + if membership.role in [:owner, :admin] do + PlausibleWeb.Email.import_success(source, membership.user, site) + |> Plausible.Mailer.send() + end + end) + end + + def import_failed(source, site) do + site = Repo.preload(site, memberships: :user) + + site + |> Plausible.Site.import_failure() + |> Repo.update!() + + Plausible.Purge.delete_imported_stats!(site) + + Enum.each(site.memberships, fn membership -> + if membership.role in [:owner, :admin] do + PlausibleWeb.Email.import_failure(source, membership.user, site) + |> Plausible.Mailer.send() + end + end) + end +end diff --git a/lib/workers/import_google_analytics.ex b/lib/workers/import_google_analytics.ex deleted file mode 100644 index 41714069b..000000000 --- a/lib/workers/import_google_analytics.ex +++ /dev/null @@ -1,71 +0,0 @@ -defmodule Plausible.Workers.ImportGoogleAnalytics do - use Plausible.Repo - require Logger - - use Oban.Worker, - queue: :google_analytics_imports, - max_attempts: 3, - 
unique: [fields: [:args], period: 60] - - @impl Oban.Worker - def perform( - %Oban.Job{ - args: - %{ - "site_id" => site_id, - "view_id" => view_id, - "start_date" => start_date, - "end_date" => end_date - } = args - }, - google_api \\ Plausible.Google.Api - ) do - site = Repo.get(Plausible.Site, site_id) |> Repo.preload([[memberships: :user]]) - start_date = Date.from_iso8601!(start_date) - end_date = Date.from_iso8601!(end_date) - date_range = Date.range(start_date, end_date) - - auth = {args["access_token"], args["refresh_token"], args["token_expires_at"]} - - case google_api.import_analytics(site, date_range, view_id, auth) do - :ok -> - Plausible.Site.import_success(site) - |> Repo.update!() - - Enum.each(site.memberships, fn membership -> - if membership.role in [:owner, :admin] do - PlausibleWeb.Email.import_success(membership.user, site) - |> Plausible.Mailer.send() - end - end) - - :ok - - {:error, error} -> - Logger.error("Import: Failed to import from GA. Reason: #{inspect(error)}") - import_failed(site) - - {:error, error} - end - end - - @impl Oban.Worker - def backoff(_job) do - # 5 minutes - 300 - end - - def import_failed(site) do - site = Repo.preload(site, memberships: :user) - - Plausible.Site.import_failure(site) |> Repo.update!() - Plausible.Purge.delete_imported_stats!(site) - - Enum.each(site.memberships, fn membership -> - if membership.role in [:owner, :admin] do - PlausibleWeb.Email.import_failure(membership.user, site) - |> Plausible.Mailer.send() - end - end) - end -end diff --git a/test/plausible/google/api_test.exs b/test/plausible/google/api_test.exs index 64b580eed..92897270a 100644 --- a/test/plausible/google/api_test.exs +++ b/test/plausible/google/api_test.exs @@ -3,6 +3,7 @@ defmodule Plausible.Google.ApiTest do use Plausible.Test.Support.HTTPMocker alias Plausible.Google.Api + alias Plausible.Imported.UniversalAnalytics import ExUnit.CaptureLog import Mox @@ -36,7 +37,18 @@ defmodule Plausible.Google.ApiTest do future = 
DateTime.utc_now() |> DateTime.add(3600, :second) |> DateTime.to_iso8601() auth = {"***", "refresh_token", future} - assert :ok == Plausible.Google.Api.import_analytics(site, date_range, view_id, auth) + {:ok, buffer} = Plausible.Imported.Buffer.start_link() + + persist_fn = fn table, rows -> + records = UniversalAnalytics.from_report(rows, site.id, table) + Plausible.Imported.Buffer.insert_many(buffer, table, records) + end + + assert :ok == Plausible.Google.Api.import_analytics(date_range, view_id, auth, persist_fn) + + Plausible.Imported.Buffer.flush(buffer) + Plausible.Imported.Buffer.stop(buffer) + assert 1_495_150 == Plausible.Stats.Clickhouse.imported_pageview_count(site) end @@ -67,20 +79,25 @@ defmodule Plausible.Google.ApiTest do end) end - assert :ok == Plausible.Google.Api.import_analytics(site, range, "123551", auth) + {:ok, buffer} = Plausible.Imported.Buffer.start_link() + + persist_fn = fn table, rows -> + records = UniversalAnalytics.from_report(rows, site.id, table) + Plausible.Imported.Buffer.insert_many(buffer, table, records) + end + + assert :ok == Plausible.Google.Api.import_analytics(range, "123551", auth, persist_fn) + + Plausible.Imported.Buffer.flush(buffer) + Plausible.Imported.Buffer.stop(buffer) end describe "fetch_and_persist/4" do @ok_response Jason.decode!(File.read!("fixture/ga_batch_report.json")) @no_report_response Jason.decode!(File.read!("fixture/ga_report_empty_rows.json")) - setup do - {:ok, pid} = Plausible.Google.Buffer.start_link() - {:ok, buffer: pid} - end - @tag :slow - test "will fetch and persist import data from Google Analytics", %{site: site, buffer: buffer} do + test "will fetch and persist import data from Google Analytics" do request = %Plausible.Google.ReportRequest{ dataset: "imported_exit_pages", view_id: "123", @@ -121,24 +138,19 @@ defmodule Plausible.Google.ApiTest do end ) - Api.fetch_and_persist(site, request, - sleep_time: 0, - buffer: buffer - ) + assert :ok = + Api.fetch_and_persist(request, + 
sleep_time: 0, + persist_fn: fn dataset, row -> + assert dataset == "imported_exit_pages" + assert length(row) == 1479 - Plausible.Google.Buffer.flush(buffer) - - assert 1479 == - Plausible.ClickhouseRepo.aggregate( - from(iex in "imported_exit_pages", where: iex.site_id == ^site.id), - :count + :ok + end ) end - test "retries HTTP request up to 5 times before raising the last error", %{ - site: site, - buffer: buffer - } do + test "retries HTTP request up to 5 times before raising the last error" do expect( Plausible.HTTPClient.Mock, :post, @@ -166,13 +178,13 @@ defmodule Plausible.Google.ApiTest do } assert {:error, :request_failed} = - Api.fetch_and_persist(site, request, + Api.fetch_and_persist(request, sleep_time: 0, - buffer: buffer + persist_fn: fn _dataset, _rows -> :ok end ) end - test "does not fail when report does not have rows key", %{site: site, buffer: buffer} do + test "does not fail when report does not have rows key" do expect( Plausible.HTTPClient.Mock, :post, @@ -197,9 +209,14 @@ defmodule Plausible.Google.ApiTest do } assert :ok == - Api.fetch_and_persist(site, request, + Api.fetch_and_persist(request, sleep_time: 0, - buffer: buffer + persist_fn: fn dataset, rows -> + assert dataset == "imported_exit_pages" + assert rows == [] + + :ok + end ) end end diff --git a/test/plausible/google/buffer_test.exs b/test/plausible/imported/buffer_test.exs similarity index 94% rename from test/plausible/google/buffer_test.exs rename to test/plausible/imported/buffer_test.exs index 32be287ad..473079d7c 100644 --- a/test/plausible/google/buffer_test.exs +++ b/test/plausible/imported/buffer_test.exs @@ -1,14 +1,14 @@ -defmodule Plausible.Google.BufferTest do +defmodule Plausible.Imported.BufferTest do use Plausible.DataCase, async: false import Ecto.Query - alias Plausible.Google.Buffer + alias Plausible.Imported.Buffer setup [:create_user, :create_new_site, :set_buffer_size] defp set_buffer_size(_setup_args) do - google_setting = 
Application.get_env(:plausible, :google) - patch_env(:google, Keyword.put(google_setting, :max_buffer_size, 10)) + imported_setting = Application.fetch_env!(:plausible, :imported) + patch_env(:imported, Keyword.put(imported_setting, :max_buffer_size, 10)) :ok end diff --git a/test/plausible/imported/imported_test.exs b/test/plausible/imported/imported_test.exs index 1b86b1cb7..dead7ba12 100644 --- a/test/plausible/imported/imported_test.exs +++ b/test/plausible/imported/imported_test.exs @@ -6,8 +6,8 @@ defmodule Plausible.ImportedTest do defp import_data(ga_data, site_id, table_name) do ga_data - |> Plausible.Imported.from_google_analytics(site_id, table_name) - |> then(&Plausible.Google.Buffer.insert_all(table_name, &1)) + |> Plausible.Imported.UniversalAnalytics.from_report(site_id, table_name) + |> then(&Plausible.Imported.Buffer.insert_all(table_name, &1)) end describe "Parse and import third party data fetched from Google Analytics" do diff --git a/test/plausible/imported/universal_analytics_test.exs b/test/plausible/imported/universal_analytics_test.exs new file mode 100644 index 000000000..6d842be7a --- /dev/null +++ b/test/plausible/imported/universal_analytics_test.exs @@ -0,0 +1,70 @@ +defmodule Plausible.Imported.UniversalAnalyticsTest do + use Plausible.DataCase, async: true + use Plausible.Test.Support.HTTPMocker + + alias Plausible.Imported.UniversalAnalytics + + setup [:create_user, :create_new_site] + + describe "create_job/2 and parse_args/1" do + test "parses job args properly" do + site = insert(:site) + site_id = site.id + expires_at = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now()) + + job = + UniversalAnalytics.create_job(site, + view_id: 123, + start_date: "2023-10-01", + end_date: "2024-01-02", + access_token: "access123", + refresh_token: "refresh123", + token_expires_at: expires_at + ) + + assert %Ecto.Changeset{ + data: %Oban.Job{}, + changes: %{ + args: + %{ + "site_id" => ^site_id, + "view_id" => 123, + "start_date" => "2023-10-01", 
+ "end_date" => "2024-01-02", + "access_token" => "access123", + "refresh_token" => "refresh123", + "token_expires_at" => ^expires_at + } = args + } + } = job + + assert opts = [_ | _] = UniversalAnalytics.parse_args(args) + + assert opts[:view_id] == 123 + assert opts[:date_range] == Date.range(~D[2023-10-01], ~D[2024-01-02]) + assert opts[:auth] == {"access123", "refresh123", expires_at} + end + end + + describe "import/2" do + @tag :slow + test "imports page views from Google Analytics", %{site: site} do + mock_http_with("google_analytics_import#1.json") + + view_id = "54297898" + date_range = Date.range(~D[2011-01-01], ~D[2022-07-19]) + + future = DateTime.utc_now() |> DateTime.add(3600, :second) |> DateTime.to_iso8601() + auth = {"***", "refresh_token", future} + + assert :ok == + UniversalAnalytics.import(site, + date_range: date_range, + view_id: view_id, + auth: auth + ) + + assert 1_495_150 == Plausible.Stats.Clickhouse.imported_pageview_count(site) + end + end +end diff --git a/test/plausible_web/controllers/site_controller_test.exs b/test/plausible_web/controllers/site_controller_test.exs index cc65ddb37..b08d878c9 100644 --- a/test/plausible_web/controllers/site_controller_test.exs +++ b/test/plausible_web/controllers/site_controller_test.exs @@ -1289,8 +1289,9 @@ defmodule PlausibleWeb.SiteControllerTest do }) assert_enqueued( - worker: Plausible.Workers.ImportGoogleAnalytics, + worker: Plausible.Workers.ImportAnalytics, args: %{ + "source" => "Google Analytics", "site_id" => site.id, "view_id" => "123", "start_date" => "2018-03-01", @@ -1330,7 +1331,8 @@ defmodule PlausibleWeb.SiteControllerTest do test "cancels Oban job if it exists", %{conn: conn, site: site} do {:ok, job} = - Plausible.Workers.ImportGoogleAnalytics.new(%{ + Plausible.Workers.ImportAnalytics.new(%{ + "source" => "Google Analytics", "site_id" => site.id, "view_id" => "123", "start_date" => "2022-01-01", diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index 
01ae5a2cf..034447acf 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -196,7 +196,7 @@ defmodule Plausible.TestUtils do defp populate_imported_stats(events) do Enum.group_by(events, &Map.fetch!(&1, :table), &Map.delete(&1, :table)) - |> Enum.map(fn {table, events} -> Plausible.Google.Buffer.insert_all(table, events) end) + |> Enum.map(fn {table, events} -> Plausible.Imported.Buffer.insert_all(table, events) end) end def relative_time(shifts) do diff --git a/test/workers/import_analytics_test.exs b/test/workers/import_analytics_test.exs new file mode 100644 index 000000000..cb9782ccb --- /dev/null +++ b/test/workers/import_analytics_test.exs @@ -0,0 +1,125 @@ +defmodule Plausible.Workers.ImportAnalyticsTest do + use Plausible.DataCase + use Bamboo.Test + + alias Plausible.Workers.ImportAnalytics + + @moduletag capture_log: true + + describe "Universal Analytics" do + setup do + %{ + imported_data: %Plausible.Site.ImportedData{ + start_date: Timex.today() |> Timex.shift(days: -7), + end_date: Timex.today(), + source: "Noop", + status: "importing" + } + } + end + + test "updates the imported_data field for the site after successful import", %{ + imported_data: imported_data + } do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id + } + }) + + assert Repo.reload!(site).imported_data.status == "ok" + end + + test "updates the stats_start_date field for the site after successful import", %{ + imported_data: imported_data + } do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id + } + }) + + assert Repo.reload!(site).stats_start_date == imported_data.start_date 
+ end + + test "sends email to owner after successful import", %{imported_data: imported_data} do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id + } + }) + + assert_email_delivered_with( + to: [user], + subject: "Noop data imported for #{site.domain}" + ) + end + + test "updates site record after failed import", %{imported_data: imported_data} do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id, + "error" => true + } + }) + + assert Repo.reload!(site).imported_data.status == "error" + end + + test "clears any orphaned data during import", %{imported_data: imported_data} do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + populate_stats(site, [ + build(:imported_visitors, pageviews: 10) + ]) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id, + "error" => true + } + }) + + assert eventually(fn -> + count = Plausible.Stats.Clickhouse.imported_pageview_count(site) + {count == 0, count} + end) + end + + test "sends email to owner after failed import", %{imported_data: imported_data} do + user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) + site = insert(:site, members: [user], imported_data: imported_data) + + ImportAnalytics.perform(%Oban.Job{ + args: %{ + "source" => "Noop", + "site_id" => site.id, + "error" => true + } + }) + + assert_email_delivered_with( + to: [user], + subject: "Noop import failed for #{site.domain}" + ) + end + end +end diff --git a/test/workers/import_google_analytics_test.exs 
b/test/workers/import_google_analytics_test.exs deleted file mode 100644 index c7440c978..000000000 --- a/test/workers/import_google_analytics_test.exs +++ /dev/null @@ -1,197 +0,0 @@ -defmodule Plausible.Workers.ImportGoogleAnalyticsTest do - use Plausible.DataCase - use Bamboo.Test - import Double - - alias Plausible.Workers.ImportGoogleAnalytics - - @moduletag capture_log: true - - @imported_data %Plausible.Site.ImportedData{ - start_date: Timex.today() |> Timex.shift(days: -7), - end_date: Timex.today(), - source: "Google Analytics", - status: "importing" - } - - test "updates the imported_data field for the site after successful import" do - user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - :ok - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert Repo.reload!(site).imported_data.status == "ok" - end - - test "updates the stats_start_date field for the site after successful import" do - user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - :ok - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert Repo.reload!(site).stats_start_date == @imported_data.start_date - end - - test "sends email to owner after successful import" do - user = insert(:user, 
trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - :ok - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert_email_delivered_with( - to: [user], - subject: "Google Analytics data imported for #{site.domain}" - ) - end - - test "updates site record after failed import" do - user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - {:error, "Something went wrong"} - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert Repo.reload!(site).imported_data.status == "error" - end - - test "clears any orphaned data during import" do - user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - populate_stats(site, [ - build(:imported_visitors, pageviews: 10) - ]) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - {:error, "Something went wrong"} - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert eventually(fn -> - count = 
Plausible.Stats.Clickhouse.imported_pageview_count(site) - {count == 0, count} - end) - end - - test "sends email to owner after failed import" do - user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1)) - site = insert(:site, members: [user], imported_data: @imported_data) - - api_stub = - stub(Plausible.Google.Api, :import_analytics, fn _site, - _date_range, - _view_id, - _access_token -> - {:error, "Something went wrong"} - end) - - ImportGoogleAnalytics.perform( - %Oban.Job{ - args: %{ - "site_id" => site.id, - "view_id" => "view_id", - "start_date" => "2020-01-01", - "end_date" => "2022-01-01", - "access_token" => "token" - } - }, - api_stub - ) - - assert_email_delivered_with( - to: [user], - subject: "Google Analytics import failed for #{site.domain}" - ) - end -end