Mirror of https://github.com/plausible/analytics.git
Synced 2024-12-25 10:33:01 +03:00
c1c03b729c
* Reapply "Local CSV exports/imports and S3/UI updates (#3989)" (#3995)
This reverts commit aee69e44c8.
* remove unused functions
* eh, that one was actually used
* ugh, they were both used
---------
Co-authored-by: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
122 lines
3.2 KiB
Elixir
defmodule Plausible.Workers.ExportAnalytics do
  @moduledoc """
  Worker for running CSV export jobs. Supports S3 and local storage.

  To avoid blocking the queue, a timeout of 15 minutes is enforced.
  """

  use Oban.Worker,
    queue: :analytics_exports,
    max_attempts: 3
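
  # How a job for this worker might be enqueued (a sketch, not the app's
  # actual enqueue site; the args mirror what `perform/1` pattern-matches
  # on below, and the `local_path` value is purely illustrative):
  #
  #     %{"storage" => "local", "site_id" => site.id, "local_path" => "/data/exports/export.zip"}
  #     |> Plausible.Workers.ExportAnalytics.new()
  #     |> Oban.insert!()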

  alias Plausible.Exports

  @doc "Base query for filtering the export jobs of a given site."
  def base_query(site_id) do
    import Ecto.Query, only: [from: 2]

    from j in Oban.Job,
      where: j.worker == ^Oban.Worker.to_string(__MODULE__),
      where: j.args["site_id"] == ^site_id
  end
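
  # For example, counting a site's export jobs across all states
  # (a sketch; assumes `site_id` is bound):
  #
  #     base_query(site_id) |> Plausible.Repo.aggregate(:count)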

  @impl true
  def timeout(_job), do: :timer.minutes(15)

  @impl true
  def perform(%Oban.Job{args: args} = job) do
    %{
      "storage" => storage,
      "site_id" => site_id
    } = args

    site = Plausible.Repo.get!(Plausible.Site, site_id)
    %Date.Range{} = date_range = Exports.date_range(site.id, site.timezone)

    queries =
      Exports.export_queries(site_id,
        date_range: date_range,
        timezone: site.timezone,
        extname: ".csv"
      )

    # since each worker / `perform` attempt runs in a separate process
    # it's ok to use start_link to keep the connection lifecycle
    # bound to that of the worker
    {:ok, ch} =
      Plausible.ClickhouseRepo.config()
      |> Keyword.replace!(:pool_size, 1)
      |> Ch.start_link()

    try do
      case storage do
        "s3" -> perform_s3_export(ch, site, queries, args)
        "local" -> perform_local_export(ch, queries, args)
      end
    after
      # notify listeners whether the export succeeded or failed
      Exports.oban_notify(site_id)
    end

    email_success(job.args)

    :ok
  catch
    class, reason ->
      # only email about the failure once all retry attempts are used up
      if job.attempt >= job.max_attempts, do: email_failure(job.args)
      :erlang.raise(class, reason, __STACKTRACE__)
  end
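
  # Both storage backends stream the archive straight out of ClickHouse;
  # they differ only in the destination: a multipart S3 upload vs a local file.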

  defp perform_s3_export(ch, site, queries, args) do
    %{
      "s3_bucket" => s3_bucket,
      "s3_path" => s3_path
    } = args

    created_on = Plausible.Timezones.to_date_in_timezone(DateTime.utc_now(), site.timezone)
    filename = Exports.archive_filename(site.domain, created_on)

    DBConnection.run(
      ch,
      fn conn ->
        conn
        |> Exports.stream_archive(queries, format: "CSVWithNames")
        |> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, filename)
      end,
      timeout: :infinity
    )
  end

  defp perform_local_export(ch, queries, args) do
    %{"local_path" => local_path} = args
    tmp_path = Plug.Upload.random_file!("tmp-plausible-export")

    DBConnection.run(
      ch,
      fn conn ->
        conn
        |> Exports.stream_archive(queries, format: "CSVWithNames")
        |> Stream.into(File.stream!(tmp_path))
        |> Stream.run()
      end,
      timeout: :infinity
    )

    # the archive is written to a temporary file first,
    # then moved into place once complete
    File.mkdir_p!(Path.dirname(local_path))
    if File.exists?(local_path), do: File.rm!(local_path)
    File.rename!(tmp_path, local_path)
  end

  defp email_failure(args) do
    args |> Map.put("status", "failure") |> email()
  end

  defp email_success(args) do
    args |> Map.put("status", "success") |> email()
  end

  defp email(args) do
    # email delivery can potentially fail and cause an already successful
    # export to be repeated, which is costly, hence the email is delivered
    # in a separate job
    Oban.insert!(Plausible.Workers.NotifyExportedAnalytics.new(args))
  end
end