mirror of
https://github.com/plausible/analytics.git
synced 2024-11-26 11:44:03 +03:00
Split Clickhouse pools into Read-Only and Read/Write (dedicated to writes) (#2661)
* Configure ingest repo access/pool size

  If I'm not mistaken, 3 is a sane default; the only inserts we're doing are:
  - session buffer dump
  - events buffer dump
  - GA import dump

  And all are serializable within their scopes?
* Add IngestRepo
* Start IngestRepo
* Use IngestRepo for inserts
* Annotate ClickhouseRepo as read_only

  So no insert* functions are expanded
* Update moduledoc
* Rename alias
* Fix default env var value so it can be cast
* Use IngestRepo for migrations
* Set default ingest pool size from 3 to 5

  in case conns are restarting or else...
* Ensure all Repo prometheus metrics are collected
This commit is contained in:
parent
4494252cce
commit
8f85b110aa
@ -1,7 +1,7 @@
|
||||
import Config
|
||||
|
||||
config :plausible,
|
||||
ecto_repos: [Plausible.Repo, Plausible.ClickhouseRepo]
|
||||
ecto_repos: [Plausible.Repo, Plausible.IngestRepo]
|
||||
|
||||
config :plausible, PlausibleWeb.Endpoint,
|
||||
pubsub_server: Plausible.PubSub,
|
||||
|
@ -81,6 +81,14 @@ ch_db_url =
|
||||
"http://plausible_events_db:8123/plausible_events_db"
|
||||
)
|
||||
|
||||
{ingest_pool_size, ""} =
|
||||
get_var_from_path_or_env(
|
||||
config_dir,
|
||||
"CLICKHOUSE_INGEST_POOL_SIZE",
|
||||
"5"
|
||||
)
|
||||
|> Integer.parse()
|
||||
|
||||
{ch_flush_interval_ms, ""} =
|
||||
config_dir
|
||||
|> get_var_from_path_or_env("CLICKHOUSE_FLUSH_INTERVAL_MS", "5000")
|
||||
@ -268,12 +276,19 @@ config :plausible, :google,
|
||||
max_buffer_size: get_int_from_path_or_env(config_dir, "GOOGLE_MAX_BUFFER_SIZE", 10_000)
|
||||
|
||||
config :plausible, Plausible.ClickhouseRepo,
|
||||
loggers: [Ecto.LogEntry],
|
||||
queue_target: 500,
|
||||
queue_interval: 2000,
|
||||
url: ch_db_url
|
||||
|
||||
config :plausible, Plausible.IngestRepo,
|
||||
loggers: [Ecto.LogEntry],
|
||||
queue_target: 500,
|
||||
queue_interval: 2000,
|
||||
url: ch_db_url,
|
||||
flush_interval_ms: ch_flush_interval_ms,
|
||||
max_buffer_size: ch_max_buffer_size
|
||||
max_buffer_size: ch_max_buffer_size,
|
||||
pool_size: ingest_pool_size
|
||||
|
||||
case mailer_adapter do
|
||||
"Bamboo.PostmarkAdapter" ->
|
||||
|
@ -9,6 +9,7 @@ defmodule Plausible.Application do
|
||||
children = [
|
||||
Plausible.Repo,
|
||||
Plausible.ClickhouseRepo,
|
||||
Plausible.IngestRepo,
|
||||
{Finch, name: Plausible.Finch, pools: finch_pool_config()},
|
||||
{Phoenix.PubSub, name: Plausible.PubSub},
|
||||
Plausible.Session.Salts,
|
||||
|
@ -1,7 +1,8 @@
|
||||
defmodule Plausible.ClickhouseRepo do
|
||||
use Ecto.Repo,
|
||||
otp_app: :plausible,
|
||||
adapter: ClickhouseEcto
|
||||
adapter: ClickhouseEcto,
|
||||
read_only: true
|
||||
|
||||
defmacro __using__(_) do
|
||||
quote do
|
||||
|
@ -2,6 +2,8 @@ defmodule Plausible.Event.WriteBuffer do
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Plausible.IngestRepo
|
||||
|
||||
def start_link(_opts) do
|
||||
GenServer.start_link(__MODULE__, [], name: __MODULE__)
|
||||
end
|
||||
@ -62,15 +64,15 @@ defmodule Plausible.Event.WriteBuffer do
|
||||
events ->
|
||||
Logger.info("Flushing #{length(events)} events")
|
||||
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
|
||||
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
|
||||
IngestRepo.insert_all(Plausible.ClickhouseEvent, events)
|
||||
end
|
||||
end
|
||||
|
||||
defp flush_interval_ms() do
|
||||
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :flush_interval_ms)
|
||||
Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms)
|
||||
end
|
||||
|
||||
defp max_buffer_size() do
|
||||
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :max_buffer_size)
|
||||
Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size)
|
||||
end
|
||||
end
|
||||
|
@ -91,6 +91,6 @@ defmodule Plausible.Google.Buffer do
|
||||
Process.sleep(1000)
|
||||
|
||||
Logger.info("Import: Flushing #{length(records)} from #{table_name} buffer")
|
||||
Plausible.ClickhouseRepo.insert_all(table_name, records)
|
||||
Plausible.IngestRepo.insert_all(table_name, records)
|
||||
end
|
||||
end
|
||||
|
17
lib/plausible/ingest_repo.ex
Normal file
17
lib/plausible/ingest_repo.ex
Normal file
@ -0,0 +1,17 @@
|
||||
defmodule Plausible.IngestRepo do
|
||||
@moduledoc """
|
||||
Write-centric Clickhouse access interface
|
||||
"""
|
||||
|
||||
use Ecto.Repo,
|
||||
otp_app: :plausible,
|
||||
adapter: ClickhouseEcto
|
||||
|
||||
defmacro __using__(_) do
|
||||
quote do
|
||||
alias Plausible.IngestRepo
|
||||
import Ecto
|
||||
import Ecto.Query, only: [from: 1, from: 2]
|
||||
end
|
||||
end
|
||||
end
|
@ -9,7 +9,7 @@ defmodule Plausible.PromEx do
|
||||
Plugins.Application,
|
||||
Plugins.Beam,
|
||||
{Plugins.Phoenix, router: PlausibleWeb.Router, endpoint: PlausibleWeb.Endpoint},
|
||||
Plugins.Ecto,
|
||||
{Plugins.Ecto, repos: [Plausible.Repo, Plausible.ClickhouseRepo, Plausible.IngestRepo]},
|
||||
Plugins.Oban,
|
||||
Plausible.PromEx.Plugins.PlausibleMetrics
|
||||
]
|
||||
|
@ -2,6 +2,8 @@ defmodule Plausible.Session.WriteBuffer do
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Plausible.IngestRepo
|
||||
|
||||
def start_link(_opts) do
|
||||
GenServer.start_link(__MODULE__, [], name: __MODULE__)
|
||||
end
|
||||
@ -67,15 +69,15 @@ defmodule Plausible.Session.WriteBuffer do
|
||||
|> Enum.map(&(Map.from_struct(&1) |> Map.delete(:__meta__)))
|
||||
|> Enum.reverse()
|
||||
|
||||
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseSession, sessions)
|
||||
IngestRepo.insert_all(Plausible.ClickhouseSession, sessions)
|
||||
end
|
||||
end
|
||||
|
||||
defp flush_interval_ms() do
|
||||
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :flush_interval_ms)
|
||||
Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms)
|
||||
end
|
||||
|
||||
defp max_buffer_size() do
|
||||
Keyword.fetch!(Application.get_env(:plausible, Plausible.ClickhouseRepo), :max_buffer_size)
|
||||
Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size)
|
||||
end
|
||||
end
|
||||
|
@ -7,7 +7,7 @@ defmodule Plausible.ImportedTest do
|
||||
defp import_data(ga_data, site_id, table_name) do
|
||||
ga_data
|
||||
|> Plausible.Imported.from_google_analytics(site_id, table_name)
|
||||
|> then(&Plausible.ClickhouseRepo.insert_all(table_name, &1))
|
||||
|> then(&Plausible.IngestRepo.insert_all(table_name, &1))
|
||||
end
|
||||
|
||||
describe "Parse and import third party data fetched from Google Analytics" do
|
||||
|
@ -71,7 +71,7 @@ defmodule Plausible.TestUtils do
|
||||
Factory.build(:pageview, pageview) |> Map.from_struct() |> Map.delete(:__meta__)
|
||||
end)
|
||||
|
||||
Plausible.ClickhouseRepo.insert_all("events", pageviews)
|
||||
Plausible.IngestRepo.insert_all("events", pageviews)
|
||||
end
|
||||
|
||||
def create_events(events) do
|
||||
@ -80,7 +80,7 @@ defmodule Plausible.TestUtils do
|
||||
Factory.build(:event, event) |> Map.from_struct() |> Map.delete(:__meta__)
|
||||
end)
|
||||
|
||||
Plausible.ClickhouseRepo.insert_all("events", events)
|
||||
Plausible.IngestRepo.insert_all("events", events)
|
||||
end
|
||||
|
||||
def create_sessions(sessions) do
|
||||
@ -89,7 +89,7 @@ defmodule Plausible.TestUtils do
|
||||
Factory.build(:ch_session, session) |> Map.from_struct() |> Map.delete(:__meta__)
|
||||
end)
|
||||
|
||||
Plausible.ClickhouseRepo.insert_all("sessions", sessions)
|
||||
Plausible.IngestRepo.insert_all("sessions", sessions)
|
||||
end
|
||||
|
||||
def log_in(%{user: user, conn: conn}) do
|
||||
@ -163,7 +163,7 @@ defmodule Plausible.TestUtils do
|
||||
|
||||
defp populate_imported_stats(events) do
|
||||
Enum.group_by(events, &Map.fetch!(&1, :table), &Map.delete(&1, :table))
|
||||
|> Enum.map(fn {table, events} -> Plausible.ClickhouseRepo.insert_all(table, events) end)
|
||||
|> Enum.map(fn {table, events} -> Plausible.IngestRepo.insert_all(table, events) end)
|
||||
end
|
||||
|
||||
def relative_time(shifts) do
|
||||
|
Loading…
Reference in New Issue
Block a user