1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
# Collects the addressees of an ActivityPub document.
#
# Returns a three-element tuple whose first element is the combined recipient
# list built from the "to", "cc" and "bcc" fields (each defaults to `[]`).
# For "Create" documents the actor is added as a recipient and duplicates are
# removed; for every other type the fields are concatenated as-is.
#
# NOTE(review): the second and third tuple elements are reconstructed as the
# raw "to" and "cc" lists; the callers visible here only use the first element.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # List.wrap/1 maps a missing actor to [] instead of the former
  # `[Map.get(data, "actor", [])]`, which evaluated to [[]] and therefore
  # smuggled an empty list into the recipients.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()
  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])
  recipients = Enum.concat([to, cc, bcc])
  {recipients, to, cc}
end
# Tells whether the document's actor may insert it.
#
# "Delete" and "Undo" documents always pass. For any other document with a
# binary "actor", the actor must resolve to a cached user with
# `is_active: true`. Documents without a binary actor pass.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]`, measured with String.length/1.
# Documents without content (or with nil content) always pass.
defp check_remote_limit(%{"object" => %{"content" => nil}}), do: true

defp check_remote_limit(%{"object" => %{"content" => content}}) do
  max_length = Config.get([:instance, :remote_limit])
  String.length(content) <= max_length
end

defp check_remote_limit(_), do: true
@doc """
Increases the actor's note count when `object` is public.

Returns the result of `User.increase_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def increase_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.increase_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Decreases the actor's note count when `object` is public.

Returns the result of `User.decrease_note_count/1` for public objects and
`{:ok, actor}` otherwise.
"""
def decrease_note_count_if_public(actor, object) do
  cond do
    is_public?(object) -> User.decrease_note_count(actor)
    true -> {:ok, actor}
  end
end
@doc """
Updates the actor's last-status timestamp when `object` is public.

Returns the result of `User.update_last_status_at/1` for public objects and
`{:ok, actor}` otherwise.
"""
def update_last_status_at_if_public(actor, object) do
  cond do
    is_public?(object) -> User.update_last_status_at(actor)
    true -> {:ok, actor}
  end
end
# Bumps the replies counter of the object named by "inReplyTo", but only
# when the replying object is public according to is_public?/1.
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
92 if is_public?(object) do
93 Object.increase_replies_count(reply_ap_id)
# Data that does not match the clause above is ignored.
97 defp increase_replies_count_if_reply(_create_data), do: :noop
# Types that the first persist/2 clause stores through Object.create/1.
99 @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
# Persists a document whose "type" is one of @object_types as an object.
101 def persist(%{"type" => type} = object, meta) when type in @object_types do
102 with {:ok, object} <- Object.create(object) do
# Persists any other document as an %Activity{} row.
# `meta` must contain :local; Keyword.fetch!/2 raises when it is missing.
108 def persist(object, meta) do
109 with local <- Keyword.fetch!(meta, :local),
110 {recipients, _, _} <- get_recipients(object),
112 Repo.insert(%Activity{
115 recipients: recipients,
116 actor: object["actor"]
118 # TODO: add tests for expired activities, when Note type will be supported in new pipeline
119 {:ok, _} <- maybe_create_activity_expiration(activity) do
# On success the activity is returned together with the unchanged meta.
120 {:ok, activity, meta}
# Inserts an activity map after running it through the checks of the `with`
# chain below. `local`, `fake` and `bypass_actor_check` are positional flags.
124 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
# The chain only continues when Activity.normalize/1 returns nil for the map.
126 with nil <- Activity.normalize(map),
127 map <- lazy_put_activity_defaults(map, fake),
# The actor check is skipped when bypass_actor_check is truthy.
128 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130 {:ok, map} <- MRF.filter(map),
131 {recipients, _, _} = get_recipients(map),
# With fake == true this tuple does not match and control moves to the
# {:fake, true, map, recipients} branch further down.
132 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134 {:ok, map, object} <- insert_full_object(map),
135 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136 # Splice in the child object if we have one.
137 activity = Maps.put_if_present(activity, :object, object)
# Rich media data is fetched in a separate task started under ConcurrentLimiter.
139 ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140 Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
# NOTE(review): presumably the case where Activity.normalize/1 already
# returned an existing activity for this map — confirm.
145 %Activity{} = activity ->
151 {:containment, _} = error ->
154 {:error, _} = error ->
# Fake inserts build an %Activity{} struct in memory.
157 {:fake, true, map, recipients} ->
158 activity = %Activity{
162 recipients: recipients,
166 Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
# A failed remote limit check is reported as {:error, :remote_limit}.
169 {:remote_limit_pass, _} ->
170 {:error, :remote_limit}
# Inserts the activity and, once that succeeded, passes it to
# maybe_create_activity_expiration/1.
177 defp insert_activity_with_expiration(data, local, recipients) do
181 actor: data["actor"],
182 recipients: recipients
# A failed Repo.insert/1 falls through the `with` unchanged.
185 with {:ok, activity} <- Repo.insert(struct) do
186 maybe_create_activity_expiration(activity)
# Creates notifications for the activity, then creates or bumps the related
# conversation and streams its participations.
190 def notify_and_stream(activity) do
191 Notification.create_notifications(activity)
# For "Update" activities the Create activity of the updated object is
# looked up through the object's id.
195 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
196 Activity.get_create_by_object_ap_id_with_object(id)
202 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
203 participations = get_participations(conversation)
205 stream_out_participations(participations)
# Enqueues a PurgeExpiredActivity job when the activity data carries a
# %DateTime{} under "expires_at".
208 defp maybe_create_activity_expiration(
209 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
212 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
213 activity_id: activity.id,
214 expires_at: expires_at
# Activities without a DateTime "expires_at" are returned untouched.
220 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity`; when `actor` resolves to
# a cached user, the conversation is marked as read for that user.
222 defp create_or_bump_conversation(activity, actor) do
223 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
224 %User{} = user <- User.get_cached_by_ap_id(actor) do
225 Participation.mark_as_read(user, conversation)
# Returns the freshly reloaded participations of a conversation given as
# `{:ok, conversation}`; any other argument yields an empty list.
defp get_participations({:ok, conversation}) do
  # force: true reloads the association even if it was already loaded
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
@doc """
Preloads the user of each participation and streams the participations on
the "participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
# Streams the participations of the conversation found for the object's
# "context", provided a latest direct activity id is available.
247 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
248 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
249 conversation = Repo.preload(conversation, :participations)
# NOTE(review): presumably this call's result is bound to last_activity_id — confirm.
252 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
257 if last_activity_id do
258 stream_out_participations(conversation.participations)
# Arguments that are not an %Object{} with a "context" are ignored.
264 def stream_out_participations(_, _), do: :noop
# Streams Create, Announce, Delete and Update activities to the topics
# computed by Topics.get_activity_topics/1.
267 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
268 when data_type in ["Create", "Announce", "Delete", "Update"] do
270 |> Topics.get_activity_topics()
271 |> Streamer.stream(activity)
# Fallback clause for everything the clause above does not match.
275 def stream_out(_activity) do
@doc """
Runs `do_create/2` inside a database transaction.

A committed transaction yields whatever `do_create/2` returned; any other
transaction outcome is passed through unchanged.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    other -> other
  end
end
# Builds and inserts a Create activity, then runs the follow-up steps of the
# `with` chain below. create/2 calls this inside Repo.transaction/1, which is
# why failures end in Repo.rollback/1.
286 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
287 additional = params[:additional] || %{}
288 # only accept false as false value
289 local = !(params[:local] == false)
290 published = params[:published]
# In the :benchmark environment the chain stops at the {:quick_insert, ...} step.
291 quick_insert? = Config.get([:env]) == :benchmark
295 %{to: to, actor: actor, published: published, context: context, object: object},
299 with {:ok, activity} <- insert(create_data, local, fake),
# Fake inserts stop here and are handled by the {:fake, true, activity} branch.
300 {:fake, false, activity} <- {:fake, fake, activity},
301 _ <- increase_replies_count_if_reply(create_data),
302 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
303 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
304 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
305 _ <- notify_and_stream(activity),
306 :ok <- maybe_schedule_poll_notifications(activity),
307 :ok <- maybe_federate(activity) do
310 {:quick_insert, true, activity} ->
313 {:fake, true, activity} ->
317 Repo.rollback(message)
# Asks PollWorker to schedule the poll-end job for the activity and always
# answers :ok, which is what the `with` chain in do_create/2 matches on.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
# Builds and inserts a listen activity, then notifies, streams and federates it.
326 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
327 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
328 additional = params[:additional] || %{}
329 # only accept false as false value
330 local = !(params[:local] == false)
331 published = params[:published]
335 %{to: to, actor: actor, published: published, context: context, object: object},
# Unlike do_create/2, the visible code does not wrap this chain in a transaction.
339 with {:ok, activity} <- insert(listen_data, local),
340 _ <- notify_and_stream(activity),
341 :ok <- maybe_federate(activity) do
@doc """
Runs `do_unfollow/4` inside a database transaction.

A committed transaction yields whatever `do_unfollow/4` returned; any other
transaction outcome is passed through unchanged.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  case Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
    {:ok, result} -> result
    other -> other
  end
end
# Cancels the latest follow activity between the two users and inserts the
# matching unfollow activity, which is then notified, streamed and federated.
355 defp do_unfollow(follower, followed, activity_id, local) do
# Without an existing follow activity the chain stops at this first step.
356 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359 {:ok, activity} <- insert(unfollow_data, local),
360 _ <- notify_and_stream(activity),
361 :ok <- maybe_federate(activity) do
# Errors roll back the transaction opened by unfollow/4.
365 {:error, error} -> Repo.rollback(error)
# Runs do_flag/1 inside a database transaction.
369 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
371 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
# Body of do_flag: inserts the Flag (report) activity, federates a stripped
# copy and e-mails the users who may manage reports.
385 # only accept false as false value
386 local = !(params[:local] == false)
# `forward` is likewise true unless params[:forward] is exactly false.
387 forward = !(params[:forward] == false)
389 additional = params[:additional] || %{}
# NOTE(review): presumably `forward` selects between cc'ing the reported
# account and addressing nobody — confirm.
393 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
395 Map.merge(additional, %{"to" => [], "cc" => []})
398 with flag_data <- make_flag_data(params, additional),
399 {:ok, activity} <- insert(flag_data, local),
# The copy passed to maybe_federate/1 went through strip_report_status_data/1;
# notify_and_stream/1 receives the unstripped activity.
400 {:ok, stripped_activity} <- strip_report_status_data(activity),
401 _ <- notify_and_stream(activity),
403 maybe_federate(stripped_activity) do
# Mail every user holding :reports_manage_reports, except the reporting
# actor and users without an e-mail address.
404 User.all_users_with_privilege(:reports_manage_reports)
405 |> Enum.filter(fn user -> user.ap_id != actor end)
406 |> Enum.filter(fn user -> not is_nil(user.email) end)
407 |> Enum.each(fn privileged_user ->
409 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
410 |> Pleroma.Emails.Mailer.deliver_async()
# Errors roll back the transaction opened by flag/1.
415 {:error, error} -> Repo.rollback(error)
# Inserts and federates an activity announcing that `origin` moved to
# `target`, then enqueues the "move_following" background job.
419 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
420 def move(%User{} = origin, %User{} = target, local \\ true) do
423 "actor" => origin.ap_id,
424 "object" => origin.ap_id,
425 "target" => target.ap_id,
# The activity is addressed to the origin's followers.
426 "to" => [origin.follower_address]
# The target account must list the origin in its also_known_as.
429 with true <- origin.ap_id in target.also_known_as,
430 {:ok, activity} <- insert(params, local),
431 _ <- notify_and_stream(activity) do
432 maybe_federate(activity)
434 BackgroundWorker.enqueue("move_following", %{
435 "origin_id" => origin.id,
436 "target_id" => target.id
441 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
# Builds the query for activities of one context, newest first, applying the
# preload, block, recipient and filter restrictions piped below.
446 def fetch_activities_for_context_query(context, opts) do
447 public = [Constants.as_public()]
# With a user in opts the recipients are the user, everything the user
# follows, and the public collection.
451 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
454 from(activity in Activity)
455 |> maybe_preload_objects(opts)
456 |> maybe_preload_bookmarks(opts)
457 |> maybe_set_thread_muted_field(opts)
458 |> restrict_blocked(opts)
459 |> restrict_blockers_visibility(opts)
460 |> restrict_recipients(recipients, opts[:user])
461 |> restrict_filtered(opts)
# Matches on the JSON "type" and "context" fields.
465 "?->>'type' = ? and ?->>'context' = ?",
472 |> exclude_poll_votes(opts)
474 |> order_by([activity], desc: activity.id)
# Fetches the activities of a context using fetch_activities_for_context_query/2.
477 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
478 def fetch_activities_for_context(context, opts \\ %{}) do
480 |> fetch_activities_for_context_query(opts)
# Looks up the id of the latest direct-visibility activity in a context;
# per the spec the result is nil when there is none.
484 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
485 FlakeId.Ecto.CompatType.t() | nil
486 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
# skip_preload is only a default here: a value in `opts` wins in the merge.
488 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
489 |> restrict_visibility(%{visibility: "direct"})
# Paginates `query` with the :skip_extra_order option forced on.
#
# Tag-filtering functions may already apply "ORDER BY objects.id DESC";
# an additional sort on "activities.id DESC NULLS LAST" would worsen the
# query plan, so the extra ordering is always skipped here.
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
# Fetches a page of activities addressed to `recipients` or to any list the
# user in opts is a member of.
503 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
504 list_memberships = Pleroma.List.memberships(opts[:user])
506 fetch_activities_query(recipients ++ list_memberships, opts)
507 |> fetch_paginated_optimized(opts, pagination)
509 |> maybe_update_cc(list_memberships, opts[:user])
@doc """
Fetches a page of activities addressed to the public collection.

With `includes_local_public: true` in `opts`, activities addressed to the
local-public collection are included as well. The `:user` option is dropped
before the query is built.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  with_local_public? = Map.get(opts, :includes_local_public, false)
  anonymous_opts = Map.delete(opts, :user)

  audience =
    if with_local_public?,
      do: [Constants.as_public(), as_local_public()],
      else: [Constants.as_public()]

  audience
  |> fetch_activities_query(anonymous_opts)
  |> restrict_unlisted(anonymous_opts)
  |> fetch_paginated_optimized(anonymous_opts, pagination)
end
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with the
`:restrict_unlisted` option forced to `true`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
538 @valid_visibilities ~w[direct unlisted public private]
# Narrows `query` to activities whose computed visibility equals the
# requested value, or is one of the requested values when a list is given.
#
# An invalid visibility is logged and the query is returned unrestricted,
# the same way exclude_visibility/2 handles it. Previously these branches
# returned the :ok of Logger.error/1, which then broke every later step of
# the query pipeline. The offending term is logged with inspect/1 so values
# that do not implement String.Chars cannot raise while logging.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{inspect(visibility)}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{inspect(visibility)}")
  query
end

# No :visibility option: nothing to restrict.
defp restrict_visibility(query, _visibility), do: query
# Handles the :exclude_visibilities option, given as a list or a single value.
# List form: every entry must be one of @valid_visibilities.
575 defp exclude_visibility(query, %{exclude_visibilities: visibility})
576 when is_list(visibility) do
577 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
582 "activity_visibility(?, ?, ?) = ANY (?)",
590 Logger.error("Could not exclude visibility to #{visibility}")
# Single valid visibility.
595 defp exclude_visibility(query, %{exclude_visibilities: visibility})
596 when visibility in @valid_visibilities do
601 "activity_visibility(?, ?, ?) = ?",
# Anything else except nil is reported as an error; nil falls through to the
# pass-through clause below.
610 defp exclude_visibility(query, %{exclude_visibilities: visibility})
611 when visibility not in [nil | @valid_visibilities] do
612 Logger.error("Could not exclude visibility to #{visibility}")
616 defp exclude_visibility(query, _visibility), do: query
# Applies the thread_visibility() database check for the requesting user.
#
# The check is skipped when thread containment is disabled, either in the
# passed config or on the user, and when no user is given.
defp restrict_thread_visibility(query, _opts, %{skip_thread_containment: true}), do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _config),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _config) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _opts, _config), do: query
# Fetches activities authored by `user` as seen by `reading_user`.
635 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
# The reader becomes :user and the author's ap_id becomes :actor_id.
638 |> Map.put(:user, reading_user)
639 |> Map.put(:actor_id, user.ap_id)
642 godmode: params[:godmode],
643 reading_user: reading_user
645 |> user_activities_recipients()
646 |> fetch_activities(params)
# Fetches the activities of `user` as seen by `reading_user`.
650 def fetch_user_activities(user, reading_user, params \\ %{})
# With total: true the result is a keyword list; only its :items are reversed.
652 def fetch_user_activities(user, reading_user, %{total: true} = params) do
653 result = fetch_activities_for_user(user, reading_user, params)
655 Keyword.put(result, :items, Enum.reverse(result[:items]))
658 def fetch_user_activities(user, reading_user, params) do
660 |> fetch_activities_for_user(reading_user, params)
# Shared implementation behind fetch_user_activities/3.
664 defp fetch_activities_for_user(user, reading_user, params) do
# Only Create and Announce activities authored by `user` are considered.
667 |> Map.put(:type, ["Create", "Announce"])
668 |> Map.put(:user, reading_user)
669 |> Map.put(:actor_id, user.ap_id)
670 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
673 if User.blocks?(reading_user, user) do
677 |> Map.put(:blocking_user, reading_user)
678 |> Map.put(:muting_user, reading_user)
# Pagination falls back to :keyset when params has no truthy :pagination_type.
681 pagination_type = Map.get(params, :pagination_type) || :keyset
684 godmode: params[:godmode],
685 reading_user: reading_user
687 |> user_activities_recipients()
688 |> fetch_activities(params, pagination_type)
# Fetches statuses visible to `reading_user`.
# With total: true the result is a keyword list; only its :items are reversed.
691 def fetch_statuses(reading_user, %{total: true} = params) do
692 result = fetch_activities_for_reading_user(reading_user, params)
693 Keyword.put(result, :items, Enum.reverse(result[:items]))
696 def fetch_statuses(reading_user, params) do
698 |> fetch_activities_for_reading_user(params)
# Fetches Create and Announce activities for the recipients derived from
# `reading_user` (and the :godmode param), using :offset pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  status_params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{
      godmode: status_params[:godmode],
      reading_user: reading_user
    })

  fetch_activities(recipients, status_params, :offset)
end
# Computes the recipient list used to scope activity queries for a reader.
# In godmode the list is empty, which restrict_recipients/3 treats as
# "no recipient restriction".
713 defp user_activities_recipients(%{godmode: true}), do: []
715 defp user_activities_recipients(%{reading_user: reading_user}) do
# A local reader sees public posts plus those addressed to them or to
# anything they follow.
716 if not is_nil(reading_user) and reading_user.local do
718 Constants.as_public(),
720 reading_user.ap_id | User.following(reading_user)
# Everyone else (no reader or a remote reader) only gets public posts.
723 [Constants.as_public()]
# Handles the :announce_filtering_user option.
# The filter reads the joined object, so it cannot work with skip_preload.
727 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
728 raise "Can't use the child object without preloading!"
731 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
733 [activity, object] in query,
# Keeps rows for which the 'type' comparison or the 'actor' comparison holds.
736 "?->>'type' != ? or ?->>'actor' != ?",
745 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities with an id greater than :since_id.
# An empty-string :since_id and a missing option both leave the query as is.
defp restrict_since(query, %{since_id: since_id}) when since_id != "" do
  where(query, [activity], activity.id > ^since_id)
end

defp restrict_since(query, _), do: query
# Embedded-tag variant of the "all of these tags" filter (:tag_all).
#
# A non-empty list is matched against the object's embedded 'tag' field with
# the `?&` JSON operator. A single binary tag is delegated to
# restrict_embedded_tag_any/2. The filter reads the joined object, so it
# raises when combined with skip_preload.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: tag})

defp restrict_embedded_tag_all(query, _), do: query
# Embedded-tag variant of the "any of these tags" filter (:tag).
#
# A non-empty list is matched against the object's embedded 'tag' field with
# the `?|` JSON operator; a single binary tag is wrapped into a list first.
# The filter reads the joined object, so it raises when combined with
# skip_preload.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: [tag]})

defp restrict_embedded_tag_any(query, _), do: query
# Embedded-tag variant of the "none of these tags" filter (:tag_reject).
#
# A non-empty list is matched negatively against the object's embedded 'tag'
# field with the `?|` JSON operator; a single binary tag is wrapped into a
# list first. The filter reads the joined object, so it raises when combined
# with skip_preload.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject),
     do: restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_embedded_tag_reject_any(query, _), do: query
# Query selecting the distinct ids of objects linked, through the
# "hashtags_objects" join table, to any hashtag named in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    inner_join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
# Hashtag-table variant of the "all of these tags" filter (:tag_all).
815 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
816 raise_on_missing_preload()
# A one-element list is handled by the "any" filter.
819 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
820 restrict_hashtag_any(query, %{tag: single_tag})
823 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
825 [_activity, object] in query,
# The aggregated names of the object's matching hashtags must contain (@>)
# the bound array.
829 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
830 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
831 AND hashtags_objects.object_id = ?) @> ?
# A single binary tag is wrapped into a list and re-dispatched.
840 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
841 restrict_hashtag_all(query, %{tag_all: [tag]})
844 defp restrict_hashtag_all(query, _), do: query
# Hashtag-table variant of the "any of these tags" filter (:tag).
846 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
847 raise_on_missing_preload()
850 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
# The hashtag ids are selected by name first and pinned into the join below.
852 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
855 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
857 [_activity, object] in query,
858 join: hto in "hashtags_objects",
859 on: hto.object_id == object.id,
860 where: hto.hashtag_id in ^hashtag_ids,
# Rows are made distinct on, and ordered by, the object id (descending).
861 distinct: [desc: object.id],
862 order_by: [desc: object.id]
# A single binary tag is wrapped into a list and re-dispatched.
866 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
867 restrict_hashtag_any(query, %{tag: [tag]})
870 defp restrict_hashtag_any(query, _), do: query
# Hashtag-table variant of the "none of these tags" filter (:tag_reject).
#
# Objects whose id appears in object_ids_query_for_tags/1 for the rejected
# tags are excluded; a single binary tag is wrapped into a list first.
# The filter reads the joined object, so it raises when combined with
# skip_preload.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_hashtag_reject_any(query, _), do: query
# Raised by filters that need the joined child object when the caller asked
# for skip_preload.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
# Keeps activities whose recipients overlap (&&) the given list.
#
# An empty recipient list means no restriction. When a user is given, the
# user's own activities are additionally admitted through an OR condition.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
# With local_only: true, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _), do: query
# With remote: true, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}),
  do: where(query, [activity], activity.local == false)

defp restrict_remote(query, _), do: query
# Keeps only activities whose actor equals :actor_id, when that option is set.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _), do: query
# Filters on the activity's JSON 'type': a binary :type must match exactly,
# any other :type value is passed to ANY(?) as a set of accepted types.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
# Keeps only activities whose JSON 'state' equals :state, when that option is set.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
# Keeps only activities whose joined object lists :favorited_by in its
# 'likes' field, when that option is set.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
# With only_media: true, keeps Create activities whose joined object has a
# non-empty 'attachment'. The filter reads the joined object, so it raises
# when combined with skip_preload.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}),
  do: raise_on_missing_preload()

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _), do: query
# Reply filtering, selected by :exclude_replies or by :reply_visibility
# together with :reply_filtering_user.
# exclude_replies: only objects without 'inReplyTo' are kept.
964 defp restrict_replies(query, %{exclude_replies: true}) do
966 [_activity, object] in query,
967 where: fragment("?->>'inReplyTo' is null", object.data)
# reply_visibility "self".
971 defp restrict_replies(query, %{
972 reply_filtering_user: %User{} = user,
973 reply_visibility: "self"
976 [activity, object] in query,
979 "?->>'inReplyTo' is null OR ? = ANY(?)",
# reply_visibility "following"; the conditions are explained inline in the SQL.
987 defp restrict_replies(query, %{
988 reply_filtering_user: %User{} = user,
989 reply_visibility: "following"
992 [activity, object] in query,
996 ?->>'type' != 'Create' -- This isn't a Create
997 OR ?->>'inReplyTo' is null -- this isn't a reply
998 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
999 -- unless they are the author (because authors
1000 -- are also part of the recipients). This leads
1001 -- to a bug that self-replies by friends won't
1003 OR ? = ? -- The actor is us
# "Us or one of our friends": the user's ap_id plus the cached friend ap_ids.
1007 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1008 activity.recipients,
1016 defp restrict_replies(query, _), do: query
# With exclude_reblogs: true, drops activities of type 'Announce'.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
# Hides activities related to users muted by :muting_user.
# with_muted: true disables the filter.
1024 defp restrict_muted(query, %{with_muted: true}), do: query
1026 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
# Mute ap_ids passed in opts are used when present; otherwise they are loaded.
1027 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1030 from([activity] in query,
1031 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1034 "not (?->'to' \\?| ?) or ? = ?",
# The thread-mute check needs the :thread_mute binding, so it only runs
# when preloading was not skipped.
1042 unless opts[:skip_preload] do
1043 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1049 defp restrict_muted(query, _), do: query
# Hides activities related to users or domains blocked by :blocking_user.
1051 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
# Blocked ap_ids passed in opts are used when present; otherwise they are loaded.
1052 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1053 domain_blocks = user.domain_blocks || []
1055 following_ap_ids = User.get_friends_ap_ids(user)
# The object join is added only if the query has no :object binding yet.
1058 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1061 [activity, object: o] in query,
1062 # You don't block the author
1063 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1065 # You don't block any recipients, and didn't author the post
1068 "((not (? && ?)) or ? = ?)",
1069 activity.recipients,
1075 # You don't block the domain of any recipients, and didn't author the post
1078 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1079 activity.recipients,
1085 # It's not a boost of a user you block
1088 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1094 # You don't block the author's domain, and also don't follow the author
1097 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1104 # Same as above, but checks the Object
1107 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1116 defp restrict_blocked(query, _), do: query
# Filter for activities of users who block :blocking_user; its behaviour
# depends on the [:activitypub, :blockers_visible] setting.
1118 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1119 if Config.get([:activitypub, :blockers_visible]) == true do
# Blockers are the users with an incoming :block relationship to `user`.
1122 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1126 # The author doesn't block you
1127 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1129 # It's not a boost of a user that blocks you
1132 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1141 defp restrict_blockers_visibility(query, _), do: query
# With restrict_unlisted: true, drops activities whose 'cc' contains the
# public collection; a missing 'cc' is treated as empty via coalesce.
1143 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1148 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1150 ^[Constants.as_public()]
1155 defp restrict_unlisted(query, _), do: query
# With pinned: true, keeps only Create activities whose associated object id
# is one of :pinned_object_ids.
1157 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1159 [activity, object: o] in query,
1162 "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
1170 defp restrict_pinned(query, _), do: query
# Hides Announce activities related to users whose reblogs :muting_user muted.
1172 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
# Ap_ids passed in opts are used when present; otherwise they are loaded.
1173 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1179 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1187 defp restrict_muted_reblogs(query, _), do: query
# Keeps only activities whose actor URL has :instance as its host part,
# i.e. the third '/'-separated segment of the actor column.
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _), do: query
# Applies the user's content filters, composed into one regex by
# Filter.compose_regex/1.
1198 defp restrict_filtered(query, %{user: %User{} = user}) do
1199 case Filter.compose_regex(user) do
1204 from([activity, object] in query,
# Objects whose 'content' matches the regex (case-insensitively, ~*) are
# dropped, except for the user's own activities.
1206 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1207 activity.actor == ^user.ap_id
# Without :user, the :blocking_user's filters are applied instead.
1212 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1213 restrict_filtered(query, %{user: user})
1216 defp restrict_filtered(query, _), do: query
# Drops activities whose object is a poll vote (type "Answer"), unless
# include_poll_votes: true is set. The filter only applies when the query
# has a named :object binding; otherwise the query is returned unchanged.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
# Drops activities whose object is a "ChatMessage", unless
# include_chat_messages: true is set. The filter only applies when the query
# has a named :object binding; otherwise the query is returned unchanged.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
# Drops activities authored by invisible users.
# The filter is skipped for "Flag" queries and when invisible_actors: true.
1242 defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1243 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1245 defp exclude_invisible_actors(query, _opts) do
# The inner join keeps only activities whose actor is a known user with
# invisible == false.
1247 |> join(:inner, [activity], u in User,
1249 on: activity.actor == u.ap_id and u.invisible == false
# Drops the activity with the given binary :exclude_id, when that option is set.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _), do: query
# Adds the object preload via Activity.with_preloaded_object/1 unless
# skip_preload: true is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _), do: Activity.with_preloaded_object(query)
# Adds the bookmark preload for opts[:user] unless skip_preload: true is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
# Adds the report-notes preload only when preload_report_notes: true is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _), do: query
# Sets the thread-muted field for opts[:muting_user], falling back to
# opts[:user], unless skip_preload: true is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, muting_user)
end
# Orders by id in the direction given by :order (:asc or :desc).
# Without that option no ordering is added.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _), do: query
# Normalizes the hashtag names found under :tag, :tag_all and :tag_reject
# with Hashtag.normalize_name/1.
1299 defp normalize_fetch_activities_query_opts(opts) do
1300 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
# A single name is normalized in place.
1302 value when is_bitstring(value) ->
1303 Map.put(opts, key, Hashtag.normalize_name(value))
# For a list, each name is normalized.
1305 value when is_list(value) ->
1308 |> Enum.map(&Hashtag.normalize_name/1)
1311 Map.put(opts, key, normalized_value)
# Loads the relationship ap_ids needed by the block, mute and reblog-mute
# filters in a single call, and returns three option maps, one per filter.
#
# Mute and reblog-mute ids are requested when :muting_user is set; block ids
# are requested in the same call only when :blocking_user is that same user.
# Values already present in `opts` take precedence over the loaded ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationship_types =
    if opts[:blocking_user] && opts[:blocking_user] == source_user,
      do: [:block | mute_relationships],
      else: mute_relationships

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationship_types)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
@doc """
Builds the activity query for the given recipients and options.

The options are normalized first, then every preload, ordering, restriction
and exclusion helper is applied in turn; each helper is a no-op unless its
option is present. Hashtag filtering uses the hashtag tables when the
`:improved_hashtag_timeline` feature is enabled and the tags embedded in the
object otherwise.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # restrict_filtered/2 is applied exactly once: it used to be piped a
    # second time further down, which duplicated the WHERE clause and
    # recomputed the filter regex.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
1400 Fetches the activities a user has favourited, ordered by when each one was added to favourites (newest first).
1402 @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1403 def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
# Start from the "Like" activities and join each like to its object and to
# that object's activity.
1405 |> Activity.Queries.by_actor()
1406 |> Activity.Queries.by_type("Like")
1407 |> Activity.with_joined_object()
1408 |> Object.with_joined_activity()
# Return the liked activity (with its object attached), but paginate on the
# id of the Like so results follow the order in which things were favourited.
1409 |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1410 |> order_by([like, _, _], desc_nulls_last: like.id)
# skip_order: true is passed along with the caller's params because the
# ordering has already been set just above.
1411 |> Pagination.fetch_paginated(
1412 Map.merge(params, %{skip_order: true}),
# Adds the user's own AP id to the "cc" of every activity whose "bcc" mentions
# one of `list_memberships`.
#
# Activities with an empty or missing "bcc", or with no overlap, are returned
# untouched. When `list_memberships` is empty or no `%User{}` is given, the
# whole list is returned as is.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent, nil or a bare string in activity data. Wrapping
        # it guarantees a proper list instead of `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Keeps an activity when its recipients overlap `recipients`, or when they
# overlap `recipients_with_public` and the activity is also addressed to the
# public collection.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public = Constants.as_public()

  from(a in query,
    where:
      fragment("? && ?", a.recipients, ^recipients) or
        (fragment("? && ?", a.recipients, ^recipients_with_public) and
           ^public in a.recipients)
  )
end
# Fetches one page of activities bounded by two recipient lists (see
# fetch_activities_bounded_query/3 for how they are combined).
1442 def fetch_activities_bounded(
1444 recipients_with_public,
1446 pagination \\ :keyset
# The base query is built with an empty recipient list; recipient matching is
# done by fetch_activities_bounded_query/3 in the next step.
1448 fetch_activities_query([], opts)
1449 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1450 |> Pagination.fetch_paginated(opts, pagination)
# Stores `file` via `Upload.store/2` and inserts an object describing it.
# `opts[:actor]`, when present, is recorded under the "actor" key. Any result
# of `Upload.store/2` other than `{:ok, data}` is returned unchanged.
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  stored = file |> sanitize_upload_file() |> Upload.store(opts)

  case stored do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
# Reduces the filename of a `%Plug.Upload{}` to its basename, dropping any
# directory components. Every other kind of upload source passes through as is.
defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload)
     when is_binary(filename) do
  %{upload | filename: Path.basename(filename)}
end

defp sanitize_upload_file(upload), do: upload
# Extracts a URL string from an actor's "url" field.
1472 @spec get_actor_url(any()) :: binary() | nil
# A plain string is already the URL.
1473 defp get_actor_url(url) when is_binary(url), do: url
# A map with a string "href" yields that href.
1474 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
# List form of the field.
1476 defp get_actor_url(url) when is_list(url) do
# Anything else carries no usable URL.
1482 defp get_actor_url(_url), do: nil
# Brings an actor image ("icon" / "image") into a map whose "url" is a
# one-element list of `%{"href" => url}`.
1484 defp normalize_image(%{"url" => url}) do
1487 "url" => [%{"href" => url}]
# For a list of images only the first entry is used.
1491 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
# Missing or unrecognised image data yields nil.
1492 defp normalize_image(_), do: nil
# Converts an actor object (`data`, string-keyed) into the atom-keyed attribute
# map used for a user. `additional[:nickname_from_acct]`, when present, is
# used as the nickname instead of generating one from `data`.
1494 defp object_to_user_data(data, additional) do
# Profile fields: "PropertyValue" attachments reduced to their "name"/"value".
# NOTE(review): the single-clause fn below raises FunctionClauseError for an
# attachment without a "type" key — confirm the input is validated before
# this point.
1497 |> Map.get("attachment", [])
1498 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1499 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
# Custom emoji: "Emoji" tags mapped as shortcode (colons trimmed) => icon URL.
# NOTE(review): the Map.new/2 fn likewise requires both "icon" => %{"url" => _}
# and "name" on every Emoji tag and raises otherwise.
1503 |> Map.get("tag", [])
1505 %{"type" => "Emoji"} -> true
1508 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1509 {String.trim(name, ":"), url}
1512 is_locked = data["manuallyApprovesFollowers"] || false
1513 capabilities = data["capabilities"] || %{}
# No default is applied here: this is nil when the capability is not advertised.
1514 accepts_chat_messages = capabilities["acceptsChatMessages"]
# From here on `data` is the object as returned by
# Transmogrifier.maybe_fix_user_object/1.
1515 data = Transmogrifier.maybe_fix_user_object(data)
1516 is_discoverable = data["discoverable"] || false
1517 invisible = data["invisible"] || false
1518 actor_type = data["type"] || "Person"
1520 featured_address = data["featured"]
# Hard match: any result other than {:ok, _} raises MatchError.
1521 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
# The public key is only read when it has the expected nested shape.
1524 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1525 data["publicKey"]["publicKeyPem"]
# Same defensive shape check for the shared inbox endpoint.
1529 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1530 data["endpoints"]["sharedInbox"]
# The birthday is parsed from an ISO 8601 date string in "vcard:bday".
1534 if is_binary(data["vcard:bday"]) do
1535 case Date.from_iso8601(data["vcard:bday"]) do
# show_birthday is true exactly when `birthday` is neither nil nor false.
1541 show_birthday = !!birthday
1543 # if WebFinger request was already done, we probably have acct, otherwise
1544 # we request WebFinger here
1545 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1549 uri: get_actor_url(data["url"]),
1551 banner: normalize_image(data["image"]),
1554 is_locked: is_locked,
1555 is_discoverable: is_discoverable,
1556 invisible: invisible,
1557 avatar: normalize_image(data["icon"]),
1559 follower_address: data["followers"],
1560 following_address: data["following"],
1561 featured_address: featured_address,
1562 bio: data["summary"] || "",
1563 actor_type: actor_type,
1564 also_known_as: Map.get(data, "alsoKnownAs", []),
1565 public_key: public_key,
1566 inbox: data["inbox"],
1567 shared_inbox: shared_inbox,
1568 accepts_chat_messages: accepts_chat_messages,
1570 show_birthday: show_birthday,
1571 pinned_objects: pinned_objects,
# Derives a nickname of the form "preferredUsername@host", with the host taken
# from the actor's "id".
1576 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
# NOTE(review): this assumes data["id"] is a URL string; confirm callers
# guarantee that before URI.parse/1 is reached.
1577 generated = "#{username}@#{URI.parse(data["id"]).host}"
# When enabled in config, WebFinger is queried for the generated nickname and
# the acct from the response's "subject" is used.
1579 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1580 case WebFinger.finger(generated) do
1581 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1589 # nickname can be nil because of virtual actors
1590 defp generate_nickname(_), do: nil
# Fetches the user's following and follower collections and derives the
# counters and the hide_follows / hide_followers flags from them.
1592 def fetch_follow_information_for_user(user) do
1593 with {:ok, following_data} <-
1594 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
# collection_private/1 decides whether the collection's contents are hidden.
1595 {:ok, hide_follows} <- collection_private(following_data),
1596 {:ok, followers_data} <-
1597 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1598 {:ok, hide_followers} <- collection_private(followers_data) do
1601 hide_follows: hide_follows,
# "totalItems" is passed through normalize_counter/1, so a non-integer value
# counts as 0.
1602 follower_count: normalize_counter(followers_data["totalItems"]),
1603 following_count: normalize_counter(following_data["totalItems"]),
1604 hide_followers: hide_followers
# Error tuples from any step are handed back unchanged.
1607 {:error, _} = e -> e
# Collection counters come from remote data: integers are kept, anything else
# (nil, strings, ...) counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
# Refreshes follower/following information on `user_data` when external user
# synchronization is enabled and both collection addresses are known.
# The result of fetch_follow_information_for_user/1 is merged into the :info
# map.
1615 def maybe_update_follow_information(user_data) do
1616 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
# NOTE(review): the attribute map built by object_to_user_data/2 visibly sets
# :actor_type; confirm that :type is actually populated for the data passed
# in here, otherwise this check can never pass.
1617 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1619 {:collections_available,
1620 !!(user_data[:following_address] && user_data[:follower_address])},
1622 fetch_follow_information_for_user(user_data) do
# Values from the freshly fetched info override existing :info entries.
1623 info = Map.merge(user_data[:info] || %{}, info)
1626 |> Map.put(:info, info)
# Each tagged tuple below identifies which precondition was not met.
1628 {:user_type_check, false} ->
1631 {:collections_available, false} ->
1634 {:enabled, false} ->
1639 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
# Determines whether a follow collection hides its contents. Results are
# tagged tuples: {:ok, boolean} or an {:error, _} passed through.
#
# First clause: "first" is an embedded page object of a known page type.
1646 defp collection_private(%{"first" => %{"type" => type}})
1647 when type in ["CollectionPage", "OrderedCollectionPage"],
# Second clause: "first" is not an embedded page of a known type (presumably
# a reference), so it is fetched and expected to be a collection page.
1650 defp collection_private(%{"first" => first}) do
1651 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1652 Fetcher.fetch_and_contain_remote_object_from_id(first) do
# A 401/403 on the first page means the contents are not public.
1655 {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1656 {:error, _} = e -> e
# A collection without a "first" key is treated as private.
1661 defp collection_private(_data), do: {:ok, true}
# Runs the actor object through the MRF filter and, if it passes, converts it
# to user attributes with object_to_user_data/2.
1663 def user_data_from_user_object(data, additional \\ []) do
1664 with {:ok, data} <- MRF.filter(data) do
1665 {:ok, object_to_user_data(data, additional)}
# Fetches the actor at `ap_id`, converts it to user attributes and, on
# success, returns them after maybe_update_follow_information/1 has run.
# Failures are logged at a level that depends on the failure reason.
1671 def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1672 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1673 {:ok, data} <- user_data_from_user_object(data, additional) do
1674 {:ok, maybe_update_follow_information(data)}
1676 # If this has been deleted, only log a debug and not an error
1677 {:error, "Object has been deleted" = e} ->
1678 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# A {:reject, reason} error is logged at info level only.
1681 {:error, {:reject, reason} = e} ->
1682 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
# Any other failure is logged as an error.
1686 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
# Handles the case where a different, already known user holds the nickname
# in `data`: that old user is renamed to "<id>.<nickname>".
1691 def maybe_handle_clashing_nickname(data) do
1692 with nickname when is_binary(nickname) <- data[:nickname],
1693 %User{} = old_user <- User.get_by_nickname(nickname),
# Only a user with a different AP id counts as a clash.
1694 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1696 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1700 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1701 |> User.update_and_set_cache()
# Same AP id: the "old" user is the user being processed, so nothing is renamed.
1703 {:ap_id_comparison, true} ->
1705 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
@doc """
Builds the pinned-objects map from a fetched "featured" collection.

For an `OrderedCollection` / `Collection` with a list under `"orderedItems"`,
returns a map of object AP id => timestamp (`NaiveDateTime.utc_now/0` at the
time of the call). Items may be embedded objects with an `"id"` or bare AP id
strings; items of any other shape are skipped.

Any other input (unexpected type, missing or non-list `"orderedItems"`) is
logged and yields an empty map instead of raising, so a malformed remote
collection cannot abort the surrounding user fetch.
"""
def pin_data_from_featured_collection(%{"type" => type, "orderedItems" => objects})
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  objects
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    object_ap_id when is_binary(object_ap_id) ->
      [{object_ap_id, NaiveDateTime.utc_now()}]

    # Remote data: ignore entries we cannot turn into an AP id.
    _unrecognised ->
      []
  end)
  |> Map.new()
end

def pin_data_from_featured_collection(collection) do
  Logger.error("Could not parse featured collection #{inspect(collection)}")
  %{}
end
# Fetches the featured (pinned posts) collection at `ap_id` and converts it
# with pin_data_from_featured_collection/1.
#
# Clause for actors that advertise no featured collection.
1724 def fetch_and_prepare_featured_from_ap_id(nil) do
1728 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1729 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1730 {:ok, pin_data_from_featured_collection(data)}
# A failed fetch is logged with the collection address and the failure value.
1733 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
# Makes sure every pinned object of a user is available locally.
1738 def pinned_fetch_task(nil), do: nil
1740 def pinned_fetch_task(%{pinned_objects: pins}) do
# For each pin: use the cached object if there is one, otherwise try to fetch
# it. Enum.all?/2 stops at the first pin that can be neither found nor fetched.
1741 if Enum.all?(pins, fn {ap_id, _} ->
1742 Object.get_cached_by_ap_id(ap_id) ||
1743 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
# Creates or refreshes the user record for the actor at `ap_id`.
# `additional` is forwarded to fetch_and_prepare_user_from_ap_id/2.
1751 def make_user_from_ap_id(ap_id, additional \\ []) do
1752 user = User.get_cached_by_ap_id(ap_id)
# A known user for whom User.ap_enabled?/1 is false is handed to the
# Transmogrifier upgrade path.
1754 if user && !User.ap_enabled?(user) do
1755 Transmogrifier.upgrade_user_from_ap_id(ap_id)
1757 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
# Pinned objects are fetched in a separate process started with Task.start/1,
# so this function does not wait for them.
1758 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
# Changeset built from the fetched data, then persisted and cached.
1762 |> User.remote_user_changeset(data)
1763 |> User.update_and_set_cache()
# Resolves a clash with a different existing user holding the same nickname.
1765 maybe_handle_clashing_nickname(data)
1768 |> User.remote_user_changeset()
# Resolves `nickname` through WebFinger and builds the user from the AP id it
# returns, passing along the acct taken from the WebFinger "subject". Any
# WebFinger result without a usable AP id and "acct:" subject is an error.
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _no_usable_webfinger ->
      {:error, "No AP id in WebFinger"}
  end
end
# An activity passes only if its entire thread is visible to the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)

# Post-processing check for a single activity; currently this is just the
# broken-thread containment above.
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
# Query for direct messages: "Create" activities with "direct" visibility,
# ordered by ascending activity id.
1795 def fetch_direct_messages_query do
1797 |> restrict_type(%{type: "Create"})
1798 |> restrict_visibility(%{visibility: "direct"})
1799 |> order_by([activity], asc: activity.id)
# Queries for "Flag" activities are left untouched; every other query goes
# through `Activity.restrict_deactivated_users/1`.
defp maybe_restrict_deactivated_users(query, %{type: "Flag"}), do: query

defp maybe_restrict_deactivated_users(query, _opts) do
  Activity.restrict_deactivated_users(query)
end