diff --git a/config/runtime.exs b/config/runtime.exs index d4fa91863..2d056318d 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -554,7 +554,8 @@ cloud_queues = [ trial_notification_emails: 1, check_usage: 1, notify_annual_renewal: 1, - lock_sites: 1 + lock_sites: 1, + s3_csv_export: 1 ] queues = if(is_selfhost, do: base_queues, else: base_queues ++ cloud_queues) diff --git a/lib/plausible/exports.ex b/lib/plausible/exports.ex new file mode 100644 index 000000000..4f2094de3 --- /dev/null +++ b/lib/plausible/exports.ex @@ -0,0 +1,349 @@ +defmodule Plausible.Exports do + @moduledoc """ + Contains functions to export data for events and sessions as Zip archives. + """ + + require Plausible + import Ecto.Query + + @doc """ + Builds Ecto queries to export data from `events_v2` and `sessions_v2` + tables into the format of `imported_*` tables for a website. + """ + @spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) :: + %{String.t() => Ecto.Query.t()} + def export_queries(site_id, opts \\ []) do + extname = opts[:extname] || ".csv" + date_range = opts[:date_range] + + filename = fn table -> + name = + if date_range do + first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}") + last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}") + "#{table}_#{first_date}_#{last_date}" + else + table + end + + name <> extname + end + + %{ + filename.("imported_visitors") => export_visitors_q(site_id), + filename.("imported_sources") => export_sources_q(site_id), + # NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error + filename.("imported_pages") => export_pages_q(site_id), + filename.("imported_entry_pages") => export_entry_pages_q(site_id), + filename.("imported_exit_pages") => export_exit_pages_q(site_id), + filename.("imported_locations") => export_locations_q(site_id), + filename.("imported_devices") => export_devices_q(site_id), + filename.("imported_browsers") => export_browsers_q(site_id), + filename.("imported_operating_systems") => export_operating_systems_q(site_id) + } + end + + Plausible.on_full_build do + defp sampled(table) do + Plausible.Stats.Sampling.add_query_hint(from(table)) + end + else + defp sampled(table) do + table + end + end + + defmacrop date(timestamp) do + quote do + selected_as(fragment("toDate(?)", unquote(timestamp)), :date) + end + end + + defmacrop visit_duration(t) do + quote do + selected_as( + fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).duration), + :visit_duration + ) + end + end + + defmacrop visitors(t) do + quote do + selected_as( + fragment("toUInt64(round(uniq(?)*any(_sample_factor)))", unquote(t).user_id), + :visitors + ) + end + end + + defmacrop visits(t) do + quote do + selected_as(sum(unquote(t).sign), :visits) + end + end + + defmacrop bounces(t) do + quote do + selected_as( + fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).is_bounce), + :bounces + ) + end + end + + @spec export_visitors_q(pos_integer) :: Ecto.Query.t() + def export_visitors_q(site_id) do + visitors_sessions_q = + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: selected_as(:date), + select: %{ + date: date(s.start), + bounces: bounces(s), + visits: visits(s), + visit_duration: visit_duration(s) + # NOTE: can we use just sessions_v2 table in this query? sum(pageviews) and visitors(s)? 
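+          # (the final select below takes visitors/pageviews from the events
+          # CTE and only the visit-based metrics from this sessions query)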
+ # visitors: visitors(s) + } + + visitors_events_q = + from e in sampled("events_v2"), + where: e.site_id == ^site_id, + group_by: selected_as(:date), + select: %{ + date: date(e.timestamp), + visitors: visitors(e), + pageviews: + selected_as( + fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), + :pageviews + ) + } + + visitors_q = + "e" + |> with_cte("e", as: ^visitors_events_q) + |> with_cte("s", as: ^visitors_sessions_q) + + from e in visitors_q, + full_join: s in "s", + on: e.date == s.date, + order_by: selected_as(:date), + select: [ + selected_as(fragment("greatest(?,?)", s.date, e.date), :date), + e.visitors, + e.pageviews, + s.bounces, + s.visits, + s.visit_duration + ] + end + + @spec export_sources_q(pos_integer) :: Ecto.Query.t() + def export_sources_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [ + selected_as(:date), + selected_as(:source), + s.utm_medium, + s.utm_campaign, + s.utm_content, + s.utm_term + ], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.referrer_source, :source), + s.utm_medium, + s.utm_campaign, + s.utm_content, + s.utm_term, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_pages_q(pos_integer) :: Ecto.Query.t() + def export_pages_q(site_id) do + window_q = + from e in sampled("events_v2"), + where: e.site_id == ^site_id, + select: %{ + timestamp: e.timestamp, + next_timestamp: + over(fragment("leadInFrame(?)", e.timestamp), + partition_by: e.session_id, + order_by: e.timestamp, + frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING") + ), + pathname: e.pathname, + hostname: e.hostname, + name: e.name, + user_id: e.user_id, + _sample_factor: fragment("_sample_factor") + } + + from e in subquery(window_q), + group_by: [selected_as(:date), e.pathname], + order_by: selected_as(:date), + select: [ + date(e.timestamp), + selected_as(fragment("any(?)", e.hostname), :hostname), + selected_as(e.pathname, :page), + visitors(e), + selected_as( + fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), + :pageviews + ), + # NOTE: are exits pageviews or any events? 
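+        # (next_timestamp = 0 means leadInFrame found no following event in
+        # the session, i.e. this was the session's last event, so as written
+        # any event type can count as an exit, not only pageviews)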
+ selected_as( + fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp), + :exits + ), + selected_as( + fragment("sum(greatest(?,0))", e.next_timestamp - e.timestamp), + :time_on_page + ) + ] + end + + @spec export_entry_pages_q(pos_integer) :: Ecto.Query.t() + def export_entry_pages_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.entry_page], + order_by: selected_as(:date), + select: [ + date(s.start), + s.entry_page, + visitors(s), + selected_as( + fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign), + :entrances + ), + visit_duration(s), + bounces(s) + ] + end + + @spec export_exit_pages_q(pos_integer) :: Ecto.Query.t() + def export_exit_pages_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.exit_page], + order_by: selected_as(:date), + select: [ + date(s.start), + s.exit_page, + visitors(s), + selected_as( + fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign), + :exits + ) + ] + end + + @spec export_locations_q(pos_integer) :: Ecto.Query.t() + def export_locations_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ", + group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.country_code, :country), + selected_as(s.subdivision1_code, :region), + selected_as(s.city_geoname_id, :city), + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_devices_q(pos_integer) :: Ecto.Query.t() + def export_devices_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.screen_size], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.screen_size, :device), + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_browsers_q(pos_integer) :: Ecto.Query.t() + def export_browsers_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.browser], + order_by: selected_as(:date), + select: [ + date(s.start), + s.browser, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_operating_systems_q(pos_integer) :: Ecto.Query.t() + def export_operating_systems_q(site_id) do + from s in sampled("sessions_v2"), + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.operating_system], + order_by: selected_as(:date), + select: [ + date(s.start), + s.operating_system, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @doc """ + Creates a streamable Zip archive from the provided (named) Ecto queries. 
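+
+  The queries are executed lazily, one after another, as the returned
+  stream is consumed, and each result is written into the archive as a
+  separate uncompressed (`Zstream.Coder.Stored`) entry.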
+
+  Example usage:
+
+      {:ok, pool} = Ch.start_link(pool_size: 1)
+
+      DBConnection.run(pool, fn conn ->
+        conn
+        |> stream_archive(export_queries(_site_id = 1), format: "CSVWithNames")
+        |> Stream.into(File.stream!("export.zip"))
+        |> Stream.run()
+      end)
+
+  """
+  @spec stream_archive(DBConnection.t(), %{String.t() => Ecto.Query.t()}, [Ch.query_option()]) ::
+          Enumerable.t()
+  def stream_archive(conn, named_queries, opts \\ []) do
+    entries =
+      Enum.map(named_queries, fn {name, query} ->
+        {sql, params} = Plausible.ClickhouseRepo.to_sql(:all, query)
+
+        datastream =
+          conn
+          |> Ch.stream(sql, params, opts)
+          |> Stream.map(fn %Ch.Result{data: data} -> data end)
+
+        Zstream.entry(name, datastream, coder: Zstream.Coder.Stored)
+      end)
+
+    Zstream.zip(entries)
+  end
+end
diff --git a/lib/plausible/s3.ex b/lib/plausible/s3.ex
index d8f8ad65f..1d27a8f13 100644
--- a/lib/plausible/s3.ex
+++ b/lib/plausible/s3.ex
@@ -3,6 +3,60 @@ defmodule Plausible.S3 do
   Helper functions for S3 exports/imports.
   """
 
+  @doc """
+  Chunks and uploads a Zip archive to the provided S3 destination.
+
+  Returns a presigned URL to download the exported Zip archive from S3.
+  The URL expires in 24 hours.
+
+  In the current implementation the bucket always goes into the path component.
+  """
+  @spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), keyword) ::
+          :uri_string.uri_string()
+  def export_upload_multipart(stream, s3_bucket, s3_path, config_overrides \\ []) do
+    config = ExAws.Config.new(:s3)
+
+    # 5 MiB is the smallest chunk size AWS S3 supports
+    chunk_into_parts(stream, 5 * 1024 * 1024)
+    |> ExAws.S3.upload(s3_bucket, s3_path,
+      content_disposition: ~s|attachment; filename="Plausible.zip"|,
+      content_type: "application/zip"
+    )
+    |> ExAws.request!(config_overrides)
+
+    {:ok, download_url} =
+      ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: _24hr = 86_400)
+
+    download_url
+  end
+
+  defp chunk_into_parts(stream, min_part_size) do
+    Stream.chunk_while(
+      stream,
+      _acc = %{buffer_size: 0, buffer: [], min_part_size: min_part_size},
+      _chunk_fun = &buffer_until_big_enough/2,
+      _after_fun = &flush_leftovers/1
+    )
+  end
+
+  defp buffer_until_big_enough(data, acc) do
+    %{buffer_size: prev_buffer_size, buffer: prev_buffer, min_part_size: min_part_size} = acc
+    new_buffer_size = prev_buffer_size + IO.iodata_length(data)
+    new_buffer = [prev_buffer | data]
+
+    if new_buffer_size > min_part_size do
+      # NOTE: PR to make ExAws.Operation.S3.put_content_length_header/3 accept iodata
+      {:cont, IO.iodata_to_binary(new_buffer), %{acc | buffer_size: 0, buffer: []}}
+    else
+      {:cont, %{acc | buffer_size: new_buffer_size, buffer: new_buffer}}
+    end
+  end
+
+  defp flush_leftovers(acc) do
+    # NOTE: PR to make ExAws.Operation.S3.put_content_length_header/3 accept iodata
+    {:cont, IO.iodata_to_binary(acc.buffer), %{acc | buffer_size: 0, buffer: []}}
+  end
+
   @doc """
   Returns `access_key_id` and `secret_access_key` to be used by ClickHouse during imports from S3.
   """
diff --git a/lib/workers/export_csv.ex b/lib/workers/export_csv.ex
new file mode 100644
index 000000000..1eb3d42e1
--- /dev/null
+++ b/lib/workers/export_csv.ex
@@ -0,0 +1,83 @@
+defmodule Plausible.Workers.ExportCSV do
+  @moduledoc """
+  Worker for running CSV export jobs.
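+
+  Expects `site_id`, `email_to`, `s3_bucket`, and `s3_path` in the job
+  arguments, uploads the Zip archive to S3, and emails the requester a
+  download link that expires in 24 hours.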
+  """
+
+  use Oban.Worker,
+    queue: :s3_csv_export,
+    max_attempts: 3,
+    unique: [fields: [:args], keys: [:s3_bucket, :s3_path], period: 60]
+
+  @impl true
+  def perform(job) do
+    %Oban.Job{
+      args:
+        %{
+          "site_id" => site_id,
+          "email_to" => email,
+          "s3_bucket" => s3_bucket,
+          "s3_path" => s3_path
+        } = args
+    } = job
+
+    {:ok, ch} =
+      Plausible.ClickhouseRepo.config()
+      |> Keyword.replace!(:pool_size, 1)
+      |> Ch.start_link()
+
+    # NOTE: should we use site.timezone?
+    # %Ch.Result{rows: [[min_date, max_date]]} =
+    #   Ch.query!(
+    #     ch,
+    #     "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
+    #     %{"site_id" => site_id}
+    #   )
+
+    download_url =
+      DBConnection.run(
+        ch,
+        fn conn ->
+          conn
+          |> Plausible.Exports.stream_archive(
+            # date_range: Date.range(min_date, max_date)
+            Plausible.Exports.export_queries(site_id, extname: ".csv"),
+            format: "CSVWithNames"
+          )
+          |> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
+        end,
+        timeout: :infinity
+      )
+
+    # NOTE: replace with proper Plausible.Email template
+    Plausible.Mailer.deliver_now!(
+      Bamboo.Email.new_email(
+        from: "plausible@email.com",
+        to: email,
+        subject: "EXPORT SUCCESS",
+        text_body: """
+        download it from #{download_url}! hurry up! you have 24 hours!
+        """,
+        html_body: """
+        download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
+        """
+      )
+    )
+
+    :ok
+  end
+
+  # right now custom config is used in tests only (to access the minio container)
+  # ideally it would be passed via the s3 url
+  # but ExAws.S3.upload is hard to make work with s3 urls
+  if Mix.env() in [:test, :small_test] do
+    defp s3_config_overrides(args) do
+      if config_overrides = args["s3_config_overrides"] do
+        Enum.map(config_overrides, fn {k, v} -> {String.to_existing_atom(k), v} end)
+      else
+        []
+      end
+    end
+  else
+    defp s3_config_overrides(_args), do: []
+  end
+end
diff --git a/mix.exs b/mix.exs
index 95c76fdf5..2fada1d0f 100644
--- a/mix.exs
+++ b/mix.exs
@@ -140,7 +140,8 @@ defmodule Plausible.MixProject do
       {:ex_aws, "~> 2.5"},
       {:ex_aws_s3, "~> 2.5"},
       {:sweet_xml, "~> 0.7.4"},
-      {:testcontainers, "~> 1.6", only: [:test, :small_test]}
+      {:testcontainers, "~> 1.6", only: [:test, :small_test]},
+      {:zstream, "~> 0.6.4"}
     ]
   end
diff --git a/mix.lock b/mix.lock
index a985822b3..9fd693a59 100644
--- a/mix.lock
+++ b/mix.lock
@@ -11,7 +11,7 @@
   "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"},
   "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
   "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"},
-  "ch": {:hex, :ch, "0.2.4", "d510fbb5542d009f7c5b00bb1ecab73307b6066d9fb9b220600257d462cba67f", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "8f065d15aaf912ae8da56c9ca5298fb2d1a09108d006de589bcf8c2b39a7e2bb"},
+  "ch": {:hex, :ch, "0.2.5", "b8d70689951bd14c8c8791dc72cdc957ba489ceae723e79cf1a91d95b6b855ae", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "97de104c8f513a23c6d673da37741f68ae743f6cdb654b96a728d382e2fba4de"},
   "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"},
   "cldr_utils": {:hex, :cldr_utils, "2.24.2", "364fa30be55d328e704629568d431eb74cd2f085752b27f8025520b566352859", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.5", [hex: :certifi, repo: "hexpm", optional: true]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "3362b838836a9f0fa309de09a7127e36e67310e797d556db92f71b548832c7cf"},
   "cloak": {:hex, :cloak, "1.1.2", "7e0006c2b0b98d976d4f559080fabefd81f0e0a50a3c4b621f85ceeb563e80bb", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "940d5ac4fcd51b252930fd112e319ea5ae6ab540b722f3ca60a85666759b9585"},
@@ -156,5 +156,6 @@
   "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
   "websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
   "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
+  "zstream": {:hex, :zstream, "0.6.4", "169ce887a443d4163085ee682ab1b0ad38db8fa45e843927b9b431a92f4b7d9e", [:mix], [], "hexpm", "acc6c35b6db9eb2cfe8b85e972cb9dc1b730f8efeb76c5bbe871216fe639d9a1"},
   "zxcvbn": {:git, "https://github.com/techgaun/zxcvbn-elixir.git", "aede1d49d39e89d7b3d1c381de5f04c9907d8171", []},
 }
diff --git a/test/plausible/exports_test.exs b/test/plausible/exports_test.exs
new file mode 100644
index 000000000..7f0f1e01f
--- /dev/null
+++ b/test/plausible/exports_test.exs
@@ -0,0 +1,141 @@
+defmodule Plausible.ExportsTest do
+  use Plausible.DataCase, async: true
+
+  # for e2e export->import tests please see Plausible.Imported.CSVImporterTest
+
+  describe "export_queries/2" do
+    test "returns named ecto queries" do
+      queries = Plausible.Exports.export_queries(_site_id = 1)
+      assert queries |> Map.values() |> Enum.all?(&match?(%Ecto.Query{}, &1))
+
+      assert Map.keys(queries) ==
[ + "imported_browsers.csv", + "imported_devices.csv", + "imported_entry_pages.csv", + "imported_exit_pages.csv", + "imported_locations.csv", + "imported_operating_systems.csv", + "imported_pages.csv", + "imported_sources.csv", + "imported_visitors.csv" + ] + end + + test "with date range" do + queries = + Plausible.Exports.export_queries(_site_id = 1, + date_range: Date.range(~D[2023-01-01], ~D[2024-03-12]) + ) + + assert Map.keys(queries) == [ + "imported_browsers_20230101_20240312.csv", + "imported_devices_20230101_20240312.csv", + "imported_entry_pages_20230101_20240312.csv", + "imported_exit_pages_20230101_20240312.csv", + "imported_locations_20230101_20240312.csv", + "imported_operating_systems_20230101_20240312.csv", + "imported_pages_20230101_20240312.csv", + "imported_sources_20230101_20240312.csv", + "imported_visitors_20230101_20240312.csv" + ] + end + + test "with custom extension" do + queries = + Plausible.Exports.export_queries(_site_id = 1, + extname: ".ch" + ) + + assert Map.keys(queries) == [ + "imported_browsers.ch", + "imported_devices.ch", + "imported_entry_pages.ch", + "imported_exit_pages.ch", + "imported_locations.ch", + "imported_operating_systems.ch", + "imported_pages.ch", + "imported_sources.ch", + "imported_visitors.ch" + ] + end + end + + describe "stream_archive/3" do + @describetag :tmp_dir + + setup do + config = Keyword.replace!(Plausible.ClickhouseRepo.config(), :pool_size, 1) + {:ok, ch: start_supervised!({Ch, config})} + end + + test "creates zip archive", %{ch: ch, tmp_dir: tmp_dir} do + queries = %{ + "1.csv" => from(n in "numbers", select: n.number, limit: 3), + "2.csv" => + from(n in "numbers", + select: [n.number, selected_as(n.number + n.number, :double)], + limit: 3 + ) + } + + DBConnection.run(ch, fn conn -> + conn + |> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames") + |> Stream.into(File.stream!(Path.join(tmp_dir, "numbers.zip"))) + |> Stream.run() + end) + + assert {:ok, files} = + :zip.unzip(to_charlist(Path.join(tmp_dir, "numbers.zip")), cwd: tmp_dir) + + assert Enum.map(files, &Path.basename/1) == ["1.csv", "2.csv"] + + read_csv = fn file -> + Enum.find(files, &(Path.basename(&1) == file)) + |> File.read!() + |> NimbleCSV.RFC4180.parse_string(skip_headers: false) + end + + assert read_csv.("1.csv") == + NimbleCSV.RFC4180.parse_string( + """ + number + 0 + 1 + 2 + """, + skip_headers: false + ) + + assert read_csv.("2.csv") == + NimbleCSV.RFC4180.parse_string( + """ + number,double + 0,0 + 1,2 + 2,4 + """, + skip_headers: false + ) + end + + test "stops on error", %{ch: ch, tmp_dir: tmp_dir} do + queries = %{ + "1.csv" => from(n in "numbers", select: n.number, limit: 1000), + "2.csv" => from(n in "no_such_table", select: n.number) + } + + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(ch, fn conn -> + conn + |> Plausible.Exports.stream_archive(queries, database: "system", format: "CSVWithNames") + |> Stream.into(File.stream!(Path.join(tmp_dir, "failed.zip"))) + |> Stream.run() + end) + end + + assert {:error, :einval} = + :zip.unzip(to_charlist(Path.join(tmp_dir, "failed.zip")), cwd: tmp_dir) + end + end +end diff --git a/test/plausible/imported/csv_importer_test.exs b/test/plausible/imported/csv_importer_test.exs index 9cf4c0985..e05c914c9 100644 --- a/test/plausible/imported/csv_importer_test.exs +++ b/test/plausible/imported/csv_importer_test.exs @@ -13,11 +13,11 @@ defmodule Plausible.Imported.CSVImporterTest do on_exit(fn -> :ok = Testcontainers.stop_container(minio.container_id) 
end) connection_opts = MinioContainer.connection_opts(minio) - bucket = "imports" - ExAws.request!(ExAws.S3.put_bucket(bucket, "us-east-1"), connection_opts) - on_exit(fn -> ExAws.request!(ExAws.S3.delete_bucket(bucket), connection_opts) end) + s3 = fn op -> ExAws.request!(op, connection_opts) end + s3.(ExAws.S3.put_bucket("imports", "us-east-1")) + on_exit(fn -> s3.(ExAws.S3.delete_bucket("imports")) end) - {:ok, container: minio, bucket: bucket} + {:ok, container: minio, s3: s3} end describe "new_import/3 and parse_args/1" do @@ -75,7 +75,7 @@ defmodule Plausible.Imported.CSVImporterTest do describe "import_data/2" do setup [:create_user, :create_new_site] - test "imports tables from S3", %{site: site, user: user, bucket: bucket, container: minio} do + test "imports tables from S3", %{site: site, user: user, s3: s3, container: minio} do csvs = [ %{ name: "imported_browsers.csv", @@ -301,18 +301,16 @@ defmodule Plausible.Imported.CSVImporterTest do } ] - connection_opts = MinioContainer.connection_opts(minio) - on_exit(fn -> keys = Enum.map(csvs, fn csv -> "#{site.id}/#{csv.name}" end) - ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys), connection_opts) + s3.(ExAws.S3.delete_all_objects("imports", keys)) end) uploads = for %{name: name, body: body} <- csvs do key = "#{site.id}/#{name}" - ExAws.request!(ExAws.S3.put_object(bucket, key, body), connection_opts) - %{"filename" => name, "s3_url" => s3_url(minio, bucket, key)} + s3.(ExAws.S3.put_object("imports", key, body)) + %{"filename" => name, "s3_url" => s3_url(minio, "imports", key)} end {:ok, job} = @@ -340,7 +338,7 @@ defmodule Plausible.Imported.CSVImporterTest do assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 99 end - test "fails on invalid CSV", %{site: site, user: user, bucket: bucket, container: minio} do + test "fails on invalid CSV", %{site: site, user: user, s3: s3, container: minio} do csvs = [ %{ name: "imported_browsers.csv", @@ -364,18 +362,16 @@ defmodule Plausible.Imported.CSVImporterTest do } ] - connection_opts = MinioContainer.connection_opts(minio) - on_exit(fn -> keys = Enum.map(csvs, fn csv -> "#{site.id}/#{csv.name}" end) - ExAws.request!(ExAws.S3.delete_all_objects(bucket, keys), connection_opts) + s3.(ExAws.S3.delete_all_objects("imports", keys)) end) uploads = for %{name: name, body: body} <- csvs do key = "#{site.id}/#{name}" - ExAws.request!(ExAws.S3.put_object(bucket, key, body), connection_opts) - %{"filename" => name, "s3_url" => s3_url(minio, bucket, key)} + s3.(ExAws.S3.put_object("imports", key, body)) + %{"filename" => name, "s3_url" => s3_url(minio, "imports", key)} end {:ok, job} = @@ -401,6 +397,143 @@ defmodule Plausible.Imported.CSVImporterTest do end end + describe "export -> import" do + setup [:create_user, :create_new_site] + + @describetag :tmp_dir + + test "it works", %{site: site, user: user, s3: s3, tmp_dir: tmp_dir, container: minio} do + populate_stats(site, [ + build(:pageview, + user_id: 123, + pathname: "/", + timestamp: + Timex.shift(~N[2021-10-20 12:00:00], minutes: -1) |> NaiveDateTime.truncate(:second), + country_code: "EE", + subdivision1_code: "EE-37", + city_geoname_id: 588_409, + referrer_source: "Google" + ), + build(:pageview, + user_id: 123, + pathname: "/some-other-page", + timestamp: + Timex.shift(~N[2021-10-20 12:00:00], minutes: -2) |> NaiveDateTime.truncate(:second), + country_code: "EE", + subdivision1_code: "EE-37", + city_geoname_id: 588_409, + referrer_source: "Google" + ), + build(:pageview, + pathname: "/", + timestamp: + 
Timex.shift(~N[2021-10-20 12:00:00], days: -1) |> NaiveDateTime.truncate(:second), + utm_medium: "search", + utm_campaign: "ads", + utm_source: "google", + utm_content: "content", + utm_term: "term", + browser: "Firefox", + browser_version: "120", + operating_system: "Mac", + operating_system_version: "14" + ), + build(:pageview, + timestamp: + Timex.shift(~N[2021-10-20 12:00:00], months: -1) |> NaiveDateTime.truncate(:second), + country_code: "EE", + browser: "Firefox", + browser_version: "120", + operating_system: "Mac", + operating_system_version: "14" + ), + build(:pageview, + timestamp: + Timex.shift(~N[2021-10-20 12:00:00], months: -5) |> NaiveDateTime.truncate(:second), + utm_campaign: "ads", + country_code: "EE", + referrer_source: "Google", + browser: "FirefoxNoVersion", + operating_system: "MacNoVersion" + ), + build(:event, + timestamp: + Timex.shift(~N[2021-10-20 12:00:00], days: -1) |> NaiveDateTime.truncate(:second), + name: "Signup", + "meta.key": ["variant"], + "meta.value": ["A"] + ) + ]) + + # export archive to s3 + s3.(ExAws.S3.put_bucket("exports", "us-east-1")) + on_exit(fn -> s3.(ExAws.S3.delete_bucket("exports")) end) + + Oban.insert!( + Plausible.Workers.ExportCSV.new(%{ + "site_id" => site.id, + "email_to" => user.email, + "s3_bucket" => "exports", + "s3_path" => "#{site.id}/Plausible.zip", + "s3_config_overrides" => Map.new(MinioContainer.connection_opts(minio)) + }) + ) + + on_exit(fn -> s3.(ExAws.S3.delete_object("exports", "#{site.id}/Plausible.zip")) end) + assert %{success: 1} = Oban.drain_queue(queue: :s3_csv_export, with_safety: false) + + # download archive + + s3.( + ExAws.S3.download_file( + "exports", + "/#{site.id}/Plausible.zip", + Path.join(tmp_dir, "Plausible.zip") + ) + ) + + # unzip archive + {:ok, files} = :zip.unzip(to_charlist(Path.join(tmp_dir, "Plausible.zip")), cwd: tmp_dir) + + # upload csvs + on_exit(fn -> + keys = Enum.map(files, fn file -> "#{site.id}/#{Path.basename(file)}" end) + s3.(ExAws.S3.delete_all_objects("imports", keys)) + end) + + uploads = + Enum.map(files, fn file -> + key = "#{site.id}/#{Path.basename(file)}" + s3.(ExAws.S3.put_object("imports", key, File.read!(file))) + %{"filename" => Path.basename(file), "s3_url" => s3_url(minio, "imports", key)} + end) + + # run importer + {:ok, job} = + CSVImporter.new_import( + site, + user, + start_date: ~D[1970-01-01], + end_date: ~D[1970-01-01], + uploads: uploads + ) + + job = Repo.reload!(job) + + assert :ok = Plausible.Workers.ImportAnalytics.perform(job) + + # validate import + assert %SiteImport{ + start_date: ~D[2021-05-20], + end_date: ~D[2021-10-20], + source: :csv, + status: :completed + } = Repo.get_by!(SiteImport, site_id: site.id) + + assert Plausible.Stats.Clickhouse.imported_pageview_count(site) == 5 + end + end + defp s3_url(minio, bucket, key) do port = minio |> MinioContainer.connection_opts() |> Keyword.fetch!(:port) Path.join(["http://172.17.0.1:#{port}", bucket, key])