1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.User.Backup do
10 import Pleroma.Web.Gettext
13 require Pleroma.Constants
15 alias Pleroma.Activity
16 alias Pleroma.Bookmark
19 alias Pleroma.User.Backup.State
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Transmogrifier
22 alias Pleroma.Web.ActivityPub.UserView
23 alias Pleroma.Workers.BackupWorker
25 @type t :: %__MODULE__{}
28 field(:content_type, :string)
29 field(:file_name, :string)
30 field(:file_size, :integer, default: 0)
31 field(:processed, :boolean, default: false)
32 field(:state, State, default: :invalid)
33 field(:processed_number, :integer, default: 0)
35 belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
40 @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
42 def create(user, admin_id \\ nil) do
43 with :ok <- validate_limit(user, admin_id),
44 {:ok, backup} <- user |> new() |> Repo.insert() do
45 BackupWorker.process(backup, admin_id)
50 rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
51 datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now())
52 name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip"
56 content_type: "application/zip",
63 uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
65 with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do
70 defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok
72 defp validate_limit(user, nil) do
73 case get_last(user.id) do
74 %__MODULE__{inserted_at: inserted_at} ->
75 days = Pleroma.Config.get([__MODULE__, :limit_days])
76 diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days)
84 "Last export was less than a day ago",
85 "Last export was less than %{days} days ago",
96 def get_last(user_id) do
98 |> where(user_id: ^user_id)
99 |> order_by(desc: :id)
104 def list(%User{id: user_id}) do
106 |> where(user_id: ^user_id)
107 |> order_by(desc: :id)
111 def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
113 |> where(user_id: ^user_id)
114 |> where([b], b.id != ^latest_id)
116 |> Enum.each(&BackupWorker.delete/1)
119 def get(id), do: Repo.get(__MODULE__, id)
121 defp set_state(backup, state, processed_number \\ nil) do
124 |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
127 |> cast(struct, [:state, :processed_number])
132 %__MODULE__{} = backup,
133 processor_module \\ __MODULE__.Processor
135 set_state(backup, :running, 0)
140 Task.Supervisor.async_nolink(
141 Pleroma.TaskSupervisor,
144 [backup, current_pid]
147 wait_backup(backup, backup.processed_number, task)
150 defp wait_backup(backup, current_processed, task) do
151 wait_time = @config_impl.get([__MODULE__, :process_wait_time])
154 {:progress, new_processed} ->
155 total_processed = current_processed + new_processed
157 set_state(backup, :running, total_processed)
158 wait_backup(backup, total_processed, task)
160 {:DOWN, _ref, _proc, _pid, reason} ->
161 backup = get(backup.id)
163 if reason != :normal do
164 Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
166 {:ok, backup} = set_state(backup, :failed)
182 "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
185 Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
187 {:ok, backup} = set_state(backup, :failed)
207 @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
208 def export(%__MODULE__{} = backup, caller_pid) do
209 backup = Repo.preload(backup, :user)
210 dir = backup_tempdir(backup)
212 with :ok <- File.mkdir(dir),
213 :ok <- actor(dir, backup.user, caller_pid),
214 :ok <- statuses(dir, backup.user, caller_pid),
215 :ok <- likes(dir, backup.user, caller_pid),
216 :ok <- bookmarks(dir, backup.user, caller_pid),
217 :ok <- followers(dir, backup.user, caller_pid),
218 :ok <- following(dir, backup.user, caller_pid),
219 {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
220 {:ok, _} <- File.rm_rf(dir) do
228 dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!()
232 def upload(%__MODULE__{} = backup, zip_path) do
233 uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
235 upload = %Pleroma.Upload{
236 name: backup.file_name,
238 content_type: backup.content_type,
239 path: Path.join("backups", backup.file_name)
242 with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload),
243 :ok <- File.rm(zip_path) do
248 defp actor(dir, user, caller_pid) do
250 UserView.render("user.json", %{user: user})
251 |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
253 send(caller_pid, {:progress, 1})
254 File.write(Path.join(dir, "actor.json"), json)
258 defp write_header(file, name) do
263 "@context": "https://www.w3.org/ns/activitystreams",
264 "id": "#{name}.json",
265 "type": "OrderedCollection",
272 defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
274 defp backup_tempdir(backup) do
275 name = String.trim_trailing(backup.file_name, ".zip")
279 defp cleanup(backup) do
280 dir = backup_tempdir(backup)
284 defp write(query, dir, name, fun, caller_pid) do
285 path = Path.join(dir, "#{name}.json")
287 chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
289 with {:ok, file} <- File.open(path, [:write, :utf8]),
290 :ok <- write_header(file, name) do
293 |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
294 |> Enum.reduce(0, fn i, acc ->
301 {:ok, str} <- Jason.encode(data),
302 :ok <- IO.write(file, str <> ",\n") do
303 if should_report?(acc + 1, chunk_size) do
304 send(caller_pid, {:progress, chunk_size})
311 "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
321 send(caller_pid, {:progress, rem(total, chunk_size)})
323 with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
329 defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
331 |> where(user_id: ^user_id)
332 |> join(:inner, [b], activity in assoc(b, :activity))
333 |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
334 |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
337 defp likes(dir, user, caller_pid) do
339 |> Activity.Queries.by_actor()
340 |> Activity.Queries.by_type("Like")
341 |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
342 |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
345 defp statuses(dir, user, caller_pid) do
348 |> Map.put(:type, ["Create", "Announce"])
349 |> Map.put(:actor_id, user.ap_id)
352 [Pleroma.Constants.as_public(), user.ap_id],
353 User.following(user),
354 Pleroma.List.memberships(user)
357 |> ActivityPub.fetch_activities_query(opts)
362 with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
363 {:ok, Map.delete(activity, "@context")}
370 defp followers(dir, user, caller_pid) do
371 User.get_followers_query(user)
372 |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid)
375 defp following(dir, user, caller_pid) do
376 User.get_friends_query(user)
377 |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid)
381 defmodule Pleroma.User.Backup.ProcessorAPI do
382 @callback do_process(%Pleroma.User.Backup{}, pid()) ::
383 {:ok, %Pleroma.User.Backup{}} | {:error, any()}
386 defmodule Pleroma.User.Backup.Processor do
387 @behaviour Pleroma.User.Backup.ProcessorAPI
390 alias Pleroma.User.Backup
392 import Ecto.Changeset
395 def do_process(backup, current_pid) do
396 with {:ok, zip_file} <- Backup.export(backup, current_pid),
397 {:ok, %{size: size}} <- File.stat(zip_file),
398 {:ok, _upload} <- Backup.upload(backup, zip_file) do
406 [:file_size, :processed, :state]