Add batched SQL for insert/update operations (#306)

This introduces a new way of doing "migration" of data based on the
operation table. Before, we simply executed the SQL queries as the
migrations were run. For example, for a "new" operation, we did an insert
of a "translations" row and an update to the "operations" table _for each
entry in the synced file._ For a file containing 1000 keys, we did a
total of 2000 SQL queries.

After this commit, we only do 2 SQL queries (well, 4 since we batch the
SQL params by 500). By aggregating operations like {:update_all,
operation.id, %{synced: true}}, we are able to batch the operations
efficiently.

This results in much faster sync, add translations, correct and
uncorrect operations.
This commit is contained in:
Simon Prévost 2022-08-29 07:50:55 -04:00 committed by GitHub
parent a492e834b1
commit 3325e5e9d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 287 additions and 95 deletions

View File

@ -15,6 +15,7 @@ defmodule Accent.UserRemote.Persister do
alias Accent.AuthProvider
alias Accent.Repo
alias Accent.User, as: RepoUser
alias Accent.UserRemote.TokenGiver
alias Accent.UserRemote.User, as: FetchedUser
alias Ecto.Changeset
@ -44,7 +45,12 @@ defmodule Accent.UserRemote.Persister do
end
defp create_provider(user, name, uid), do: Repo.insert!(%AuthProvider{name: name, uid: uid, user_id: user.id})
defp create_user(fetched_user), do: Repo.insert!(%RepoUser{email: fetched_user.email, fullname: fetched_user.fullname, picture_url: fetched_user.picture_url})
defp create_user(fetched_user) do
user = Repo.insert!(%RepoUser{email: fetched_user.email, fullname: fetched_user.fullname, picture_url: fetched_user.picture_url})
TokenGiver.grant_global_token(user)
user
end
defp update_user(user, fetched_user) do
user

View File

@ -7,6 +7,14 @@ defmodule Accent.UserRemote.TokenGiver do
create_token(user)
end
@doc """
Creates and persists a global access token for `user`.

The token record is flagged `global: true` and its value comes from
`Accent.Utils.SecureRandom.urlsafe_base64(70)`. The record is written with
`Repo.insert!/1` and the inserted struct is returned.
"""
def grant_global_token(user) do
  %Accent.AccessToken{
    user_id: user.id,
    global: true,
    token: Accent.Utils.SecureRandom.urlsafe_base64(70)
  }
  |> Repo.insert!()
end
defp invalidate_tokens(user) do
user
|> Ecto.assoc(:private_access_tokens)

View File

@ -20,7 +20,9 @@ defmodule Accent.Operation do
:version_id,
:project_id,
:stats,
:previous_translation
:previous_translation,
:inserted_at,
:updated_at
]
schema "operations" do

View File

@ -40,7 +40,13 @@ defmodule Accent.GraphQL.Resolvers.Translation do
|> (&fn -> BasePersister.execute(&1) end).()
|> Repo.transaction()
|> case do
{:ok, {_context, [translation]}} ->
{:ok, {_context, _}} ->
translation =
Translation
|> Query.where(id: ^translation.id)
|> Repo.one()
|> Repo.preload(:revision)
{:ok, %{translation: translation, errors: nil}}
{:error, _reason} ->
@ -57,7 +63,13 @@ defmodule Accent.GraphQL.Resolvers.Translation do
|> (&fn -> BasePersister.execute(&1) end).()
|> Repo.transaction()
|> case do
{:ok, {_context, [translation]}} ->
{:ok, {_context, _}} ->
translation =
Translation
|> Query.where(id: ^translation.id)
|> Repo.one()
|> Repo.preload(:revision)
{:ok, %{translation: translation, errors: nil}}
{:error, _reason} ->

View File

@ -32,6 +32,7 @@ defmodule Movement.Builders.NewSlave do
value_type: translation.value_type,
plural: translation.plural,
locked: translation.locked,
translation_id: translation.id,
placeholders: translation.placeholders,
options: assigns.new_slave_options
})

View File

@ -1,18 +1,11 @@
defmodule Movement.EctoMigrationHelper do
alias Accent.Repo
alias Movement.Migration
def update(struct, params), do: {:update, {struct, params}}
@doc """
Update given model by merging the existing parameters and the arguments.
"""
@spec update(model :: map, params :: map()) :: Migration.t()
def update(model, params) do
model
|> model.__struct__.changeset(params)
|> Repo.update!()
end
def insert(struct), do: {:insert, struct}
def insert(model) do
Repo.insert!(model)
end
def insert_all(schema, struct), do: {:insert_all, {schema, struct}}
def update_all_dynamic(struct, types, fields, values), do: {:update_all_dynamic, {struct.__struct__, struct.id, types, fields, values}}
def update_all(struct, params), do: {:update_all, {struct.__struct__, struct.id, params}}
end

View File

@ -26,6 +26,7 @@ defmodule Movement.EntriesCommitProcessor do
value_type: entry.value_type,
plural: entry.plural,
locked: entry.locked,
translation_id: current_translation && current_translation.id,
revision_id: Map.get(assigns[:revision], :id),
version_id: assigns[:version] && Map.get(assigns[:version], :id),
placeholders: entry.placeholders

View File

@ -15,6 +15,7 @@ defmodule Movement.Mappers.Operation do
revision_id: Map.get(suggested_translation, :revision_id),
document_id: Map.get(suggested_translation, :document_id),
version_id: Map.get(suggested_translation, :version_id),
translation_id: Map.get(suggested_translation, :translation_id),
previous_translation: PreviousTranslation.from_translation(current_translation),
placeholders: suggested_translation.placeholders
}

View File

@ -1,5 +1,5 @@
defmodule Movement.Migration do
@type t :: map
@callback call(atom, map) :: t()
@callback call(atom, map) :: [t()] | t()
end

View File

@ -6,17 +6,21 @@ defmodule Movement.Migration.Conflict do
def call(:correct, operation) do
Accent.OperationBatcher.batch(operation)
update(operation.translation, %{
corrected_text: operation.text,
conflicted: false
})
update_all_dynamic(
operation.translation,
[:text, :boolean],
[:corrected_text, :conflicted],
[operation.text, false]
)
end
def call(:uncorrect, operation) do
update(operation.translation, %{
conflicted_text: operation.previous_translation && operation.previous_translation.conflicted_text,
conflicted: true
})
update_all_dynamic(
operation.translation,
[:text, :boolean],
[:conflicted_text, :conflicted],
[operation.previous_translation && operation.previous_translation.conflicted_text, true]
)
end
def call(:on_corrected, operation) do

View File

@ -4,17 +4,23 @@ defmodule Movement.Migration.Rollback do
import Movement.EctoMigrationHelper
def call(:new, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, %{removed: true})
[
update_all(operation, %{rollbacked: true}),
update_all(operation.translation, %{removed: true})
]
end
def call(:remove, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, %{removed: false})
[
update_all(operation, %{rollbacked: true}),
update_all(operation.translation, %{removed: false})
]
end
def call(:restore, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, Map.from_struct(operation.previous_translation))
[
update_all(operation, %{rollbacked: true}),
update(operation.translation, Map.from_struct(operation.previous_translation))
]
end
end

View File

@ -4,10 +4,10 @@ defmodule Movement.Migration.Translation do
import Movement.EctoMigrationHelper
alias Accent.{Operation, Translation}
alias Ecto.UUID
def call(:update_proposed, operation) do
operation.translation
|> update(%{
update(operation.translation, %{
proposed_text: operation.text
})
end
@ -15,8 +15,7 @@ defmodule Movement.Migration.Translation do
def call(:update, operation) do
Accent.OperationBatcher.batch(operation)
operation.translation
|> update(%{
update(operation.translation, %{
value_type: operation.value_type,
corrected_text: operation.text,
conflicted_text: operation.previous_translation && operation.previous_translation.corrected_text,
@ -25,25 +24,25 @@ defmodule Movement.Migration.Translation do
end
def call(:remove, operation) do
update(operation.translation, %{removed: true})
update_all(operation.translation, %{removed: true})
end
def call(:renew, operation) do
new_translation = %{
proposed_text: operation.text,
corrected_text: operation.text,
conflicted: true,
removed: false
}
update(operation, %{rollbacked: false})
update(operation.translation, new_translation)
[
update_all(operation, %{rollbacked: false}),
update_all_dynamic(
operation.translation,
[:text, :text, :boolean, :boolean],
[:proposed_text, :corrected_text, :conflicted, :removed],
[operation.text, operation.text, true, false]
)
]
end
def call(:new, operation) do
id = Ecto.UUID.generate()
id = UUID.generate()
translation = %Translation{
translation = %{
id: id,
key: operation.key,
proposed_text: operation.text,
@ -58,17 +57,22 @@ defmodule Movement.Migration.Translation do
revision_id: operation.revision_id,
document_id: operation.document_id,
version_id: operation.version_id,
placeholders: operation.placeholders
source_translation_id: operation.translation_id,
placeholders: operation.placeholders,
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now()
}
insert(translation)
update(operation, %{translation_id: id})
[
insert_all(Translation, translation),
update_all_dynamic(operation, [:uuid], [:translation_id], [UUID.dump!(id)])
]
end
def call(:version_new, operation) do
id = Ecto.UUID.generate()
id = UUID.generate()
translation = %Translation{
translation = %{
id: id,
key: operation.key,
proposed_text: operation.text,
@ -82,17 +86,23 @@ defmodule Movement.Migration.Translation do
document_id: operation.document_id,
version_id: operation.version_id,
source_translation_id: operation.translation_id,
placeholders: operation.placeholders
placeholders: operation.placeholders,
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now()
}
version_operation = Operation.copy(operation, %{action: "add_to_version", translation_id: id})
insert(translation)
insert(version_operation)
[
insert_all(Translation, translation),
insert(version_operation)
]
end
def call(:restore, operation) do
update(operation, %{rollbacked: false})
update(operation.translation, Map.from_struct(operation.previous_translation))
[
update_all(operation, %{rollbacked: false}),
update(operation.translation, Map.from_struct(operation.previous_translation))
]
end
end

View File

@ -16,35 +16,102 @@ defmodule Movement.Migrator do
operation is the same operation object passed to `Migrator.up/2`.
"""
# Inserts operations by batch of 500 to prevent parameters
# overflow in database adapter
@operations_chunk 500
require Ecto.Query
alias Accent.Repo
alias Movement.Migration.{Conflict, Rollback, Translation}
def up(%{action: "noop"}), do: nil
def up(%{action: "autocorrect"}), do: nil
def up(operation = %{action: "correct_conflict"}), do: Conflict.call(:correct, operation)
def up(operation = %{action: "uncorrect_conflict"}), do: Conflict.call(:uncorrect, operation)
def up(operation = %{action: "conflict_on_proposed"}), do: Conflict.call(:on_proposed, operation)
def up(operation = %{action: "merge_on_proposed"}), do: Conflict.call(:on_proposed, operation)
def up(operation = %{action: "merge_on_proposed_force"}), do: Conflict.call(:on_proposed, operation)
def up(operation = %{action: "merge_on_corrected_force"}), do: Conflict.call(:on_proposed, operation)
def up(operation = %{action: "conflict_on_slave"}), do: Conflict.call(:on_slave, operation)
def up(operation = %{action: "conflict_on_corrected"}), do: Conflict.call(:on_corrected, operation)
def up(operation = %{action: "merge_on_corrected"}), do: Conflict.call(:on_corrected, operation)
def up(operation = %{action: "remove"}), do: Translation.call(:remove, operation)
def up(operation = %{action: "update"}), do: Translation.call(:update, operation)
def up(operation = %{action: "update_proposed"}), do: Translation.call(:update_proposed, operation)
def up(operation = %{action: "version_new"}), do: Translation.call(:version_new, operation)
def up(operation = %{action: "new"}), do: Translation.call(:new, operation)
def up(operation = %{action: "renew"}), do: Translation.call(:renew, operation)
def up(operation = %{action: "rollback"}), do: Translation.call(:restore, operation)
def down(operations), do: persist(Enum.map(List.wrap(operations), &do_down/1))
def up(operations), do: persist(Enum.map(List.wrap(operations), &do_up/1))
def up(operations) when is_list(operations), do: Enum.map(operations, &up/1)
defp do_up(%{action: "noop"}), do: []
defp do_up(%{action: "autocorrect"}), do: []
defp do_up(operation = %{action: "correct_conflict"}), do: Conflict.call(:correct, operation)
defp do_up(operation = %{action: "uncorrect_conflict"}), do: Conflict.call(:uncorrect, operation)
defp do_up(operation = %{action: "conflict_on_proposed"}), do: Conflict.call(:on_proposed, operation)
defp do_up(operation = %{action: "merge_on_proposed"}), do: Conflict.call(:on_proposed, operation)
defp do_up(operation = %{action: "merge_on_proposed_force"}), do: Conflict.call(:on_proposed, operation)
defp do_up(operation = %{action: "merge_on_corrected_force"}), do: Conflict.call(:on_proposed, operation)
defp do_up(operation = %{action: "conflict_on_slave"}), do: Conflict.call(:on_slave, operation)
defp do_up(operation = %{action: "conflict_on_corrected"}), do: Conflict.call(:on_corrected, operation)
defp do_up(operation = %{action: "merge_on_corrected"}), do: Conflict.call(:on_corrected, operation)
defp do_up(operation = %{action: "remove"}), do: Translation.call(:remove, operation)
defp do_up(operation = %{action: "update"}), do: Translation.call(:update, operation)
defp do_up(operation = %{action: "update_proposed"}), do: Translation.call(:update_proposed, operation)
defp do_up(operation = %{action: "version_new"}), do: Translation.call(:version_new, operation)
defp do_up(operation = %{action: "new"}), do: Translation.call(:new, operation)
defp do_up(operation = %{action: "renew"}), do: Translation.call(:renew, operation)
defp do_up(operation = %{action: "rollback"}), do: Translation.call(:restore, operation)
def down(%{action: "noop"}), do: nil
def down(%{action: "autocorrect"}), do: nil
def down(operation = %{action: "new"}), do: Rollback.call(:new, operation)
def down(operation = %{action: "renew"}), do: Rollback.call(:new, operation)
def down(operation = %{action: "remove"}), do: Rollback.call(:remove, operation)
def down(operation = %{action: _}), do: Rollback.call(:restore, operation)
defp do_down(%{action: "noop"}), do: []
defp do_down(%{action: "autocorrect"}), do: []
defp do_down(operation = %{action: "new"}), do: Rollback.call(:new, operation)
defp do_down(operation = %{action: "renew"}), do: Rollback.call(:new, operation)
defp do_down(operation = %{action: "remove"}), do: Rollback.call(:remove, operation)
defp do_down(operation = %{action: _}), do: Rollback.call(:restore, operation)
def down(operations) when is_list(operations), do: Enum.map(operations, &down/1)
# Executes every migration instruction produced by `do_up/1` / `do_down/1`.
#
# Instructions are `{action, payload}` tuples, possibly nested in lists. They
# are flattened, grouped by action, and each group is handed to its dedicated
# migrate function. Groups run in a fixed order: batched statements first
# (`:insert_all`, `:update_all`, `:update_all_dynamic`), then the row-by-row
# `:insert` and `:update` ones. The per-group results are concatenated in
# that same order.
defp persist(operations) do
  actions =
    operations
    |> List.flatten()
    |> Enum.group_by(fn {action, _payload} -> action end, fn instruction -> elem(instruction, 1) end)

  handlers = [
    insert_all: &migrate_insert_all_operations/1,
    update_all: &migrate_update_all_operations/1,
    update_all_dynamic: &migrate_update_all_dynamic_operations/1,
    insert: &migrate_insert_operations/1,
    update: &migrate_update_operations/1
  ]

  Enum.flat_map(handlers, fn {action, migrate} ->
    migrate.(Map.get(actions, action, []))
  end)
end
# Inserts each payload one at a time with `Repo.insert!/1` and returns the
# list of results, in the order the payloads were given.
defp migrate_insert_operations(operations) do
  for operation <- operations, do: Repo.insert!(operation)
end
# Applies each `{struct, params}` payload individually: the struct's own
# module builds the changeset, which is then persisted with `Repo.update!/1`.
defp migrate_update_operations(operations) do
  Enum.map(operations, fn {record, changes} ->
    changeset = record.__struct__.changeset(record, changes)
    Repo.update!(changeset)
  end)
end
# Bulk-inserts `{schema, record}` payloads. Records are grouped per schema and
# sent to `Repo.insert_all/2` in chunks of `@operations_chunk` entries.
# Returns one list of `insert_all` results per schema.
defp migrate_insert_all_operations(operations) do
  records_by_schema =
    Enum.group_by(operations, fn {schema, _payload} -> schema end, fn operation -> elem(operation, 1) end)

  for {schema, records} <- records_by_schema do
    for chunk <- Enum.chunk_every(records, @operations_chunk) do
      Repo.insert_all(schema, chunk)
    end
  end
end
# Batches `{schema, id, params}` payloads that share the same schema and the
# same params into `UPDATE ... WHERE id IN (...)` statements, with at most
# `@operations_chunk` ids per statement.
# Returns one list of `update_all` results per `{schema, params}` group.
defp migrate_update_all_operations(operations) do
  ids_by_target =
    Enum.group_by(
      operations,
      fn {schema, _struct_id, params} -> {schema, params} end,
      fn operation -> elem(operation, 1) end
    )

  Enum.map(ids_by_target, fn {{schema, params}, record_ids} ->
    changes = Map.to_list(params)

    for ids <- Enum.chunk_every(record_ids, @operations_chunk) do
      entries_query = Ecto.Query.from(entries in schema, where: entries.id in ^ids)
      Repo.update_all(entries_query, set: changes)
    end
  end)
end
# Batches `{schema, id, types, fields, values}` payloads whose values differ
# per row. Payloads sharing the same schema, types and fields are reduced to
# `{id, values}` pairs, chunked by `@operations_chunk`, and delegated to
# `Movement.Persisters.OperationsUpdateAllDynamic.update/1`.
defp migrate_update_all_dynamic_operations(operations) do
  rows_by_signature =
    Enum.group_by(
      operations,
      fn {schema, _struct_id, types, fields, _values} -> {schema, types, fields} end,
      fn operation -> {elem(operation, 1), elem(operation, 4)} end
    )

  for {signature, rows} <- rows_by_signature do
    for chunk <- Enum.chunk_every(rows, @operations_chunk) do
      Movement.Persisters.OperationsUpdateAllDynamic.update({signature, chunk})
    end
  end
end
end

View File

@ -4,7 +4,7 @@ defmodule Movement.Operation do
text: nil,
file_comment: nil,
file_index: 0,
value_type: nil,
value_type: "string",
plural: false,
locked: false,
batch: false,

View File

@ -96,7 +96,7 @@ defmodule Movement.Persisters.Base do
defp migrate_down_operations(context = %Movement.Context{assigns: %{operation: operation}}) do
operation = Repo.preload(operation, :translation)
{context, Migrator.down(operation)}
{context, Migrator.down([operation])}
end
defp assign_project(operation, nil), do: operation

View File

@ -0,0 +1,79 @@
defmodule Movement.Persisters.OperationsUpdateAllDynamic do
  @moduledoc """
  Updates many rows of a schema in a single statement when every row receives
  its own values.

  The query joins the schema on an `unnest(...)` of parallel arrays: the first
  array holds the row ids, the following ones hold the value of each updated
  field, position by position. One SQL fragment exists per supported list of
  column types.
  """

  import Ecto.Query

  alias Accent.Repo

  @uuid_fragment "SELECT * FROM unnest(?::uuid[], ?::uuid[]) AS t(a, b)"
  @text_text_bool_bool_fragment "SELECT * FROM unnest(?::uuid[], ?::text[], ?::text[], ?::boolean[], ?::boolean[]) AS t(a, b, c, d, e)"
  @text_bool_fragment "SELECT * FROM unnest(?::uuid[], ?::text[], ?::boolean[]) AS t(a, b, c)"

  @doc """
  Runs the batched update described by `{{schema, types, fields}, records}`.

  `types` selects the SQL fragment and must be one of
  `[:text, :text, :boolean, :boolean]`, `[:uuid]` or `[:text, :boolean]`.
  `fields` lists the columns to set, in the same order as `types`. Each entry
  of `records` is a tuple whose first element is the row id and whose second
  element is the list of values for `fields`.

  Returns the result of `Repo.update_all/2`.
  """
  def update({{schema, [:text, :text, :boolean, :boolean], fields}, records}) do
    [first_values, second_values, third_values, fourth_values] = values_binding(records, fields)

    query =
      from(row in schema,
        join:
          batch in fragment(
            @text_text_bool_bool_fragment,
            ^ids_binding(records),
            ^first_values,
            ^second_values,
            ^third_values,
            ^fourth_values
          ),
        on: batch.a == row.id,
        update: [
          set: [
            {^Enum.at(fields, 0), batch.b},
            {^Enum.at(fields, 1), batch.c},
            {^Enum.at(fields, 2), batch.d},
            {^Enum.at(fields, 3), batch.e}
          ]
        ]
      )

    Repo.update_all(query, [])
  end

  def update({{schema, [:uuid], fields}, records}) do
    [first_values] = values_binding(records, fields)

    query =
      from(row in schema,
        join:
          batch in fragment(
            @uuid_fragment,
            ^ids_binding(records),
            ^first_values
          ),
        on: batch.a == row.id,
        update: [set: [{^Enum.at(fields, 0), batch.b}]]
      )

    Repo.update_all(query, [])
  end

  def update({{schema, [:text, :boolean], fields}, records}) do
    [first_values, second_values] = values_binding(records, fields)

    query =
      from(row in schema,
        join:
          batch in fragment(
            @text_bool_fragment,
            ^ids_binding(records),
            ^first_values,
            ^second_values
          ),
        on: batch.a == row.id,
        update: [
          set: [
            {^Enum.at(fields, 0), batch.b},
            {^Enum.at(fields, 1), batch.c}
          ]
        ]
      )

    Repo.update_all(query, [])
  end

  # Row ids (first tuple element of each record) dumped to their binary UUID
  # form, as expected by the `?::uuid[]` parameter of the fragments.
  defp ids_binding(records) do
    for record <- records, do: Ecto.UUID.dump!(elem(record, 0))
  end

  # Turns the per-record value lists into one list per field position, so that
  # each list can be bound as a single SQL array.
  defp values_binding(records, fields) do
    value_rows = Enum.map(records, fn record -> elem(record, 1) end)
    last_index = length(fields) - 1

    Enum.map(0..last_index, fn index ->
      Enum.map(value_rows, fn values -> Enum.at(values, index) end)
    end)
  end
end

View File

@ -1,4 +1,4 @@
defmodule Movement.SuggestedTranslation do
@enforce_keys ~w(text)a
defstruct ~w(text key file_comment file_index value_type revision_id version_id plural locked placeholders)a
defstruct ~w(text key file_comment file_index value_type revision_id translation_id version_id plural locked placeholders)a
end

View File

@ -12,8 +12,7 @@ defmodule Accent.Utils.SecureRandom do
def urlsafe_base64(length) do
length
|> :crypto.strong_rand_bytes()
|> :base64.encode_to_string()
|> to_string
|> :base64.encode()
|> String.replace(~r/[\n\=]/, "")
|> String.replace(~r/\+/, "-")
|> String.replace(~r/\//, "_")

View File

@ -11,7 +11,7 @@ defmodule AccentTest.Migrator.Down do
alias Movement.Migrator
test ":noop" do
assert nil == Migrator.down(%{action: "noop"})
assert [] === Migrator.down(%{action: "noop"})
end
test ":conflict_on_corrected" do

View File

@ -13,7 +13,7 @@ defmodule AccentTest.Movement.Migrator.Up do
}
test ":noop" do
assert nil == Migrator.up(%{action: "noop"})
assert [] === Migrator.up(%{action: "noop"})
end
test ":correct" do

View File

@ -16,7 +16,10 @@ defmodule AccentTest.AuthenticationController do
|> AuthController.callback(nil)
user = Repo.get_by(User, email: "dummy@test.com")
token = Repo.get_by(AccessToken, user_id: user.id)
token = Repo.get_by(AccessToken, user_id: user.id, global: false)
global_token = Repo.get_by(AccessToken, user_id: user.id, global: true)
assert global_token
assert redirected_to(conn, 302) =~ "/?token=#{token.token}"
end
@ -27,7 +30,7 @@ defmodule AccentTest.AuthenticationController do
|> AuthController.callback(nil)
user = Repo.get_by(User, email: "dummy@test.com")
token = Repo.get_by(AccessToken, user_id: user.id)
token = Repo.get_by(AccessToken, user_id: user.id, global: false)
assert user.fullname === "Dummy"
assert redirected_to(conn, 302) =~ "/?token=#{token.token}"
end

View File

@ -446,7 +446,7 @@
},
"token_how": "How?",
"github_webhook_url_how": "How?",
"github_webhook_accent_cli_1": "You need to have a valid <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"http\"://github.com/mirego/accent/tree/master/cli\">accent-cli</a> setup for the hook to work.",
"github_webhook_accent_cli_1": "You need to have a valid <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://github.com/mirego/accent/tree/master/cli\">accent-cli</a> setup for the hook to work.",
"github_webhook_accent_cli_2": "The <a target=\"_blank\" rel=\"noopener noreferrer\" href=\"https://github.com/mirego/accent/blob/master/cli/examples/react/accent.json\">accent.json</a> file at the root of your project will be used.",
"github_webhook_url": "Make sure to add the webhook in your projects settings",
"webhook_url_how": "How to obtain a webhook URL?",