diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 18162bc90..5655b33b1 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -19,17 +19,17 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - mix_env: ['test', 'small_test'] - postgres_image: ['postgres:16'] - test_experimental_reduced_joins: ['0'] + mix_env: ["test", "small_test"] + postgres_image: ["postgres:16"] + test_experimental_reduced_joins: ["0"] include: - - mix_env: 'test' - postgres_image: 'postgres:15' - test_experimental_reduced_joins: '0' - - mix_env: 'test' - postgres_image: 'postgres:16' - test_experimental_reduced_joins: '1' + - mix_env: "test" + postgres_image: "postgres:15" + test_experimental_reduced_joins: "0" + - mix_env: "test" + postgres_image: "postgres:16" + test_experimental_reduced_joins: "1" env: MIX_ENV: ${{ matrix.mix_env }} @@ -105,8 +105,13 @@ jobs: - run: mix do ecto.create, ecto.migrate - run: mix run -e "Tzdata.ReleaseUpdater.poll_for_update" + - run: make minio + if: env.MIX_ENV == 'test' - run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors if: env.MIX_ENV == 'test' + env: + MINIO_HOST_FOR_CLICKHOUSE: "172.17.0.1" + - run: mix test --include slow --max-failures 1 --warnings-as-errors if: env.MIX_ENV == 'small_test' diff --git a/Makefile b/Makefile index ea4d92bbb..fd1a3e023 100644 --- a/Makefile +++ b/Makefile @@ -40,8 +40,10 @@ postgres-stop: ## Stop and remove the postgres container minio: ## Start a transient container with a recent version of minio (s3) docker run -d --rm -p 10000:10000 -p 10001:10001 --name plausible_minio minio/minio server /data --address ":10000" --console-address ":10001" while ! docker exec plausible_minio mc alias set local http://localhost:10000 minioadmin minioadmin; do sleep 1; done - docker exec plausible_minio mc mb local/dev-exports - docker exec plausible_minio mc mb local/dev-imports + docker exec plausible_minio sh -c 'mc mb local/dev-exports && mc ilm add --expiry-days 7 local/dev-exports' + docker exec plausible_minio sh -c 'mc mb local/dev-imports && mc ilm add --expiry-days 7 local/dev-imports' + docker exec plausible_minio sh -c 'mc mb local/test-exports && mc ilm add --expiry-days 7 local/test-exports' + docker exec plausible_minio sh -c 'mc mb local/test-imports && mc ilm add --expiry-days 7 local/test-imports' minio-stop: ## Stop and remove the minio container docker stop plausible_minio diff --git a/config/runtime.exs b/config/runtime.exs index ad64d357f..5b7c8d852 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -296,7 +296,8 @@ config :plausible, is_selfhost: is_selfhost, custom_script_name: custom_script_name, log_failed_login_attempts: log_failed_login_attempts, - license_key: license_key + license_key: license_key, + persistent_cache_dir: persistent_cache_dir config :plausible, :selfhost, enable_email_verification: enable_email_verification, @@ -537,10 +538,10 @@ base_queues = [ site_setup_emails: 1, clean_invitations: 1, analytics_imports: 1, + analytics_exports: 1, + notify_exported_analytics: 1, domain_change_transition: 1, - check_accept_traffic_until: 1, - # NOTE: maybe move s3_csv_export to cloud_queues? 
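# For context: the `s3_csv_export` queue dropped just below is superseded by the
# `analytics_exports` and `notify_exported_analytics` queues added above. A
# minimal sketch of how a job now reaches the new queue, assuming the
# Plausible.Workers.ExportAnalytics worker introduced later in this diff
# declares `queue: :analytics_exports` (the queue name is inferred here, not
# shown in this hunk):
#
#   {:ok, job} = Plausible.Exports.schedule_s3_export(site.id, user.email)
#   job.queue #=> "analytics_exports"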
- s3_csv_export: 1 + check_accept_traffic_until: 1 ] cloud_queues = [ diff --git a/lib/plausible/exports.ex b/lib/plausible/exports.ex index 50c12d3d7..00b7e4dc4 100644 --- a/lib/plausible/exports.ex +++ b/lib/plausible/exports.ex @@ -3,85 +3,284 @@ defmodule Plausible.Exports do Contains functions to export data for events and sessions as Zip archives. """ - require Plausible + use Plausible import Ecto.Query + @doc "Schedules CSV export job to S3 storage" + @spec schedule_s3_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data} + def schedule_s3_export(site_id, email_to) do + with :ok <- ensure_has_data(site_id) do + args = %{ + "storage" => "s3", + "site_id" => site_id, + "email_to" => email_to, + "s3_bucket" => Plausible.S3.exports_bucket(), + "s3_path" => s3_export_key(site_id) + } + + {:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))} + end + end + + @doc "Schedules CSV export job to local storage" + @spec schedule_local_export(pos_integer, String.t()) :: {:ok, Oban.Job.t()} | {:error, :no_data} + def schedule_local_export(site_id, email_to) do + with :ok <- ensure_has_data(site_id) do + args = %{ + "storage" => "local", + "site_id" => site_id, + "email_to" => email_to, + "local_path" => local_export_file(site_id) + } + + {:ok, Oban.insert!(Plausible.Workers.ExportAnalytics.new(args))} + end + end + + @spec ensure_has_data(pos_integer) :: :ok | {:error, :no_data} + defp ensure_has_data(site_id) do + # SELECT true FROM "events_v2" AS e0 WHERE (e0."site_id" = ^site_id) LIMIT 1 + has_data? = Plausible.ClickhouseRepo.exists?(from "events_v2", where: [site_id: ^site_id]) + if has_data?, do: :ok, else: {:error, :no_data} + end + + @doc "Gets last CSV export job for a site" + @spec get_last_export_job(pos_integer) :: Oban.Job.t() | nil + def get_last_export_job(site_id) do + Plausible.Repo.one( + from e in Plausible.Workers.ExportAnalytics.base_query(site_id), + order_by: [desc: :id], + limit: 1 + ) + end + + @doc "Subscribes to CSV export job notifications" + def oban_listen, do: Oban.Notifier.listen(__MODULE__) + @doc false + def oban_notify(site_id), do: Oban.Notifier.notify(__MODULE__, %{"site_id" => site_id}) + @doc """ - Renders filename for the Zip archive containing the exported CSV files. + Renders export archive filename. Examples: - iex> archive_filename("plausible.io", ~D[2021-01-01], ~D[2024-12-31]) - "plausible_io_20210101_20241231.zip" - - iex> archive_filename("Bücher.example", ~D[2021-01-01], ~D[2024-12-31]) - "Bücher_example_20210101_20241231.zip" + iex> archive_filename("plausible.io", _created_on = ~D[2024-12-31]) + "plausible_io_20241231.zip" """ - def archive_filename(domain, min_date, max_date) do - name = - Enum.join( - [ - String.replace(domain, ".", "_"), - Calendar.strftime(min_date, "%Y%m%d"), - Calendar.strftime(max_date, "%Y%m%d") - ], - "_" + def archive_filename(domain, %Date{} = created_on) do + String.replace(domain, ".", "_") <> "_" <> Calendar.strftime(created_on, "%Y%m%d") <> ".zip" + end + + @doc ~S""" + Safely renders content disposition for an arbitrary export filename. 
+ + Examples: + + iex> content_disposition("plausible_io_20241231.zip") + "attachment; filename=\"plausible_io_20241231.zip\"" + + iex> content_disposition("📊.zip") + "attachment; filename=\"plausible-export.zip\"; filename*=utf-8''%F0%9F%93%8A.zip" + + """ + def content_disposition(filename) do + encoded_filename = URI.encode(filename) + + if encoded_filename == filename do + ~s[attachment; filename="#{filename}"] + else + ~s[attachment; filename="plausible-export.zip"; filename*=utf-8''#{encoded_filename}] + end + end + + @type export :: %{ + path: Path.t(), + name: String.t(), + expires_at: DateTime.t() | nil, + download_link: String.t(), + size: pos_integer + } + + @doc "Gets local export for a site" + @spec get_local_export(pos_integer, String.t(), String.t()) :: export | nil + def get_local_export(site_id, domain, timezone) do + path = local_export_file(site_id) + + if File.exists?(path) do + %File.Stat{size: size, mtime: mtime} = File.stat!(path, time: :posix) + created_at = DateTime.from_unix!(mtime) + created_on_in_site_tz = Plausible.Timezones.to_date_in_timezone(created_at, timezone) + name = archive_filename(domain, created_on_in_site_tz) + + download_link = + PlausibleWeb.Router.Helpers.site_path( + PlausibleWeb.Endpoint, + :download_local_export, + domain + ) + + %{path: path, name: name, expires_at: nil, download_link: download_link, size: size} + end + end + + @doc "Deletes local export for a site" + @spec delete_local_export(pos_integer) :: :ok + def delete_local_export(site_id) do + file = local_export_file(site_id) + + if File.exists?(file) do + File.rm!(file) + end + + :ok + end + + @spec local_export_file(pos_integer) :: Path.t() + defp local_export_file(site_id) do + persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir) + + Path.join([ + persistent_cache_dir || System.tmp_dir!(), + "plausible-exports", + Integer.to_string(site_id) + ]) + end + + @doc "Gets S3 export for a site" + @spec get_s3_export(pos_integer) :: export | nil + def get_s3_export(site_id) do + path = s3_export_key(site_id) + bucket = Plausible.S3.exports_bucket() + head_object_op = ExAws.S3.head_object(bucket, path) + + case ExAws.request(head_object_op) do + {:error, {:http_error, 404, _response}} -> + nil + + {:ok, %{status_code: 200, headers: headers}} -> + "attachment; filename=" <> filename = :proplists.get_value("content-disposition", headers) + name = String.trim(filename, "\"") + size = :proplists.get_value("content-length", headers, nil) + + expires_at = + if x_amz_expiration = :proplists.get_value("x-amz-expiration", headers, nil) do + ["expiry-date=", expiry_date, ", rule-id=", _rule_id] = + String.split(x_amz_expiration, "\"", trim: true) + + Timex.parse!(expiry_date, "{RFC1123}") + end + + %{ + path: path, + name: name, + expires_at: expires_at, + download_link: Plausible.S3.download_url(bucket, path), + size: String.to_integer(size) + } + end + end + + @doc "Deletes S3 export for a site" + @spec delete_s3_export(pos_integer) :: :ok + def delete_s3_export(site_id) do + if export = get_s3_export(site_id) do + exports_bucket = Plausible.S3.exports_bucket() + delete_op = ExAws.S3.delete_object(exports_bucket, export.path) + ExAws.request!(delete_op) + end + + :ok + end + + defp s3_export_key(site_id), do: Integer.to_string(site_id) + + @doc "Returns the date range for the site's events data in site's timezone or `nil` if there is no data" + @spec date_range(non_neg_integer, String.t()) :: Date.Range.t() | nil + def date_range(site_id, timezone) do + [%Date{} = 
start_date, %Date{} = end_date] = + Plausible.ClickhouseRepo.one( + from e in "events_v2", + where: [site_id: ^site_id], + select: [ + fragment("toDate(min(?),?)", e.timestamp, ^timezone), + fragment("toDate(max(?),?)", e.timestamp, ^timezone) + ] ) - name <> ".zip" + unless end_date == ~D[1970-01-01] do + Date.range(start_date, end_date) + end end @doc """ Builds Ecto queries to export data from `events_v2` and `sessions_v2` - tables into the format of `imported_*` tables for a website. + tables into the format of `imported_*` tables for a website. """ - @spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) :: + @spec export_queries(pos_integer, + extname: String.t(), + date_range: Date.Range.t(), + timezone: String.t() + ) :: %{String.t() => Ecto.Query.t()} def export_queries(site_id, opts \\ []) do extname = opts[:extname] || ".csv" date_range = opts[:date_range] + timezone = opts[:timezone] || "UTC" - filename = fn table -> - name = - if date_range do - first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}") - last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}") - "#{table}_#{first_date}_#{last_date}" - else - table - end + suffix = + if date_range do + first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}") + last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}") + "_#{first_date}_#{last_date}" <> extname + else + extname + end - name <> extname - end + filename = fn name -> name <> suffix end %{ - filename.("imported_visitors") => export_visitors_q(site_id), - filename.("imported_sources") => export_sources_q(site_id), + filename.("imported_visitors") => export_visitors_q(site_id, timezone, date_range), + filename.("imported_sources") => export_sources_q(site_id, timezone, date_range), # NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error - filename.("imported_pages") => export_pages_q(site_id), - filename.("imported_entry_pages") => export_entry_pages_q(site_id), - filename.("imported_exit_pages") => export_exit_pages_q(site_id), - filename.("imported_locations") => export_locations_q(site_id), - filename.("imported_devices") => export_devices_q(site_id), - filename.("imported_browsers") => export_browsers_q(site_id), - filename.("imported_operating_systems") => export_operating_systems_q(site_id) + filename.("imported_pages") => export_pages_q(site_id, timezone, date_range), + filename.("imported_entry_pages") => export_entry_pages_q(site_id, timezone, date_range), + filename.("imported_exit_pages") => export_exit_pages_q(site_id, timezone, date_range), + filename.("imported_locations") => export_locations_q(site_id, timezone, date_range), + filename.("imported_devices") => export_devices_q(site_id, timezone, date_range), + filename.("imported_browsers") => export_browsers_q(site_id, timezone, date_range), + filename.("imported_operating_systems") => + export_operating_systems_q(site_id, timezone, date_range) } end - Plausible.on_full_build do - defp sampled(table) do - Plausible.Stats.Sampling.add_query_hint(from(table)) + on_full_build do + defp sampled(table, date_range) do + from(table) + |> Plausible.Stats.Sampling.add_query_hint() + |> limit_date_range(date_range) end else - defp sampled(table) do - table + defp sampled(table, date_range) do + limit_date_range(table, date_range) end end - defmacrop date(timestamp) do + defp limit_date_range(query, nil), do: query + + defp limit_date_range(query, date_range) do + from t in query, + where: + selected_as(:date) >= ^date_range.first and + selected_as(:date) <= 
^date_range.last + end + + defmacrop date(timestamp, timezone) do quote do - selected_as(fragment("toDate(?)", unquote(timestamp)), :date) + selected_as( + fragment("toDate(?,?)", unquote(timestamp), unquote(timezone)), + :date + ) end end @@ -127,13 +326,12 @@ defmodule Plausible.Exports do end end - @spec export_visitors_q(pos_integer) :: Ecto.Query.t() - def export_visitors_q(site_id) do - from s in sampled("sessions_v2"), + defp export_visitors_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), visitors(s), pageviews(s), bounces(s), @@ -142,9 +340,8 @@ defmodule Plausible.Exports do ] end - @spec export_sources_q(pos_integer) :: Ecto.Query.t() - def export_sources_q(site_id) do - from s in sampled("sessions_v2"), + defp export_sources_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [ selected_as(:date), @@ -158,7 +355,7 @@ defmodule Plausible.Exports do ], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), selected_as(s.referrer_source, :source), s.referrer, s.utm_source, @@ -174,32 +371,40 @@ defmodule Plausible.Exports do ] end - @spec export_pages_q(pos_integer) :: Ecto.Query.t() - def export_pages_q(site_id) do + defp export_pages_q(site_id, timezone, date_range) do window_q = - from e in sampled("events_v2"), + from e in sampled("events_v2", nil), where: e.site_id == ^site_id, + where: [name: "pageview"], select: %{ - timestamp: e.timestamp, + timestamp: selected_as(fragment("toTimeZone(?,?)", e.timestamp, ^timezone), :timestamp), next_timestamp: - over(fragment("leadInFrame(?)", e.timestamp), + over(fragment("leadInFrame(toTimeZone(?,?))", e.timestamp, ^timezone), partition_by: e.session_id, order_by: e.timestamp, frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING") ), pathname: e.pathname, hostname: e.hostname, - name: e.name, user_id: e.user_id, session_id: e.session_id, _sample_factor: fragment("_sample_factor") } + window_q = + if date_range do + from e in window_q, + where: selected_as(:timestamp) >= ^date_range.first, + where: fragment("toDate(?)", selected_as(:timestamp)) <= ^date_range.last + else + window_q + end + from e in subquery(window_q), group_by: [selected_as(:date), e.pathname], order_by: selected_as(:date), select: [ - date(e.timestamp), + selected_as(fragment("toDate(?)", e.timestamp), :date), selected_as(fragment("any(?)", e.hostname), :hostname), selected_as(e.pathname, :page), selected_as( @@ -207,11 +412,7 @@ defmodule Plausible.Exports do :visits ), visitors(e), - selected_as( - fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), - :pageviews - ), - # NOTE: are exits pageviews or any events? 
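# The window query above pairs each pageview with the next pageview in the same
# session; in plain ClickHouse SQL the idea is roughly the following (a sketch,
# with the timezone conversion and sampling hints omitted):
#
#   SELECT
#     timestamp,
#     leadInFrame(timestamp) OVER (
#       PARTITION BY session_id
#       ORDER BY timestamp
#       ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING
#     ) AS next_timestamp
#   FROM events_v2
#
# leadInFrame returns the column's default value (the epoch, i.e. 0) when there
# is no next row in the frame, so `next_timestamp = 0` marks the last pageview
# of a session, which is what the `exits` counter below relies on.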
+ selected_as(fragment("toUInt64(round(count()*any(_sample_factor)))"), :pageviews), selected_as( fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp), :exits @@ -223,14 +424,13 @@ defmodule Plausible.Exports do ] end - @spec export_entry_pages_q(pos_integer) :: Ecto.Query.t() - def export_entry_pages_q(site_id) do - from s in sampled("sessions_v2"), + defp export_entry_pages_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [selected_as(:date), s.entry_page], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), s.entry_page, visitors(s), selected_as( @@ -243,14 +443,13 @@ defmodule Plausible.Exports do ] end - @spec export_exit_pages_q(pos_integer) :: Ecto.Query.t() - def export_exit_pages_q(site_id) do - from s in sampled("sessions_v2"), + defp export_exit_pages_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [selected_as(:date), s.exit_page], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), s.exit_page, visitors(s), visit_duration(s), @@ -263,15 +462,14 @@ defmodule Plausible.Exports do ] end - @spec export_locations_q(pos_integer) :: Ecto.Query.t() - def export_locations_q(site_id) do - from s in sampled("sessions_v2"), + defp export_locations_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ", group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), selected_as(s.country_code, :country), selected_as(s.subdivision1_code, :region), selected_as(s.city_geoname_id, :city), @@ -283,14 +481,13 @@ defmodule Plausible.Exports do ] end - @spec export_devices_q(pos_integer) :: Ecto.Query.t() - def export_devices_q(site_id) do - from s in sampled("sessions_v2"), + defp export_devices_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [selected_as(:date), s.screen_size], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), selected_as(s.screen_size, :device), visitors(s), visits(s), @@ -300,14 +497,13 @@ defmodule Plausible.Exports do ] end - @spec export_browsers_q(pos_integer) :: Ecto.Query.t() - def export_browsers_q(site_id) do - from s in sampled("sessions_v2"), + defp export_browsers_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [selected_as(:date), s.browser, s.browser_version], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), s.browser, s.browser_version, visitors(s), @@ -318,14 +514,13 @@ defmodule Plausible.Exports do ] end - @spec export_operating_systems_q(pos_integer) :: Ecto.Query.t() - def export_operating_systems_q(site_id) do - from s in sampled("sessions_v2"), + defp export_operating_systems_q(site_id, timezone, date_range) do + from s in sampled("sessions_v2", date_range), where: s.site_id == ^site_id, group_by: [selected_as(:date), s.operating_system, s.operating_system_version], order_by: selected_as(:date), select: [ - date(s.start), + date(s.start, ^timezone), s.operating_system, s.operating_system_version, visitors(s), diff --git 
a/lib/plausible/imported/csv_importer.ex b/lib/plausible/imported/csv_importer.ex index c2f292ce3..4de8f7a4d 100644 --- a/lib/plausible/imported/csv_importer.ex +++ b/lib/plausible/imported/csv_importer.ex @@ -1,6 +1,7 @@ defmodule Plausible.Imported.CSVImporter do @moduledoc """ - CSV importer from S3 that uses ClickHouse [s3 table function.](https://clickhouse.com/docs/en/sql-reference/table-functions/s3) + CSV importer from either S3 for which it uses ClickHouse [s3 table function](https://clickhouse.com/docs/en/sql-reference/table-functions/s3) + or from local storage for which it uses [input function.](https://clickhouse.com/docs/en/sql-reference/table-functions/input) """ use Plausible.Imported.Importer @@ -16,10 +17,45 @@ defmodule Plausible.Imported.CSVImporter do def email_template(), do: "google_analytics_import.html" @impl true - def parse_args(%{"uploads" => uploads}), do: [uploads: uploads] + def parse_args(%{"uploads" => uploads, "storage" => storage}) do + [uploads: uploads, storage: storage] + end @impl true def import_data(site_import, opts) do + storage = Keyword.fetch!(opts, :storage) + uploads = Keyword.fetch!(opts, :uploads) + + if storage == "local" do + # we need to remove the imported files from local storage + # after the importer has completed or ran out of attempts + paths = Enum.map(uploads, &Map.fetch!(&1, "local_path")) + + Oban.insert!( + Plausible.Workers.LocalImportAnalyticsCleaner.new( + %{"import_id" => site_import.id, "paths" => paths}, + schedule_in: _one_hour = 3600 + ) + ) + end + + {:ok, ch} = + Plausible.IngestRepo.config() + |> Keyword.replace!(:pool_size, 1) + |> Ch.start_link() + + case storage do + "s3" -> import_s3(ch, site_import, uploads) + "local" -> import_local(ch, site_import, uploads) + end + rescue + # we are cancelling on any argument or ClickHouse errors, assuming they are permanent + e in [ArgumentError, Ch.Error] -> + # see Plausible.Imported.Importer for more details on transient vs permanent errors + {:error, Exception.message(e)} + end + + defp import_s3(ch, site_import, uploads) do %{ id: import_id, site_id: site_id, @@ -27,34 +63,20 @@ defmodule Plausible.Imported.CSVImporter do end_date: end_date } = site_import - uploads = Keyword.fetch!(opts, :uploads) - %{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} = Plausible.S3.import_clickhouse_credentials() - {:ok, ch} = - Plausible.IngestRepo.config() - |> Keyword.replace!(:pool_size, 1) - |> Ch.start_link() - Enum.each(uploads, fn upload -> %{"filename" => filename, "s3_url" => s3_url} = upload {table, _, _} = parse_filename!(filename) s3_structure = input_structure!(table) - - s3_structure_cols_expr = - s3_structure - |> String.split(",", trim: true) - |> Enum.map_join(", ", fn kv -> - [col, _type] = String.split(kv) - col - end) + s3_columns = input_columns!(table) statement = """ - INSERT INTO {table:Identifier}(site_id, #{s3_structure_cols_expr}, import_id) \ - SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \ + INSERT INTO {table:Identifier}(site_id,import_id,#{s3_columns}) \ + SELECT {site_id:UInt64}, {import_id:UInt64}, * \ FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String}) \ WHERE date >= {start_date:Date} AND date <= {end_date:Date}\ """ @@ -75,10 +97,53 @@ defmodule Plausible.Imported.CSVImporter do Ch.query!(ch, statement, params, timeout: :infinity) end) - rescue - # we are cancelling on any argument or ClickHouse errors - e in [ArgumentError, 
Ch.Error] -> - {:error, Exception.message(e)} + end + + defp import_local(ch, site_import, uploads) do + %{ + id: import_id, + site_id: site_id, + start_date: start_date, + end_date: end_date + } = site_import + + DBConnection.run( + ch, + fn conn -> + Enum.each(uploads, fn upload -> + %{"filename" => filename, "local_path" => local_path} = upload + + {table, _, _} = parse_filename!(filename) + input_structure = input_structure!(table) + input_columns = input_columns!(table) + + statement = + """ + INSERT INTO {table:Identifier}(site_id,import_id,#{input_columns}) \ + SELECT {site_id:UInt64}, {import_id:UInt64}, * \ + FROM input({input_structure:String}) \ + WHERE date >= {start_date:Date} AND date <= {end_date:Date} \ + FORMAT CSVWithNames\ + """ + + params = %{ + "table" => table, + "site_id" => site_id, + "import_id" => import_id, + "input_structure" => input_structure, + "start_date" => start_date, + "end_date" => end_date + } + + # we are reading in 512KB chunks for better performance + # the default would've been line by line (not great for a CSV) + File.stream!(local_path, 512_000) + |> Stream.into(Ch.stream(conn, statement, params)) + |> Stream.run() + end) + end, + timeout: :infinity + ) end input_structures = %{ @@ -109,7 +174,7 @@ defmodule Plausible.Imported.CSVImporter do iex> date_range([ ...> %{"filename" => "imported_devices_20190101_20210101.csv"}, - ...> "imported_pages_20200101_20220101.csv" + ...> "pages_20200101_20220101.csv" ...> ]) Date.range(~D[2019-01-01], ~D[2022-01-01]) @@ -165,6 +230,9 @@ defmodule Plausible.Imported.CSVImporter do iex> parse_filename!("imported_devices_00010101_20250101.csv") {"imported_devices", ~D[0001-01-01], ~D[2025-01-01]} + iex> parse_filename!("devices_00010101_20250101.csv") + {"imported_devices", ~D[0001-01-01], ~D[2025-01-01]} + """ @spec parse_filename!(String.t()) :: {table :: String.t(), start_date :: Date.t(), end_date :: Date.t()} @@ -173,11 +241,29 @@ defmodule Plausible.Imported.CSVImporter do for {table, input_structure} <- input_structures do defp input_structure!(unquote(table)), do: unquote(input_structure) + input_columns = + input_structure + |> String.split(",", trim: true) + |> Enum.map_join(",", fn kv -> + [col, _type] = String.split(kv) + String.trim(col) + end) + + defp input_columns!(unquote(table)), do: unquote(input_columns) + def parse_filename!( <> ) do {unquote(table), parse_date!(start_date), parse_date!(end_date)} end + + "imported_" <> name = table + + def parse_filename!( + <> + ) do + {unquote(table), parse_date!(start_date), parse_date!(end_date)} + end end def parse_filename!(_filename) do @@ -195,6 +281,9 @@ defmodule Plausible.Imported.CSVImporter do iex> valid_filename?("imported_devices_00010101_20250101.csv") true + iex> valid_filename?("devices_00010101_20250101.csv") + true + """ @spec valid_filename?(String.t()) :: boolean def valid_filename?(filename) do @@ -220,10 +309,35 @@ defmodule Plausible.Imported.CSVImporter do iex> extract_table("imported_devices_00010101_20250101.csv") "imported_devices" + iex> extract_table("devices_00010101_20250101.csv") + "imported_devices" + """ @spec extract_table(String.t()) :: String.t() def extract_table(filename) do {table, _start_date, _end_date} = parse_filename!(filename) table end + + @doc """ + Returns local directory for CSV imports storage. 
+ + Builds upon `$PERSISTENT_CACHE_DIR` (if set) and falls back to /tmp + + Examples: + + iex> local_dir = local_dir(_site_id = 37) + iex> String.ends_with?(local_dir, "/plausible-imports/37") + true + + """ + def local_dir(site_id) do + persistent_cache_dir = Application.get_env(:plausible, :persistent_cache_dir) + + Path.join([ + persistent_cache_dir || System.tmp_dir!(), + "plausible-imports", + Integer.to_string(site_id) + ]) + end end diff --git a/lib/plausible/imported/importer.ex b/lib/plausible/imported/importer.ex index bbac48809..a7f55dfda 100644 --- a/lib/plausible/imported/importer.ex +++ b/lib/plausible/imported/importer.ex @@ -73,13 +73,13 @@ defmodule Plausible.Imported.Importer do import_id = job.args[:import_id] receive do - {:notification, :analytics_imports_jobs, %{"complete" => ^import_id}} -> + {:notification, :analytics_imports_jobs, %{"event" => "complete", "import_id" => ^import_id}} -> IO.puts("Job completed") - {:notification, :analytics_imports_jobs, %{"transient_fail" => ^import_id}} -> + {:notification, :analytics_imports_jobs, %{"event" => "transient_fail", "import_id" => ^import_id}} -> IO.puts("Job failed transiently") - {:notification, :analytics_imports_jobs, %{"fail" => ^import_id}} -> + {:notification, :analytics_imports_jobs, %{"event" => "fail", "import_id" => ^import_id}} -> IO.puts("Job failed permanently") after 15_000 -> @@ -203,7 +203,11 @@ defmodule Plausible.Imported.Importer do @doc false def notify(site_import, event) do - Oban.Notifier.notify(Oban, @oban_channel, %{event => site_import.id}) + Oban.Notifier.notify(Oban, @oban_channel, %{ + "event" => event, + "import_id" => site_import.id, + "site_id" => site_import.site_id + }) end @doc """ diff --git a/lib/plausible/s3.ex b/lib/plausible/s3.ex index 12be32fc2..3db53297b 100644 --- a/lib/plausible/s3.ex +++ b/lib/plausible/s3.ex @@ -43,31 +43,27 @@ defmodule Plausible.S3 do Example: - iex> %{ - ...> s3_url: "http://localhost:10000/test-imports/123/imported_browsers.csv", - ...> presigned_url: "http://localhost:10000/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin" <> _ - ...> } = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv") + iex> upload = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv") + iex> true = String.ends_with?(upload.s3_url, "/test-imports/123/imported_browsers.csv") + iex> true = String.contains?(upload.presigned_url, "/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&") """ def import_presign_upload(site_id, filename) do config = ExAws.Config.new(:s3) - s3_path = Path.join(to_string(site_id), filename) + s3_path = Path.join(Integer.to_string(site_id), filename) bucket = imports_bucket() {:ok, presigned_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path) %{s3_url: extract_s3_url(presigned_url), presigned_url: presigned_url} end # to make ClickHouse see MinIO in dev and test envs we replace - # the host in the S3 URL with whatever's set in S3_CLICKHOUSE_HOST env var + # the host in the S3 URL with host.docker.internal or whatever's set in $MINIO_HOST_FOR_CLICKHOUSE if Mix.env() in [:dev, :test, :small_dev, :small_test] do defp extract_s3_url(presigned_url) do [s3_url, _] = String.split(presigned_url, "?") - - if ch_host = System.get_env("S3_CLICKHOUSE_HOST") do - URI.to_string(%URI{URI.parse(s3_url) | host: ch_host}) - else - s3_url - end + default_ch_host = unless System.get_env("CI"), do: "host.docker.internal" + ch_host = 
System.get_env("MINIO_HOST_FOR_CLICKHOUSE", default_ch_host) + URI.to_string(%URI{URI.parse(s3_url) | host: ch_host}) end else defp extract_s3_url(presigned_url) do @@ -79,36 +75,37 @@ defmodule Plausible.S3 do @doc """ Chunks and uploads Zip archive to the provided S3 destination. + In the current implementation the bucket always goes into the path component. + """ + @spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t()) :: :ok + def export_upload_multipart(stream, s3_bucket, s3_path, filename) do + # 5 MiB is the smallest chunk size AWS S3 supports + chunk_into_parts(stream, 5 * 1024 * 1024) + |> ExAws.S3.upload(s3_bucket, s3_path, + content_disposition: Plausible.Exports.content_disposition(filename), + content_type: "application/zip", + timeout: :infinity + ) + |> ExAws.request!() + + :ok + end + + @doc """ Returns a presigned URL to download the exported Zip archive from S3. The URL expires in 24 hours. In the current implementation the bucket always goes into the path component. """ - @spec export_upload_multipart(Enumerable.t(), String.t(), Path.t(), String.t(), keyword) :: - :uri_string.uri_string() - def export_upload_multipart(stream, s3_bucket, s3_path, filename, config_overrides \\ []) do + @spec download_url(String.t(), Path.t()) :: :uri_string.uri_string() + def download_url(s3_bucket, s3_path) do config = ExAws.Config.new(:s3) - encoded_filename = URI.encode(filename) - disposition = ~s[attachment; filename="#{encoded_filename}"] - - disposition = - if encoded_filename != filename do - disposition <> "; filename*=utf-8''#{encoded_filename}" - else - disposition - end - - # 5 MiB is the smallest chunk size AWS S3 supports - chunk_into_parts(stream, 5 * 1024 * 1024) - |> ExAws.S3.upload(s3_bucket, s3_path, - content_disposition: disposition, - content_type: "application/zip" - ) - |> ExAws.request!(config_overrides) + # ex_aws_s3 doesn't allow expires_in longer than one week + one_week = 60 * 60 * 24 * 7 {:ok, download_url} = - ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: _24hr = 86_400) + ExAws.S3.presigned_url(config, :get, s3_bucket, s3_path, expires_in: one_week) download_url end diff --git a/lib/plausible/sites.ex b/lib/plausible/sites.ex index b41591e3f..12d4235ad 100644 --- a/lib/plausible/sites.ex +++ b/lib/plausible/sites.ex @@ -23,14 +23,6 @@ defmodule Plausible.Sites do Repo.get_by!(Site, domain: domain) end - def get_domain!(site_id) do - Plausible.Repo.one!( - from s in Plausible.Site, - where: [id: ^site_id], - select: s.domain - ) - end - @spec toggle_pin(Auth.User.t(), Site.t()) :: {:ok, Site.UserPreference.t()} | {:error, :too_many_pins} def toggle_pin(user, site) do diff --git a/lib/plausible_web/controllers/site_controller.ex b/lib/plausible_web/controllers/site_controller.ex index 30a83a7de..1463565d7 100644 --- a/lib/plausible_web/controllers/site_controller.ex +++ b/lib/plausible_web/controllers/site_controller.ex @@ -1,6 +1,8 @@ defmodule PlausibleWeb.SiteController do use PlausibleWeb, :controller use Plausible.Repo + use Plausible + alias Plausible.Sites alias Plausible.Billing.Quota @@ -710,21 +712,27 @@ defmodule PlausibleWeb.SiteController do |> redirect(external: Routes.site_path(conn, :settings_integrations, site.domain)) end - def csv_export(conn, _params) do - %{site: site, current_user: user} = conn.assigns + on_full_build do + # exported archives are downloaded from object storage + else + alias Plausible.Exports - Oban.insert!( - Plausible.Workers.ExportCSV.new(%{ - "site_id" => site.id, - 
"email_to" => user.email, - "s3_bucket" => Plausible.S3.exports_bucket(), - "s3_path" => "Plausible-#{site.id}.zip" - }) - ) + def download_local_export(conn, _params) do + %{id: site_id, domain: domain, timezone: timezone} = conn.assigns.site - conn - |> put_flash(:success, "SCHEDULED. WAIT FOR MAIL") - |> redirect(to: Routes.site_path(conn, :settings_imports_exports, site.domain)) + if local_export = Exports.get_local_export(site_id, domain, timezone) do + %{path: export_path, name: name} = local_export + + conn + |> put_resp_content_type("application/zip") + |> put_resp_header("content-disposition", Exports.content_disposition(name)) + |> send_file(200, export_path) + else + conn + |> put_flash(:error, "Export not found") + |> redirect(external: Routes.site_path(conn, :settings_imports_exports, domain)) + end + end end def csv_import(conn, _params) do diff --git a/lib/plausible_web/email.ex b/lib/plausible_web/email.ex index 76af84627..01218d896 100644 --- a/lib/plausible_web/email.ex +++ b/lib/plausible_web/email.ex @@ -1,4 +1,5 @@ defmodule PlausibleWeb.Email do + use Plausible use Bamboo.Phoenix, view: PlausibleWeb.EmailView import Bamboo.PostmarkHelper @@ -346,6 +347,48 @@ defmodule PlausibleWeb.Email do }) end + def export_success(user, site, download_url, expires_at) do + subject = + on_full_build do + "Your Plausible Analytics export is now ready for download" + else + "Your export is now ready for download" + end + + expires_in = + if expires_at do + Timex.Format.DateTime.Formatters.Relative.format!( + expires_at, + "{relative}" + ) + end + + priority_email() + |> to(user) + |> tag("export-success") + |> subject(subject) + |> render("export_success.html", + user: user, + site: site, + download_url: download_url, + expires_in: expires_in + ) + end + + def export_failure(user, site) do + subject = + on_full_build do + "Your Plausible Analytics export has failed" + else + "Your export has failed" + end + + priority_email() + |> to(user) + |> subject(subject) + |> render("export_failure.html", user: user, site: site) + end + def error_report(reported_by, trace_id, feedback) do Map.new() |> Map.put(:layout, nil) diff --git a/lib/plausible_web/live/csv_export.ex b/lib/plausible_web/live/csv_export.ex new file mode 100644 index 000000000..b6759ca29 --- /dev/null +++ b/lib/plausible_web/live/csv_export.ex @@ -0,0 +1,281 @@ +defmodule PlausibleWeb.Live.CSVExport do + @moduledoc """ + LiveView allowing scheduling, watching, downloading, and deleting S3 and local exports. 
+ """ + use PlausibleWeb, :live_view + use Phoenix.HTML + + alias PlausibleWeb.Components.Generic + alias Plausible.Exports + + # :not_mounted_at_router ensures we have already done auth checks in the controller + # if this liveview becomes available from the router, please make sure + # to check that current_user_role is allowed to manage site exports + @impl true + def mount(:not_mounted_at_router, session, socket) do + %{ + "storage" => storage, + "site_id" => site_id, + "email_to" => email_to + } = session + + socket = + socket + |> assign(site_id: site_id, email_to: email_to, storage: storage) + |> assign_new(:site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end) + |> fetch_export() + + if connected?(socket) do + Exports.oban_listen() + end + + {:ok, socket} + end + + defp fetch_export(socket) do + %{storage: storage, site_id: site_id} = socket.assigns + + get_export = + case storage do + "s3" -> + &Exports.get_s3_export/1 + + "local" -> + %{domain: domain, timezone: timezone} = socket.assigns.site + &Exports.get_local_export(&1, domain, timezone) + end + + socket = assign(socket, export: nil) + + if job = Exports.get_last_export_job(site_id) do + %Oban.Job{state: state} = job + + case state do + _ when state in ["scheduled", "available", "retryable"] -> + assign(socket, status: "in_progress") + + "executing" -> + # Exports.oban_notify/1 is called in `perform/1` and + # the notification arrives while the job.state is still "executing" + if export = get_export.(site_id) do + assign(socket, status: "ready", export: export) + else + assign(socket, status: "in_progress") + end + + "completed" -> + if export = get_export.(site_id) do + assign(socket, status: "ready", export: export) + else + assign(socket, status: "can_schedule") + end + + "discarded" -> + assign(socket, status: "failed") + + "cancelled" -> + # credo:disable-for-next-line Credo.Check.Refactor.Nesting + if export = get_export.(site_id) do + assign(socket, status: "ready", export: export) + else + assign(socket, status: "can_schedule") + end + end + else + if export = get_export.(site_id) do + assign(socket, status: "ready", export: export) + else + assign(socket, status: "can_schedule") + end + end + end + + @impl true + def render(assigns) do + ~H""" + <%= case @status do %> + <% "can_schedule" -> %> + <.prepare_download /> + <% "in_progress" -> %> + <.in_progress /> + <% "failed" -> %> + <.failed /> + <% "ready" -> %> + <.download storage={@storage} export={@export} /> + <% end %> + """ + end + + defp prepare_download(assigns) do + ~H""" + Prepare download +

+ Prepare your data for download by clicking the button above. When that's done, a Zip file that you can download will appear.
+ """
+ end
+
+ defp in_progress(assigns) do
+ ~H"""
+ <Generic.spinner />
+ We are preparing your download ...
+ <button phx-click="cancel">cancel</button>
+
+ The preparation of your stats might take a while. Depending on the volume of your data, it might take up to 20 minutes. Feel free to leave the page and return later.
+ """
+ end
+
+ defp failed(assigns) do
+ ~H"""
+ Something went wrong when preparing your download. Please
+ <button phx-click="export">try again</button>.
+ """
+ end
+
+ defp download(assigns) do
+ ~H"""
+ <a href={@export.download_link}><%= @export.name %></a>
+ <button phx-click="delete">delete</button>
+
+ <p :if={@export.expires_at}>
+ Note that this file will expire
+ <.hint message={@export.expires_at}>
+ <%= Timex.Format.DateTime.Formatters.Relative.format!(@export.expires_at, "{relative}") %>.
+ </.hint>
+ </p>
+
+ <p :if={@storage == "local"}>
+ Located at
+ <.hint message={@export.path}><%= format_path(@export.path) %></.hint>
+ (<%= format_bytes(@export.size) %>)
+ </p>
+ """ + end + + defp hint(assigns) do + ~H""" + + <%= render_slot(@inner_block) %> + + """ + end + + @impl true + def handle_event("export", _params, socket) do + %{storage: storage, site_id: site_id, email_to: email_to} = socket.assigns + + schedule_result = + case storage do + "s3" -> Exports.schedule_s3_export(site_id, email_to) + "local" -> Exports.schedule_local_export(site_id, email_to) + end + + socket = + case schedule_result do + {:ok, _job} -> + fetch_export(socket) + + {:error, :no_data} -> + socket + |> put_flash(:error, "There is no data to export") + |> redirect( + external: + Routes.site_path(socket, :settings_imports_exports, socket.assigns.site.domain) + ) + end + + {:noreply, socket} + end + + def handle_event("cancel", _params, socket) do + if job = Exports.get_last_export_job(socket.assigns.site_id), do: Oban.cancel_job(job) + {:noreply, fetch_export(socket)} + end + + def handle_event("delete", _params, socket) do + %{storage: storage, site_id: site_id} = socket.assigns + + case storage do + "s3" -> Exports.delete_s3_export(site_id) + "local" -> Exports.delete_local_export(site_id) + end + + {:noreply, fetch_export(socket)} + end + + @impl true + def handle_info({:notification, Exports, %{"site_id" => site_id}}, socket) do + socket = + if site_id == socket.assigns.site_id do + fetch_export(socket) + else + socket + end + + {:noreply, socket} + end + + @format_path_regex ~r/^(?((.+?\/){3})).*(?(\/.*){3})$/ + + defp format_path(path) do + path_string = + path + |> to_string() + |> String.replace_prefix("\"", "") + |> String.replace_suffix("\"", "") + + case Regex.named_captures(@format_path_regex, path_string) do + %{"beginning" => beginning, "ending" => ending} -> "#{beginning}...#{ending}" + _ -> path_string + end + end + + defp format_bytes(bytes) when is_integer(bytes) do + cond do + bytes >= memory_unit("TiB") -> format_bytes(bytes, "TiB") + bytes >= memory_unit("GiB") -> format_bytes(bytes, "GiB") + bytes >= memory_unit("MiB") -> format_bytes(bytes, "MiB") + bytes >= memory_unit("KiB") -> format_bytes(bytes, "KiB") + true -> format_bytes(bytes, "B") + end + end + + defp format_bytes(bytes, "B"), do: "#{bytes} B" + + defp format_bytes(bytes, unit) do + value = bytes / memory_unit(unit) + "#{:erlang.float_to_binary(value, decimals: 1)} #{unit}" + end + + defp memory_unit("TiB"), do: 1024 * 1024 * 1024 * 1024 + defp memory_unit("GiB"), do: 1024 * 1024 * 1024 + defp memory_unit("MiB"), do: 1024 * 1024 + defp memory_unit("KiB"), do: 1024 +end diff --git a/lib/plausible_web/live/csv_import.ex b/lib/plausible_web/live/csv_import.ex index 38c97ca17..bfc617ae9 100644 --- a/lib/plausible_web/live/csv_import.ex +++ b/lib/plausible_web/live/csv_import.ex @@ -1,26 +1,76 @@ defmodule PlausibleWeb.Live.CSVImport do @moduledoc """ - LiveView allowing uploading CSVs for imported tables to S3 + LiveView allowing uploading CSVs for imported tables to S3 or local storage """ - use PlausibleWeb, :live_view - alias Plausible.Imported.CSVImporter + use PlausibleWeb, :live_view + alias PlausibleWeb.Components.Generic + + require Plausible.Imported.SiteImport + alias Plausible.Imported.CSVImporter + alias Plausible.Imported + + # :not_mounted_at_router ensures we have already done auth checks in the controller + # if this liveview becomes available from the router, please make sure + # to check that current_user_role is allowed to make site imports @impl true - def mount(_params, session, socket) do - %{"site_id" => site_id, "user_id" => user_id} = session + def 
mount(:not_mounted_at_router, session, socket) do + %{"site_id" => site_id, "current_user_id" => user_id, "storage" => storage} = session + + upload_opts = [ + accept: [".csv", "text/csv"], + auto_upload: true, + max_entries: length(Imported.tables()), + # 1GB + max_file_size: 1_000_000_000, + progress: &handle_progress/3 + ] + + upload_opts = + case storage do + "s3" -> [{:external, &presign_upload/2} | upload_opts] + "local" -> upload_opts + end + + upload_consumer = + case storage do + "s3" -> + fn meta, entry -> + {:ok, %{"s3_url" => meta.s3_url, "filename" => entry.client_name}} + end + + "local" -> + local_dir = CSVImporter.local_dir(site_id) + File.mkdir_p!(local_dir) + + fn meta, entry -> + local_path = Path.join(local_dir, Path.basename(meta.path)) + File.rename!(meta.path, local_path) + {:ok, %{"local_path" => local_path, "filename" => entry.client_name}} + end + end + + %{assigns: %{site: site}} = + socket = assign_new(socket, :site, fn -> Plausible.Repo.get!(Plausible.Site, site_id) end) + + # we'll listen for new completed imports to know + # when to reload the occupied ranges + if connected?(socket), do: Imported.listen() + + occupied_ranges = Imported.get_occupied_date_ranges(site) + native_stats_start_date = Plausible.Sites.native_stats_start_date(site) socket = socket - |> assign(site_id: site_id, user_id: user_id) - |> allow_upload(:import, - accept: [".csv", "text/csv"], - auto_upload: true, - max_entries: length(Plausible.Imported.tables()), - # 1GB - max_file_size: 1_000_000_000, - external: &presign_upload/2, - progress: &handle_progress/3 + |> assign( + site_id: site_id, + user_id: user_id, + storage: storage, + upload_consumer: upload_consumer, + occupied_ranges: occupied_ranges, + native_stats_start_date: native_stats_start_date ) + |> allow_upload(:import, upload_opts) |> process_imported_tables() {:ok, socket} @@ -32,8 +82,12 @@ defmodule PlausibleWeb.Live.CSVImport do
<.csv_picker upload={@uploads.import} imported_tables={@imported_tables} /> - <.confirm_button date_range={@date_range} can_confirm?={@can_confirm?} /> - + <.confirm_button date_range={@clamped_date_range} can_confirm?={@can_confirm?} /> + <.maybe_date_range_warning + :if={@original_date_range} + clamped={@clamped_date_range} + original={@original_date_range} + />

<p :for={error <- upload_errors(@uploads.import)}>
<%= error_to_string(error) %>
</p>
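# The `occupied_ranges` assign rendered above is loaded in mount/3 via
# Imported.get_occupied_date_ranges(site) and is meant to be refreshed when
# Imported.listen() delivers a notification. A sketch of such a handler, using
# the payload shape added in importer.ex above (refresh_occupied_ranges/1 is an
# illustrative helper, not part of this diff):
#
#   def handle_info({:notification, :analytics_imports_jobs, payload}, socket) do
#     case payload do
#       %{"event" => "complete"} -> {:noreply, refresh_occupied_ranges(socket)}
#       _other -> {:noreply, socket}
#     end
#   end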
@@ -46,13 +100,11 @@ defmodule PlausibleWeb.Live.CSVImport do ~H"""
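# Taken together, the export pieces in this diff compose roughly as follows
# (a sketch; `site` and `user` are illustrative variables):
#
#   {:ok, _job} = Plausible.Exports.schedule_s3_export(site.id, user.email)
#   :ok = Plausible.Exports.oban_listen()
#
#   receive do
#     {:notification, Plausible.Exports, %{"site_id" => site_id}} ->
#       # for the "s3" storage the export carries a presigned download_link,
#       # generated by Plausible.S3.download_url/2 and valid for up to a week
#       %{download_link: _url} = Plausible.Exports.get_s3_export(site_id)
#   end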