Use new Session.CacheStore in favour of Session.Store (#1934)

* Remove Session.Store in favour of Session.CacheStore

* Add CHANGELOG entry

* Use appropriate enum function
This commit is contained in:
Uku Taht 2022-06-06 10:44:33 +03:00 committed by GitHub
parent ae61aecfd0
commit 3e5695408a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 57 additions and 260 deletions

View File

@ -40,6 +40,7 @@ All notable changes to this project will be documented in this file.
- Paginate /api/sites results and add a `View all` link to the site-switcher dropdown in the dashboard.
- Remove the `+ Add Site` link from the site-switcher dropdown in the dashboard.
- `DISABLE_REGISTRATIONS` configuration parameter can now accept `invite_only` to allow invited users to register an account while keeping regular registrations disabled plausible/analytics#1841
- New and improved Session tracking module for higher throughput and lower latency. [PR#1934](https://github.com/plausible/analytics/pull/1934)
## v1.4.1

View File

@ -13,7 +13,6 @@ defmodule Plausible.Application do
Plausible.Session.Salts,
Plausible.Event.WriteBuffer,
Plausible.Session.WriteBuffer,
Plausible.Session.Store,
ReferrerBlocklist,
Supervisor.child_spec({Cachex, name: :user_agents, limit: 1000}, id: :cachex_user_agents),
Supervisor.child_spec({Cachex, name: :sessions, limit: nil}, id: :cachex_sessions),

View File

@ -2,18 +2,17 @@ defmodule Plausible.Session.CacheStore do
require Logger
alias Plausible.Session.WriteBuffer
def on_event(event, prev_user_id) do
found_session =
find_session(event.domain, event.user_id) || find_session(event.domain, prev_user_id)
def on_event(event, prev_user_id, buffer \\ WriteBuffer) do
found_session = find_session(event, event.user_id) || find_session(event, prev_user_id)
session =
if found_session do
updated_session = update_session(found_session, event)
WriteBuffer.insert([%{updated_session | sign: 1}, %{found_session | sign: -1}])
buffer.insert([%{updated_session | sign: 1}, %{found_session | sign: -1}])
persist_session(updated_session)
else
new_session = new_session_from_event(event)
WriteBuffer.insert([new_session])
buffer.insert([new_session])
persist_session(new_session)
end
@ -22,10 +21,15 @@ defmodule Plausible.Session.CacheStore do
defp find_session(_domain, nil), do: nil
defp find_session(domain, user_id) do
case Cachex.get(:sessions, {domain, user_id}) do
{:ok, val} ->
val
defp find_session(event, user_id) do
case Cachex.get(:sessions, {event.domain, user_id}) do
{:ok, nil} ->
nil
{:ok, session} ->
if Timex.diff(event.timestamp, session.timestamp, :minutes) <= 30 do
session
end
{:error, e} ->
Sentry.capture_message("Cachex error", extra: %{error: e})
@ -118,7 +122,9 @@ defmodule Plausible.Session.CacheStore do
browser: event.browser,
browser_version: event.browser_version,
timestamp: event.timestamp,
start: event.timestamp
start: event.timestamp,
"entry_meta.key": Map.get(event, :"meta.key"),
"entry_meta.value": Map.get(event, :"meta.value")
}
end
end

View File

@ -1,200 +0,0 @@
defmodule Plausible.Session.Store do
  @moduledoc """
  In-memory session store held in the state of a single `GenServer`.

  Sessions live in a map keyed by `{domain, user_id}`. Each incoming event
  either extends the matching session (when the event arrives less than the
  configured session length after the session's last timestamp) or starts a
  new session. The resulting rows are passed to the write buffer module given
  via the `:buffer` option, `Plausible.Session.WriteBuffer` by default.

  A garbage-collection pass runs every minute and forgets sessions whose last
  timestamp is older than twice the configured session length.
  """

  use GenServer
  use Plausible.Repo
  require Logger

  @garbage_collect_interval_milliseconds 60 * 1000

  @doc """
  Starts the store and registers it under the module name.

  `opts` may contain `:buffer`, the module whose `insert/1` receives session rows.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Records `event` in the store running as `pid` and returns the id of the
  session the event was assigned to.

  `prev_user_id` is used as a fallback lookup key when no session exists for
  `event.user_id`; pass `nil` to skip the fallback.
  """
  def on_event(event, prev_user_id, pid \\ __MODULE__) do
    GenServer.call(pid, {:on_event, event, prev_user_id})
  end

  @doc """
  Applies `event` to a plain `sessions` map and returns the updated map.

  Uses the same extend-or-create rule as `on_event/3`, but looks the session up
  under the event's own key only and never writes to a buffer.
  """
  def reconcile_event(sessions, event) do
    key = session_key(event)
    found_session = sessions[key]

    session =
      if active?(found_session, event) do
        update_session(found_session, event)
      else
        new_session_from_event(event)
      end

    Map.put(sessions, key, session)
  end

  @impl true
  def init(opts) do
    buffer = Keyword.get(opts, :buffer, Plausible.Session.WriteBuffer)
    {:ok, %{timer: schedule_garbage_collect(), sessions: %{}, buffer: buffer}}
  end

  @impl true
  def handle_call(
        {:on_event, event, prev_user_id},
        _from,
        %{sessions: sessions, buffer: buffer} = state
      ) do
    key = session_key(event)

    # Fall back to the previous user id so a session can continue when the
    # caller supplies one and nothing is stored under the current id.
    found_session = sessions[key] || (prev_user_id && sessions[{event.domain, prev_user_id}])

    session =
      if active?(found_session, event) do
        updated_session = update_session(found_session, event)
        # The old row is re-inserted with sign -1 alongside the new row with sign 1.
        buffer.insert([%{updated_session | sign: 1}, %{found_session | sign: -1}])
        updated_session
      else
        # No session, or an expired one: both cases start a fresh session.
        new_session = new_session_from_event(event)
        buffer.insert([new_session])
        new_session
      end

    # The session is always stored under the event's own key, even when it was
    # found through `prev_user_id`.
    {:reply, session.session_id, %{state | sessions: Map.put(sessions, key, session)}}
  end

  @impl true
  def handle_info(:garbage_collect, state) do
    Logger.debug("Session store collecting garbage")

    now = Timex.now()
    # Read the threshold once per pass instead of once per session.
    max_idle_seconds = forget_session_after()

    new_sessions =
      state.sessions
      |> Enum.filter(fn {_key, session} ->
        Timex.diff(now, session.timestamp, :second) <= max_idle_seconds
      end)
      |> Map.new()

    Process.cancel_timer(state.timer)
    new_timer = schedule_garbage_collect()

    Logger.debug(fn ->
      "Removed #{map_size(state.sessions) - map_size(new_sessions)} sessions from store"
    end)

    {:noreply, %{state | sessions: new_sessions, timer: new_timer}}
  end

  # Key under which an event's session is stored.
  defp session_key(event), do: {event.domain, event.user_id}

  # Arms the timer that triggers the next garbage-collection pass.
  defp schedule_garbage_collect do
    Process.send_after(self(), :garbage_collect, @garbage_collect_interval_milliseconds)
  end

  # Truthy when `session` exists and `event` falls within the session length
  # of the session's last timestamp; returns the falsy session value otherwise.
  defp active?(session, event) do
    session && Timex.diff(event.timestamp, session.timestamp, :second) < session_length_seconds()
  end

  # Folds `event` into an existing session: bumps counters, moves the exit page
  # and fills in attributes the session does not have a value for yet.
  defp update_session(session, event) do
    %{
      session
      | user_id: event.user_id,
        timestamp: event.timestamp,
        exit_page: event.pathname,
        is_bounce: false,
        # abs/1 keeps the duration non-negative when events arrive out of order.
        duration: abs(Timex.diff(event.timestamp, session.start, :second)),
        pageviews:
          if(event.name == "pageview", do: session.pageviews + 1, else: session.pageviews),
        country_code: fill_blank(session.country_code, "", event.country_code),
        subdivision1_code: fill_blank(session.subdivision1_code, "", event.subdivision1_code),
        subdivision2_code: fill_blank(session.subdivision2_code, "", event.subdivision2_code),
        city_geoname_id: fill_blank(session.city_geoname_id, 0, event.city_geoname_id),
        operating_system: fill_blank(session.operating_system, "", event.operating_system),
        operating_system_version:
          fill_blank(session.operating_system_version, "", event.operating_system_version),
        browser: fill_blank(session.browser, "", event.browser),
        browser_version: fill_blank(session.browser_version, "", event.browser_version),
        screen_size: fill_blank(session.screen_size, "", event.screen_size),
        events: session.events + 1
    }
  end

  # Returns `fallback` when `current` equals the `blank` marker, else `current`.
  defp fill_blank(current, blank, fallback) do
    if current == blank, do: fallback, else: current
  end

  # Builds the first row of a brand-new session from `event`.
  defp new_session_from_event(event) do
    %Plausible.ClickhouseSession{
      sign: 1,
      session_id: Plausible.ClickhouseSession.random_uint64(),
      hostname: event.hostname,
      domain: event.domain,
      user_id: event.user_id,
      entry_page: event.pathname,
      exit_page: event.pathname,
      is_bounce: true,
      duration: 0,
      pageviews: if(event.name == "pageview", do: 1, else: 0),
      events: 1,
      referrer: event.referrer,
      referrer_source: event.referrer_source,
      utm_medium: event.utm_medium,
      utm_source: event.utm_source,
      utm_campaign: event.utm_campaign,
      utm_content: event.utm_content,
      utm_term: event.utm_term,
      country_code: event.country_code,
      subdivision1_code: event.subdivision1_code,
      subdivision2_code: event.subdivision2_code,
      city_geoname_id: event.city_geoname_id,
      screen_size: event.screen_size,
      operating_system: event.operating_system,
      operating_system_version: event.operating_system_version,
      browser: event.browser,
      browser_version: event.browser_version,
      timestamp: event.timestamp,
      start: event.timestamp,
      "entry_meta.key": Map.get(event, :"meta.key"),
      "entry_meta.value": Map.get(event, :"meta.value")
    }
  end

  defp session_length_seconds(), do: Application.get_env(:plausible, :session_length_minutes) * 60
  defp forget_session_after(), do: session_length_seconds() * 2
end

View File

@ -17,13 +17,18 @@ defmodule Plausible.Session.WriteBuffer do
{:ok, sessions}
end
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
def handle_cast({:insert, sessions}, %{buffer: buffer} = state) do
new_buffer = sessions ++ buffer
if length(new_buffer) >= max_buffer_size() do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(state[:timer])
flush(new_buffer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: new_timer}}
else
@ -32,17 +37,24 @@ defmodule Plausible.Session.WriteBuffer do
end
def handle_info(:tick, %{buffer: buffer}) do
flush(buffer)
do_flush(buffer)
timer = Process.send_after(self(), :tick, flush_interval_ms())
{:noreply, %{buffer: [], timer: timer}}
end
def terminate(_reason, %{buffer: buffer}) do
Logger.info("Flushing session buffer before shutdown...")
flush(buffer)
def handle_call(:flush, _from, %{buffer: buffer} = state) do
Process.cancel_timer(state[:timer])
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, flush_interval_ms())
{:reply, nil, %{buffer: [], timer: new_timer}}
end
defp flush(buffer) do
def terminate(_reason, %{buffer: buffer}) do
Logger.info("Flushing session buffer before shutdown...")
do_flush(buffer)
end
defp do_flush(buffer) do
case buffer do
[] ->
nil

View File

@ -178,14 +178,8 @@ defmodule PlausibleWeb.Api.ExternalController do
event = Ecto.Changeset.apply_changes(changeset)
session_id =
if FunWithFlags.enabled?(:cache_store, for: "domain:" <> domain) do
Tracer.with_span "cache_store_event" do
Plausible.Session.CacheStore.on_event(event, previous_user_id)
end
else
Tracer.with_span "store_event" do
Plausible.Session.Store.on_event(event, previous_user_id)
end
Tracer.with_span "cache_store_event" do
Plausible.Session.CacheStore.on_event(event, previous_user_id)
end
event

View File

@ -1,18 +1,17 @@
defmodule Plausible.Session.StoreTest do
defmodule Plausible.Session.CacheStoreTest do
use Plausible.DataCase
import Double
alias Plausible.Session.{Store, WriteBuffer}
alias Plausible.Session.{CacheStore, WriteBuffer}
setup do
buffer =
WriteBuffer
|> stub(:insert, fn _sessions -> nil end)
{:ok, store} = GenServer.start_link(Store, buffer: buffer)
[store: store, buffer: buffer]
[buffer: buffer]
end
test "creates a session from an event", %{store: store} do
test "creates a session from an event", %{buffer: buffer} do
event =
build(:event,
name: "pageview",
@ -33,7 +32,7 @@ defmodule Plausible.Session.StoreTest do
"meta.value": ["true", "false"]
)
Store.on_event(event, nil, store)
CacheStore.on_event(event, nil, buffer)
assert_receive({WriteBuffer, :insert, [sessions]})
assert [session] = sessions
@ -65,7 +64,7 @@ defmodule Plausible.Session.StoreTest do
# assert Map.get(session, :"entry_meta.value") == ["true", "false"]
end
test "updates a session", %{store: store} do
test "updates a session", %{buffer: buffer} do
timestamp = Timex.now()
event1 = build(:event, name: "pageview", timestamp: timestamp |> Timex.shift(seconds: -10))
@ -86,8 +85,8 @@ defmodule Plausible.Session.StoreTest do
browser_version: "10"
)
Store.on_event(event1, nil, store)
Store.on_event(event2, nil, store)
CacheStore.on_event(event1, nil, buffer)
CacheStore.on_event(event2, nil, buffer)
assert_receive({WriteBuffer, :insert, [[session, _negative_record]]})
assert session.is_bounce == false
assert session.duration == 10
@ -104,7 +103,7 @@ defmodule Plausible.Session.StoreTest do
assert session.screen_size == "Desktop"
end
test "calculates duration correctly for out-of-order events", %{store: store} do
test "calculates duration correctly for out-of-order events", %{buffer: buffer} do
timestamp = Timex.now()
event1 = build(:event, name: "pageview", timestamp: timestamp |> Timex.shift(seconds: 10))
@ -116,8 +115,8 @@ defmodule Plausible.Session.StoreTest do
timestamp: timestamp
)
Store.on_event(event1, nil, store)
Store.on_event(event2, nil, store)
CacheStore.on_event(event1, nil, buffer)
CacheStore.on_event(event2, nil, buffer)
assert_receive({WriteBuffer, :insert, [[session, _negative_record]]})
assert session.duration == 10

View File

@ -645,10 +645,7 @@ defmodule PlausibleWeb.Api.StatsController.PagesTest do
pathname: "/page2",
user_id: @user_id,
timestamp: ~N[2021-01-01 00:15:00]
)
])
populate_stats(site, [
),
build(:pageview,
pathname: "/page2",
user_id: @user_id,

View File

@ -120,23 +120,17 @@ defmodule Plausible.TestUtils do
defp populate_native_stats(events) do
sessions =
Enum.reduce(events, %{}, fn event, sessions ->
Plausible.Session.Store.reconcile_event(sessions, event)
session_id = Plausible.Session.CacheStore.on_event(event, nil)
Map.put(sessions, {event.domain, event.user_id}, session_id)
end)
events =
Enum.map(events, fn event ->
Map.put(event, :session_id, sessions[{event.domain, event.user_id}].session_id)
end)
Enum.each(events, fn event ->
event = Map.put(event, :session_id, sessions[{event.domain, event.user_id}])
Plausible.Event.WriteBuffer.insert(event)
end)
Plausible.ClickhouseRepo.insert_all(
Plausible.ClickhouseEvent,
Enum.map(events, &schema_to_map/1)
)
Plausible.ClickhouseRepo.insert_all(
Plausible.ClickhouseSession,
Enum.map(Map.values(sessions), &schema_to_map/1)
)
Plausible.Session.WriteBuffer.flush()
Plausible.Event.WriteBuffer.flush()
end
defp populate_imported_stats(events) do
@ -149,9 +143,4 @@ defmodule Plausible.TestUtils do
|> Timex.shift(shifts)
|> NaiveDateTime.truncate(:second)
end
defp schema_to_map(schema) do
Map.from_struct(schema)
|> Map.delete(:__meta__)
end
end