Local CSV exports/imports and S3/UI updates (#3989)

* local CSV exports/imports and S3 updates

* credo

* dialyzer

* refactor input columns

* fix ci minio/clickhouse tests

* Update lib/plausible_web/live/csv_export.ex

Co-authored-by: Adrian Gruntkowski <adrian.gruntkowski@gmail.com>

* fix date range filter in export_pages_q and process only pageviews

* remove toTimeZone(zero_timestamp) note

* use SiteImport.pending(), SiteImport.importing()

* escape [SiteImport.pending(), SiteImport.importing()]

* use random s3 keys for imports to avoid collisions (collisions sometimes make the upload get stuck)

* clamp import date ranges

* site is already in assigns

* recompute cutoff date each time

* use toDate(timestamp[, timezone]) shortcut

* show alerts on export cancel/delete and extract hint into a component

* switch to Imported.clamp_dates/4

* reprocess tables when imports are added

* recompute cutoff_date on each call

* actually use clamped_date_range on submit

* add warning message

* add expiry rules to buckets in make minio

* add site_id to imports notifications and use it in csv_importer

* try/catch safer

* return :ok

* date range is not available when no uploads

* improve ui and warning messages

* use Generic.notice

* fix flaky exports test

* begin tests

* Improve `Importer` notification payload shape

---------

Co-authored-by: Adrian Gruntkowski <adrian.gruntkowski@gmail.com>
ruslandoga 2024-04-10 02:59:48 +08:00 committed by GitHub
parent bb108450cb
commit 1a0cb52f95
30 changed files with 1429 additions and 468 deletions

View File

@@ -19,17 +19,17 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        mix_env: ['test', 'small_test']
-        postgres_image: ['postgres:16']
-        test_experimental_reduced_joins: ['0']
+        mix_env: ["test", "small_test"]
+        postgres_image: ["postgres:16"]
+        test_experimental_reduced_joins: ["0"]
         include:
-          - mix_env: 'test'
-            postgres_image: 'postgres:15'
-            test_experimental_reduced_joins: '0'
-          - mix_env: 'test'
-            postgres_image: 'postgres:16'
-            test_experimental_reduced_joins: '1'
+          - mix_env: "test"
+            postgres_image: "postgres:15"
+            test_experimental_reduced_joins: "0"
+          - mix_env: "test"
+            postgres_image: "postgres:16"
+            test_experimental_reduced_joins: "1"
     env:
       MIX_ENV: ${{ matrix.mix_env }}
@@ -105,8 +105,13 @@ jobs:
       - run: mix do ecto.create, ecto.migrate
       - run: mix run -e "Tzdata.ReleaseUpdater.poll_for_update"
+      - run: make minio
+        if: env.MIX_ENV == 'test'
       - run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors
         if: env.MIX_ENV == 'test'
+        env:
+          MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1"
       - run: mix test --include slow --max-failures 1 --warnings-as-errors
         if: env.MIX_ENV == 'small_test'

View File

@@ -40,8 +40,10 @@ postgres-stop: ## Stop and remove the postgres container
 minio: ## Start a transient container with a recent version of minio (s3)
 	docker run -d --rm -p 10000:10000 -p 10001:10001 --name plausible_minio minio/minio server /data --address ":10000" --console-address ":10001"
 	while ! docker exec plausible_minio mc alias set local http://localhost:10000 minioadmin minioadmin; do sleep 1; done
-	docker exec plausible_minio mc mb local/dev-exports
-	docker exec plausible_minio mc mb local/dev-imports
+	docker exec plausible_minio sh -c 'mc mb local/dev-exports && mc ilm add --expiry-days 7 local/dev-exports'
+	docker exec plausible_minio sh -c 'mc mb local/dev-imports && mc ilm add --expiry-days 7 local/dev-imports'
+	docker exec plausible_minio sh -c 'mc mb local/test-exports && mc ilm add --expiry-days 7 local/test-exports'
+	docker exec plausible_minio sh -c 'mc mb local/test-imports && mc ilm add --expiry-days 7 local/test-imports'

 minio-stop: ## Stop and remove the minio container
 	docker stop plausible_minio

View File

@@ -296,7 +296,8 @@ config :plausible,
   is_selfhost: is_selfhost,
   custom_script_name: custom_script_name,
   log_failed_login_attempts: log_failed_login_attempts,
-  license_key: license_key
+  license_key: license_key,
+  persistent_cache_dir: persistent_cache_dir

 config :plausible, :selfhost,
   enable_email_verification: enable_email_verification,
@@ -537,10 +538,10 @@ base_queues = [
   site_setup_emails: 1,
   clean_invitations: 1,
   analytics_imports: 1,
+  analytics_exports: 1,
+  notify_exported_analytics: 1,
   domain_change_transition: 1,
-  check_accept_traffic_until: 1,
-  # NOTE: maybe move s3_csv_export to cloud_queues?
-  s3_csv_export: 1
+  check_accept_traffic_until: 1
 ]

 cloud_queues = [

View File

@@ -3,85 +3,284 @@ defmodule Plausible.Exports do
   Contains functions to export data for events and sessions as Zip archives.
   """

-  require Plausible
+  use Plausible
   import Ecto.Query

+  @doc "Schedules CSV export job to S3 storage"
+  @spec schedule_s3_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data}
+  def schedule_s3_export(site_id, email_to) do
+    with :ok <- ensure_has_data(site_id) do
+      args = %{
+        "storage" => "s3",
+        "site_id" => site_id,
+        "email_to" => email_to,
+        "s3_bucket" => Plausible.S3.exports_bucket(),
+        "s3_path" => s3_export_key(site_id)
+      }
+
+      {:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))}
+    end
+  end
+
+  @doc "Schedules CSV export job to local storage"
+  @spec schedule_local_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data}
+  def schedule_local_export(site_id, email_to) do
+    with :ok <- ensure_has_data(site_id) do
+      args = %{
+        "storage" => "local",
+        "site_id" => site_id,
+        "email_to" => email_to,
+        "local_path" => local_export_file(site_id)
+      }
+
+      {:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))}
+    end
+  end
+
+  @spec ensure_has_data(pos_integer) :: :ok | {:error, :no_data}
+  defp ensure_has_data(site_id) do
+    # SELECT true FROM "events_v2" AS e0 WHERE (e0."site_id" = ^site_id) LIMIT 1
+    has_data? = Plausible.ClickhouseRepo.exists?(from "events_v2", where: [site_id: ^site_id])
+    if has_data?, do: :ok, else: {:error, :no_data}
+  end
+
+  @doc "Gets last CSV export job for a site"
+  @spec get_last_export_job(pos_integer) :: Oban.Job.t() | nil
+  def get_last_export_job(site_id) do
+    Plausible.Repo.one(
+      from e in Plausible.Workers.ExportAnalytics.base_query(site_id),
+        order_by: [desc: :id],
+        limit: 1
+    )
+  end
+
+  @doc "Subscribes to CSV export job notifications"
+  def oban_listen, do: Oban.Notifier.listen(__MODULE__)
+
+  @doc false
+  def oban_notify(site_id), do: Oban.Notifier.notify(__MODULE__, %{"site_id" => site_id})
+
   @doc """
-  Renders filename for the Zip archive containing the exported CSV files.
+  Renders export archive filename.

   Examples:

-      iex> archive_filename("plausible.io", ~D[2021-01-01], ~D[2024-12-31])
-      "plausible_io_20210101_20241231.zip"
-
-      iex> archive_filename("Bücher.example", ~D[2021-01-01], ~D[2024-12-31])
-      "Bücher_example_20210101_20241231.zip"
+      iex> archive_filename("plausible.io", _created_on = ~D[2024-12-31])
+      "plausible_io_20241231.zip"

   """
-  def archive_filename(domain, min_date, max_date) do
-    name =
-      Enum.join(
-        [
-          String.replace(domain, ".", "_"),
-          Calendar.strftime(min_date, "%Y%m%d"),
-          Calendar.strftime(max_date, "%Y%m%d")
-        ],
-        "_"
-      )
-
-    name <> ".zip"
+  def archive_filename(domain, %Date{} = created_on) do
+    String.replace(domain, ".", "_") <> "_" <> Calendar.strftime(created_on, "%Y%m%d") <> ".zip"
   end
+
+  @doc ~S"""
+  Safely renders content disposition for an arbitrary export filename.
+
+  Examples:
+
+      iex> content_disposition("plausible_io_20241231.zip")
+      "attachment; filename=\"plausible_io_20241231.zip\""
+
+      iex> content_disposition("📊.zip")
+      "attachment; filename=\"plausible-export.zip\"; filename*=utf-8''%F0%9F%93%8A.zip"
+
+  """
+  def content_disposition(filename) do
+    encoded_filename = URI.encode(filename)
+
+    if encoded_filename == filename do
+      ~s[attachment; filename="#{filename}"]
+    else
+      ~s[attachment; filename="plausible-export.zip"; filename*=utf-8''#{encoded_filename}]
+    end
+  end
+
+  @type export :: %{
+          path: Path.t(),
+          name: String.t(),
+          expires_at: DateTime.t() | nil,
+          download_link: String.t(),
+          size: pos_integer
+        }
+
+  @doc "Gets local export for a site"
+  @spec get_local_export(pos_integer, String.t(), String.t()) :: export | nil
+  def get_local_export(site_id, domain, timezone) do
+    path = local_export_file(site_id)
+
+    if File.exists?(path) do
+      %File.Stat{size: size, mtime: mtime} = File.stat!(path, time: :posix)
+      created_at = DateTime.from_unix!(mtime)
+      created_on_in_site_tz = Plausible.Timezones.to_date_in_timezone(created_at, timezone)
+      name = archive_filename(domain, created_on_in_site_tz)
+
+      download_link =
+        PlausibleWeb.Router.Helpers.site_path(
+          PlausibleWeb.Endpoint,
+          :download_local_export,
+          domain
+        )
+
+      %{path: path, name: name, expires_at: nil, download_link: download_link, size: size}
+    end
+  end
+
+  @doc "Deletes local export for a site"
+  @spec delete_local_export(pos_integer) :: :ok
+  def delete_local_export(site_id) do
+    file = local_export_file(site_id)
+
+    if File.exists?(file) do
+      File.rm!(file)
+    end
+
+    :ok
+  end
+
+  @spec local_export_file(pos_integer) :: Path.t()
+  defp local_export_file(site_id) do
+    persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir)
+
+    Path.join([
+      persistent_cache_dir || System.tmp_dir!(),
+      "plausible-exports",
+      Integer.to_string(site_id)
+    ])
+  end
+
+  @doc "Gets S3 export for a site"
+  @spec get_s3_export(pos_integer) :: export | nil
+  def get_s3_export(site_id) do
+    path = s3_export_key(site_id)
+    bucket = Plausible.S3.exports_bucket()
+    head_object_op = ExAws.S3.head_object(bucket, path)
+
+    case ExAws.request(head_object_op) do
+      {:error, {:http_error, 404, _response}} ->
+        nil
+
+      {:ok, %{status_code: 200, headers: headers}} ->
+        "attachment; filename=" <> filename = :proplists.get_value("content-disposition", headers)
+        name = String.trim(filename, "\"")
+        size = :proplists.get_value("content-length", headers, nil)
+
+        expires_at =
+          if x_amz_expiration = :proplists.get_value("x-amz-expiration", headers, nil) do
+            ["expiry-date=", expiry_date, ", rule-id=", _rule_id] =
+              String.split(x_amz_expiration, "\"", trim: true)
+
+            Timex.parse!(expiry_date, "{RFC1123}")
+          end
+
+        %{
+          path: path,
+          name: name,
+          expires_at: expires_at,
+          download_link: Plausible.S3.download_url(bucket, path),
+          size: String.to_integer(size)
+        }
+    end
+  end
+
+  @doc "Deletes S3 export for a site"
+  @spec delete_s3_export(pos_integer) :: :ok
+  def delete_s3_export(site_id) do
+    if export = get_s3_export(site_id) do
+      exports_bucket = Plausible.S3.exports_bucket()
+      delete_op = ExAws.S3.delete_object(exports_bucket, export.path)
+      ExAws.request!(delete_op)
+    end
+
+    :ok
+  end
+
+  defp s3_export_key(site_id), do: Integer.to_string(site_id)
+
+  @doc "Returns the date range for the site's events data in site's timezone or `nil` if there is no data"
+  @spec date_range(non_neg_integer, String.t()) :: Date.Range.t() | nil
+  def date_range(site_id, timezone) do
+    [%Date{} = start_date, %Date{} = end_date] =
+      Plausible.ClickhouseRepo.one(
+        from e in "events_v2",
+          where: [site_id: ^site_id],
+          select: [
+            fragment("toDate(min(?),?)", e.timestamp, ^timezone),
+            fragment("toDate(max(?),?)", e.timestamp, ^timezone)
+          ]
+      )
+
+    unless end_date == ~D[1970-01-01] do
+      Date.range(start_date, end_date)
+    end
+  end

   @doc """
   Builds Ecto queries to export data from `events_v2` and `sessions_v2`
   tables into the format of `imported_*` tables for a website.
   """
-  @spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) ::
+  @spec export_queries(pos_integer,
+          extname: String.t(),
+          date_range: Date.Range.t(),
+          timezone: String.t()
+        ) ::
           %{String.t() => Ecto.Query.t()}
   def export_queries(site_id, opts \\ []) do
     extname = opts[:extname] || ".csv"
     date_range = opts[:date_range]
+    timezone = opts[:timezone] || "UTC"

-    filename = fn table ->
-      name =
-        if date_range do
-          first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
-          last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
-          "#{table}_#{first_date}_#{last_date}"
-        else
-          table
-        end
-
-      name <> extname
-    end
+    suffix =
+      if date_range do
+        first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
+        last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
+        "_#{first_date}_#{last_date}" <> extname
+      else
+        extname
+      end
+
+    filename = fn name -> name <> suffix end

     %{
-      filename.("imported_visitors") => export_visitors_q(site_id),
-      filename.("imported_sources") => export_sources_q(site_id),
+      filename.("imported_visitors") => export_visitors_q(site_id, timezone, date_range),
+      filename.("imported_sources") => export_sources_q(site_id, timezone, date_range),
       # NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error
-      filename.("imported_pages") => export_pages_q(site_id),
-      filename.("imported_entry_pages") => export_entry_pages_q(site_id),
-      filename.("imported_exit_pages") => export_exit_pages_q(site_id),
-      filename.("imported_locations") => export_locations_q(site_id),
-      filename.("imported_devices") => export_devices_q(site_id),
-      filename.("imported_browsers") => export_browsers_q(site_id),
-      filename.("imported_operating_systems") => export_operating_systems_q(site_id)
+      filename.("imported_pages") => export_pages_q(site_id, timezone, date_range),
+      filename.("imported_entry_pages") => export_entry_pages_q(site_id, timezone, date_range),
+      filename.("imported_exit_pages") => export_exit_pages_q(site_id, timezone, date_range),
+      filename.("imported_locations") => export_locations_q(site_id, timezone, date_range),
+      filename.("imported_devices") => export_devices_q(site_id, timezone, date_range),
+      filename.("imported_browsers") => export_browsers_q(site_id, timezone, date_range),
+      filename.("imported_operating_systems") =>
+        export_operating_systems_q(site_id, timezone, date_range)
     }
   end

-  Plausible.on_full_build do
-    defp sampled(table) do
-      Plausible.Stats.Sampling.add_query_hint(from(table))
+  on_full_build do
+    defp sampled(table, date_range) do
+      from(table)
+      |> Plausible.Stats.Sampling.add_query_hint()
+      |> limit_date_range(date_range)
     end
   else
-    defp sampled(table) do
-      table
+    defp sampled(table, date_range) do
+      limit_date_range(table, date_range)
     end
   end

-  defmacrop date(timestamp) do
+  defp limit_date_range(query, nil), do: query
+
+  defp limit_date_range(query, date_range) do
+    from t in query,
+      where:
+        selected_as(:date) >= ^date_range.first and
+          selected_as(:date) <= ^date_range.last
+  end
+
+  defmacrop date(timestamp, timezone) do
     quote do
-      selected_as(fragment("toDate(?)", unquote(timestamp)), :date)
+      selected_as(
+        fragment("toDate(?,?)", unquote(timestamp), unquote(timezone)),
+        :date
+      )
     end
   end
@@ -127,13 +326,12 @@ defmodule Plausible.Exports do
     end
   end

-  @spec export_visitors_q(pos_integer) :: Ecto.Query.t()
-  def export_visitors_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_visitors_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         visitors(s),
         pageviews(s),
         bounces(s),
@@ -142,9 +340,8 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_sources_q(pos_integer) :: Ecto.Query.t()
-  def export_sources_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_sources_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [
         selected_as(:date),
@@ -158,7 +355,7 @@ defmodule Plausible.Exports do
       ],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         selected_as(s.referrer_source, :source),
         s.referrer,
         s.utm_source,
@@ -174,32 +371,40 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_pages_q(pos_integer) :: Ecto.Query.t()
-  def export_pages_q(site_id) do
+  defp export_pages_q(site_id, timezone, date_range) do
     window_q =
-      from e in sampled("events_v2"),
+      from e in sampled("events_v2", nil),
         where: e.site_id == ^site_id,
+        where: [name: "pageview"],
         select: %{
-          timestamp: e.timestamp,
+          timestamp: selected_as(fragment("toTimeZone(?,?)", e.timestamp, ^timezone), :timestamp),
           next_timestamp:
-            over(fragment("leadInFrame(?)", e.timestamp),
+            over(fragment("leadInFrame(toTimeZone(?,?))", e.timestamp, ^timezone),
               partition_by: e.session_id,
               order_by: e.timestamp,
              frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING")
            ),
          pathname: e.pathname,
          hostname: e.hostname,
-          name: e.name,
          user_id: e.user_id,
          session_id: e.session_id,
          _sample_factor: fragment("_sample_factor")
        }

+    window_q =
+      if date_range do
+        from e in window_q,
+          where: selected_as(:timestamp) >= ^date_range.first,
+          where: fragment("toDate(?)", selected_as(:timestamp)) <= ^date_range.last
+      else
+        window_q
+      end
+
     from e in subquery(window_q),
       group_by: [selected_as(:date), e.pathname],
       order_by: selected_as(:date),
       select: [
-        date(e.timestamp),
+        selected_as(fragment("toDate(?)", e.timestamp), :date),
         selected_as(fragment("any(?)", e.hostname), :hostname),
         selected_as(e.pathname, :page),
         selected_as(
@@ -207,11 +412,7 @@ defmodule Plausible.Exports do
           :visits
         ),
         visitors(e),
-        selected_as(
-          fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
-          :pageviews
-        ),
-        # NOTE: are exits pageviews or any events?
+        selected_as(fragment("toUInt64(round(count()*any(_sample_factor)))"), :pageviews),
         selected_as(
           fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp),
           :exits
@@ -223,14 +424,13 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_entry_pages_q(pos_integer) :: Ecto.Query.t()
-  def export_entry_pages_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_entry_pages_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [selected_as(:date), s.entry_page],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         s.entry_page,
         visitors(s),
         selected_as(
@@ -243,14 +443,13 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_exit_pages_q(pos_integer) :: Ecto.Query.t()
-  def export_exit_pages_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_exit_pages_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [selected_as(:date), s.exit_page],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         s.exit_page,
         visitors(s),
         visit_duration(s),
@@ -263,15 +462,14 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_locations_q(pos_integer) :: Ecto.Query.t()
-  def export_locations_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_locations_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ",
       group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         selected_as(s.country_code, :country),
         selected_as(s.subdivision1_code, :region),
         selected_as(s.city_geoname_id, :city),
@@ -283,14 +481,13 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_devices_q(pos_integer) :: Ecto.Query.t()
-  def export_devices_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_devices_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [selected_as(:date), s.screen_size],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         selected_as(s.screen_size, :device),
         visitors(s),
         visits(s),
@@ -300,14 +497,13 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_browsers_q(pos_integer) :: Ecto.Query.t()
-  def export_browsers_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_browsers_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [selected_as(:date), s.browser, s.browser_version],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         s.browser,
         s.browser_version,
         visitors(s),
@@ -318,14 +514,13 @@ defmodule Plausible.Exports do
       ]
   end

-  @spec export_operating_systems_q(pos_integer) :: Ecto.Query.t()
-  def export_operating_systems_q(site_id) do
-    from s in sampled("sessions_v2"),
+  defp export_operating_systems_q(site_id, timezone, date_range) do
+    from s in sampled("sessions_v2", date_range),
       where: s.site_id == ^site_id,
       group_by: [selected_as(:date), s.operating_system, s.operating_system_version],
       order_by: selected_as(:date),
       select: [
-        date(s.start),
+        date(s.start, ^timezone),
         s.operating_system,
         s.operating_system_version,
         visitors(s),
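Taken together, the new `Plausible.Exports` API is meant to be driven roughly like this; a minimal sketch, assuming a site with id 123 and an invented email address (error handling elided):

```elixir
# Hedged sketch of the scheduling + notification round-trip (id/email invented)
:ok = Plausible.Exports.oban_listen()
{:ok, _job} = Plausible.Exports.schedule_s3_export(123, "owner@example.com")

# Plausible.Workers.ExportAnalytics calls Plausible.Exports.oban_notify/1 when
# the job finishes; Oban.Notifier delivers it on the Plausible.Exports channel:
receive do
  {:notification, Plausible.Exports, %{"site_id" => 123}} ->
    Plausible.Exports.get_s3_export(123)
end
```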

View File

@@ -1,6 +1,7 @@
 defmodule Plausible.Imported.CSVImporter do
   @moduledoc """
-  CSV importer from S3 that uses ClickHouse [s3 table function.](https://clickhouse.com/docs/en/sql-reference/table-functions/s3)
+  CSV importer from either S3 for which it uses ClickHouse [s3 table function](https://clickhouse.com/docs/en/sql-reference/table-functions/s3)
+  or from local storage for which it uses [input function.](https://clickhouse.com/docs/en/sql-reference/table-functions/input)
   """

   use Plausible.Imported.Importer
@@ -16,10 +17,45 @@ defmodule Plausible.Imported.CSVImporter do
   def email_template(), do: "google_analytics_import.html"

   @impl true
-  def parse_args(%{"uploads" => uploads}), do: [uploads: uploads]
+  def parse_args(%{"uploads" => uploads, "storage" => storage}) do
+    [uploads: uploads, storage: storage]
+  end

   @impl true
   def import_data(site_import, opts) do
+    storage = Keyword.fetch!(opts, :storage)
+    uploads = Keyword.fetch!(opts, :uploads)
+
+    if storage == "local" do
+      # we need to remove the imported files from local storage
+      # after the importer has completed or ran out of attempts
+      paths = Enum.map(uploads, &Map.fetch!(&1, "local_path"))
+
+      Oban.insert!(
+        Plausible.Workers.LocalImportAnalyticsCleaner.new(
+          %{"import_id" => site_import.id, "paths" => paths},
+          schedule_in: _one_hour = 3600
+        )
+      )
+    end
+
+    {:ok, ch} =
+      Plausible.IngestRepo.config()
+      |> Keyword.replace!(:pool_size, 1)
+      |> Ch.start_link()
+
+    case storage do
+      "s3" -> import_s3(ch, site_import, uploads)
+      "local" -> import_local(ch, site_import, uploads)
+    end
+  rescue
+    # we are cancelling on any argument or ClickHouse errors, assuming they are permanent
+    e in [ArgumentError, Ch.Error] ->
+      # see Plausible.Imported.Importer for more details on transient vs permanent errors
+      {:error, Exception.message(e)}
+  end
+
+  defp import_s3(ch, site_import, uploads) do
     %{
       id: import_id,
       site_id: site_id,
@@ -27,34 +63,20 @@ defmodule Plausible.Imported.CSVImporter do
       end_date: end_date
     } = site_import

-    uploads = Keyword.fetch!(opts, :uploads)
-
     %{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} =
       Plausible.S3.import_clickhouse_credentials()

-    {:ok, ch} =
-      Plausible.IngestRepo.config()
-      |> Keyword.replace!(:pool_size, 1)
-      |> Ch.start_link()
-
     Enum.each(uploads, fn upload ->
       %{"filename" => filename, "s3_url" => s3_url} = upload

       {table, _, _} = parse_filename!(filename)
       s3_structure = input_structure!(table)
+      s3_columns = input_columns!(table)

-      s3_structure_cols_expr =
-        s3_structure
-        |> String.split(",", trim: true)
-        |> Enum.map_join(", ", fn kv ->
-          [col, _type] = String.split(kv)
-          col
-        end)
-
       statement =
         """
-        INSERT INTO {table:Identifier}(site_id, #{s3_structure_cols_expr}, import_id) \
-        SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
+        INSERT INTO {table:Identifier}(site_id,import_id,#{s3_columns}) \
+        SELECT {site_id:UInt64}, {import_id:UInt64}, * \
         FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String}) \
         WHERE date >= {start_date:Date} AND date <= {end_date:Date}\
         """
@@ -75,10 +97,53 @@ defmodule Plausible.Imported.CSVImporter do
       Ch.query!(ch, statement, params, timeout: :infinity)
     end)
-  rescue
-    # we are cancelling on any argument or ClickHouse errors
-    e in [ArgumentError, Ch.Error] ->
-      {:error, Exception.message(e)}
   end
+
+  defp import_local(ch, site_import, uploads) do
+    %{
+      id: import_id,
+      site_id: site_id,
+      start_date: start_date,
+      end_date: end_date
+    } = site_import
+
+    DBConnection.run(
+      ch,
+      fn conn ->
+        Enum.each(uploads, fn upload ->
+          %{"filename" => filename, "local_path" => local_path} = upload
+
+          {table, _, _} = parse_filename!(filename)
+          input_structure = input_structure!(table)
+          input_columns = input_columns!(table)
+
+          statement =
+            """
+            INSERT INTO {table:Identifier}(site_id,import_id,#{input_columns}) \
+            SELECT {site_id:UInt64}, {import_id:UInt64}, * \
+            FROM input({input_structure:String}) \
+            WHERE date >= {start_date:Date} AND date <= {end_date:Date} \
+            FORMAT CSVWithNames\
+            """
+
+          params = %{
+            "table" => table,
+            "site_id" => site_id,
+            "import_id" => import_id,
+            "input_structure" => input_structure,
+            "start_date" => start_date,
+            "end_date" => end_date
+          }
+
+          # we are reading in 512KB chunks for better performance
+          # the default would've been line by line (not great for a CSV)
+          File.stream!(local_path, 512_000)
+          |> Stream.into(Ch.stream(conn, statement, params))
+          |> Stream.run()
+        end)
+      end,
+      timeout: :infinity
+    )
   end

   input_structures = %{
@@ -109,7 +174,7 @@ defmodule Plausible.Imported.CSVImporter do
       iex> date_range([
       ...>   %{"filename" => "imported_devices_20190101_20210101.csv"},
-      ...>   "imported_pages_20200101_20220101.csv"
+      ...>   "pages_20200101_20220101.csv"
       ...> ])
       Date.range(~D[2019-01-01], ~D[2022-01-01])
@@ -165,6 +230,9 @@ defmodule Plausible.Imported.CSVImporter do
       iex> parse_filename!("imported_devices_00010101_20250101.csv")
       {"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}

+      iex> parse_filename!("devices_00010101_20250101.csv")
+      {"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}
+
   """
   @spec parse_filename!(String.t()) ::
           {table :: String.t(), start_date :: Date.t(), end_date :: Date.t()}
@@ -173,11 +241,29 @@ defmodule Plausible.Imported.CSVImporter do
   for {table, input_structure} <- input_structures do
     defp input_structure!(unquote(table)), do: unquote(input_structure)

+    input_columns =
+      input_structure
+      |> String.split(",", trim: true)
+      |> Enum.map_join(",", fn kv ->
+        [col, _type] = String.split(kv)
+        String.trim(col)
+      end)
+
+    defp input_columns!(unquote(table)), do: unquote(input_columns)
+
     def parse_filename!(
           <<unquote(table)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
         ) do
       {unquote(table), parse_date!(start_date), parse_date!(end_date)}
     end
+
+    "imported_" <> name = table
+
+    def parse_filename!(
+          <<unquote(name)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
+        ) do
+      {unquote(table), parse_date!(start_date), parse_date!(end_date)}
+    end
   end

   def parse_filename!(_filename) do
@@ -195,6 +281,9 @@ defmodule Plausible.Imported.CSVImporter do
       iex> valid_filename?("imported_devices_00010101_20250101.csv")
       true

+      iex> valid_filename?("devices_00010101_20250101.csv")
+      true
+
   """
   @spec valid_filename?(String.t()) :: boolean
   def valid_filename?(filename) do
@@ -220,10 +309,35 @@ defmodule Plausible.Imported.CSVImporter do
       iex> extract_table("imported_devices_00010101_20250101.csv")
       "imported_devices"

+      iex> extract_table("devices_00010101_20250101.csv")
+      "imported_devices"
+
   """
   @spec extract_table(String.t()) :: String.t()
   def extract_table(filename) do
     {table, _start_date, _end_date} = parse_filename!(filename)
     table
   end
+
+  @doc """
+  Returns local directory for CSV imports storage.
+
+  Builds upon `$PERSISTENT_CACHE_DIR` (if set) and falls back to /tmp
+
+  Examples:
+
+      iex> local_dir = local_dir(_site_id = 37)
+      iex> String.ends_with?(local_dir, "/plausible-imports/37")
+      true
+
+  """
+  def local_dir(site_id) do
+    persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir)
+
+    Path.join([
+      persistent_cache_dir || System.tmp_dir!(),
+      "plausible-imports",
+      Integer.to_string(site_id)
+    ])
+  end
 end
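To make the compile-time `input_columns!/1` derivation above concrete, here is the same transformation applied to a made-up structure string (the real ones live in the `input_structures` map):

```elixir
# Illustrative only: a fabricated "name Type" structure string
input_structure = "date Date, browser String, browser_version String, visitors UInt64"

input_columns =
  input_structure
  |> String.split(",", trim: true)
  |> Enum.map_join(",", fn kv ->
    # each entry is "name Type"; keep only the column name
    [col, _type] = String.split(kv)
    String.trim(col)
  end)

IO.puts(input_columns)
# => date,browser,browser_version,visitors
```

This is the column list that gets interpolated into the `INSERT INTO {table:Identifier}(site_id,import_id,...)` statements in both `import_s3/3` and `import_local/3`.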

View File

@@ -73,13 +73,13 @@ defmodule Plausible.Imported.Importer do
       import_id = job.args[:import_id]

       receive do
-        {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}} ->
+        {:notification, :analytics_imports_jobs, %{"event" => "complete", "import_id" => ^import_id}} ->
           IO.puts("Job completed")

-        {:notification, :analytics_imports_jobs, %{"transient_fail" => ^import_id}} ->
+        {:notification, :analytics_imports_jobs, %{"event" => "transient_fail", "import_id" => ^import_id}} ->
           IO.puts("Job failed transiently")

-        {:notification, :analytics_imports_jobs, %{"fail" => ^import_id}} ->
+        {:notification, :analytics_imports_jobs, %{"event" => "fail", "import_id" => ^import_id}} ->
           IO.puts("Job failed permanently")
       after
         15_000 ->
@@ -203,7 +203,11 @@ defmodule Plausible.Imported.Importer do
   @doc false
   def notify(site_import, event) do
-    Oban.Notifier.notify(Oban, @oban_channel, %{event => site_import.id})
+    Oban.Notifier.notify(Oban, @oban_channel, %{
+      "event" => event,
+      "import_id" => site_import.id,
+      "site_id" => site_import.site_id
+    })
   end

   @doc """

View File

@@ -43,31 +43,27 @@ defmodule Plausible.S3 do

   Example:

-      iex> %{
-      ...>   s3_url: "http://localhost:10000/test-imports/123/imported_browsers.csv",
-      ...>   presigned_url: "http://localhost:10000/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin" <> _
-      ...> } = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
+      iex> upload = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
+      iex> true = String.ends_with?(upload.s3_url, "/test-imports/123/imported_browsers.csv")
+      iex> true = String.contains?(upload.presigned_url, "/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&")

   """
   def import_presign_upload(site_id, filename) do
     config = ExAws.Config.new(:s3)
-    s3_path = Path.join(to_string(site_id), filename)
+    s3_path = Path.join(Integer.to_string(site_id), filename)
     bucket = imports_bucket()
     {:ok, presigned_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path)
     %{s3_url: extract_s3_url(presigned_url), presigned_url: presigned_url}
   end

   # to make ClickHouse see MinIO in dev and test envs we replace
-  # the host in the S3 URL with whatever's set in S3_CLICKHOUSE_HOST env var
+  # the host in the S3 URL with host.docker.internal or whatever's set in $MINIO_HOST_FOR_CLICKHOUSE
   if Mix.env() in [:dev, :test, :small_dev, :small_test] do
     defp extract_s3_url(presigned_url) do
       [s3_url, _] = String.split(presigned_url, "?")
-
-      if ch_host = System.get_env("S3_CLICKHOUSE_HOST") do
-        URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
-      else
-        s3_url
-      end
+      default_ch_host = unless System.get_env("CI"), do: "host.docker.internal"
+      ch_host = System.get_env("MINIO_HOST_FOR_CLICKHOUSE", default_ch_host)
+      URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
     end
   else
     defp extract_s3_url(presigned_url) do
@@ -79,36 +75,37 @@ defmodule Plausible.S3 do
   @doc """
   Chunks and uploads Zip archive to the provided S3 destination.

+  In the current implementation the bucket always goes into the path component.
+  """
+  @spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t()) :: :ok
+  def export_upload_multipart(stream, s3_bucket, s3_path, filename) do
+    # 5 MiB is the smallest chunk size AWS S3 supports
+    chunk_into_parts(stream, 5 * 1024 * 1024)
+    |> ExAws.S3.upload(s3_bucket, s3_path,
+      content_disposition: Plausible.Exports.content_disposition(filename),
+      content_type: "application/zip",
+      timeout: :infinity
+    )
+    |> ExAws.request!()
+
+    :ok
+  end
+
+  @doc """
   Returns a presigned URL to download the exported Zip archive from S3.
   The URL expires in 24 hours.

   In the current implementation the bucket always goes into the path component.
   """
-  @spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t(), keyword) ::
-          :uri_string.uri_string()
-  def export_upload_multipart(stream, s3_bucket, s3_path, filename, config_overrides \\ []) do
+  @spec download_url(String.t(), Path.t()) :: :uri_string.uri_string()
+  def download_url(s3_bucket, s3_path) do
     config = ExAws.Config.new(:s3)
-    encoded_filename = URI.encode(filename)
-    disposition = ~s[attachment; filename="#{encoded_filename}"]
-
-    disposition =
-      if encoded_filename != filename do
-        disposition <> "; filename*=utf-8''#{encoded_filename}"
-      else
-        disposition
-      end
-
-    # 5 MiB is the smallest chunk size AWS S3 supports
-    chunk_into_parts(stream, 5 * 1024 * 1024)
-    |> ExAws.S3.upload(s3_bucket, s3_path,
-      content_disposition: disposition,
-      content_type: "application/zip"
-    )
-    |> ExAws.request!(config_overrides)
+
+    # ex_aws_s3 doesn't allow expires_in longer than one week
+    one_week = 60 * 60 * 24 * 7

     {:ok, download_url} =
-      ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: _24hr = 86_400)
+      ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: one_week)

     download_url
   end
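A rough sketch of how the split-out helpers compose now (bucket, key, and file path are invented; the real caller is the export worker, which streams the archive rather than reading a file from disk):

```elixir
# Hypothetical caller of the reworked S3 helpers
zip_stream = File.stream!("/tmp/export.zip", 64 * 1024)

:ok =
  Plausible.S3.export_upload_multipart(
    zip_stream,
    "test-exports",
    "123",
    "plausible_io_20241231.zip"
  )

# the download link is now minted separately, on demand,
# instead of being returned from the upload call as before:
url = Plausible.S3.download_url("test-exports", "123")
```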

View File

@@ -23,14 +23,6 @@ defmodule Plausible.Sites do
     Repo.get_by!(Site, domain: domain)
   end

-  def get_domain!(site_id) do
-    Plausible.Repo.one!(
-      from s in Plausible.Site,
-        where: [id: ^site_id],
-        select: s.domain
-    )
-  end
-
   @spec toggle_pin(Auth.User.t(), Site.t()) ::
           {:ok, Site.UserPreference.t()} | {:error, :too_many_pins}
   def toggle_pin(user, site) do

View File

@@ -1,6 +1,8 @@
 defmodule PlausibleWeb.SiteController do
   use PlausibleWeb, :controller
   use Plausible.Repo
+  use Plausible

   alias Plausible.Sites
   alias Plausible.Billing.Quota
@@ -710,21 +712,27 @@ defmodule PlausibleWeb.SiteController do
     |> redirect(external: Routes.site_path(conn, :settings_integrations, site.domain))
   end

-  def csv_export(conn, _params) do
-    %{site: site, current_user: user} = conn.assigns
-
-    Oban.insert!(
-      Plausible.Workers.ExportCSV.new(%{
-        "site_id" => site.id,
-        "email_to" => user.email,
-        "s3_bucket" => Plausible.S3.exports_bucket(),
-        "s3_path" => "Plausible-#{site.id}.zip"
-      })
-    )
-
-    conn
-    |> put_flash(:success, "SCHEDULED. WAIT FOR MAIL")
-    |> redirect(to: Routes.site_path(conn, :settings_imports_exports, site.domain))
+  on_full_build do
+    # exported archives are downloaded from object storage
+  else
+    alias Plausible.Exports
+
+    def download_local_export(conn, _params) do
+      %{id: site_id, domain: domain, timezone: timezone} = conn.assigns.site
+
+      if local_export = Exports.get_local_export(site_id, domain, timezone) do
+        %{path: export_path, name: name} = local_export
+
+        conn
+        |> put_resp_content_type("application/zip")
+        |> put_resp_header("content-disposition", Exports.content_disposition(name))
+        |> send_file(200, export_path)
+      else
+        conn
+        |> put_flash(:error, "Export not found")
+        |> redirect(external: Routes.site_path(conn, :settings_imports_exports, domain))
+      end
+    end
   end

   def csv_import(conn, _params) do
def csv_import(conn, _params) do def csv_import(conn, _params) do

View File

@@ -1,4 +1,5 @@
 defmodule PlausibleWeb.Email do
+  use Plausible
   use Bamboo.Phoenix, view: PlausibleWeb.EmailView

   import Bamboo.PostmarkHelper
@@ -346,6 +347,48 @@ defmodule PlausibleWeb.Email do
     })
   end

+  def export_success(user, site, download_url, expires_at) do
+    subject =
+      on_full_build do
+        "Your Plausible Analytics export is now ready for download"
+      else
+        "Your export is now ready for download"
+      end
+
+    expires_in =
+      if expires_at do
+        Timex.Format.DateTime.Formatters.Relative.format!(
+          expires_at,
+          "{relative}"
+        )
+      end
+
+    priority_email()
+    |> to(user)
+    |> tag("export-success")
+    |> subject(subject)
+    |> render("export_success.html",
+      user: user,
+      site: site,
+      download_url: download_url,
+      expires_in: expires_in
+    )
+  end
+
+  def export_failure(user, site) do
+    subject =
+      on_full_build do
+        "Your Plausible Analytics export has failed"
+      else
+        "Your export has failed"
+      end
+
+    priority_email()
+    |> to(user)
+    |> subject(subject)
+    |> render("export_failure.html", user: user, site: site)
+  end
+
   def error_report(reported_by, trace_id, feedback) do
     Map.new()
     |> Map.put(:layout, nil)

View File

@@ -0,0 +1,281 @@
+defmodule PlausibleWeb.Live.CSVExport do
+  @moduledoc """
+  LiveView allowing scheduling, watching, downloading, and deleting S3 and local exports.
+  """
+
+  use PlausibleWeb, :live_view
+  use Phoenix.HTML
+
+  alias PlausibleWeb.Components.Generic
+  alias Plausible.Exports
+
+  # :not_mounted_at_router ensures we have already done auth checks in the controller
+  # if this liveview becomes available from the router, please make sure
+  # to check that current_user_role is allowed to manage site exports
+  @impl true
+  def mount(:not_mounted_at_router, session, socket) do
+    %{
+      "storage" => storage,
+      "site_id" => site_id,
+      "email_to" => email_to
+    } = session
+
+    socket =
+      socket
+      |> assign(site_id: site_id, email_to: email_to, storage: storage)
+      |> assign_new(:site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end)
+      |> fetch_export()
+
+    if connected?(socket) do
+      Exports.oban_listen()
+    end
+
+    {:ok, socket}
+  end
+
+  defp fetch_export(socket) do
+    %{storage: storage, site_id: site_id} = socket.assigns
+
+    get_export =
+      case storage do
+        "s3" ->
+          &Exports.get_s3_export/1
+
+        "local" ->
+          %{domain: domain, timezone: timezone} = socket.assigns.site
+          &Exports.get_local_export(&1, domain, timezone)
+      end
+
+    socket = assign(socket, export: nil)
+
+    if job = Exports.get_last_export_job(site_id) do
+      %Oban.Job{state: state} = job
+
+      case state do
+        _ when state in ["scheduled", "available", "retryable"] ->
+          assign(socket, status: "in_progress")
+
+        "executing" ->
+          # Exports.oban_notify/1 is called in `perform/1` and
+          # the notification arrives while the job.state is still "executing"
+          if export = get_export.(site_id) do
+            assign(socket, status: "ready", export: export)
+          else
+            assign(socket, status: "in_progress")
+          end
+
+        "completed" ->
+          if export = get_export.(site_id) do
+            assign(socket, status: "ready", export: export)
+          else
+            assign(socket, status: "can_schedule")
+          end
+
+        "discarded" ->
+          assign(socket, status: "failed")
+
+        "cancelled" ->
+          # credo:disable-for-next-line Credo.Check.Refactor.Nesting
+          if export = get_export.(site_id) do
+            assign(socket, status: "ready", export: export)
+          else
+            assign(socket, status: "can_schedule")
+          end
+      end
+    else
+      if export = get_export.(site_id) do
+        assign(socket, status: "ready", export: export)
+      else
+        assign(socket, status: "can_schedule")
+      end
+    end
+  end
+
+  @impl true
+  def render(assigns) do
+    ~H"""
+    <%= case @status do %>
+      <% "can_schedule" -> %>
+        <.prepare_download />
+      <% "in_progress" -> %>
+        <.in_progress />
+      <% "failed" -> %>
+        <.failed />
+      <% "ready" -> %>
+        <.download storage={@storage} export={@export} />
+    <% end %>
+    """
+  end
+
+  defp prepare_download(assigns) do
+    ~H"""
+    <Generic.button phx-click="export">Prepare download</Generic.button>
+    <p class="text-sm mt-4 text-gray-500">
+      Prepare your data for download by clicking the button above. When that's done, a Zip file that you can download will appear.
+    </p>
+    """
+  end
+
+  defp in_progress(assigns) do
+    ~H"""
+    <div class="flex items-center justify-between space-x-2">
+      <div class="flex items-center">
+        <Generic.spinner />
+        <span class="ml-2">We are preparing your download ...</span>
+      </div>
+      <button
+        phx-click="cancel"
+        class="text-red-500 font-semibold"
+        data-confirm="Are you sure you want to cancel this export?"
+      >
+        Cancel
+      </button>
+    </div>
+    <p class="text-sm mt-4 text-gray-500">
+      The preparation of your stats might take a while. Depending on the volume of your data, it might take up to 20 minutes. Feel free to leave the page and return later.
+    </p>
+    """
+  end
+
+  defp failed(assigns) do
+    ~H"""
+    <div class="flex items-center">
+      <Heroicons.exclamation_circle class="w-4 h-4 text-red-500" />
+      <p class="ml-2 text-sm text-gray-500">
+        Something went wrong when preparing your download. Please
+        <button phx-click="export" class="text-indigo-500">try again.</button>
+      </p>
+    </div>
+    """
+  end
+
+  defp download(assigns) do
+    ~H"""
+    <div class="flex items-center justify-between space-x-2">
+      <a href={@export.download_link} class="inline-flex items-center">
+        <Heroicons.document_text class="w-4 h-4" />
+        <span class="ml-1 text-indigo-500"><%= @export.name %></span>
+      </a>
+      <button
+        phx-click="delete"
+        class="text-red-500 font-semibold"
+        data-confirm="Are you sure you want to delete this export?"
+      >
+        <Heroicons.trash class="w-4 h-4" />
+      </button>
+    </div>
+    <p :if={@export.expires_at} class="text-sm mt-4 text-gray-500">
+      Note that this file will expire
+      <.hint message={@export.expires_at}>
+        <%= Timex.Format.DateTime.Formatters.Relative.format!(@export.expires_at, "{relative}") %>.
+      </.hint>
+    </p>
+    <p :if={@storage == "local"} class="text-sm mt-4 text-gray-500">
+      Located at
+      <.hint message={@export.path}><%= format_path(@export.path) %></.hint>
+      (<%= format_bytes(@export.size) %>)
+    </p>
+    """
+  end
+
+  defp hint(assigns) do
+    ~H"""
+    <span title={@message} class="underline cursor-help underline-offset-2 decoration-dashed">
+      <%= render_slot(@inner_block) %>
+    </span>
+    """
+  end
+
+  @impl true
+  def handle_event("export", _params, socket) do
+    %{storage: storage, site_id: site_id, email_to: email_to} = socket.assigns
+
+    schedule_result =
+      case storage do
+        "s3" -> Exports.schedule_s3_export(site_id, email_to)
+        "local" -> Exports.schedule_local_export(site_id, email_to)
+      end
+
+    socket =
+      case schedule_result do
+        {:ok, _job} ->
+          fetch_export(socket)
+
+        {:error, :no_data} ->
+          socket
+          |> put_flash(:error, "There is no data to export")
+          |> redirect(
+            external:
+              Routes.site_path(socket, :settings_imports_exports, socket.assigns.site.domain)
+          )
+      end
+
+    {:noreply, socket}
+  end
+
+  def handle_event("cancel", _params, socket) do
+    if job = Exports.get_last_export_job(socket.assigns.site_id), do: Oban.cancel_job(job)
+    {:noreply, fetch_export(socket)}
+  end
+
+  def handle_event("delete", _params, socket) do
+    %{storage: storage, site_id: site_id} = socket.assigns
+
+    case storage do
+      "s3" -> Exports.delete_s3_export(site_id)
+      "local" -> Exports.delete_local_export(site_id)
+    end
+
+    {:noreply, fetch_export(socket)}
+  end
+
+  @impl true
+  def handle_info({:notification, Exports, %{"site_id" => site_id}}, socket) do
+    socket =
+      if site_id == socket.assigns.site_id do
+        fetch_export(socket)
+      else
+        socket
+      end
+
+    {:noreply, socket}
+  end
+
+  @format_path_regex ~r/^(?<beginning>((.+?\/){3})).*(?<ending>(\/.*){3})$/
+
+  defp format_path(path) do
+    path_string =
+      path
+      |> to_string()
+      |> String.replace_prefix("\"", "")
+      |> String.replace_suffix("\"", "")
+
+    case Regex.named_captures(@format_path_regex, path_string) do
+      %{"beginning" => beginning, "ending" => ending} -> "#{beginning}...#{ending}"
+      _ -> path_string
+    end
+  end
+
+  defp format_bytes(bytes) when is_integer(bytes) do
+    cond do
+      bytes >= memory_unit("TiB") -> format_bytes(bytes, "TiB")
+      bytes >= memory_unit("GiB") -> format_bytes(bytes, "GiB")
+      bytes >= memory_unit("MiB") -> format_bytes(bytes, "MiB")
+      bytes >= memory_unit("KiB") -> format_bytes(bytes, "KiB")
+      true -> format_bytes(bytes, "B")
+    end
+  end
+
+  defp format_bytes(bytes, "B"), do: "#{bytes} B"
+
+  defp format_bytes(bytes, unit) do
+    value = bytes / memory_unit(unit)
+    "#{:erlang.float_to_binary(value, decimals: 1)} #{unit}"
+  end
+
+  defp memory_unit("TiB"), do: 1024 * 1024 * 1024 * 1024
+  defp memory_unit("GiB"), do: 1024 * 1024 * 1024
+  defp memory_unit("MiB"), do: 1024 * 1024
+  defp memory_unit("KiB"), do: 1024
+end
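For reference, the private formatting helpers above behave roughly like this (illustrative values; the functions are not public, so this is a sketch rather than a doctest):

```elixir
format_bytes(512)             # => "512 B"
format_bytes(1536)            # => "1.5 KiB"
format_bytes(3 * 1024 * 1024) # => "3.0 MiB"

# format_path/1 elides the middle of paths deep enough to match the regex,
# keeping the first and last three segments
```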

View File

@ -1,26 +1,76 @@
defmodule PlausibleWeb.Live.CSVImport do defmodule PlausibleWeb.Live.CSVImport do
@moduledoc """ @moduledoc """
LiveView allowing uploading CSVs for imported tables to S3 LiveView allowing uploading CSVs for imported tables to S3 or local storage
""" """
use PlausibleWeb, :live_view
alias Plausible.Imported.CSVImporter
use PlausibleWeb, :live_view
alias PlausibleWeb.Components.Generic
require Plausible.Imported.SiteImport
alias Plausible.Imported.CSVImporter
alias Plausible.Imported
# :not_mounted_at_router ensures we have already done auth checks in the controller
# if this liveview becomes available from the router, please make sure
# to check that current_user_role is allowed to make site imports
@impl true @impl true
def mount(_params, session, socket) do def mount(:not_mounted_at_router, session, socket) do
%{"site_id" => site_id, "user_id" => user_id} = session %{"site_id" => site_id, "current_user_id" => user_id, "storage" => storage} = session
upload_opts = [
accept: [".csv", "text/csv"],
auto_upload: true,
max_entries: length(Imported.tables()),
# 1GB
max_file_size: 1_000_000_000,
progress: &handle_progress/3
]
upload_opts =
case storage do
"s3" -> [{:external, &presign_upload/2} | upload_opts]
"local" -> upload_opts
end
upload_consumer =
case storage do
"s3" ->
fn meta, entry ->
{:ok, %{"s3_url" => meta.s3_url, "filename" => entry.client_name}}
end
"local" ->
local_dir = CSVImporter.local_dir(site_id)
File.mkdir_p!(local_dir)
fn meta, entry ->
local_path = Path.join(local_dir, Path.basename(meta.path))
File.rename!(meta.path, local_path)
{:ok, %{"local_path" => local_path, "filename" => entry.client_name}}
end
end
%{assigns: %{site: site}} =
socket = assign_new(socket, :site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end)
# we'll listen for new completed imports to know
# when to reload the occupied ranges
if connected?(socket), do: Imported.listen()
occupied_ranges = Imported.get_occupied_date_ranges(site)
native_stats_start_date = Plausible.Sites.native_stats_start_date(site)
socket = socket =
socket socket
|> assign(site_id: site_id, user_id: user_id) |> assign(
|> allow_upload(:import, site_id: site_id,
accept: [".csv", "text/csv"], user_id: user_id,
auto_upload: true, storage: storage,
max_entries: length(Plausible.Imported.tables()), upload_consumer: upload_consumer,
# 1GB occupied_ranges: occupied_ranges,
max_file_size: 1_000_000_000, native_stats_start_date: native_stats_start_date
external: &presign_upload/2,
progress: &handle_progress/3
) )
|> allow_upload(:import, upload_opts)
|> process_imported_tables() |> process_imported_tables()
{:ok, socket} {:ok, socket}
@ -32,8 +82,12 @@ defmodule PlausibleWeb.Live.CSVImport do
<div> <div>
<form action="#" method="post" phx-change="validate-upload-form" phx-submit="submit-upload-form"> <form action="#" method="post" phx-change="validate-upload-form" phx-submit="submit-upload-form">
<.csv_picker upload={@uploads.import} imported_tables={@imported_tables} /> <.csv_picker upload={@uploads.import} imported_tables={@imported_tables} />
<.confirm_button date_range={@date_range} can_confirm?={@can_confirm?} /> <.confirm_button date_range={@clamped_date_range} can_confirm?={@can_confirm?} />
<.maybe_date_range_warning
:if={@original_date_range}
clamped={@clamped_date_range}
original={@original_date_range}
/>
<p :for={error <- upload_errors(@uploads.import)} class="text-red-400"> <p :for={error <- upload_errors(@uploads.import)} class="text-red-400">
<%= error_to_string(error) %> <%= error_to_string(error) %>
</p> </p>
@ -46,13 +100,11 @@ defmodule PlausibleWeb.Live.CSVImport do
~H""" ~H"""
<label <label
phx-drop-target={@upload.ref} phx-drop-target={@upload.ref}
class="block border-2 dark:border-gray-600 rounded p-4 group hover:border-indigo-500 dark:hover:border-indigo-600 transition cursor-pointer" class="block border-2 dark:border-gray-600 rounded-md p-4 hover:bg-gray-50 dark:hover:bg-gray-900 hover:border-indigo-500 dark:hover:border-indigo-600 transition cursor-pointer"
> >
<div class="flex items-center"> <div class="flex items-center text-gray-500 dark:text-gray-500">
<div class="bg-gray-200 dark:bg-gray-600 rounded p-1 group-hover:bg-indigo-500 dark:group-hover:bg-indigo-600 transition"> <Heroicons.document_plus class="w-5 h-5 transition" />
<Heroicons.document_plus class="w-5 h-5 group-hover:text-white transition" /> <span class="ml-1.5 text-sm">
</div>
<span class="ml-2 text-sm text-gray-600 dark:text-gray-500">
(or drag-and-drop your unzipped CSVs here) (or drag-and-drop your unzipped CSVs here)
</span> </span>
<.live_file_input upload={@upload} class="hidden" /> <.live_file_input upload={@upload} class="hidden" />
@ -81,7 +133,7 @@ defmodule PlausibleWeb.Live.CSVImport do
]} ]}
> >
<%= if @date_range do %> <%= if @date_range do %>
Confirm import from <%= @date_range.first %> to <%= @date_range.last %> Confirm import <.dates range={@date_range} />
<% else %> <% else %>
Confirm import Confirm import
<% end %> <% end %>
@ -89,6 +141,31 @@ defmodule PlausibleWeb.Live.CSVImport do
""" """
end end
defp maybe_date_range_warning(assigns) do
~H"""
<%= if @clamped do %>
<Generic.notice :if={@clamped != @original} title="Dates Adjusted" theme={:yellow} class="mt-4">
The dates <.dates range={@original} />
overlap with previous imports, so we'll use the next best period, <.dates range={@clamped} />
</Generic.notice>
<% else %>
<Generic.notice title="Dates Conflict" theme={:red} class="mt-4">
The dates <.dates range={@original} />
overlap with dates we've already imported and cannot be used for new imports.
</Generic.notice>
<% end %>
"""
end
defp dates(assigns) do
~H"""
<span class="whitespace-nowrap">
<span class="font-medium"><%= @range.first %></span>
to <span class="font-medium"><%= @range.last %></span>
</span>
"""
end
defp imported_table(assigns) do
status =
cond do
@@ -101,19 +178,16 @@ defmodule PlausibleWeb.Live.CSVImport do
assigns = assign(assigns, status: status)
~H"""
- <li id={@table} class="ml-1.5">
- <div class="flex items-center space-x-2">
- <Heroicons.document_check :if={@status == :success} class="w-4 h-4 text-indigo-600" />
- <PlausibleWeb.Components.Generic.spinner
-   :if={@status == :in_progress}
-   class="w-4 h-4 text-indigo-600"
- />
- <Heroicons.document :if={@status == :empty} class="w-4 h-4 text-gray-400 dark:text-gray-500" />
+ <li id={@table} class="ml-0.5">
+ <div class="flex items-center space-x-2 text-gray-600 dark:text-gray-500">
+ <Heroicons.document_check :if={@status == :success} class="w-4 h-4" />
+ <Generic.spinner :if={@status == :in_progress} class="w-4 h-4" />
+ <Heroicons.document :if={@status == :empty} class="w-4 h-4 opacity-80" />
<Heroicons.document :if={@status == :error} class="w-4 h-4 text-red-600 dark:text-red-700" />
<span class={[
"text-sm",
- if(@upload, do: "dark:text-gray-400", else: "text-gray-400 dark:text-gray-500"),
+ if(@status == :empty, do: "opacity-80"),
if(@status == :error, do: "text-red-600 dark:text-red-700")
]}>
<%= if @upload do %>
@@ -137,20 +211,24 @@ defmodule PlausibleWeb.Live.CSVImport do
end
def handle_event("submit-upload-form", _params, socket) do
- %{site_id: site_id, user_id: user_id, date_range: date_range} = socket.assigns
- site = Plausible.Repo.get!(Plausible.Site, site_id)
- user = Plausible.Repo.get!(Plausible.Auth.User, user_id)
-
- uploads =
-   consume_uploaded_entries(socket, :import, fn meta, entry ->
-     {:ok, %{"s3_url" => meta.s3_url, "filename" => entry.client_name}}
-   end)
+ %{
+   storage: storage,
+   site: site,
+   user_id: user_id,
+   clamped_date_range: clamped_date_range,
+   upload_consumer: upload_consumer
+ } =
+   socket.assigns
+
+ user = Plausible.Repo.get!(Plausible.Auth.User, user_id)
+ uploads = consume_uploaded_entries(socket, :import, upload_consumer)
{:ok, _job} =
CSVImporter.new_import(site, user,
- start_date: date_range.first,
- end_date: date_range.last,
- uploads: uploads
+ start_date: clamped_date_range.first,
+ end_date: clamped_date_range.last,
+ uploads: uploads,
+ storage: storage
)
redirect_to =
@@ -159,6 +237,21 @@ defmodule PlausibleWeb.Live.CSVImport do
{:noreply, redirect(socket, external: redirect_to)}
end
@impl true
def handle_info({:notification, :analytics_imports_jobs, details}, socket) do
site = socket.assigns.site
socket =
if details["site_id"] == site.id and details["event"] == "complete" do
occupied_ranges = Imported.get_occupied_date_ranges(site)
socket |> assign(occupied_ranges: occupied_ranges) |> process_imported_tables()
else
socket
end
{:noreply, socket}
end
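# The notification payload above has the shape the import worker broadcasts,
# e.g. %{"event" => "complete", "import_id" => 123, "site_id" => 456}
# (illustrative values; see the ImportAnalytics tests at the end of this
# diff). Matching on "site_id" is what lets this LiveView ignore
# notifications meant for other sites.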
defp error_to_string(:too_large), do: "is too large (max size is 1 gigabyte)"
defp error_to_string(:too_many_files), do: "too many files"
defp error_to_string(:not_accepted), do: "unacceptable file types"
@@ -166,11 +259,20 @@ defmodule PlausibleWeb.Live.CSVImport do
defp presign_upload(entry, socket) do
%{s3_url: s3_url, presigned_url: upload_url} =
- Plausible.S3.import_presign_upload(socket.assigns.site_id, entry.client_name)
+ Plausible.S3.import_presign_upload(socket.assigns.site_id, random_suffix(entry.client_name))
{:ok, %{uploader: "S3", s3_url: s3_url, url: upload_url}, socket}
end
defp random_suffix(filename) do
# based on Plug.Upload.path/2
# https://github.com/elixir-plug/plug/blob/eabf0b9d43060c10663a9105cb1baf984d272a6c/lib/plug/upload.ex#L154-L159
sec = Integer.to_string(:os.system_time(:second))
rand = Integer.to_string(:rand.uniform(999_999_999_999))
scheduler_id = Integer.to_string(:erlang.system_info(:scheduler_id))
filename <> "-" <> sec <> "-" <> rand <> "-" <> scheduler_id
end
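# For illustration (hypothetical values): random_suffix("imported_pages.csv")
# yields something like "imported_pages.csv-1712702400-573911246-8", i.e.
# unix seconds, a random integer, and the scheduler id, so concurrent
# uploads of the same filename land on distinct S3 keys.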
defp handle_progress(:import, entry, socket) do
if entry.done? do
{:noreply, process_imported_tables(socket)}
@@ -180,7 +282,7 @@ defmodule PlausibleWeb.Live.CSVImport do
end
defp process_imported_tables(socket) do
- tables = Plausible.Imported.tables()
+ tables = Imported.tables()
{completed, in_progress} = uploaded_entries(socket, :import)
{valid_uploads, invalid_uploads} =
@@ -207,16 +309,37 @@ defmodule PlausibleWeb.Live.CSVImport do
replaced_uploads
end)
- date_range = CSVImporter.date_range(Enum.map(valid_uploads, & &1.client_name))
+ original_date_range = CSVImporter.date_range(Enum.map(valid_uploads, & &1.client_name))
clamped_date_range =
if original_date_range do
%Date.Range{first: start_date, last: end_date} = original_date_range
%{
site: site,
occupied_ranges: occupied_ranges,
native_stats_start_date: native_stats_start_date
} = socket.assigns
cutoff_date = native_stats_start_date || Timex.today(site.timezone)
case Imported.clamp_dates(occupied_ranges, cutoff_date, start_date, end_date) do
{:ok, start_date, end_date} -> Date.range(start_date, end_date)
{:error, :no_time_window} -> nil
end
end
all_uploaded? = completed != [] and in_progress == []
+ can_confirm? = all_uploaded? and not is_nil(clamped_date_range)
socket
|> cancel_uploads(invalid_uploads)
|> cancel_uploads(replaced_uploads)
|> assign(
imported_tables: imported_tables,
- can_confirm?: all_uploaded?,
- date_range: date_range
+ can_confirm?: can_confirm?,
+ original_date_range: original_date_range,
+ clamped_date_range: clamped_date_range
)
end
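To make the clamping step above concrete, here is a hedged sketch with hypothetical dates; which window `Imported.clamp_dates/4` actually picks is up to that function, and the LiveView only distinguishes the two outcomes:

# Suppose an earlier import already occupies 2021-03-01..2021-06-30.
case Imported.clamp_dates(occupied_ranges, cutoff_date, ~D[2021-01-01], ~D[2021-12-31]) do
  # A free window remains: the "Dates Adjusted" notice is shown, confirm stays enabled.
  {:ok, start_date, end_date} -> Date.range(start_date, end_date)
  # No usable window: the "Dates Conflict" notice is shown and confirm is disabled.
  {:error, :no_time_window} -> nil
end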


@@ -145,9 +145,9 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
"""
end
- def handle_info({:notification, :analytics_imports_jobs, status}, socket) do
-   [{status_str, import_id}] = Enum.to_list(status)
-   {site_imports, updated?} = update_imports(socket.assigns.site_imports, import_id, status_str)
+ def handle_info({:notification, :analytics_imports_jobs, details}, socket) do
+   {site_imports, updated?} =
+     update_imports(socket.assigns.site_imports, details["import_id"], details["event"])
pageview_counts =
if updated? do


@@ -62,7 +62,7 @@ defmodule PlausibleWeb.Router do
end
end
- if Mix.env() == :dev do
+ if Mix.env() in [:dev, :small_dev] do
forward "/sent-emails", Bamboo.SentEmailViewerPlug
end
@@ -390,7 +390,13 @@ defmodule PlausibleWeb.Router do
delete "/:website/settings/forget-imported", SiteController, :forget_imported
delete "/:website/settings/forget-import/:import_id", SiteController, :forget_import
- post "/:website/settings/export", SiteController, :csv_export
+ on_full_build do
+   # exported archives are downloaded from object storage
+ else
+   get "/:website/exported-archive", SiteController, :download_local_export
+ end
get "/:website/settings/import", SiteController, :csv_import
get "/:domain/export", StatsController, :csv_export


@@ -0,0 +1,5 @@
Your <%= if full_build?() do %>Plausible Analytics <% end %>export for <%= @site.domain %> has encountered an error and was unsuccessful.
Sorry for the trouble this may have caused.
<br/><br/>
Please attempt to export your data again.
<%= if full_build?() do %>Should the problem persist, do reply to this email so we can assist. Thanks!<% end %>


@@ -0,0 +1,3 @@
Your <%= if full_build?() do %>Plausible Analytics <% end %>export for <%= @site.domain %> is now ready for download.
Please click <a href="<%= @download_url %>">here</a> to start the download process.
<%= if @expires_in do %>Note that this link will expire <%= @expires_in %>.<% end %>


@@ -15,6 +15,10 @@
</div>
<%= live_render(@conn, PlausibleWeb.Live.CSVImport,
- session: %{"site_id" => @site.id, "user_id" => @current_user.id}
+ session: %{
+   "site_id" => @site.id,
+   "current_user_id" => @current_user.id,
+   "storage" => on_full_build(do: "s3", else: "local")
+ }
) %>
</div>


@@ -13,9 +13,11 @@
<%= live_render(@conn, PlausibleWeb.Live.ImportsExportsSettings,
session: %{"domain" => @site.domain}
) %>
+ </div>
- <header class="relative border-b border-gray-200 pb-4">
-   <h2 class="mt-8 text-lg leading-6 font-medium text-gray-900 dark:text-gray-100">
+ <div class="shadow bg-white dark:bg-gray-800 dark:text-gray-200 sm:rounded-md sm:overflow-hidden py-6 px-4 sm:p-6">
+   <header class="relative border-b border-gray-200 pb-4 mb-5">
+     <h2 class="text-lg leading-6 font-medium text-gray-900 dark:text-gray-100">
Export Data
</h2>
<p class="mt-1 text-sm leading-5 text-gray-500 dark:text-gray-200">
@@ -23,13 +25,11 @@
</p>
</header>
- <div class="mt-4">
-   <PlausibleWeb.Components.Generic.button
-     data-method="post"
-     data-to={"/#{URI.encode_www_form(@site.domain)}/settings/export"}
-     data-csrf={Plug.CSRFProtection.get_csrf_token()}
-   >
-     Export to CSV
-   </PlausibleWeb.Components.Generic.button>
- </div>
+ <%= live_render(@conn, PlausibleWeb.Live.CSVExport,
+   session: %{
+     "site_id" => @site.id,
+     "email_to" => @current_user.email,
+     "storage" => on_full_build(do: "s3", else: "local")
+   }
+ ) %>
</div>


@@ -0,0 +1,121 @@
defmodule Plausible.Workers.ExportAnalytics do
@moduledoc """
Worker for running CSV export jobs. Supports S3 and local storage.
To avoid blocking the queue, a timeout of 15 minutes is enforced.
"""
use Oban.Worker,
queue: :analytics_exports,
max_attempts: 3
alias Plausible.Exports
@doc "This base query filters export jobs for a site"
def base_query(site_id) do
import Ecto.Query, only: [from: 2]
from j in Oban.Job,
where: j.worker == ^Oban.Worker.to_string(__MODULE__),
where: j.args["site_id"] == ^site_id
end
@impl true
def timeout(_job), do: :timer.minutes(15)
@impl true
def perform(%Oban.Job{args: args} = job) do
%{
"storage" => storage,
"site_id" => site_id
} = args
site = Plausible.Repo.get!(Plausible.Site, site_id)
%Date.Range{} = date_range = Exports.date_range(site.id, site.timezone)
queries =
Exports.export_queries(site_id,
date_range: date_range,
timezone: site.timezone,
extname: ".csv"
)
# since each worker / `perform` attempt runs in a separate process,
# it's OK to use start_link here: the connection's lifecycle stays
# bound to that of the worker
{:ok, ch} =
Plausible.ClickhouseRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
try do
case storage do
"s3" -> perform_s3_export(ch, site, queries, args)
"local" -> perform_local_export(ch, queries, args)
end
after
Exports.oban_notify(site_id)
end
email_success(job.args)
:ok
catch
class, reason ->
if job.attempt >= job.max_attempts, do: email_failure(job.args)
:erlang.raise(class, reason, __STACKTRACE__)
end
defp perform_s3_export(ch, site, queries, args) do
%{
"s3_bucket" => s3_bucket,
"s3_path" => s3_path
} = args
created_on = Plausible.Timezones.to_date_in_timezone(DateTime.utc_now(), site.timezone)
filename = Exports.archive_filename(site.domain, created_on)
DBConnection.run(
ch,
fn conn ->
conn
|> Exports.stream_archive(queries, format: "CSVWithNames")
|> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, filename)
end,
timeout: :infinity
)
end
defp perform_local_export(ch, queries, args) do
%{"local_path" => local_path} = args
tmp_path = Plug.Upload.random_file!("tmp-plausible-export")
DBConnection.run(
ch,
fn conn ->
Exports.stream_archive(conn, queries, format: "CSVWithNames")
|> Stream.into(File.stream!(tmp_path))
|> Stream.run()
end,
timeout: :infinity
)
File.mkdir_p!(Path.dirname(local_path))
if File.exists?(local_path), do: File.rm!(local_path)
File.rename!(tmp_path, local_path)
end
defp email_failure(args) do
args |> Map.put("status", "failure") |> email()
end
defp email_success(args) do
args |> Map.put("status", "success") |> email()
end
defp email(args) do
# email delivery can potentially fail and cause already successful
# export to be repeated which is costly, hence email is delivered
# in a separate job
Oban.insert!(Plausible.Workers.NotifyExportedAnalytics.new(args))
end
end
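The tests later in this diff enqueue this worker through `Plausible.Exports.schedule_s3_export/2` and `Plausible.Exports.schedule_local_export/2`. As a sketch only, with arg keys inferred from `perform/1` above and illustrative values, a local export job could also be inserted directly:

%{
  "storage" => "local",
  "site_id" => 123,
  "email_to" => "user@example.com",
  "local_path" => "/app/exports/plausible-export.zip"
}
|> Plausible.Workers.ExportAnalytics.new()
|> Oban.insert!()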


@@ -1,105 +0,0 @@
defmodule Plausible.Workers.ExportCSV do
@moduledoc """
Worker for running CSV export jobs.
"""
use Oban.Worker,
queue: :s3_csv_export,
max_attempts: 3,
unique: [fields: [:args], keys: [:s3_bucket, :s3_path], period: 60]
@impl true
def perform(job) do
%Oban.Job{
args:
%{
"site_id" => site_id,
"email_to" => email,
"s3_bucket" => s3_bucket,
"s3_path" => s3_path
} = args
} = job
{:ok, ch} =
Plausible.ClickhouseRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
%Ch.Result{rows: [[%Date{} = min_date, %Date{} = max_date]]} =
Ch.query!(
ch,
"SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
%{"site_id" => site_id}
)
if max_date == ~D[1970-01-01] do
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT FAILURE",
text_body: "there is nothing to export"
)
)
else
domain = Plausible.Sites.get_domain!(site_id)
export_archive_filename = Plausible.Exports.archive_filename(domain, min_date, max_date)
s3_config_overrides = s3_config_overrides(args)
download_url =
DBConnection.run(
ch,
fn conn ->
conn
|> Plausible.Exports.stream_archive(
Plausible.Exports.export_queries(site_id,
date_range: Date.range(min_date, max_date),
extname: ".csv"
),
format: "CSVWithNames"
)
|> Plausible.S3.export_upload_multipart(
s3_bucket,
s3_path,
export_archive_filename,
s3_config_overrides
)
end,
timeout: :infinity
)
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT SUCCESS",
text_body: """
download it from #{download_url}! hurry up! you have 24 hours!"
""",
html_body: """
download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
"""
)
)
end
:ok
end
# right now custom config is used in tests only (to access the minio container)
# ideally it would be passed via the s3 url
# but ExAws.S3.upload is hard to make work with s3 urls
if Mix.env() in [:test, :small_test] do
defp s3_config_overrides(args) do
if config_overrides = args["s3_config_overrides"] do
Enum.map(config_overrides, fn {k, v} -> {String.to_existing_atom(k), v} end)
else
[]
end
end
else
defp s3_config_overrides(_args), do: []
end
end


@@ -0,0 +1,32 @@
defmodule Plausible.Workers.LocalImportAnalyticsCleaner do
@moduledoc """
Worker for cleaning local files left after analytics import jobs.
"""
use Oban.Worker, queue: :analytics_imports, unique: [period: 3600]
@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
%{"import_id" => import_id, "paths" => paths} = args
if import_in_progress?(import_id) do
{:snooze, _one_hour = 3600}
else
Enum.each(paths, fn path ->
# credo:disable-for-next-line Credo.Check.Refactor.Nesting
if File.exists?(path), do: File.rm!(path)
end)
end
end
defp import_in_progress?(import_id) do
import Ecto.Query
require Plausible.Imported.SiteImport
alias Plausible.Imported.SiteImport
SiteImport
|> where(id: ^import_id)
|> where([i], i.status in ^[SiteImport.pending(), SiteImport.importing()])
|> Plausible.Repo.exists?()
end
end
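A hedged sketch of how such a cleanup job could be enqueued (arg keys as destructured in `perform/1` above, values illustrative; the `{:snooze, 3600}` makes it re-check hourly while the import is still pending or importing):

%{"import_id" => 123, "paths" => ["/tmp/plausible-import/imported_pages.csv"]}
|> Plausible.Workers.LocalImportAnalyticsCleaner.new(schedule_in: 3600)
|> Oban.insert!()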


@@ -0,0 +1,48 @@
defmodule Plausible.Workers.NotifyExportedAnalytics do
@moduledoc "This worker delivers emails for successful and failed exports"
use Oban.Worker,
queue: :notify_exported_analytics,
max_attempts: 5
@impl true
def perform(%Oban.Job{args: args}) do
%{
"status" => status,
"storage" => storage,
"email_to" => email_to,
"site_id" => site_id
} = args
user = Plausible.Repo.get_by!(Plausible.Auth.User, email: email_to)
site = Plausible.Repo.get!(Plausible.Site, site_id)
email =
case status do
"success" ->
case storage do
"s3" ->
%{"s3_bucket" => s3_bucket, "s3_path" => s3_path} = args
download_url = Plausible.S3.download_url(s3_bucket, s3_path)
%{expires_at: expires_at} = Plausible.Exports.get_s3_export(site_id)
PlausibleWeb.Email.export_success(user, site, download_url, expires_at)
"local" ->
download_url =
PlausibleWeb.Router.Helpers.site_path(
PlausibleWeb.Endpoint,
:download_local_export,
site.domain
)
PlausibleWeb.Email.export_success(user, site, download_url, _expires_at = nil)
end
"failure" ->
PlausibleWeb.Email.export_failure(user, site)
end
Plausible.Mailer.deliver_now!(email)
:ok
end
end
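The args this worker expects mirror the export job's args plus a "status"; a minimal sketch with illustrative values ("s3_bucket" and "s3_path" are only read on the "s3" success path):

Oban.insert!(
  Plausible.Workers.NotifyExportedAnalytics.new(%{
    "status" => "success",
    "storage" => "s3",
    "email_to" => "user@example.com",
    "site_id" => 123,
    "s3_bucket" => "exports",
    "s3_path" => "123/Plausible.zip"
  })
)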


@@ -141,7 +141,6 @@ defmodule Plausible.MixProject do
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:sweet_xml, "~> 0.7.4"},
- {:testcontainers, "~> 1.6", only: [:test, :small_test]},
{:zstream, "~> 0.6.4"},
{:con_cache, "~> 1.0"}
]


@@ -47,7 +47,6 @@
"ex_cldr_currencies": {:hex, :ex_cldr_currencies, "2.15.1", "e92ba17c41e7405b7784e0e65f406b5f17cfe313e0e70de9befd653e12854822", [:mix], [{:ex_cldr, "~> 2.34", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "31df8bd37688340f8819bdd770eb17d659652078d34db632b85d4a32864d6a25"},
"ex_cldr_numbers": {:hex, :ex_cldr_numbers, "2.32.3", "b631ff94c982ec518e46bf4736000a30a33d6b58facc085d5f240305f512ad4a", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:digital_token, "~> 0.3 or ~> 1.0", [hex: :digital_token, repo: "hexpm", optional: false]}, {:ex_cldr, "~> 2.37", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:ex_cldr_currencies, ">= 2.14.2", [hex: :ex_cldr_currencies, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "7b626ff1e59a0ec9c3c5db5ce9ca91a6995e2ab56426b71f3cbf67181ea225f5"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
- "ex_docker_engine_api": {:hex, :ex_docker_engine_api, "1.43.1", "1161e34b6bea5cef84d8fdc1d5d510fcb0c463941ce84c36f4a0f44a9096eb96", [:mix], [{:hackney, "~> 1.20", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "ec8fc499389aeef56ddca67e89e9e98098cff50587b56e8b4613279f382793b1"},
"ex_json_logger": {:hex, :ex_json_logger, "1.4.0", "ad1dcc1cfe6940ee1d9d489b20757c89769626ce34c4957548d6fbe155cd96f1", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7548a1ecba290746e06214d2b3d8783c76760c779a8903a8e44bfd23a7340444"},
"ex_machina": {:hex, :ex_machina, "2.7.0", "b792cc3127fd0680fecdb6299235b4727a4944a09ff0fa904cc639272cd92dc7", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}], "hexpm", "419aa7a39bde11894c87a615c4ecaa52d8f107bbdd81d810465186f783245bf8"},
"ex_money": {:hex, :ex_money, "5.15.3", "ea070eb1eefd22258aa288921ba482f1fa5f870d229069dc3d12458b7b8bf66d", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ex_cldr_numbers, "~> 2.31", [hex: :ex_cldr_numbers, repo: "hexpm", optional: false]}, {:gringotts, "~> 1.1", [hex: :gringotts, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.0 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "3671f1808c428b7c4688650d43dc1af0b64c0eea822429a28c55cef15fb4fdc1"},
@@ -142,14 +141,11 @@
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"},
"telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"},
"telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"},
- "tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"},
- "testcontainers": {:hex, :testcontainers, "1.6.0", "14b3251f01ce0b1ada716130d371ba0b6cb1ce2904aa38bd58e5ff4194f4d88f", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.3", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:ex_docker_engine_api, "~> 1.43.1", [hex: :ex_docker_engine_api, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "3f812407f232954999a3a2e05b2802e1d8d1afba120533c42b32c7cc91d35daf"},
"timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"},
"tls_certificate_check": {:hex, :tls_certificate_check, "1.21.0", "042ab2c0c860652bc5cf69c94e3a31f96676d14682e22ec7813bd173ceff1788", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "6cee6cffc35a390840d48d463541d50746a7b0e421acaadb833cfc7961e490e7"},
"tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"},
"ua_inspector": {:hex, :ua_inspector, "3.9.0", "2021bbddb1ee41f202da7f006fb09f5c5617ad579108b7b9bcf1881828462f04", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:yamerl, "~> 0.7", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0f2d1b9806e78b14b91b6f20eaaa4a68f8989f4f2a99a376b874c295ac50a30d"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
- "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},


@@ -1,5 +1,5 @@
defmodule Plausible.ExportsTest do
- use Plausible.DataCase, async: true
+ use Plausible.DataCase
doctest Plausible.Exports, import: true
@@ -72,17 +72,16 @@ defmodule Plausible.ExportsTest do
test "creates zip archive", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
- "1.csv" => from(n in "numbers", select: n.number, limit: 3),
+ "1.csv" => from(n in fragment("numbers(3)"), select: n.number),
"2.csv" =>
- from(n in "numbers",
-   select: [n.number, selected_as(n.number + n.number, :double)],
-   limit: 3
+ from(n in fragment("numbers(3)"),
+   select: [n.number, selected_as(n.number + n.number, :double)]
)
}
DBConnection.run(ch, fn conn ->
conn
- |> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
+ |> Plausible.Exports.stream_archive(queries, format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "numbers.zip")))
|> Stream.run()
end)
@@ -123,14 +122,14 @@ defmodule Plausible.ExportsTest do
test "stops on error", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
- "1.csv" => from(n in "numbers", select: n.number, limit: 1000),
+ "1.csv" => from(n in fragment("numbers(1000)"), select: n.number),
"2.csv" => from(n in "no_such_table", select: n.number)
}
assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
DBConnection.run(ch, fn conn ->
conn
- |> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
+ |> Plausible.Exports.stream_archive(queries, format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "failed.zip")))
|> Stream.run()
end)


@@ -1,44 +1,13 @@
defmodule Plausible.Imported.CSVImporterTest do
- use Plausible.DataCase, async: true
+ use Plausible
+ use Plausible.DataCase
alias Plausible.Imported.{CSVImporter, SiteImport}
- alias Testcontainers.MinioContainer
require SiteImport
doctest CSVImporter, import: true
+ on_full_build do
@moduletag :minio
- setup_all do
-   Testcontainers.start_link()
-   {:ok, minio} = Testcontainers.start_container(MinioContainer.new())
-   on_exit(fn -> :ok = Testcontainers.stop_container(minio.container_id) end)
-   connection_opts = MinioContainer.connection_opts(minio)
-   s3 = fn op -> ExAws.request!(op, connection_opts) end
-   s3.(ExAws.S3.put_bucket("imports", "us-east-1"))
-   s3.(ExAws.S3.put_bucket("exports", "us-east-1"))
-   {:ok, container: minio, s3: s3}
- end
- setup %{container: minio, s3: s3} do
-   connection_opts = MinioContainer.connection_opts(minio)
-   clean_bucket = fn bucket ->
-     ExAws.S3.list_objects_v2(bucket)
-     |> ExAws.stream!(connection_opts)
-     |> Stream.each(fn objects ->
-       keys = objects |> List.wrap() |> Enum.map(& &1.key)
-       s3.(ExAws.S3.delete_all_objects(bucket, keys))
-     end)
-     |> Stream.run()
-   end
-   clean_bucket.("imports")
-   clean_bucket.("exports")
-   :ok
- end
end
describe "new_import/3 and parse_args/1" do
@@ -64,10 +33,18 @@ defmodule Plausible.Imported.CSVImporterTest do
Enum.map(tables, fn table ->
filename = "#{table}_#{start_date}_#{end_date}.csv"
+ on_full_build do
%{
"filename" => filename,
- "s3_url" => "https://bucket-name.s3.eu-north-1.amazonaws.com/#{site.id}/#{filename}"
+ "s3_url" =>
+   "https://bucket-name.s3.eu-north-1.amazonaws.com/#{site.id}/#{filename}-some-random-suffix"
}
+ else
+ %{
+   "filename" => filename,
+   "local_path" => "/tmp/some-random-path"
+ }
+ end
end)
date_range = CSVImporter.date_range(uploads)
@@ -76,7 +53,8 @@ defmodule Plausible.Imported.CSVImporterTest do
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
- uploads: uploads
+ uploads: uploads,
+ storage: on_full_build(do: "s3", else: "local")
)
assert %Oban.Job{args: %{"import_id" => import_id, "uploads" => ^uploads} = args} =
@@ -93,14 +71,22 @@ defmodule Plausible.Imported.CSVImporterTest do
] = Plausible.Imported.list_all_imports(site)
assert %{imported_data: nil} = Repo.reload!(site)
- assert CSVImporter.parse_args(args) == [uploads: uploads]
+ assert CSVImporter.parse_args(args) == [
+   uploads: uploads,
+   storage: on_full_build(do: "s3", else: "local")
+ ]
end
end
describe "import_data/2" do
- setup [:create_user, :create_new_site]
+ setup [:create_user, :create_new_site, :clean_buckets]
+ @describetag :tmp_dir
- test "imports tables from S3", %{site: site, user: user, s3: s3, container: minio} do
+ test "imports tables from S3", %{site: site, user: user} = ctx do
+ _ = ctx
csvs = [
%{
name: "imported_browsers_20211230_20211231.csv",
@@ -328,23 +314,29 @@ defmodule Plausible.Imported.CSVImporterTest do
uploads =
for %{name: name, body: body} <- csvs do
- key = "#{site.id}/#{name}"
- s3.(ExAws.S3.put_object("imports", key, body))
- %{"filename" => name, "s3_url" => minio_url(minio, "imports", key)}
+ on_full_build do
+   %{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, name)
+   [bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2, trim: true)
+   ExAws.request!(ExAws.S3.put_object(bucket, key, body))
+   %{"filename" => name, "s3_url" => s3_url}
+ else
+   local_path = Path.join(ctx.tmp_dir, name)
+   File.write!(local_path, body)
+   %{"filename" => name, "local_path" => local_path}
+ end
end
date_range = CSVImporter.date_range(uploads)
- {:ok, job} =
+ {:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
- uploads: uploads
+ uploads: uploads,
+ storage: on_full_build(do: "s3", else: "local")
)
- job = Repo.reload!(job)
- assert :ok = Plausible.Workers.ImportAnalytics.perform(job)
+ assert %{success: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety: false)
assert %SiteImport{
start_date: ~D[2011-12-25],
@@ -356,7 +348,9 @@ defmodule Plausible.Imported.CSVImporterTest do
assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 99
end
test "fails on invalid CSV", %{site: site, user: user, s3: s3, container: minio} do test "fails on invalid CSV", %{site: site, user: user} = ctx do
_ = ctx
csvs = [ csvs = [
%{ %{
name: "imported_browsers_20211230_20211231.csv", name: "imported_browsers_20211230_20211231.csv",
@@ -382,24 +376,33 @@ defmodule Plausible.Imported.CSVImporterTest do
uploads =
for %{name: name, body: body} <- csvs do
- key = "#{site.id}/#{name}"
- s3.(ExAws.S3.put_object("imports", key, body))
- %{"filename" => name, "s3_url" => minio_url(minio, "imports", key)}
+ on_full_build do
+   %{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, name)
+   [bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2, trim: true)
+   ExAws.request!(ExAws.S3.put_object(bucket, key, body))
+   %{"filename" => name, "s3_url" => s3_url}
+ else
+   local_path = Path.join(ctx.tmp_dir, name)
+   File.write!(local_path, body)
+   %{"filename" => name, "local_path" => local_path}
+ end
end
date_range = CSVImporter.date_range(uploads)
- {:ok, job} =
+ {:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
- uploads: uploads
+ uploads: uploads,
+ storage: on_full_build(do: "s3", else: "local")
)
- job = Repo.reload!(job)
- assert {:discard, message} = Plausible.Workers.ImportAnalytics.perform(job)
- assert message =~ "CANNOT_PARSE_INPUT_ASSERTION_FAILED"
+ assert %{discard: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety: false)
+ # TODO
+ # assert {:discard, message} = Plausible.Workers.ImportAnalytics.perform(job)
+ # assert message =~ "CANNOT_PARSE_INPUT_ASSERTION_FAILED"
assert %SiteImport{id: import_id, source: :csv, status: :failed} =
Repo.get_by!(SiteImport, site_id: site.id)
@@ -411,11 +414,10 @@ defmodule Plausible.Imported.CSVImporterTest do
end
describe "export -> import" do describe "export -> import" do
setup [:create_user, :create_new_site] setup [:create_user, :create_new_site, :clean_buckets]
@describetag :tmp_dir @tag :tmp_dir
test "it works", %{site: site, user: user, tmp_dir: tmp_dir} do
test "it works", %{site: site, user: user, s3: s3, tmp_dir: tmp_dir, container: minio} do
populate_stats(site, [ populate_stats(site, [
build(:pageview, build(:pageview,
user_id: 123, user_id: 123,
@ -479,50 +481,57 @@ defmodule Plausible.Imported.CSVImporterTest do
]) ])
# export archive to s3 # export archive to s3
- Oban.insert!(
-   Plausible.Workers.ExportCSV.new(%{
-     "site_id" => site.id,
-     "email_to" => user.email,
-     "s3_bucket" => "exports",
-     "s3_path" => "#{site.id}/Plausible.zip",
-     "s3_config_overrides" => Map.new(MinioContainer.connection_opts(minio))
-   })
- )
+ on_full_build do
+   assert {:ok, _job} = Plausible.Exports.schedule_s3_export(site.id, user.email)
+ else
+   assert {:ok, %{args: %{"local_path" => local_path}}} =
+     Plausible.Exports.schedule_local_export(site.id, user.email)
+ end
- assert %{success: 1} = Oban.drain_queue(queue: :s3_csv_export, with_safety: false)
+ assert %{success: 1} = Oban.drain_queue(queue: :analytics_exports, with_safety: false)
# download archive
- s3.(
-   ExAws.S3.download_file(
-     "exports",
-     "/#{site.id}/Plausible.zip",
-     Path.join(tmp_dir, "Plausible.zip")
-   )
- )
+ on_full_build do
+   ExAws.request!(
+     ExAws.S3.download_file(
+       Plausible.S3.exports_bucket(),
+       to_string(site.id),
+       Path.join(tmp_dir, "plausible-export.zip")
+     )
+   )
+ else
+   File.rename!(local_path, Path.join(tmp_dir, "plausible-export.zip"))
+ end
# unzip archive
- {:ok, files} = :zip.unzip(to_charlist(Path.join(tmp_dir, "Plausible.zip")), cwd: tmp_dir)
+ {:ok, files} =
+   :zip.unzip(to_charlist(Path.join(tmp_dir, "plausible-export.zip")), cwd: tmp_dir)
# upload csvs
uploads =
Enum.map(files, fn file ->
- key = "#{site.id}/#{Path.basename(file)}"
- s3.(ExAws.S3.put_object("imports", key, File.read!(file)))
- %{"filename" => Path.basename(file), "s3_url" => minio_url(minio, "imports", key)}
+ on_full_build do
+   %{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, file)
+   [bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2, trim: true)
+   ExAws.request!(ExAws.S3.put_object(bucket, key, File.read!(file)))
+   %{"filename" => Path.basename(file), "s3_url" => s3_url}
+ else
+   %{"filename" => Path.basename(file), "local_path" => file}
+ end
end)
# run importer
date_range = CSVImporter.date_range(uploads)
- {:ok, job} =
+ {:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
- uploads: uploads
+ uploads: uploads,
+ storage: on_full_build(do: "s3", else: "local")
)
- job = Repo.reload!(job)
- assert :ok = Plausible.Workers.ImportAnalytics.perform(job)
+ assert %{success: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety: false)
# validate import
assert %SiteImport{
@@ -536,14 +545,27 @@ defmodule Plausible.Imported.CSVImporterTest do
end
end
- defp minio_url(minio, bucket, key) do
-   arch = to_string(:erlang.system_info(:system_architecture))
-
-   if String.contains?(arch, "darwin") do
-     Path.join(["http://#{minio.ip_address}:9000", bucket, key])
-   else
-     port = minio |> MinioContainer.connection_opts() |> Keyword.fetch!(:port)
-     Path.join(["http://172.17.0.1:#{port}", bucket, key])
-   end
- end
+ defp clean_buckets(_context) do
+   on_full_build do
+     clean_bucket = fn bucket ->
+       ExAws.S3.list_objects_v2(bucket)
+       |> ExAws.stream!()
+       |> Stream.each(fn objects ->
+         keys = objects |> List.wrap() |> Enum.map(& &1.key)
+         ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys))
+       end)
+       |> Stream.run()
+     end
+
+     clean_bucket.(Plausible.S3.imports_bucket())
+     clean_bucket.(Plausible.S3.exports_bucket())
+
+     on_exit(fn ->
+       clean_bucket.(Plausible.S3.imports_bucket())
+       clean_bucket.(Plausible.S3.exports_bucket())
+     end)
+   else
+     :ok
+   end
+ end
end


@@ -639,7 +639,7 @@ defmodule PlausibleWeb.SiteControllerTest do
end
describe "GET /:website/settings/imports-exports" do
- setup [:create_user, :log_in, :create_site]
+ setup [:create_user, :log_in, :create_site, :maybe_fake_minio]
test "renders empty imports list", %{conn: conn, site: site} do
conn = get(conn, "/#{site.domain}/settings/imports-exports")


@@ -232,4 +232,52 @@ defmodule Plausible.TestUtils do
def random_ip() do
Enum.map_join(1..4, ".", fn _ -> Enum.random(1..254) end)
end
def minio_running? do
%{host: host, port: port} = ExAws.Config.new(:s3)
healthcheck_req = Finch.build(:head, "http://#{host}:#{port}")
case Finch.request(healthcheck_req, Plausible.Finch) do
{:ok, %Finch.Response{}} -> true
{:error, %Mint.TransportError{reason: :econnrefused}} -> false
end
end
def ensure_minio do
unless minio_running?() do
%{host: host, port: port} = ExAws.Config.new(:s3)
IO.puts("""
#{IO.ANSI.red()}
You are trying to run MinIO tests (--include minio) \
but nothing is running on #{"http://#{host}:#{port}"}.
#{IO.ANSI.blue()}Please make sure to start MinIO with `make minio`#{IO.ANSI.reset()}
""")
:init.stop(1)
end
end
if Mix.env() == :test do
def maybe_fake_minio(_context) do
unless minio_running?() do
%{port: port} = ExAws.Config.new(:s3)
bypass = Bypass.open(port: port)
Bypass.expect(bypass, fn conn ->
# we only need to fake HeadObject; all the other S3 requests are "controlled"
"HEAD" = conn.method
# we pretend the object is not found
Plug.Conn.send_resp(conn, 404, [])
end)
end
:ok
end
else
def maybe_fake_minio(_context) do
:ok
end
end
end
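As the `SiteControllerTest` change earlier in this diff shows, `maybe_fake_minio/1` plugs straight into a test's setup pipeline:

setup [:create_user, :log_in, :create_site, :maybe_fake_minio]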


@@ -13,6 +13,11 @@ end
Ecto.Adapters.SQL.Sandbox.mode(Plausible.Repo, :manual)
# warn about minio if it's included in tests but not running
if :minio in Keyword.fetch!(ExUnit.configuration(), :include) do
Plausible.TestUtils.ensure_minio()
end
if Mix.env() == :small_test do
IO.puts("Test mode: SMALL")
ExUnit.configure(exclude: [:slow, :minio, :full_build_only])


@@ -145,6 +145,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
Repo.delete_all(Plausible.Site)
Repo.delete_all(Plausible.Auth.User)
+ Repo.delete_all(Oban.Job)
end)
:ok
@@ -164,6 +165,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
+ site_id = site.id
import_opts = Keyword.put(import_opts, :listen?, true)
{:ok, job} = Plausible.Imported.NoopImporter.new_import(site, user, import_opts)
@@ -173,7 +175,8 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
|> Repo.reload!()
|> ImportAnalytics.perform()
- assert_receive {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}}
+ assert_receive {:notification, :analytics_imports_jobs,
+   %{"event" => "complete", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
@@ -183,6 +186,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
+ site_id = site.id
import_opts =
import_opts
@@ -196,7 +200,8 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
|> Repo.reload!()
|> ImportAnalytics.perform()
- assert_receive {:notification, :analytics_imports_jobs, %{"fail" => ^import_id}}
+ assert_receive {:notification, :analytics_imports_jobs,
+   %{"event" => "fail", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
@@ -206,6 +211,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
+ site_id = site.id
import_opts =
import_opts
@@ -226,7 +232,12 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
_ -> ImportAnalytics.import_fail_transient(site_import)
end
- assert_receive {:notification, :analytics_imports_jobs, %{"transient_fail" => ^import_id}}
+ assert_receive {:notification, :analytics_imports_jobs,
+   %{
+     "event" => "transient_fail",
+     "import_id" => ^import_id,
+     "site_id" => ^site_id
+   }}
end)
end
@@ -237,6 +248,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
+ site_id = site.id
{:ok, job} = Plausible.Imported.NoopImporter.new_import(site, user, import_opts)
import_id = job.args[:import_id]
@@ -247,7 +259,8 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
|> Repo.reload!()
|> ImportAnalytics.perform()
- assert_receive {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}}
+ assert_receive {:notification, :analytics_imports_jobs,
+   %{"event" => "complete", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
end