total rebase
diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
old mode 100755
new mode 100644
index 9df0106..65e0bac
@@ -9,28 +9,36 @@ defmodule Pleroma.User.Backup do
   import Ecto.Query
   import Pleroma.Web.Gettext
 
+  require Logger
   require Pleroma.Constants
 
   alias Pleroma.Activity
   alias Pleroma.Bookmark
   alias Pleroma.Repo
   alias Pleroma.User
+  alias Pleroma.User.Backup.State
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.ActivityPub.UserView
   alias Pleroma.Workers.BackupWorker
 
+  @type t :: %__MODULE__{}
+
   schema "backups" do
     field(:content_type, :string)
     field(:file_name, :string)
     field(:file_size, :integer, default: 0)
     field(:processed, :boolean, default: false)
+    field(:state, State, default: :invalid)
+    field(:processed_number, :integer, default: 0)
 
     belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
 
     timestamps()
   end
 
+  @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
+
   def create(user, admin_id \\ nil) do
     with :ok <- validate_limit(user, admin_id),
          {:ok, backup} <- user |> new() |> Repo.insert() do
@@ -46,7 +54,8 @@ defmodule Pleroma.User.Backup do
     %__MODULE__{
       user_id: user.id,
       content_type: "application/zip",
-      file_name: name
+      file_name: name,
+      state: :pending
     }
   end
 
@@ -109,30 +118,109 @@ defmodule Pleroma.User.Backup do
 
   def get(id), do: Repo.get(__MODULE__, id)
 
-  def process(%__MODULE__{} = backup) do
-    with {:ok, zip_file} <- export(backup),
-         {:ok, %{size: size}} <- File.stat(zip_file),
-         {:ok, _upload} <- upload(backup, zip_file) do
-      backup
-      |> cast(%{file_size: size, processed: true}, [:file_size, :processed])
-      |> Repo.update()
+  defp set_state(backup, state, processed_number \\ nil) do
+    struct =
+      %{state: state}
+      |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
+
+    backup
+    |> cast(struct, [:state, :processed_number])
+    |> Repo.update()
+  end
+
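+  # Run the export in a monitored task; the processor reports progress back
+  # to this process as {:progress, n} messages handled by wait_backup/3.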
+  def process(
+        %__MODULE__{} = backup,
+        processor_module \\ __MODULE__.Processor
+      ) do
+    set_state(backup, :running, 0)
+
+    current_pid = self()
+
+    task =
+      Task.Supervisor.async_nolink(
+        Pleroma.TaskSupervisor,
+        processor_module,
+        :do_process,
+        [backup, current_pid]
+      )
+
+    wait_backup(backup, backup.processed_number, task)
+  end
+
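+  # Wait for progress updates or task termination. An abnormal exit, or no
+  # message within the configured :process_wait_time, marks the backup as
+  # failed and removes its temporary directory.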
+  defp wait_backup(backup, current_processed, task) do
+    wait_time = @config_impl.get([__MODULE__, :process_wait_time])
+
+    receive do
+      {:progress, new_processed} ->
+        total_processed = current_processed + new_processed
+
+        set_state(backup, :running, total_processed)
+        wait_backup(backup, total_processed, task)
+
+      {:DOWN, _ref, _proc, _pid, reason} ->
+        backup = get(backup.id)
+
+        if reason != :normal do
+          Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
+
+          {:ok, backup} = set_state(backup, :failed)
+
+          cleanup(backup)
+
+          {:error,
+           %{
+             backup: backup,
+             reason: :exit,
+             details: reason
+           }}
+        else
+          {:ok, backup}
+        end
+    after
+      wait_time ->
+        Logger.error(
+          "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
+        )
+
+        Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
+
+        {:ok, backup} = set_state(backup, :failed)
+
+        cleanup(backup)
+
+        {:error,
+         %{
+           backup: backup,
+           reason: :timeout
+         }}
     end
   end
 
-  @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
-  def export(%__MODULE__{} = backup) do
+  @files [
+    'actor.json',
+    'outbox.json',
+    'likes.json',
+    'bookmarks.json',
+    'followers.json',
+    'following.json'
+  ]
+  @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
+  def export(%__MODULE__{} = backup, caller_pid) do
     backup = Repo.preload(backup, :user)
-    name = String.trim_trailing(backup.file_name, ".zip")
-    dir = dir(name)
+    dir = backup_tempdir(backup)
 
     with :ok <- File.mkdir(dir),
-         :ok <- actor(dir, backup.user),
-         :ok <- statuses(dir, backup.user),
-         :ok <- likes(dir, backup.user),
-         :ok <- bookmarks(dir, backup.user),
-         {:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
+         :ok <- actor(dir, backup.user, caller_pid),
+         :ok <- statuses(dir, backup.user, caller_pid),
+         :ok <- likes(dir, backup.user, caller_pid),
+         :ok <- bookmarks(dir, backup.user, caller_pid),
+         :ok <- followers(dir, backup.user, caller_pid),
+         :ok <- following(dir, backup.user, caller_pid),
+         {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
          {:ok, _} <- File.rm_rf(dir) do
-      {:ok, to_string(zip_path)}
+      {:ok, zip_path}
+    else
+      _ -> :error
     end
   end
 
@@ -157,11 +245,12 @@ defmodule Pleroma.User.Backup do
     end
   end
 
-  defp actor(dir, user) do
+  defp actor(dir, user, caller_pid) do
     with {:ok, json} <-
            UserView.render("user.json", %{user: user})
            |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
            |> Jason.encode() do
+      send(caller_pid, {:progress, 1})
       File.write(Path.join(dir, "actor.json"), json)
     end
   end
@@ -180,47 +269,80 @@ defmodule Pleroma.User.Backup do
     )
   end
 
-  defp write(query, dir, name, fun) do
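+  # Report progress only once per full chunk instead of per item.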
+  defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
+
+  defp backup_tempdir(backup) do
+    name = String.trim_trailing(backup.file_name, ".zip")
+    dir(name)
+  end
+
+  defp cleanup(backup) do
+    dir = backup_tempdir(backup)
+    File.rm_rf(dir)
+  end
+
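+  # Stream the query in chunks, writing each record as one element of a JSON
+  # array and reporting progress to the caller. Items that fail to render or
+  # encode are logged and skipped.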
+  defp write(query, dir, name, fun, caller_pid) do
     path = Path.join(dir, "#{name}.json")
 
+    chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
+
     with {:ok, file} <- File.open(path, [:write, :utf8]),
          :ok <- write_header(file, name) do
       total =
         query
-        |> Pleroma.Repo.chunk_stream(100)
+        |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
         |> Enum.reduce(0, fn i, acc ->
-          with {:ok, data} <- fun.(i),
+          with {:ok, data} <-
+                 (try do
+                    fun.(i)
+                  rescue
+                    e -> {:error, e}
+                  end),
                {:ok, str} <- Jason.encode(data),
                :ok <- IO.write(file, str <> ",\n") do
+            if should_report?(acc + 1, chunk_size) do
+              send(caller_pid, {:progress, chunk_size})
+            end
+
             acc + 1
           else
-            _ -> acc
+            {:error, e} ->
+              Logger.warning(
+                "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
+              )
+
+              acc
+
+            _ ->
+              acc
           end
         end)
 
+      send(caller_pid, {:progress, rem(total, chunk_size)})
+
       with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n  \"totalItems\": #{total}}") do
         File.close(file)
       end
     end
   end
 
-  defp bookmarks(dir, %{id: user_id} = _user) do
+  defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
     Bookmark
     |> where(user_id: ^user_id)
     |> join(:inner, [b], activity in assoc(b, :activity))
     |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
-    |> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
+    |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
   end
 
-  defp likes(dir, user) do
+  defp likes(dir, user, caller_pid) do
     user.ap_id
     |> Activity.Queries.by_actor()
     |> Activity.Queries.by_type("Like")
     |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
-    |> write(dir, "likes", fn a -> {:ok, a.object} end)
+    |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
   end
 
-  defp statuses(dir, user) do
+  defp statuses(dir, user, caller_pid) do
     opts =
       %{}
       |> Map.put(:type, ["Create", "Announce"])
@@ -233,10 +355,59 @@ defmodule Pleroma.User.Backup do
     ]
     |> Enum.concat()
     |> ActivityPub.fetch_activities_query(opts)
-    |> write(dir, "outbox", fn a ->
-      with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
-        {:ok, Map.delete(activity, "@context")}
-      end
-    end)
+    |> write(
+      dir,
+      "outbox",
+      fn a ->
+        with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
+          {:ok, Map.delete(activity, "@context")}
+        end
+      end,
+      caller_pid
+    )
+  end
+
+  defp followers(dir, user, caller_pid) do
+    User.get_followers_query(user)
+    |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid)
+  end
+
+  defp following(dir, user, caller_pid) do
+    User.get_friends_query(user)
+    |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid)
+  end
+end
+
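+# Behaviour implemented by the default processor below; Backup.process/2
+# accepts any module implementing it, so the processor can be swapped out.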
+defmodule Pleroma.User.Backup.ProcessorAPI do
+  @callback do_process(%Pleroma.User.Backup{}, pid()) ::
+              {:ok, %Pleroma.User.Backup{}} | {:error, any()}
+end
+
+defmodule Pleroma.User.Backup.Processor do
+  @behaviour Pleroma.User.Backup.ProcessorAPI
+
+  alias Pleroma.Repo
+  alias Pleroma.User.Backup
+
+  import Ecto.Changeset
+
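+  # Export the archive, upload it, and mark the backup record as complete;
+  # returns {:error, reason} if any step fails.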
+  @impl true
+  def do_process(backup, current_pid) do
+    with {:ok, zip_file} <- Backup.export(backup, current_pid),
+         {:ok, %{size: size}} <- File.stat(zip_file),
+         {:ok, _upload} <- Backup.upload(backup, zip_file) do
+      backup
+      |> cast(
+        %{
+          file_size: size,
+          processed: true,
+          state: :complete
+        },
+        [:file_size, :processed, :state]
+      )
+      |> Repo.update()
+    else
+      e -> {:error, e}
+    end
   end
 end