CSV exports (no UI) (#3836)

* csv exports

* use ex_unit's tmp_dir
ruslandoga 2024-03-13 00:27:27 +08:00 committed by GitHub
parent 6eef32a8ff
commit 5a3072ca21
8 changed files with 782 additions and 19 deletions

View File

@@ -554,7 +554,8 @@ cloud_queues = [
trial_notification_emails: 1,
check_usage: 1,
notify_annual_renewal: 1,
lock_sites: 1
lock_sites: 1,
s3_csv_export: 1
]
queues = if(is_selfhost, do: base_queues, else: base_queues ++ cloud_queues)

349
lib/plausible/exports.ex Normal file
View File

@@ -0,0 +1,349 @@
defmodule Plausible.Exports do
@moduledoc """
Contains functions to export data for events and sessions as Zip archives.
"""
require Plausible
import Ecto.Query
@doc """
Builds Ecto queries to export data from `events_v2` and `sessions_v2`
tables into the format of `imported_*` tables for a website.
"""
@spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) ::
%{String.t() => Ecto.Query.t()}
def export_queries(site_id, opts \\ []) do
extname = opts[:extname] || ".csv"
date_range = opts[:date_range]
filename = fn table ->
name =
if date_range do
first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
"#{table}_#{first_date}_#{last_date}"
else
table
end
name <> extname
end
%{
filename.("imported_visitors") => export_visitors_q(site_id),
filename.("imported_sources") => export_sources_q(site_id),
# NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error
filename.("imported_pages") => export_pages_q(site_id),
filename.("imported_entry_pages") => export_entry_pages_q(site_id),
filename.("imported_exit_pages") => export_exit_pages_q(site_id),
filename.("imported_locations") => export_locations_q(site_id),
filename.("imported_devices") => export_devices_q(site_id),
filename.("imported_browsers") => export_browsers_q(site_id),
filename.("imported_operating_systems") => export_operating_systems_q(site_id)
}
end
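# Illustrative invocation (editor's sketch, not part of this commit; it mirrors the tests further down):
#
#     Plausible.Exports.export_queries(site_id,
#       date_range: Date.range(~D[2023-01-01], ~D[2024-03-12])
#     )
#     #=> %{"imported_visitors_20230101_20240312.csv" => %Ecto.Query{}, ...}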
Plausible.on_full_build do
defp sampled(table) do
Plausible.Stats.Sampling.add_query_hint(from(table))
end
else
defp sampled(table) do
table
end
end
defmacrop date(timestamp) do
quote do
selected_as(fragment("toDate(?)", unquote(timestamp)), :date)
end
end
defmacrop visit_duration(t) do
quote do
selected_as(
fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).duration),
:visit_duration
)
end
end
defmacrop visitors(t) do
quote do
selected_as(
fragment("toUInt64(round(uniq(?)*any(_sample_factor)))", unquote(t).user_id),
:visitors
)
end
end
defmacrop visits(t) do
quote do
selected_as(sum(unquote(t).sign), :visits)
end
end
defmacrop bounces(t) do
quote do
selected_as(
fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).is_bounce),
:bounces
)
end
end
@spec export_visitors_q(pos_integer) :: Ecto.Query.t()
def export_visitors_q(site_id) do
visitors_sessions_q =
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: selected_as(:date),
select: %{
date: date(s.start),
bounces: bounces(s),
visits: visits(s),
visit_duration: visit_duration(s)
# NOTE: can we use just sessions_v2 table in this query? sum(pageviews) and visitors(s)?
# visitors: visitors(s)
}
visitors_events_q =
from e in sampled("events_v2"),
where: e.site_id == ^site_id,
group_by: selected_as(:date),
select: %{
date: date(e.timestamp),
visitors: visitors(e),
pageviews:
selected_as(
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
:pageviews
)
}
visitors_q =
"e"
|> with_cte("e", as: ^visitors_events_q)
|> with_cte("s", as: ^visitors_sessions_q)
from e in visitors_q,
full_join: s in "s",
on: e.date == s.date,
order_by: selected_as(:date),
select: [
selected_as(fragment("greatest(?,?)", s.date, e.date), :date),
e.visitors,
e.pageviews,
s.bounces,
s.visits,
s.visit_duration
]
end
@spec export_sources_q(pos_integer) :: Ecto.Query.t()
def export_sources_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [
selected_as(:date),
selected_as(:source),
s.utm_medium,
s.utm_campaign,
s.utm_content,
s.utm_term
],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.referrer_source, :source),
s.utm_medium,
s.utm_campaign,
s.utm_content,
s.utm_term,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end
@spec export_pages_q(pos_integer) :: Ecto.Query.t()
def export_pages_q(site_id) do
window_q =
from e in sampled("events_v2"),
where: e.site_id == ^site_id,
select: %{
timestamp: e.timestamp,
next_timestamp:
over(fragment("leadInFrame(?)", e.timestamp),
partition_by: e.session_id,
order_by: e.timestamp,
frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING")
),
pathname: e.pathname,
hostname: e.hostname,
name: e.name,
user_id: e.user_id,
_sample_factor: fragment("_sample_factor")
}
from e in subquery(window_q),
group_by: [selected_as(:date), e.pathname],
order_by: selected_as(:date),
select: [
date(e.timestamp),
selected_as(fragment("any(?)", e.hostname), :hostname),
selected_as(e.pathname, :page),
visitors(e),
selected_as(
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
:pageviews
),
# NOTE: are exits pageviews or any events?
selected_as(
fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp),
:exits
),
selected_as(
fragment("sum(greatest(?,0))", e.next_timestamp - e.timestamp),
:time_on_page
)
]
end
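# Editor's worked example (not part of this commit): for one session with pageviews at
# 12:00:00, 12:00:40 and 12:02:10, leadInFrame yields next_timestamp values of 12:00:40,
# 12:02:10 and 0 (no row follows within the frame), so time_on_page sums 40s + 90s
# (the last row is clamped to 0 by greatest) and the row with next_timestamp = 0 is counted as the exit.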
@spec export_entry_pages_q(pos_integer) :: Ecto.Query.t()
def export_entry_pages_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.entry_page],
order_by: selected_as(:date),
select: [
date(s.start),
s.entry_page,
visitors(s),
selected_as(
fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign),
:entrances
),
visit_duration(s),
bounces(s)
]
end
@spec export_exit_pages_q(pos_integer) :: Ecto.Query.t()
def export_exit_pages_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.exit_page],
order_by: selected_as(:date),
select: [
date(s.start),
s.exit_page,
visitors(s),
selected_as(
fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign),
:exits
)
]
end
@spec export_locations_q(pos_integer) :: Ecto.Query.t()
def export_locations_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ",
group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.country_code, :country),
selected_as(s.subdivision1_code, :region),
selected_as(s.city_geoname_id, :city),
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end
@spec export_devices_q(pos_integer) :: Ecto.Query.t()
def export_devices_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.screen_size],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.screen_size, :device),
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end
@spec export_browsers_q(pos_integer) :: Ecto.Query.t()
def export_browsers_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.browser],
order_by: selected_as(:date),
select: [
date(s.start),
s.browser,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end
@spec export_operating_systems_q(pos_integer) :: Ecto.Query.t()
def export_operating_systems_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.operating_system],
order_by: selected_as(:date),
select: [
date(s.start),
s.operating_system,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end
@doc """
Creates a streamable Zip archive from the provided (named) Ecto queries.
Example usage:
{:ok, pool} = Ch.start_link(pool_size: 1)
DBConnection.run(pool, fn conn ->
conn
|> stream_archive(export_queries(_site_id = 1), format: "CSVWithNames")
|> Stream.into(File.stream!("export.zip"))
|> Stream.run()
end)
"""
@spec stream_archive(DBConnection.t(), %{String.t() => Ecto.Query.t()}, [Ch.query_option()]) ::
Enumerable.t()
def stream_archive(conn, named_queries, opts \\ []) do
entries =
Enum.map(named_queries, fn {name, query} ->
{sql, params} = Plausible.ClickhouseRepo.to_sql(:all, query)
datastream =
conn
|> Ch.stream(sql, params, opts)
|> Stream.map(fn %Ch.Result{data: data} -> data end)
Zstream.entry(name, datastream, coder: Zstream.Coder.Stored)
end)
Zstream.zip(entries)
end
end

View File

@@ -3,6 +3,60 @@ defmodule Plausible.S3 do
Helper functions for S3 exports/imports.
"""
@doc """
Chunks and uploads the Zip archive to the provided S3 destination.
Returns a presigned URL to download the exported Zip archive from S3.
The URL expires in 24 hours.
In the current implementation the bucket always goes into the path component.
"""
@spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), keyword) ::
:uri_string.uri_string()
def export_upload_multipart(stream, s3_bucket, s3_path, config_overrides \\ []) do
config = ExAws.Config.new(:s3)
# 5 MiB is the minimum part size S3 allows for multipart uploads (all parts except the last)
chunk_into_parts(stream, 5 * 1024 * 1024)
|> ExAws.S3.upload(s3_bucket, s3_path,
content_disposition: ~s|attachment; filename="Plausible.zip"|,
content_type: "application/zip"
)
|> ExAws.request!(config_overrides)
{:ok, download_url} =
ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: _24hr = 86_400)
download_url
end
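# Illustrative call (editor's sketch, not part of this commit; hypothetical bucket/path,
# mirroring how the ExportCSV worker below pipes the archive stream into this function):
#
#     download_url =
#       zip_stream
#       |> Plausible.S3.export_upload_multipart("exports", "#{site_id}/Plausible.zip")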
defp chunk_into_parts(stream, min_part_size) do
Stream.chunk_while(
stream,
_acc = %{buffer_size: 0, buffer: [], min_part_size: min_part_size},
_chunk_fun = &buffer_until_big_enough/2,
_after_fun = &flush_leftovers/1
)
end
defp buffer_until_big_enough(data, acc) do
%{buffer_size: prev_buffer_size, buffer: prev_buffer, min_part_size: min_part_size} = acc
new_buffer_size = prev_buffer_size + IO.iodata_length(data)
new_buffer = [prev_buffer | data]
if new_buffer_size > min_part_size do
# NOTE: PR to make ExAws.Operation.S3.put_content_length_header/3 accept iodata
{:cont, IO.iodata_to_binary(new_buffer), %{acc | buffer_size: 0, buffer: []}}
else
{:cont, %{acc | buffer_size: new_buffer_size, buffer: new_buffer}}
end
end
defp flush_leftovers(acc) do
# NOTE: PR to make ExAws.Operation.S3.put_content_length_header/3 accept iodata
{:cont, IO.iodata_to_binary(acc.buffer), %{acc | buffer_size: 0, buffer: []}}
end
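# Editor's worked example (assumed sizes, not part of this commit): with a min_part_size of
# 5 MiB, incoming iodata of 3 MiB, 3 MiB and 1 MiB becomes one 6 MiB part (emitted once the
# buffer exceeds 5 MiB) plus a final 1 MiB part from flush_leftovers/1; S3 accepts an
# undersized part only when it is the last one.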
@doc """
Returns `access_key_id` and `secret_access_key` to be used by ClickHouse during imports from S3.
"""

83
lib/workers/export_csv.ex Normal file
View File

@@ -0,0 +1,83 @@
defmodule Plausible.Workers.ExportCSV do
@moduledoc """
Worker for running CSV export jobs.
"""
use Oban.Worker,
queue: :s3_csv_export,
max_attempts: 3,
unique: [fields: [:args], keys: [:s3_bucket, :s3_path], period: 60]
@impl true
def perform(job) do
%Oban.Job{
args:
%{
"site_id" => site_id,
"email_to" => email,
"s3_bucket" => s3_bucket,
"s3_path" => s3_path
} = args
} = job
{:ok, ch} =
Plausible.ClickhouseRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()
# NOTE: should we use site.timezone?
# %Ch.Result{rows: [[min_date, max_date]]} =
# Ch.query!(
# ch,
# "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
# %{"site_id" => site_id}
# )
download_url =
DBConnection.run(
ch,
fn conn ->
conn
|> Plausible.Exports.stream_archive(
# date_range: Date.range(min_date, max_date)
Plausible.Exports.export_queries(site_id, extname: ".csv"),
format: "CSVWithNames"
)
|> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
end,
timeout: :infinity
)
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: "plausible@email.com",
to: email,
subject: "EXPORT SUCCESS",
text_body: """
download it from #{download_url}! hurry up! you have 24 hours!
""",
html_body: """
download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
"""
)
)
:ok
end
# right now custom config is used in tests only (to access the minio container)
# ideally it would be passed via the s3 url
# but ExAws.S3.upload is hard to make work with s3 urls
if Mix.env() in [:test, :small_test] do
defp s3_config_overrides(args) do
if config_overrides = args["s3_config_overrides"] do
Enum.map(config_overrides, fn {k, v} -> {String.to_existing_atom(k), v} end)
else
[]
end
end
else
defp s3_config_overrides(_args), do: []
end
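# Enqueuing an export (editor's sketch, not part of this commit; it mirrors the end-to-end test below):
#
#     Oban.insert!(
#       Plausible.Workers.ExportCSV.new(%{
#         "site_id" => site.id,
#         "email_to" => user.email,
#         "s3_bucket" => "exports",
#         "s3_path" => "#{site.id}/Plausible.zip"
#       })
#     )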
end

View File

@@ -140,7 +140,8 @@ defmodule Plausible.MixProject do
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:sweet_xml, "~> 0.7.4"},
{:testcontainers, "~> 1.6", only: [:test, :small_test]}
{:testcontainers, "~> 1.6", only: [:test, :small_test]},
{:zstream, "~> 0.6.4"}
]
end

View File

@@ -11,7 +11,7 @@
"cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"},
"castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
"certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"},
"ch": {:hex, :ch, "0.2.4", "d510fbb5542d009f7c5b00bb1ecab73307b6066d9fb9b220600257d462cba67f", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "8f065d15aaf912ae8da56c9ca5298fb2d1a09108d006de589bcf8c2b39a7e2bb"},
"ch": {:hex, :ch, "0.2.5", "b8d70689951bd14c8c8791dc72cdc957ba489ceae723e79cf1a91d95b6b855ae", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "97de104c8f513a23c6d673da37741f68ae743f6cdb654b96a728d382e2fba4de"},
"chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"},
"cldr_utils": {:hex, :cldr_utils, "2.24.2", "364fa30be55d328e704629568d431eb74cd2f085752b27f8025520b566352859", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.5", [hex: :certifi, repo: "hexpm", optional: true]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "3362b838836a9f0fa309de09a7127e36e67310e797d556db92f71b548832c7cf"},
"cloak": {:hex, :cloak, "1.1.2", "7e0006c2b0b98d976d4f559080fabefd81f0e0a50a3c4b621f85ceeb563e80bb", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "940d5ac4fcd51b252930fd112e319ea5ae6ab540b722f3ca60a85666759b9585"},
@@ -156,5 +156,6 @@
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"zstream": {:hex, :zstream, "0.6.4", "169ce887a443d4163085ee682ab1b0ad38db8fa45e843927b9b431a92f4b7d9e", [:mix], [], "hexpm", "acc6c35b6db9eb2cfe8b85e972cb9dc1b730f8efeb76c5bbe871216fe639d9a1"},
"zxcvbn": {:git, "https://github.com/techgaun/zxcvbn-elixir.git", "aede1d49d39e89d7b3d1c381de5f04c9907d8171", []},
}

View File

@@ -0,0 +1,141 @@
defmodule Plausible.ExportsTest do
use Plausible.DataCase, async: true
# for e2e export->import tests please see Plausible.Imported.CSVImporterTest
describe "export_queries/2" do
test "returns named ecto queries" do
queries = Plausible.Exports.export_queries(_site_id = 1)
assert queries |> Map.values() |> Enum.all?(&match?(%Ecto.Query{}, &1))
assert Map.keys(queries) == [
"imported_browsers.csv",
"imported_devices.csv",
"imported_entry_pages.csv",
"imported_exit_pages.csv",
"imported_locations.csv",
"imported_operating_systems.csv",
"imported_pages.csv",
"imported_sources.csv",
"imported_visitors.csv"
]
end
test "with date range" do
queries =
Plausible.Exports.export_queries(_site_id = 1,
date_range: Date.range(~D[2023-01-01], ~D[2024-03-12])
)
assert Map.keys(queries) == [
"imported_browsers_20230101_20240312.csv",
"imported_devices_20230101_20240312.csv",
"imported_entry_pages_20230101_20240312.csv",
"imported_exit_pages_20230101_20240312.csv",
"imported_locations_20230101_20240312.csv",
"imported_operating_systems_20230101_20240312.csv",
"imported_pages_20230101_20240312.csv",
"imported_sources_20230101_20240312.csv",
"imported_visitors_20230101_20240312.csv"
]
end
test "with custom extension" do
queries =
Plausible.Exports.export_queries(_site_id = 1,
extname: ".ch"
)
assert Map.keys(queries) == [
"imported_browsers.ch",
"imported_devices.ch",
"imported_entry_pages.ch",
"imported_exit_pages.ch",
"imported_locations.ch",
"imported_operating_systems.ch",
"imported_pages.ch",
"imported_sources.ch",
"imported_visitors.ch"
]
end
end
describe "stream_archive/3" do
@describetag :tmp_dir
setup do
config = Keyword.replace!(Plausible.ClickhouseRepo.config(), :pool_size, 1)
{:ok, ch: start_supervised!({Ch, config})}
end
test "creates zip archive", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
"1.csv" => from(n in "numbers", select: n.number, limit: 3),
"2.csv" =>
from(n in "numbers",
select: [n.number, selected_as(n.number + n.number, :double)],
limit: 3
)
}
DBConnection.run(ch, fn conn ->
conn
|> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "numbers.zip")))
|> Stream.run()
end)
assert {:ok, files} =
:zip.unzip(to_charlist(Path.join(tmp_dir, "numbers.zip")), cwd: tmp_dir)
assert Enum.map(files, &Path.basename/1) == ["1.csv", "2.csv"]
read_csv = fn file ->
Enum.find(files, &(Path.basename(&1) == file))
|> File.read!()
|> NimbleCSV.RFC4180.parse_string(skip_headers: false)
end
assert read_csv.("1.csv") ==
NimbleCSV.RFC4180.parse_string(
"""
number
0
1
2
""",
skip_headers: false
)
assert read_csv.("2.csv") ==
NimbleCSV.RFC4180.parse_string(
"""
number,double
0,0
1,2
2,4
""",
skip_headers: false
)
end
test "stops on error", %{ch: ch, tmp_dir: tmp_dir} do
queries = %{
"1.csv" => from(n in "numbers", select: n.number, limit: 1000),
"2.csv" => from(n in "no_such_table", select: n.number)
}
assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
DBConnection.run(ch, fn conn ->
conn
|> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames")
|> Stream.into(File.stream!(Path.join(tmp_dir, "failed.zip")))
|> Stream.run()
end)
end
assert {:error, :einval} =
:zip.unzip(to_charlist(Path.join(tmp_dir, "failed.zip")), cwd: tmp_dir)
end
end
end

View File

@@ -13,11 +13,11 @@ defmodule Plausible.Imported.CSVImporterTest do
on_exit(fn -> :ok = Testcontainers.stop_container(minio.container_id) end)
connection_opts = MinioContainer.connection_opts(minio)
bucket = "imports"
ExAws.request!(ExAws.S3.put_bucket(bucket, "us-east-1"), connection_opts)
on_exit(fn -> ExAws.request!(ExAws.S3.delete_bucket(bucket), connection_opts) end)
s3 = fn op -> ExAws.request!(op, connection_opts) end
s3.(ExAws.S3.put_bucket("imports", "us-east-1"))
on_exit(fn -> s3.(ExAws.S3.delete_bucket("imports")) end)
{:ok, container: minio, bucket: bucket}
{:ok, container: minio, s3: s3}
end
describe "new_import/3 and parse_args/1" do
@@ -75,7 +75,7 @@ defmodule Plausible.Imported.CSVImporterTest do
describe "import_data/2" do
setup [:create_user, :create_new_site]
test "imports tables from S3", %{site: site, user: user, bucket: bucket, container: minio} do
test "imports tables from S3", %{site: site, user: user, s3: s3, container: minio} do
csvs = [
%{
name: "imported_browsers.csv",
@@ -301,18 +301,16 @@ defmodule Plausible.Imported.CSVImporterTest do
}
]
connection_opts = MinioContainer.connection_opts(minio)
on_exit(fn ->
keys = Enum.map(csvs, fn csv -> "#{site.id}/#{csv.name}" end)
ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys), connection_opts)
s3.(ExAws.S3.delete_all_objects("imports", keys))
end)
uploads =
for %{name: name, body: body} <- csvs do
key = "#{site.id}/#{name}"
ExAws.request!(ExAws.S3.put_object(bucket, key, body), connection_opts)
%{"filename" => name, "s3_url" => s3_url(minio, bucket, key)}
s3.(ExAws.S3.put_object("imports", key, body))
%{"filename" => name, "s3_url" => s3_url(minio, "imports", key)}
end
{:ok, job} =
@@ -340,7 +338,7 @@ defmodule Plausible.Imported.CSVImporterTest do
assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 99
end
test "fails on invalid CSV", %{site: site, user: user, bucket: bucket, container: minio} do
test "fails on invalid CSV", %{site: site, user: user, s3: s3, container: minio} do
csvs = [
%{
name: "imported_browsers.csv",
@@ -364,18 +362,16 @@ defmodule Plausible.Imported.CSVImporterTest do
}
]
connection_opts = MinioContainer.connection_opts(minio)
on_exit(fn ->
keys = Enum.map(csvs, fn csv -> "#{site.id}/#{csv.name}" end)
ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys), connection_opts)
s3.(ExAws.S3.delete_all_objects("imports", keys))
end)
uploads =
for %{name: name, body: body} <- csvs do
key = "#{site.id}/#{name}"
ExAws.request!(ExAws.S3.put_object(bucket, key, body), connection_opts)
%{"filename" => name, "s3_url" => s3_url(minio, bucket, key)}
s3.(ExAws.S3.put_object("imports", key, body))
%{"filename" => name, "s3_url" => s3_url(minio, "imports", key)}
end
{:ok, job} =
@@ -401,6 +397,143 @@ defmodule Plausible.Imported.CSVImporterTest do
end
end
describe "export -> import" do
setup [:create_user, :create_new_site]
@describetag :tmp_dir
test "it works", %{site: site, user: user, s3: s3, tmp_dir: tmp_dir, container: minio} do
populate_stats(site, [
build(:pageview,
user_id: 123,
pathname: "/",
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], minutes: -1) |> NaiveDateTime.truncate(:second),
country_code: "EE",
subdivision1_code: "EE-37",
city_geoname_id: 588_409,
referrer_source: "Google"
),
build(:pageview,
user_id: 123,
pathname: "/some-other-page",
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], minutes: -2) |> NaiveDateTime.truncate(:second),
country_code: "EE",
subdivision1_code: "EE-37",
city_geoname_id: 588_409,
referrer_source: "Google"
),
build(:pageview,
pathname: "/",
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], days: -1) |> NaiveDateTime.truncate(:second),
utm_medium: "search",
utm_campaign: "ads",
utm_source: "google",
utm_content: "content",
utm_term: "term",
browser: "Firefox",
browser_version: "120",
operating_system: "Mac",
operating_system_version: "14"
),
build(:pageview,
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], months: -1) |> NaiveDateTime.truncate(:second),
country_code: "EE",
browser: "Firefox",
browser_version: "120",
operating_system: "Mac",
operating_system_version: "14"
),
build(:pageview,
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], months: -5) |> NaiveDateTime.truncate(:second),
utm_campaign: "ads",
country_code: "EE",
referrer_source: "Google",
browser: "FirefoxNoVersion",
operating_system: "MacNoVersion"
),
build(:event,
timestamp:
Timex.shift(~N[2021-10-20 12:00:00], days: -1) |> NaiveDateTime.truncate(:second),
name: "Signup",
"meta.key": ["variant"],
"meta.value": ["A"]
)
])
# export archive to s3
s3.(ExAws.S3.put_bucket("exports", "us-east-1"))
on_exit(fn -> s3.(ExAws.S3.delete_bucket("exports")) end)
Oban.insert!(
Plausible.Workers.ExportCSV.new(%{
"site_id" => site.id,
"email_to" => user.email,
"s3_bucket" => "exports",
"s3_path" => "#{site.id}/Plausible.zip",
"s3_config_overrides" => Map.new(MinioContainer.connection_opts(minio))
})
)
on_exit(fn -> s3.(ExAws.S3.delete_object("exports", "#{site.id}/Plausible.zip")) end)
assert %{success: 1} = Oban.drain_queue(queue: :s3_csv_export, with_safety: false)
# download archive
s3.(
ExAws.S3.download_file(
"exports",
"/#{site.id}/Plausible.zip",
Path.join(tmp_dir, "Plausible.zip")
)
)
# unzip archive
{:ok, files} = :zip.unzip(to_charlist(Path.join(tmp_dir, "Plausible.zip")), cwd: tmp_dir)
# upload csvs
on_exit(fn ->
keys = Enum.map(files, fn file -> "#{site.id}/#{Path.basename(file)}" end)
s3.(ExAws.S3.delete_all_objects("imports", keys))
end)
uploads =
Enum.map(files, fn file ->
key = "#{site.id}/#{Path.basename(file)}"
s3.(ExAws.S3.put_object("imports", key, File.read!(file)))
%{"filename" => Path.basename(file), "s3_url" => s3_url(minio, "imports", key)}
end)
# run importer
{:ok, job} =
CSVImporter.new_import(
site,
user,
start_date: ~D[1970-01-01],
end_date: ~D[1970-01-01],
uploads: uploads
)
job = Repo.reload!(job)
assert :ok = Plausible.Workers.ImportAnalytics.perform(job)
# validate import
assert %SiteImport{
start_date: ~D[2021-05-20],
end_date: ~D[2021-10-20],
source: :csv,
status: :completed
} = Repo.get_by!(SiteImport, site_id: site.id)
assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 5
end
end
defp s3_url(minio, bucket, key) do
port = minio |> MinioContainer.connection_opts() |> Keyword.fetch!(:port)
Path.join(["http://172.17.0.1:#{port}", bucket, key])