1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
28 require Pleroma.Constants
# The cache provider and the logger module are looked up through Pleroma.Config inside module
# attributes, i.e. the lookup happens when this module is compiled. Defaults: Cachex and Logger.
31 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
# Declares that this module implements the SideEffects.Handling behaviour.
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out: the value stored in the config under
# [:side_effects, :ap_streamer], falling back to ActivityPub when nothing is configured.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
# Bodiless function head: declares the default value ([]) of `meta` once for every
# handle/2 clause that follows.
39 def handle(object, meta \\ [])
# NOTE(review): the head of this handle/2 clause is elided from this excerpt; `actor` is
# presumably bound there -- confirm against the full file.
# Visible flow: fetch the referenced follow activity, resolve the followed user and the
# follower from the user cache, call Utils.update_follow_state_for_all/2 with "accept",
# update the FollowingRelationship with :follow_accept, then update the notification type.
43 # - Sends a notification
50 "object" => follow_activity_id
55 with %Activity{actor: follower_id} = follow_activity <-
56 Activity.get_by_ap_id(follow_activity_id),
57 %User{} = followed <- User.get_cached_by_ap_id(actor),
58 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
59 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
60 {:ok, _follower, followed} <-
61 FollowingRelationship.update(follower, followed, :follow_accept) do
# Uses the `followed` and `follow_activity` values rebound by the two preceding steps.
62 Notification.update_notification_type(followed, follow_activity)
69 # - Rejects all existing follow activities for this person
70 # - Updates the follow state
71 # - Dismisses notification
# NOTE(review): the head of this handle/2 clause is elided from this excerpt; `actor` is
# presumably bound there -- confirm against the full file.
78 "object" => follow_activity_id
# Every step must match for the body to run: the follow activity must exist, both users must
# resolve from the cache, and the state update to "reject" must return {:ok, _}.
83 with %Activity{actor: follower_id} = follow_activity <-
84 Activity.get_by_ap_id(follow_activity_id),
85 %User{} = followed <- User.get_cached_by_ap_id(actor),
86 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
87 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
# The return value of the relationship update is not pattern-matched here.
88 FollowingRelationship.update(follower, followed, :follow_reject)
89 Notification.dismiss(follow_activity)
96 # - Follows if possible
97 # - Sends a notification
98 # - Generates accept or reject if appropriate
# NOTE(review): the clause head, the `else` keyword and the start of the meta pipeline are
# elided from this excerpt; `object`, `meta` and `follow_id` are presumably bound in the
# hidden head -- confirm against the full file.
105 "object" => followed_user,
106 "actor" => following_user
111 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
112 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
# The follow result is wrapped in a :following-tagged tuple together with both users so a
# failing follow can be recognised (and the followed user reused) in the error pattern below.
113 {_, {:ok, _, _}, _, _} <-
114 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
# An accept is built and run through the pipeline only for a local, non-locked followed user.
115 if followed.local && !followed.is_locked do
116 {:ok, accept_data, _} = Builder.accept(followed, object)
117 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
# When User.follow/3 returned an error tuple, a reject is built and run through the pipeline.
120 {:following, {:error, _}, _follower, followed} ->
121 {:ok, reject_data, _} = Builder.reject(followed, object)
122 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
# Notifications are created with do_send: false and stored in meta via add_notifications/2.
128 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
132 |> add_notifications(notifications)
# The activity is read again by its AP id and that fresh copy is what gets returned.
134 updated_object = Activity.get_by_ap_id(follow_id)
136 {:ok, updated_object, meta}
139 # Tasks this handles:
140 # - Unfollow and block
# NOTE(review): the `def handle(` line and the rest of the clause head are elided from this
# excerpt. The visible pattern binds the "object" and "actor" AP ids of a Block activity.
143 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
# User.block/2 is called only when both users resolve from the cache.
147 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
148 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
149 User.block(blocker, blocked)
155 # Tasks this handles:
157 # - Update a non-user object (Note, Question, etc.)
159 # For a local user, we also get a changeset with the full information, so we
160 # can update non-federating, non-activitypub settings as well.
# NOTE(review): the lines that choose between the two handlers below, and the fall-through
# handling of the `with`, are elided from this excerpt. Presumably `is_user` selects
# handle_update_user/2, otherwise handle_update_object/2 -- confirm against the full file.
162 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
163 updated_object_id = updated_object["id"]
# Requires the embedded object to carry a binary "id" and a "type"; `is_user` is true when
# that type is one of Pleroma.Constants.actor_types().
165 with {_, true} <- {:has_id, is_binary(updated_object_id)},
166 %{"type" => type} <- updated_object,
167 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
169 handle_update_user(object, meta)
171 handle_update_object(object, meta)
179 # Tasks this handles:
180 # - Add like to object
181 # - Set up notification
# NOTE(review): the return expression of this clause is elided from this excerpt.
183 def handle(%{data: %{"type" => "Like"}} = object, meta) do
# The liked object is looked up by the AP id stored in the activity's "object" field and
# passed, unchecked, to Utils.add_like_to_object/2.
184 liked_object = Object.get_by_ap_id(object.data["object"])
185 Utils.add_like_to_object(object, liked_object)
187 Notification.create_notifications(object)
193 # - Actually create object
194 # - Rollback if we couldn't create it
195 # - Increase the user note count
196 # - Increase the reply count
197 # - Increase replies count
198 # - Set up ActivityExpiration
199 # - Set up notifications
# NOTE(review): several interior lines (block terminators, part of the enqueue arguments,
# the start of the meta pipeline and the `else` keyword) are elided from this excerpt.
201 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
# The object is created from meta[:object_data]; the actor must resolve from the user cache.
202 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
203 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
# Notifications are created with do_send: false and stored in meta further down.
204 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
205 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
206 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
# `=` binds more loosely than `!=` and `&&`, so `in_reply_to` receives the value of
# `type != "Answer" && inReplyTo`: false for an Answer, otherwise the "inReplyTo" value.
208 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
209 Object.increase_replies_count(in_reply_to)
# Depth of the replies relative to this object; defaults to 1 when meta has no :depth.
212 reply_depth = (meta[:depth] || 0) + 1
214 # FIXME: Force inReplyTo to replies
# Reply fetch jobs are enqueued only when the thread distance is allowed for this depth and
# the object actually has a "replies" value.
215 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
216 object.data["replies"] != nil do
217 for reply_id <- object.data["replies"] do
218 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
220 "depth" => reply_depth
# Rich media data for the activity is fetched in a task started with Task.start/1, wrapped in
# ConcurrentLimiter.limit/2.
225 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
226 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
231 |> add_notifications(notifications)
233 ap_streamer().stream_out(activity)
235 {:ok, activity, meta}
# Any value that did not match a `with` step is passed to Repo.rollback/1.
237 e -> Repo.rollback(e)
241 # Tasks this handles:
242 # - Add announce to object
243 # - Set up notification
244 # - Stream out the announce
# NOTE(review): block terminators and the return expression are elided from this excerpt, so
# it is not visible whether stream_out/1 sits inside the `if` below -- confirm in the full file.
246 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
247 announced_object = Object.get_by_ap_id(object.data["object"])
248 user = User.get_cached_by_ap_id(object.data["actor"])
250 Utils.add_announce_to_object(object, announced_object)
# Notifications are created only when the announcing actor is not an internal user.
252 if !User.is_internal_user?(user) do
253 Notification.create_notifications(object)
255 ap_streamer().stream_out(object)
# Undo: looks up the activity being undone by its AP id and delegates to handle_undoing/1.
# NOTE(review): the body of the `with` is elided from this excerpt.
262 def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
# `undone_object <- ...` is a bare variable pattern, so this step matches whatever the lookup
# returns; only the `:ok` match on handle_undoing/1 can fail.
263 with undone_object <- Activity.get_by_ap_id(undone_object),
264 :ok <- handle_undoing(undone_object) do
269 # Tasks this handles:
270 # - Add reaction to object
271 # - Set up notification
# NOTE(review): the return expression of this clause is elided from this excerpt.
273 def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# The reacted-to object is looked up by the AP id in the activity's "object" field and passed,
# unchecked, to Utils.add_emoji_reaction_to_object/2.
274 reacted_object = Object.get_by_ap_id(object.data["object"])
275 Utils.add_emoji_reaction_to_object(object, reacted_object)
277 Notification.create_notifications(object)
282 # Tasks this handles:
283 # - Delete and unpins the create activity
284 # - Replace object with Tombstone
285 # - Reduce the user note count
286 # - Reduce the reply count
287 # - Stream out the activity
# NOTE(review): the rebinding of `deleted_object`, the case patterns, the `else` branches and
# the return values are elided from this excerpt. Presumably the case distinguishes an
# Object from a User -- confirm against the full file.
289 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
# The target is resolved first as a locally known object (no remote fetch), then as a user.
291 Object.normalize(deleted_object, fetch: false) ||
292 User.get_cached_by_ap_id(deleted_object)
295 case deleted_object do
# Object path: delete the object, require a binary "actor" on it, and resolve that actor.
297 with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
298 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
299 %User{} = user <- User.get_cached_by_ap_id(actor) do
300 User.remove_pinned_object_id(user, deleted_object.data["id"])
302 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
304 if in_reply_to = deleted_object.data["inReplyTo"] do
305 Object.decrease_replies_count(in_reply_to)
308 MessageReference.delete_for_object(deleted_object)
# The Delete activity itself is streamed out, then the participations of the deleted object.
310 ap_streamer().stream_out(object)
311 ap_streamer().stream_out_participations(deleted_object, user)
# Logged through the configurable @logger module when the object has no usable actor.
315 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
# User path: the target is handed to User.delete/1.
320 with {:ok, _} <- User.delete(deleted_object) do
332 # Tasks this handles:
334 # - removes expiration job for pinned activity, if was set for expiration
# NOTE(review): the success return value, the `else` keyword, the pattern preceding
# {:error, :user_not_found} and the other branch of the final `if` are elided from this excerpt.
336 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
# The actor must resolve from the cache and adding the pinned object id must return {:ok, _}.
337 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
338 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
339 # if pinned activity was scheduled for deletion, we remove job
340 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
341 Oban.cancel_job(expiration.id)
347 {:error, :user_not_found}
# A changeset error on :pinned_objects is reported as :pinned_statuses_limit_reached.
349 {:error, changeset} ->
350 if changeset.errors[:pinned_objects] do
351 {:error, :pinned_statuses_limit_reached}
358 # Tasks this handles:
359 # - removes pin from user
360 # - removes corresponding Add activity
361 # - if activity had expiration, recreates activity expiration job
# NOTE(review): the start and end of the Activity query pipeline, the `expires_at` binding,
# the success return value and the `else` keyword are elided from this excerpt.
363 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
# The actor must resolve from the cache and removing the pinned object id must return {:ok, _}.
364 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
365 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
# Query step parameterised by the user's AP id and featured-collection address.
367 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
370 # if pinned activity was scheduled for deletion, we reschedule it for deletion
371 if meta[:expires_at] do
372 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
374 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
376 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
377 activity_id: meta[:activity_id],
378 expires_at: expires_at
# A nil result from the user lookup is reported as :user_not_found.
384 nil -> {:error, :user_not_found}
# Catch-all clause: its head matches any `object`/`meta` pair, so it applies to every activity
# not matched by an earlier handle/2 clause.
# NOTE(review): the body of this clause is elided from this excerpt.
391 def handle(object, meta) do
# Applies an Update activity whose embedded object is a user.
# NOTE(review): the `meta` parameter line, the head of the first pipeline, the `else` keyword
# and the return value are elided from this excerpt.
395 defp handle_update_user(
396 %{data: %{"type" => "Update", "object" => updated_object}} = object,
# When meta carries a :user_update_changeset, that changeset is what gets persisted.
399 if changeset = Keyword.get(meta, :user_update_changeset) do
401 |> User.update_and_set_cache()
# Otherwise user data is derived from the embedded user object and applied to the user found
# by the embedded object's "id" through a remote-user changeset.
403 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
405 User.get_by_ap_id(updated_object["id"])
406 |> User.remote_user_changeset(new_user_data)
407 |> User.update_and_set_cache()
# Applies an Update activity whose embedded object is not a user.
# NOTE(review): the `meta` parameter line, the handling described by the "local Update"
# comment, the head of the final pipeline and the return value are elided from this excerpt.
413 defp handle_update_object(
414 %{data: %{"type" => "Update", "object" => updated_object}} = object,
417 orig_object_ap_id = updated_object["id"]
# `.data` is read directly from the lookup result, so this line raises if the lookup
# returns nil.
418 orig_object = Object.get_by_ap_id(orig_object_ap_id)
419 orig_object_data = orig_object.data
423 # If this is a local Update, we don't process it by transmogrifier,
424 # so we use the embedded object as-is.
# Only object types listed in Pleroma.Constants.updatable_object_types() are updated.
430 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
432 Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
436 |> Activity.normalize()
437 |> ActivityPub.notify_and_stream()
# Creates the object embedded in a Create activity; one clause per supported object type.
# NOTE(review): block terminators, the `streamables =` binding, the cache call around the
# idempotency key, remaining arguments and the return values are elided from this excerpt.
#
# ChatMessage: runs the object through the common pipeline, then builds one entry per
# participant ordering ([actor, recipient] and [recipient, actor]).
444 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
445 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
446 actor = User.get_cached_by_ap_id(object.data["actor"])
# Only the first entry of "to" is used as the recipient; hd/1 raises on an empty list.
447 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
450 [[actor, recipient], [recipient, actor]]
452 |> Enum.map(fn [user, other_user] ->
454 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
# The third argument is true for the participant who is not the author of the message.
455 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
458 :chat_message_id_idempotency_key_cache,
460 meta[:idempotency_key]
# Each entry pairs stream topics with the user and the message reference, with its chat and
# object fields filled in.
464 ["user", "user:pleroma_chat"],
465 {user, %{cm_ref | chat: chat, object: object}}
473 |> add_streamables(streamables)
# Question: after the common pipeline succeeds, the end of the poll is scheduled.
479 def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
480 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
481 PollWorker.schedule_poll_end(activity)
# Answer: after the common pipeline succeeds, the vote count of the object referenced by
# "inReplyTo" is increased.
486 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
487 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
488 Object.increase_vote_count(
489 object.data["inReplyTo"],
# Audio, Video, Event, Article, Note and Page only go through the common pipeline.
498 def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
499 when objtype in ~w[Audio Video Event Article Note Page] do
500 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
# Catch-all clause for every other object. NOTE(review): body elided in this excerpt.
506 def handle_object_creation(object, _activity, meta) do
# Undoes a Like activity (`object`).
# When the liked object could not be found there is nothing to un-like, so the activity is
# simply deleted.
defp undo_like(nil, object) do
  delete_object(object)
end

# When the liked object exists, the like is removed from it first. The activity is deleted
# only if that returns {:ok, _}; any other result is returned to the caller unchanged.
defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
# Reverts the side effects of an activity that is being undone; one clause per activity type.
# NOTE(review): block terminators, the bodies of the `with` expressions, the tail of the Like
# pipeline and the head of the Block clause are elided from this excerpt.
#
# Like: looks up the liked object by AP id. Presumably the result is piped into undo_like/2
# -- confirm against the full file.
518 def handle_undoing(%{data: %{"type" => "Like"}} = object) do
519 object.data["object"]
520 |> Object.get_by_ap_id()
# EmojiReact: the reacted-to object must exist; the reaction is removed from it, then the
# activity is deleted from the repo.
524 def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
525 with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
526 {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
527 {:ok, _} <- Repo.delete(object) do
# Announce: same shape as above. Note that `liked_object` here holds the announced object.
532 def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
533 with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
534 {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
535 {:ok, _} <- Repo.delete(object) do
# Block: both users must resolve from the cache; the block is lifted, then the activity is
# deleted from the repo.
541 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
543 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
544 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
545 {:ok, _} <- User.unblock(blocker, blocked),
546 {:ok, _} <- Repo.delete(object) do
# Fallback: any other argument yields an error tuple that carries the argument itself.
551 def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
# Deletes `object` through the Repo. A successful delete is collapsed to :ok; any other
# result of Repo.delete/1 is returned to the caller unchanged.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
# Streams and pushes every notification stored under :notifications in `meta`
# (nothing happens when the key is absent, since the default is an empty list).
# NOTE(review): the closing lines, including the function's return value, are elided from
# this excerpt.
558 defp send_notifications(meta) do
559 Keyword.get(meta, :notifications, [])
560 |> Enum.each(fn notification ->
# Each notification goes to the "user" and "user:notification" stream topics, then to Push.
561 Streamer.stream(["user", "user:notification"], notification)
562 Push.send(notification)
# Streams every {topics, items} pair stored under :streamables in `meta`
# (nothing happens when the key is absent, since the default is an empty list).
# NOTE(review): the closing lines, including the function's return value, are elided from
# this excerpt.
568 defp send_streamables(meta) do
569 Keyword.get(meta, :streamables, [])
570 |> Enum.each(fn {topics, items} ->
571 Streamer.stream(topics, items)
# Stores `streamables` in `meta` under :streamables, placing them in front of any entries
# that are already there.
# NOTE(review): the head of the pipeline (presumably `meta`) is elided from this excerpt.
577 defp add_streamables(meta, streamables) do
578 existing = Keyword.get(meta, :streamables, [])
581 |> Keyword.put(:streamables, streamables ++ existing)
# Stores `notifications` in `meta` under :notifications, placing them in front of any entries
# that are already there.
# NOTE(review): the head of the pipeline (presumably `meta`) is elided from this excerpt.
584 defp add_notifications(meta, notifications) do
585 existing = Keyword.get(meta, :notifications, [])
588 |> Keyword.put(:notifications, notifications ++ existing)
# Runs send_notifications/1 and then send_streamables/1, in that order.
# NOTE(review): the head of the pipeline (presumably `meta`) and the closing lines are elided
# from this excerpt. Since the result of send_notifications/1 is piped into
# send_streamables/1, the former presumably returns `meta` -- confirm against the full file.
592 def handle_after_transaction(meta) do
594 |> send_notifications()
595 |> send_streamables()