View Source Plausible.Imported.Importer behaviour (Plausible v0.0.1)

Behaviour that should be implemented for each import source.

All imports are executed as background jobs run via Plausible.Workers.ImportAnalytics Oban worker. Each import source must define a module conforming Importer behaviour.

The callbacks that need to be implemented:

  • name/0 - Returns import source name as an atom. Example: :universal_analytics.
  • label/0 - Descriptive, display friendly name of the source. Example: "Google Analytics".
  • email_template/0 - Name of the email template to use for notifications in PlausibleWeb.Email (import_success and import_failure). The template should have content customized for a particular source.
  • parse_args/1 - Receives Oban job arguments coming from new_import/3. Whatever options were passed to new_import/3 will be present in the input map with string keys and values serialized to primitives. If, for instance start_date: ~D[2024-01-03] is passed as an option, parse_args/1 receives %{..., "start_date" => "2024-01-03"}. The expectation is parsing the map values producing a keyword list of options to pass to import_data/2.
  • import_data/2 - Receives site import struct and options produced by parse_args/1. This is where all the import processing is done. The way the import is implemented is entirely arbitrary except the requirement that the process as a whole must by synchronous. The callback is expected to return either :ok or {:ok, %{...}} on successful import or {:error, ...} on failure. The map in success tuple is used for updating site import struct and is passed to on_success/2 callback. Please note that error tuple should be only returned on errors that can't be recovered from. For transient errors, the import should throw an exception or simply crash.
  • before_start/1 - Optional callback run right before scheduling import job. It's expected to either return :ok for the import to proceed or {:error, ...} tuple, which will be returned from new_import/3 call.
  • on_success/2 - Optional callback run once site import is completed. Receives map returned from import_data/2. Expected to always return :ok.
  • on_failure/1 - Optional callback run when import job fails permanently.

All sources must be added to the list in Plausible.Imported.ImportSources.

In order to schedule a new import job using a given source, respective importer's new_import/3 function must be called. It accepts site, user who is doing the import and any options necessary to carry out the import.

There's an expectation that start_date and end_date are provided either as options passed to new_import/3 or data in map returned from import_data/2. If these parameters are not provided, the import will eventually crash. These parameters define time range of imported data which is in turn used for efficient querying.

Logic running inside import_data/2 is expected to populated all imported_* tables in ClickHouse with import_id column set to site import's ID.

Managing any configuration or authentication prior to running import is outside of scope of importer logic and is expected to be implemented separately.

Running import fully synchronously

In case it's necessary to run the whole import job fully synchronously, the Plausible.Workers.ImportAnalytics worker sends an Oban.Notifier message on completion, failure or transient failure of the import.

A basic usage scenario looks like this:

{:ok, job} = Plausible.Imported.NoopImporter.new_import(
  site,
  user,
  start_date: ~D[2005-01-01],
  end_date: Date.utc_today(),
  # this option is necessary to setup the calling process as listener
  listen?: true
)

import_id = job.args[:import_id]

receive do
  {:notification, :analytics_imports_jobs, %{"event" => "complete", "import_id" => ^import_id}} ->
    IO.puts("Job completed")

  {:notification, :analytics_imports_jobs, %{"event" => "transient_fail", "import_id" => ^import_id}} ->
    IO.puts("Job failed transiently")

  {:notification, :analytics_imports_jobs, %{"event" => "fail", "import_id" => ^import_id}} ->
    IO.puts("Job failed permanently")
after
  15_000 ->
    IO.puts("Job didn't finish in 15 seconds")
end

In a more realistic scenario, job scheduling will be done inside a GenServer process like LiveView, where notifications can be listened for via handle_info/2.

Summary

Functions

Allows to explicitly start listening for importer job notifications.

Callbacks

@callback before_start(Plausible.Imported.SiteImport.t()) ::
  :ok | {:ok, map()} | {:error, any()}
@callback email_template() :: String.t()
@callback import_data(Plausible.Imported.SiteImport.t(), Keyword.t()) ::
  :ok | {:error, any()}
@callback label() :: String.t()
@callback name() :: atom()
@callback on_failure(Plausible.Imported.SiteImport.t()) :: :ok
@callback on_success(Plausible.Imported.SiteImport.t(), map()) :: :ok
@callback parse_args(map()) :: Keyword.t()

Functions

@spec listen() :: :ok

Allows to explicitly start listening for importer job notifications.

Listener must explicitly filter out a subset of imports that apply to the given context.