First
[anni] / lib / pleroma / object.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Object do
6   use Ecto.Schema
7
8   import Ecto.Query
9   import Ecto.Changeset
10
11   alias Pleroma.Activity
12   alias Pleroma.Config
13   alias Pleroma.Hashtag
14   alias Pleroma.Object
15   alias Pleroma.Object.Fetcher
16   alias Pleroma.ObjectTombstone
17   alias Pleroma.Repo
18   alias Pleroma.User
19   alias Pleroma.Workers.AttachmentsCleanupWorker
20
21   require Logger
22
23   @type t() :: %__MODULE__{}
24
25   @derive {Jason.Encoder, only: [:data]}
26
27   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
28
29   schema "objects" do
30     field(:data, :map)
31
32     many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete)
33
34     timestamps()
35   end
36
37   def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
38     object_position = Map.get(query.aliases, :object, 0)
39
40     join(query, join_type, [{object, object_position}], a in Activity,
41       on:
42         fragment(
43           "associated_object_id(?) = (? ->> 'id') AND (?->>'type' = ?) ",
44           a.data,
45           object.data,
46           a.data,
47           ^activity_type
48         ),
49       as: :object_activity
50     )
51   end
52
53   def create(data) do
54     %Object{}
55     |> Object.change(%{data: data})
56     |> Repo.insert()
57   end
58
59   def change(struct, params \\ %{}) do
60     struct
61     |> cast(params, [:data])
62     |> validate_required([:data])
63     |> unique_constraint(:ap_id, name: :objects_unique_apid_index)
64     # Expecting `maybe_handle_hashtags_change/1` to run last:
65     |> maybe_handle_hashtags_change(struct)
66   end
67
68   # Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
69   defp maybe_handle_hashtags_change(changeset, struct) do
70     with %Ecto.Changeset{valid?: true} <- changeset,
71          data_hashtags_change = get_change(changeset, :data),
72          {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
73          {:ok, hashtag_records} <-
74            data_hashtags_change
75            |> object_data_hashtags()
76            |> Hashtag.get_or_create_by_names() do
77       put_assoc(changeset, :hashtags, hashtag_records)
78     else
79       %{valid?: false} ->
80         changeset
81
82       {:changed, false} ->
83         changeset
84
85       {:error, _} ->
86         validate_change(changeset, :data, fn _, _ ->
87           [data: "error referencing hashtags"]
88         end)
89     end
90   end
91
92   defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
93     Enum.sort(embedded_hashtags(struct)) !=
94       Enum.sort(object_data_hashtags(data))
95   end
96
97   defp hashtags_changed?(_, _), do: false
98
99   def get_by_id(nil), do: nil
100   def get_by_id(id), do: Repo.get(Object, id)
101
102   def get_by_id_and_maybe_refetch(id, opts \\ []) do
103     %{updated_at: updated_at} = object = get_by_id(id)
104
105     if opts[:interval] &&
106          NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
107       case Fetcher.refetch_object(object) do
108         {:ok, %Object{} = object} ->
109           object
110
111         e ->
112           Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
113           object
114       end
115     else
116       object
117     end
118   end
119
120   def get_by_ap_id(nil), do: nil
121
122   def get_by_ap_id(ap_id) do
123     Repo.one(from(object in Object, where: fragment("(?)->>'id' = ?", object.data, ^ap_id)))
124   end
125
126   @doc """
127   Get a single attachment by it's name and href
128   """
129   @spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
130   def get_attachment_by_name_and_href(name, href) do
131     query =
132       from(o in Object,
133         where: fragment("(?)->>'name' = ?", o.data, ^name),
134         where: fragment("(?)->>'href' = ?", o.data, ^href)
135       )
136
137     Repo.one(query)
138   end
139
140   defp warn_on_no_object_preloaded(ap_id) do
141     "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
142     |> Logger.debug()
143
144     Logger.debug("Backtrace: #{inspect(Process.info(:erlang.self(), :current_stacktrace))}")
145   end
146
147   def normalize(_, options \\ [fetch: false, id_only: false])
148
149   # If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
150   # Use this whenever possible, especially when walking graphs in an O(N) loop!
151   def normalize(%Object{} = object, _), do: object
152   def normalize(%Activity{object: %Object{} = object}, _), do: object
153
154   # A hack for fake activities
155   def normalize(%Activity{data: %{"object" => %{"fake" => true} = data}}, _) do
156     %Object{id: "pleroma:fake_object_id", data: data}
157   end
158
159   # No preloaded object
160   def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, options) do
161     warn_on_no_object_preloaded(ap_id)
162     normalize(ap_id, options)
163   end
164
165   # No preloaded object
166   def normalize(%Activity{data: %{"object" => ap_id}}, options) do
167     warn_on_no_object_preloaded(ap_id)
168     normalize(ap_id, options)
169   end
170
171   # Old way, try fetching the object through cache.
172   def normalize(%{"id" => ap_id}, options), do: normalize(ap_id, options)
173
174   def normalize(ap_id, options) when is_binary(ap_id) do
175     cond do
176       Keyword.get(options, :id_only) ->
177         ap_id
178
179       Keyword.get(options, :fetch) ->
180         Fetcher.fetch_object_from_id!(ap_id, options)
181
182       true ->
183         get_cached_by_ap_id(ap_id)
184     end
185   end
186
187   def normalize(_, _), do: nil
188
189   # Owned objects can only be accessed by their owner
190   def authorize_access(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}) do
191     if actor == ap_id do
192       :ok
193     else
194       {:error, :forbidden}
195     end
196   end
197
198   # Legacy objects can be accessed by anybody
199   def authorize_access(%Object{}, %User{}), do: :ok
200
201   @spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
202   def get_cached_by_ap_id(ap_id) do
203     key = "object:#{ap_id}"
204
205     with {:ok, nil} <- @cachex.get(:object_cache, key),
206          object when not is_nil(object) <- get_by_ap_id(ap_id),
207          {:ok, true} <- @cachex.put(:object_cache, key, object) do
208       object
209     else
210       {:ok, object} -> object
211       nil -> nil
212     end
213   end
214
215   def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
216     %ObjectTombstone{
217       id: id,
218       formerType: type,
219       deleted: deleted
220     }
221     |> Map.from_struct()
222   end
223
224   def swap_object_with_tombstone(object) do
225     tombstone = make_tombstone(object)
226
227     with {:ok, object} <-
228            object
229            |> Object.change(%{data: tombstone})
230            |> Repo.update() do
231       Hashtag.unlink(object)
232       {:ok, object}
233     end
234   end
235
236   def delete(%Object{data: %{"id" => id}} = object) do
237     with {:ok, _obj} = swap_object_with_tombstone(object),
238          deleted_activity = Activity.delete_all_by_object_ap_id(id),
239          {:ok, _} <- invalid_object_cache(object) do
240       cleanup_attachments(
241         Config.get([:instance, :cleanup_attachments]),
242         %{"object" => object}
243       )
244
245       {:ok, object, deleted_activity}
246     end
247   end
248
249   @spec cleanup_attachments(boolean(), %{required(:object) => map()}) ::
250           {:ok, Oban.Job.t() | nil}
251   def cleanup_attachments(true, %{"object" => _} = params) do
252     AttachmentsCleanupWorker.enqueue("cleanup_attachments", params)
253   end
254
255   def cleanup_attachments(_, _), do: {:ok, nil}
256
257   def prune(%Object{data: %{"id" => _id}} = object) do
258     with {:ok, object} <- Repo.delete(object),
259          {:ok, _} <- invalid_object_cache(object) do
260       {:ok, object}
261     end
262   end
263
264   def invalid_object_cache(%Object{data: %{"id" => id}}) do
265     with {:ok, true} <- @cachex.del(:object_cache, "object:#{id}") do
266       @cachex.del(:web_resp_cache, URI.parse(id).path)
267     end
268   end
269
270   def set_cache(%Object{data: %{"id" => ap_id}} = object) do
271     @cachex.put(:object_cache, "object:#{ap_id}", object)
272     {:ok, object}
273   end
274
275   def update_and_set_cache(changeset) do
276     with {:ok, object} <- Repo.update(changeset) do
277       set_cache(object)
278     end
279   end
280
281   def increase_replies_count(ap_id) do
282     Object
283     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
284     |> update([o],
285       set: [
286         data:
287           fragment(
288             """
289             safe_jsonb_set(?, '{repliesCount}',
290               (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
291             """,
292             o.data,
293             o.data
294           )
295       ]
296     )
297     |> Repo.update_all([])
298     |> case do
299       {1, [object]} -> set_cache(object)
300       _ -> {:error, "Not found"}
301     end
302   end
303
304   defp poll_is_multiple?(%Object{data: %{"anyOf" => [_ | _]}}), do: true
305
306   defp poll_is_multiple?(_), do: false
307
308   def decrease_replies_count(ap_id) do
309     Object
310     |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
311     |> update([o],
312       set: [
313         data:
314           fragment(
315             """
316             safe_jsonb_set(?, '{repliesCount}',
317               (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
318             """,
319             o.data,
320             o.data
321           )
322       ]
323     )
324     |> Repo.update_all([])
325     |> case do
326       {1, [object]} -> set_cache(object)
327       _ -> {:error, "Not found"}
328     end
329   end
330
331   def increase_vote_count(ap_id, name, actor) do
332     with %Object{} = object <- Object.normalize(ap_id, fetch: false),
333          "Question" <- object.data["type"] do
334       key = if poll_is_multiple?(object), do: "anyOf", else: "oneOf"
335
336       options =
337         object.data[key]
338         |> Enum.map(fn
339           %{"name" => ^name} = option ->
340             Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))
341
342           option ->
343             option
344         end)
345
346       voters = [actor | object.data["voters"] || []] |> Enum.uniq()
347
348       data =
349         object.data
350         |> Map.put(key, options)
351         |> Map.put("voters", voters)
352
353       object
354       |> Object.change(%{data: data})
355       |> update_and_set_cache()
356     else
357       _ -> :noop
358     end
359   end
360
361   @doc "Updates data field of an object"
362   def update_data(%Object{data: data} = object, attrs \\ %{}) do
363     object
364     |> Object.change(%{data: Map.merge(data || %{}, attrs)})
365     |> Repo.update()
366   end
367
368   def local?(%Object{data: %{"id" => id}}) do
369     String.starts_with?(id, Pleroma.Web.Endpoint.url() <> "/")
370   end
371
372   def replies(object, opts \\ []) do
373     object = Object.normalize(object, fetch: false)
374
375     query =
376       Object
377       |> where(
378         [o],
379         fragment("(?)->>'inReplyTo' = ?", o.data, ^object.data["id"])
380       )
381       |> order_by([o], asc: o.id)
382
383     if opts[:self_only] do
384       actor = object.data["actor"]
385       where(query, [o], fragment("(?)->>'actor' = ?", o.data, ^actor))
386     else
387       query
388     end
389   end
390
391   def self_replies(object, opts \\ []),
392     do: replies(object, Keyword.put(opts, :self_only, true))
393
394   def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags
395
396   def tags(_), do: []
397
398   def hashtags(%Object{} = object) do
399     # Note: always using embedded hashtags regardless whether they are migrated to hashtags table
400     #   (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle)
401     embedded_hashtags(object)
402   end
403
404   def embedded_hashtags(%Object{data: data}) do
405     object_data_hashtags(data)
406   end
407
408   def embedded_hashtags(_), do: []
409
410   def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
411     tags
412     |> Enum.filter(fn
413       %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name")
414       plain_text when is_bitstring(plain_text) -> true
415       _ -> false
416     end)
417     |> Enum.map(fn
418       %{"name" => "#" <> hashtag} -> String.downcase(hashtag)
419       %{"name" => hashtag} -> String.downcase(hashtag)
420       hashtag when is_bitstring(hashtag) -> String.downcase(hashtag)
421     end)
422     |> Enum.uniq()
423     # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
424     |> Enum.filter(&(&1 not in [nil, ""]))
425   end
426
427   def object_data_hashtags(_), do: []
428 end