First
[anni] / lib / pleroma / migrators / support / base_migrator.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Migrators.Support.BaseMigrator do
6   @moduledoc """
7   Base background migrator functionality.
8   """
9
10   @callback perform() :: any()
11   @callback retry_failed() :: any()
12   @callback feature_config_path() :: list(atom())
13   @callback query() :: Ecto.Query.t()
14   @callback fault_rate_allowance() :: integer() | float()
15
16   defmacro __using__(_opts) do
17     quote do
18       use GenServer
19
20       require Logger
21
22       import Ecto.Query
23
24       alias __MODULE__.State
25       alias Pleroma.Config
26       alias Pleroma.Repo
27
28       @behaviour Pleroma.Migrators.Support.BaseMigrator
29
30       defdelegate data_migration(), to: State
31       defdelegate data_migration_id(), to: State
32       defdelegate state(), to: State
33       defdelegate persist_state(), to: State, as: :persist_to_db
34       defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
35       defdelegate put_stat(key, value), to: State, as: :put_data_key
36       defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
37
38       @reg_name {:global, __MODULE__}
39
40       def whereis, do: GenServer.whereis(@reg_name)
41
42       def start_link(_) do
43         case whereis() do
44           nil ->
45             GenServer.start_link(__MODULE__, nil, name: @reg_name)
46
47           pid ->
48             {:ok, pid}
49         end
50       end
51
52       @impl true
53       def init(_) do
54         {:ok, nil, {:continue, :init_state}}
55       end
56
57       @impl true
58       def handle_continue(:init_state, _state) do
59         {:ok, _} = State.start_link(nil)
60
61         data_migration = data_migration()
62         manual_migrations = Config.get([:instance, :manual_data_migrations], [])
63
64         cond do
65           Config.get(:env) == :test ->
66             update_status(:noop)
67
68           is_nil(data_migration) ->
69             message = "Data migration does not exist."
70             update_status(:failed, message)
71             Logger.error("#{__MODULE__}: #{message}")
72
73           data_migration.state == :manual or data_migration.name in manual_migrations ->
74             message = "Data migration is in manual execution or manual fix mode."
75             update_status(:manual, message)
76             Logger.warn("#{__MODULE__}: #{message}")
77
78           data_migration.state == :complete ->
79             on_complete(data_migration)
80
81           true ->
82             send(self(), :perform)
83         end
84
85         {:noreply, nil}
86       end
87
88       @impl true
89       def handle_info(:perform, state) do
90         State.reinit()
91
92         update_status(:running)
93         put_stat(:iteration_processed_count, 0)
94         put_stat(:started_at, NaiveDateTime.utc_now())
95
96         perform()
97
98         fault_rate = fault_rate()
99         put_stat(:fault_rate, fault_rate)
100         fault_rate_allowance = fault_rate_allowance()
101
102         cond do
103           fault_rate == 0 ->
104             set_complete()
105
106           is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
107             message = """
108             Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
109             Putting data migration to manual fix mode. Try running `#{__MODULE__}.retry_failed/0`.
110             """
111
112             Logger.warn("#{__MODULE__}: #{message}")
113             update_status(:manual, message)
114             on_complete(data_migration())
115
116           true ->
117             message = "Too many failures. Try running `#{__MODULE__}.retry_failed/0`."
118             Logger.error("#{__MODULE__}: #{message}")
119             update_status(:failed, message)
120         end
121
122         persist_state()
123         {:noreply, state}
124       end
125
126       defp on_complete(data_migration) do
127         if data_migration.feature_lock || feature_state() == :disabled do
128           Logger.warn(
129             "#{__MODULE__}: migration complete but feature is locked; consider enabling."
130           )
131
132           :noop
133         else
134           Config.put(feature_config_path(), :enabled)
135           :ok
136         end
137       end
138
139       @doc "Approximate count for current iteration (including processed records count)"
140       def count(force \\ false, timeout \\ :infinity) do
141         stored_count = get_stat(:count)
142
143         if stored_count && !force do
144           stored_count
145         else
146           processed_count = get_stat(:processed_count, 0)
147           max_processed_id = get_stat(:max_processed_id, 0)
148           query = where(query(), [entity], entity.id > ^max_processed_id)
149
150           count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
151           put_stat(:count, count)
152           persist_state()
153
154           count
155         end
156       end
157
158       def failures_count do
159         with {:ok, %{rows: [[count]]}} <-
160                Repo.query(
161                  "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
162                  [data_migration_id()]
163                ) do
164           count
165         end
166       end
167
168       def feature_state, do: Config.get(feature_config_path())
169
170       def force_continue do
171         send(whereis(), :perform)
172       end
173
174       def force_restart do
175         :ok = State.reset()
176         force_continue()
177       end
178
179       def set_complete do
180         update_status(:complete)
181         persist_state()
182         on_complete(data_migration())
183       end
184
185       defp update_status(status, message \\ nil) do
186         put_stat(:state, status)
187         put_stat(:message, message)
188       end
189
190       defp fault_rate do
191         with failures_count when is_integer(failures_count) <- failures_count() do
192           failures_count / Enum.max([get_stat(:affected_count, 0), 1])
193         else
194           _ -> :error
195         end
196       end
197
198       defp records_per_second do
199         get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
200       end
201
202       defp running_time do
203         NaiveDateTime.diff(
204           NaiveDateTime.utc_now(),
205           get_stat(:started_at, NaiveDateTime.utc_now())
206         )
207       end
208     end
209   end
210 end