1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
7 alias Pleroma.Activity.Ir.Topics
9 alias Pleroma.Constants
10 alias Pleroma.Conversation
11 alias Pleroma.Conversation.Participation
15 alias Pleroma.Notification
17 alias Pleroma.Object.Containment
18 alias Pleroma.Object.Fetcher
19 alias Pleroma.Pagination
23 alias Pleroma.Web.ActivityPub.MRF
24 alias Pleroma.Web.ActivityPub.Transmogrifier
25 alias Pleroma.Web.Streamer
26 alias Pleroma.Web.WebFinger
27 alias Pleroma.Workers.BackgroundWorker
28 alias Pleroma.Workers.PollWorker
31 import Pleroma.Web.ActivityPub.Utils
32 import Pleroma.Web.ActivityPub.Visibility
35 require Pleroma.Constants
37 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38 @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
40 defp get_recipients(%{"type" => "Create"} = data) do
41 to = Map.get(data, "to", [])
42 cc = Map.get(data, "cc", [])
43 bcc = Map.get(data, "bcc", [])
44 actor = Map.get(data, "actor", [])
45 recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
49 defp get_recipients(data) do
50 to = Map.get(data, "to", [])
51 cc = Map.get(data, "cc", [])
52 bcc = Map.get(data, "bcc", [])
53 recipients = Enum.concat([to, cc, bcc])
57 defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58 defp check_actor_can_insert(%{"type" => "Undo"}), do: true
60 defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61 case User.get_cached_by_ap_id(actor) do
62 %User{is_active: true} -> true
67 defp check_actor_can_insert(_), do: true
# Checks the object's "content" against the configured
# `[:instance, :remote_limit]`: passes when `String.length/1` of the content
# does not exceed the limit. Data without a non-nil "content" nested under
# "object" always passes.
defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  max_length = Config.get([:instance, :remote_limit])

  content
  |> String.length()
  |> Kernel.<=(max_length)
end

defp check_remote_limit(_), do: true
@doc """
Calls `User.increase_note_count/1` on `actor` when `public?(object)` is truthy;
otherwise returns `{:ok, actor}` without touching the user.
"""
def increase_note_count_if_public(actor, object) do
  if public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.decrease_note_count/1` on `actor` when `public?(object)` is truthy;
otherwise returns `{:ok, actor}` without touching the user.
"""
def decrease_note_count_if_public(actor, object) do
  if public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Calls `User.update_last_status_at/1` on `actor` when `public?(object)` is
truthy; otherwise returns `{:ok, actor}` without touching the user.
"""
def update_last_status_at_if_public(actor, object) do
  if public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
88 defp increase_replies_count_if_reply(%{
89 "object" => %{"inReplyTo" => reply_ap_id} = object,
93 Object.increase_replies_count(reply_ap_id)
97 defp increase_replies_count_if_reply(_create_data), do: :noop
99 defp increase_quotes_count_if_quote(%{
100 "object" => %{"quoteUrl" => quote_ap_id} = object,
103 if public?(object) do
104 Object.increase_quotes_count(quote_ap_id)
108 defp increase_quotes_count_if_quote(_create_data), do: :noop
110 @object_types ~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]
112 def persist(%{"type" => type} = object, meta) when type in @object_types do
113 with {:ok, object} <- Object.create(object) do
119 def persist(object, meta) do
120 with local <- Keyword.fetch!(meta, :local),
121 {recipients, _, _} <- get_recipients(object),
123 Repo.insert(%Activity{
126 recipients: recipients,
127 actor: object["actor"]
129 # TODO: add tests for expired activities once the Note type is supported in the new pipeline
130 {:ok, _} <- maybe_create_activity_expiration(activity) do
131 {:ok, activity, meta}
135 @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
136 def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
137 with nil <- Activity.normalize(map),
138 map <- lazy_put_activity_defaults(map, fake),
139 {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
140 {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
141 {:ok, map} <- MRF.filter(map),
142 {recipients, _, _} = get_recipients(map),
143 {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
144 {:containment, :ok} <- {:containment, Containment.contain_child(map)},
145 {:ok, map, object} <- insert_full_object(map),
146 {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
147 # Splice in the child object if we have one.
148 activity = Maps.put_if_present(activity, :object, object)
150 Pleroma.Web.RichMedia.Card.get_by_activity(activity)
152 # Add local posts to search index
153 if local, do: Pleroma.Search.add_to_index(activity)
157 %Activity{} = activity ->
163 {:containment, _} = error ->
166 {:error, _} = error ->
169 {:fake, true, map, recipients} ->
170 activity = %Activity{
174 recipients: recipients,
178 Pleroma.Web.RichMedia.Card.get_by_activity(activity)
181 {:remote_limit_pass, _} ->
182 {:error, :remote_limit}
189 defp insert_activity_with_expiration(data, local, recipients) do
193 actor: data["actor"],
194 recipients: recipients
197 with {:ok, activity} <- Repo.insert(struct) do
198 maybe_create_activity_expiration(activity)
202 def notify_and_stream(activity) do
203 Notification.create_notifications(activity)
207 %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
208 Activity.get_create_by_object_ap_id_with_object(id)
214 conversation = create_or_bump_conversation(original_activity, original_activity.actor)
215 participations = get_participations(conversation)
217 stream_out_participations(participations)
220 defp maybe_create_activity_expiration(
221 %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
224 Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
225 activity_id: activity.id,
226 expires_at: expires_at
232 defp maybe_create_activity_expiration(activity), do: {:ok, activity}
234 defp create_or_bump_conversation(activity, actor) do
235 with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
236 %User{} = user <- User.get_cached_by_ap_id(actor) do
237 Participation.mark_as_read(user, conversation)
242 defp get_participations({:ok, conversation}) do
244 |> Repo.preload(:participations, force: true)
245 |> Map.get(:participations)
248 defp get_participations(_), do: []
250 def stream_out_participations(participations) do
253 |> Repo.preload(:user)
255 Streamer.stream("participation", participations)
259 def stream_out_participations(%Object{data: %{"context" => context}}, user) do
260 with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
261 conversation = Repo.preload(conversation, :participations)
264 fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
269 if last_activity_id do
270 stream_out_participations(conversation.participations)
276 def stream_out_participations(_, _), do: :noop
279 def stream_out(%Activity{data: %{"type" => data_type}} = activity)
280 when data_type in ["Create", "Announce", "Delete", "Update"] do
282 |> Topics.get_activity_topics()
283 |> Streamer.stream(activity)
287 def stream_out(_activity) do
291 @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
292 def create(params, fake \\ false) do
293 with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
298 defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
299 additional = params[:additional] || %{}
300 # only accept false as false value
301 local = !(params[:local] == false)
302 published = params[:published]
303 quick_insert? = Config.get([:env]) == :benchmark
307 %{to: to, actor: actor, published: published, context: context, object: object},
311 with {:ok, activity} <- insert(create_data, local, fake),
312 {:fake, false, activity} <- {:fake, fake, activity},
313 _ <- increase_replies_count_if_reply(create_data),
314 _ <- increase_quotes_count_if_quote(create_data),
315 {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
316 {:ok, _actor} <- increase_note_count_if_public(actor, activity),
317 {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
318 _ <- notify_and_stream(activity),
319 :ok <- maybe_schedule_poll_notifications(activity),
320 :ok <- maybe_handle_group_posts(activity),
321 :ok <- maybe_federate(activity) do
324 {:quick_insert, true, activity} ->
327 {:fake, true, activity} ->
331 Repo.rollback(message)
335 defp maybe_schedule_poll_notifications(activity) do
336 PollWorker.schedule_poll_end(activity)
340 @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
341 def listen(%{to: to, actor: actor, context: context, object: object} = params) do
342 additional = params[:additional] || %{}
343 # only accept false as false value
344 local = !(params[:local] == false)
345 published = params[:published]
349 %{to: to, actor: actor, published: published, context: context, object: object},
353 with {:ok, activity} <- insert(listen_data, local),
354 _ <- notify_and_stream(activity),
355 :ok <- maybe_federate(activity) do
360 @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
361 {:ok, Activity.t()} | nil | {:error, any()}
362 def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
363 with {:ok, result} <-
364 Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
369 defp do_unfollow(follower, followed, activity_id, local) do
370 with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
371 {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
372 unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
373 {:ok, activity} <- insert(unfollow_data, local),
374 _ <- notify_and_stream(activity),
375 :ok <- maybe_federate(activity) do
379 {:error, error} -> Repo.rollback(error)
383 @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
385 with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
399 # only accept false as false value
400 local = !(params[:local] == false)
401 forward = !(params[:forward] == false)
403 additional = params[:additional] || %{}
407 Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
409 Map.merge(additional, %{"to" => [], "cc" => []})
412 with flag_data <- make_flag_data(params, additional),
413 {:ok, activity} <- insert(flag_data, local),
414 {:ok, stripped_activity} <- strip_report_status_data(activity),
415 _ <- notify_and_stream(activity),
417 maybe_federate(stripped_activity) do
418 User.all_users_with_privilege(:reports_manage_reports)
419 |> Enum.filter(fn user -> user.ap_id != actor end)
420 |> Enum.filter(fn user -> not is_nil(user.email) end)
421 |> Enum.each(fn privileged_user ->
423 |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
424 |> Pleroma.Emails.Mailer.deliver_async()
429 {:error, error} -> Repo.rollback(error)
433 @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
434 def move(%User{} = origin, %User{} = target, local \\ true) do
437 "actor" => origin.ap_id,
438 "object" => origin.ap_id,
439 "target" => target.ap_id,
440 "to" => [origin.follower_address]
443 with true <- origin.ap_id in target.also_known_as,
444 {:ok, activity} <- insert(params, local),
445 _ <- notify_and_stream(activity) do
446 maybe_federate(activity)
448 BackgroundWorker.enqueue("move_following", %{
449 "origin_id" => origin.id,
450 "target_id" => target.id
455 false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
460 def fetch_activities_for_context_query(context, opts) do
461 public = [Constants.as_public()]
465 do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
468 from(activity in Activity)
469 |> maybe_preload_objects(opts)
470 |> maybe_preload_bookmarks(opts)
471 |> maybe_set_thread_muted_field(opts)
472 |> restrict_unauthenticated(opts[:user])
473 |> restrict_blocked(opts)
474 |> restrict_blockers_visibility(opts)
475 |> restrict_recipients(recipients, opts[:user])
476 |> restrict_filtered(opts)
480 "?->>'type' = ? and ?->>'context' = ?",
487 |> exclude_poll_votes(opts)
489 |> order_by([activity], desc: activity.id)
492 @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
493 def fetch_activities_for_context(context, opts \\ %{}) do
495 |> fetch_activities_for_context_query(opts)
499 @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
501 def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
503 |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
504 |> restrict_visibility(%{visibility: "direct"})
# Paginates `query` through `Pagination.fetch_paginated/3` with
# `:skip_extra_order` forced to true in `opts`.
#
# Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
# and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
defp fetch_paginated_optimized(query, opts, pagination) do
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
518 def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
519 list_memberships = Pleroma.List.memberships(opts[:user])
521 fetch_activities_query(recipients ++ list_memberships, opts)
522 |> fetch_paginated_optimized(opts, pagination)
524 |> maybe_update_cc(list_memberships, opts[:user])
527 @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
528 def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
529 includes_local_public = Map.get(opts, :includes_local_public, false)
531 opts = Map.delete(opts, :user)
533 intended_recipients =
534 if includes_local_public do
535 [Constants.as_public(), as_local_public()]
537 [Constants.as_public()]
541 |> fetch_activities_query(opts)
542 |> restrict_unlisted(opts)
543 |> fetch_paginated_optimized(opts, pagination)
@doc """
Sets `restrict_unlisted: true` in `opts` and delegates to
`fetch_public_or_unlisted_activities/2` with the given `pagination`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  public_only_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(public_only_opts, pagination)
end
553 @valid_visibilities ~w[direct unlisted public private]
555 defp restrict_visibility(query, %{visibility: visibility})
556 when is_list(visibility) do
557 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
562 "activity_visibility(?, ?, ?) = ANY (?)",
570 Logger.error("Could not restrict visibility to #{visibility}")
574 defp restrict_visibility(query, %{visibility: visibility})
575 when visibility in @valid_visibilities do
579 fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
# Fallback for a `:visibility` value that is neither nil nor one of
# @valid_visibilities: the problem is logged, and the result of
# `Logger.error/1` (not a query) is what the caller receives.
# nil is exempted in the guard, mirroring the guard used by
# `exclude_visibility/2`, so an explicit `visibility: nil` means "no filter".
defp restrict_visibility(_query, %{visibility: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not restrict visibility to #{visibility}")
end

# No usable `:visibility` option (key absent or nil): leave the query untouched.
defp restrict_visibility(query, _visibility), do: query
590 defp exclude_visibility(query, %{exclude_visibilities: visibility})
591 when is_list(visibility) do
592 if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
597 "activity_visibility(?, ?, ?) = ANY (?)",
605 Logger.error("Could not exclude visibility to #{visibility}")
610 defp exclude_visibility(query, %{exclude_visibilities: visibility})
611 when visibility in @valid_visibilities do
616 "activity_visibility(?, ?, ?) = ?",
625 defp exclude_visibility(query, %{exclude_visibilities: visibility})
626 when visibility not in [nil | @valid_visibilities] do
627 Logger.error("Could not exclude visibility to #{visibility}")
631 defp exclude_visibility(query, _visibility), do: query
633 defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
636 defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
639 defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
640 local_public = as_local_public()
644 where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
648 defp restrict_thread_visibility(query, _, _), do: query
650 def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
653 |> Map.put(:user, reading_user)
654 |> Map.put(:actor_id, user.ap_id)
657 godmode: params[:godmode],
658 reading_user: reading_user
660 |> user_activities_recipients()
661 |> fetch_activities(params)
665 def fetch_user_activities(user, reading_user, params \\ %{})
667 def fetch_user_activities(user, reading_user, %{total: true} = params) do
668 result = fetch_activities_for_user(user, reading_user, params)
670 Keyword.put(result, :items, Enum.reverse(result[:items]))
673 def fetch_user_activities(user, reading_user, params) do
675 |> fetch_activities_for_user(reading_user, params)
679 defp fetch_activities_for_user(user, reading_user, params) do
682 |> Map.put(:type, ["Create", "Announce"])
683 |> Map.put(:user, reading_user)
684 |> Map.put(:actor_id, user.ap_id)
685 |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
688 if User.blocks?(reading_user, user) do
692 |> Map.put(:blocking_user, reading_user)
693 |> Map.put(:muting_user, reading_user)
696 pagination_type = Map.get(params, :pagination_type) || :keyset
699 godmode: params[:godmode],
700 reading_user: reading_user
702 |> user_activities_recipients()
703 |> fetch_activities(params, pagination_type)
706 def fetch_statuses(reading_user, %{total: true} = params) do
707 result = fetch_activities_for_reading_user(reading_user, params)
708 Keyword.put(result, :items, Enum.reverse(result[:items]))
711 def fetch_statuses(reading_user, params) do
713 |> fetch_activities_for_reading_user(params)
717 defp fetch_activities_for_reading_user(reading_user, params) do
718 params = Map.put(params, :type, ["Create", "Announce"])
721 godmode: params[:godmode],
722 reading_user: reading_user
724 |> user_activities_recipients()
725 |> fetch_activities(params, :offset)
728 defp user_activities_recipients(%{godmode: true}), do: []
730 defp user_activities_recipients(%{reading_user: reading_user}) do
731 if not is_nil(reading_user) and reading_user.local do
733 Constants.as_public(),
735 reading_user.ap_id | User.following(reading_user)
738 [Constants.as_public()]
742 defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
743 raise "Can't use the child object without preloading!"
746 defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
748 [activity, object] in query,
751 "?->>'type' != ? or ?->>'actor' != ?",
760 defp restrict_announce_object_actor(query, _), do: query
# Keeps only activities whose id is greater than `:since_id`.
#
# A blank (`""`) or nil `:since_id` is treated as "no lower bound" and leaves
# the query untouched, so a nil value never reaches the `>` comparison.
defp restrict_since(query, %{since_id: since_id}) when since_id in [nil, ""], do: query

defp restrict_since(query, %{since_id: since_id}) do
  from(activity in query, where: activity.id > ^since_id)
end

# No `:since_id` option at all.
defp restrict_since(query, _), do: query
# Filters on the object's embedded "tag" JSON using the `?&` operator with the
# `:tag_all` list. Needs the joined object, so it raises when combined with
# `skip_preload: true`. A single binary is handed to
# `restrict_embedded_tag_any/2` as its `:tag`.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all))
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: tag})
end

defp restrict_embedded_tag_all(query, _), do: query
# Filters on the object's embedded "tag" JSON using the `?|` operator with the
# `:tag` list. Needs the joined object, so it raises when combined with
# `skip_preload: true`. A single binary is wrapped in a list and re-dispatched.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(query, [_activity, object], fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any))
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  restrict_embedded_tag_any(query, %{tag: [tag]})
end

defp restrict_embedded_tag_any(query, _), do: query
# Excludes rows whose object's embedded "tag" JSON matches the `?|` operator
# for the `:tag_reject` list. Needs the joined object, so it raises when
# combined with `skip_preload: true`. A single binary is wrapped in a list and
# re-dispatched.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject) do
  restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_embedded_tag_reject_any(query, _), do: query
# Builds a query over "hashtags_objects" joined to `Pleroma.Hashtag` that
# selects the distinct `object_id`s whose hashtag name is one of `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
830 defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
831 raise_on_missing_preload()
834 defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
835 restrict_hashtag_any(query, %{tag: single_tag})
838 defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
840 [_activity, object] in query,
844 (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
845 ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
846 AND hashtags_objects.object_id = ?) @> ?
855 defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
856 restrict_hashtag_all(query, %{tag_all: [tag]})
859 defp restrict_hashtag_all(query, _), do: query
861 defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
862 raise_on_missing_preload()
865 defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
867 from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
870 # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
872 [_activity, object] in query,
873 join: hto in "hashtags_objects",
874 on: hto.object_id == object.id,
875 where: hto.hashtag_id in ^hashtag_ids,
876 distinct: [desc: object.id],
877 order_by: [desc: object.id]
881 defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
882 restrict_hashtag_any(query, %{tag: [tag]})
885 defp restrict_hashtag_any(query, _), do: query
# Excludes rows whose object id appears in `object_ids_query_for_tags/1` for
# the `:tag_reject` list. Needs the joined object, so it raises when combined
# with `skip_preload: true`. A single binary is wrapped in a list and
# re-dispatched.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end

defp restrict_hashtag_reject_any(query, _), do: query
# Shared failure used by the filters that match on `skip_preload: true` but
# need the joined child object.
defp raise_on_missing_preload do
  raise RuntimeError, "Can't use the child object without preloading!"
end
908 defp restrict_recipients(query, [], _user), do: query
910 defp restrict_recipients(query, recipients, nil) do
911 from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
914 defp restrict_recipients(query, recipients, user) do
917 where: fragment("? && ?", ^recipients, activity.recipients),
918 or_where: activity.actor == ^user.ap_id
# With `local_only: true`, keeps only activities whose `local` column is true.
defp restrict_local(query, %{local_only: true}) do
  where(query, [activity], activity.local == true)
end

defp restrict_local(query, _), do: query
# With `remote: true`, keeps only activities whose `local` column is false.
defp restrict_remote(query, %{remote: true}) do
  where(query, [activity], activity.local == false)
end

defp restrict_remote(query, _), do: query
# With an `:actor_id` option, keeps only activities whose `actor` equals it.
defp restrict_actor(query, %{actor_id: actor_id}) do
  where(query, [activity], activity.actor == ^actor_id)
end

defp restrict_actor(query, _), do: query
# Filters on the activity data's "type": a binary `:type` is compared for
# equality, any other `:type` value is matched with `= ANY(?)`.
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: types}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^types))
end

defp restrict_type(query, _), do: query
# With a `:state` option, keeps only activities whose data "state" equals it.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _), do: query
# With a `:favorited_by` ap_id, keeps only rows where the `?` operator finds
# that ap_id in the joined object's "likes" JSON.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(query, [_activity, object], fragment("(?)->'likes' \\? (?)", object.data, ^ap_id))
end

defp restrict_favorited_by(query, _), do: query
# With `only_media: true`, keeps only "Create" activities whose joined object
# has an "attachment" value different from the empty list.
#
# The filter reads the joined object, so asking for it together with
# `skip_preload: true` fails through the shared `raise_on_missing_preload/0`
# helper (same message as before, now kept in one place with the tag filters).
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_media(query, %{only_media: true}) do
  from(
    [activity, object] in query,
    where: fragment("(?)->>'type' = ?", activity.data, "Create"),
    where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  )
end

defp restrict_media(query, _), do: query
979 defp restrict_replies(query, %{exclude_replies: true}) do
981 [_activity, object] in query,
982 where: fragment("?->>'inReplyTo' is null", object.data)
986 defp restrict_replies(query, %{
987 reply_filtering_user: %User{} = user,
988 reply_visibility: "self"
991 [activity, object] in query,
994 "?->>'inReplyTo' is null OR ? = ANY(?)",
1002 defp restrict_replies(query, %{
1003 reply_filtering_user: %User{} = user,
1004 reply_visibility: "following"
1007 [activity, object] in query,
1011 ?->>'type' != 'Create' -- This isn't a Create
1012 OR ?->>'inReplyTo' is null -- this isn't a reply
1013 OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1014 -- unless they are the author (because authors
1015 -- are also part of the recipients). This leads
1016 -- to a bug that self-replies by friends won't
1018 OR ? = ? -- The actor is us
1022 ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1023 activity.recipients,
1031 defp restrict_replies(query, _), do: query
# With `exclude_reblogs: true`, drops activities whose data "type" is 'Announce'.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _), do: query
1039 defp restrict_muted(query, %{with_muted: true}), do: query
1041 defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1042 mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1045 from([activity] in query,
1046 where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1049 "not (?->'to' \\?| ?) or ? = ?",
1057 unless opts[:skip_preload] do
1058 from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1064 defp restrict_muted(query, _), do: query
1066 defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1067 blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1068 domain_blocks = user.domain_blocks || []
1070 following_ap_ids = User.get_friends_ap_ids(user)
1073 if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1076 [activity, object: o] in query,
1077 # You don't block the author
1078 where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1080 # You don't block any recipients, and didn't author the post
1083 "((not (? && ?)) or ? = ?)",
1084 activity.recipients,
1090 # You don't block the domain of any recipients, and didn't author the post
1093 "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1094 activity.recipients,
1100 # It's not a boost of a user you block
1103 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1109 # You don't block the author's domain, and also don't follow the author
1112 "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1119 # Same as above, but checks the Object
1122 "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1131 defp restrict_blocked(query, _), do: query
1133 defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1134 if Config.get([:activitypub, :blockers_visible]) == true do
1137 blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1141 # The author doesn't block you
1142 where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1144 # It's not a boost of a user that blocks you
1147 "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1156 defp restrict_blockers_visibility(query, _), do: query
1158 defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1163 "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1165 ^[Constants.as_public()]
1170 defp restrict_unlisted(query, _), do: query
1172 defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1174 [activity, object: o] in query,
1177 "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
1185 defp restrict_pinned(query, _), do: query
1187 defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1188 muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1194 "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1202 defp restrict_muted_reblogs(query, _), do: query
1204 defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1207 where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1211 defp restrict_instance(query, _), do: query
1213 defp restrict_filtered(query, %{user: %User{} = user}) do
1214 case Filter.compose_regex(user) do
1219 from([activity, object] in query,
1221 fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1222 activity.actor == ^user.ap_id
1227 defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1228 restrict_filtered(query, %{user: user})
1231 defp restrict_filtered(query, _), do: query
1233 defp restrict_unauthenticated(query, nil) do
1234 local = Config.restrict_unauthenticated_access?(:activities, :local)
1235 remote = Config.restrict_unauthenticated_access?(:activities, :remote)
1239 from(activity in query, where: false)
1242 from(activity in query, where: activity.local == false)
1245 from(activity in query, where: activity.local == true)
1252 defp restrict_unauthenticated(query, _), do: query
# With a `:quote_url` option, keeps only rows whose joined object's "quoteUrl"
# equals the given value.
defp restrict_quote_url(query, %{quote_url: quote_url}) do
  where(query, [_activity, object], fragment("(?)->'quoteUrl' = ?", object.data, ^quote_url))
end

defp restrict_quote_url(query, _), do: query
1262 defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1264 defp exclude_poll_votes(query, _) do
1265 if has_named_binding?(query, :object) do
1266 from([activity, object: o] in query,
1267 where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1274 defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1276 defp exclude_chat_messages(query, _) do
1277 if has_named_binding?(query, :object) do
1278 from([activity, object: o] in query,
1279 where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1286 defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1287 defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1289 defp exclude_invisible_actors(query, _opts) do
1291 |> join(:inner, [activity], u in User,
1293 on: activity.actor == u.ap_id and u.invisible == false
# With a binary `:exclude_id`, drops the activity carrying that id.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  where(query, [activity], activity.id != ^id)
end

defp exclude_id(query, _), do: query
# Applies `Activity.with_preloaded_object/1` unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query

defp maybe_preload_objects(query, _) do
  Activity.with_preloaded_object(query)
end
# Applies `Activity.with_preloaded_bookmark/2` for `opts[:user]` unless
# `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts) do
  Activity.with_preloaded_bookmark(query, opts[:user])
end
# Applies `Activity.with_preloaded_report_notes/1` only when
# `preload_report_notes: true` is requested.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  Activity.with_preloaded_report_notes(query)
end

defp maybe_preload_report_notes(query, _), do: query
# Applies `Activity.with_set_thread_muted_field/2` unless `skip_preload: true`
# is set. The user passed along is `opts[:muting_user]`, falling back to
# `opts[:user]`.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  thread_mute_user = opts[:muting_user] || opts[:user]
  Activity.with_set_thread_muted_field(query, thread_mute_user)
end
# Orders by `:id` in the direction given by `:order` (`:desc` or `:asc`);
# any other options leave the query's ordering untouched.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)

defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)

defp maybe_order(query, _), do: query
1343 defp normalize_fetch_activities_query_opts(opts) do
1344 Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1346 value when is_bitstring(value) ->
1347 Map.put(opts, key, Hashtag.normalize_name(value))
1349 value when is_list(value) ->
1352 |> Enum.map(&Hashtag.normalize_name/1)
1355 Map.put(opts, key, normalized_value)
# Builds the option maps consumed by the block, mute and reblog-mute
# restrictions of `fetch_activities_query/2`.
#
# Relationship ap_ids are loaded with a single
# `User.outgoing_relationships_ap_ids/2` call for `opts[:muting_user]`:
# `:mute` and `:reblog_mute` whenever a muting user is given, plus `:block`
# when `opts[:blocking_user]` is that same user. Keys already present in
# `opts` take precedence over the preloaded values.
#
# Returns `{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}`.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]
  blocking_user = opts[:blocking_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if blocking_user && blocking_user == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded[:reblog_mute])
  }
end
@doc """
Builds the Ecto query used to fetch activities addressed to `recipients`.

`opts` is normalized first, then the query is assembled from `Activity` by
applying the preloads, the ordering and each `restrict_*` / `exclude_*` step
in turn. Block, mute and reblog-mute restrictions receive option maps
enriched by `fetch_activities_query_ap_ids_ops/1`.

Hashtag filtering uses the `restrict_hashtag_*` steps when the
`:improved_hashtag_timeline` feature is enabled and the
`restrict_embedded_tag_*` steps otherwise.

Returns the composed query.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    # Applied exactly once: a second pass with the same opts would only
    # repeat the same filter.
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> restrict_quote_url(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
@doc """
Fetches the activities a user has favourited, most recently favourited first.

The user's "Like" activities are joined with the liked objects and their
activities. Results are ordered by the id of the "Like" activity, which is
also exposed on each returned activity as `pagination_id`.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  # The query below is already ordered by like id; `skip_order: true` is
  # passed along to the paginator.
  pagination_params = Map.put(params, :skip_order, true)

  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  Pagination.fetch_paginated(favourites_query, pagination_params, pagination)
end
# Prepends the user's ap_id to the "cc" of every activity whose non-empty
# "bcc" contains at least one entry of `list_memberships`.
#
# Activities without a matching "bcc" are returned unchanged, and so is the
# whole list when `list_memberships` is empty or no `%User{}` is given.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be missing (nil) or a single value; wrapping it keeps the
        # result a proper list instead of building `[ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _, _), do: activities
# Restricts `query` to activities whose recipients overlap `recipients`, or
# whose recipients overlap `recipients_with_public` and also include the
# public address (`Constants.as_public()`).
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  public_address = Constants.as_public()

  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^public_address in activity.recipients)
  )
end
1487 def fetch_activities_bounded(
1489 recipients_with_public,
1491 pagination \\ :keyset
1493 fetch_activities_query([], opts)
1494 |> fetch_activities_bounded_query(recipients, recipients_with_public)
1495 |> Pagination.fetch_paginated(opts, pagination)
@doc """
Stores an uploaded file and inserts an `Object` describing it.

The file is passed through `sanitize_upload_file/1` and then to
`Upload.store/2`. On `{:ok, data}` the data, with `"actor"` added from
`opts[:actor]` through `Maps.put_if_present/3`, is inserted as a new
`Object`. Any other result of `Upload.store/2` is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  stored = file |> sanitize_upload_file() |> Upload.store(opts)

  case stored do
    {:ok, data} ->
      object_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: object_data})

    failure ->
      failure
  end
end
# Reduces the filename of a `Plug.Upload` to its last path component
# (`Path.basename/1`). Anything that is not a `Plug.Upload` with a binary
# filename is returned unchanged.
defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
  %{upload | filename: Path.basename(filename)}
end

defp sanitize_upload_file(upload), do: upload
1517 @spec get_actor_url(any()) :: binary() | nil
1518 defp get_actor_url(url) when is_binary(url), do: url
1519 defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1521 defp get_actor_url(url) when is_list(url) do
1527 defp get_actor_url(_url), do: nil
1529 defp normalize_image(%{"url" => url}) do
1532 "url" => [%{"href" => url}]
1536 defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1537 defp normalize_image(_), do: nil
1539 defp object_to_user_data(data, additional) do
1542 |> Map.get("attachment", [])
1543 |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1544 |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1548 |> Map.get("tag", [])
1550 %{"type" => "Emoji"} -> true
1553 |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1554 {String.trim(name, ":"), url}
1557 is_locked = data["manuallyApprovesFollowers"] || false
1558 capabilities = data["capabilities"] || %{}
1559 accepts_chat_messages = capabilities["acceptsChatMessages"]
1560 data = Transmogrifier.maybe_fix_user_object(data)
1561 is_discoverable = data["discoverable"] || false
1562 invisible = data["invisible"] || false
1563 actor_type = data["type"] || "Person"
1565 featured_address = data["featured"]
1566 {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1569 if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1570 data["publicKey"]["publicKeyPem"]
1574 if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1575 data["endpoints"]["sharedInbox"]
1579 if is_binary(data["vcard:bday"]) do
1580 case Date.from_iso8601(data["vcard:bday"]) do
1586 show_birthday = !!birthday
1588 # if WebFinger request was already done, we probably have acct, otherwise
1589 # we request WebFinger here
1590 nickname = additional[:nickname_from_acct] || generate_nickname(data)
1594 uri: get_actor_url(data["url"]),
1595 banner: normalize_image(data["image"]),
1598 is_locked: is_locked,
1599 is_discoverable: is_discoverable,
1600 invisible: invisible,
1601 avatar: normalize_image(data["icon"]),
1603 follower_address: data["followers"],
1604 following_address: data["following"],
1605 featured_address: featured_address,
1606 bio: data["summary"] || "",
1607 actor_type: actor_type,
1608 also_known_as: Map.get(data, "alsoKnownAs", []),
1609 public_key: public_key,
1610 inbox: data["inbox"],
1611 shared_inbox: shared_inbox,
1612 accepts_chat_messages: accepts_chat_messages,
1614 show_birthday: show_birthday,
1615 pinned_objects: pinned_objects,
1620 defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1621 generated = "#{username}@#{URI.parse(data["id"]).host}"
1623 if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1624 case WebFinger.finger(generated) do
1625 {:ok, %{"subject" => "acct:" <> acct}} -> acct
1633 # nickname can be nil because of virtual actors
1634 defp generate_nickname(_), do: nil
1636 def fetch_follow_information_for_user(user) do
1637 with {:ok, following_data} <-
1638 Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1639 {:ok, hide_follows} <- collection_private(following_data),
1640 {:ok, followers_data} <-
1641 Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1642 {:ok, hide_followers} <- collection_private(followers_data) do
1645 hide_follows: hide_follows,
1646 follower_count: normalize_counter(followers_data["totalItems"]),
1647 following_count: normalize_counter(following_data["totalItems"]),
1648 hide_followers: hide_followers
1651 {:error, _} = e -> e
# Returns `counter` when it is an integer and 0 for any other value
# (nil, strings, ...).
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
1659 def maybe_update_follow_information(user_data) do
1660 with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1661 {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1663 {:collections_available,
1664 !!(user_data[:following_address] && user_data[:follower_address])},
1666 fetch_follow_information_for_user(user_data) do
1667 info = Map.merge(user_data[:info] || %{}, info)
1670 |> Map.put(:info, info)
1672 {:user_type_check, false} ->
1675 {:collections_available, false} ->
1678 {:enabled, false} ->
1683 "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1690 defp collection_private(%{"first" => %{"type" => type}})
1691 when type in ["CollectionPage", "OrderedCollectionPage"],
1694 defp collection_private(%{"first" => first}) do
1695 with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1696 Fetcher.fetch_and_contain_remote_object_from_id(first) do
1699 {:error, _} -> {:ok, true}
1703 defp collection_private(_data), do: {:ok, true}
1705 def user_data_from_user_object(data, additional \\ []) do
1706 with {:ok, data} <- MRF.filter(data) do
1707 {:ok, object_to_user_data(data, additional)}
1713 defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1714 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1715 {:ok, data} <- user_data_from_user_object(data, additional) do
1716 {:ok, maybe_update_follow_information(data)}
1718 # If this has been deleted, only log a debug and not an error
1719 {:error, "Object has been deleted" = e} ->
1720 Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1723 {:error, {:reject, reason} = e} ->
1724 Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1728 Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1733 def maybe_handle_clashing_nickname(data) do
1734 with nickname when is_binary(nickname) <- data[:nickname],
1735 %User{} = old_user <- User.get_by_nickname(nickname),
1736 {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1738 "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1742 |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1743 |> User.update_and_set_cache()
1745 {:ap_id_comparison, true} ->
1747 "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1755 def pin_data_from_featured_collection(%{
1757 "orderedItems" => objects
1759 when type in ["OrderedCollection", "Collection"] do
1761 %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1762 object_ap_id when is_binary(object_ap_id) -> {object_ap_id, NaiveDateTime.utc_now()}
1766 def pin_data_from_featured_collection(obj) do
1767 Logger.error("Could not parse featured collection #{inspect(obj)}")
1771 def fetch_and_prepare_featured_from_ap_id(nil) do
1775 def fetch_and_prepare_featured_from_ap_id(ap_id) do
1776 with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1777 {:ok, pin_data_from_featured_collection(data)}
1780 Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1785 def pinned_fetch_task(nil), do: nil
1787 def pinned_fetch_task(%{pinned_objects: pins}) do
1788 if Enum.all?(pins, fn {ap_id, _} ->
1789 Object.get_cached_by_ap_id(ap_id) ||
1790 match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1798 def make_user_from_ap_id(ap_id, additional \\ []) do
1799 user = User.get_cached_by_ap_id(ap_id)
1801 with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1802 {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1806 |> User.remote_user_changeset(data)
1807 |> User.update_and_set_cache()
1809 maybe_handle_clashing_nickname(data)
1812 |> User.remote_user_changeset()
@doc """
Resolves `nickname` through WebFinger and builds the user from the resulting
ActivityPub id.

When `WebFinger.finger/1` returns a non-nil `"ap_id"` together with an
`"acct:"` subject, the call is delegated to `make_user_from_ap_id/2` with the
acct passed as `:nickname_from_acct`. Any other WebFinger result yields
`{:error, "No AP id in WebFinger"}`.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _other ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: returns the result of
# `entire_thread_visible_for_user?/2` for the activity and user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processes a single activity for `user`.

Currently this only runs the broken-thread check and returns its result.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
@doc """
Builds the query for direct messages: "Create" activities with "direct"
visibility, ordered by ascending activity id.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
# Applies `Activity.restrict_deactivated_users/1` to the query, except when
# fetching "Flag" activities, which are left untouched.
defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity

defp maybe_restrict_deactivated_users(activity, _opts) do
  Activity.restrict_deactivated_users(activity)
end