total rebase
[anni] / lib / pleroma / workers / attachments_cleanup_worker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Workers.AttachmentsCleanupWorker do
6   import Ecto.Query
7
8   alias Pleroma.Object
9   alias Pleroma.Repo
10
11   use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
12
13   @impl Oban.Worker
14   def perform(%Job{
15         args: %{
16           "op" => "cleanup_attachments",
17           "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
18         }
19       }) do
20     if Pleroma.Config.get([:instance, :cleanup_attachments], false) do
21       attachments
22       |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
23       |> fetch_objects
24       |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
25       |> filter_objects
26       |> do_clean
27     end
28
29     {:ok, :success}
30   end
31
32   def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}
33
34   @impl Oban.Worker
35   def timeout(_job), do: :timer.seconds(900)
36
37   defp do_clean({object_ids, attachment_urls}) do
38     uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
39
40     base_url =
41       String.trim_trailing(
42         Pleroma.Upload.base_url(),
43         "/"
44       )
45
46     Enum.each(attachment_urls, fn href ->
47       href
48       |> String.trim_leading("#{base_url}")
49       |> uploader.delete_file()
50     end)
51
52     delete_objects(object_ids)
53   end
54
55   defp delete_objects([_ | _] = object_ids) do
56     Repo.delete_all(from(o in Object, where: o.id in ^object_ids))
57   end
58
59   defp delete_objects(_), do: :ok
60
61   # we should delete 1 object for any given attachment, but don't delete
62   # files if there are more than 1 object for it
63   defp filter_objects(objects) do
64     Enum.reduce(objects, {[], []}, fn {href, %{id: id, count: count}}, {ids, hrefs} ->
65       with 1 <- count do
66         {ids ++ [id], hrefs ++ [href]}
67       else
68         _ -> {ids ++ [id], hrefs}
69       end
70     end)
71   end
72
73   defp prepare_objects(objects, actor, names) do
74     objects
75     |> Enum.reduce(%{}, fn %{
76                              id: id,
77                              data: %{
78                                "url" => [%{"href" => href}],
79                                "actor" => obj_actor,
80                                "name" => name
81                              }
82                            },
83                            acc ->
84       Map.update(acc, href, %{id: id, count: 1}, fn val ->
85         case obj_actor == actor and name in names do
86           true ->
87             # set id of the actor's object that will be deleted
88             %{val | id: id, count: val.count + 1}
89
90           false ->
91             # another actor's object, just increase count to not delete file
92             %{val | count: val.count + 1}
93         end
94       end)
95     end)
96   end
97
98   defp fetch_objects(hrefs) do
99     from(o in Object,
100       where:
101         fragment(
102           "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
103           o.data,
104           o.data,
105           ^hrefs
106         )
107     )
108     # The query above can be time consumptive on large instances until we
109     # refactor how uploads are stored
110     |> Repo.all(timeout: :infinity)
111   end
112 end