total rebase
[anni] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6   alias Pleroma.Activity
7   alias Pleroma.Activity.Ir.Topics
8   alias Pleroma.Config
9   alias Pleroma.Constants
10   alias Pleroma.Conversation
11   alias Pleroma.Conversation.Participation
12   alias Pleroma.Filter
13   alias Pleroma.Hashtag
14   alias Pleroma.Maps
15   alias Pleroma.Notification
16   alias Pleroma.Object
17   alias Pleroma.Object.Containment
18   alias Pleroma.Object.Fetcher
19   alias Pleroma.Pagination
20   alias Pleroma.Repo
21   alias Pleroma.Upload
22   alias Pleroma.User
23   alias Pleroma.Web.ActivityPub.MRF
24   alias Pleroma.Web.ActivityPub.Transmogrifier
25   alias Pleroma.Web.Streamer
26   alias Pleroma.Web.WebFinger
27   alias Pleroma.Workers.BackgroundWorker
28   alias Pleroma.Workers.PollWorker
29
30   import Ecto.Query
31   import Pleroma.Web.ActivityPub.Utils
32   import Pleroma.Web.ActivityPub.Visibility
33
34   require Logger
35   require Pleroma.Constants
36
37   @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38   @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40   defp get_recipients(%{"type" => "Create"} = data) do
41     to = Map.get(data, "to", [])
42     cc = Map.get(data, "cc", [])
43     bcc = Map.get(data, "bcc", [])
44     actor = Map.get(data, "actor", [])
45     recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46     {recipients, to, cc}
47   end
48
49   defp get_recipients(data) do
50     to = Map.get(data, "to", [])
51     cc = Map.get(data, "cc", [])
52     bcc = Map.get(data, "bcc", [])
53     recipients = Enum.concat([to, cc, bcc])
54     {recipients, to, cc}
55   end
56
57   defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58   defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60   defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61     case User.get_cached_by_ap_id(actor) do
62       %User{is_active: true} -> true
63       _ -> false
64     end
65   end
66
67   defp check_actor_can_insert(_), do: true
68
69   defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70     limit = Config.get([:instance, :remote_limit])
71     String.length(content) <= limit
72   end
73
74   defp check_remote_limit(_), do: true
75
76   def increase_note_count_if_public(actor, object) do
77     if public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78   end
79
80   def decrease_note_count_if_public(actor, object) do
81     if public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82   end
83
84   def update_last_status_at_if_public(actor, object) do
85     if public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
86   end
87
88   defp increase_replies_count_if_reply(%{
89          "object" => %{"inReplyTo" => reply_ap_id} = object,
90          "type" => "Create"
91        }) do
92     if public?(object) do
93       Object.increase_replies_count(reply_ap_id)
94     end
95   end
96
97   defp increase_replies_count_if_reply(_create_data), do: :noop
98
99   defp increase_quotes_count_if_quote(%{
100          "object" => %{"quoteUrl" => quote_ap_id} = object,
101          "type" => "Create"
102        }) do
103     if public?(object) do
104       Object.increase_quotes_count(quote_ap_id)
105     end
106   end
107
108   defp increase_quotes_count_if_quote(_create_data), do: :noop
109
110   @object_types ~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]
111   @impl true
112   def persist(%{"type" => type} = object, meta) when type in @object_types do
113     with {:ok, object} <- Object.create(object) do
114       {:ok, object, meta}
115     end
116   end
117
118   @impl true
119   def persist(object, meta) do
120     with local <- Keyword.fetch!(meta, :local),
121          {recipients, _, _} <- get_recipients(object),
122          {:ok, activity} <-
123            Repo.insert(%Activity{
124              data: object,
125              local: local,
126              recipients: recipients,
127              actor: object["actor"]
128            }),
129          # TODO: add tests for expired activities, when Note type will be supported in new pipeline
130          {:ok, _} <- maybe_create_activity_expiration(activity) do
131       {:ok, activity, meta}
132     end
133   end
134
135   @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
136   def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
137     with nil <- Activity.normalize(map),
138          map <- lazy_put_activity_defaults(map, fake),
139          {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
140          {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
141          {:ok, map} <- MRF.filter(map),
142          {recipients, _, _} = get_recipients(map),
143          {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
144          {:containment, :ok} <- {:containment, Containment.contain_child(map)},
145          {:ok, map, object} <- insert_full_object(map),
146          {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
147       # Splice in the child object if we have one.
148       activity = Maps.put_if_present(activity, :object, object)
149
150       Pleroma.Web.RichMedia.Card.get_by_activity(activity)
151
152       # Add local posts to search index
153       if local, do: Pleroma.Search.add_to_index(activity)
154
155       {:ok, activity}
156     else
157       %Activity{} = activity ->
158         {:ok, activity}
159
160       {:actor_check, _} ->
161         {:error, false}
162
163       {:containment, _} = error ->
164         error
165
166       {:error, _} = error ->
167         error
168
169       {:fake, true, map, recipients} ->
170         activity = %Activity{
171           data: map,
172           local: local,
173           actor: map["actor"],
174           recipients: recipients,
175           id: "pleroma:fakeid"
176         }
177
178         Pleroma.Web.RichMedia.Card.get_by_activity(activity)
179         {:ok, activity}
180
181       {:remote_limit_pass, _} ->
182         {:error, :remote_limit}
183
184       {:reject, _} = e ->
185         {:error, e}
186     end
187   end
188
189   defp insert_activity_with_expiration(data, local, recipients) do
190     struct = %Activity{
191       data: data,
192       local: local,
193       actor: data["actor"],
194       recipients: recipients
195     }
196
197     with {:ok, activity} <- Repo.insert(struct) do
198       maybe_create_activity_expiration(activity)
199     end
200   end
201
202   def notify_and_stream(activity) do
203     Notification.create_notifications(activity)
204
205     original_activity =
206       case activity do
207         %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
208           Activity.get_create_by_object_ap_id_with_object(id)
209
210         _ ->
211           activity
212       end
213
214     conversation = create_or_bump_conversation(original_activity, original_activity.actor)
215     participations = get_participations(conversation)
216     stream_out(activity)
217     stream_out_participations(participations)
218   end
219
220   defp maybe_create_activity_expiration(
221          %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
222        ) do
223     with {:ok, _job} <-
224            Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
225              activity_id: activity.id,
226              expires_at: expires_at
227            }) do
228       {:ok, activity}
229     end
230   end
231
232   defp maybe_create_activity_expiration(activity), do: {:ok, activity}
233
234   defp create_or_bump_conversation(activity, actor) do
235     with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
236          %User{} = user <- User.get_cached_by_ap_id(actor) do
237       Participation.mark_as_read(user, conversation)
238       {:ok, conversation}
239     end
240   end
241
242   defp get_participations({:ok, conversation}) do
243     conversation
244     |> Repo.preload(:participations, force: true)
245     |> Map.get(:participations)
246   end
247
248   defp get_participations(_), do: []
249
250   def stream_out_participations(participations) do
251     participations =
252       participations
253       |> Repo.preload(:user)
254
255     Streamer.stream("participation", participations)
256   end
257
258   @impl true
259   def stream_out_participations(%Object{data: %{"context" => context}}, user) do
260     with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
261       conversation = Repo.preload(conversation, :participations)
262
263       last_activity_id =
264         fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
265           user: user,
266           blocking_user: user
267         })
268
269       if last_activity_id do
270         stream_out_participations(conversation.participations)
271       end
272     end
273   end
274
275   @impl true
276   def stream_out_participations(_, _), do: :noop
277
278   @impl true
279   def stream_out(%Activity{data: %{"type" => data_type}} = activity)
280       when data_type in ["Create", "Announce", "Delete", "Update"] do
281     activity
282     |> Topics.get_activity_topics()
283     |> Streamer.stream(activity)
284   end
285
286   @impl true
287   def stream_out(_activity) do
288     :noop
289   end
290
291   @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
292   def create(params, fake \\ false) do
293     with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
294       result
295     end
296   end
297
298   defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
299     additional = params[:additional] || %{}
300     # only accept false as false value
301     local = !(params[:local] == false)
302     published = params[:published]
303     quick_insert? = Config.get([:env]) == :benchmark
304
305     create_data =
306       make_create_data(
307         %{to: to, actor: actor, published: published, context: context, object: object},
308         additional
309       )
310
311     with {:ok, activity} <- insert(create_data, local, fake),
312          {:fake, false, activity} <- {:fake, fake, activity},
313          _ <- increase_replies_count_if_reply(create_data),
314          _ <- increase_quotes_count_if_quote(create_data),
315          {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
316          {:ok, _actor} <- increase_note_count_if_public(actor, activity),
317          {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
318          _ <- notify_and_stream(activity),
319          :ok <- maybe_schedule_poll_notifications(activity),
320          :ok <- maybe_handle_group_posts(activity),
321          :ok <- maybe_federate(activity) do
322       {:ok, activity}
323     else
324       {:quick_insert, true, activity} ->
325         {:ok, activity}
326
327       {:fake, true, activity} ->
328         {:ok, activity}
329
330       {:error, message} ->
331         Repo.rollback(message)
332     end
333   end
334
335   defp maybe_schedule_poll_notifications(activity) do
336     PollWorker.schedule_poll_end(activity)
337     :ok
338   end
339
340   @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
341   def listen(%{to: to, actor: actor, context: context, object: object} = params) do
342     additional = params[:additional] || %{}
343     # only accept false as false value
344     local = !(params[:local] == false)
345     published = params[:published]
346
347     listen_data =
348       make_listen_data(
349         %{to: to, actor: actor, published: published, context: context, object: object},
350         additional
351       )
352
353     with {:ok, activity} <- insert(listen_data, local),
354          _ <- notify_and_stream(activity),
355          :ok <- maybe_federate(activity) do
356       {:ok, activity}
357     end
358   end
359
360   @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
361           {:ok, Activity.t()} | nil | {:error, any()}
362   def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
363     with {:ok, result} <-
364            Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
365       result
366     end
367   end
368
369   defp do_unfollow(follower, followed, activity_id, local) do
370     with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
371          {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
372          unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
373          {:ok, activity} <- insert(unfollow_data, local),
374          _ <- notify_and_stream(activity),
375          :ok <- maybe_federate(activity) do
376       {:ok, activity}
377     else
378       nil -> nil
379       {:error, error} -> Repo.rollback(error)
380     end
381   end
382
383   @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
384   def flag(params) do
385     with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
386       result
387     end
388   end
389
390   defp do_flag(
391          %{
392            actor: actor,
393            context: _context,
394            account: account,
395            statuses: statuses,
396            content: content
397          } = params
398        ) do
399     # only accept false as false value
400     local = !(params[:local] == false)
401     forward = !(params[:forward] == false)
402
403     additional = params[:additional] || %{}
404
405     additional =
406       if forward do
407         Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
408       else
409         Map.merge(additional, %{"to" => [], "cc" => []})
410       end
411
412     with flag_data <- make_flag_data(params, additional),
413          {:ok, activity} <- insert(flag_data, local),
414          {:ok, stripped_activity} <- strip_report_status_data(activity),
415          _ <- notify_and_stream(activity),
416          :ok <-
417            maybe_federate(stripped_activity) do
418       User.all_users_with_privilege(:reports_manage_reports)
419       |> Enum.filter(fn user -> user.ap_id != actor end)
420       |> Enum.filter(fn user -> not is_nil(user.email) end)
421       |> Enum.each(fn privileged_user ->
422         privileged_user
423         |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
424         |> Pleroma.Emails.Mailer.deliver_async()
425       end)
426
427       {:ok, activity}
428     else
429       {:error, error} -> Repo.rollback(error)
430     end
431   end
432
433   @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
434   def move(%User{} = origin, %User{} = target, local \\ true) do
435     params = %{
436       "type" => "Move",
437       "actor" => origin.ap_id,
438       "object" => origin.ap_id,
439       "target" => target.ap_id,
440       "to" => [origin.follower_address]
441     }
442
443     with true <- origin.ap_id in target.also_known_as,
444          {:ok, activity} <- insert(params, local),
445          _ <- notify_and_stream(activity) do
446       maybe_federate(activity)
447
448       BackgroundWorker.enqueue("move_following", %{
449         "origin_id" => origin.id,
450         "target_id" => target.id
451       })
452
453       {:ok, activity}
454     else
455       false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
456       err -> err
457     end
458   end
459
460   def fetch_activities_for_context_query(context, opts) do
461     public = [Constants.as_public()]
462
463     recipients =
464       if opts[:user],
465         do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
466         else: public
467
468     from(activity in Activity)
469     |> maybe_preload_objects(opts)
470     |> maybe_preload_bookmarks(opts)
471     |> maybe_set_thread_muted_field(opts)
472     |> restrict_unauthenticated(opts[:user])
473     |> restrict_blocked(opts)
474     |> restrict_blockers_visibility(opts)
475     |> restrict_recipients(recipients, opts[:user])
476     |> restrict_filtered(opts)
477     |> where(
478       [activity],
479       fragment(
480         "?->>'type' = ? and ?->>'context' = ?",
481         activity.data,
482         "Create",
483         activity.data,
484         ^context
485       )
486     )
487     |> exclude_poll_votes(opts)
488     |> exclude_id(opts)
489     |> order_by([activity], desc: activity.id)
490   end
491
492   @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
493   def fetch_activities_for_context(context, opts \\ %{}) do
494     context
495     |> fetch_activities_for_context_query(opts)
496     |> Repo.all()
497   end
498
499   @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
500           Ecto.UUID.t() | nil
501   def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
502     context
503     |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
504     |> restrict_visibility(%{visibility: "direct"})
505     |> limit(1)
506     |> select([a], a.id)
507     |> Repo.one()
508   end
509
510   defp fetch_paginated_optimized(query, opts, pagination) do
511     # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
512     #   and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
513     opts = Map.put(opts, :skip_extra_order, true)
514
515     Pagination.fetch_paginated(query, opts, pagination)
516   end
517
518   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
519     list_memberships = Pleroma.List.memberships(opts[:user])
520
521     fetch_activities_query(recipients ++ list_memberships, opts)
522     |> fetch_paginated_optimized(opts, pagination)
523     |> Enum.reverse()
524     |> maybe_update_cc(list_memberships, opts[:user])
525   end
526
527   @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
528   def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
529     includes_local_public = Map.get(opts, :includes_local_public, false)
530
531     opts = Map.delete(opts, :user)
532
533     intended_recipients =
534       if includes_local_public do
535         [Constants.as_public(), as_local_public()]
536       else
537         [Constants.as_public()]
538       end
539
540     intended_recipients
541     |> fetch_activities_query(opts)
542     |> restrict_unlisted(opts)
543     |> fetch_paginated_optimized(opts, pagination)
544   end
545
546   @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
547   def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
548     opts
549     |> Map.put(:restrict_unlisted, true)
550     |> fetch_public_or_unlisted_activities(pagination)
551   end
552
553   @valid_visibilities ~w[direct unlisted public private]
554
555   defp restrict_visibility(query, %{visibility: visibility})
556        when is_list(visibility) do
557     if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
558       from(
559         a in query,
560         where:
561           fragment(
562             "activity_visibility(?, ?, ?) = ANY (?)",
563             a.actor,
564             a.recipients,
565             a.data,
566             ^visibility
567           )
568       )
569     else
570       Logger.error("Could not restrict visibility to #{visibility}")
571     end
572   end
573
574   defp restrict_visibility(query, %{visibility: visibility})
575        when visibility in @valid_visibilities do
576     from(
577       a in query,
578       where:
579         fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
580     )
581   end
582
583   defp restrict_visibility(_query, %{visibility: visibility})
584        when visibility not in @valid_visibilities do
585     Logger.error("Could not restrict visibility to #{visibility}")
586   end
587
588   defp restrict_visibility(query, _visibility), do: query
589
590   defp exclude_visibility(query, %{exclude_visibilities: visibility})
591        when is_list(visibility) do
592     if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
593       from(
594         a in query,
595         where:
596           not fragment(
597             "activity_visibility(?, ?, ?) = ANY (?)",
598             a.actor,
599             a.recipients,
600             a.data,
601             ^visibility
602           )
603       )
604     else
605       Logger.error("Could not exclude visibility to #{visibility}")
606       query
607     end
608   end
609
610   defp exclude_visibility(query, %{exclude_visibilities: visibility})
611        when visibility in @valid_visibilities do
612     from(
613       a in query,
614       where:
615         not fragment(
616           "activity_visibility(?, ?, ?) = ?",
617           a.actor,
618           a.recipients,
619           a.data,
620           ^visibility
621         )
622     )
623   end
624
625   defp exclude_visibility(query, %{exclude_visibilities: visibility})
626        when visibility not in [nil | @valid_visibilities] do
627     Logger.error("Could not exclude visibility to #{visibility}")
628     query
629   end
630
631   defp exclude_visibility(query, _visibility), do: query
632
633   defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
634     do: query
635
636   defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
637     do: query
638
639   defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
640     local_public = as_local_public()
641
642     from(
643       a in query,
644       where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
645     )
646   end
647
648   defp restrict_thread_visibility(query, _, _), do: query
649
650   def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
651     params =
652       params
653       |> Map.put(:user, reading_user)
654       |> Map.put(:actor_id, user.ap_id)
655
656     %{
657       godmode: params[:godmode],
658       reading_user: reading_user
659     }
660     |> user_activities_recipients()
661     |> fetch_activities(params)
662     |> Enum.reverse()
663   end
664
665   def fetch_user_activities(user, reading_user, params \\ %{})
666
667   def fetch_user_activities(user, reading_user, %{total: true} = params) do
668     result = fetch_activities_for_user(user, reading_user, params)
669
670     Keyword.put(result, :items, Enum.reverse(result[:items]))
671   end
672
673   def fetch_user_activities(user, reading_user, params) do
674     user
675     |> fetch_activities_for_user(reading_user, params)
676     |> Enum.reverse()
677   end
678
679   defp fetch_activities_for_user(user, reading_user, params) do
680     params =
681       params
682       |> Map.put(:type, ["Create", "Announce"])
683       |> Map.put(:user, reading_user)
684       |> Map.put(:actor_id, user.ap_id)
685       |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
686
687     params =
688       if User.blocks?(reading_user, user) do
689         params
690       else
691         params
692         |> Map.put(:blocking_user, reading_user)
693         |> Map.put(:muting_user, reading_user)
694       end
695
696     pagination_type = Map.get(params, :pagination_type) || :keyset
697
698     %{
699       godmode: params[:godmode],
700       reading_user: reading_user
701     }
702     |> user_activities_recipients()
703     |> fetch_activities(params, pagination_type)
704   end
705
706   def fetch_statuses(reading_user, %{total: true} = params) do
707     result = fetch_activities_for_reading_user(reading_user, params)
708     Keyword.put(result, :items, Enum.reverse(result[:items]))
709   end
710
711   def fetch_statuses(reading_user, params) do
712     reading_user
713     |> fetch_activities_for_reading_user(params)
714     |> Enum.reverse()
715   end
716
717   defp fetch_activities_for_reading_user(reading_user, params) do
718     params = Map.put(params, :type, ["Create", "Announce"])
719
720     %{
721       godmode: params[:godmode],
722       reading_user: reading_user
723     }
724     |> user_activities_recipients()
725     |> fetch_activities(params, :offset)
726   end
727
728   defp user_activities_recipients(%{godmode: true}), do: []
729
730   defp user_activities_recipients(%{reading_user: reading_user}) do
731     if not is_nil(reading_user) and reading_user.local do
732       [
733         Constants.as_public(),
734         as_local_public(),
735         reading_user.ap_id | User.following(reading_user)
736       ]
737     else
738       [Constants.as_public()]
739     end
740   end
741
742   defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
743     raise "Can't use the child object without preloading!"
744   end
745
746   defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
747     from(
748       [activity, object] in query,
749       where:
750         fragment(
751           "?->>'type' != ? or ?->>'actor' != ?",
752           activity.data,
753           "Announce",
754           object.data,
755           ^actor
756         )
757     )
758   end
759
760   defp restrict_announce_object_actor(query, _), do: query
761
762   defp restrict_since(query, %{since_id: ""}), do: query
763
764   defp restrict_since(query, %{since_id: since_id}) do
765     from(activity in query, where: activity.id > ^since_id)
766   end
767
768   defp restrict_since(query, _), do: query
769
770   defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
771     raise_on_missing_preload()
772   end
773
774   defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
775     from(
776       [_activity, object] in query,
777       where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
778     )
779   end
780
781   defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
782     restrict_embedded_tag_any(query, %{tag: tag})
783   end
784
785   defp restrict_embedded_tag_all(query, _), do: query
786
787   defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
788     raise_on_missing_preload()
789   end
790
791   defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
792     from(
793       [_activity, object] in query,
794       where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
795     )
796   end
797
798   defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
799     restrict_embedded_tag_any(query, %{tag: [tag]})
800   end
801
802   defp restrict_embedded_tag_any(query, _), do: query
803
804   defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
805     raise_on_missing_preload()
806   end
807
808   defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
809     from(
810       [_activity, object] in query,
811       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
812     )
813   end
814
815   defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
816        when is_binary(tag_reject) do
817     restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
818   end
819
820   defp restrict_embedded_tag_reject_any(query, _), do: query
821
822   defp object_ids_query_for_tags(tags) do
823     from(hto in "hashtags_objects")
824     |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
825     |> where([hto, ht], ht.name in ^tags)
826     |> select([hto], hto.object_id)
827     |> distinct([hto], true)
828   end
829
830   defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
831     raise_on_missing_preload()
832   end
833
834   defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
835     restrict_hashtag_any(query, %{tag: single_tag})
836   end
837
838   defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
839     from(
840       [_activity, object] in query,
841       where:
842         fragment(
843           """
844           (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
845             ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
846               AND hashtags_objects.object_id = ?) @> ?
847           """,
848           ^tags,
849           object.id,
850           ^tags
851         )
852     )
853   end
854
855   defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
856     restrict_hashtag_all(query, %{tag_all: [tag]})
857   end
858
859   defp restrict_hashtag_all(query, _), do: query
860
861   defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
862     raise_on_missing_preload()
863   end
864
865   defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
866     hashtag_ids =
867       from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
868       |> Repo.all()
869
870     # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
871     from(
872       [_activity, object] in query,
873       join: hto in "hashtags_objects",
874       on: hto.object_id == object.id,
875       where: hto.hashtag_id in ^hashtag_ids,
876       distinct: [desc: object.id],
877       order_by: [desc: object.id]
878     )
879   end
880
881   defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
882     restrict_hashtag_any(query, %{tag: [tag]})
883   end
884
885   defp restrict_hashtag_any(query, _), do: query
886
887   defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
888     raise_on_missing_preload()
889   end
890
891   defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
892     from(
893       [_activity, object] in query,
894       where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
895     )
896   end
897
898   defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
899     restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
900   end
901
902   defp restrict_hashtag_reject_any(query, _), do: query
903
904   defp raise_on_missing_preload do
905     raise "Can't use the child object without preloading!"
906   end
907
908   defp restrict_recipients(query, [], _user), do: query
909
910   defp restrict_recipients(query, recipients, nil) do
911     from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
912   end
913
914   defp restrict_recipients(query, recipients, user) do
915     from(
916       activity in query,
917       where: fragment("? && ?", ^recipients, activity.recipients),
918       or_where: activity.actor == ^user.ap_id
919     )
920   end
921
922   defp restrict_local(query, %{local_only: true}) do
923     from(activity in query, where: activity.local == true)
924   end
925
926   defp restrict_local(query, _), do: query
927
928   defp restrict_remote(query, %{remote: true}) do
929     from(activity in query, where: activity.local == false)
930   end
931
932   defp restrict_remote(query, _), do: query
933
934   defp restrict_actor(query, %{actor_id: actor_id}) do
935     from(activity in query, where: activity.actor == ^actor_id)
936   end
937
938   defp restrict_actor(query, _), do: query
939
940   defp restrict_type(query, %{type: type}) when is_binary(type) do
941     from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
942   end
943
944   defp restrict_type(query, %{type: type}) do
945     from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
946   end
947
948   defp restrict_type(query, _), do: query
949
950   defp restrict_state(query, %{state: state}) do
951     from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
952   end
953
954   defp restrict_state(query, _), do: query
955
956   defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
957     from(
958       [_activity, object] in query,
959       where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
960     )
961   end
962
963   defp restrict_favorited_by(query, _), do: query
964
965   defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
966     raise "Can't use the child object without preloading!"
967   end
968
969   defp restrict_media(query, %{only_media: true}) do
970     from(
971       [activity, object] in query,
972       where: fragment("(?)->>'type' = ?", activity.data, "Create"),
973       where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
974     )
975   end
976
977   defp restrict_media(query, _), do: query
978
979   defp restrict_replies(query, %{exclude_replies: true}) do
980     from(
981       [_activity, object] in query,
982       where: fragment("?->>'inReplyTo' is null", object.data)
983     )
984   end
985
986   defp restrict_replies(query, %{
987          reply_filtering_user: %User{} = user,
988          reply_visibility: "self"
989        }) do
990     from(
991       [activity, object] in query,
992       where:
993         fragment(
994           "?->>'inReplyTo' is null OR ? = ANY(?)",
995           object.data,
996           ^user.ap_id,
997           activity.recipients
998         )
999     )
1000   end
1001
1002   defp restrict_replies(query, %{
1003          reply_filtering_user: %User{} = user,
1004          reply_visibility: "following"
1005        }) do
1006     from(
1007       [activity, object] in query,
1008       where:
1009         fragment(
1010           """
1011           ?->>'type' != 'Create'     -- This isn't a Create
1012           OR ?->>'inReplyTo' is null -- this isn't a reply
1013           OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
1014                                      -- unless they are the author (because authors
1015                                      -- are also part of the recipients). This leads
1016                                      -- to a bug that self-replies by friends won't
1017                                      -- show up.
1018           OR ? = ?                   -- The actor is us
1019           """,
1020           activity.data,
1021           object.data,
1022           ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
1023           activity.recipients,
1024           activity.actor,
1025           activity.actor,
1026           ^user.ap_id
1027         )
1028     )
1029   end
1030
1031   defp restrict_replies(query, _), do: query
1032
1033   defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1034     from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1035   end
1036
1037   defp restrict_reblogs(query, _), do: query
1038
1039   defp restrict_muted(query, %{with_muted: true}), do: query
1040
1041   defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1042     mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1043
1044     query =
1045       from([activity] in query,
1046         where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1047         where:
1048           fragment(
1049             "not (?->'to' \\?| ?) or ? = ?",
1050             activity.data,
1051             ^mutes,
1052             activity.actor,
1053             ^user.ap_id
1054           )
1055       )
1056
1057     unless opts[:skip_preload] do
1058       from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1059     else
1060       query
1061     end
1062   end
1063
1064   defp restrict_muted(query, _), do: query
1065
1066   defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
1067     blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
1068     domain_blocks = user.domain_blocks || []
1069
1070     following_ap_ids = User.get_friends_ap_ids(user)
1071
1072     query =
1073       if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
1074
1075     from(
1076       [activity, object: o] in query,
1077       # You don't block the author
1078       where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
1079
1080       # You don't block any recipients, and didn't author the post
1081       where:
1082         fragment(
1083           "((not (? && ?)) or ? = ?)",
1084           activity.recipients,
1085           ^blocked_ap_ids,
1086           activity.actor,
1087           ^user.ap_id
1088         ),
1089
1090       # You don't block the domain of any recipients, and didn't author the post
1091       where:
1092         fragment(
1093           "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
1094           activity.recipients,
1095           ^domain_blocks,
1096           activity.actor,
1097           ^user.ap_id
1098         ),
1099
1100       # It's not a boost of a user you block
1101       where:
1102         fragment(
1103           "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1104           activity.data,
1105           activity.data,
1106           ^blocked_ap_ids
1107         ),
1108
1109       # You don't block the author's domain, and also don't follow the author
1110       where:
1111         fragment(
1112           "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
1113           activity.actor,
1114           ^domain_blocks,
1115           activity.actor,
1116           ^following_ap_ids
1117         ),
1118
1119       # Same as above, but checks the Object
1120       where:
1121         fragment(
1122           "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
1123           o.data,
1124           ^domain_blocks,
1125           o.data,
1126           ^following_ap_ids
1127         )
1128     )
1129   end
1130
1131   defp restrict_blocked(query, _), do: query
1132
1133   defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1134     if Config.get([:activitypub, :blockers_visible]) == true do
1135       query
1136     else
1137       blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1138
1139       from(
1140         activity in query,
1141         # The author doesn't block you
1142         where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1143
1144         # It's not a boost of a user that blocks you
1145         where:
1146           fragment(
1147             "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1148             activity.data,
1149             activity.data,
1150             ^blocker_ap_ids
1151           )
1152       )
1153     end
1154   end
1155
1156   defp restrict_blockers_visibility(query, _), do: query
1157
1158   defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1159     from(
1160       activity in query,
1161       where:
1162         fragment(
1163           "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1164           activity.data,
1165           ^[Constants.as_public()]
1166         )
1167     )
1168   end
1169
1170   defp restrict_unlisted(query, _), do: query
1171
1172   defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1173     from(
1174       [activity, object: o] in query,
1175       where:
1176         fragment(
1177           "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
1178           activity.data,
1179           activity.data,
1180           ^ids
1181         )
1182     )
1183   end
1184
1185   defp restrict_pinned(query, _), do: query
1186
1187   defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1188     muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1189
1190     from(
1191       activity in query,
1192       where:
1193         fragment(
1194           "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1195           activity.data,
1196           activity.actor,
1197           ^muted_reblogs
1198         )
1199     )
1200   end
1201
1202   defp restrict_muted_reblogs(query, _), do: query
1203
1204   defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1205     from(
1206       activity in query,
1207       where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1208     )
1209   end
1210
1211   defp restrict_instance(query, _), do: query
1212
1213   defp restrict_filtered(query, %{user: %User{} = user}) do
1214     case Filter.compose_regex(user) do
1215       nil ->
1216         query
1217
1218       regex ->
1219         from([activity, object] in query,
1220           where:
1221             fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1222               activity.actor == ^user.ap_id
1223         )
1224     end
1225   end
1226
1227   defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1228     restrict_filtered(query, %{user: user})
1229   end
1230
1231   defp restrict_filtered(query, _), do: query
1232
1233   defp restrict_unauthenticated(query, nil) do
1234     local = Config.restrict_unauthenticated_access?(:activities, :local)
1235     remote = Config.restrict_unauthenticated_access?(:activities, :remote)
1236
1237     cond do
1238       local and remote ->
1239         from(activity in query, where: false)
1240
1241       local ->
1242         from(activity in query, where: activity.local == false)
1243
1244       remote ->
1245         from(activity in query, where: activity.local == true)
1246
1247       true ->
1248         query
1249     end
1250   end
1251
1252   defp restrict_unauthenticated(query, _), do: query
1253
1254   defp restrict_quote_url(query, %{quote_url: quote_url}) do
1255     from([_activity, object] in query,
1256       where: fragment("(?)->'quoteUrl' = ?", object.data, ^quote_url)
1257     )
1258   end
1259
1260   defp restrict_quote_url(query, _), do: query
1261
1262   defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1263
1264   defp exclude_poll_votes(query, _) do
1265     if has_named_binding?(query, :object) do
1266       from([activity, object: o] in query,
1267         where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1268       )
1269     else
1270       query
1271     end
1272   end
1273
1274   defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1275
1276   defp exclude_chat_messages(query, _) do
1277     if has_named_binding?(query, :object) do
1278       from([activity, object: o] in query,
1279         where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1280       )
1281     else
1282       query
1283     end
1284   end
1285
1286   defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1287   defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1288
1289   defp exclude_invisible_actors(query, _opts) do
1290     query
1291     |> join(:inner, [activity], u in User,
1292       as: :u,
1293       on: activity.actor == u.ap_id and u.invisible == false
1294     )
1295   end
1296
1297   defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1298     from(activity in query, where: activity.id != ^id)
1299   end
1300
1301   defp exclude_id(query, _), do: query
1302
1303   defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1304
1305   defp maybe_preload_objects(query, _) do
1306     query
1307     |> Activity.with_preloaded_object()
1308   end
1309
1310   defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1311
1312   defp maybe_preload_bookmarks(query, opts) do
1313     query
1314     |> Activity.with_preloaded_bookmark(opts[:user])
1315   end
1316
1317   defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1318     query
1319     |> Activity.with_preloaded_report_notes()
1320   end
1321
1322   defp maybe_preload_report_notes(query, _), do: query
1323
1324   defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1325
1326   defp maybe_set_thread_muted_field(query, opts) do
1327     query
1328     |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1329   end
1330
1331   defp maybe_order(query, %{order: :desc}) do
1332     query
1333     |> order_by(desc: :id)
1334   end
1335
1336   defp maybe_order(query, %{order: :asc}) do
1337     query
1338     |> order_by(asc: :id)
1339   end
1340
1341   defp maybe_order(query, _), do: query
1342
1343   defp normalize_fetch_activities_query_opts(opts) do
1344     Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1345       case opts[key] do
1346         value when is_bitstring(value) ->
1347           Map.put(opts, key, Hashtag.normalize_name(value))
1348
1349         value when is_list(value) ->
1350           normalized_value =
1351             value
1352             |> Enum.map(&Hashtag.normalize_name/1)
1353             |> Enum.uniq()
1354
1355           Map.put(opts, key, normalized_value)
1356
1357         _ ->
1358           opts
1359       end
1360     end)
1361   end
1362
1363   defp fetch_activities_query_ap_ids_ops(opts) do
1364     source_user = opts[:muting_user]
1365     ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1366
1367     ap_id_relationships =
1368       if opts[:blocking_user] && opts[:blocking_user] == source_user do
1369         [:block | ap_id_relationships]
1370       else
1371         ap_id_relationships
1372       end
1373
1374     preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1375
1376     restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1377     restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1378
1379     restrict_muted_reblogs_opts =
1380       Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1381
1382     {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1383   end
1384
1385   def fetch_activities_query(recipients, opts \\ %{}) do
1386     opts = normalize_fetch_activities_query_opts(opts)
1387
1388     {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1389       fetch_activities_query_ap_ids_ops(opts)
1390
1391     config = %{
1392       skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1393     }
1394
1395     query =
1396       Activity
1397       |> maybe_preload_objects(opts)
1398       |> maybe_preload_bookmarks(opts)
1399       |> maybe_preload_report_notes(opts)
1400       |> maybe_set_thread_muted_field(opts)
1401       |> maybe_order(opts)
1402       |> restrict_recipients(recipients, opts[:user])
1403       |> restrict_replies(opts)
1404       |> restrict_since(opts)
1405       |> restrict_local(opts)
1406       |> restrict_remote(opts)
1407       |> restrict_actor(opts)
1408       |> restrict_type(opts)
1409       |> restrict_state(opts)
1410       |> restrict_favorited_by(opts)
1411       |> restrict_blocked(restrict_blocked_opts)
1412       |> restrict_blockers_visibility(opts)
1413       |> restrict_muted(restrict_muted_opts)
1414       |> restrict_filtered(opts)
1415       |> restrict_media(opts)
1416       |> restrict_visibility(opts)
1417       |> restrict_thread_visibility(opts, config)
1418       |> restrict_reblogs(opts)
1419       |> restrict_pinned(opts)
1420       |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1421       |> restrict_instance(opts)
1422       |> restrict_announce_object_actor(opts)
1423       |> restrict_filtered(opts)
1424       |> restrict_quote_url(opts)
1425       |> maybe_restrict_deactivated_users(opts)
1426       |> exclude_poll_votes(opts)
1427       |> exclude_chat_messages(opts)
1428       |> exclude_invisible_actors(opts)
1429       |> exclude_visibility(opts)
1430
1431     if Config.feature_enabled?(:improved_hashtag_timeline) do
1432       query
1433       |> restrict_hashtag_any(opts)
1434       |> restrict_hashtag_all(opts)
1435       |> restrict_hashtag_reject_any(opts)
1436     else
1437       query
1438       |> restrict_embedded_tag_any(opts)
1439       |> restrict_embedded_tag_all(opts)
1440       |> restrict_embedded_tag_reject_any(opts)
1441     end
1442   end
1443
1444   @doc """
1445   Fetch favorites activities of user with order by sort adds to favorites
1446   """
1447   @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1448   def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1449     user.ap_id
1450     |> Activity.Queries.by_actor()
1451     |> Activity.Queries.by_type("Like")
1452     |> Activity.with_joined_object()
1453     |> Object.with_joined_activity()
1454     |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1455     |> order_by([like, _, _], desc_nulls_last: like.id)
1456     |> Pagination.fetch_paginated(
1457       Map.merge(params, %{skip_order: true}),
1458       pagination
1459     )
1460   end
1461
1462   defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1463     Enum.map(activities, fn
1464       %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1465         if Enum.any?(bcc, &(&1 in list_memberships)) do
1466           update_in(activity.data["cc"], &[user_ap_id | &1])
1467         else
1468           activity
1469         end
1470
1471       activity ->
1472         activity
1473     end)
1474   end
1475
1476   defp maybe_update_cc(activities, _, _), do: activities
1477
1478   defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1479     from(activity in query,
1480       where:
1481         fragment("? && ?", activity.recipients, ^recipients) or
1482           (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1483              ^Constants.as_public() in activity.recipients)
1484     )
1485   end
1486
1487   def fetch_activities_bounded(
1488         recipients,
1489         recipients_with_public,
1490         opts \\ %{},
1491         pagination \\ :keyset
1492       ) do
1493     fetch_activities_query([], opts)
1494     |> fetch_activities_bounded_query(recipients, recipients_with_public)
1495     |> Pagination.fetch_paginated(opts, pagination)
1496     |> Enum.reverse()
1497   end
1498
1499   @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1500   def upload(file, opts \\ []) do
1501     with {:ok, data} <- Upload.store(sanitize_upload_file(file), opts) do
1502       obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1503
1504       Repo.insert(%Object{data: obj_data})
1505     end
1506   end
1507
1508   defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
1509     %Plug.Upload{
1510       upload
1511       | filename: Path.basename(filename)
1512     }
1513   end
1514
1515   defp sanitize_upload_file(upload), do: upload
1516
1517   @spec get_actor_url(any()) :: binary() | nil
1518   defp get_actor_url(url) when is_binary(url), do: url
1519   defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1520
1521   defp get_actor_url(url) when is_list(url) do
1522     url
1523     |> List.first()
1524     |> get_actor_url()
1525   end
1526
1527   defp get_actor_url(_url), do: nil
1528
1529   defp normalize_image(%{"url" => url}) do
1530     %{
1531       "type" => "Image",
1532       "url" => [%{"href" => url}]
1533     }
1534   end
1535
1536   defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1537   defp normalize_image(_), do: nil
1538
1539   defp object_to_user_data(data, additional) do
1540     fields =
1541       data
1542       |> Map.get("attachment", [])
1543       |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1544       |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1545
1546     emojis =
1547       data
1548       |> Map.get("tag", [])
1549       |> Enum.filter(fn
1550         %{"type" => "Emoji"} -> true
1551         _ -> false
1552       end)
1553       |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1554         {String.trim(name, ":"), url}
1555       end)
1556
1557     is_locked = data["manuallyApprovesFollowers"] || false
1558     capabilities = data["capabilities"] || %{}
1559     accepts_chat_messages = capabilities["acceptsChatMessages"]
1560     data = Transmogrifier.maybe_fix_user_object(data)
1561     is_discoverable = data["discoverable"] || false
1562     invisible = data["invisible"] || false
1563     actor_type = data["type"] || "Person"
1564
1565     featured_address = data["featured"]
1566     {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1567
1568     public_key =
1569       if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1570         data["publicKey"]["publicKeyPem"]
1571       end
1572
1573     shared_inbox =
1574       if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1575         data["endpoints"]["sharedInbox"]
1576       end
1577
1578     birthday =
1579       if is_binary(data["vcard:bday"]) do
1580         case Date.from_iso8601(data["vcard:bday"]) do
1581           {:ok, date} -> date
1582           {:error, _} -> nil
1583         end
1584       end
1585
1586     show_birthday = !!birthday
1587
1588     # if WebFinger request was already done, we probably have acct, otherwise
1589     # we request WebFinger here
1590     nickname = additional[:nickname_from_acct] || generate_nickname(data)
1591
1592     %{
1593       ap_id: data["id"],
1594       uri: get_actor_url(data["url"]),
1595       banner: normalize_image(data["image"]),
1596       fields: fields,
1597       emoji: emojis,
1598       is_locked: is_locked,
1599       is_discoverable: is_discoverable,
1600       invisible: invisible,
1601       avatar: normalize_image(data["icon"]),
1602       name: data["name"],
1603       follower_address: data["followers"],
1604       following_address: data["following"],
1605       featured_address: featured_address,
1606       bio: data["summary"] || "",
1607       actor_type: actor_type,
1608       also_known_as: Map.get(data, "alsoKnownAs", []),
1609       public_key: public_key,
1610       inbox: data["inbox"],
1611       shared_inbox: shared_inbox,
1612       accepts_chat_messages: accepts_chat_messages,
1613       birthday: birthday,
1614       show_birthday: show_birthday,
1615       pinned_objects: pinned_objects,
1616       nickname: nickname
1617     }
1618   end
1619
1620   defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1621     generated = "#{username}@#{URI.parse(data["id"]).host}"
1622
1623     if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1624       case WebFinger.finger(generated) do
1625         {:ok, %{"subject" => "acct:" <> acct}} -> acct
1626         _ -> generated
1627       end
1628     else
1629       generated
1630     end
1631   end
1632
1633   # nickname can be nil because of virtual actors
1634   defp generate_nickname(_), do: nil
1635
1636   def fetch_follow_information_for_user(user) do
1637     with {:ok, following_data} <-
1638            Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1639          {:ok, hide_follows} <- collection_private(following_data),
1640          {:ok, followers_data} <-
1641            Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1642          {:ok, hide_followers} <- collection_private(followers_data) do
1643       {:ok,
1644        %{
1645          hide_follows: hide_follows,
1646          follower_count: normalize_counter(followers_data["totalItems"]),
1647          following_count: normalize_counter(following_data["totalItems"]),
1648          hide_followers: hide_followers
1649        }}
1650     else
1651       {:error, _} = e -> e
1652       e -> {:error, e}
1653     end
1654   end
1655
1656   defp normalize_counter(counter) when is_integer(counter), do: counter
1657   defp normalize_counter(_), do: 0
1658
1659   def maybe_update_follow_information(user_data) do
1660     with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1661          {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1662          {_, true} <-
1663            {:collections_available,
1664             !!(user_data[:following_address] && user_data[:follower_address])},
1665          {:ok, info} <-
1666            fetch_follow_information_for_user(user_data) do
1667       info = Map.merge(user_data[:info] || %{}, info)
1668
1669       user_data
1670       |> Map.put(:info, info)
1671     else
1672       {:user_type_check, false} ->
1673         user_data
1674
1675       {:collections_available, false} ->
1676         user_data
1677
1678       {:enabled, false} ->
1679         user_data
1680
1681       e ->
1682         Logger.error(
1683           "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1684         )
1685
1686         user_data
1687     end
1688   end
1689
1690   defp collection_private(%{"first" => %{"type" => type}})
1691        when type in ["CollectionPage", "OrderedCollectionPage"],
1692        do: {:ok, false}
1693
1694   defp collection_private(%{"first" => first}) do
1695     with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1696            Fetcher.fetch_and_contain_remote_object_from_id(first) do
1697       {:ok, false}
1698     else
1699       {:error, _} -> {:ok, true}
1700     end
1701   end
1702
1703   defp collection_private(_data), do: {:ok, true}
1704
1705   def user_data_from_user_object(data, additional \\ []) do
1706     with {:ok, data} <- MRF.filter(data) do
1707       {:ok, object_to_user_data(data, additional)}
1708     else
1709       e -> {:error, e}
1710     end
1711   end
1712
1713   defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1714     with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1715          {:ok, data} <- user_data_from_user_object(data, additional) do
1716       {:ok, maybe_update_follow_information(data)}
1717     else
1718       # If this has been deleted, only log a debug and not an error
1719       {:error, "Object has been deleted" = e} ->
1720         Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1721         {:error, e}
1722
1723       {:error, {:reject, reason} = e} ->
1724         Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1725         {:error, e}
1726
1727       {:error, e} ->
1728         Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1729         {:error, e}
1730     end
1731   end
1732
1733   def maybe_handle_clashing_nickname(data) do
1734     with nickname when is_binary(nickname) <- data[:nickname],
1735          %User{} = old_user <- User.get_by_nickname(nickname),
1736          {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1737       Logger.info(
1738         "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1739       )
1740
1741       old_user
1742       |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1743       |> User.update_and_set_cache()
1744     else
1745       {:ap_id_comparison, true} ->
1746         Logger.info(
1747           "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1748         )
1749
1750       _ ->
1751         nil
1752     end
1753   end
1754
1755   def pin_data_from_featured_collection(%{
1756         "type" => type,
1757         "orderedItems" => objects
1758       })
1759       when type in ["OrderedCollection", "Collection"] do
1760     Map.new(objects, fn
1761       %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1762       object_ap_id when is_binary(object_ap_id) -> {object_ap_id, NaiveDateTime.utc_now()}
1763     end)
1764   end
1765
1766   def pin_data_from_featured_collection(obj) do
1767     Logger.error("Could not parse featured collection #{inspect(obj)}")
1768     %{}
1769   end
1770
1771   def fetch_and_prepare_featured_from_ap_id(nil) do
1772     {:ok, %{}}
1773   end
1774
1775   def fetch_and_prepare_featured_from_ap_id(ap_id) do
1776     with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1777       {:ok, pin_data_from_featured_collection(data)}
1778     else
1779       e ->
1780         Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1781         {:ok, %{}}
1782     end
1783   end
1784
1785   def pinned_fetch_task(nil), do: nil
1786
1787   def pinned_fetch_task(%{pinned_objects: pins}) do
1788     if Enum.all?(pins, fn {ap_id, _} ->
1789          Object.get_cached_by_ap_id(ap_id) ||
1790            match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1791        end) do
1792       :ok
1793     else
1794       :error
1795     end
1796   end
1797
1798   def make_user_from_ap_id(ap_id, additional \\ []) do
1799     user = User.get_cached_by_ap_id(ap_id)
1800
1801     with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1802       {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1803
1804       if user do
1805         user
1806         |> User.remote_user_changeset(data)
1807         |> User.update_and_set_cache()
1808       else
1809         maybe_handle_clashing_nickname(data)
1810
1811         data
1812         |> User.remote_user_changeset()
1813         |> Repo.insert()
1814         |> User.set_cache()
1815       end
1816     end
1817   end
1818
1819   def make_user_from_nickname(nickname) do
1820     with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1821            WebFinger.finger(nickname) do
1822       make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1823     else
1824       _e -> {:error, "No AP id in WebFinger"}
1825     end
1826   end
1827
1828   # filter out broken threads
1829   defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1830     entire_thread_visible_for_user?(activity, user)
1831   end
1832
1833   # do post-processing on a specific activity
1834   def contain_activity(%Activity{} = activity, %User{} = user) do
1835     contain_broken_threads(activity, user)
1836   end
1837
1838   def fetch_direct_messages_query do
1839     Activity
1840     |> restrict_type(%{type: "Create"})
1841     |> restrict_visibility(%{visibility: "direct"})
1842     |> order_by([activity], asc: activity.id)
1843   end
1844
1845   defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
1846
1847   defp maybe_restrict_deactivated_users(activity, _opts),
1848     do: Activity.restrict_deactivated_users(activity)
1849 end