total rebase
[anni] / lib / pleroma / user / backup.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.User.Backup do
6   use Ecto.Schema
7
8   import Ecto.Changeset
9   import Ecto.Query
10   import Pleroma.Web.Gettext
11
12   require Logger
13   require Pleroma.Constants
14
15   alias Pleroma.Activity
16   alias Pleroma.Bookmark
17   alias Pleroma.Repo
18   alias Pleroma.User
19   alias Pleroma.User.Backup.State
20   alias Pleroma.Web.ActivityPub.ActivityPub
21   alias Pleroma.Web.ActivityPub.Transmogrifier
22   alias Pleroma.Web.ActivityPub.UserView
23   alias Pleroma.Workers.BackupWorker
24
25   @type t :: %__MODULE__{}
26
27   schema "backups" do
28     field(:content_type, :string)
29     field(:file_name, :string)
30     field(:file_size, :integer, default: 0)
31     field(:processed, :boolean, default: false)
32     field(:state, State, default: :invalid)
33     field(:processed_number, :integer, default: 0)
34
35     belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
36
37     timestamps()
38   end
39
40   @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
41
42   def create(user, admin_id \\ nil) do
43     with :ok <- validate_limit(user, admin_id),
44          {:ok, backup} <- user |> new() |> Repo.insert() do
45       BackupWorker.process(backup, admin_id)
46     end
47   end
48
49   def new(user) do
50     rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
51     datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now())
52     name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip"
53
54     %__MODULE__{
55       user_id: user.id,
56       content_type: "application/zip",
57       file_name: name,
58       state: :pending
59     }
60   end
61
62   def delete(backup) do
63     uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
64
65     with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do
66       Repo.delete(backup)
67     end
68   end
69
70   defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok
71
72   defp validate_limit(user, nil) do
73     case get_last(user.id) do
74       %__MODULE__{inserted_at: inserted_at} ->
75         days = Pleroma.Config.get([__MODULE__, :limit_days])
76         diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days)
77
78         if diff > days do
79           :ok
80         else
81           {:error,
82            dngettext(
83              "errors",
84              "Last export was less than a day ago",
85              "Last export was less than %{days} days ago",
86              days,
87              days: days
88            )}
89         end
90
91       nil ->
92         :ok
93     end
94   end
95
96   def get_last(user_id) do
97     __MODULE__
98     |> where(user_id: ^user_id)
99     |> order_by(desc: :id)
100     |> limit(1)
101     |> Repo.one()
102   end
103
104   def list(%User{id: user_id}) do
105     __MODULE__
106     |> where(user_id: ^user_id)
107     |> order_by(desc: :id)
108     |> Repo.all()
109   end
110
111   def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
112     __MODULE__
113     |> where(user_id: ^user_id)
114     |> where([b], b.id != ^latest_id)
115     |> Repo.all()
116     |> Enum.each(&BackupWorker.delete/1)
117   end
118
119   def get(id), do: Repo.get(__MODULE__, id)
120
121   defp set_state(backup, state, processed_number \\ nil) do
122     struct =
123       %{state: state}
124       |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
125
126     backup
127     |> cast(struct, [:state, :processed_number])
128     |> Repo.update()
129   end
130
131   def process(
132         %__MODULE__{} = backup,
133         processor_module \\ __MODULE__.Processor
134       ) do
135     set_state(backup, :running, 0)
136
137     current_pid = self()
138
139     task =
140       Task.Supervisor.async_nolink(
141         Pleroma.TaskSupervisor,
142         processor_module,
143         :do_process,
144         [backup, current_pid]
145       )
146
147     wait_backup(backup, backup.processed_number, task)
148   end
149
150   defp wait_backup(backup, current_processed, task) do
151     wait_time = @config_impl.get([__MODULE__, :process_wait_time])
152
153     receive do
154       {:progress, new_processed} ->
155         total_processed = current_processed + new_processed
156
157         set_state(backup, :running, total_processed)
158         wait_backup(backup, total_processed, task)
159
160       {:DOWN, _ref, _proc, _pid, reason} ->
161         backup = get(backup.id)
162
163         if reason != :normal do
164           Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
165
166           {:ok, backup} = set_state(backup, :failed)
167
168           cleanup(backup)
169
170           {:error,
171            %{
172              backup: backup,
173              reason: :exit,
174              details: reason
175            }}
176         else
177           {:ok, backup}
178         end
179     after
180       wait_time ->
181         Logger.error(
182           "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
183         )
184
185         Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
186
187         {:ok, backup} = set_state(backup, :failed)
188
189         cleanup(backup)
190
191         {:error,
192          %{
193            backup: backup,
194            reason: :timeout
195          }}
196     end
197   end
198
199   @files [
200     'actor.json',
201     'outbox.json',
202     'likes.json',
203     'bookmarks.json',
204     'followers.json',
205     'following.json'
206   ]
207   @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
208   def export(%__MODULE__{} = backup, caller_pid) do
209     backup = Repo.preload(backup, :user)
210     dir = backup_tempdir(backup)
211
212     with :ok <- File.mkdir(dir),
213          :ok <- actor(dir, backup.user, caller_pid),
214          :ok <- statuses(dir, backup.user, caller_pid),
215          :ok <- likes(dir, backup.user, caller_pid),
216          :ok <- bookmarks(dir, backup.user, caller_pid),
217          :ok <- followers(dir, backup.user, caller_pid),
218          :ok <- following(dir, backup.user, caller_pid),
219          {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
220          {:ok, _} <- File.rm_rf(dir) do
221       {:ok, zip_path}
222     else
223       _ -> :error
224     end
225   end
226
227   def dir(name) do
228     dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!()
229     Path.join(dir, name)
230   end
231
232   def upload(%__MODULE__{} = backup, zip_path) do
233     uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
234
235     upload = %Pleroma.Upload{
236       name: backup.file_name,
237       tempfile: zip_path,
238       content_type: backup.content_type,
239       path: Path.join("backups", backup.file_name)
240     }
241
242     with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload),
243          :ok <- File.rm(zip_path) do
244       {:ok, upload}
245     end
246   end
247
248   defp actor(dir, user, caller_pid) do
249     with {:ok, json} <-
250            UserView.render("user.json", %{user: user})
251            |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
252            |> Jason.encode() do
253       send(caller_pid, {:progress, 1})
254       File.write(Path.join(dir, "actor.json"), json)
255     end
256   end
257
258   defp write_header(file, name) do
259     IO.write(
260       file,
261       """
262       {
263         "@context": "https://www.w3.org/ns/activitystreams",
264         "id": "#{name}.json",
265         "type": "OrderedCollection",
266         "orderedItems": [
267
268       """
269     )
270   end
271
272   defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
273
274   defp backup_tempdir(backup) do
275     name = String.trim_trailing(backup.file_name, ".zip")
276     dir(name)
277   end
278
279   defp cleanup(backup) do
280     dir = backup_tempdir(backup)
281     File.rm_rf(dir)
282   end
283
284   defp write(query, dir, name, fun, caller_pid) do
285     path = Path.join(dir, "#{name}.json")
286
287     chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
288
289     with {:ok, file} <- File.open(path, [:write, :utf8]),
290          :ok <- write_header(file, name) do
291       total =
292         query
293         |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
294         |> Enum.reduce(0, fn i, acc ->
295           with {:ok, data} <-
296                  (try do
297                     fun.(i)
298                   rescue
299                     e -> {:error, e}
300                   end),
301                {:ok, str} <- Jason.encode(data),
302                :ok <- IO.write(file, str <> ",\n") do
303             if should_report?(acc + 1, chunk_size) do
304               send(caller_pid, {:progress, chunk_size})
305             end
306
307             acc + 1
308           else
309             {:error, e} ->
310               Logger.warning(
311                 "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
312               )
313
314               acc
315
316             _ ->
317               acc
318           end
319         end)
320
321       send(caller_pid, {:progress, rem(total, chunk_size)})
322
323       with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n  \"totalItems\": #{total}}") do
324         File.close(file)
325       end
326     end
327   end
328
329   defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
330     Bookmark
331     |> where(user_id: ^user_id)
332     |> join(:inner, [b], activity in assoc(b, :activity))
333     |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
334     |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
335   end
336
337   defp likes(dir, user, caller_pid) do
338     user.ap_id
339     |> Activity.Queries.by_actor()
340     |> Activity.Queries.by_type("Like")
341     |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
342     |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
343   end
344
345   defp statuses(dir, user, caller_pid) do
346     opts =
347       %{}
348       |> Map.put(:type, ["Create", "Announce"])
349       |> Map.put(:actor_id, user.ap_id)
350
351     [
352       [Pleroma.Constants.as_public(), user.ap_id],
353       User.following(user),
354       Pleroma.List.memberships(user)
355     ]
356     |> Enum.concat()
357     |> ActivityPub.fetch_activities_query(opts)
358     |> write(
359       dir,
360       "outbox",
361       fn a ->
362         with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
363           {:ok, Map.delete(activity, "@context")}
364         end
365       end,
366       caller_pid
367     )
368   end
369
370   defp followers(dir, user, caller_pid) do
371     User.get_followers_query(user)
372     |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid)
373   end
374
375   defp following(dir, user, caller_pid) do
376     User.get_friends_query(user)
377     |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid)
378   end
379 end
380
381 defmodule Pleroma.User.Backup.ProcessorAPI do
382   @callback do_process(%Pleroma.User.Backup{}, pid()) ::
383               {:ok, %Pleroma.User.Backup{}} | {:error, any()}
384 end
385
386 defmodule Pleroma.User.Backup.Processor do
387   @behaviour Pleroma.User.Backup.ProcessorAPI
388
389   alias Pleroma.Repo
390   alias Pleroma.User.Backup
391
392   import Ecto.Changeset
393
394   @impl true
395   def do_process(backup, current_pid) do
396     with {:ok, zip_file} <- Backup.export(backup, current_pid),
397          {:ok, %{size: size}} <- File.stat(zip_file),
398          {:ok, _upload} <- Backup.upload(backup, zip_file) do
399       backup
400       |> cast(
401         %{
402           file_size: size,
403           processed: true,
404           state: :complete
405         },
406         [:file_size, :processed, :state]
407       )
408       |> Repo.update()
409     else
410       e -> {:error, e}
411     end
412   end
413 end