# Pleroma: A lightweight social networking server # Copyright © 2017-2022 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.User.Backup do use Ecto.Schema import Ecto.Changeset import Ecto.Query import Pleroma.Web.Gettext require Logger require Pleroma.Constants alias Pleroma.Activity alias Pleroma.Bookmark alias Pleroma.Repo alias Pleroma.User alias Pleroma.User.Backup.State alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Workers.BackupWorker @type t :: %__MODULE__{} schema "backups" do field(:content_type, :string) field(:file_name, :string) field(:file_size, :integer, default: 0) field(:processed, :boolean, default: false) field(:state, State, default: :invalid) field(:processed_number, :integer, default: 0) belongs_to(:user, User, type: FlakeId.Ecto.CompatType) timestamps() end @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) def create(user, admin_id \\ nil) do with :ok <- validate_limit(user, admin_id), {:ok, backup} <- user |> new() |> Repo.insert() do BackupWorker.process(backup, admin_id) end end def new(user) do rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false) datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now()) name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip" %__MODULE__{ user_id: user.id, content_type: "application/zip", file_name: name, state: :pending } end def delete(backup) do uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do Repo.delete(backup) end end defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok defp validate_limit(user, nil) do case get_last(user.id) do %__MODULE__{inserted_at: inserted_at} -> days = Pleroma.Config.get([__MODULE__, :limit_days]) diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days) if diff > days do :ok else {:error, dngettext( "errors", "Last export was less than a day ago", "Last export was less than %{days} days ago", days, days: days )} end nil -> :ok end end def get_last(user_id) do __MODULE__ |> where(user_id: ^user_id) |> order_by(desc: :id) |> limit(1) |> Repo.one() end def list(%User{id: user_id}) do __MODULE__ |> where(user_id: ^user_id) |> order_by(desc: :id) |> Repo.all() end def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do __MODULE__ |> where(user_id: ^user_id) |> where([b], b.id != ^latest_id) |> Repo.all() |> Enum.each(&BackupWorker.delete/1) end def get(id), do: Repo.get(__MODULE__, id) defp set_state(backup, state, processed_number \\ nil) do struct = %{state: state} |> Pleroma.Maps.put_if_present(:processed_number, processed_number) backup |> cast(struct, [:state, :processed_number]) |> Repo.update() end def process( %__MODULE__{} = backup, processor_module \\ __MODULE__.Processor ) do set_state(backup, :running, 0) current_pid = self() task = Task.Supervisor.async_nolink( Pleroma.TaskSupervisor, processor_module, :do_process, [backup, current_pid] ) wait_backup(backup, backup.processed_number, task) end defp wait_backup(backup, current_processed, task) do wait_time = @config_impl.get([__MODULE__, :process_wait_time]) receive do {:progress, new_processed} -> total_processed = current_processed + new_processed set_state(backup, :running, total_processed) wait_backup(backup, total_processed, task) {:DOWN, _ref, _proc, _pid, reason} -> backup = get(backup.id) if reason != :normal do Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}") {:ok, backup} = set_state(backup, :failed) cleanup(backup) {:error, %{ backup: backup, reason: :exit, details: reason }} else {:ok, backup} end after wait_time -> Logger.error( "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating" ) Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid) {:ok, backup} = set_state(backup, :failed) cleanup(backup) {:error, %{ backup: backup, reason: :timeout }} end end @files [ 'actor.json', 'outbox.json', 'likes.json', 'bookmarks.json', 'followers.json', 'following.json' ] @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error def export(%__MODULE__{} = backup, caller_pid) do backup = Repo.preload(backup, :user) dir = backup_tempdir(backup) with :ok <- File.mkdir(dir), :ok <- actor(dir, backup.user, caller_pid), :ok <- statuses(dir, backup.user, caller_pid), :ok <- likes(dir, backup.user, caller_pid), :ok <- bookmarks(dir, backup.user, caller_pid), :ok <- followers(dir, backup.user, caller_pid), :ok <- following(dir, backup.user, caller_pid), {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir), {:ok, _} <- File.rm_rf(dir) do {:ok, zip_path} else _ -> :error end end def dir(name) do dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!() Path.join(dir, name) end def upload(%__MODULE__{} = backup, zip_path) do uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) upload = %Pleroma.Upload{ name: backup.file_name, tempfile: zip_path, content_type: backup.content_type, path: Path.join("backups", backup.file_name) } with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload), :ok <- File.rm(zip_path) do {:ok, upload} end end defp actor(dir, user, caller_pid) do with {:ok, json} <- UserView.render("user.json", %{user: user}) |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"}) |> Jason.encode() do send(caller_pid, {:progress, 1}) File.write(Path.join(dir, "actor.json"), json) end end defp write_header(file, name) do IO.write( file, """ { "@context": "https://www.w3.org/ns/activitystreams", "id": "#{name}.json", "type": "OrderedCollection", "orderedItems": [ """ ) end defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0 defp backup_tempdir(backup) do name = String.trim_trailing(backup.file_name, ".zip") dir(name) end defp cleanup(backup) do dir = backup_tempdir(backup) File.rm_rf(dir) end defp write(query, dir, name, fun, caller_pid) do path = Path.join(dir, "#{name}.json") chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size]) with {:ok, file} <- File.open(path, [:write, :utf8]), :ok <- write_header(file, name) do total = query |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity) |> Enum.reduce(0, fn i, acc -> with {:ok, data} <- (try do fun.(i) rescue e -> {:error, e} end), {:ok, str} <- Jason.encode(data), :ok <- IO.write(file, str <> ",\n") do if should_report?(acc + 1, chunk_size) do send(caller_pid, {:progress, chunk_size}) end acc + 1 else {:error, e} -> Logger.warning( "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}" ) acc _ -> acc end end) send(caller_pid, {:progress, rem(total, chunk_size)}) with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do File.close(file) end end end defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do Bookmark |> where(user_id: ^user_id) |> join(:inner, [b], activity in assoc(b, :activity)) |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)}) |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid) end defp likes(dir, user, caller_pid) do user.ap_id |> Activity.Queries.by_actor() |> Activity.Queries.by_type("Like") |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)}) |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid) end defp statuses(dir, user, caller_pid) do opts = %{} |> Map.put(:type, ["Create", "Announce"]) |> Map.put(:actor_id, user.ap_id) [ [Pleroma.Constants.as_public(), user.ap_id], User.following(user), Pleroma.List.memberships(user) ] |> Enum.concat() |> ActivityPub.fetch_activities_query(opts) |> write( dir, "outbox", fn a -> with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do {:ok, Map.delete(activity, "@context")} end end, caller_pid ) end defp followers(dir, user, caller_pid) do User.get_followers_query(user) |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid) end defp following(dir, user, caller_pid) do User.get_friends_query(user) |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid) end end defmodule Pleroma.User.Backup.ProcessorAPI do @callback do_process(%Pleroma.User.Backup{}, pid()) :: {:ok, %Pleroma.User.Backup{}} | {:error, any()} end defmodule Pleroma.User.Backup.Processor do @behaviour Pleroma.User.Backup.ProcessorAPI alias Pleroma.Repo alias Pleroma.User.Backup import Ecto.Changeset @impl true def do_process(backup, current_pid) do with {:ok, zip_file} <- Backup.export(backup, current_pid), {:ok, %{size: size}} <- File.stat(zip_file), {:ok, _upload} <- Backup.upload(backup, zip_file) do backup |> cast( %{ file_size: size, processed: true, state: :complete }, [:file_size, :processed, :state] ) |> Repo.update() else e -> {:error, e} end end end