Implement resumable GA4 imports to work around rate limiting (#4049)

* Add support for resuming import to GA4 importer

* Handle rate limiting gracefully for all remainig GA4 HTTP requests

* Show notice tooltip for long running imports

* Bump resume job schedule delay to 65 minutes

* Fix tooltip styling
This commit is contained in:
Adrian Gruntkowski 2024-04-30 18:06:18 +02:00 committed by GitHub
parent dd493fdad2
commit 41fef85d29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 469 additions and 52 deletions

View File

@ -48,7 +48,7 @@ defmodule ObanErrorReporter do
site_import = Plausible.Repo.get(Plausible.Imported.SiteImport, import_id) site_import = Plausible.Repo.get(Plausible.Imported.SiteImport, import_id)
if site_import do if site_import do
Plausible.Workers.ImportAnalytics.import_fail(site_import) Plausible.Workers.ImportAnalytics.import_fail(site_import, [])
end end
end end

View File

@ -71,17 +71,20 @@ defmodule Plausible.Google.GA4.API do
GA4.HTTP.get_analytics_end_date(access_token, property) GA4.HTTP.get_analytics_end_date(access_token, property)
end end
def import_analytics(date_range, property, auth, persist_fn) do def import_analytics(date_range, property, auth, opts) do
persist_fn = Keyword.fetch!(opts, :persist_fn)
resume_opts = Keyword.get(opts, :resume_opts, [])
Logger.debug( Logger.debug(
"[#{inspect(__MODULE__)}:#{property}] Starting import from #{date_range.first} to #{date_range.last}" "[#{inspect(__MODULE__)}:#{property}] Starting import from #{date_range.first} to #{date_range.last}"
) )
with {:ok, access_token} <- Google.API.maybe_refresh_token(auth) do with {:ok, access_token} <- Google.API.maybe_refresh_token(auth) do
do_import_analytics(date_range, property, access_token, persist_fn) do_import_analytics(date_range, property, access_token, persist_fn, resume_opts)
end end
end end
defp do_import_analytics(date_range, property, access_token, persist_fn) do defp do_import_analytics(date_range, property, access_token, persist_fn, [] = _resume_opts) do
Enum.reduce_while(GA4.ReportRequest.full_report(), :ok, fn report_request, :ok -> Enum.reduce_while(GA4.ReportRequest.full_report(), :ok, fn report_request, :ok ->
Logger.debug( Logger.debug(
"[#{inspect(__MODULE__)}:#{property}] Starting to import #{report_request.dataset}" "[#{inspect(__MODULE__)}:#{property}] Starting to import #{report_request.dataset}"
@ -96,6 +99,36 @@ defmodule Plausible.Google.GA4.API do
end) end)
end end
defp do_import_analytics(date_range, property, access_token, persist_fn, resume_opts) do
dataset = Keyword.fetch!(resume_opts, :dataset)
offset = Keyword.fetch!(resume_opts, :offset)
GA4.ReportRequest.full_report()
|> Enum.drop_while(&(&1.dataset != dataset))
|> Enum.reduce_while(:ok, fn report_request, :ok ->
Logger.debug(
"[#{inspect(__MODULE__)}:#{property}] Starting to import #{report_request.dataset}"
)
request_offset =
if report_request.dataset == dataset do
offset
else
0
end
report_request =
report_request
|> prepare_request(date_range, property, access_token)
|> Map.put(:offset, request_offset)
case fetch_and_persist(report_request, persist_fn: persist_fn) do
:ok -> {:cont, :ok}
{:error, _} = error -> {:halt, error}
end
end)
end
@spec fetch_and_persist(GA4.ReportRequest.t(), Keyword.t()) :: @spec fetch_and_persist(GA4.ReportRequest.t(), Keyword.t()) ::
:ok | {:error, term()} :ok | {:error, term()}
def fetch_and_persist(%GA4.ReportRequest{} = report_request, opts \\ []) do def fetch_and_persist(%GA4.ReportRequest{} = report_request, opts \\ []) do
@ -124,6 +157,9 @@ defmodule Plausible.Google.GA4.API do
:ok :ok
end end
{:error, {:rate_limit_exceeded, details}} ->
{:error, {:rate_limit_exceeded, details}}
{:error, cause} -> {:error, cause} ->
if attempt >= @max_attempts do if attempt >= @max_attempts do
Logger.debug( Logger.debug(

View File

@ -54,6 +54,16 @@ defmodule Plausible.Google.GA4.HTTP do
{:ok, report} <- convert_to_maps(report) do {:ok, report} <- convert_to_maps(report) do
{:ok, {report, row_count}} {:ok, {report, row_count}}
else else
{:error, %{reason: %{status: 429, body: body}}} ->
Logger.debug(
"[#{inspect(__MODULE__)}:#{report_request.property}] Request failed for #{report_request.dataset} due to exceeding rate limit."
)
Sentry.Context.set_extra_context(%{ga_response: %{body: body, status: 429}})
{:error,
{:rate_limit_exceeded, dataset: report_request.dataset, offset: report_request.offset}}
{:error, %{reason: %{status: status, body: body}}} -> {:error, %{reason: %{status: status, body: body}}} ->
Logger.debug( Logger.debug(
"[#{inspect(__MODULE__)}:#{report_request.property}] Request failed for #{report_request.dataset} with code #{status}: #{inspect(body)}" "[#{inspect(__MODULE__)}:#{report_request.property}] Request failed for #{report_request.dataset} with code #{status}: #{inspect(body)}"
@ -137,6 +147,9 @@ defmodule Plausible.Google.GA4.HTTP do
{:ok, %Finch.Response{body: body, status: 200}} -> {:ok, %Finch.Response{body: body, status: 200}} ->
{:ok, body} {:ok, body}
{:error, %HTTPClient.Non200Error{reason: %{status: 429}}} ->
{:error, :rate_limit_exceeded}
{:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] -> {:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] ->
{:error, :authentication_failed} {:error, :authentication_failed}
@ -158,6 +171,9 @@ defmodule Plausible.Google.GA4.HTTP do
{:ok, %Finch.Response{body: body, status: 200}} -> {:ok, %Finch.Response{body: body, status: 200}} ->
{:ok, body} {:ok, body}
{:error, %HTTPClient.Non200Error{reason: %{status: 429}}} ->
{:error, :rate_limit_exceeded}
{:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] -> {:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] ->
{:error, :authentication_failed} {:error, :authentication_failed}
@ -224,6 +240,9 @@ defmodule Plausible.Google.GA4.HTTP do
{:ok, date} {:ok, date}
{:error, %HTTPClient.Non200Error{reason: %{status: 429}}} ->
{:error, :rate_limit_exceeded}
{:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] -> {:error, %HTTPClient.Non200Error{} = error} when error.reason.status in [401, 403] ->
{:error, :authentication_failed} {:error, :authentication_failed}

View File

@ -5,6 +5,9 @@ defmodule Plausible.Imported.GoogleAnalytics4 do
use Plausible.Imported.Importer use Plausible.Imported.Importer
alias Plausible.Imported
alias Plausible.Repo
@missing_values ["(none)", "(not set)", "(not provided)", "(other)"] @missing_values ["(none)", "(not set)", "(not provided)", "(other)"]
@impl true @impl true
@ -17,6 +20,31 @@ defmodule Plausible.Imported.GoogleAnalytics4 do
def email_template(), do: "google_analytics_import.html" def email_template(), do: "google_analytics_import.html"
@impl true @impl true
def before_start(site_import, opts) do
site_import = Repo.preload(site_import, :site)
if import_id = Keyword.get(opts, :resume_from_import_id) do
if existing_site_import = Imported.get_import(site_import.site, import_id) do
Repo.delete!(site_import)
{:ok, existing_site_import}
else
# NOTE: shouldn't happen under normal circumsatnces
{:error, {:no_import_to_resume, import_id}}
end
else
{:ok, site_import}
end
end
@impl true
def parse_args(%{"resume_from_dataset" => dataset, "resume_from_offset" => offset} = args) do
args
|> Map.drop(["resume_from_dataset", "resume_from_offset"])
|> parse_args()
|> Keyword.put(:dataset, dataset)
|> Keyword.put(:offset, offset)
end
def parse_args( def parse_args(
%{"property" => property, "start_date" => start_date, "end_date" => end_date} = args %{"property" => property, "start_date" => start_date, "end_date" => end_date} = args
) do ) do
@ -57,8 +85,47 @@ defmodule Plausible.Imported.GoogleAnalytics4 do
Plausible.Imported.Buffer.insert_many(buffer, table, records) Plausible.Imported.Buffer.insert_many(buffer, table, records)
end end
resume_opts = Keyword.take(opts, [:dataset, :offset])
try do try do
Plausible.Google.GA4.API.import_analytics(date_range, property, auth, persist_fn) result =
Plausible.Google.GA4.API.import_analytics(date_range, property, auth,
persist_fn: persist_fn,
resume_opts: resume_opts
)
case result do
{:error, {:rate_limit_exceeded, details}} ->
site_import = Repo.preload(site_import, [:site, :imported_by])
dataset = Keyword.fetch!(details, :dataset)
offset = Keyword.fetch!(details, :offset)
{access_token, refresh_token, token_expires_at} = auth
resume_import_opts = [
property: property,
label: property,
start_date: date_range.first,
end_date: date_range.last,
access_token: access_token,
refresh_token: refresh_token,
token_expires_at: token_expires_at,
resume_from_import_id: site_import.id,
resume_from_dataset: dataset,
resume_from_offset: offset,
job_opts: [schedule_in: {65, :minutes}, unique: nil]
]
new_import(
site_import.site,
site_import.imported_by,
resume_import_opts
)
{:error, :rate_limit_exceeded, skip_purge?: true, skip_mark_failed?: true}
other ->
other
end
after after
Plausible.Imported.Buffer.flush(buffer) Plausible.Imported.Buffer.flush(buffer)
Plausible.Imported.Buffer.stop(buffer) Plausible.Imported.Buffer.stop(buffer)

View File

@ -27,10 +27,15 @@ defmodule Plausible.Imported.Importer do
used for updating site import struct and is passed to `on_success/2` callback. used for updating site import struct and is passed to `on_success/2` callback.
Please note that error tuple should be only returned on errors that can't be Please note that error tuple should be only returned on errors that can't be
recovered from. For transient errors, the import should throw an exception or recovered from. For transient errors, the import should throw an exception or
simply crash. simply crash. The error tuple has an alternative `{error, reason, opts}` form,
* `before_start/1` - Optional callback run right before scheduling import job. It's where `opts` allow to skip purging imported data so far via `skip_purge?` flag
expected to either return `:ok` for the import to proceed or `{:error, ...}` tuple, and skip marking the import as failed and notifying the user via `skip_mark_failed?`
which will be returned from `new_import/3` call. flag. Both flags are booleans.
* `before_start/2` - Optional callback run right before scheduling import job. It's
expected to either return `{:ok, site_import}` for the import to proceed
or `{:error, ...}` tuple, which will be returned from `new_import/3` call.
The `site_import` can be altered or replaced at this stage. The second argument
are opts passed to `new_import/3`.
* `on_success/2` - Optional callback run once site import is completed. Receives map * `on_success/2` - Optional callback run once site import is completed. Receives map
returned from `import_data/2`. Expected to always return `:ok`. returned from `import_data/2`. Expected to always return `:ok`.
* `on_failure/1` - Optional callback run when import job fails permanently. * `on_failure/1` - Optional callback run when import job fails permanently.
@ -99,8 +104,9 @@ defmodule Plausible.Imported.Importer do
@callback label() :: String.t() @callback label() :: String.t()
@callback email_template() :: String.t() @callback email_template() :: String.t()
@callback parse_args(map()) :: Keyword.t() @callback parse_args(map()) :: Keyword.t()
@callback import_data(SiteImport.t(), Keyword.t()) :: :ok | {:error, any()} @callback import_data(SiteImport.t(), Keyword.t()) ::
@callback before_start(SiteImport.t()) :: :ok | {:ok, map()} | {:error, any()} :ok | {:error, any()} | {:error, any(), Keyword.t()}
@callback before_start(SiteImport.t(), Keyword.t()) :: {:ok, SiteImport.t()} | {:error, any()}
@callback on_success(SiteImport.t(), map()) :: :ok @callback on_success(SiteImport.t(), map()) :: :ok
@callback on_failure(SiteImport.t()) :: :ok @callback on_failure(SiteImport.t()) :: :ok
@ -109,13 +115,13 @@ defmodule Plausible.Imported.Importer do
@behaviour Plausible.Imported.Importer @behaviour Plausible.Imported.Importer
@spec new_import(Plausible.Site.t(), Plausible.Auth.User.t(), Keyword.t()) :: @spec new_import(Plausible.Site.t(), Plausible.Auth.User.t(), Keyword.t()) ::
{:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t() | :import_in_progress} {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t() | :import_in_progress | any()}
def new_import(site, user, opts) do def new_import(site, user, opts) do
Plausible.Imported.Importer.new_import(name(), site, user, opts, &before_start/1) Plausible.Imported.Importer.new_import(name(), site, user, opts, &before_start/2)
end end
@doc false @doc false
@spec run_import(SiteImport.t(), map()) :: :ok | {:error, :any} @spec run_import(SiteImport.t(), map()) :: {:ok, SiteImport.t()} | {:error, :any}
def run_import(site_import, args) do def run_import(site_import, args) do
Plausible.Imported.Importer.run_import( Plausible.Imported.Importer.run_import(
site_import, site_import,
@ -140,7 +146,7 @@ defmodule Plausible.Imported.Importer do
end end
@impl true @impl true
def before_start(_site_import), do: :ok def before_start(site_import, _opts), do: {:ok, site_import}
@impl true @impl true
def on_success(_site_import, _extra_data), do: :ok def on_success(_site_import, _extra_data), do: :ok
@ -148,7 +154,7 @@ defmodule Plausible.Imported.Importer do
@impl true @impl true
def on_failure(_site_import), do: :ok def on_failure(_site_import), do: :ok
defoverridable before_start: 1, on_success: 2, on_failure: 1 defoverridable before_start: 2, on_success: 2, on_failure: 1
end end
end end
@ -167,7 +173,7 @@ defmodule Plausible.Imported.Importer do
|> Repo.insert() |> Repo.insert()
with {:ok, site_import} <- result, with {:ok, site_import} <- result,
:ok <- before_start_fun.(site_import), {:ok, site_import} <- before_start_fun.(site_import, opts),
{:ok, job} <- schedule_job(site_import, opts) do {:ok, job} <- schedule_job(site_import, opts) do
job job
else else
@ -195,7 +201,10 @@ defmodule Plausible.Imported.Importer do
{:ok, mark_complete(site_import, extra_data, on_success_fun)} {:ok, mark_complete(site_import, extra_data, on_success_fun)}
{:error, error} -> {:error, error} ->
{:error, error} {:error, error, []}
{:error, error, opts} ->
{:error, error, opts}
end end
end end
@ -222,6 +231,7 @@ defmodule Plausible.Imported.Importer do
defp schedule_job(site_import, opts) do defp schedule_job(site_import, opts) do
{listen?, opts} = Keyword.pop(opts, :listen?, false) {listen?, opts} = Keyword.pop(opts, :listen?, false)
{job_opts, opts} = Keyword.pop(opts, :job_opts, [])
if listen? do if listen? do
:ok = listen() :ok = listen()
@ -231,7 +241,7 @@ defmodule Plausible.Imported.Importer do
opts opts
|> Keyword.put(:import_id, site_import.id) |> Keyword.put(:import_id, site_import.id)
|> Map.new() |> Map.new()
|> Plausible.Workers.ImportAnalytics.new() |> Plausible.Workers.ImportAnalytics.new(job_opts)
|> Oban.insert() |> Oban.insert()
else else
{:error, :import_in_progress} {:error, :import_in_progress}

View File

@ -24,10 +24,10 @@ defmodule Plausible.Imported.NoopImporter do
def import_data(_site_import, _opts), do: :ok def import_data(_site_import, _opts), do: :ok
@impl true @impl true
def before_start(site_import) do def before_start(site_import, _opts) do
send(self(), {:before_start, site_import.id}) send(self(), {:before_start, site_import.id})
:ok {:ok, site_import}
end end
@impl true @impl true

View File

@ -217,7 +217,7 @@ defmodule PlausibleWeb.Components.Billing.PlanBox do
class="h-0 text-center text-sm text-red-700 dark:text-red-500 disabled-message" class="h-0 text-center text-sm text-red-700 dark:text-red-500 disabled-message"
> >
<%= if @exceeded_plan_limits != [] do %> <%= if @exceeded_plan_limits != [] do %>
<PlausibleWeb.Components.Generic.tooltip> <PlausibleWeb.Components.Generic.tooltip class="text-sm text-red-700 dark:text-red-500 mt-1 justify-center">
<%= @disabled_message %> <%= @disabled_message %>
<:tooltip_content> <:tooltip_content>
Your usage exceeds the following limit(s):<br /><br /> Your usage exceeds the following limit(s):<br /><br />

View File

@ -318,25 +318,27 @@ defmodule PlausibleWeb.Components.Generic do
""" """
end end
attr :wrapper_class, :any, default: ""
attr :class, :any, default: ""
slot :inner_block, required: true slot :inner_block, required: true
slot :tooltip_content, required: true slot :tooltip_content, required: true
def tooltip(assigns) do def tooltip(assigns) do
~H""" ~H"""
<div x-data="{sticky: false, hovered: false}" class="tooltip-wrapper relative"> <div x-data="{sticky: false, hovered: false}" class={["tooltip-wrapper relative", @wrapper_class]}>
<p <p
x-on:click="sticky = true; hovered = true" x-on:click="sticky = true; hovered = true"
x-on:click.outside="sticky = false; hovered = false" x-on:click.outside="sticky = false; hovered = false"
x-on:mouseover="hovered = true" x-on:mouseover="hovered = true"
x-on:mouseout="hovered = false" x-on:mouseout="hovered = false"
class="cursor-pointer text-sm text-red-700 dark:text-red-500 mt-1 flex justify-center align-items-center" class={["cursor-pointer flex align-items-center", @class]}
> >
<%= render_slot(@inner_block) %> <%= render_slot(@inner_block) %>
<Heroicons.information_circle class="w-5 h-5 ml-2" /> <Heroicons.information_circle class="w-5 h-5 ml-2" />
</p> </p>
<span <span
x-show="hovered || sticky" x-show="hovered || sticky"
class="bg-gray-900 pointer-events-none absolute bottom-10 margin-x-auto left-10 right-10 transition-opacity p-4 rounded text-white" class="bg-gray-900 pointer-events-none absolute bottom-10 margin-x-auto left-10 right-10 transition-opacity p-4 rounded text-sm text-white"
> >
<%= render_slot(List.first(@tooltip_content)) %> <%= render_slot(List.first(@tooltip_content)) %>
</span> </span>

View File

@ -79,6 +79,14 @@ defmodule PlausibleWeb.GoogleAnalyticsController do
layout: {PlausibleWeb.LayoutView, "focus.html"} layout: {PlausibleWeb.LayoutView, "focus.html"}
) )
{:error, :rate_limit_exceeded} ->
conn
|> put_flash(
:error,
"Google Analytics rate limit has been exceeded. Please try again later."
)
|> redirect(external: redirect_route)
{:error, :authentication_failed} -> {:error, :authentication_failed} ->
conn conn
|> put_flash( |> put_flash(
@ -153,6 +161,14 @@ defmodule PlausibleWeb.GoogleAnalyticsController do
property_or_view_form(conn, params) property_or_view_form(conn, params)
{:error, :rate_limit_exceeded} ->
conn
|> put_flash(
:error,
"Google Analytics rate limit has been exceeded. Please try again later."
)
|> redirect(external: redirect_route)
{:error, :authentication_failed} -> {:error, :authentication_failed} ->
conn conn
|> put_flash( |> put_flash(
@ -211,6 +227,14 @@ defmodule PlausibleWeb.GoogleAnalyticsController do
layout: {PlausibleWeb.LayoutView, "focus.html"} layout: {PlausibleWeb.LayoutView, "focus.html"}
) )
{:error, :rate_limit_exceeded} ->
conn
|> put_flash(
:error,
"Google Analytics rate limit has been exceeded. Please try again later."
)
|> redirect(external: redirect_route)
{:error, :authentication_failed} -> {:error, :authentication_failed} ->
conn conn
|> put_flash( |> put_flash(

View File

@ -27,7 +27,9 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
|> assign_new(:site_imports, fn %{site: site} -> |> assign_new(:site_imports, fn %{site: site} ->
site site
|> Imported.list_all_imports() |> Imported.list_all_imports()
|> Enum.map(&%{site_import: &1, live_status: &1.status}) |> Enum.map(
&%{site_import: &1, live_status: &1.status, tooltip: notice_label(&1, &1.status)}
)
end) end)
|> assign_new(:pageview_counts, fn %{site: site} -> |> assign_new(:pageview_counts, fn %{site: site} ->
Plausible.Stats.Clickhouse.imported_pageview_counts(site) Plausible.Stats.Clickhouse.imported_pageview_counts(site)
@ -120,34 +122,42 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
<ul :if={not Enum.empty?(@site_imports)}> <ul :if={not Enum.empty?(@site_imports)}>
<li :for={entry <- @site_imports} class="py-4 flex items-center justify-between space-x-4"> <li :for={entry <- @site_imports} class="py-4 flex items-center justify-between space-x-4">
<div class="flex flex-col"> <div class="flex flex-col">
<p class="text-sm leading-5 font-medium text-gray-900 dark:text-gray-100"> <div class="flex items-center text-sm leading-5 font-medium text-gray-900 dark:text-gray-100">
<Heroicons.clock <Heroicons.clock
:if={entry.live_status == SiteImport.pending()} :if={entry.live_status == SiteImport.pending()}
class="inline-block h-6 w-5 text-indigo-600 dark:text-green-600" class="block h-6 w-5 text-indigo-600 dark:text-green-600"
/> />
<.spinner <.spinner
:if={entry.live_status == SiteImport.importing()} :if={entry.live_status == SiteImport.importing()}
class="inline-block h-6 w-5 text-indigo-600 dark:text-green-600" class="block h-6 w-5 text-indigo-600 dark:text-green-600"
/> />
<Heroicons.check <Heroicons.check
:if={entry.live_status == SiteImport.completed()} :if={entry.live_status == SiteImport.completed()}
class="inline-block h-6 w-5 text-indigo-600 dark:text-green-600" class="block h-6 w-5 text-indigo-600 dark:text-green-600"
/> />
<Heroicons.exclamation_triangle <Heroicons.exclamation_triangle
:if={entry.live_status == SiteImport.failed()} :if={entry.live_status == SiteImport.failed()}
class="inline-block h-6 w-5 text-red-700 dark:text-red-700" class="block h-6 w-5 text-red-700 dark:text-red-700"
/> />
<span :if={entry.live_status == SiteImport.failed()}> <div :if={entry.live_status == SiteImport.failed()}>
Import failed - Import failed -
</span> </div>
<%= Plausible.Imported.SiteImport.label(entry.site_import) %> <.tooltip :if={entry.tooltip} wrapper_class="ml-2 grow" class="justify-left">
<span :if={entry.live_status == SiteImport.completed()} class="text-xs font-normal"> <%= Plausible.Imported.SiteImport.label(entry.site_import) %>
<:tooltip_content>
<.notice_message message_label={entry.tooltip} />
</:tooltip_content>
</.tooltip>
<div :if={!entry.tooltip} class="ml-2">
<%= Plausible.Imported.SiteImport.label(entry.site_import) %>
</div>
<div :if={entry.live_status == SiteImport.completed()} class="text-xs font-normal ml-1">
(<%= PlausibleWeb.StatsView.large_number_format( (<%= PlausibleWeb.StatsView.large_number_format(
pageview_count(entry.site_import, @pageview_counts) pageview_count(entry.site_import, @pageview_counts)
) %> page views) ) %> page views)
</span> </div>
</p> </div>
<p class="text-sm leading-5 text-gray-500 dark:text-gray-200"> <div class="text-sm leading-5 text-gray-500 dark:text-gray-200">
From <%= format_date(entry.site_import.start_date) %> to <%= format_date( From <%= format_date(entry.site_import.start_date) %> to <%= format_date(
entry.site_import.end_date entry.site_import.end_date
) %> ) %>
@ -157,7 +167,7 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
(started (started
<% end %> <% end %>
on <%= format_date(entry.site_import.inserted_at) %>) on <%= format_date(entry.site_import.inserted_at) %>)
</p> </div>
</div> </div>
<.button <.button
data-to={"/#{URI.encode_www_form(@site.domain)}/settings/forget-import/#{entry.site_import.id}"} data-to={"/#{URI.encode_www_form(@site.domain)}/settings/forget-import/#{entry.site_import.id}"}
@ -216,10 +226,30 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
"transient_fail" -> SiteImport.importing() "transient_fail" -> SiteImport.importing()
end end
{%{entry | live_status: new_status}, true} {%{entry | live_status: new_status, tooltip: notice_label(entry.site_import, new_status)},
true}
entry, changed? -> entry, changed? ->
{entry, changed?} {entry, changed?}
end) end)
end end
defp notice_label(site_import, status) do
now = NaiveDateTime.utc_now()
seconds_since_update = NaiveDateTime.diff(now, site_import.updated_at, :second)
in_progress? = status in [SiteImport.pending(), SiteImport.importing()]
if in_progress? and seconds_since_update >= 300 do
:slow_import
end
end
defp notice_message(%{message_label: :slow_import} = assigns) do
~H"""
The import process might be taking longer due to the amount of data
and rate limiting enforced by Google Analytics.
"""
end
defp notice_message(_), do: nil
end end

View File

@ -32,7 +32,7 @@ defmodule Plausible.Workers.ImportAnalytics do
:ok :ok
{:error, error} -> {:error, error, error_opts} ->
Sentry.capture_message("Failed to import from #{site_import.source}", Sentry.capture_message("Failed to import from #{site_import.source}",
extra: %{ extra: %{
import_id: site_import.id, import_id: site_import.id,
@ -41,7 +41,7 @@ defmodule Plausible.Workers.ImportAnalytics do
} }
) )
import_fail(site_import) import_fail(site_import, error_opts)
{:discard, error} {:discard, error}
end end
@ -72,19 +72,26 @@ defmodule Plausible.Workers.ImportAnalytics do
Importer.notify(site_import, :transient_fail) Importer.notify(site_import, :transient_fail)
end end
def import_fail(site_import) do def import_fail(site_import, opts) do
Plausible.Purge.delete_imported_stats!(site_import) skip_purge? = Keyword.get(opts, :skip_purge?, false)
skip_mark_failed? = Keyword.get(opts, :skip_mark_failed?, false)
import_api = ImportSources.by_name(site_import.source) if not skip_purge? do
Plausible.Purge.delete_imported_stats!(site_import)
end
site_import = if not skip_mark_failed? do
site_import import_api = ImportSources.by_name(site_import.source)
|> import_api.mark_failed()
|> Repo.preload([:site, :imported_by])
Importer.notify(site_import, :fail) site_import =
site_import
|> import_api.mark_failed()
|> Repo.preload([:site, :imported_by])
PlausibleWeb.Email.import_failure(site_import, site_import.imported_by) Importer.notify(site_import, :fail)
|> Plausible.Mailer.send()
PlausibleWeb.Email.import_failure(site_import, site_import.imported_by)
|> Plausible.Mailer.send()
end
end end
end end

View File

@ -1,5 +1,6 @@
defmodule Plausible.Imported.GoogleAnalytics4Test do defmodule Plausible.Imported.GoogleAnalytics4Test do
use PlausibleWeb.ConnCase, async: true use PlausibleWeb.ConnCase, async: true
use Oban.Testing, repo: Plausible.Repo
import Mox import Mox
import Ecto.Query, only: [from: 2] import Ecto.Query, only: [from: 2]
@ -134,6 +135,106 @@ defmodule Plausible.Imported.GoogleAnalytics4Test do
assert_os_versions(conn, breakdown_params) assert_os_versions(conn, breakdown_params)
assert_active_visitors(site_import) assert_active_visitors(site_import)
end end
test "handles rate limiting gracefully", %{user: user, site: site} do
future = DateTime.add(DateTime.utc_now(), 3600, :second)
{:ok, job} =
Plausible.Imported.GoogleAnalytics4.new_import(
site,
user,
label: "properties/123456",
property: "properties/123456",
start_date: ~D[2024-01-01],
end_date: ~D[2024-01-31],
access_token: "redacted_access_token",
refresh_token: "redacted_refresh_token",
token_expires_at: DateTime.to_iso8601(future)
)
site_import = Plausible.Imported.get_import(site, job.args.import_id)
opts = job |> Repo.reload!() |> Map.get(:args) |> GoogleAnalytics4.parse_args()
opts = Keyword.put(opts, :flush_interval_ms, 10)
expect(Plausible.HTTPClient.Mock, :post, fn _url, headers, _body, _opts ->
assert [{"Authorization", "Bearer redacted_access_token"}] == headers
{:ok, %Finch.Response{status: 200, body: List.first(@full_report_mock)}}
end)
expect(Plausible.HTTPClient.Mock, :post, fn _url, headers, _body, _opts ->
assert [{"Authorization", "Bearer redacted_access_token"}] == headers
{:error,
Plausible.HTTPClient.Non200Error.new(%Finch.Response{
status: 429,
body: "Rate limit exceeded"
})}
end)
assert {:error, :rate_limit_exceeded, skip_purge?: true, skip_mark_failed?: true} =
GoogleAnalytics4.import_data(site_import, opts)
in_65_minutes = DateTime.add(DateTime.utc_now(), 3900, :second)
assert_enqueued(
worker: Plausible.Workers.ImportAnalytics,
args: %{resume_from_import_id: site_import.id},
scheduled_at: {in_65_minutes, delta: 10}
)
[%{args: resume_args}, _] = all_enqueued()
resume_opts = GoogleAnalytics4.parse_args(resume_args)
resume_opts = Keyword.put(resume_opts, :flush_interval_ms, 10)
site_import = Repo.reload!(site_import)
Enum.each(Plausible.Imported.tables(), fn table ->
count =
case table do
"imported_visitors" -> 31
"imported_sources" -> 0
"imported_pages" -> 0
"imported_entry_pages" -> 0
"imported_exit_pages" -> 0
"imported_locations" -> 0
"imported_devices" -> 0
"imported_browsers" -> 0
"imported_operating_systems" -> 0
end
query = from(imported in table, where: imported.site_id == ^site.id)
assert await_clickhouse_count(query, count)
end)
for report <- tl(@full_report_mock) do
expect(Plausible.HTTPClient.Mock, :post, fn _url, headers, _body, _opts ->
assert [{"Authorization", "Bearer redacted_access_token"}] == headers
{:ok, %Finch.Response{status: 200, body: report}}
end)
end
assert :ok = GoogleAnalytics4.import_data(site_import, resume_opts)
Enum.each(Plausible.Imported.tables(), fn table ->
count =
case table do
"imported_sources" -> 210
"imported_visitors" -> 31
"imported_pages" -> 3340
"imported_entry_pages" -> 2934
"imported_exit_pages" -> 0
"imported_locations" -> 2291
"imported_devices" -> 93
"imported_browsers" -> 233
"imported_operating_systems" -> 1068
end
query = from(imported in table, where: imported.site_id == ^site.id)
assert await_clickhouse_count(query, count)
end)
end
end end
defp assert_active_visitors(site_import) do defp assert_active_visitors(site_import) do

View File

@ -79,6 +79,37 @@ defmodule PlausibleWeb.GoogleAnalyticsControllerTest do
assert response =~ "GA4 - Google Merch Shop (properties/213025502)" assert response =~ "GA4 - Google Merch Shop (properties/213025502)"
end end
test "redirects to imports and exports on rate limit error with flash error", %{
conn: conn,
site: site
} do
expect(
Plausible.HTTPClient.Mock,
:get,
fn _url, _opts ->
{:error, %HTTPClient.Non200Error{reason: %{status: 429, body: "rate limit exceeded"}}}
end
)
conn =
conn
|> get("/#{site.domain}/import/google-analytics/property-or-view", %{
"access_token" => "token",
"refresh_token" => "foo",
"expires_at" => "2022-09-22T20:01:37.112777"
})
assert redirected_to(conn, 302) ==
PlausibleWeb.Router.Helpers.site_path(
conn,
:settings_imports_exports,
site.domain
)
assert Phoenix.Flash.get(conn.assigns.flash, :error) =~
"Google Analytics rate limit has been exceeded. Please try again later."
end
test "redirects to imports and exports on auth error with flash error", %{ test "redirects to imports and exports on auth error with flash error", %{
conn: conn, conn: conn,
site: site site: site
@ -416,6 +447,39 @@ defmodule PlausibleWeb.GoogleAnalyticsControllerTest do
"We were unable to retrieve information from Google Analytics" "We were unable to retrieve information from Google Analytics"
end end
test "redirects to imports and exports on rate limiting with flash error",
%{
conn: conn,
site: site
} do
expect(
Plausible.HTTPClient.Mock,
:post,
fn _url, _opts, _params ->
{:error, %HTTPClient.Non200Error{reason: %{status: 429, body: "rate limit exceeded"}}}
end
)
conn =
conn
|> post("/#{site.domain}/import/google-analytics/property-or-view", %{
"property_or_view" => "properties/428685906",
"access_token" => "token",
"refresh_token" => "foo",
"expires_at" => "2022-09-22T20:01:37.112777"
})
assert redirected_to(conn, 302) ==
PlausibleWeb.Router.Helpers.site_path(
conn,
:settings_imports_exports,
site.domain
)
assert Phoenix.Flash.get(conn.assigns.flash, :error) =~
"Google Analytics rate limit has been exceeded. Please try again later."
end
test "redirects to imports and exports on expired authentication with flash error", test "redirects to imports and exports on expired authentication with flash error",
%{ %{
conn: conn, conn: conn,
@ -600,6 +664,41 @@ defmodule PlausibleWeb.GoogleAnalyticsControllerTest do
"We were unable to retrieve information from Google Analytics" "We were unable to retrieve information from Google Analytics"
end end
test "redirects to imports and exports on rate limiting with flash error",
%{
conn: conn,
site: site
} do
expect(
Plausible.HTTPClient.Mock,
:get,
fn _url, _params ->
{:error, %HTTPClient.Non200Error{reason: %{status: 429, body: "rate limit exceeded"}}}
end
)
conn =
conn
|> get("/#{site.domain}/import/google-analytics/confirm", %{
"property_or_view" => "properties/428685906",
"access_token" => "token",
"refresh_token" => "foo",
"expires_at" => "2022-09-22T20:01:37.112777",
"start_date" => "2024-02-22",
"end_date" => "2024-02-26"
})
assert redirected_to(conn, 302) ==
PlausibleWeb.Router.Helpers.site_path(
conn,
:settings_imports_exports,
site.domain
)
assert Phoenix.Flash.get(conn.assigns.flash, :error) =~
"Google Analytics rate limit has been exceeded. Please try again later."
end
test "redirects to imports and exports on expired authentication with flash error", test "redirects to imports and exports on expired authentication with flash error",
%{ %{
conn: conn, conn: conn,

View File

@ -722,6 +722,28 @@ defmodule PlausibleWeb.SiteControllerTest do
conn = get(conn, "/#{site.domain}/settings/imports-exports") conn = get(conn, "/#{site.domain}/settings/imports-exports")
refute html_response(conn, 200) =~ "No new imports can be started" refute html_response(conn, 200) =~ "No new imports can be started"
end end
test "displays notice when import in progress is running for over 5 minutes", %{
conn: conn,
site: site
} do
six_minutes_ago = NaiveDateTime.add(NaiveDateTime.utc_now(), -360)
_site_import1 = insert(:site_import, site: site, status: SiteImport.completed())
_site_import2 =
insert(:site_import,
site: site,
status: SiteImport.importing(),
updated_at: six_minutes_ago
)
conn = get(conn, "/#{site.domain}/settings/imports-exports")
response = html_response(conn, 200)
assert response =~ "No new imports can be started"
assert response =~ "The import process might be taking longer due to the amount of data"
assert response =~ "and rate limiting enforced by Google Analytics"
end
end end
describe "GET /:website/settings/integrations for self-hosting" do describe "GET /:website/settings/integrations for self-hosting" do