Plausible.Imported.Importer behaviour (Plausible v0.0.1)
Behaviour that should be implemented for each import source.
All imports are executed as background jobs run via the Plausible.Workers.ImportAnalytics
Oban worker. Each import source must define a module conforming to the Importer
behaviour.
The callbacks that need to be implemented:

- `name/0` - Returns the import source name as an atom. Example: `:universal_analytics`.
- `label/0` - Descriptive, display-friendly name of the source. Example: "Google Analytics".
- `email_template/0` - Name of the email template to use for notifications in `PlausibleWeb.Email` (`import_success` and `import_failure`). The template should have content customized for a particular source.
- `parse_args/1` - Receives the Oban job arguments coming from `new_import/3`. Whatever options were passed to `new_import/3` will be present in the input map, with string keys and values serialized to primitives. If, for instance, `start_date: ~D[2024-01-03]` is passed as an option, `parse_args/1` receives `%{..., "start_date" => "2024-01-03"}`. The callback is expected to parse the map values, producing a keyword list of options to pass to `import_data/2`.
- `import_data/2` - Receives the site import struct and the options produced by `parse_args/1`. This is where all the import processing is done. How the import is implemented is entirely arbitrary, except for the requirement that the process as a whole must be synchronous. The callback is expected to return either `:ok` or `{:ok, %{...}}` on successful import, or `{:error, ...}` on failure. The map in the success tuple is used to update the site import struct and is passed to the `on_success/2` callback. Please note that an error tuple should only be returned for errors that can't be recovered from. For transient errors, the import should throw an exception or simply crash. The error tuple has an alternative `{:error, reason, opts}` form, where `opts` allows skipping the purge of the data imported so far via the `skip_purge?` flag, and skipping marking the import as failed and notifying the user via the `skip_mark_failed?` flag. Both flags are booleans.
- `before_start/2` - Optional callback run right before scheduling the import job. It's expected to return either `{:ok, site_import}` for the import to proceed, or an `{:error, ...}` tuple, which will be returned from the `new_import/3` call. The `site_import` can be altered or replaced at this stage. The second argument is the opts passed to `new_import/3`.
- `on_success/2` - Optional callback run once the site import is completed. Receives the map returned from `import_data/2`. Expected to always return `:ok`.
- `on_failure/1` - Optional callback run when the import job fails permanently.
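Taken together, the required callbacks can be sketched as a minimal importer module. The example below is a hypothetical CSV source, not part of Plausible: the module name, the template name, and the empty body of `import_data/2` are illustrative assumptions.

```elixir
defmodule MyApp.Imported.CSVImporter do
  @behaviour Plausible.Imported.Importer

  @impl true
  def name(), do: :csv

  @impl true
  def label(), do: "CSV"

  @impl true
  def email_template(), do: "csv_import.html"

  @impl true
  def parse_args(%{"start_date" => start_date, "end_date" => end_date}) do
    # Oban serialized the options to primitives (ISO 8601 strings here);
    # turn them back into native Date structs for import_data/2.
    [
      start_date: Date.from_iso8601!(start_date),
      end_date: Date.from_iso8601!(end_date)
    ]
  end

  @impl true
  def import_data(_site_import, _opts) do
    # Populate the imported_* ClickHouse tables here. Return {:error, ...}
    # only for unrecoverable failures; crash on transient ones so Oban retries.
    :ok
  end
end
```

Note how `parse_args/1` undoes the serialization applied to the job arguments, so that `import_data/2` works with native types.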
All sources must be added to the list in `Plausible.Imported.ImportSources`.
In order to schedule a new import job using a given source, the respective importer's
`new_import/3` function must be called. It accepts the site, the user who is doing
the import, and any options necessary to carry out the import.
There's an expectation that `start_date` and `end_date` are provided, either as options
passed to `new_import/3` or as data in the map returned from `import_data/2`. If these
parameters are not provided, the import will eventually crash. These parameters define
the time range of the imported data, which is in turn used for efficient querying.
Logic running inside `import_data/2` is expected to populate all `imported_*` tables
in ClickHouse, with the `import_id` column set to the site import's ID.
Managing any configuration or authentication prior to running the import is outside
the scope of the importer logic and is expected to be implemented separately.
Running import fully synchronously
In case it's necessary to run the whole import job fully synchronously, the
`Plausible.Workers.ImportAnalytics` worker sends an `Oban.Notifier` message
on completion, failure, or transient failure of the import.
A basic usage scenario looks like this:
```elixir
{:ok, job} =
  Plausible.Imported.NoopImporter.new_import(
    site,
    user,
    start_date: ~D[2005-01-01],
    end_date: Date.utc_today(),
    # this option is necessary to set up the calling process as a listener
    listen?: true
  )

import_id = job.args[:import_id]

receive do
  {:notification, :analytics_imports_jobs, %{"event" => "complete", "import_id" => ^import_id}} ->
    IO.puts("Job completed")

  {:notification, :analytics_imports_jobs, %{"event" => "transient_fail", "import_id" => ^import_id}} ->
    IO.puts("Job failed transiently")

  {:notification, :analytics_imports_jobs, %{"event" => "fail", "import_id" => ^import_id}} ->
    IO.puts("Job failed permanently")
after
  15_000 ->
    IO.puts("Job didn't finish in 15 seconds")
end
```
In a more realistic scenario, job scheduling will be done inside a GenServer process,
such as a LiveView, where notifications can be listened for via `handle_info/2`.
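Because the notification is a plain Erlang message, such a process can be sketched without any Plausible-specific code. The module below is a hypothetical, self-contained example that tracks the outcome of a single import; only the message shape is taken from the documentation above.

```elixir
defmodule MyApp.ImportWatcher do
  # Hypothetical process tracking a single import's outcome via the
  # Oban.Notifier messages described above (not part of Plausible).
  use GenServer

  def start_link(import_id), do: GenServer.start_link(__MODULE__, import_id)

  def status(pid), do: GenServer.call(pid, :status)

  @impl true
  def init(import_id), do: {:ok, %{import_id: import_id, status: :pending}}

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state.status, state}

  @impl true
  def handle_info({:notification, :analytics_imports_jobs, payload}, state) do
    # Ignore notifications belonging to other imports running concurrently.
    if payload["import_id"] == state.import_id do
      status =
        case payload["event"] do
          "complete" -> :complete
          "fail" -> :failed
          "transient_fail" -> :retrying
        end

      {:noreply, %{state | status: status}}
    else
      {:noreply, state}
    end
  end
end
```

In a LiveView, the same `handle_info/2` clause would update assigns instead of a plain state map.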
Summary
Callbacks
@callback before_start(Plausible.Imported.SiteImport.t(), Keyword.t()) :: {:ok, Plausible.Imported.SiteImport.t()} | {:error, any()}
@callback email_template() :: String.t()
@callback import_data(Plausible.Imported.SiteImport.t(), Keyword.t()) :: :ok | {:ok, map()} | {:error, any()} | {:error, any(), Keyword.t()}
@callback label() :: String.t()
@callback name() :: atom()
@callback on_failure(Plausible.Imported.SiteImport.t()) :: :ok
@callback on_success(Plausible.Imported.SiteImport.t(), map()) :: :ok
@callback parse_args(map()) :: Keyword.t()
Functions
@spec listen() :: :ok
Allows explicitly starting to listen for importer job notifications. The listener
receives notifications for all imports and must filter for the subset that applies
to the given context.
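For example, a process other than the one that scheduled the job can subscribe itself with `listen/0` and react to events. The snippet below is a hypothetical fragment: `relevant_ids` and `handle_import_event/1` are illustrative names, not part of Plausible; the notification shape matches the one shown earlier.

```elixir
# Subscribe the current process to import job notifications.
:ok = Plausible.Imported.Importer.listen()

# Notifications for *all* imports arrive here, so keep only the relevant ones.
relevant_ids = MapSet.new([import_id])

receive do
  {:notification, :analytics_imports_jobs, %{"import_id" => id} = payload} ->
    if MapSet.member?(relevant_ids, id), do: handle_import_event(payload)
end
```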