total rebase
[anni] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6   @moduledoc """
7   This module looks at an inserted object and executes the side effects that it
8   implies. For example, a `Like` activity will increase the like count on the
9   liked object, a `Follow` activity will add the user to the follower
10   collection, and so on.
11   """
12   alias Pleroma.Activity
13   alias Pleroma.Chat
14   alias Pleroma.Chat.MessageReference
15   alias Pleroma.FollowingRelationship
16   alias Pleroma.Notification
17   alias Pleroma.Object
18   alias Pleroma.Repo
19   alias Pleroma.User
20   alias Pleroma.Web.ActivityPub.ActivityPub
21   alias Pleroma.Web.ActivityPub.Builder
22   alias Pleroma.Web.ActivityPub.Pipeline
23   alias Pleroma.Web.ActivityPub.Utils
24   alias Pleroma.Web.Push
25   alias Pleroma.Web.Streamer
26   alias Pleroma.Workers.PollWorker
27
28   require Pleroma.Constants
29   require Logger
30
31   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32   @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34   @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
36   defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
37
38   @impl true
39   def handle(object, meta \\ [])
40
41   # Task this handles
42   # - Follows
43   # - Sends a notification
44   @impl true
45   def handle(
46         %{
47           data: %{
48             "actor" => actor,
49             "type" => "Accept",
50             "object" => follow_activity_id
51           }
52         } = object,
53         meta
54       ) do
55     with %Activity{actor: follower_id} = follow_activity <-
56            Activity.get_by_ap_id(follow_activity_id),
57          %User{} = followed <- User.get_cached_by_ap_id(actor),
58          %User{} = follower <- User.get_cached_by_ap_id(follower_id),
59          {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
60          {:ok, _follower, followed} <-
61            FollowingRelationship.update(follower, followed, :follow_accept) do
62       Notification.update_notification_type(followed, follow_activity)
63     end
64
65     {:ok, object, meta}
66   end
67
68   # Task this handles
69   # - Rejects all existing follow activities for this person
70   # - Updates the follow state
71   # - Dismisses notification
72   @impl true
73   def handle(
74         %{
75           data: %{
76             "actor" => actor,
77             "type" => "Reject",
78             "object" => follow_activity_id
79           }
80         } = object,
81         meta
82       ) do
83     with %Activity{actor: follower_id} = follow_activity <-
84            Activity.get_by_ap_id(follow_activity_id),
85          %User{} = followed <- User.get_cached_by_ap_id(actor),
86          %User{} = follower <- User.get_cached_by_ap_id(follower_id),
87          {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
88       FollowingRelationship.update(follower, followed, :follow_reject)
89       Notification.dismiss(follow_activity)
90     end
91
92     {:ok, object, meta}
93   end
94
95   # Tasks this handle
96   # - Follows if possible
97   # - Sends a notification
98   # - Generates accept or reject if appropriate
99   @impl true
100   def handle(
101         %{
102           data: %{
103             "id" => follow_id,
104             "type" => "Follow",
105             "object" => followed_user,
106             "actor" => following_user
107           }
108         } = object,
109         meta
110       ) do
111     with %User{} = follower <- User.get_cached_by_ap_id(following_user),
112          %User{} = followed <- User.get_cached_by_ap_id(followed_user),
113          {_, {:ok, _, _}, _, _} <-
114            {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
115       if followed.local && !followed.is_locked do
116         {:ok, accept_data, _} = Builder.accept(followed, object)
117         {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118       end
119     else
120       {:following, {:error, _}, _follower, followed} ->
121         {:ok, reject_data, _} = Builder.reject(followed, object)
122         {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
123
124       _ ->
125         nil
126     end
127
128     {:ok, notifications} = Notification.create_notifications(object, do_send: false)
129
130     meta =
131       meta
132       |> add_notifications(notifications)
133
134     updated_object = Activity.get_by_ap_id(follow_id)
135
136     {:ok, updated_object, meta}
137   end
138
139   # Tasks this handles:
140   # - Unfollow and block
141   @impl true
142   def handle(
143         %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
144           object,
145         meta
146       ) do
147     with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
148          %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
149       User.block(blocker, blocked)
150     end
151
152     {:ok, object, meta}
153   end
154
155   # Tasks this handles:
156   # - Update the user
157   # - Update a non-user object (Note, Question, etc.)
158   #
159   # For a local user, we also get a changeset with the full information, so we
160   # can update non-federating, non-activitypub settings as well.
161   @impl true
162   def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
163     updated_object_id = updated_object["id"]
164
165     with {_, true} <- {:has_id, is_binary(updated_object_id)},
166          %{"type" => type} <- updated_object,
167          {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
168       if is_user do
169         handle_update_user(object, meta)
170       else
171         handle_update_object(object, meta)
172       end
173     else
174       _ ->
175         {:ok, object, meta}
176     end
177   end
178
179   # Tasks this handles:
180   # - Add like to object
181   # - Set up notification
182   @impl true
183   def handle(%{data: %{"type" => "Like"}} = object, meta) do
184     liked_object = Object.get_by_ap_id(object.data["object"])
185     Utils.add_like_to_object(object, liked_object)
186
187     Notification.create_notifications(object)
188
189     {:ok, object, meta}
190   end
191
192   # Tasks this handles
193   # - Actually create object
194   # - Rollback if we couldn't create it
195   # - Increase the user note count
196   # - Increase the reply count
197   # - Increase replies count
198   # - Set up ActivityExpiration
199   # - Set up notifications
200   # - Index incoming posts for search (if needed)
201   @impl true
202   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
203     with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
204          %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
205       {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
206       {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
207       {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
208
209       if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
210         Object.increase_replies_count(in_reply_to)
211       end
212
213       if quote_url = object.data["quoteUrl"] do
214         Object.increase_quotes_count(quote_url)
215       end
216
217       reply_depth = (meta[:depth] || 0) + 1
218
219       # FIXME: Force inReplyTo to replies
220       if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
221            object.data["replies"] != nil do
222         for reply_id <- object.data["replies"] do
223           Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
224             "id" => reply_id,
225             "depth" => reply_depth
226           })
227         end
228       end
229
230       Pleroma.Web.RichMedia.Card.get_by_activity(activity)
231
232       Pleroma.Search.add_to_index(Map.put(activity, :object, object))
233
234       Utils.maybe_handle_group_posts(activity)
235
236       meta =
237         meta
238         |> add_notifications(notifications)
239
240       ap_streamer().stream_out(activity)
241
242       {:ok, activity, meta}
243     else
244       e -> Repo.rollback(e)
245     end
246   end
247
248   # Tasks this handles:
249   # - Add announce to object
250   # - Set up notification
251   # - Stream out the announce
252   @impl true
253   def handle(%{data: %{"type" => "Announce"}} = object, meta) do
254     announced_object = Object.get_by_ap_id(object.data["object"])
255     user = User.get_cached_by_ap_id(object.data["actor"])
256
257     Utils.add_announce_to_object(object, announced_object)
258
259     if !User.internal?(user) do
260       Notification.create_notifications(object)
261
262       ap_streamer().stream_out(object)
263     end
264
265     {:ok, object, meta}
266   end
267
268   @impl true
269   def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
270     with undone_object <- Activity.get_by_ap_id(undone_object),
271          :ok <- handle_undoing(undone_object) do
272       {:ok, object, meta}
273     end
274   end
275
276   # Tasks this handles:
277   # - Add reaction to object
278   # - Set up notification
279   @impl true
280   def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
281     reacted_object = Object.get_by_ap_id(object.data["object"])
282     Utils.add_emoji_reaction_to_object(object, reacted_object)
283
284     Notification.create_notifications(object)
285
286     {:ok, object, meta}
287   end
288
289   # Tasks this handles:
290   # - Delete and unpins the create activity
291   # - Replace object with Tombstone
292   # - Reduce the user note count
293   # - Reduce the reply count
294   # - Stream out the activity
295   # - Removes posts from search index (if needed)
296   @impl true
297   def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
298     deleted_object =
299       Object.normalize(deleted_object, fetch: false) ||
300         User.get_cached_by_ap_id(deleted_object)
301
302     result =
303       case deleted_object do
304         %Object{} ->
305           with {_, {:ok, deleted_object, _activity}} <- {:object, Object.delete(deleted_object)},
306                {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
307                {_, %User{} = user} <- {:user, User.get_cached_by_ap_id(actor)} do
308             User.remove_pinned_object_id(user, deleted_object.data["id"])
309
310             {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
311
312             if in_reply_to = deleted_object.data["inReplyTo"] do
313               Object.decrease_replies_count(in_reply_to)
314             end
315
316             if quote_url = deleted_object.data["quoteUrl"] do
317               Object.decrease_quotes_count(quote_url)
318             end
319
320             MessageReference.delete_for_object(deleted_object)
321
322             ap_streamer().stream_out(object)
323             ap_streamer().stream_out_participations(deleted_object, user)
324             :ok
325           else
326             {:actor, _} ->
327               @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
328               :no_object_actor
329
330             {:user, _} ->
331               @logger.error(
332                 "The object's actor could not be resolved to a user: #{inspect(deleted_object)}"
333               )
334
335               :no_object_user
336
337             {:object, _} ->
338               @logger.error("The object could not be deleted: #{inspect(deleted_object)}")
339               {:error, object}
340           end
341
342         %User{} ->
343           with {:ok, _} <- User.delete(deleted_object) do
344             :ok
345           end
346       end
347
348     if result == :ok do
349       # Only remove from index when deleting actual objects, not users or anything else
350       with %Pleroma.Object{} <- deleted_object do
351         Pleroma.Search.remove_from_index(deleted_object)
352       end
353
354       {:ok, object, meta}
355     else
356       {:error, result}
357     end
358   end
359
360   # Tasks this handles:
361   # - adds pin to user
362   # - removes expiration job for pinned activity, if was set for expiration
363   @impl true
364   def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
365     with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
366          {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
367       # if pinned activity was scheduled for deletion, we remove job
368       if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
369         Oban.cancel_job(expiration.id)
370       end
371
372       {:ok, object, meta}
373     else
374       nil ->
375         {:error, :user_not_found}
376
377       {:error, changeset} ->
378         if changeset.errors[:pinned_objects] do
379           {:error, :pinned_statuses_limit_reached}
380         else
381           changeset.errors
382         end
383     end
384   end
385
386   # Tasks this handles:
387   # - removes pin from user
388   # - removes corresponding Add activity
389   # - if activity had expiration, recreates activity expiration job
390   @impl true
391   def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
392     with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
393          {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
394       data["object"]
395       |> Activity.add_by_params_query(user.ap_id, user.featured_address)
396       |> Repo.delete_all()
397
398       # if pinned activity was scheduled for deletion, we reschedule it for deletion
399       if meta[:expires_at] do
400         # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
401         {:ok, expires_at} =
402           Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
403
404         Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
405           activity_id: meta[:activity_id],
406           expires_at: expires_at
407         })
408       end
409
410       {:ok, object, meta}
411     else
412       nil -> {:error, :user_not_found}
413       error -> error
414     end
415   end
416
417   # Nothing to do
418   @impl true
419   def handle(object, meta) do
420     {:ok, object, meta}
421   end
422
423   defp handle_update_user(
424          %{data: %{"type" => "Update", "object" => updated_object}} = object,
425          meta
426        ) do
427     if changeset = Keyword.get(meta, :user_update_changeset) do
428       changeset
429       |> User.update_and_set_cache()
430     else
431       {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
432
433       User.get_by_ap_id(updated_object["id"])
434       |> User.remote_user_changeset(new_user_data)
435       |> User.update_and_set_cache()
436     end
437
438     {:ok, object, meta}
439   end
440
441   defp handle_update_object(
442          %{data: %{"type" => "Update", "object" => updated_object}} = object,
443          meta
444        ) do
445     orig_object_ap_id = updated_object["id"]
446     orig_object = Object.get_by_ap_id(orig_object_ap_id)
447     orig_object_data = orig_object.data
448
449     updated_object =
450       if meta[:local] do
451         # If this is a local Update, we don't process it by transmogrifier,
452         # so we use the embedded object as-is.
453         updated_object
454       else
455         meta[:object_data]
456       end
457
458     if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
459       {:ok, _, updated} =
460         Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
461
462       if updated do
463         object
464         |> Activity.normalize()
465         |> ActivityPub.notify_and_stream()
466       end
467     end
468
469     {:ok, object, meta}
470   end
471
472   def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
473     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
474       actor = User.get_cached_by_ap_id(object.data["actor"])
475       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
476
477       streamables =
478         [[actor, recipient], [recipient, actor]]
479         |> Enum.uniq()
480         |> Enum.map(fn [user, other_user] ->
481           if user.local do
482             {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
483             {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
484
485             @cachex.put(
486               :chat_message_id_idempotency_key_cache,
487               cm_ref.id,
488               meta[:idempotency_key]
489             )
490
491             {
492               ["user", "user:pleroma_chat"],
493               {user, %{cm_ref | chat: chat, object: object}}
494             }
495           end
496         end)
497         |> Enum.filter(& &1)
498
499       meta =
500         meta
501         |> add_streamables(streamables)
502
503       {:ok, object, meta}
504     end
505   end
506
507   def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
508     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
509       PollWorker.schedule_poll_end(activity)
510       {:ok, object, meta}
511     end
512   end
513
514   def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
515     with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
516       Object.increase_vote_count(
517         object.data["inReplyTo"],
518         object.data["name"],
519         object.data["actor"]
520       )
521
522       {:ok, object, meta}
523     end
524   end
525
526   def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
527       when objtype in ~w[Audio Video Image Event Article Note Page] do
528     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
529       {:ok, object, meta}
530     end
531   end
532
533   # Nothing to do
534   def handle_object_creation(object, _activity, meta) do
535     {:ok, object, meta}
536   end
537
538   defp undo_like(nil, object), do: delete_object(object)
539
540   defp undo_like(%Object{} = liked_object, object) do
541     with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
542       delete_object(object)
543     end
544   end
545
546   def handle_undoing(%{data: %{"type" => "Like"}} = object) do
547     object.data["object"]
548     |> Object.get_by_ap_id()
549     |> undo_like(object)
550   end
551
552   def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
553     with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
554          {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
555          {:ok, _} <- Repo.delete(object) do
556       :ok
557     end
558   end
559
560   def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
561     with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
562          {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
563          {:ok, _} <- Repo.delete(object) do
564       :ok
565     end
566   end
567
568   def handle_undoing(
569         %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
570       ) do
571     with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
572          %User{} = blocked <- User.get_cached_by_ap_id(blocked),
573          {:ok, _} <- User.unblock(blocker, blocked),
574          {:ok, _} <- Repo.delete(object) do
575       :ok
576     end
577   end
578
579   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
580
581   @spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
582   defp delete_object(object) do
583     with {:ok, _} <- Repo.delete(object), do: :ok
584   end
585
586   defp send_notifications(meta) do
587     Keyword.get(meta, :notifications, [])
588     |> Enum.each(fn notification ->
589       Streamer.stream(["user", "user:notification"], notification)
590       Push.send(notification)
591     end)
592
593     meta
594   end
595
596   defp send_streamables(meta) do
597     Keyword.get(meta, :streamables, [])
598     |> Enum.each(fn {topics, items} ->
599       Streamer.stream(topics, items)
600     end)
601
602     meta
603   end
604
605   defp add_streamables(meta, streamables) do
606     existing = Keyword.get(meta, :streamables, [])
607
608     meta
609     |> Keyword.put(:streamables, streamables ++ existing)
610   end
611
612   defp add_notifications(meta, notifications) do
613     existing = Keyword.get(meta, :notifications, [])
614
615     meta
616     |> Keyword.put(:notifications, notifications ++ existing)
617   end
618
619   @impl true
620   def handle_after_transaction(meta) do
621     meta
622     |> send_notifications()
623     |> send_streamables()
624   end
625 end