Reapply "Local CSV exports/imports and S3/UI updates (#3989)" (#3995) (#3996)

* Reapply "Local CSV exports/imports and S3/UI updates (#3989)" (#3995)

This reverts commit aee69e44c8.

* remove unused functions

* eh, that one was actually used

* ugh, they were both used

---------

Co-authored-by: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
Adrian Gruntkowski 2024-04-11 09:15:01 +02:00 committed by GitHub
parent 5163880968
commit c1c03b729c
30 changed files with 1429 additions and 468 deletions

View File

@@ -19,17 +19,17 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
mix_env: ['test', 'small_test']
postgres_image: ['postgres:16']
test_experimental_reduced_joins: ['0']
mix_env: ["test", "small_test"]
postgres_image: ["postgres:16"]
test_experimental_reduced_joins: ["0"]
include:
- mix_env: 'test'
postgres_image: 'postgres:15'
test_experimental_reduced_joins: '0'
- mix_env: 'test'
postgres_image: 'postgres:16'
test_experimental_reduced_joins: '1'
- mix_env: "test"
postgres_image: "postgres:15"
test_experimental_reduced_joins: "0"
- mix_env: "test"
postgres_image: "postgres:16"
test_experimental_reduced_joins: "1"
env:
MIX_ENV: ${{ matrix.mix_env }}
@@ -105,8 +105,13 @@ jobs:
- run: mix do ecto.create, ecto.migrate
- run: mix run -e "Tzdata.ReleaseUpdater.poll_for_update"
- run: make minio
if: env.MIX_ENV == 'test'
- run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'test'
env:
MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1"
- run: mix test --include slow --max-failures 1 --warnings-as-errors
if: env.MIX_ENV == 'small_test'

View File

@@ -40,8 +40,10 @@ postgres-stop: ## Stop and remove the postgres container
minio: ## Start a transient container with a recent version of minio (s3)
docker run -d --rm -p 10000:10000 -p 10001:10001 --name plausible_minio minio/minio server /data --address ":10000" --console-address ":10001"
while ! docker exec plausible_minio mc alias set local http://localhost:10000 minioadmin minioadmin; do sleep 1; done
docker exec plausible_minio mc mb local/dev-exports
docker exec plausible_minio mc mb local/dev-imports
docker exec plausible_minio sh -c 'mc mb local/dev-exports && mc ilm add --expiry-days 7 local/dev-exports'
docker exec plausible_minio sh -c 'mc mb local/dev-imports && mc ilm add --expiry-days 7 local/dev-imports'
docker exec plausible_minio sh -c 'mc mb local/test-exports && mc ilm add --expiry-days 7 local/test-exports'
docker exec plausible_minio sh -c 'mc mb local/test-imports && mc ilm add --expiry-days 7 local/test-imports'
minio-stop: ## Stop and remove the minio container
docker stop plausible_minio

View File

@@ -296,7 +296,8 @@ config :plausible,
is_selfhost: is_selfhost,
custom_script_name: custom_script_name,
log_failed_login_attempts: log_failed_login_attempts,
license_key: license_key
license_key: license_key,
persistent_cache_dir: persistent_cache_dir
config :plausible, :selfhost,
enable_email_verification: enable_email_verification,
@@ -537,10 +538,10 @@ base_queues = [
site_setup_emails: 1,
clean_invitations: 1,
analytics_imports: 1,
analytics_exports: 1,
notify_exported_analytics: 1,
domain_change_transition: 1,
check_accept_traffic_until: 1,
# NOTE: maybe move s3_csv_export to cloud_queues?
s3_csv_export: 1
check_accept_traffic_until: 1
]
cloud_queues = [

View File

@@ -3,85 +3,284 @@ defmodule Plausible.Exports do
Contains functions to export data for events and sessions as Zip archives.
"""
require Plausible
use Plausible
import Ecto.Query
@doc "Schedules CSV export job to S3 storage"
@spec schedule_s3_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data}
def schedule_s3_export(site_id, email_to) do
with :ok <- ensure_has_data(site_id) do
args = %{
"storage" => "s3",
"site_id" => site_id,
"email_to" => email_to,
"s3_bucket" => Plausible.S3.exports_bucket(),
"s3_path" => s3_export_key(site_id)
}
{:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))}
end
end
@doc "Schedules CSV export job to local storage"
@spec schedule_local_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data}
def schedule_local_export(site_id, email_to) do
with :ok <- ensure_has_data(site_id) do
args = %{
"storage" => "local",
"site_id" => site_id,
"email_to" => email_to,
"local_path" => local_export_file(site_id)
}
{:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))}
end
end
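On success both schedulers return the inserted Oban job; with no events they refuse to schedule. A sketch (site and user bindings illustrative):

    iex> {:ok, %Oban.Job{}} = schedule_s3_export(site.id, current_user.email)
    iex> schedule_local_export(empty_site.id, current_user.email)
    {:error, :no_data}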
@spec ensure_has_data(pos_integer) :: :ok | {:error, :no_data}
defp ensure_has_data(site_id) do
# SELECT true FROM "events_v2" AS e0 WHERE (e0."site_id" = ^site_id) LIMIT 1
has_data? = Plausible.ClickhouseRepo.exists?(from "events_v2", where: [site_id: ^site_id])
if has_data?, do: :ok, else: {:error, :no_data}
end
@doc "Gets last CSV export job for a site"
@spec get_last_export_job(pos_integer) :: Oban.Job.t() | nil
def get_last_export_job(site_id) do
Plausible.Repo.one(
from e in Plausible.Workers.ExportAnalytics.base_query(site_id),
order_by: [desc: :id],
limit: 1
)
end
@doc "Subscribes to CSV export job notifications"
def oban_listen, do: Oban.Notifier.listen(__MODULE__)
@doc false
def oban_notify(site_id), do: Oban.Notifier.notify(__MODULE__, %{"site_id" => site_id})
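A process that called `oban_listen/0` receives plain messages; a minimal handler matching the payload sent by `oban_notify/1` (mirroring the CSVExport LiveView further down):

    def handle_info({:notification, Plausible.Exports, %{"site_id" => site_id}}, state) do
      # refresh the export status for site_id
      {:noreply, state}
    end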
@doc """
Renders filename for the Zip archive containing the exported CSV files.
Renders export archive filename.
Examples:
iex> archive_filename("plausible.io", ~D[2021-01-01], ~D[2024-12-31])
"plausible_io_20210101_20241231.zip"
iex> archive_filename("Bücher.example", ~D[2021-01-01], ~D[2024-12-31])
"Bücher_example_20210101_20241231.zip"
iex> archive_filename("plausible.io", _created_on = ~D[2024-12-31])
"plausible_io_20241231.zip"
"""
def archive_filename(domain, min_date, max_date) do
name =
Enum.join(
[
String.replace(domain, ".", "_"),
Calendar.strftime(min_date, "%Y%m%d"),
Calendar.strftime(max_date, "%Y%m%d")
],
"_"
def archive_filename(domain, %Date{} = created_on) do
String.replace(domain, ".", "_") <> "_" <> Calendar.strftime(created_on, "%Y%m%d") <> ".zip"
end
@doc ~S"""
Safely renders content disposition for an arbitrary export filename.
Examples:
iex> content_disposition("plausible_io_20241231.zip")
"attachment; filename=\"plausible_io_20241231.zip\""
iex> content_disposition("📊.zip")
"attachment; filename=\"plausible-export.zip\"; filename*=utf-8''%F0%9F%93%8A.zip"
"""
def content_disposition(filename) do
encoded_filename = URI.encode(filename)
if encoded_filename == filename do
~s[attachment; filename="#{filename}"]
else
~s[attachment; filename="plausible-export.zip"; filename*=utf-8''#{encoded_filename}]
end
end
@type export :: %{
path: Path.t(),
name: String.t(),
expires_at: DateTime.t() | nil,
download_link: String.t(),
size: pos_integer
}
@doc "Gets local export for a site"
@spec get_local_export(pos_integer, String.t(), String.t()) :: export | nil
def get_local_export(site_id, domain, timezone) do
path = local_export_file(site_id)
if File.exists?(path) do
%File.Stat{size: size, mtime: mtime} = File.stat!(path, time: :posix)
created_at = DateTime.from_unix!(mtime)
created_on_in_site_tz = Plausible.Timezones.to_date_in_timezone(created_at, timezone)
name = archive_filename(domain, created_on_in_site_tz)
download_link =
PlausibleWeb.Router.Helpers.site_path(
PlausibleWeb.Endpoint,
:download_local_export,
domain
)
%{path: path, name: name, expires_at: nil, download_link: download_link, size: size}
end
end
@doc "Deletes local export for a site"
@spec delete_local_export(pos_integer) :: :ok
def delete_local_export(site_id) do
file = local_export_file(site_id)
if File.exists?(file) do
File.rm!(file)
end
:ok
end
@spec local_export_file(pos_integer) :: Path.t()
defp local_export_file(site_id) do
persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir)
Path.join([
persistent_cache_dir || System.tmp_dir!(),
"plausible-exports",
Integer.to_string(site_id)
])
end
@doc "Gets S3 export for a site"
@spec get_s3_export(pos_integer) :: export | nil
def get_s3_export(site_id) do
path = s3_export_key(site_id)
bucket = Plausible.S3.exports_bucket()
head_object_op = ExAws.S3.head_object(bucket, path)
case ExAws.request(head_object_op) do
{:error, {:http_error, 404, _response}} ->
nil
{:ok, %{status_code: 200, headers: headers}} ->
"attachment; filename=" <> filename = :proplists.get_value("content-disposition", headers)
name = String.trim(filename, "\"")
size = :proplists.get_value("content-length", headers, nil)
expires_at =
if x_amz_expiration = :proplists.get_value("x-amz-expiration", headers, nil) do
["expiry-date=", expiry_date, ", rule-id=", _rule_id] =
String.split(x_amz_expiration, "\"", trim: true)
Timex.parse!(expiry_date, "{RFC1123}")
end
%{
path: path,
name: name,
expires_at: expires_at,
download_link: Plausible.S3.download_url(bucket, path),
size: String.to_integer(size)
}
end
end
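The `x-amz-expiration` parsing above relies on the AWS-documented header shape; for illustration (rule id made up):

    iex> header = ~s[expiry-date="Fri, 21 Dec 2029 00:00:00 GMT", rule-id="exports-ttl"]
    iex> ["expiry-date=", expiry_date, ", rule-id=", _rule_id] = String.split(header, "\"", trim: true)
    iex> expiry_date
    "Fri, 21 Dec 2029 00:00:00 GMT"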
@doc "Deletes S3 export for a site"
@spec delete_s3_export(pos_integer) :: :ok
def delete_s3_export(site_id) do
if export = get_s3_export(site_id) do
exports_bucket = Plausible.S3.exports_bucket()
delete_op = ExAws.S3.delete_object(exports_bucket, export.path)
ExAws.request!(delete_op)
end
:ok
end
defp s3_export_key(site_id), do: Integer.to_string(site_id)
@doc "Returns the date range for the site's events data in site's timezone or `nil` if there is no data"
@spec date_range(non_neg_integer, String.t()) :: Date.Range.t() | nil
def date_range(site_id, timezone) do
[%Date{} = start_date, %Date{} = end_date] =
Plausible.ClickhouseRepo.one(
from e in "events_v2",
where: [site_id: ^site_id],
select: [
fragment("toDate(min(?),?)", e.timestamp, ^timezone),
fragment("toDate(max(?),?)", e.timestamp, ^timezone)
]
)
name <> ".zip"
unless end_date == ~D[1970-01-01] do
Date.range(start_date, end_date)
end
end
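For a site with data this yields an inclusive range in the site's timezone (dates illustrative):

    iex> date_range(site.id, "Europe/Warsaw")
    Date.range(~D[2021-01-01], ~D[2024-12-31])
    iex> date_range(empty_site.id, "Europe/Warsaw")
    nil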
@doc """
Builds Ecto queries to export data from `events_v2` and `sessions_v2`
tables into the format of `imported_*` tables for a website.
tables into the format of `imported_*` tables for a website.
"""
@spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) ::
@spec export_queries(pos_integer,
extname: String.t(),
date_range: Date.Range.t(),
timezone: String.t()
) ::
%{String.t() => Ecto.Query.t()}
def export_queries(site_id, opts \\ []) do
extname = opts[:extname] || ".csv"
date_range = opts[:date_range]
timezone = opts[:timezone] || "UTC"
filename = fn table ->
name =
if date_range do
first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
"#{table}_#{first_date}_#{last_date}"
else
table
end
suffix =
if date_range do
first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
"_#{first_date}_#{last_date}" <> extname
else
extname
end
name <> extname
end
filename = fn name -> name <> suffix end
%{
filename.("imported_visitors") => export_visitors_q(site_id),
filename.("imported_sources") => export_sources_q(site_id),
filename.("imported_visitors") => export_visitors_q(site_id, timezone, date_range),
filename.("imported_sources") => export_sources_q(site_id, timezone, date_range),
# NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error
filename.("imported_pages") => export_pages_q(site_id),
filename.("imported_entry_pages") => export_entry_pages_q(site_id),
filename.("imported_exit_pages") => export_exit_pages_q(site_id),
filename.("imported_locations") => export_locations_q(site_id),
filename.("imported_devices") => export_devices_q(site_id),
filename.("imported_browsers") => export_browsers_q(site_id),
filename.("imported_operating_systems") => export_operating_systems_q(site_id)
filename.("imported_pages") => export_pages_q(site_id, timezone, date_range),
filename.("imported_entry_pages") => export_entry_pages_q(site_id, timezone, date_range),
filename.("imported_exit_pages") => export_exit_pages_q(site_id, timezone, date_range),
filename.("imported_locations") => export_locations_q(site_id, timezone, date_range),
filename.("imported_devices") => export_devices_q(site_id, timezone, date_range),
filename.("imported_browsers") => export_browsers_q(site_id, timezone, date_range),
filename.("imported_operating_systems") =>
export_operating_systems_q(site_id, timezone, date_range)
}
end
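The returned map keys double as archive entry names; without a date range the suffix is just the extension (showing the first few keys, sorted; `site.id` assumed bound):

    iex> queries = export_queries(site.id, extname: ".csv", timezone: "UTC")
    iex> queries |> Map.keys() |> Enum.sort() |> Enum.take(3)
    ["imported_browsers.csv", "imported_devices.csv", "imported_entry_pages.csv"]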
Plausible.on_full_build do
defp sampled(table) do
Plausible.Stats.Sampling.add_query_hint(from(table))
on_full_build do
defp sampled(table, date_range) do
from(table)
|> Plausible.Stats.Sampling.add_query_hint()
|> limit_date_range(date_range)
end
else
defp sampled(table) do
table
defp sampled(table, date_range) do
limit_date_range(table, date_range)
end
end
defmacrop date(timestamp) do
defp limit_date_range(query, nil), do: query
defp limit_date_range(query, date_range) do
from t in query,
where:
selected_as(:date) >= ^date_range.first and
selected_as(:date) <= ^date_range.last
end
defmacrop date(timestamp, timezone) do
quote do
selected_as(fragment("toDate(?)", unquote(timestamp)), :date)
selected_as(
fragment("toDate(?,?)", unquote(timestamp), unquote(timezone)),
:date
)
end
end
@@ -127,13 +326,12 @@ defmodule Plausible.Exports do
end
end
@spec export_visitors_q(pos_integer) :: Ecto.Query.t()
def export_visitors_q(site_id) do
from s in sampled("sessions_v2"),
defp export_visitors_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
visitors(s),
pageviews(s),
bounces(s),
@@ -142,9 +340,8 @@ defmodule Plausible.Exports do
]
end
@spec export_sources_q(pos_integer) :: Ecto.Query.t()
def export_sources_q(site_id) do
from s in sampled("sessions_v2"),
defp export_sources_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [
selected_as(:date),
@@ -158,7 +355,7 @@ defmodule Plausible.Exports do
],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
selected_as(s.referrer_source, :source),
s.referrer,
s.utm_source,
@@ -174,32 +371,40 @@ defmodule Plausible.Exports do
]
end
@spec export_pages_q(pos_integer) :: Ecto.Query.t()
def export_pages_q(site_id) do
defp export_pages_q(site_id, timezone, date_range) do
window_q =
from e in sampled("events_v2"),
from e in sampled("events_v2", nil),
where: e.site_id == ^site_id,
where: [name: "pageview"],
select: %{
timestamp: e.timestamp,
timestamp: selected_as(fragment("toTimeZone(?,?)", e.timestamp, ^timezone), :timestamp),
next_timestamp:
over(fragment("leadInFrame(?)", e.timestamp),
over(fragment("leadInFrame(toTimeZone(?,?))", e.timestamp, ^timezone),
partition_by: e.session_id,
order_by: e.timestamp,
frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING")
),
pathname: e.pathname,
hostname: e.hostname,
name: e.name,
user_id: e.user_id,
session_id: e.session_id,
_sample_factor: fragment("_sample_factor")
}
window_q =
if date_range do
from e in window_q,
where: selected_as(:timestamp) >= ^date_range.first,
where: fragment("toDate(?)", selected_as(:timestamp)) <= ^date_range.last
else
window_q
end
from e in subquery(window_q),
group_by: [selected_as(:date), e.pathname],
order_by: selected_as(:date),
select: [
date(e.timestamp),
selected_as(fragment("toDate(?)", e.timestamp), :date),
selected_as(fragment("any(?)", e.hostname), :hostname),
selected_as(e.pathname, :page),
selected_as(
@@ -207,11 +412,7 @@ defmodule Plausible.Exports do
:visits
),
visitors(e),
selected_as(
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
:pageviews
),
# NOTE: are exits pageviews or any events?
selected_as(fragment("toUInt64(round(count()*any(_sample_factor)))"), :pageviews),
selected_as(
fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp),
:exits
@@ -223,14 +424,13 @@ defmodule Plausible.Exports do
]
end
@spec export_entry_pages_q(pos_integer) :: Ecto.Query.t()
def export_entry_pages_q(site_id) do
from s in sampled("sessions_v2"),
defp export_entry_pages_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.entry_page],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
s.entry_page,
visitors(s),
selected_as(
@@ -243,14 +443,13 @@ defmodule Plausible.Exports do
]
end
@spec export_exit_pages_q(pos_integer) :: Ecto.Query.t()
def export_exit_pages_q(site_id) do
from s in sampled("sessions_v2"),
defp export_exit_pages_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.exit_page],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
s.exit_page,
visitors(s),
visit_duration(s),
@@ -263,15 +462,14 @@ defmodule Plausible.Exports do
]
end
@spec export_locations_q(pos_integer) :: Ecto.Query.t()
def export_locations_q(site_id) do
from s in sampled("sessions_v2"),
defp export_locations_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ",
group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
selected_as(s.country_code, :country),
selected_as(s.subdivision1_code, :region),
selected_as(s.city_geoname_id, :city),
@@ -283,14 +481,13 @@ defmodule Plausible.Exports do
]
end
@spec export_devices_q(pos_integer) :: Ecto.Query.t()
def export_devices_q(site_id) do
from s in sampled("sessions_v2"),
defp export_devices_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.screen_size],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
selected_as(s.screen_size, :device),
visitors(s),
visits(s),
@@ -300,14 +497,13 @@ defmodule Plausible.Exports do
]
end
@spec export_browsers_q(pos_integer) :: Ecto.Query.t()
def export_browsers_q(site_id) do
from s in sampled("sessions_v2"),
defp export_browsers_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.browser, s.browser_version],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
s.browser,
s.browser_version,
visitors(s),
@@ -318,14 +514,13 @@ defmodule Plausible.Exports do
]
end
@spec export_operating_systems_q(pos_integer) :: Ecto.Query.t()
def export_operating_systems_q(site_id) do
from s in sampled("sessions_v2"),
defp export_operating_systems_q(site_id, timezone, date_range) do
from s in sampled("sessions_v2", date_range),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.operating_system, s.operating_system_version],
order_by: selected_as(:date),
select: [
date(s.start),
date(s.start, ^timezone),
s.operating_system,
s.operating_system_version,
visitors(s),

View File

@@ -1,6 +1,7 @@
defmodule Plausible.Imported.CSVImporter do
@moduledoc """
CSV importer from S3 that uses ClickHouse [s3 table function.](https://clickhouse.com/docs/en/sql-reference/table-functions/s3)
CSV importer that reads either from S3, using the ClickHouse [s3 table function](https://clickhouse.com/docs/en/sql-reference/table-functions/s3),
or from local storage, using the [input function](https://clickhouse.com/docs/en/sql-reference/table-functions/input).
"""
use Plausible.Imported.Importer
@@ -16,10 +17,45 @@ defmodule Plausible.Imported.CSVImporter do
def email_template(), do: "google_analytics_import.html"
@impl true
def parse_args(%{"uploads" => uploads}), do: [uploads: uploads]
def parse_args(%{"uploads" => uploads, "storage" => storage}) do
[uploads: uploads, storage: storage]
end
@impl true
def import_data(site_import, opts) do
storage = Keyword.fetch!(opts, :storage)
uploads = Keyword.fetch!(opts, :uploads)
if storage == "local" do
# we need to remove the imported files from local storage
# after the importer has completed or run out of attempts
paths = Enum.map(uploads, &Map.fetch!(&1, "local_path"))
Oban.insert!(
Plausible.Workers.LocalImportAnalyticsCleaner.new(
%{"import_id" => site_import.id, "paths" => paths},
schedule_in: _one_hour = 3600
)
)
end
{:ok, ch} =
Plausible.IngestRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
case storage do
"s3" -> import_s3(ch, site_import, uploads)
"local" -> import_local(ch, site_import, uploads)
end
rescue
# we are cancelling on any argument or ClickHouse errors, assuming they are permanent
e in [ArgumentError, Ch.Error] ->
# see Plausible.Imported.Importer for more details on transient vs permanent errors
{:error, Exception.message(e)}
end
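For reference, a storage-aware args map as it round-trips through `parse_args/1` and `import_data/2` looks roughly like this (ids and paths illustrative):

    %{
      "storage" => "local",
      "uploads" => [
        %{
          "filename" => "imported_pages_20200101_20220101.csv",
          "local_path" => "/tmp/plausible-imports/37/plug-1710-abcd.csv"
        }
      ]
    }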
defp import_s3(ch, site_import, uploads) do
%{
id: import_id,
site_id: site_id,
@@ -27,34 +63,20 @@ defmodule Plausible.Imported.CSVImporter do
end_date: end_date
} = site_import
uploads = Keyword.fetch!(opts, :uploads)
%{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} =
Plausible.S3.import_clickhouse_credentials()
{:ok, ch} =
Plausible.IngestRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
Enum.each(uploads, fn upload ->
%{"filename" => filename, "s3_url" => s3_url} = upload
{table, _, _} = parse_filename!(filename)
s3_structure = input_structure!(table)
s3_structure_cols_expr =
s3_structure
|> String.split(",", trim: true)
|> Enum.map_join(", ", fn kv ->
[col, _type] = String.split(kv)
col
end)
s3_columns = input_columns!(table)
statement =
"""
INSERT INTO {table:Identifier}(site_id, #{s3_structure_cols_expr}, import_id) \
SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
INSERT INTO {table:Identifier}(site_id,import_id,#{s3_columns}) \
SELECT {site_id:UInt64}, {import_id:UInt64}, * \
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String}) \
WHERE date >= {start_date:Date} AND date <= {end_date:Date}\
"""
@@ -75,10 +97,53 @@ defmodule Plausible.Imported.CSVImporter do
Ch.query!(ch, statement, params, timeout: :infinity)
end)
rescue
# we are cancelling on any argument or ClickHouse errors
e in [ArgumentError, Ch.Error] ->
{:error, Exception.message(e)}
end
defp import_local(ch, site_import, uploads) do
%{
id: import_id,
site_id: site_id,
start_date: start_date,
end_date: end_date
} = site_import
DBConnection.run(
ch,
fn conn ->
Enum.each(uploads, fn upload ->
%{"filename" => filename, "local_path" => local_path} = upload
{table, _, _} = parse_filename!(filename)
input_structure = input_structure!(table)
input_columns = input_columns!(table)
statement =
"""
INSERT INTO {table:Identifier}(site_id,import_id,#{input_columns}) \
SELECT {site_id:UInt64}, {import_id:UInt64}, * \
FROM input({input_structure:String}) \
WHERE date >= {start_date:Date} AND date <= {end_date:Date} \
FORMAT CSVWithNames\
"""
params = %{
"table" => table,
"site_id" => site_id,
"import_id" => import_id,
"input_structure" => input_structure,
"start_date" => start_date,
"end_date" => end_date
}
# we are reading in 512KB chunks for better performance
# the default would've been line by line (not great for a CSV)
File.stream!(local_path, 512_000)
|> Stream.into(Ch.stream(conn, statement, params))
|> Stream.run()
end)
end,
timeout: :infinity
)
end
input_structures = %{
@@ -109,7 +174,7 @@ defmodule Plausible.Imported.CSVImporter do
iex> date_range([
...> %{"filename" => "imported_devices_20190101_20210101.csv"},
...> "imported_pages_20200101_20220101.csv"
...> "pages_20200101_20220101.csv"
...> ])
Date.range(~D[2019-01-01], ~D[2022-01-01])
@@ -165,6 +230,9 @@ defmodule Plausible.Imported.CSVImporter do
iex> parse_filename!("imported_devices_00010101_20250101.csv")
{"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}
iex> parse_filename!("devices_00010101_20250101.csv")
{"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}
"""
@spec parse_filename!(String.t()) ::
{table :: String.t(), start_date :: Date.t(), end_date :: Date.t()}
@@ -173,11 +241,29 @@ defmodule Plausible.Imported.CSVImporter do
for {table, input_structure} <- input_structures do
defp input_structure!(unquote(table)), do: unquote(input_structure)
input_columns =
input_structure
|> String.split(",", trim: true)
|> Enum.map_join(",", fn kv ->
[col, _type] = String.split(kv)
String.trim(col)
end)
defp input_columns!(unquote(table)), do: unquote(input_columns)
def parse_filename!(
<<unquote(table)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
) do
{unquote(table), parse_date!(start_date), parse_date!(end_date)}
end
"imported_" <> name = table
def parse_filename!(
<<unquote(name)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
) do
{unquote(table), parse_date!(start_date), parse_date!(end_date)}
end
end
def parse_filename!(_filename) do
@@ -195,6 +281,9 @@ defmodule Plausible.Imported.CSVImporter do
iex> valid_filename?("imported_devices_00010101_20250101.csv")
true
iex> valid_filename?("devices_00010101_20250101.csv")
true
"""
@spec valid_filename?(String.t()) :: boolean
def valid_filename?(filename) do
@@ -220,10 +309,35 @@ defmodule Plausible.Imported.CSVImporter do
iex> extract_table("imported_devices_00010101_20250101.csv")
"imported_devices"
iex> extract_table("devices_00010101_20250101.csv")
"imported_devices"
"""
@spec extract_table(String.t()) :: String.t()
def extract_table(filename) do
{table, _start_date, _end_date} = parse_filename!(filename)
table
end
@doc """
Returns local directory for CSV imports storage.
Builds upon `$PERSISTENT_CACHE_DIR` (if set) and falls back to `/tmp`.
Examples:
iex> local_dir = local_dir(_site_id = 37)
iex> String.ends_with?(local_dir, "/plausible-imports/37")
true
"""
def local_dir(site_id) do
persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir)
Path.join([
persistent_cache_dir || System.tmp_dir!(),
"plausible-imports",
Integer.to_string(site_id)
])
end
end

View File

@@ -73,13 +73,13 @@ defmodule Plausible.Imported.Importer do
import_id = job.args[:import_id]
receive do
{:notification, :analytics_imports_jobs, %{"complete" => ^import_id}} ->
{:notification, :analytics_imports_jobs, %{"event" => "complete", "import_id" => ^import_id}} ->
IO.puts("Job completed")
{:notification, :analytics_imports_jobs, %{"transient_fail" => ^import_id}} ->
{:notification, :analytics_imports_jobs, %{"event" => "transient_fail", "import_id" => ^import_id}} ->
IO.puts("Job failed transiently")
{:notification, :analytics_imports_jobs, %{"fail" => ^import_id}} ->
{:notification, :analytics_imports_jobs, %{"event" => "fail", "import_id" => ^import_id}} ->
IO.puts("Job failed permanently")
after
15_000 ->
@@ -203,7 +203,11 @@ defmodule Plausible.Imported.Importer do
@doc false
def notify(site_import, event) do
Oban.Notifier.notify(Oban, @oban_channel, %{event => site_import.id})
Oban.Notifier.notify(Oban, @oban_channel, %{
"event" => event,
"import_id" => site_import.id,
"site_id" => site_import.site_id
})
end
@doc """

View File

@@ -43,31 +43,27 @@ defmodule Plausible.S3 do
Example:
iex> %{
...> s3_url: "http://localhost:10000/test-imports/123/imported_browsers.csv",
...> presigned_url: "http://localhost:10000/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin" <> _
...> } = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
iex> upload = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
iex> true = String.ends_with?(upload.s3_url, "/test-imports/123/imported_browsers.csv")
iex> true = String.contains?(upload.presigned_url, "/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&")
"""
def import_presign_upload(site_id, filename) do
config = ExAws.Config.new(:s3)
s3_path = Path.join(to_string(site_id), filename)
s3_path = Path.join(Integer.to_string(site_id), filename)
bucket = imports_bucket()
{:ok, presigned_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path)
%{s3_url: extract_s3_url(presigned_url), presigned_url: presigned_url}
end
# to make ClickHouse see MinIO in dev and test envs we replace
# the host in the S3 URL with whatever's set in S3_CLICKHOUSE_HOST env var
# the host in the S3 URL with host.docker.internal or whatever's set in $MINIO_HOST_FOR_CLICKHOUSE
if Mix.env() in [:dev, :test, :small_dev, :small_test] do
defp extract_s3_url(presigned_url) do
[s3_url, _] = String.split(presigned_url, "?")
if ch_host = System.get_env("S3_CLICKHOUSE_HOST") do
URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
else
s3_url
end
default_ch_host = unless System.get_env("CI"), do: "host.docker.internal"
ch_host = System.get_env("MINIO_HOST_FOR_CLICKHOUSE", default_ch_host)
URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
end
else
defp extract_s3_url(presigned_url) do
@@ -79,36 +75,37 @@ defmodule Plausible.S3 do
@doc """
Chunks and uploads Zip archive to the provided S3 destination.
In the current implementation the bucket always goes into the path component.
"""
@spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t()) :: :ok
def export_upload_multipart(stream, s3_bucket, s3_path, filename) do
# 5 MiB is the smallest chunk size AWS S3 supports
chunk_into_parts(stream, 5 * 1024 * 1024)
|> ExAws.S3.upload(s3_bucket, s3_path,
content_disposition: Plausible.Exports.content_disposition(filename),
content_type: "application/zip",
timeout: :infinity
)
|> ExAws.request!()
:ok
end
@doc """
Returns a presigned URL to download the exported Zip archive from S3.
The URL expires in 24 hours.
In the current implementation the bucket always goes into the path component.
"""
@spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t(), keyword) ::
:uri_string.uri_string()
def export_upload_multipart(stream, s3_bucket, s3_path, filename, config_overrides \\ []) do
@spec download_url(String.t(), Path.t()) :: :uri_string.uri_string()
def download_url(s3_bucket, s3_path) do
config = ExAws.Config.new(:s3)
encoded_filename = URI.encode(filename)
disposition = ~s[attachment; filename="#{encoded_filename}"]
disposition =
if encoded_filename != filename do
disposition <> "; filename*=utf-8''#{encoded_filename}"
else
disposition
end
# 5 MiB is the smallest chunk size AWS S3 supports
chunk_into_parts(stream, 5 * 1024 * 1024)
|> ExAws.S3.upload(s3_bucket, s3_path,
content_disposition: disposition,
content_type: "application/zip"
)
|> ExAws.request!(config_overrides)
# ex_aws_s3 doesn't allow expires_in longer than one week
one_week = 60 * 60 * 24 * 7
{:ok, download_url} =
ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: _24hr = 86_400)
ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: one_week)
download_url
end
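The result is a standard SigV4 presigned GET URL, so a quick smoke test can assert on its parts (bucket and key illustrative):

    iex> url = download_url("test-exports", "123")
    iex> String.contains?(url, "/test-exports/123?") and String.contains?(url, "X-Amz-Expires=604800")
    true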

View File

@@ -23,14 +23,6 @@ defmodule Plausible.Sites do
Repo.get_by!(Site, domain: domain)
end
def get_domain!(site_id) do
Plausible.Repo.one!(
from s in Plausible.Site,
where: [id: ^site_id],
select: s.domain
)
end
@spec toggle_pin(Auth.User.t(), Site.t()) ::
{:ok, Site.UserPreference.t()} | {:error, :too_many_pins}
def toggle_pin(user, site) do

View File

@@ -1,6 +1,8 @@
defmodule PlausibleWeb.SiteController do
use PlausibleWeb, :controller
use Plausible.Repo
use Plausible
alias Plausible.Sites
alias Plausible.Billing.Quota
@@ -710,21 +712,27 @@ defmodule PlausibleWeb.SiteController do
|> redirect(external: Routes.site_path(conn, :settings_integrations, site.domain))
end
def csv_export(conn, _params) do
%{site: site, current_user: user} = conn.assigns
on_full_build do
# exported archives are downloaded from object storage
else
alias Plausible.Exports
Oban.insert!(
Plausible.Workers.ExportCSV.new(%{
"site_id" => site.id,
"email_to" => user.email,
"s3_bucket" => Plausible.S3.exports_bucket(),
"s3_path" => "Plausible-#{site.id}.zip"
})
)
def download_local_export(conn, _params) do
%{id: site_id, domain: domain, timezone: timezone} = conn.assigns.site
conn
|> put_flash(:success, "SCHEDULED. WAIT FOR MAIL")
|> redirect(to: Routes.site_path(conn, :settings_imports_exports, site.domain))
if local_export = Exports.get_local_export(site_id, domain, timezone) do
%{path: export_path, name: name} = local_export
conn
|> put_resp_content_type("application/zip")
|> put_resp_header("content-disposition", Exports.content_disposition(name))
|> send_file(200, export_path)
else
conn
|> put_flash(:error, "Export not found")
|> redirect(external: Routes.site_path(conn, :settings_imports_exports, domain))
end
end
end
def csv_import(conn, _params) do

View File

@@ -1,4 +1,5 @@
defmodule PlausibleWeb.Email do
use Plausible
use Bamboo.Phoenix, view: PlausibleWeb.EmailView
import Bamboo.PostmarkHelper
@@ -346,6 +347,48 @@ defmodule PlausibleWeb.Email do
})
end
def export_success(user, site, download_url, expires_at) do
subject =
on_full_build do
"Your Plausible Analytics export is now ready for download"
else
"Your export is now ready for download"
end
expires_in =
if expires_at do
Timex.Format.DateTime.Formatters.Relative.format!(
expires_at,
"{relative}"
)
end
priority_email()
|> to(user)
|> tag("export-success")
|> subject(subject)
|> render("export_success.html",
user: user,
site: site,
download_url: download_url,
expires_in: expires_in
)
end
def export_failure(user, site) do
subject =
on_full_build do
"Your Plausible Analytics export has failed"
else
"Your export has failed"
end
priority_email()
|> to(user)
|> subject(subject)
|> render("export_failure.html", user: user, site: site)
end
def error_report(reported_by, trace_id, feedback) do
Map.new()
|> Map.put(:layout, nil)

View File

@@ -0,0 +1,281 @@
defmodule PlausibleWeb.Live.CSVExport do
@moduledoc """
LiveView allowing scheduling, watching, downloading, and deleting S3 and local exports.
"""
use PlausibleWeb, :live_view
use Phoenix.HTML
alias PlausibleWeb.Components.Generic
alias Plausible.Exports
# :not_mounted_at_router ensures we have already done auth checks in the controller
# if this liveview becomes available from the router, please make sure
# to check that current_user_role is allowed to manage site exports
@impl true
def mount(:not_mounted_at_router, session, socket) do
%{
"storage" => storage,
"site_id" => site_id,
"email_to" => email_to
} = session
socket =
socket
|> assign(site_id: site_id, email_to: email_to, storage: storage)
|> assign_new(:site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end)
|> fetch_export()
if connected?(socket) do
Exports.oban_listen()
end
{:ok, socket}
end
defp fetch_export(socket) do
%{storage: storage, site_id: site_id} = socket.assigns
get_export =
case storage do
"s3" ->
&Exports.get_s3_export/1
"local" ->
%{domain: domain, timezone: timezone} = socket.assigns.site
&Exports.get_local_export(&1, domain, timezone)
end
socket = assign(socket, export: nil)
if job = Exports.get_last_export_job(site_id) do
%Oban.Job{state: state} = job
case state do
_ when state in ["scheduled", "available", "retryable"] ->
assign(socket, status: "in_progress")
"executing" ->
# Exports.oban_notify/1 is called in `perform/1` and
# the notification arrives while the job.state is still "executing"
if export = get_export.(site_id) do
assign(socket, status: "ready", export: export)
else
assign(socket, status: "in_progress")
end
"completed" ->
if export = get_export.(site_id) do
assign(socket, status: "ready", export: export)
else
assign(socket, status: "can_schedule")
end
"discarded" ->
assign(socket, status: "failed")
"cancelled" ->
# credo:disable-for-next-line Credo.Check.Refactor.Nesting
if export = get_export.(site_id) do
assign(socket, status: "ready", export: export)
else
assign(socket, status: "can_schedule")
end
end
else
if export = get_export.(site_id) do
assign(socket, status: "ready", export: export)
else
assign(socket, status: "can_schedule")
end
end
end
@impl true
def render(assigns) do
~H"""
<%= case @status do %>
<% "can_schedule" -> %>
<.prepare_download />
<% "in_progress" -> %>
<.in_progress />
<% "failed" -> %>
<.failed />
<% "ready" -> %>
<.download storage={@storage} export={@export} />
<% end %>
"""
end
defp prepare_download(assigns) do
~H"""
<Generic.button phx-click="export">Prepare download</Generic.button>
<p class="text-sm mt-4 text-gray-500">
Prepare your data for download by clicking the button above. When that's done, a Zip file that you can download will appear.
</p>
"""
end
defp in_progress(assigns) do
~H"""
<div class="flex items-center justify-between space-x-2">
<div class="flex items-center">
<Generic.spinner />
<span class="ml-2">We are preparing your download ...</span>
</div>
<button
phx-click="cancel"
class="text-red-500 font-semibold"
data-confirm="Are you sure you want to cancel this export?"
>
Cancel
</button>
</div>
<p class="text-sm mt-4 text-gray-500">
The preparation of your stats might take a while. Depending on the volume of your data, it might take up to 20 minutes. Feel free to leave the page and return later.
</p>
"""
end
defp failed(assigns) do
~H"""
<div class="flex items-center">
<Heroicons.exclamation_circle class="w-4 h-4 text-red-500" />
<p class="ml-2 text-sm text-gray-500">
Something went wrong when preparing your download. Please
<button phx-click="export" class="text-indigo-500">try again.</button>
</p>
</div>
"""
end
defp download(assigns) do
~H"""
<div class="flex items-center justify-between space-x-2">
<a href={@export.download_link} class="inline-flex items-center">
<Heroicons.document_text class="w-4 h-4" />
<span class="ml-1 text-indigo-500"><%= @export.name %></span>
</a>
<button
phx-click="delete"
class="text-red-500 font-semibold"
data-confirm="Are you sure you want to delete this export?"
>
<Heroicons.trash class="w-4 h-4" />
</button>
</div>
<p :if={@export.expires_at} class="text-sm mt-4 text-gray-500">
Note that this file will expire
<.hint message={@export.expires_at}>
<%= Timex.Format.DateTime.Formatters.Relative.format!(@export.expires_at, "{relative}") %>.
</.hint>
</p>
<p :if={@storage == "local"} class="text-sm mt-4 text-gray-500">
Located at
<.hint message={@export.path}><%= format_path(@export.path) %></.hint>
(<%= format_bytes(@export.size) %>)
</p>
"""
end
defp hint(assigns) do
~H"""
<span title={@message} class="underline cursor-help underline-offset-2 decoration-dashed">
<%= render_slot(@inner_block) %>
</span>
"""
end
@impl true
def handle_event("export", _params, socket) do
%{storage: storage, site_id: site_id, email_to: email_to} = socket.assigns
schedule_result =
case storage do
"s3" -> Exports.schedule_s3_export(site_id, email_to)
"local" -> Exports.schedule_local_export(site_id, email_to)
end
socket =
case schedule_result do
{:ok, _job} ->
fetch_export(socket)
{:error, :no_data} ->
socket
|> put_flash(:error, "There is no data to export")
|> redirect(
external:
Routes.site_path(socket, :settings_imports_exports, socket.assigns.site.domain)
)
end
{:noreply, socket}
end
def handle_event("cancel", _params, socket) do
if job = Exports.get_last_export_job(socket.assigns.site_id), do: Oban.cancel_job(job)
{:noreply, fetch_export(socket)}
end
def handle_event("delete", _params, socket) do
%{storage: storage, site_id: site_id} = socket.assigns
case storage do
"s3" -> Exports.delete_s3_export(site_id)
"local" -> Exports.delete_local_export(site_id)
end
{:noreply, fetch_export(socket)}
end
@impl true
def handle_info({:notification, Exports, %{"site_id" => site_id}}, socket) do
socket =
if site_id == socket.assigns.site_id do
fetch_export(socket)
else
socket
end
{:noreply, socket}
end
@format_path_regex ~r/^(?<beginning>((.+?\/){3})).*(?<ending>(\/.*){3})$/
defp format_path(path) do
path_string =
path
|> to_string()
|> String.replace_prefix("\"", "")
|> String.replace_suffix("\"", "")
case Regex.named_captures(@format_path_regex, path_string) do
%{"beginning" => beginning, "ending" => ending} -> "#{beginning}...#{ending}"
_ -> path_string
end
end
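So a deep path keeps only its first and last three segments (path illustrative):

    iex> format_path("/var/lib/plausible/persistent-cache/plausible-exports/sites/37")
    "/var/lib/plausible/.../plausible-exports/sites/37"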
defp format_bytes(bytes) when is_integer(bytes) do
cond do
bytes >= memory_unit("TiB") -> format_bytes(bytes, "TiB")
bytes >= memory_unit("GiB") -> format_bytes(bytes, "GiB")
bytes >= memory_unit("MiB") -> format_bytes(bytes, "MiB")
bytes >= memory_unit("KiB") -> format_bytes(bytes, "KiB")
true -> format_bytes(bytes, "B")
end
end
defp format_bytes(bytes, "B"), do: "#{bytes} B"
defp format_bytes(bytes, unit) do
value = bytes / memory_unit(unit)
"#{:erlang.float_to_binary(value, decimals: 1)} #{unit}"
end
defp memory_unit("TiB"), do: 1024 * 1024 * 1024 * 1024
defp memory_unit("GiB"), do: 1024 * 1024 * 1024
defp memory_unit("MiB"), do: 1024 * 1024
defp memory_unit("KiB"), do: 1024
end
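A couple of worked values for these byte helpers (private functions, shown for illustration):

    iex> format_bytes(512)
    "512 B"
    iex> format_bytes(1_572_864)
    "1.5 MiB"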

View File

@@ -1,26 +1,76 @@
defmodule PlausibleWeb.Live.CSVImport do
@moduledoc """
LiveView allowing uploading CSVs for imported tables to S3
LiveView allowing uploading CSVs for imported tables to S3 or local storage
"""
use PlausibleWeb, :live_view
alias Plausible.Imported.CSVImporter
use PlausibleWeb, :live_view
alias PlausibleWeb.Components.Generic
require Plausible.Imported.SiteImport
alias Plausible.Imported.CSVImporter
alias Plausible.Imported
# :not_mounted_at_router ensures we have already done auth checks in the controller
# if this liveview becomes available from the router, please make sure
# to check that current_user_role is allowed to make site imports
@impl true
def mount(_params, session, socket) do
%{"site_id" => site_id, "user_id" => user_id} = session
def mount(:not_mounted_at_router, session, socket) do
%{"site_id" => site_id, "current_user_id" => user_id, "storage" => storage} = session
upload_opts = [
accept: [".csv", "text/csv"],
auto_upload: true,
max_entries: length(Imported.tables()),
# 1GB
max_file_size: 1_000_000_000,
progress: &handle_progress/3
]
upload_opts =
case storage do
"s3" -> [{:external, &presign_upload/2} | upload_opts]
"local" -> upload_opts
end
upload_consumer =
case storage do
"s3" ->
fn meta, entry ->
{:ok, %{"s3_url" => meta.s3_url, "filename" => entry.client_name}}
end
"local" ->
local_dir = CSVImporter.local_dir(site_id)
File.mkdir_p!(local_dir)
fn meta, entry ->
local_path = Path.join(local_dir, Path.basename(meta.path))
File.rename!(meta.path, local_path)
{:ok, %{"local_path" => local_path, "filename" => entry.client_name}}
end
end
%{assigns: %{site: site}} =
socket = assign_new(socket, :site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end)
# we'll listen for new completed imports to know
# when to reload the occupied ranges
if connected?(socket), do: Imported.listen()
occupied_ranges = Imported.get_occupied_date_ranges(site)
native_stats_start_date = Plausible.Sites.native_stats_start_date(site)
socket =
socket
|> assign(site_id: site_id, user_id: user_id)
|> allow_upload(:import,
accept: [".csv", "text/csv"],
auto_upload: true,
max_entries: length(Plausible.Imported.tables()),
# 1GB
max_file_size: 1_000_000_000,
external: &presign_upload/2,
progress: &handle_progress/3
|> assign(
site_id: site_id,
user_id: user_id,
storage: storage,
upload_consumer: upload_consumer,
occupied_ranges: occupied_ranges,
native_stats_start_date: native_stats_start_date
)
|> allow_upload(:import, upload_opts)
|> process_imported_tables()
{:ok, socket}
@@ -32,8 +82,12 @@ defmodule PlausibleWeb.Live.CSVImport do
<div>
<form action="#" method="post" phx-change="validate-upload-form" phx-submit="submit-upload-form">
<.csv_picker upload={@uploads.import} imported_tables={@imported_tables} />
<.confirm_button date_range={@date_range} can_confirm?={@can_confirm?} />
<.confirm_button date_range={@clamped_date_range} can_confirm?={@can_confirm?} />
<.maybe_date_range_warning
:if={@original_date_range}
clamped={@clamped_date_range}
original={@original_date_range}
/>
<p :for={error <- upload_errors(@uploads.import)} class="text-red-400">
<%= error_to_string(error) %>
</p>
@@ -46,13 +100,11 @@ defmodule PlausibleWeb.Live.CSVImport do
~H"""
<label
phx-drop-target={@upload.ref}
class="block border-2 dark:border-gray-600 rounded p-4 group hover:border-indigo-500 dark:hover:border-indigo-600 transition cursor-pointer"
class="block border-2 dark:border-gray-600 rounded-md p-4 hover:bg-gray-50 dark:hover:bg-gray-900 hover:border-indigo-500 dark:hover:border-indigo-600 transition cursor-pointer"
>
<div class="flex items-center">
<div class="bg-gray-200 dark:bg-gray-600 rounded p-1 group-hover:bg-indigo-500 dark:group-hover:bg-indigo-600 transition">
<Heroicons.document_plus class="w-5 h-5 group-hover:text-white transition" />
</div>
<span class="ml-2 text-sm text-gray-600 dark:text-gray-500">
<div class="flex items-center text-gray-500 dark:text-gray-500">
<Heroicons.document_plus class="w-5 h-5 transition" />
<span class="ml-1.5 text-sm">
(or drag-and-drop your unzipped CSVs here)
</span>
<.live_file_input upload={@upload} class="hidden" />
@@ -81,7 +133,7 @@ defmodule PlausibleWeb.Live.CSVImport do
]}
>
<%= if @date_range do %>
Confirm import from <%= @date_range.first %> to <%= @date_range.last %>
Confirm import <.dates range={@date_range} />
<% else %>
Confirm import
<% end %>
@@ -89,6 +141,31 @@ defmodule PlausibleWeb.Live.CSVImport do
"""
end
defp maybe_date_range_warning(assigns) do
~H"""
<%= if @clamped do %>
<Generic.notice :if={@clamped != @original} title="Dates Adjusted" theme={:yellow} class="mt-4">
The dates <.dates range={@original} />
overlap with previous imports, so we'll use the next best period, <.dates range={@clamped} />
</Generic.notice>
<% else %>
<Generic.notice title="Dates Conflict" theme={:red} class="mt-4">
The dates <.dates range={@original} />
overlap with dates we've already imported and cannot be used for new imports.
</Generic.notice>
<% end %>
"""
end
defp dates(assigns) do
~H"""
<span class="whitespace-nowrap">
<span class="font-medium"><%= @range.first %></span>
to <span class="font-medium"><%= @range.last %></span>
</span>
"""
end
defp imported_table(assigns) do
status =
cond do
@@ -101,19 +178,16 @@ defmodule PlausibleWeb.Live.CSVImport do
assigns = assign(assigns, status: status)
~H"""
<li id={@table} class="ml-1.5">
<div class="flex items-center space-x-2">
<Heroicons.document_check :if={@status == :success} class="w-4 h-4 text-indigo-600" />
<PlausibleWeb.Components.Generic.spinner
:if={@status == :in_progress}
class="w-4 h-4 text-indigo-600"
/>
<Heroicons.document :if={@status == :empty} class="w-4 h-4 text-gray-400 dark:text-gray-500" />
<li id={@table} class="ml-0.5">
<div class="flex items-center space-x-2 text-gray-600 dark:text-gray-500">
<Heroicons.document_check :if={@status == :success} class="w-4 h-4" />
<Generic.spinner :if={@status == :in_progress} class="w-4 h-4" />
<Heroicons.document :if={@status == :empty} class="w-4 h-4 opacity-80" />
<Heroicons.document :if={@status == :error} class="w-4 h-4 text-red-600 dark:text-red-700" />
<span class={[
"text-sm",
if(@upload, do: "dark:text-gray-400", else: "text-gray-400 dark:text-gray-500"),
if(@status == :empty, do: "opacity-80"),
if(@status == :error, do: "text-red-600 dark:text-red-700")
]}>
<%= if @upload do %>
@@ -137,20 +211,24 @@ defmodule PlausibleWeb.Live.CSVImport do
end
def handle_event("submit-upload-form", _params, socket) do
%{site_id: site_id, user_id: user_id, date_range: date_range} = socket.assigns
site = Plausible.Repo.get!(Plausible.Site, site_id)
user = Plausible.Repo.get!(Plausible.Auth.User, user_id)
%{
storage: storage,
site: site,
user_id: user_id,
clamped_date_range: clamped_date_range,
upload_consumer: upload_consumer
} =
socket.assigns
uploads =
consume_uploaded_entries(socket, :import, fn meta, entry ->
{:ok, %{"s3_url" => meta.s3_url, "filename" => entry.client_name}}
end)
user = Plausible.Repo.get!(Plausible.Auth.User, user_id)
uploads = consume_uploaded_entries(socket, :import, upload_consumer)
{:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
uploads: uploads
start_date: clamped_date_range.first,
end_date: clamped_date_range.last,
uploads: uploads,
storage: storage
)
redirect_to =
@@ -159,6 +237,21 @@ defmodule PlausibleWeb.Live.CSVImport do
{:noreply, redirect(socket, external: redirect_to)}
end
@impl true
def handle_info({:notification, :analytics_imports_jobs, details}, socket) do
site = socket.assigns.site
socket =
if details["site_id"] == site.id and details["event"] == "complete" do
occupied_ranges = Imported.get_occupied_date_ranges(site)
socket |> assign(occupied_ranges: occupied_ranges) |> process_imported_tables()
else
socket
end
{:noreply, socket}
end
defp error_to_string(:too_large), do: "is too large (max size is 1 gigabyte)"
defp error_to_string(:too_many_files), do: "too many files"
defp error_to_string(:not_accepted), do: "unacceptable file types"
@@ -166,11 +259,20 @@ defmodule PlausibleWeb.Live.CSVImport do
defp presign_upload(entry, socket) do
%{s3_url: s3_url, presigned_url: upload_url} =
Plausible.S3.import_presign_upload(socket.assigns.site_id, entry.client_name)
Plausible.S3.import_presign_upload(socket.assigns.site_id, random_suffix(entry.client_name))
{:ok, %{uploader: "S3", s3_url: s3_url, url: upload_url}, socket}
end
defp random_suffix(filename) do
# based on Plug.Upload.path/2
# https://github.com/elixir-plug/plug/blob/eabf0b9d43060c10663a9105cb1baf984d272a6c/lib/plug/upload.ex#L154-L159
sec = Integer.to_string(:os.system_time(:second))
rand = Integer.to_string(:rand.uniform(999_999_999_999))
scheduler_id = Integer.to_string(:erlang.system_info(:scheduler_id))
filename <> "-" <> sec <> "-" <> rand <> "-" <> scheduler_id
end
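The produced name stays recognizably tied to the original file while being unique per upload; a sample result (time-dependent parts vary per run):

    iex> random_suffix("imported_pages_20200101_20220101.csv")
    "imported_pages_20200101_20220101.csv-1712822101-587219340112-2"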
defp handle_progress(:import, entry, socket) do
if entry.done? do
{:noreply, process_imported_tables(socket)}
@@ -180,7 +282,7 @@ defmodule PlausibleWeb.Live.CSVImport do
end
defp process_imported_tables(socket) do
tables = Plausible.Imported.tables()
tables = Imported.tables()
{completed, in_progress} = uploaded_entries(socket, :import)
{valid_uploads, invalid_uploads} =
@@ -207,16 +309,37 @@ defmodule PlausibleWeb.Live.CSVImport do
replaced_uploads
end)
date_range = CSVImporter.date_range(Enum.map(valid_uploads, & &1.client_name))
original_date_range = CSVImporter.date_range(Enum.map(valid_uploads, & &1.client_name))
clamped_date_range =
if original_date_range do
%Date.Range{first: start_date, last: end_date} = original_date_range
%{
site: site,
occupied_ranges: occupied_ranges,
native_stats_start_date: native_stats_start_date
} = socket.assigns
cutoff_date = native_stats_start_date || Timex.today(site.timezone)
case Imported.clamp_dates(occupied_ranges, cutoff_date, start_date, end_date) do
{:ok, start_date, end_date} -> Date.range(start_date, end_date)
{:error, :no_time_window} -> nil
end
end
all_uploaded? = completed != [] and in_progress == []
can_confirm? = all_uploaded? and not is_nil(clamped_date_range)
socket
|> cancel_uploads(invalid_uploads)
|> cancel_uploads(replaced_uploads)
|> assign(
imported_tables: imported_tables,
can_confirm?: all_uploaded?,
date_range: date_range
can_confirm?: can_confirm?,
original_date_range: original_date_range,
clamped_date_range: clamped_date_range
)
end
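`Imported.clamp_dates/4` is what decides between adjusting and rejecting the upload's dates; illustrative outcomes under the two branches handled above:

    iex> Imported.clamp_dates(occupied_ranges, cutoff_date, ~D[2020-01-01], ~D[2022-01-01])
    {:ok, ~D[2021-01-01], ~D[2022-01-01]}
    iex> Imported.clamp_dates(fully_occupied_ranges, cutoff_date, ~D[2020-01-01], ~D[2022-01-01])
    {:error, :no_time_window}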

View File

@@ -145,9 +145,9 @@ defmodule PlausibleWeb.Live.ImportsExportsSettings do
"""
end
def handle_info({:notification, :analytics_imports_jobs, status}, socket) do
[{status_str, import_id}] = Enum.to_list(status)
{site_imports, updated?} = update_imports(socket.assigns.site_imports, import_id, status_str)
def handle_info({:notification, :analytics_imports_jobs, details}, socket) do
{site_imports, updated?} =
update_imports(socket.assigns.site_imports, details["import_id"], details["event"])
pageview_counts =
if updated? do

View File

@@ -62,7 +62,7 @@ defmodule PlausibleWeb.Router do
end
end
if Mix.env() == :dev do
if Mix.env() in [:dev, :small_dev] do
forward "/sent-emails", Bamboo.SentEmailViewerPlug
end
@@ -390,7 +390,13 @@ defmodule PlausibleWeb.Router do
delete "/:website/settings/forget-imported", SiteController, :forget_imported
delete "/:website/settings/forget-import/:import_id", SiteController, :forget_import
post "/:website/settings/export", SiteController, :csv_export
on_full_build do
# exported archives are downloaded from object storage
else
get "/:website/exported-archive", SiteController, :download_local_export
end
get "/:website/settings/import", SiteController, :csv_import
get "/:domain/export", StatsController, :csv_export

View File

@@ -0,0 +1,5 @@
Your <%= if full_build?() do %>Plausible Analytics <% end %>export for <%= @site.domain %> has encountered an error and was unsuccessful.
Sorry for the trouble this may have caused.
<br/><br/>
Please attempt to export your data again.
<%= if full_build?() do %>Should the problem persist, do reply to this email so we can assist. Thanks!<% end %>

View File

@@ -0,0 +1,3 @@
Your <%= if full_build?() do %>Plausible Analytics <% end %>export for <%= @site.domain %> is now ready for download.
Please click <a href="<%= @download_url %>">here</a> to start the download process.
<%= if @expires_in do %>Note that this link will expire <%= @expires_in %>.<% end %>

View File

@@ -15,6 +15,10 @@
</div>
<%= live_render(@conn, PlausibleWeb.Live.CSVImport,
session: %{"site_id" => @site.id, "user_id" => @current_user.id}
session: %{
"site_id" => @site.id,
"current_user_id" => @current_user.id,
"storage" => on_full_build(do: "s3", else: "local")
}
) %>
</div>

View File

@@ -13,9 +13,11 @@
<%= live_render(@conn, PlausibleWeb.Live.ImportsExportsSettings,
session: %{"domain" => @site.domain}
) %>
</div>
<header class="relative border-b border-gray-200 pb-4">
<h2 class="mt-8 text-lg leading-6 font-medium text-gray-900 dark:text-gray-100">
<div class="shadow bg-white dark:bg-gray-800 dark:text-gray-200 sm:rounded-md sm:overflow-hidden py-6 px-4 sm:p-6">
<header class="relative border-b border-gray-200 pb-4 mb-5">
<h2 class="text-lg leading-6 font-medium text-gray-900 dark:text-gray-100">
Export Data
</h2>
<p class="mt-1 text-sm leading-5 text-gray-500 dark:text-gray-200">
@@ -23,13 +25,11 @@
</p>
</header>
<div class="mt-4">
<PlausibleWeb.Components.Generic.button
data-method="post"
data-to={"/#{URI.encode_www_form(@site.domain)}/settings/export"}
data-csrf={Plug.CSRFProtection.get_csrf_token()}
>
Export to CSV
</PlausibleWeb.Components.Generic.button>
</div>
<%= live_render(@conn, PlausibleWeb.Live.CSVExport,
session: %{
"site_id" => @site.id,
"email_to" => @current_user.email,
"storage" => on_full_build(do: "s3", else: "local")
}
) %>
</div>

View File

@@ -0,0 +1,121 @@
defmodule Plausible.Workers.ExportAnalytics do
@moduledoc """
Worker for running CSV export jobs. Supports S3 and local storage.
To avoid blocking the queue, a timeout of 15 minutes is enforced.
"""
use Oban.Worker,
queue: :analytics_exports,
max_attempts: 3
alias Plausible.Exports
@doc "This base query filters export jobs for a site"
def base_query(site_id) do
import Ecto.Query, only: [from: 2]
from j in Oban.Job,
where: j.worker == ^Oban.Worker.to_string(__MODULE__),
where: j.args["site_id"] == ^site_id
end
@impl true
def timeout(_job), do: :timer.minutes(15)
@impl true
def perform(%Oban.Job{args: args} = job) do
%{
"storage" => storage,
"site_id" => site_id
} = args
site = Plausible.Repo.get!(Plausible.Site, site_id)
%Date.Range{} = date_range = Exports.date_range(site.id, site.timezone)
queries =
Exports.export_queries(site_id,
date_range: date_range,
timezone: site.timezone,
extname: ".csv"
)
# since each worker / `perform` attempt runs in a separate process
# it's ok to use start_link to keep connection lifecycle
# bound to that of the worker
{:ok, ch} =
Plausible.ClickhouseRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
try do
case storage do
"s3" -> perform_s3_export(ch, site, queries, args)
"local" -> perform_local_export(ch, queries, args)
end
after
Exports.oban_notify(site_id)
end
email_success(job.args)
:ok
catch
class, reason ->
if job.attempt >= job.max_attempts, do: email_failure(job.args)
:erlang.raise(class, reason, __STACKTRACE__)
end
defp perform_s3_export(ch, site, queries, args) do
%{
"s3_bucket" => s3_bucket,
"s3_path" => s3_path
} = args
created_on = Plausible.Timezones.to_date_in_timezone(DateTime.utc_now(), site.timezone)
filename = Exports.archive_filename(site.domain, created_on)
DBConnection.run(
ch,
fn conn ->
conn
|> Exports.stream_archive(queries, format: "CSVWithNames")
|> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, filename)
end,
timeout: :infinity
)
end
defp perform_local_export(ch, queries, args) do
%{"local_path" => local_path} = args
tmp_path = Plug.Upload.random_file!("tmp-plausible-export")
DBConnection.run(
ch,
fn conn ->
Exports.stream_archive(conn, queries, format: "CSVWithNames")
|> Stream.into(File.stream!(tmp_path))
|> Stream.run()
end,
timeout: :infinity
)
File.mkdir_p!(Path.dirname(local_path))
if File.exists?(local_path), do: File.rm!(local_path)
File.rename!(tmp_path, local_path)
end
defp email_failure(args) do
args |> Map.put("status", "failure") |> email()
end
defp email_success(args) do
args |> Map.put("status", "success") |> email()
end
defp email(args) do
# email delivery can fail and cause an already successful
# (and costly) export to be repeated, hence the email is
# delivered in a separate job
Oban.insert!(Plausible.Workers.NotifyExportedAnalytics.new(args))
end
end
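For context, a hedged usage sketch: enqueueing an export job and using `base_query/1` to check for an in-flight export. In the app these args are normally assembled by `Plausible.Exports.schedule_s3_export/2` and `schedule_local_export/2`; the literal map below is illustrative only.
# Illustrative only: enqueue a local export for a site
{:ok, _job} =
  Oban.insert(
    Plausible.Workers.ExportAnalytics.new(%{
      "storage" => "local",
      "site_id" => site.id,
      "email_to" => user.email,
      "local_path" => "/tmp/plausible-export.zip"
    })
  )

# Illustrative only: is any export job for this site still in flight?
import Ecto.Query, only: [from: 2]

export_in_progress? =
  Plausible.Repo.exists?(
    from j in Plausible.Workers.ExportAnalytics.base_query(site.id),
      where: j.state in ["available", "scheduled", "executing", "retryable"]
  )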

View File

@@ -1,105 +0,0 @@
defmodule Plausible.Workers.ExportCSV do
@moduledoc """
Worker for running CSV export jobs.
"""
use Oban.Worker,
queue: :s3_csv_export,
max_attempts: 3,
unique: [fields: [:args], keys: [:s3_bucket, :s3_path], period: 60]
@impl true
def perform(job) do
%Oban.Job{
args:
%{
"site_id" => site_id,
"email_to" => email,
"s3_bucket" => s3_bucket,
"s3_path" => s3_path
} = args
} = job
{:ok, ch} =
Plausible.ClickhouseRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
%Ch.Result{rows: [[%Date{} = min_date, %Date{} = max_date]]} =
Ch.query!(
ch,
"SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
%{"site_id" => site_id}
)
if max_date == ~D[1970-01-01] do
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT FAILURE",
text_body: "there is nothing to export"
)
)
else
domain = Plausible.Sites.get_domain!(site_id)
export_archive_filename = Plausible.Exports.archive_filename(domain, min_date, max_date)
s3_config_overrides = s3_config_overrides(args)
download_url =
DBConnection.run(
ch,
fn conn ->
conn
|> Plausible.Exports.stream_archive(
Plausible.Exports.export_queries(site_id,
date_range: Date.range(min_date, max_date),
extname: ".csv"
),
format: "CSVWithNames"
)
|> Plausible.S3.export_upload_multipart(
s3_bucket,
s3_path,
export_archive_filename,
s3_config_overrides
)
end,
timeout: :infinity
)
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT SUCCESS",
text_body: """
download it from #{download_url}! hurry up! you have 24 hours!
""",
html_body: """
download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
"""
)
)
end
:ok
end
# right now custom config is used in tests only (to access the minio container)
# ideally it would be passed via the s3 url
# but ExAws.S3.upload is hard to make work with s3 urls
if Mix.env() in [:test, :small_test] do
defp s3_config_overrides(args) do
if config_overrides = args["s3_config_overrides"] do
Enum.map(config_overrides, fn {k, v} -> {String.to_existing_atom(k), v} end)
else
[]
end
end
else
defp s3_config_overrides(_args), do: []
end
end

View File

@@ -0,0 +1,32 @@
defmodule Plausible.Workers.LocalImportAnalyticsCleaner do
@moduledoc """
Worker that cleans up local files left behind by analytics import jobs.
"""
use Oban.Worker, queue: :analytics_imports, unique: [period: 3600]
@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
%{"import_id" => import_id, "paths" => paths} = args
if import_in_progress?(import_id) do
{:snooze, _one_hour = 3600}
else
Enum.each(paths, fn path ->
# credo:disable-for-next-line Credo.Check.Refactor.Nesting
if File.exists?(path), do: File.rm!(path)
end)
end
end
defp import_in_progress?(import_id) do
import Ecto.Query
require Plausible.Imported.SiteImport
alias Plausible.Imported.SiteImport
SiteImport
|> where(id: ^import_id)
|> where([i], i.status in ^[SiteImport.pending(), SiteImport.importing()])
|> Plausible.Repo.exists?()
end
end
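A hedged sketch of how this cleaner might be scheduled once an import's uploaded files are no longer needed; the exact call site in the importer may differ. Thanks to the snooze above, files are removed only after the import has left the pending/importing states.
# Illustrative only: schedule cleanup of the local upload files
Oban.insert!(
  Plausible.Workers.LocalImportAnalyticsCleaner.new(%{
    "import_id" => import_id,
    "paths" => Enum.map(uploads, & &1["local_path"])
  })
)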

View File

@@ -0,0 +1,48 @@
defmodule Plausible.Workers.NotifyExportedAnalytics do
@moduledoc "This worker delivers emails for successful and failed exports"
use Oban.Worker,
queue: :notify_exported_analytics,
max_attempts: 5
@impl true
def perform(%Oban.Job{args: args}) do
%{
"status" => status,
"storage" => storage,
"email_to" => email_to,
"site_id" => site_id
} = args
user = Plausible.Repo.get_by!(Plausible.Auth.User, email: email_to)
site = Plausible.Repo.get!(Plausible.Site, site_id)
email =
case status do
"success" ->
case storage do
"s3" ->
%{"s3_bucket" => s3_bucket, "s3_path" => s3_path} = args
download_url = Plausible.S3.download_url(s3_bucket, s3_path)
%{expires_at: expires_at} = Plausible.Exports.get_s3_export(site_id)
PlausibleWeb.Email.export_success(user, site, download_url, expires_at)
"local" ->
download_url =
PlausibleWeb.Router.Helpers.site_path(
PlausibleWeb.Endpoint,
:download_local_export,
site.domain
)
PlausibleWeb.Email.export_success(user, site, download_url, _expires_at = nil)
end
"failure" ->
PlausibleWeb.Email.export_failure(user, site)
end
Plausible.Mailer.deliver_now!(email)
:ok
end
end
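The args this worker expects are forwarded verbatim from `Plausible.Workers.ExportAnalytics.email/1`; below is a hedged sketch of the expected shape for an S3 success notification, with illustrative values only.
# Illustrative only: args shape for a successful S3 export notification
Oban.insert!(
  Plausible.Workers.NotifyExportedAnalytics.new(%{
    "status" => "success",
    "storage" => "s3",
    "email_to" => "user@example.com",
    "site_id" => site.id,
    "s3_bucket" => "exports-bucket",
    "s3_path" => "#{site.id}/export.zip"
  })
)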

View File

@@ -141,7 +141,6 @@ defmodule Plausible.MixProject do
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:sweet_xml, "~> 0.7.4"},
{:testcontainers, "~> 1.6", only: [:test, :small_test]},
{:zstream, "~> 0.6.4"},
{:con_cache, "~> 1.0"}
]

View File

@@ -47,7 +47,6 @@
"ex_cldr_currencies": {:hex, :ex_cldr_currencies, "2.15.1", "e92ba17c41e7405b7784e0e65f406b5f17cfe313e0e70de9befd653e12854822", [:mix], [{:ex_cldr, "~> 2.34", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "31df8bd37688340f8819bdd770eb17d659652078d34db632b85d4a32864d6a25"},
"ex_cldr_numbers": {:hex, :ex_cldr_numbers, "2.32.3", "b631ff94c982ec518e46bf4736000a30a33d6b58facc085d5f240305f512ad4a", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:digital_token, "~> 0.3 or ~> 1.0", [hex: :digital_token, repo: "hexpm", optional: false]}, {:ex_cldr, "~> 2.37", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:ex_cldr_currencies, ">= 2.14.2", [hex: :ex_cldr_currencies, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "7b626ff1e59a0ec9c3c5db5ce9ca91a6995e2ab56426b71f3cbf67181ea225f5"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"ex_docker_engine_api": {:hex, :ex_docker_engine_api, "1.43.1", "1161e34b6bea5cef84d8fdc1d5d510fcb0c463941ce84c36f4a0f44a9096eb96", [:mix], [{:hackney, "~> 1.20", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "ec8fc499389aeef56ddca67e89e9e98098cff50587b56e8b4613279f382793b1"},
"ex_json_logger": {:hex, :ex_json_logger, "1.4.0", "ad1dcc1cfe6940ee1d9d489b20757c89769626ce34c4957548d6fbe155cd96f1", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "7548a1ecba290746e06214d2b3d8783c76760c779a8903a8e44bfd23a7340444"},
"ex_machina": {:hex, :ex_machina, "2.7.0", "b792cc3127fd0680fecdb6299235b4727a4944a09ff0fa904cc639272cd92dc7", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}], "hexpm", "419aa7a39bde11894c87a615c4ecaa52d8f107bbdd81d810465186f783245bf8"},
"ex_money": {:hex, :ex_money, "5.15.3", "ea070eb1eefd22258aa288921ba482f1fa5f870d229069dc3d12458b7b8bf66d", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ex_cldr_numbers, "~> 2.31", [hex: :ex_cldr_numbers, repo: "hexpm", optional: false]}, {:gringotts, "~> 1.1", [hex: :gringotts, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.0 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "3671f1808c428b7c4688650d43dc1af0b64c0eea822429a28c55cef15fb4fdc1"},
@@ -142,14 +141,11 @@
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"},
"telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"},
"telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"},
"tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"},
"testcontainers": {:hex, :testcontainers, "1.6.0", "14b3251f01ce0b1ada716130d371ba0b6cb1ce2904aa38bd58e5ff4194f4d88f", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.3", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:ex_docker_engine_api, "~> 1.43.1", [hex: :ex_docker_engine_api, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "3f812407f232954999a3a2e05b2802e1d8d1afba120533c42b32c7cc91d35daf"},
"timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"},
"tls_certificate_check": {:hex, :tls_certificate_check, "1.21.0", "042ab2c0c860652bc5cf69c94e3a31f96676d14682e22ec7813bd173ceff1788", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "6cee6cffc35a390840d48d463541d50746a7b0e421acaadb833cfc7961e490e7"},
"tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"},
"ua_inspector": {:hex, :ua_inspector, "3.9.0", "2021bbddb1ee41f202da7f006fb09f5c5617ad579108b7b9bcf1881828462f04", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:yamerl, "~> 0.7", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0f2d1b9806e78b14b91b6f20eaaa4a68f8989f4f2a99a376b874c295ac50a30d"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},

View File

@@ -1,5 +1,5 @@
defmodule Plausible.ExportsTest do
use Plausible.DataCase, async: true
use Plausible.DataCase
doctest Plausible.Exports, import: true
@@ -72,17 +72,16 @@ defmodule Plausible.ExportsTest do
test "creates zip archive", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
"1.csv" => from(n in "numbers", select: n.number, limit: 3),
"1.csv" => from(n in fragment("numbers(3)"), select: n.number),
"2.csv" =>
from(n in "numbers",
select: [n.number, selected_as(n.number + n.number, :double)],
limit: 3
from(n in fragment("numbers(3)"),
select: [n.number, selected_as(n.number + n.number, :double)]
)
}
DBConnection.run(ch, fn conn ->
conn
|> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
|> Plausible.Exports.stream_archive(queries, format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "numbers.zip")))
|> Stream.run()
end)
@@ -123,14 +122,14 @@ defmodule Plausible.ExportsTest do
test "stops on error", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
"1.csv" => from(n in "numbers", select: n.number, limit: 1000),
"1.csv" => from(n in fragment("numbers(1000)"), select: n.number),
"2.csv" => from(n in "no_such_table", select: n.number)
}
assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
DBConnection.run(ch, fn conn ->
conn
|> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
|> Plausible.Exports.stream_archive(queries, format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "failed.zip")))
|> Stream.run()
end)

View File

@@ -1,44 +1,13 @@
defmodule Plausible.Imported.CSVImporterTest do
use Plausible.DataCase, async: true
use Plausible
use Plausible.DataCase
alias Plausible.Imported.{CSVImporter, SiteImport}
alias Testcontainers.MinioContainer
require SiteImport
doctest CSVImporter, import: true
@moduletag :minio
setup_all do
Testcontainers.start_link()
{:ok, minio} = Testcontainers.start_container(MinioContainer.new())
on_exit(fn -> :ok = Testcontainers.stop_container(minio.container_id) end)
connection_opts = MinioContainer.connection_opts(minio)
s3 = fn op -> ExAws.request!(op, connection_opts) end
s3.(ExAws.S3.put_bucket("imports", "us-east-1"))
s3.(ExAws.S3.put_bucket("exports", "us-east-1"))
{:ok, container: minio, s3: s3}
end
setup %{container: minio, s3: s3} do
connection_opts = MinioContainer.connection_opts(minio)
clean_bucket = fn bucket ->
ExAws.S3.list_objects_v2(bucket)
|> ExAws.stream!(connection_opts)
|> Stream.each(fn objects ->
keys = objects |> List.wrap() |> Enum.map(& &1.key)
s3.(ExAws.S3.delete_all_objects(bucket, keys))
end)
|> Stream.run()
end
clean_bucket.("imports")
clean_bucket.("exports")
:ok
on_full_build do
@moduletag :minio
end
describe "new_import/3 and parse_args/1" do
@@ -64,10 +33,18 @@ defmodule Plausible.Imported.CSVImporterTest do
Enum.map(tables, fn table ->
filename = "#{table}_#{start_date}_#{end_date}.csv"
%{
"filename" => filename,
"s3_url" => "https://bucket-name.s3.eu-north-1.amazonaws.com/#{site.id}/#{filename}"
}
on_full_build do
%{
"filename" => filename,
"s3_url" =>
"https://bucket-name.s3.eu-north-1.amazonaws.com/#{site.id}/#{filename}-some-random-suffix"
}
else
%{
"filename" => filename,
"local_path" => "/tmp/some-random-path"
}
end
end)
date_range = CSVImporter.date_range(uploads)
@@ -76,7 +53,8 @@
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
uploads: uploads
uploads: uploads,
storage: on_full_build(do: "s3", else: "local")
)
assert %Oban.Job{args: %{"import_id" => import_id, "uploads" => ^uploads} = args} =
@@ -93,14 +71,22 @@
] = Plausible.Imported.list_all_imports(site)
assert %{imported_data: nil} = Repo.reload!(site)
assert CSVImporter.parse_args(args) == [uploads: uploads]
assert CSVImporter.parse_args(args) == [
uploads: uploads,
storage: on_full_build(do: "s3", else: "local")
]
end
end
describe "import_data/2" do
setup [:create_user, :create_new_site]
setup [:create_user, :create_new_site, :clean_buckets]
@describetag :tmp_dir
test "imports tables from S3", %{site: site, user: user} = ctx do
_ = ctx
test "imports tables from S3", %{site: site, user: user, s3: s3, container: minio} do
csvs = [
%{
name: "imported_browsers_20211230_20211231.csv",
@@ -328,23 +314,29 @@ defmodule Plausible.Imported.CSVImporterTest do
uploads =
for %{name: name, body: body} <- csvs do
key = "#{site.id}/#{name}"
s3.(ExAws.S3.put_object("imports", key, body))
%{"filename" => name, "s3_url" => minio_url(minio, "imports", key)}
on_full_build do
%{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, name)
[bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2)
ExAws.request!(ExAws.S3.put_object(bucket, key, body))
%{"filename" => name, "s3_url" => s3_url}
else
local_path = Path.join(ctx.tmp_dir, name)
File.write!(local_path, body)
%{"filename" => name, "local_path" => local_path}
end
end
date_range = CSVImporter.date_range(uploads)
{:ok, job} =
{:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
uploads: uploads
uploads: uploads,
storage: on_full_build(do: "s3", else: "local")
)
job = Repo.reload!(job)
assert :ok = Plausible.Workers.ImportAnalytics.perform(job)
assert %{success: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety?: false)
assert %SiteImport{
start_date: ~D[2011-12-25],
@@ -356,7 +348,9 @@ defmodule Plausible.Imported.CSVImporterTest do
assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 99
end
test "fails on invalid CSV", %{site: site, user: user, s3: s3, container: minio} do
test "fails on invalid CSV", %{site: site, user: user} = ctx do
_ = ctx
csvs = [
%{
name: "imported_browsers_20211230_20211231.csv",
@@ -382,24 +376,33 @@
uploads =
for %{name: name, body: body} <- csvs do
key = "#{site.id}/#{name}"
s3.(ExAws.S3.put_object("imports", key, body))
%{"filename" => name, "s3_url" => minio_url(minio, "imports", key)}
on_full_build do
%{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, name)
[bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2)
ExAws.request!(ExAws.S3.put_object(bucket, key, body))
%{"filename" => name, "s3_url" => s3_url}
else
local_path = Path.join(ctx.tmp_dir, name)
File.write!(local_path, body)
%{"filename" => name, "local_path" => local_path}
end
end
date_range = CSVImporter.date_range(uploads)
{:ok, job} =
{:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
uploads: uploads
uploads: uploads,
storage: on_full_build(do: "s3", else: "local")
)
job = Repo.reload!(job)
assert %{discard: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety?: false)
assert {:discard, message} = Plausible.Workers.ImportAnalytics.perform(job)
assert message =~ "CANNOT_PARSE_INPUT_ASSERTION_FAILED"
# TODO
# assert {:discard, message} = Plausible.Workers.ImportAnalytics.perform(job)
# assert message =~ "CANNOT_PARSE_INPUT_ASSERTION_FAILED"
assert %SiteImport{id: import_id, source: :csv, status: :failed} =
Repo.get_by!(SiteImport, site_id: site.id)
@@ -411,11 +414,10 @@ defmodule Plausible.Imported.CSVImporterTest do
end
describe "export -> import" do
setup [:create_user, :create_new_site]
setup [:create_user, :create_new_site, :clean_buckets]
@describetag :tmp_dir
test "it works", %{site: site, user: user, s3: s3, tmp_dir: tmp_dir, container: minio} do
@tag :tmp_dir
test "it works", %{site: site, user: user, tmp_dir: tmp_dir} do
populate_stats(site, [
build(:pageview,
user_id: 123,
@@ -479,50 +481,57 @@
])
# export archive to s3
Oban.insert!(
Plausible.Workers.ExportCSV.new(%{
"site_id" => site.id,
"email_to" => user.email,
"s3_bucket" => "exports",
"s3_path" => "#{site.id}/Plausible.zip",
"s3_config_overrides" => Map.new(MinioContainer.connection_opts(minio))
})
)
on_full_build do
assert {:ok, _job} = Plausible.Exports.schedule_s3_export(site.id, user.email)
else
assert {:ok, %{args: %{"local_path" => local_path}}} =
Plausible.Exports.schedule_local_export(site.id, user.email)
end
assert %{success: 1} = Oban.drain_queue(queue: :s3_csv_export, with_safety: false)
assert %{success: 1} = Oban.drain_queue(queue: :analytics_exports, with_safety: false)
# download archive
s3.(
ExAws.S3.download_file(
"exports",
"/#{site.id}/Plausible.zip",
Path.join(tmp_dir, "Plausible.zip")
on_full_build do
ExAws.request!(
ExAws.S3.download_file(
Plausible.S3.exports_bucket(),
to_string(site.id),
Path.join(tmp_dir, "plausible-export.zip")
)
)
)
else
File.rename!(local_path, Path.join(tmp_dir, "plausible-export.zip"))
end
# unzip archive
{:ok, files} = :zip.unzip(to_charlist(Path.join(tmp_dir, "Plausible.zip")), cwd: tmp_dir)
{:ok, files} =
:zip.unzip(to_charlist(Path.join(tmp_dir, "plausible-export.zip")), cwd: tmp_dir)
# upload csvs
uploads =
Enum.map(files, fn file ->
key = "#{site.id}/#{Path.basename(file)}"
s3.(ExAws.S3.put_object("imports", key, File.read!(file)))
%{"filename" => Path.basename(file), "s3_url" => minio_url(minio, "imports", key)}
on_full_build do
%{s3_url: s3_url} = Plausible.S3.import_presign_upload(site.id, file)
[bucket, key] = String.split(URI.parse(s3_url).path, "/", parts: 2)
ExAws.request!(ExAws.S3.put_object(bucket, key, File.read!(file)))
%{"filename" => Path.basename(file), "s3_url" => s3_url}
else
%{"filename" => Path.basename(file), "local_path" => file}
end
end)
# run importer
date_range = CSVImporter.date_range(uploads)
{:ok, job} =
{:ok, _job} =
CSVImporter.new_import(site, user,
start_date: date_range.first,
end_date: date_range.last,
uploads: uploads
uploads: uploads,
storage: on_full_build(do: "s3", else: "local")
)
job = Repo.reload!(job)
assert :ok = Plausible.Workers.ImportAnalytics.perform(job)
assert %{success: 1} = Oban.drain_queue(queue: :analytics_imports, with_safety: false)
# validate import
assert %SiteImport{
@@ -536,14 +545,27 @@ defmodule Plausible.Imported.CSVImporterTest do
end
end
defp minio_url(minio, bucket, key) do
arch = to_string(:erlang.system_info(:system_architecture))
defp clean_buckets(_context) do
on_full_build do
clean_bucket = fn bucket ->
ExAws.S3.list_objects_v2(bucket)
|> ExAws.stream!()
|> Stream.each(fn objects ->
keys = objects |> List.wrap() |> Enum.map(& &1.key)
ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys))
end)
|> Stream.run()
end
if String.contains?(arch, "darwin") do
Path.join(["http://#{minio.ip_address}:9000", bucket, key])
clean_bucket.(Plausible.S3.imports_bucket())
clean_bucket.(Plausible.S3.exports_bucket())
on_exit(fn ->
clean_bucket.(Plausible.S3.imports_bucket())
clean_bucket.(Plausible.S3.exports_bucket())
end)
else
port = minio |> MinioContainer.connection_opts() |> Keyword.fetch!(:port)
Path.join(["http://172.17.0.1:#{port}", bucket, key])
:ok
end
end
end

View File

@@ -639,7 +639,7 @@ defmodule PlausibleWeb.SiteControllerTest do
end
describe "GET /:website/settings/imports-exports" do
setup [:create_user, :log_in, :create_site]
setup [:create_user, :log_in, :create_site, :maybe_fake_minio]
test "renders empty imports list", %{conn: conn, site: site} do
conn = get(conn, "/#{site.domain}/settings/imports-exports")

View File

@@ -232,4 +232,52 @@ defmodule Plausible.TestUtils do
def random_ip() do
Enum.map_join(1..4, ".", fn _ -> Enum.random(1..254) end)
end
def minio_running? do
%{host: host, port: port} = ExAws.Config.new(:s3)
healthcheck_req = Finch.build(:head, "http://#{host}:#{port}")
case Finch.request(healthcheck_req, Plausible.Finch) do
{:ok, %Finch.Response{}} -> true
{:error, %Mint.TransportError{reason: :econnrefused}} -> false
end
end
def ensure_minio do
unless minio_running?() do
%{host: host, port: port} = ExAws.Config.new(:s3)
IO.puts("""
#{IO.ANSI.red()}
You are trying to run MinIO tests (--include minio) \
but nothing is running on #{"http://#{host}:#{port}"}.
#{IO.ANSI.blue()}Please make sure to start MinIO with `make minio`#{IO.ANSI.reset()}
""")
:init.stop(1)
end
end
if Mix.env() == :test do
def maybe_fake_minio(_context) do
unless minio_running?() do
%{port: port} = ExAws.Config.new(:s3)
bypass = Bypass.open(port: port)
Bypass.expect(bypass, fn conn ->
# we only need to fake HeadObject; all the other S3 requests are "controlled"
"HEAD" = conn.method
# we pretend the object is not found
Plug.Conn.send_resp(conn, 404, [])
end)
end
:ok
end
else
def maybe_fake_minio(_context) do
:ok
end
end
end

View File

@@ -13,6 +13,11 @@ end
Ecto.Adapters.SQL.Sandbox.mode(Plausible.Repo, :manual)
# warn about minio if it's included in tests but not running
if :minio in Keyword.fetch!(ExUnit.configuration(), :include) do
Plausible.TestUtils.ensure_minio()
end
if Mix.env() == :small_test do
IO.puts("Test mode: SMALL")
ExUnit.configure(exclude: [:slow, :minio, :full_build_only])

View File

@@ -145,6 +145,7 @@ defmodule Plausible.Workers.ImportAnalyticsTest do
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
Repo.delete_all(Plausible.Site)
Repo.delete_all(Plausible.Auth.User)
Repo.delete_all(Oban.Job)
end)
:ok
@@ -164,6 +165,7 @@
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
site_id = site.id
import_opts = Keyword.put(import_opts, :listen?, true)
{:ok, job} = Plausible.Imported.NoopImporter.new_import(site, user, import_opts)
@@ -173,7 +175,8 @@
|> Repo.reload!()
|> ImportAnalytics.perform()
assert_receive {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}}
assert_receive {:notification, :analytics_imports_jobs,
%{"event" => "complete", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
@@ -183,6 +186,7 @@
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
site_id = site.id
import_opts =
import_opts
@@ -196,7 +200,8 @@
|> Repo.reload!()
|> ImportAnalytics.perform()
assert_receive {:notification, :analytics_imports_jobs, %{"fail" => ^import_id}}
assert_receive {:notification, :analytics_imports_jobs,
%{"event" => "fail", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
@@ -206,6 +211,7 @@
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
site_id = site.id
import_opts =
import_opts
@@ -226,7 +232,12 @@
_ -> ImportAnalytics.import_fail_transient(site_import)
end
assert_receive {:notification, :analytics_imports_jobs, %{"transient_fail" => ^import_id}}
assert_receive {:notification, :analytics_imports_jobs,
%{
"event" => "transient_fail",
"import_id" => ^import_id,
"site_id" => ^site_id
}}
end)
end
@@ -237,6 +248,7 @@
Ecto.Adapters.SQL.Sandbox.unboxed_run(Plausible.Repo, fn ->
user = insert(:user, trial_expiry_date: Timex.today() |> Timex.shift(days: 1))
site = insert(:site, members: [user])
site_id = site.id
{:ok, job} = Plausible.Imported.NoopImporter.new_import(site, user, import_opts)
import_id = job.args[:import_id]
@@ -247,7 +259,8 @@
|> Repo.reload!()
|> ImportAnalytics.perform()
assert_receive {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}}
assert_receive {:notification, :analytics_imports_jobs,
%{"event" => "complete", "import_id" => ^import_id, "site_id" => ^site_id}}
end)
end
end