1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.SideEffects do
7 This module looks at an inserted object and executes the side effects that it
8 implies. For example, a `Like` activity will increase the like count on the
9 liked object, a `Follow` activity will add the user to the follower
10 collection, and so on.
12 alias Pleroma.Activity
14 alias Pleroma.Chat.MessageReference
15 alias Pleroma.FollowingRelationship
16 alias Pleroma.Notification
20 alias Pleroma.Web.ActivityPub.ActivityPub
21 alias Pleroma.Web.ActivityPub.Builder
22 alias Pleroma.Web.ActivityPub.Pipeline
23 alias Pleroma.Web.ActivityPub.Utils
24 alias Pleroma.Web.Push
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Workers.PollWorker
28 require Pleroma.Constants
31 @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
32 @logger Pleroma.Config.get([:side_effects, :logger], Logger)
34 @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
# Returns the module used to stream activities out. The value is read from the
# configuration key `[:side_effects, :ap_streamer]` on every call, with
# `ActivityPub` passed as the default.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
39 def handle(object, meta \\ [])
43 # - Sends a notification
50 "object" => follow_activity_id
55 with %Activity{actor: follower_id} = follow_activity <-
56 Activity.get_by_ap_id(follow_activity_id),
57 %User{} = followed <- User.get_cached_by_ap_id(actor),
58 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
59 {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
60 {:ok, _follower, followed} <-
61 FollowingRelationship.update(follower, followed, :follow_accept) do
62 Notification.update_notification_type(followed, follow_activity)
69 # - Rejects all existing follow activities for this person
70 # - Updates the follow state
71 # - Dismisses notification
78 "object" => follow_activity_id
83 with %Activity{actor: follower_id} = follow_activity <-
84 Activity.get_by_ap_id(follow_activity_id),
85 %User{} = followed <- User.get_cached_by_ap_id(actor),
86 %User{} = follower <- User.get_cached_by_ap_id(follower_id),
87 {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
88 FollowingRelationship.update(follower, followed, :follow_reject)
89 Notification.dismiss(follow_activity)
96 # - Follows if possible
97 # - Sends a notification
98 # - Generates accept or reject if appropriate
105 "object" => followed_user,
106 "actor" => following_user
111 with %User{} = follower <- User.get_cached_by_ap_id(following_user),
112 %User{} = followed <- User.get_cached_by_ap_id(followed_user),
113 {_, {:ok, _, _}, _, _} <-
114 {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
115 if followed.local && !followed.is_locked do
116 {:ok, accept_data, _} = Builder.accept(followed, object)
117 {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
120 {:following, {:error, _}, _follower, followed} ->
121 {:ok, reject_data, _} = Builder.reject(followed, object)
122 {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
128 {:ok, notifications} = Notification.create_notifications(object, do_send: false)
132 |> add_notifications(notifications)
134 updated_object = Activity.get_by_ap_id(follow_id)
136 {:ok, updated_object, meta}
139 # Tasks this handles:
140 # - Unfollow and block
143 %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
147 with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
148 %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
149 User.block(blocker, blocked)
155 # Tasks this handles:
157 # - Update a non-user object (Note, Question, etc.)
159 # For a local user, we also get a changeset with the full information, so we
160 # can update non-federating, non-activitypub settings as well.
162 def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
163 updated_object_id = updated_object["id"]
165 with {_, true} <- {:has_id, is_binary(updated_object_id)},
166 %{"type" => type} <- updated_object,
167 {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
169 handle_update_user(object, meta)
171 handle_update_object(object, meta)
179 # Tasks this handles:
180 # - Add like to object
181 # - Set up notification
# Side effects of a `Like` activity:
# - the object referenced by the activity's "object" field is looked up and
#   the like is added to it
# - notifications are created for the activity
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])

  Utils.add_like_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
193 # - Actually create object
194 # - Rollback if we couldn't create it
195 # - Increase the user note count
196 # - Increase the reply count
197 # - Increase replies count
198 # - Set up ActivityExpiration
199 # - Set up notifications
200 # - Index incoming posts for search (if needed)
202 def handle(%{data: %{"type" => "Create"}} = activity, meta) do
203 with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
204 %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
205 {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
206 {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
207 {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
209 if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
210 Object.increase_replies_count(in_reply_to)
213 if quote_url = object.data["quoteUrl"] do
214 Object.increase_quotes_count(quote_url)
217 reply_depth = (meta[:depth] || 0) + 1
219 # FIXME: Force inReplyTo to replies
220 if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
221 object.data["replies"] != nil do
222 for reply_id <- object.data["replies"] do
223 Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
225 "depth" => reply_depth
230 Pleroma.Web.RichMedia.Card.get_by_activity(activity)
232 Pleroma.Search.add_to_index(Map.put(activity, :object, object))
234 Utils.maybe_handle_group_posts(activity)
238 |> add_notifications(notifications)
240 ap_streamer().stream_out(activity)
242 {:ok, activity, meta}
244 e -> Repo.rollback(e)
248 # Tasks this handles:
249 # - Add announce to object
250 # - Set up notification
251 # - Stream out the announce
253 def handle(%{data: %{"type" => "Announce"}} = object, meta) do
254 announced_object = Object.get_by_ap_id(object.data["object"])
255 user = User.get_cached_by_ap_id(object.data["actor"])
257 Utils.add_announce_to_object(object, announced_object)
259 if !User.internal?(user) do
260 Notification.create_notifications(object)
262 ap_streamer().stream_out(object)
# Side effects of an `Undo` activity: the undone activity is loaded by its AP
# id and handed to `handle_undoing/1`. Any result other than `:ok` is returned
# to the caller unchanged.
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  undone_activity = Activity.get_by_ap_id(undone_object)

  case handle_undoing(undone_activity) do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
276 # Tasks this handles:
277 # - Add reaction to object
278 # - Set up notification
# Side effects of an `EmojiReact` activity:
# - the object referenced by the activity's "object" field is looked up and
#   the reaction is added to it
# - notifications are created for the activity
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  target = Object.get_by_ap_id(object.data["object"])

  Utils.add_emoji_reaction_to_object(object, target)
  Notification.create_notifications(object)

  {:ok, object, meta}
end
289 # Tasks this handles:
290 # - Delete and unpins the create activity
291 # - Replace object with Tombstone
292 # - Reduce the user note count
293 # - Reduce the reply count
294 # - Stream out the activity
295 # - Removes posts from search index (if needed)
297 def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
299 Object.normalize(deleted_object, fetch: false) ||
300 User.get_cached_by_ap_id(deleted_object)
303 case deleted_object do
305 with {_, {:ok, deleted_object, _activity}} <- {:object, Object.delete(deleted_object)},
306 {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
307 {_, %User{} = user} <- {:user, User.get_cached_by_ap_id(actor)} do
308 User.remove_pinned_object_id(user, deleted_object.data["id"])
310 {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
312 if in_reply_to = deleted_object.data["inReplyTo"] do
313 Object.decrease_replies_count(in_reply_to)
316 if quote_url = deleted_object.data["quoteUrl"] do
317 Object.decrease_quotes_count(quote_url)
320 MessageReference.delete_for_object(deleted_object)
322 ap_streamer().stream_out(object)
323 ap_streamer().stream_out_participations(deleted_object, user)
327 @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
332 "The object's actor could not be resolved to a user: #{inspect(deleted_object)}"
338 @logger.error("The object could not be deleted: #{inspect(deleted_object)}")
343 with {:ok, _} <- User.delete(deleted_object) do
349 # Only remove from index when deleting actual objects, not users or anything else
350 with %Pleroma.Object{} <- deleted_object do
351 Pleroma.Search.remove_from_index(deleted_object)
360 # Tasks this handles:
362 # - removes expiration job for pinned activity, if was set for expiration
364 def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
365 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
366 {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
367 # if pinned activity was scheduled for deletion, we remove job
368 if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
369 Oban.cancel_job(expiration.id)
375 {:error, :user_not_found}
377 {:error, changeset} ->
378 if changeset.errors[:pinned_objects] do
379 {:error, :pinned_statuses_limit_reached}
386 # Tasks this handles:
387 # - removes pin from user
388 # - removes corresponding Add activity
389 # - if activity had expiration, recreates activity expiration job
391 def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
392 with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
393 {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
395 |> Activity.add_by_params_query(user.ap_id, user.featured_address)
398 # if pinned activity was scheduled for deletion, we reschedule it for deletion
399 if meta[:expires_at] do
400 # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
402 Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
404 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
405 activity_id: meta[:activity_id],
406 expires_at: expires_at
412 nil -> {:error, :user_not_found}
419 def handle(object, meta) do
423 defp handle_update_user(
424 %{data: %{"type" => "Update", "object" => updated_object}} = object,
427 if changeset = Keyword.get(meta, :user_update_changeset) do
429 |> User.update_and_set_cache()
431 {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
433 User.get_by_ap_id(updated_object["id"])
434 |> User.remote_user_changeset(new_user_data)
435 |> User.update_and_set_cache()
441 defp handle_update_object(
442 %{data: %{"type" => "Update", "object" => updated_object}} = object,
445 orig_object_ap_id = updated_object["id"]
446 orig_object = Object.get_by_ap_id(orig_object_ap_id)
447 orig_object_data = orig_object.data
451 # If this is a local Update, we don't process it by transmogrifier,
452 # so we use the embedded object as-is.
458 if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
460 Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
464 |> Activity.normalize()
465 |> ActivityPub.notify_and_stream()
472 def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
473 with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
474 actor = User.get_cached_by_ap_id(object.data["actor"])
475 recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
478 [[actor, recipient], [recipient, actor]]
480 |> Enum.map(fn [user, other_user] ->
482 {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
483 {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
486 :chat_message_id_idempotency_key_cache,
488 meta[:idempotency_key]
492 ["user", "user:pleroma_chat"],
493 {user, %{cm_ref | chat: chat, object: object}}
501 |> add_streamables(streamables)
# Creates a `Question` object through the common pipeline and, on success,
# schedules the poll end for the creating activity. A pipeline result that is
# not `{:ok, object, meta}` is returned unchanged and nothing is scheduled.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created, new_meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, created, new_meta}

    failure ->
      failure
  end
end
514 def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
515 with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
516 Object.increase_vote_count(
517 object.data["inReplyTo"],
# Creates one of the plain object types listed in the guard. There is no
# type-specific side effect here: the result of the common pipeline, success
# or failure, is handed back to the caller as-is.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ~w[Audio Video Image Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end
534 def handle_object_creation(object, _activity, meta) do
# Undoes a like. The first argument is the liked object (or `nil` when it could
# not be found), the second is the like activity itself.
#
# With no liked object there is nothing to update, so only the like is deleted.
defp undo_like(nil, like), do: delete_object(like)

# Otherwise the like is first removed from the liked object; the like activity
# is deleted only when that succeeds, and any other result is returned as-is.
defp undo_like(%Object{} = target, like) do
  case Utils.remove_like_from_object(like, target) do
    {:ok, _} -> delete_object(like)
    failure -> failure
  end
end
# Undoing a `Like`: look up the liked object (this may yield `nil`) and let
# `undo_like/2` remove the like and delete the activity.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
# Undoing an `EmojiReact`: the reaction is removed from the reacted object and
# the reaction activity is then deleted, yielding `:ok`. The first step that
# does not match (object not found, removal or deletion failing) is returned
# unchanged.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = target <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target) do
    delete_object(object)
  end
end
# Undoing an `Announce`: the announce is removed from the announced object and
# the announce activity is then deleted, yielding `:ok`. The first step that
# does not match (object not found, removal or deletion failing) is returned
# unchanged.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object) do
    delete_object(object)
  end
end
569 %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
571 with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
572 %User{} = blocked <- User.get_cached_by_ap_id(blocked),
573 {:ok, _} <- User.unblock(blocker, blocked),
574 {:ok, _} <- Repo.delete(object) do
# Fallback clause: anything not matched by an earlier clause cannot be undone
# and is reported back together with the offending value.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
# Deletes the given record through the repo. A successful deletion is collapsed
# to a bare `:ok`; any other result is passed through untouched.
@spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
# Delivers every notification stored under `:notifications` in `meta` (none if
# the key is absent): each one is streamed to the "user" and
# "user:notification" topics and then sent as a push message.
# Returns `meta` unchanged so the call can be chained.
defp send_notifications(meta) do
  pending = Keyword.get(meta, :notifications, [])

  for notification <- pending do
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end

  meta
end
# Streams every `{topics, items}` pair stored under `:streamables` in `meta`
# (none if the key is absent). Returns `meta` unchanged so the call can be
# chained.
defp send_streamables(meta) do
  pending = Keyword.get(meta, :streamables, [])

  for {topics, items} <- pending do
    Streamer.stream(topics, items)
  end

  meta
end
# Queues `streamables` in `meta`: the new entries are placed in front of
# whatever is already stored under `:streamables` and the combined list
# replaces the old value.
defp add_streamables(meta, streamables) do
  already_queued = Keyword.get(meta, :streamables, [])
  Keyword.put(meta, :streamables, streamables ++ already_queued)
end
# Queues `notifications` in `meta`: the new entries are placed in front of
# whatever is already stored under `:notifications` and the combined list
# replaces the old value.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])
  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
# Performs the deliveries that were queued in `meta`: notifications first,
# then streamables. The result of the last step is returned.
def handle_after_transaction(meta) do
  notified_meta = send_notifications(meta)
  send_streamables(notified_meta)
end