First
[anni] / lib / pleroma / web / activity_pub / side_effects.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
6   @moduledoc """
7   This module looks at an inserted object and executes the side effects that it
8   implies. For example, a `Like` activity will increase the like count on the
9   liked object, a `Follow` activity will add the user to the follower
10   collection, and so on.
11   """
12   alias Pleroma.Activity
13   alias Pleroma.Chat
14   alias Pleroma.Chat.MessageReference
15   alias Pleroma.FollowingRelationship
16   alias Pleroma.Notification
17   alias Pleroma.Object
18   alias Pleroma.Repo
19   alias Pleroma.User
20   alias Pleroma.Web.ActivityPub.ActivityPub
21   alias Pleroma.Web.ActivityPub.Builder
22   alias Pleroma.Web.ActivityPub.Pipeline
23   alias Pleroma.Web.ActivityPub.Utils
24   alias Pleroma.Web.Push
25   alias Pleroma.Web.Streamer
26   alias Pleroma.Workers.PollWorker
27
28   require Pleroma.Constants
29   require Logger
30
31   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32   @logger Pleroma.Config.get([:side_effects, :logger], Logger)
33
34   @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
35
36   defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
37
38   @impl true
39   def handle(object, meta \\ [])
40
41   # Task this handles
42   # - Follows
43   # - Sends a notification
44   @impl true
45   def handle(
46         %{
47           data: %{
48             "actor" => actor,
49             "type" => "Accept",
50             "object" => follow_activity_id
51           }
52         } = object,
53         meta
54       ) do
55     with %Activity{actor: follower_id} = follow_activity <-
56            Activity.get_by_ap_id(follow_activity_id),
57          %User{} = followed <- User.get_cached_by_ap_id(actor),
58          %User{} = follower <- User.get_cached_by_ap_id(follower_id),
59          {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
60          {:ok, _follower, followed} <-
61            FollowingRelationship.update(follower, followed, :follow_accept) do
62       Notification.update_notification_type(followed, follow_activity)
63     end
64
65     {:ok, object, meta}
66   end
67
68   # Task this handles
69   # - Rejects all existing follow activities for this person
70   # - Updates the follow state
71   # - Dismisses notification
72   @impl true
73   def handle(
74         %{
75           data: %{
76             "actor" => actor,
77             "type" => "Reject",
78             "object" => follow_activity_id
79           }
80         } = object,
81         meta
82       ) do
83     with %Activity{actor: follower_id} = follow_activity <-
84            Activity.get_by_ap_id(follow_activity_id),
85          %User{} = followed <- User.get_cached_by_ap_id(actor),
86          %User{} = follower <- User.get_cached_by_ap_id(follower_id),
87          {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
88       FollowingRelationship.update(follower, followed, :follow_reject)
89       Notification.dismiss(follow_activity)
90     end
91
92     {:ok, object, meta}
93   end
94
95   # Tasks this handle
96   # - Follows if possible
97   # - Sends a notification
98   # - Generates accept or reject if appropriate
99   @impl true
100   def handle(
101         %{
102           data: %{
103             "id" => follow_id,
104             "type" => "Follow",
105             "object" => followed_user,
106             "actor" => following_user
107           }
108         } = object,
109         meta
110       ) do
111     with %User{} = follower <- User.get_cached_by_ap_id(following_user),
112          %User{} = followed <- User.get_cached_by_ap_id(followed_user),
113          {_, {:ok, _, _}, _, _} <-
114            {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
115       if followed.local && !followed.is_locked do
116         {:ok, accept_data, _} = Builder.accept(followed, object)
117         {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
118       end
119     else
120       {:following, {:error, _}, _follower, followed} ->
121         {:ok, reject_data, _} = Builder.reject(followed, object)
122         {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
123
124       _ ->
125         nil
126     end
127
128     {:ok, notifications} = Notification.create_notifications(object, do_send: false)
129
130     meta =
131       meta
132       |> add_notifications(notifications)
133
134     updated_object = Activity.get_by_ap_id(follow_id)
135
136     {:ok, updated_object, meta}
137   end
138
139   # Tasks this handles:
140   # - Unfollow and block
141   @impl true
142   def handle(
143         %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
144           object,
145         meta
146       ) do
147     with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
148          %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
149       User.block(blocker, blocked)
150     end
151
152     {:ok, object, meta}
153   end
154
155   # Tasks this handles:
156   # - Update the user
157   # - Update a non-user object (Note, Question, etc.)
158   #
159   # For a local user, we also get a changeset with the full information, so we
160   # can update non-federating, non-activitypub settings as well.
161   @impl true
162   def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
163     updated_object_id = updated_object["id"]
164
165     with {_, true} <- {:has_id, is_binary(updated_object_id)},
166          %{"type" => type} <- updated_object,
167          {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
168       if is_user do
169         handle_update_user(object, meta)
170       else
171         handle_update_object(object, meta)
172       end
173     else
174       _ ->
175         {:ok, object, meta}
176     end
177   end
178
179   # Tasks this handles:
180   # - Add like to object
181   # - Set up notification
182   @impl true
183   def handle(%{data: %{"type" => "Like"}} = object, meta) do
184     liked_object = Object.get_by_ap_id(object.data["object"])
185     Utils.add_like_to_object(object, liked_object)
186
187     Notification.create_notifications(object)
188
189     {:ok, object, meta}
190   end
191
192   # Tasks this handles
193   # - Actually create object
194   # - Rollback if we couldn't create it
195   # - Increase the user note count
196   # - Increase the reply count
197   # - Increase replies count
198   # - Set up ActivityExpiration
199   # - Set up notifications
200   @impl true
201   def handle(%{data: %{"type" => "Create"}} = activity, meta) do
202     with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
203          %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
204       {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
205       {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
206       {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
207
208       if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
209         Object.increase_replies_count(in_reply_to)
210       end
211
212       reply_depth = (meta[:depth] || 0) + 1
213
214       # FIXME: Force inReplyTo to replies
215       if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
216            object.data["replies"] != nil do
217         for reply_id <- object.data["replies"] do
218           Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
219             "id" => reply_id,
220             "depth" => reply_depth
221           })
222         end
223       end
224
225       ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
226         Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
227       end)
228
229       meta =
230         meta
231         |> add_notifications(notifications)
232
233       ap_streamer().stream_out(activity)
234
235       {:ok, activity, meta}
236     else
237       e -> Repo.rollback(e)
238     end
239   end
240
241   # Tasks this handles:
242   # - Add announce to object
243   # - Set up notification
244   # - Stream out the announce
245   @impl true
246   def handle(%{data: %{"type" => "Announce"}} = object, meta) do
247     announced_object = Object.get_by_ap_id(object.data["object"])
248     user = User.get_cached_by_ap_id(object.data["actor"])
249
250     Utils.add_announce_to_object(object, announced_object)
251
252     if !User.is_internal_user?(user) do
253       Notification.create_notifications(object)
254
255       ap_streamer().stream_out(object)
256     end
257
258     {:ok, object, meta}
259   end
260
261   @impl true
262   def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
263     with undone_object <- Activity.get_by_ap_id(undone_object),
264          :ok <- handle_undoing(undone_object) do
265       {:ok, object, meta}
266     end
267   end
268
269   # Tasks this handles:
270   # - Add reaction to object
271   # - Set up notification
272   @impl true
273   def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
274     reacted_object = Object.get_by_ap_id(object.data["object"])
275     Utils.add_emoji_reaction_to_object(object, reacted_object)
276
277     Notification.create_notifications(object)
278
279     {:ok, object, meta}
280   end
281
282   # Tasks this handles:
283   # - Delete and unpins the create activity
284   # - Replace object with Tombstone
285   # - Reduce the user note count
286   # - Reduce the reply count
287   # - Stream out the activity
288   @impl true
289   def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
290     deleted_object =
291       Object.normalize(deleted_object, fetch: false) ||
292         User.get_cached_by_ap_id(deleted_object)
293
294     result =
295       case deleted_object do
296         %Object{} ->
297           with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
298                {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
299                %User{} = user <- User.get_cached_by_ap_id(actor) do
300             User.remove_pinned_object_id(user, deleted_object.data["id"])
301
302             {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
303
304             if in_reply_to = deleted_object.data["inReplyTo"] do
305               Object.decrease_replies_count(in_reply_to)
306             end
307
308             MessageReference.delete_for_object(deleted_object)
309
310             ap_streamer().stream_out(object)
311             ap_streamer().stream_out_participations(deleted_object, user)
312             :ok
313           else
314             {:actor, _} ->
315               @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
316               :no_object_actor
317           end
318
319         %User{} ->
320           with {:ok, _} <- User.delete(deleted_object) do
321             :ok
322           end
323       end
324
325     if result == :ok do
326       {:ok, object, meta}
327     else
328       {:error, result}
329     end
330   end
331
332   # Tasks this handles:
333   # - adds pin to user
334   # - removes expiration job for pinned activity, if was set for expiration
335   @impl true
336   def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
337     with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
338          {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
339       # if pinned activity was scheduled for deletion, we remove job
340       if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
341         Oban.cancel_job(expiration.id)
342       end
343
344       {:ok, object, meta}
345     else
346       nil ->
347         {:error, :user_not_found}
348
349       {:error, changeset} ->
350         if changeset.errors[:pinned_objects] do
351           {:error, :pinned_statuses_limit_reached}
352         else
353           changeset.errors
354         end
355     end
356   end
357
358   # Tasks this handles:
359   # - removes pin from user
360   # - removes corresponding Add activity
361   # - if activity had expiration, recreates activity expiration job
362   @impl true
363   def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
364     with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
365          {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
366       data["object"]
367       |> Activity.add_by_params_query(user.ap_id, user.featured_address)
368       |> Repo.delete_all()
369
370       # if pinned activity was scheduled for deletion, we reschedule it for deletion
371       if meta[:expires_at] do
372         # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
373         {:ok, expires_at} =
374           Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
375
376         Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
377           activity_id: meta[:activity_id],
378           expires_at: expires_at
379         })
380       end
381
382       {:ok, object, meta}
383     else
384       nil -> {:error, :user_not_found}
385       error -> error
386     end
387   end
388
389   # Nothing to do
390   @impl true
391   def handle(object, meta) do
392     {:ok, object, meta}
393   end
394
395   defp handle_update_user(
396          %{data: %{"type" => "Update", "object" => updated_object}} = object,
397          meta
398        ) do
399     if changeset = Keyword.get(meta, :user_update_changeset) do
400       changeset
401       |> User.update_and_set_cache()
402     else
403       {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
404
405       User.get_by_ap_id(updated_object["id"])
406       |> User.remote_user_changeset(new_user_data)
407       |> User.update_and_set_cache()
408     end
409
410     {:ok, object, meta}
411   end
412
413   defp handle_update_object(
414          %{data: %{"type" => "Update", "object" => updated_object}} = object,
415          meta
416        ) do
417     orig_object_ap_id = updated_object["id"]
418     orig_object = Object.get_by_ap_id(orig_object_ap_id)
419     orig_object_data = orig_object.data
420
421     updated_object =
422       if meta[:local] do
423         # If this is a local Update, we don't process it by transmogrifier,
424         # so we use the embedded object as-is.
425         updated_object
426       else
427         meta[:object_data]
428       end
429
430     if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
431       {:ok, _, updated} =
432         Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
433
434       if updated do
435         object
436         |> Activity.normalize()
437         |> ActivityPub.notify_and_stream()
438       end
439     end
440
441     {:ok, object, meta}
442   end
443
444   def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
445     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
446       actor = User.get_cached_by_ap_id(object.data["actor"])
447       recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
448
449       streamables =
450         [[actor, recipient], [recipient, actor]]
451         |> Enum.uniq()
452         |> Enum.map(fn [user, other_user] ->
453           if user.local do
454             {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
455             {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
456
457             @cachex.put(
458               :chat_message_id_idempotency_key_cache,
459               cm_ref.id,
460               meta[:idempotency_key]
461             )
462
463             {
464               ["user", "user:pleroma_chat"],
465               {user, %{cm_ref | chat: chat, object: object}}
466             }
467           end
468         end)
469         |> Enum.filter(& &1)
470
471       meta =
472         meta
473         |> add_streamables(streamables)
474
475       {:ok, object, meta}
476     end
477   end
478
479   def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
480     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
481       PollWorker.schedule_poll_end(activity)
482       {:ok, object, meta}
483     end
484   end
485
486   def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
487     with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
488       Object.increase_vote_count(
489         object.data["inReplyTo"],
490         object.data["name"],
491         object.data["actor"]
492       )
493
494       {:ok, object, meta}
495     end
496   end
497
498   def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
499       when objtype in ~w[Audio Video Event Article Note Page] do
500     with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
501       {:ok, object, meta}
502     end
503   end
504
505   # Nothing to do
506   def handle_object_creation(object, _activity, meta) do
507     {:ok, object, meta}
508   end
509
510   defp undo_like(nil, object), do: delete_object(object)
511
512   defp undo_like(%Object{} = liked_object, object) do
513     with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
514       delete_object(object)
515     end
516   end
517
518   def handle_undoing(%{data: %{"type" => "Like"}} = object) do
519     object.data["object"]
520     |> Object.get_by_ap_id()
521     |> undo_like(object)
522   end
523
524   def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
525     with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
526          {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
527          {:ok, _} <- Repo.delete(object) do
528       :ok
529     end
530   end
531
532   def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
533     with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
534          {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
535          {:ok, _} <- Repo.delete(object) do
536       :ok
537     end
538   end
539
540   def handle_undoing(
541         %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
542       ) do
543     with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
544          %User{} = blocked <- User.get_cached_by_ap_id(blocked),
545          {:ok, _} <- User.unblock(blocker, blocked),
546          {:ok, _} <- Repo.delete(object) do
547       :ok
548     end
549   end
550
551   def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
552
553   @spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
554   defp delete_object(object) do
555     with {:ok, _} <- Repo.delete(object), do: :ok
556   end
557
558   defp send_notifications(meta) do
559     Keyword.get(meta, :notifications, [])
560     |> Enum.each(fn notification ->
561       Streamer.stream(["user", "user:notification"], notification)
562       Push.send(notification)
563     end)
564
565     meta
566   end
567
568   defp send_streamables(meta) do
569     Keyword.get(meta, :streamables, [])
570     |> Enum.each(fn {topics, items} ->
571       Streamer.stream(topics, items)
572     end)
573
574     meta
575   end
576
577   defp add_streamables(meta, streamables) do
578     existing = Keyword.get(meta, :streamables, [])
579
580     meta
581     |> Keyword.put(:streamables, streamables ++ existing)
582   end
583
584   defp add_notifications(meta, notifications) do
585     existing = Keyword.get(meta, :notifications, [])
586
587     meta
588     |> Keyword.put(:notifications, notifications ++ existing)
589   end
590
591   @impl true
592   def handle_after_transaction(meta) do
593     meta
594     |> send_notifications()
595     |> send_streamables()
596   end
597 end