First
[anni] / lib / pleroma / migrators / context_objects_deletion_migrator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do
6   defmodule State do
7     use Pleroma.Migrators.Support.BaseMigratorState
8
9     @impl Pleroma.Migrators.Support.BaseMigratorState
10     defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects
11   end
12
13   use Pleroma.Migrators.Support.BaseMigrator
14
15   alias Pleroma.Migrators.Support.BaseMigrator
16   alias Pleroma.Object
17
18   @doc "This migration removes objects created exclusively for contexts, containing only an `id` field."
19
20   @impl BaseMigrator
21   def feature_config_path, do: [:features, :delete_context_objects]
22
23   @impl BaseMigrator
24   def fault_rate_allowance, do: Config.get([:delete_context_objects, :fault_rate_allowance], 0)
25
26   @impl BaseMigrator
27   def perform do
28     data_migration_id = data_migration_id()
29     max_processed_id = get_stat(:max_processed_id, 0)
30
31     Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...")
32
33     query()
34     |> where([object], object.id > ^max_processed_id)
35     |> Repo.chunk_stream(100, :batches, timeout: :infinity)
36     |> Stream.each(fn objects ->
37       object_ids = Enum.map(objects, & &1.id)
38
39       results = Enum.map(object_ids, &delete_context_object(&1))
40
41       failed_ids =
42         results
43         |> Enum.filter(&(elem(&1, 0) == :error))
44         |> Enum.map(&elem(&1, 1))
45
46       chunk_affected_count =
47         results
48         |> Enum.filter(&(elem(&1, 0) == :ok))
49         |> length()
50
51       for failed_id <- failed_ids do
52         _ =
53           Repo.query(
54             "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
55               "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
56             [data_migration_id, failed_id]
57           )
58       end
59
60       _ =
61         Repo.query(
62           "DELETE FROM data_migration_failed_ids " <>
63             "WHERE data_migration_id = $1 AND record_id = ANY($2)",
64           [data_migration_id, object_ids -- failed_ids]
65         )
66
67       max_object_id = Enum.at(object_ids, -1)
68
69       put_stat(:max_processed_id, max_object_id)
70       increment_stat(:iteration_processed_count, length(object_ids))
71       increment_stat(:processed_count, length(object_ids))
72       increment_stat(:failed_count, length(failed_ids))
73       increment_stat(:affected_count, chunk_affected_count)
74       put_stat(:records_per_second, records_per_second())
75       persist_state()
76
77       # A quick and dirty approach to controlling the load this background migration imposes
78       sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0)
79       Process.sleep(sleep_interval)
80     end)
81     |> Stream.run()
82   end
83
84   @impl BaseMigrator
85   def query do
86     # Context objects have no activity type, and only one field, `id`.
87     # Only those context objects are without types.
88     from(
89       object in Object,
90       where: fragment("(?)->'type' IS NULL", object.data),
91       select: %{
92         id: object.id
93       }
94     )
95   end
96
97   @spec delete_context_object(integer()) :: {:ok | :error, integer()}
98   defp delete_context_object(id) do
99     result =
100       %Object{id: id}
101       |> Repo.delete()
102       |> elem(0)
103
104     {result, id}
105   end
106
107   @impl BaseMigrator
108   def retry_failed do
109     data_migration_id = data_migration_id()
110
111     failed_objects_query()
112     |> Repo.chunk_stream(100, :one)
113     |> Stream.each(fn object ->
114       with {res, _} when res != :error <- delete_context_object(object.id) do
115         _ =
116           Repo.query(
117             "DELETE FROM data_migration_failed_ids " <>
118               "WHERE data_migration_id = $1 AND record_id = $2",
119             [data_migration_id, object.id]
120           )
121       end
122     end)
123     |> Stream.run()
124
125     put_stat(:failed_count, failures_count())
126     persist_state()
127
128     force_continue()
129   end
130
131   defp failed_objects_query do
132     from(o in Object)
133     |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
134       on: dmf.record_id == o.id
135     )
136     |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
137     |> order_by([o], asc: o.id)
138   end
139 end