Generalize Cache/Warmers - extract only specifics to Site.Cache (#3716)

* Simplify caches config

The intervals were proven to never change really,
and previous shape made them not very discoverable either.
We'll look into a more declarative setup.

* Aim for more declarative supervision tree setup

* Generalize `Plausible.Cache`

This makes the `Site.Cache` module implement the specifics,
leaving the common bits reusable for upcoming cache processes.

* Generalize `Plausible.Cache` warmers

* Fix typos
This commit is contained in:
hq1 2024-01-30 10:11:29 +01:00 committed by GitHub
parent d5818a63f7
commit 931161f693
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 589 additions and 442 deletions

View File

@ -65,10 +65,7 @@ config :plausible, Plausible.Repo,
connect_timeout: 300_000,
handshake_timeout: 300_000
config :plausible,
sites_by_domain_cache_enabled: true,
sites_by_domain_cache_refresh_interval_max_jitter: :timer.seconds(5),
sites_by_domain_cache_refresh_interval: :timer.minutes(15)
config :plausible, Plausible.Cache, enabled: true
config :plausible, Plausible.Ingestion.Counters, enabled: true

View File

@ -22,8 +22,9 @@ config :bamboo, :refute_timeout, 10
config :plausible,
session_timeout: 0,
http_impl: Plausible.HTTPClient.Mock,
sites_by_domain_cache_enabled: false
http_impl: Plausible.HTTPClient.Mock
config :plausible, Plausible.Cache, enabled: false
config :ex_money, api_module: Plausible.ExchangeRateMock

View File

@ -27,8 +27,20 @@ defmodule Plausible.Application do
id: :cachex_sessions
),
{Plausible.Site.Cache, []},
{Plausible.Site.Cache.Warmer.All, []},
{Plausible.Site.Cache.Warmer.RecentlyUpdated, []},
{Plausible.Cache.Warmer,
[
child_name: Plausible.Site.Cache.All,
cache_impl: Plausible.Site.Cache,
interval: :timer.minutes(15) + Enum.random(1..:timer.seconds(10)),
warmer_fn: :refresh_all
]},
{Plausible.Cache.Warmer,
[
child_name: Plausible.Site.Cache.RecentlyUpdated,
cache_impl: Plausible.Site.Cache,
interval: :timer.seconds(30),
warmer_fn: :refresh_updated_recently
]},
{Plausible.Auth.TOTP.Vault, key: totp_vault_key()},
PlausibleWeb.Endpoint,
{Oban, Application.get_env(:plausible, Oban)},

210
lib/plausible/cache.ex Normal file
View File

@ -0,0 +1,210 @@
defmodule Plausible.Cache do
@moduledoc """
Caching interface specific for Plausible. Usage:
use Plausible.Cache
# - Implement the callbacks required
# - Optionally override `unwrap_cache_keys/1`
# - Populate the cache with `Plausible.Cache.Warmer`
Serves as a thin wrapper around Cachex, but the underlying
implementation can be transparently swapped.
Even though the Cachex process is started, cache access is disabled
during tests via the `:plausible, #{__MODULE__}, enabled: bool()` application env key.
This can be overridden on case by case basis, using the child specs options.
The `base_db_query/0` callback is used to generate the base query that is
executed on every cache refresh.
There are two modes of refresh operation: `:all` and `:updated_recently`;
the former will invoke the query as is and clear all the existing entries,
while the latter will attempt to limit the query to only the records that
have been updated in the last 15 minutes and try to merge the new results with
existing cache entries.
Both refresh modes are normally executed periodically from within a warmer process;
see: `Plausible.Cache.Warmer`. The reason for two modes is that the latter is lighter
on the database and can be executed more frequently.
When Cache is disabled via application env, the `get/1` function
falls back to pure database lookups (implemented via `get_from_source/1` callback.
This should help with introducing cached lookups in existing code,
so that no existing tests should break.
Refreshing the cache emits telemetry event defined as per `telemetry_event_refresh/2`.
"""
@doc "Unique cache name, used by underlying implementation"
@callback name() :: atom()
@doc "Supervisor child id, must be unique within the supervision tree"
@callback child_id() :: atom()
@doc "Counts all items at the source, an aggregate query most likely"
@callback count_all() :: integer()
@doc "Optionally unwraps the keys of the cache items, in case one item is stored under multiple keys"
@callback unwrap_cache_keys([any()]) :: [{any(), any()}]
@doc "Returns the base Ecto query used to refresh the cache"
@callback base_db_query() :: Ecto.Query.t()
@doc "Retrieves the item from the source, in case the cache is disabled"
@callback get_from_source(any()) :: any()
@doc "Looks for application env value at `:plausible, #{__MODULE__}, enabled: bool()`"
def enabled?() do
Application.fetch_env!(:plausible, __MODULE__)[:enabled] == true
end
defmacro __using__(_opts) do
quote do
require Logger
@behaviour Plausible.Cache
@modes [:all, :updated_recently]
@spec get(String.t(), Keyword.t()) :: any() | nil
def get(key, opts \\ []) do
cache_name = Keyword.get(opts, :cache_name, name())
force? = Keyword.get(opts, :force?, false)
if Plausible.Cache.enabled?() or force? do
case Cachex.get(cache_name, key) do
{:ok, nil} ->
nil
{:ok, item} ->
item
{:error, e} ->
Logger.error("Error retrieving key from '#{inspect(cache_name)}': #{inspect(e)}")
nil
end
else
get_from_source(key)
end
end
def unwrap_cache_keys(items), do: items
defoverridable unwrap_cache_keys: 1
@spec refresh_all(Keyword.t()) :: :ok
def refresh_all(opts \\ []) do
refresh(
:all,
base_db_query(),
Keyword.put(opts, :delete_stale_items?, true)
)
end
@spec refresh_updated_recently(Keyword.t()) :: :ok
def refresh_updated_recently(opts \\ []) do
recently_updated_query =
from [s, _rg] in base_db_query(),
order_by: [asc: s.updated_at],
where: s.updated_at > ago(^15, "minute")
refresh(
:updated_recently,
recently_updated_query,
Keyword.put(opts, :delete_stale_items?, false)
)
end
@spec merge_items(new_items :: [any()], opts :: Keyword.t()) :: :ok
def merge_items(new_items, opts \\ [])
def merge_items([], _), do: :ok
def merge_items(new_items, opts) do
new_items = unwrap_cache_keys(new_items)
cache_name = Keyword.get(opts, :cache_name, name())
true = Cachex.put_many!(cache_name, new_items)
if Keyword.get(opts, :delete_stale_items?, true) do
{:ok, old_keys} = Cachex.keys(cache_name)
new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end))
old = MapSet.new(old_keys)
old
|> MapSet.difference(new)
|> Enum.each(fn k ->
Cachex.del(cache_name, k)
end)
end
:ok
end
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
cache_name = Keyword.get(opts, :cache_name, name())
child_id = Keyword.get(opts, :child_id, child_id())
Supervisor.child_spec(
{Cachex, name: cache_name, limit: nil, stats: true},
id: child_id
)
end
@doc """
Ensures the cache has non-zero size unless no items exist.
Useful for orchestrating app startup to prevent the service
going up asynchronously with an empty cache.
"""
@spec ready?(atom()) :: boolean
def ready?(cache_name \\ name()) do
case size(cache_name) do
n when n > 0 ->
true
0 ->
count_all() == 0
end
end
@spec size() :: non_neg_integer()
def size(cache_name \\ name()) do
case Cachex.size(cache_name) do
{:ok, size} -> size
_ -> 0
end
end
@spec hit_rate() :: number()
def hit_rate(cache_name \\ name()) do
case Cachex.stats(cache_name) do
{:ok, stats} -> Map.get(stats, :hit_rate, 0)
_ -> 0
end
end
@spec telemetry_event_refresh(atom(), atom()) :: list(atom())
def telemetry_event_refresh(cache_name \\ name(), mode) when mode in @modes do
[:plausible, :cache, cache_name, :refresh, mode]
end
defp refresh(mode, query, opts) when mode in @modes do
cache_name = Keyword.get(opts, :cache_name, name())
measure_duration(telemetry_event_refresh(cache_name, mode), fn ->
items = Plausible.Repo.all(query)
:ok = merge_items(items, opts)
end)
:ok
end
defp measure_duration(event, fun) when is_function(fun, 0) do
{duration, result} = time_it(fun)
:telemetry.execute(event, %{duration: duration}, %{})
result
end
defp time_it(fun) do
start = System.monotonic_time()
result = fun.()
stop = System.monotonic_time()
{stop - start, result}
end
end
end
end

83
lib/plausible/cache/warmer.ex vendored Normal file
View File

@ -0,0 +1,83 @@
defmodule Plausible.Cache.Warmer do
@moduledoc """
A periodic cache warmer.
Child specification options available:
* `cache_impl` - module expected to implement `Plausible.Cache` behaviour
* `interval` - the number of milliseconds for each warm-up cycle
* `cache_name` - defaults to cache_impl.name() but can be overridden for testing
* `force_start?` - enforcess process startup for testing, even if it's barred
by `Plausible.Cache.enabled?`. This is useful for avoiding issues with DB ownership
and async tests.
* `warmer_fn` - by convention, either `:refresh_all` or `:refresh_updated_recently`,
both are automatically provided by `cache_impl` module. Technically any exported
or captured function will work, if need be.
See tests for more comprehensive examples.
"""
@behaviour :gen_cycle
require Logger
@spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
def child_spec(opts) do
child_name = Keyword.get(opts, :child_name, __MODULE__)
%{
id: child_name,
start: {:gen_cycle, :start_link, [{:local, child_name}, __MODULE__, opts]}
}
end
@impl true
def init_cycle(opts) do
cache_impl = Keyword.fetch!(opts, :cache_impl)
cache_name = Keyword.get(opts, :cache_name, cache_impl.name())
interval = Keyword.fetch!(opts, :interval)
warmer_fn =
case Keyword.fetch!(opts, :warmer_fn) do
f when is_function(f, 1) ->
f
f when is_atom(f) ->
true = function_exported?(cache_impl, f, 1)
Function.capture(cache_impl, f, 1)
end
force_start? = Keyword.get(opts, :force_start?, false)
if Plausible.Cache.enabled?() or force_start? do
Logger.info(
"#{__MODULE__} initializing #{inspect(warmer_fn)} #{cache_name} with interval #{interval}..."
)
{:ok,
{interval,
opts
|> Keyword.put(:cache_name, cache_name)
|> Keyword.put(:warmer_fn, warmer_fn)}}
else
:ignore
end
end
@impl true
def handle_cycle(opts) do
cache_name = Keyword.fetch!(opts, :cache_name)
warmer_fn = Keyword.fetch!(opts, :warmer_fn)
Logger.info("#{__MODULE__} running #{inspect(warmer_fn)} on #{cache_name}...")
warmer_fn.(opts)
{:continue_hibernated, opts}
end
@impl true
def handle_info(_msg, state) do
{:continue, state}
end
end

View File

@ -1,41 +1,15 @@
defmodule Plausible.Site.Cache do
@moduledoc """
A "sites by domain" caching interface.
Serves as a thin wrapper around Cachex, but the underlying
implementation can be transparently swapped.
Even though the Cachex process is started, cache access is disabled
during tests via the `:sites_by_domain_cache_enabled` application env key.
This can be overridden on case by case basis, using the child specs options.
NOTE: the cache allows lookups by both `domain` and `domain_changed_from`
The cache allows lookups by both `domain` and `domain_changed_from`
fields - this is to allow traffic from sites whose domains changed within a certain
grace period (see: `Plausible.Site.Transfer`).
When Cache is disabled via application env, the `get/1` function
falls back to pure database lookups. This should help with introducing
cached lookups in existing code, so that no existing tests should break.
To differentiate cached Site structs from those retrieved directly from the
database, a virtual schema field `from_cache?: true` is set.
This indicates the `Plausible.Site` struct is incomplete in comparison to its
database counterpart -- to spare bandwidth and query execution time,
only selected database columns are retrieved and cached.
There are two modes of refreshing the cache: `:all` and `:updated_recently`.
* `:all` means querying the database for all Site entries and should be done
periodically (via `Cache.Warmer`). All stale Cache entries are cleared
upon persisting the new batch.
* `:updated_recently` attempts to re-query sites updated within the last
15 minutes only, to account for most recent changes. This refresh
is lighter on the database and is meant to be executed more frequently
(via `Cache.Warmer.RecentlyUpdated`).
Refreshing the cache emits telemetry event defined as per `telemetry_event_refresh/2`.
The `@cached_schema_fields` attribute defines the list of DB columns
queried on each cache refresh.
@ -47,8 +21,9 @@ defmodule Plausible.Site.Cache do
alias Plausible.Site
use Plausible.Cache
@cache_name :sites_by_domain
@modes [:all, :updated_recently]
@cached_schema_fields ~w(
id
@ -58,62 +33,19 @@ defmodule Plausible.Site.Cache do
ingest_rate_limit_threshold
)a
@type t() :: Site.t()
@spec name() :: atom()
@impl true
def name(), do: @cache_name
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
child_id = Keyword.get(opts, :child_id, :cachex_sites)
@impl true
def child_id(), do: :cachex_sites
Supervisor.child_spec(
{Cachex, name: cache_name, limit: nil, stats: true},
id: child_id
)
@impl true
def count_all() do
Plausible.Repo.aggregate(Site, :count)
end
@doc """
Ensures the cache has non-zero size unless no sites exist.
Useful for orchestrating app startup to prevent the service
going up asynchronously with an empty cache.
"""
@spec ready?(atom()) :: boolean
def ready?(cache_name \\ @cache_name) do
case size(cache_name) do
n when n > 0 ->
true
0 ->
Plausible.Repo.aggregate(Site, :count) == 0
end
end
@spec refresh_all(Keyword.t()) :: :ok
def refresh_all(opts \\ []) do
refresh(
:all,
sites_by_domain_query(),
Keyword.put(opts, :delete_stale_items?, true)
)
end
@spec refresh_updated_recently(Keyword.t()) :: :ok
def refresh_updated_recently(opts \\ []) do
recently_updated_sites_query =
from [s, _rg] in sites_by_domain_query(),
order_by: [asc: s.updated_at],
where: s.updated_at > ago(^15, "minute")
refresh(
:updated_recently,
recently_updated_sites_query,
Keyword.put(opts, :delete_stale_items?, false)
)
end
defp sites_by_domain_query do
@impl true
def base_db_query() do
from s in Site,
left_join: rg in assoc(s, :revenue_goals),
inner_join: owner in assoc(s, :owner),
@ -125,68 +57,9 @@ defmodule Plausible.Site.Cache do
preload: [revenue_goals: rg, owner: owner]
end
@spec merge(new_items :: [Site.t()], opts :: Keyword.t()) :: :ok
def merge(new_items, opts \\ [])
def merge([], _), do: :ok
def merge(new_items, opts) do
new_items = unwrap_cache_keys(new_items)
cache_name = Keyword.get(opts, :cache_name, @cache_name)
true = Cachex.put_many!(cache_name, new_items)
if Keyword.get(opts, :delete_stale_items?, true) do
{:ok, old_keys} = Cachex.keys(cache_name)
new = MapSet.new(Enum.into(new_items, [], fn {k, _} -> k end))
old = MapSet.new(old_keys)
old
|> MapSet.difference(new)
|> Enum.each(fn k ->
Cachex.del(cache_name, k)
end)
end
:ok
end
@spec size() :: non_neg_integer()
def size(cache_name \\ @cache_name) do
{:ok, size} = Cachex.size(cache_name)
size
end
@spec hit_rate() :: number()
def hit_rate(cache_name \\ @cache_name) do
{:ok, stats} = Cachex.stats(cache_name)
Map.get(stats, :hit_rate, 0)
end
@spec get(String.t(), Keyword.t()) :: t() | nil
def get(domain, opts \\ []) do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
force? = Keyword.get(opts, :force?, false)
if enabled?() or force? do
case Cachex.get(cache_name, domain) do
{:ok, nil} ->
nil
{:ok, site} ->
site
{:error, e} ->
Logger.error("Error retrieving domain from '#{inspect(cache_name)}': #{inspect(e)}")
nil
end
else
get_from_source(domain)
end
end
defp get_from_source(domain) do
query = from s in sites_by_domain_query(), where: s.domain == ^domain
@impl true
def get_from_source(domain) do
query = from s in base_db_query(), where: s.domain == ^domain
case Plausible.Repo.one(query) do
{_, _, site} -> %Site{site | from_cache?: false}
@ -205,11 +78,6 @@ defmodule Plausible.Site.Cache do
end
end
@spec telemetry_event_refresh(atom(), atom()) :: list(atom())
def telemetry_event_refresh(cache_name \\ @cache_name, mode) when mode in @modes do
[:plausible, :cache, cache_name, :refresh, mode]
end
@spec touch_site!(Site.t(), DateTime.t()) :: Site.t()
def touch_site!(site, now) do
now =
@ -222,35 +90,8 @@ defmodule Plausible.Site.Cache do
|> Plausible.Repo.update!()
end
def enabled?() do
Application.fetch_env!(:plausible, :sites_by_domain_cache_enabled) == true
end
defp refresh(mode, query, opts) when mode in @modes do
cache_name = Keyword.get(opts, :cache_name, @cache_name)
measure_duration(telemetry_event_refresh(cache_name, mode), fn ->
sites_by_domain = Plausible.Repo.all(query)
:ok = merge(sites_by_domain, opts)
end)
:ok
end
defp measure_duration(event, fun) when is_function(fun, 0) do
{duration, result} = time_it(fun)
:telemetry.execute(event, %{duration: duration}, %{})
result
end
defp time_it(fun) do
start = System.monotonic_time()
result = fun.()
stop = System.monotonic_time()
{stop - start, result}
end
defp unwrap_cache_keys(items) do
@impl true
def unwrap_cache_keys(items) do
Enum.reduce(items, [], fn
{domain, nil, object}, acc ->
[{domain, object} | acc]

View File

@ -1,85 +0,0 @@
defmodule Plausible.Site.Cache.Warmer do
@moduledoc """
A periodic cache warmer.
Queries all Sites from Postgres, every `interval` and pre-populates the cache.
After each run the process is hibernated, triggering garbage collection.
Currently Cachex is used, but the underlying implementation can be transparently swapped.
Child specification options available:
* `interval` - the number of milliseconds for each warm-up cycle, defaults
to `:sites_by_domain_cache_refresh_interval` application env value
with random jitter added, for which the maximum is stored under
`:sites_by_domain_cache_refresh_interval_max_jitter` key.
* `cache_name` - defaults to Cache.name() but can be overriden for testing
* `force_start?` - enforcess process startup for testing, even if it's barred
by `Cache.enabled?`. This is useful for avoiding issues with DB ownership
and async tests.
* `warmer_fn` - used for testing, a custom function to retrieve the items meant
to be cached during the warm-up cycle.
See tests for more comprehensive examples.
"""
@behaviour :gen_cycle
require Logger
alias Plausible.Site.Cache
@spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
def child_spec(opts) do
child_name = Keyword.get(opts, :child_name, __MODULE__)
%{
id: child_name,
start: {:gen_cycle, :start_link, [{:local, child_name}, __MODULE__, opts]}
}
end
@impl true
def init_cycle(opts) do
interval = Keyword.get(opts, :interval, interval())
force_start? = Keyword.get(opts, :force_start?, false)
cache_name = Keyword.get(opts, :cache_name, Cache.name())
if Cache.enabled?() or force_start? do
Logger.info("Initializing #{__MODULE__} with interval #{interval}")
{:ok, {interval, Keyword.put(opts, :cache_name, cache_name)}}
else
:ignore
end
end
@impl true
def handle_cycle(opts) do
cache_name = Keyword.fetch!(opts, :cache_name)
Logger.info("Refreshing #{cache_name} cache...")
warmer_fn = Keyword.get(opts, :warmer_fn, &Cache.refresh_all/1)
warmer_fn.(opts)
{:continue_hibernated, opts}
end
@impl true
def handle_info(_msg, state) do
{:continue, state}
end
@spec interval() :: pos_integer()
def interval() do
interval = Application.fetch_env!(:plausible, :sites_by_domain_cache_refresh_interval)
interval + jitter()
end
defp jitter() do
max_jitter =
Application.fetch_env!(:plausible, :sites_by_domain_cache_refresh_interval_max_jitter)
Enum.random(1..max_jitter)
end
end

View File

@ -1,8 +0,0 @@
defmodule Plausible.Site.Cache.Warmer.All do
@moduledoc """
A Cache.Warmer adapter that refreshes the Sites Cache fully.
This module exists only to make it explicit what the warmer
refreshes, to be used in the supervisor tree.
"""
defdelegate child_spec(opts), to: Plausible.Site.Cache.Warmer
end

View File

@ -1,20 +0,0 @@
defmodule Plausible.Site.Cache.Warmer.RecentlyUpdated do
@moduledoc """
A Cache.Warmer adapter that only refreshes the Cache
with recently updated sites every 30 seconds.
"""
alias Plausible.Site.Cache
@spec child_spec(Keyword.t()) :: Supervisor.child_spec() | :ignore
def child_spec(opts) do
child_name = Keyword.get(opts, :child_name, __MODULE__)
opts = [
child_name: child_name,
interval: :timer.seconds(30),
warmer_fn: &Cache.refresh_updated_recently/1
]
Plausible.Site.Cache.Warmer.child_spec(opts)
end
end

View File

@ -32,7 +32,7 @@ defmodule Plausible.Stats.Comparisons do
more details.
The comparison query returned by the function has its end date restricted to
the current day. This can be overriden by the `now` option, described below.
the current day. This can be overridden by the `now` option, described below.
## Options

View File

@ -0,0 +1,183 @@
defmodule Plausible.CacheTest do
use Plausible.DataCase, async: true
alias Plausible.Cache
import ExUnit.CaptureLog
defmodule ExampleCache do
use Plausible.Cache
def name(), do: __MODULE__
def child_id(), do: __MODULE__
def count_all(), do: 100
def base_db_query() do
%Ecto.Query{}
end
def get_from_source("existing_key") do
:from_source
end
def get_from_source(_) do
nil
end
end
describe "public cache interface" do
test "cache process is started, but falls back to the database if cache is disabled", %{
test: test
} do
{:ok, _} =
Supervisor.start_link(
[{ExampleCache, [cache_name: ExampleCache.name(), child_id: test]}],
strategy: :one_for_one,
name: :"cache_supervisor_#{test}"
)
insert(:site, domain: "example.test")
refute Cache.enabled?()
assert Process.alive?(Process.whereis(ExampleCache.name()))
refute Process.whereis(ExampleCache.Warmer)
assert :from_source = ExampleCache.get("existing_key")
assert ExampleCache.size() == 0
refute ExampleCache.get("other.test")
end
test "critical cache errors are logged and nil is returned" do
log =
capture_log(fn ->
assert ExampleCache.get("key", force?: true, cache_name: NonExistingCache) == nil
end)
assert log =~ "Error retrieving key from 'NonExistingCache': :no_cache"
end
end
describe "merging cache items" do
test "merging adds new items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = ExampleCache.merge_items([{"item1", :item1}], cache_name: test)
assert :item1 == ExampleCache.get("item1", cache_name: test, force?: true)
end
test "merging no new items leaves the old cache intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = ExampleCache.merge_items([{"item1", :item1}], cache_name: test)
:ok = ExampleCache.merge_items([], cache_name: test)
assert :item1 == ExampleCache.get("item1", cache_name: test, force?: true)
end
test "merging removes stale items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = ExampleCache.merge_items([{"item1", :item1}], cache_name: test)
:ok = ExampleCache.merge_items([{"item2", :item2}], cache_name: test)
refute ExampleCache.get("item1", cache_name: test, force?: true)
assert ExampleCache.get("item2", cache_name: test, force?: true)
end
test "merging optionally leaves stale items intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = ExampleCache.merge_items([{"item1", :item1}], cache_name: test)
:ok =
ExampleCache.merge_items([{"item2", :item2}],
cache_name: test,
delete_stale_items?: false
)
assert ExampleCache.get("item1", cache_name: test, force?: true)
assert ExampleCache.get("item2", cache_name: test, force?: true)
end
test "merging updates changed items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok =
ExampleCache.merge_items([{"item1", :item1}, {"item2", :item2}],
cache_name: test
)
:ok =
ExampleCache.merge_items([{"item1", :changed}, {"item2", :item2}],
cache_name: test
)
assert :changed == ExampleCache.get("item1", cache_name: test, force?: true)
assert :item2 == ExampleCache.get("item2", cache_name: test, force?: true)
end
end
describe "warming the cache" do
test "cache warmer process warms up the cache", %{test: test} do
test_pid = self()
opts = [
cache_impl: ExampleCache,
force_start?: true,
warmer_fn: report_back(test_pid),
cache_name: test,
interval: 30
]
{:ok, _} =
Supervisor.start_link([{Plausible.Cache.Warmer, opts}],
strategy: :one_for_one,
name: test
)
assert Process.whereis(Plausible.Cache.Warmer)
assert_receive {:cache_warmed, %{opts: got_opts}}
assert got_opts[:cache_name] == test
end
test "cache warmer warms periodically with an interval", %{test: test} do
test_pid = self()
opts = [
cache_impl: ExampleCache,
force_start?: true,
warmer_fn: report_back(test_pid),
cache_name: test,
interval: 30
]
{:ok, _} = start_test_warmer(opts)
assert_receive {:cache_warmed, %{at: at1}}, 100
assert_receive {:cache_warmed, %{at: at2}}, 100
assert_receive {:cache_warmed, %{at: at3}}, 100
assert is_integer(at1)
assert is_integer(at2)
assert is_integer(at3)
assert at1 < at2
assert at3 > at2
end
end
defp report_back(test_pid) do
fn opts ->
send(test_pid, {:cache_warmed, %{at: System.monotonic_time(), opts: opts}})
:ok
end
end
defp start_test_warmer(opts) do
child_name_opt = {:child_name, Keyword.fetch!(opts, :cache_name)}
%{start: {m, f, a}} = Cache.Warmer.child_spec([child_name_opt | opts])
apply(m, f, a)
end
defp start_test_cache(cache_name) do
%{start: {m, f, a}} = ExampleCache.child_spec(cache_name: cache_name)
apply(m, f, a)
end
end

View File

@ -18,7 +18,7 @@ defmodule Plausible.PaginationTest do
refute pagination.metadata.before
end
test "limit can be overriden", %{query: q} do
test "limit can be overridden", %{query: q} do
pagination = Pagination.paginate(q, %{"limit" => 3}, cursor_fields: [id: :desc])
assert Enum.count(pagination.entries) == 3

View File

@ -4,29 +4,8 @@ defmodule Plausible.Site.CacheTest do
alias Plausible.{Site, Goal}
alias Plausible.Site.Cache
import ExUnit.CaptureLog
describe "public cache interface" do
test "cache process is started, but falls back to the database if cache is disabled" do
insert(:site, domain: "example.test")
refute Cache.enabled?()
assert Process.alive?(Process.whereis(Cache.name()))
refute Process.whereis(Cache.Warmer)
assert %Site{domain: "example.test", from_cache?: false} = Cache.get("example.test")
assert Cache.size() == 0
refute Cache.get("other.test")
end
test "critical cache errors are logged and nil is returned" do
log =
capture_log(fn ->
assert Cache.get("key", force?: true, cache_name: NonExistingCache) == nil
end)
assert log =~ "Error retrieving domain from 'NonExistingCache': :no_cache"
end
test "cache caches", %{test: test} do
test "cache caches sites", %{test: test} do
{:ok, _} =
Supervisor.start_link([{Cache, [cache_name: test, child_id: :test_cache_caches_id]}],
strategy: :one_for_one,
@ -282,40 +261,90 @@ defmodule Plausible.Site.CacheTest do
end
end
describe "warming the cache" do
test "cache warmer process warms up the cache", %{test: test} do
test_pid = self()
opts = [force_start?: true, warmer_fn: report_back(test_pid), cache_name: test]
describe "merging the cache" do
test "merging adds new items", %{test: test} do
{:ok, _} = start_test_cache(test)
{:ok, _} = Supervisor.start_link([{Cache.Warmer, opts}], strategy: :one_for_one, name: test)
assert Process.whereis(Cache.Warmer)
assert_receive {:cache_warmed, %{opts: got_opts}}
assert got_opts[:cache_name] == test
:ok = Cache.merge_items([{"item1", nil, :item1}], cache_name: test)
assert :item1 == Cache.get("item1", cache_name: test, force?: true)
end
test "cache warmer warms periodically with an interval", %{test: test} do
test_pid = self()
test "merging no new items leaves the old cache intact", %{test: test} do
{:ok, _} = start_test_cache(test)
opts = [
force_start?: true,
warmer_fn: report_back(test_pid),
cache_name: test,
interval: 30
]
:ok = Cache.merge_items([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge_items([], cache_name: test)
assert :item1 == Cache.get("item1", cache_name: test, force?: true)
end
{:ok, _} = start_test_warmer(opts)
test "merging removes stale items", %{test: test} do
{:ok, _} = start_test_cache(test)
assert_receive {:cache_warmed, %{at: at1}}, 100
assert_receive {:cache_warmed, %{at: at2}}, 100
assert_receive {:cache_warmed, %{at: at3}}, 100
:ok = Cache.merge_items([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge_items([{"item2", nil, :item2}], cache_name: test)
assert is_integer(at1)
assert is_integer(at2)
assert is_integer(at3)
refute Cache.get("item1", cache_name: test, force?: true)
assert Cache.get("item2", cache_name: test, force?: true)
end
assert at1 < at2
assert at3 > at2
test "merging optionally leaves stale items intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge_items([{"item1", nil, :item1}], cache_name: test)
:ok =
Cache.merge_items([{"item2", nil, :item2}], cache_name: test, delete_stale_items?: false)
assert Cache.get("item1", cache_name: test, force?: true)
assert Cache.get("item2", cache_name: test, force?: true)
end
test "merging updates changed items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge_items([{"item1", nil, :item1}, {"item2", nil, :item2}], cache_name: test)
:ok =
Cache.merge_items([{"item1", nil, :changed}, {"item2", nil, :item2}], cache_name: test)
assert :changed == Cache.get("item1", cache_name: test, force?: true)
assert :item2 == Cache.get("item2", cache_name: test, force?: true)
end
test "merging keeps secondary keys", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge_items([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge_items([{"item2", "item1", :updated}], cache_name: test)
assert :updated == Cache.get("item1", cache_name: test, force?: true)
assert :updated == Cache.get("item2", cache_name: test, force?: true)
end
@items1 for i <- 1..200_000, do: {i, nil, :batch1}
@items2 for _ <- 1..200_000, do: {Enum.random(1..400_000), nil, :batch2}
@max_seconds 2
@tag :slow
test "merging large sets is expected to be under #{@max_seconds} seconds", %{test: test} do
{:ok, _} = start_test_cache(test)
{t1, :ok} =
:timer.tc(fn ->
:ok = Cache.merge_items(@items1, cache_name: test)
end)
{t2, :ok} =
:timer.tc(fn ->
:ok = Cache.merge_items(@items1, cache_name: test)
end)
{t3, :ok} =
:timer.tc(fn ->
:ok = Cache.merge_items(@items2, cache_name: test)
end)
assert t1 / 1_000_000 <= @max_seconds
assert t2 / 1_000_000 <= @max_seconds
assert t3 / 1_000_000 <= @max_seconds
end
test "deleted sites don't stay in cache on another refresh", %{test: test} do
@ -346,107 +375,11 @@ defmodule Plausible.Site.CacheTest do
end
end
describe "merging the cache" do
test "merging adds new items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}], cache_name: test)
assert :item1 == Cache.get("item1", cache_name: test, force?: true)
end
test "merging no new items leaves the old cache intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge([], cache_name: test)
assert :item1 == Cache.get("item1", cache_name: test, force?: true)
end
test "merging removes stale items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge([{"item2", nil, :item2}], cache_name: test)
refute Cache.get("item1", cache_name: test, force?: true)
assert Cache.get("item2", cache_name: test, force?: true)
end
test "merging optionally leaves stale items intact", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge([{"item2", nil, :item2}], cache_name: test, delete_stale_items?: false)
assert Cache.get("item1", cache_name: test, force?: true)
assert Cache.get("item2", cache_name: test, force?: true)
end
test "merging updates changed items", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}, {"item2", nil, :item2}], cache_name: test)
:ok = Cache.merge([{"item1", nil, :changed}, {"item2", nil, :item2}], cache_name: test)
assert :changed == Cache.get("item1", cache_name: test, force?: true)
assert :item2 == Cache.get("item2", cache_name: test, force?: true)
end
test "merging keeps secondary keys", %{test: test} do
{:ok, _} = start_test_cache(test)
:ok = Cache.merge([{"item1", nil, :item1}], cache_name: test)
:ok = Cache.merge([{"item2", "item1", :updated}], cache_name: test)
assert :updated == Cache.get("item1", cache_name: test, force?: true)
assert :updated == Cache.get("item2", cache_name: test, force?: true)
end
@items1 for i <- 1..200_000, do: {i, nil, :batch1}
@items2 for _ <- 1..200_000, do: {Enum.random(1..400_000), nil, :batch2}
@max_seconds 2
@tag :slow
test "merging large sets is expected to be under #{@max_seconds} seconds", %{test: test} do
{:ok, _} = start_test_cache(test)
{t1, :ok} =
:timer.tc(fn ->
:ok = Cache.merge(@items1, cache_name: test)
end)
{t2, :ok} =
:timer.tc(fn ->
:ok = Cache.merge(@items1, cache_name: test)
end)
{t3, :ok} =
:timer.tc(fn ->
:ok = Cache.merge(@items2, cache_name: test)
end)
assert t1 / 1_000_000 <= @max_seconds
assert t2 / 1_000_000 <= @max_seconds
assert t3 / 1_000_000 <= @max_seconds
end
end
defp report_back(test_pid) do
fn opts ->
send(test_pid, {:cache_warmed, %{at: System.monotonic_time(), opts: opts}})
:ok
end
end
defp start_test_cache(cache_name) do
%{start: {m, f, a}} = Cache.child_spec(cache_name: cache_name)
apply(m, f, a)
end
defp start_test_warmer(opts) do
child_name_opt = {:child_name, Keyword.fetch!(opts, :cache_name)}
%{start: {m, f, a}} = Cache.Warmer.child_spec([child_name_opt | opts])
apply(m, f, a)
end
defp start_test_cache_with_telemetry_handler(test, event: event) do
{:ok, _} = start_test_cache(test)
test_pid = self()