total rebase
[anni] / lib / pleroma / object.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object do
6   use Ecto.Schema
7
8   import Ecto.Query
9   import Ecto.Changeset
10
11   alias Pleroma.Activity
12   alias Pleroma.Config
13   alias Pleroma.Hashtag
14   alias Pleroma.Object
15   alias Pleroma.Object.Fetcher
16   alias Pleroma.ObjectTombstone
17   alias Pleroma.Repo
18   alias Pleroma.User
19   alias Pleroma.Workers.AttachmentsCleanupWorker
20
21   require Logger
22
23   @type t() :: %__MODULE__{}
24
25   @derive {Jason.Encoder, only: [:data]}
26
27   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
28
29   schema "objects" do
30     field(:data, :map)
31
32     many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete)
33
34     timestamps()
35   end
36
37   def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
38     object_position = Map.get(query.aliases, :object, 0)
39
40     join(query, join_type, [{object, object_position}], a in Activity,
41       on:
42         fragment(
43           "associated_object_id(?) = (? ->> 'id') AND (?->>'type' = ?) ",
44           a.data,
45           object.data,
46           a.data,
47           ^activity_type
48         ),
49       as: :object_activity
50     )
51   end
52
53   def create(data) do
54     %Object{}
55     |> Object.change(%{data: data})
56     |> Repo.insert()
57   end
58
59   def change(struct, params \\ %{}) do
60     struct
61     |> cast(params, [:data])
62     |> validate_required([:data])
63     |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
64     # Expecting `maybe_handle_hashtags_change/1` to run last:
65     |> maybe_handle_hashtags_change(struct)
66   end
67
68   # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
69   defp maybe_handle_hashtags_change(changeset, struct) do
70     with %Ecto.Changeset{valid?: true} <- changeset,
71          data_hashtags_change = get_change(changeset, :data),
72          {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
73          {:ok, hashtag_records} <-
74            data_hashtags_change
75            |> object_data_hashtags()
76            |> Hashtag.get_or_create_by_names() do
77       put_assoc(changeset, :hashtags, hashtag_records)
78     else
79       %{valid?: false} ->
80         changeset
81
82       {:changed, false} ->
83         changeset
84
85       {:error, _} ->
86         validate_change(changeset, :data, fn _, _ ->
87           [data: "error referencing hashtags"]
88         end)
89     end
90   end
91
92   defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
93     Enum.sort(embedded_hashtags(struct)) !=
94       Enum.sort(object_data_hashtags(data))
95   end
96
97   defp hashtags_changed?(_, _), do: false
98
99   def get_by_id(nil), do: nil
100   def get_by_id(id), do: Repo.get(Object, id)
101
102   def get_by_id_and_maybe_refetch(id, opts \\ []) do
103     %{updated_at: updated_at} = object = get_by_id(id)
104
105     if opts[:interval] &&
106          NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
107       case Fetcher.refetch_object(object) do
108         {:ok, %Object{} = object} ->
109           object
110
111         e ->
112           Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
113           object
114       end
115     else
116       object
117     end
118   end
119
120   def get_by_ap_id(nil), do: nil
121
122   def get_by_ap_id(ap_id) do
123     Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
124   end
125
126   @doc """
127   Get a single attachment by it's name and href
128   """
129   @spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
130   def get_attachment_by_name_and_href(name, href) do
131     query =
132       from(o in Object,
133         where: fragment("(?)->>'name' = ?", o.data, ^name),
134         where: fragment("(?)->>'href' = ?", o.data, ^href)
135       )
136
137     Repo.one(query)
138   end
139
140   defp warn_on_no_object_preloaded(ap_id) do
141     "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
142     |> Logger.debug()
143
144     Logger.debug("Backtrace: #{inspect(Process.info(:erlang.self(), :current_stacktrace))}")
145   end
146
147   def normalize(_, options \\ [fetch: false, id_only: false])
148
149   # If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
150   # Use this whenever possible, especially when walking graphs in an O(N) loop!
151   def normalize(%Object{} = object, _), do: object
152   def normalize(%Activity{object: %Object{} = object}, _), do: object
153
154   # A hack for fake activities
155   def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
156     %Object{id: "pleroma:fake_object_id", data: data}
157   end
158
159   # No preloaded object
160   def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, options) do
161     warn_on_no_object_preloaded(ap_id)
162     normalize(ap_id, options)
163   end
164
165   # No preloaded object
166   def normalize(%Activity{data: %{"object" => ap_id}}, options) do
167     warn_on_no_object_preloaded(ap_id)
168     normalize(ap_id, options)
169   end
170
171   # Old way, try fetching the object through cache.
172   def normalize(%{"id" => ap_id}, options), do: normalize(ap_id, options)
173
174   def normalize(ap_id, options) when is_binary(ap_id) do
175     cond do
176       Keyword.get(options, :id_only) ->
177         ap_id
178
179       Keyword.get(options, :fetch) ->
180         case Fetcher.fetch_object_from_id(ap_id, options) do
181           {:ok, object} -> object
182           _ -> nil
183         end
184
185       true ->
186         get_cached_by_ap_id(ap_id)
187     end
188   end
189
190   def normalize(_, _), do: nil
191
192   # Owned objects can only be accessed by their owner
193   def authorize_access(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}) do
194     if actor == ap_id do
195       :ok
196     else
197       {:error, :forbidden}
198     end
199   end
200
201   # Legacy objects can be accessed by anybody
202   def authorize_access(%Object{}, %User{}), do: :ok
203
204   @spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
205   def get_cached_by_ap_id(ap_id) do
206     key = "object:#{ap_id}"
207
208     with {:ok, nil} <- @cachex.get(:object_cache, key),
209          object when not is_nil(object) <- get_by_ap_id(ap_id),
210          {:ok, true} <- @cachex.put(:object_cache, key, object) do
211       object
212     else
213       {:ok, object} -> object
214       nil -> nil
215     end
216   end
217
218   def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
219     %ObjectTombstone{
220       id: id,
221       formerType: type,
222       deleted: deleted
223     }
224     |> Map.from_struct()
225   end
226
227   def swap_object_with_tombstone(object) do
228     tombstone = make_tombstone(object)
229
230     with {:ok, object} <-
231            object
232            |> Object.change(%{data: tombstone})
233            |> Repo.update() do
234       Hashtag.unlink(object)
235       {:ok, object}
236     end
237   end
238
239   def delete(%Object{data: %{"id" => id}} = object) do
240     with {:ok, _obj} = swap_object_with_tombstone(object),
241          deleted_activity = Activity.delete_all_by_object_ap_id(id),
242          {:ok, _} <- invalid_object_cache(object) do
243       cleanup_attachments(
244         Config.get([:instance, :cleanup_attachments]),
245         object
246       )
247
248       {:ok, object, deleted_activity}
249     end
250   end
251
252   @spec cleanup_attachments(boolean(), Object.t()) ::
253           {:ok, Oban.Job.t() | nil}
254   def cleanup_attachments(true, %Object{} = object) do
255     AttachmentsCleanupWorker.enqueue("cleanup_attachments", %{"object" => object})
256   end
257
258   def cleanup_attachments(_, _), do: {:ok, nil}
259
260   def prune(%Object{data: %{"id" => _id}} = object) do
261     with {:ok, object} <- Repo.delete(object),
262          {:ok, _} <- invalid_object_cache(object) do
263       {:ok, object}
264     end
265   end
266
267   def invalid_object_cache(%Object{data: %{"id" => id}}) do
268     with {:ok, true} <- @cachex.del(:object_cache, "object:#{id}") do
269       @cachex.del(:web_resp_cache, URI.parse(id).path)
270     end
271   end
272
273   def set_cache(%Object{data: %{"id" => ap_id}} = object) do
274     @cachex.put(:object_cache, "object:#{ap_id}", object)
275     {:ok, object}
276   end
277
278   def update_and_set_cache(changeset) do
279     with {:ok, object} <- Repo.update(changeset) do
280       set_cache(object)
281     end
282   end
283
284   def increase_replies_count(ap_id) do
285     Object
286     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
287     |> update([o],
288       set: [
289         data:
290           fragment(
291             """
292             safe_jsonb_set(?, '{repliesCount}',
293               (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
294             """,
295             o.data,
296             o.data
297           )
298       ]
299     )
300     |> Repo.update_all([])
301     |> case do
302       {1, [object]} -> set_cache(object)
303       _ -> {:error, "Not found"}
304     end
305   end
306
307   defp poll_is_multiple?(%Object{data: %{"anyOf" => [_ | _]}}), do: true
308
309   defp poll_is_multiple?(_), do: false
310
311   def decrease_replies_count(ap_id) do
312     Object
313     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
314     |> update([o],
315       set: [
316         data:
317           fragment(
318             """
319             safe_jsonb_set(?, '{repliesCount}',
320               (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
321             """,
322             o.data,
323             o.data
324           )
325       ]
326     )
327     |> Repo.update_all([])
328     |> case do
329       {1, [object]} -> set_cache(object)
330       _ -> {:error, "Not found"}
331     end
332   end
333
334   def increase_quotes_count(ap_id) do
335     Object
336     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
337     |> update([o],
338       set: [
339         data:
340           fragment(
341             """
342             safe_jsonb_set(?, '{quotesCount}',
343               (coalesce((?->>'quotesCount')::int, 0) + 1)::varchar::jsonb, true)
344             """,
345             o.data,
346             o.data
347           )
348       ]
349     )
350     |> Repo.update_all([])
351     |> case do
352       {1, [object]} -> set_cache(object)
353       _ -> {:error, "Not found"}
354     end
355   end
356
357   def decrease_quotes_count(ap_id) do
358     Object
359     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
360     |> update([o],
361       set: [
362         data:
363           fragment(
364             """
365             safe_jsonb_set(?, '{quotesCount}',
366               (greatest(0, (?->>'quotesCount')::int - 1))::varchar::jsonb, true)
367             """,
368             o.data,
369             o.data
370           )
371       ]
372     )
373     |> Repo.update_all([])
374     |> case do
375       {1, [object]} -> set_cache(object)
376       _ -> {:error, "Not found"}
377     end
378   end
379
380   def increase_vote_count(ap_id, name, actor) do
381     with %Object{} = object <- Object.normalize(ap_id, fetch: false),
382          "Question" <- object.data["type"] do
383       key = if poll_is_multiple?(object), do: "anyOf", else: "oneOf"
384
385       options =
386         object.data[key]
387         |> Enum.map(fn
388           %{"name" => ^name} = option ->
389             Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))
390
391           option ->
392             option
393         end)
394
395       voters = [actor | object.data["voters"] || []] |> Enum.uniq()
396
397       data =
398         object.data
399         |> Map.put(key, options)
400         |> Map.put("voters", voters)
401
402       object
403       |> Object.change(%{data: data})
404       |> update_and_set_cache()
405     else
406       _ -> :noop
407     end
408   end
409
410   @doc "Updates data field of an object"
411   def update_data(%Object{data: data} = object, attrs \\ %{}) do
412     object
413     |> Object.change(%{data: Map.merge(data || %{}, attrs)})
414     |> Repo.update()
415   end
416
417   def local?(%Object{data: %{"id" => id}}) do
418     String.starts_with?(id, Pleroma.Web.Endpoint.url() <> "/")
419   end
420
421   def replies(object, opts \\ []) do
422     object = Object.normalize(object, fetch: false)
423
424     query =
425       Object
426       |> where(
427         [o],
428         fragment("(?)->>'inReplyTo' = ?", o.data, ^object.data["id"])
429       )
430       |> order_by([o], asc: o.id)
431
432     if opts[:self_only] do
433       actor = object.data["actor"]
434       where(query, [o], fragment("(?)->>'actor' = ?", o.data, ^actor))
435     else
436       query
437     end
438   end
439
440   def self_replies(object, opts \\ []),
441     do: replies(object, Keyword.put(opts, :self_only, true))
442
443   def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags
444
445   def tags(_), do: []
446
447   def hashtags(%Object{} = object) do
448     # Note: always using embedded hashtags regardless whether they are migrated to hashtags table
449     #   (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle)
450     embedded_hashtags(object)
451   end
452
453   def embedded_hashtags(%Object{data: data}) do
454     object_data_hashtags(data)
455   end
456
457   def embedded_hashtags(_), do: []
458
459   def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
460     tags
461     |> Enum.filter(fn
462       %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name")
463       plain_text when is_bitstring(plain_text) -> true
464       _ -> false
465     end)
466     |> Enum.map(fn
467       %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
468       %{"name" => hashtag} -> String.downcase(hashtag)
469       hashtag when is_bitstring(hashtag) -> String.downcase(hashtag)
470     end)
471     |> Enum.uniq()
472     # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
473     |> Enum.filter(&(&1 not in [nil, ""]))
474   end
475
476   def object_data_hashtags(_), do: []
477
478   def get_emoji_reactions(object) do
479     reactions = object.data["reactions"]
480
481     if is_list(reactions) or is_map(reactions) do
482       reactions
483       |> Enum.map(fn
484         [_emoji, users, _maybe_url] = item when is_list(users) ->
485           item
486
487         [emoji, users] when is_list(users) ->
488           [emoji, users, nil]
489
490         # This case is here to process the Map situation, which will happen
491         # only with the legacy two-value format.
492         {emoji, users} when is_list(users) ->
493           [emoji, users, nil]
494
495         _ ->
496           nil
497       end)
498       |> Enum.reject(&is_nil/1)
499     else
500       []
501     end
502   end
503 end