1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do
7 use Pleroma.Migrators.Support.BaseMigratorState
9 @impl Pleroma.Migrators.Support.BaseMigratorState
10 defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects
13 use Pleroma.Migrators.Support.BaseMigrator
15 alias Pleroma.Migrators.Support.BaseMigrator
18 @doc "This migration removes objects created exclusively for contexts, containing only an `id` field."
21 def feature_config_path, do: [:features, :delete_context_objects]
24 def fault_rate_allowance, do: Config.get([:delete_context_objects, :fault_rate_allowance], 0)
28 data_migration_id = data_migration_id()
29 max_processed_id = get_stat(:max_processed_id, 0)
31 Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...")
34 |> where([object], object.id > ^max_processed_id)
35 |> Repo.chunk_stream(100, :batches, timeout: :infinity)
36 |> Stream.each(fn objects ->
37 object_ids = Enum.map(objects, & &1.id)
39 results = Enum.map(object_ids, &delete_context_object(&1))
43 |> Enum.filter(&(elem(&1, 0) == :error))
44 |> Enum.map(&elem(&1, 1))
46 chunk_affected_count =
48 |> Enum.filter(&(elem(&1, 0) == :ok))
51 for failed_id <- failed_ids do
54 "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
55 "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
56 [data_migration_id, failed_id]
62 "DELETE FROM data_migration_failed_ids " <>
63 "WHERE data_migration_id = $1 AND record_id = ANY($2)",
64 [data_migration_id, object_ids -- failed_ids]
67 max_object_id = Enum.at(object_ids, -1)
69 put_stat(:max_processed_id, max_object_id)
70 increment_stat(:iteration_processed_count, length(object_ids))
71 increment_stat(:processed_count, length(object_ids))
72 increment_stat(:failed_count, length(failed_ids))
73 increment_stat(:affected_count, chunk_affected_count)
74 put_stat(:records_per_second, records_per_second())
77 # A quick and dirty approach to controlling the load this background migration imposes
78 sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0)
79 Process.sleep(sleep_interval)
86 # Context objects have no activity type, and only one field, `id`.
87 # Only those context objects are without types.
90 where: fragment("(?)->'type' IS NULL", object.data),
97 @spec delete_context_object(integer()) :: {:ok | :error, integer()}
98 defp delete_context_object(id) do
109 data_migration_id = data_migration_id()
111 failed_objects_query()
112 |> Repo.chunk_stream(100, :one)
113 |> Stream.each(fn object ->
114 with {res, _} when res != :error <- delete_context_object(object.id) do
117 "DELETE FROM data_migration_failed_ids " <>
118 "WHERE data_migration_id = $1 AND record_id = $2",
119 [data_migration_id, object.id]
125 put_stat(:failed_count, failures_count())
131 defp failed_objects_query do
133 |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
134 on: dmf.record_id == o.id
136 |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
137 |> order_by([o], asc: o.id)