First
[anni] / lib / pleroma / web / activity_pub / activity_pub.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.ActivityPub.ActivityPub do
6   alias Pleroma.Activity
7   alias Pleroma.Activity.Ir.Topics
8   alias Pleroma.Config
9   alias Pleroma.Constants
10   alias Pleroma.Conversation
11   alias Pleroma.Conversation.Participation
12   alias Pleroma.Filter
13   alias Pleroma.Hashtag
14   alias Pleroma.Maps
15   alias Pleroma.Notification
16   alias Pleroma.Object
17   alias Pleroma.Object.Containment
18   alias Pleroma.Object.Fetcher
19   alias Pleroma.Pagination
20   alias Pleroma.Repo
21   alias Pleroma.Upload
22   alias Pleroma.User
23   alias Pleroma.Web.ActivityPub.MRF
24   alias Pleroma.Web.ActivityPub.Transmogrifier
25   alias Pleroma.Web.Streamer
26   alias Pleroma.Web.WebFinger
27   alias Pleroma.Workers.BackgroundWorker
28   alias Pleroma.Workers.PollWorker
29
30   import Ecto.Query
31   import Pleroma.Web.ActivityPub.Utils
32   import Pleroma.Web.ActivityPub.Visibility
33
34   require Logger
35   require Pleroma.Constants
36
37   @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
38   @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
39
40   defp get_recipients(%{"type" => "Create"} = data) do
41     to = Map.get(data, "to", [])
42     cc = Map.get(data, "cc", [])
43     bcc = Map.get(data, "bcc", [])
44     actor = Map.get(data, "actor", [])
45     recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
46     {recipients, to, cc}
47   end
48
49   defp get_recipients(data) do
50     to = Map.get(data, "to", [])
51     cc = Map.get(data, "cc", [])
52     bcc = Map.get(data, "bcc", [])
53     recipients = Enum.concat([to, cc, bcc])
54     {recipients, to, cc}
55   end
56
57   defp check_actor_can_insert(%{"type" => "Delete"}), do: true
58   defp check_actor_can_insert(%{"type" => "Undo"}), do: true
59
60   defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
61     case User.get_cached_by_ap_id(actor) do
62       %User{is_active: true} -> true
63       _ -> false
64     end
65   end
66
67   defp check_actor_can_insert(_), do: true
68
69   defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
70     limit = Config.get([:instance, :remote_limit])
71     String.length(content) <= limit
72   end
73
74   defp check_remote_limit(_), do: true
75
76   def increase_note_count_if_public(actor, object) do
77     if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
78   end
79
80   def decrease_note_count_if_public(actor, object) do
81     if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
82   end
83
84   def update_last_status_at_if_public(actor, object) do
85     if is_public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
86   end
87
88   defp increase_replies_count_if_reply(%{
89          "object" => %{"inReplyTo" => reply_ap_id} = object,
90          "type" => "Create"
91        }) do
92     if is_public?(object) do
93       Object.increase_replies_count(reply_ap_id)
94     end
95   end
96
97   defp increase_replies_count_if_reply(_create_data), do: :noop
98
99   @object_types ~w[ChatMessage Question Answer Audio Video Event Article Note Page]
100   @impl true
101   def persist(%{"type" => type} = object, meta) when type in @object_types do
102     with {:ok, object} <- Object.create(object) do
103       {:ok, object, meta}
104     end
105   end
106
107   @impl true
108   def persist(object, meta) do
109     with local <- Keyword.fetch!(meta, :local),
110          {recipients, _, _} <- get_recipients(object),
111          {:ok, activity} <-
112            Repo.insert(%Activity{
113              data: object,
114              local: local,
115              recipients: recipients,
116              actor: object["actor"]
117            }),
118          # TODO: add tests for expired activities, when Note type will be supported in new pipeline
119          {:ok, _} <- maybe_create_activity_expiration(activity) do
120       {:ok, activity, meta}
121     end
122   end
123
124   @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
125   def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
126     with nil <- Activity.normalize(map),
127          map <- lazy_put_activity_defaults(map, fake),
128          {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
129          {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
130          {:ok, map} <- MRF.filter(map),
131          {recipients, _, _} = get_recipients(map),
132          {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
133          {:containment, :ok} <- {:containment, Containment.contain_child(map)},
134          {:ok, map, object} <- insert_full_object(map),
135          {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
136       # Splice in the child object if we have one.
137       activity = Maps.put_if_present(activity, :object, object)
138
139       ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
140         Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
141       end)
142
143       {:ok, activity}
144     else
145       %Activity{} = activity ->
146         {:ok, activity}
147
148       {:actor_check, _} ->
149         {:error, false}
150
151       {:containment, _} = error ->
152         error
153
154       {:error, _} = error ->
155         error
156
157       {:fake, true, map, recipients} ->
158         activity = %Activity{
159           data: map,
160           local: local,
161           actor: map["actor"],
162           recipients: recipients,
163           id: "pleroma:fakeid"
164         }
165
166         Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
167         {:ok, activity}
168
169       {:remote_limit_pass, _} ->
170         {:error, :remote_limit}
171
172       {:reject, _} = e ->
173         {:error, e}
174     end
175   end
176
177   defp insert_activity_with_expiration(data, local, recipients) do
178     struct = %Activity{
179       data: data,
180       local: local,
181       actor: data["actor"],
182       recipients: recipients
183     }
184
185     with {:ok, activity} <- Repo.insert(struct) do
186       maybe_create_activity_expiration(activity)
187     end
188   end
189
190   def notify_and_stream(activity) do
191     Notification.create_notifications(activity)
192
193     original_activity =
194       case activity do
195         %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
196           Activity.get_create_by_object_ap_id_with_object(id)
197
198         _ ->
199           activity
200       end
201
202     conversation = create_or_bump_conversation(original_activity, original_activity.actor)
203     participations = get_participations(conversation)
204     stream_out(activity)
205     stream_out_participations(participations)
206   end
207
208   defp maybe_create_activity_expiration(
209          %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
210        ) do
211     with {:ok, _job} <-
212            Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
213              activity_id: activity.id,
214              expires_at: expires_at
215            }) do
216       {:ok, activity}
217     end
218   end
219
220   defp maybe_create_activity_expiration(activity), do: {:ok, activity}
221
222   defp create_or_bump_conversation(activity, actor) do
223     with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
224          %User{} = user <- User.get_cached_by_ap_id(actor) do
225       Participation.mark_as_read(user, conversation)
226       {:ok, conversation}
227     end
228   end
229
230   defp get_participations({:ok, conversation}) do
231     conversation
232     |> Repo.preload(:participations, force: true)
233     |> Map.get(:participations)
234   end
235
236   defp get_participations(_), do: []
237
238   def stream_out_participations(participations) do
239     participations =
240       participations
241       |> Repo.preload(:user)
242
243     Streamer.stream("participation", participations)
244   end
245
246   @impl true
247   def stream_out_participations(%Object{data: %{"context" => context}}, user) do
248     with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
249       conversation = Repo.preload(conversation, :participations)
250
251       last_activity_id =
252         fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
253           user: user,
254           blocking_user: user
255         })
256
257       if last_activity_id do
258         stream_out_participations(conversation.participations)
259       end
260     end
261   end
262
263   @impl true
264   def stream_out_participations(_, _), do: :noop
265
266   @impl true
267   def stream_out(%Activity{data: %{"type" => data_type}} = activity)
268       when data_type in ["Create", "Announce", "Delete", "Update"] do
269     activity
270     |> Topics.get_activity_topics()
271     |> Streamer.stream(activity)
272   end
273
274   @impl true
275   def stream_out(_activity) do
276     :noop
277   end
278
279   @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
280   def create(params, fake \\ false) do
281     with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
282       result
283     end
284   end
285
286   defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
287     additional = params[:additional] || %{}
288     # only accept false as false value
289     local = !(params[:local] == false)
290     published = params[:published]
291     quick_insert? = Config.get([:env]) == :benchmark
292
293     create_data =
294       make_create_data(
295         %{to: to, actor: actor, published: published, context: context, object: object},
296         additional
297       )
298
299     with {:ok, activity} <- insert(create_data, local, fake),
300          {:fake, false, activity} <- {:fake, fake, activity},
301          _ <- increase_replies_count_if_reply(create_data),
302          {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
303          {:ok, _actor} <- increase_note_count_if_public(actor, activity),
304          {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
305          _ <- notify_and_stream(activity),
306          :ok <- maybe_schedule_poll_notifications(activity),
307          :ok <- maybe_federate(activity) do
308       {:ok, activity}
309     else
310       {:quick_insert, true, activity} ->
311         {:ok, activity}
312
313       {:fake, true, activity} ->
314         {:ok, activity}
315
316       {:error, message} ->
317         Repo.rollback(message)
318     end
319   end
320
321   defp maybe_schedule_poll_notifications(activity) do
322     PollWorker.schedule_poll_end(activity)
323     :ok
324   end
325
326   @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
327   def listen(%{to: to, actor: actor, context: context, object: object} = params) do
328     additional = params[:additional] || %{}
329     # only accept false as false value
330     local = !(params[:local] == false)
331     published = params[:published]
332
333     listen_data =
334       make_listen_data(
335         %{to: to, actor: actor, published: published, context: context, object: object},
336         additional
337       )
338
339     with {:ok, activity} <- insert(listen_data, local),
340          _ <- notify_and_stream(activity),
341          :ok <- maybe_federate(activity) do
342       {:ok, activity}
343     end
344   end
345
346   @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
347           {:ok, Activity.t()} | nil | {:error, any()}
348   def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
349     with {:ok, result} <-
350            Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
351       result
352     end
353   end
354
355   defp do_unfollow(follower, followed, activity_id, local) do
356     with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
357          {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
358          unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
359          {:ok, activity} <- insert(unfollow_data, local),
360          _ <- notify_and_stream(activity),
361          :ok <- maybe_federate(activity) do
362       {:ok, activity}
363     else
364       nil -> nil
365       {:error, error} -> Repo.rollback(error)
366     end
367   end
368
369   @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
370   def flag(params) do
371     with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
372       result
373     end
374   end
375
376   defp do_flag(
377          %{
378            actor: actor,
379            context: _context,
380            account: account,
381            statuses: statuses,
382            content: content
383          } = params
384        ) do
385     # only accept false as false value
386     local = !(params[:local] == false)
387     forward = !(params[:forward] == false)
388
389     additional = params[:additional] || %{}
390
391     additional =
392       if forward do
393         Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
394       else
395         Map.merge(additional, %{"to" => [], "cc" => []})
396       end
397
398     with flag_data <- make_flag_data(params, additional),
399          {:ok, activity} <- insert(flag_data, local),
400          {:ok, stripped_activity} <- strip_report_status_data(activity),
401          _ <- notify_and_stream(activity),
402          :ok <-
403            maybe_federate(stripped_activity) do
404       User.all_users_with_privilege(:reports_manage_reports)
405       |> Enum.filter(fn user -> user.ap_id != actor end)
406       |> Enum.filter(fn user -> not is_nil(user.email) end)
407       |> Enum.each(fn privileged_user ->
408         privileged_user
409         |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
410         |> Pleroma.Emails.Mailer.deliver_async()
411       end)
412
413       {:ok, activity}
414     else
415       {:error, error} -> Repo.rollback(error)
416     end
417   end
418
419   @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
420   def move(%User{} = origin, %User{} = target, local \\ true) do
421     params = %{
422       "type" => "Move",
423       "actor" => origin.ap_id,
424       "object" => origin.ap_id,
425       "target" => target.ap_id,
426       "to" => [origin.follower_address]
427     }
428
429     with true <- origin.ap_id in target.also_known_as,
430          {:ok, activity} <- insert(params, local),
431          _ <- notify_and_stream(activity) do
432       maybe_federate(activity)
433
434       BackgroundWorker.enqueue("move_following", %{
435         "origin_id" => origin.id,
436         "target_id" => target.id
437       })
438
439       {:ok, activity}
440     else
441       false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
442       err -> err
443     end
444   end
445
446   def fetch_activities_for_context_query(context, opts) do
447     public = [Constants.as_public()]
448
449     recipients =
450       if opts[:user],
451         do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
452         else: public
453
454     from(activity in Activity)
455     |> maybe_preload_objects(opts)
456     |> maybe_preload_bookmarks(opts)
457     |> maybe_set_thread_muted_field(opts)
458     |> restrict_blocked(opts)
459     |> restrict_blockers_visibility(opts)
460     |> restrict_recipients(recipients, opts[:user])
461     |> restrict_filtered(opts)
462     |> where(
463       [activity],
464       fragment(
465         "?->>'type' = ? and ?->>'context' = ?",
466         activity.data,
467         "Create",
468         activity.data,
469         ^context
470       )
471     )
472     |> exclude_poll_votes(opts)
473     |> exclude_id(opts)
474     |> order_by([activity], desc: activity.id)
475   end
476
477   @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
478   def fetch_activities_for_context(context, opts \\ %{}) do
479     context
480     |> fetch_activities_for_context_query(opts)
481     |> Repo.all()
482   end
483
484   @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
485           FlakeId.Ecto.CompatType.t() | nil
486   def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
487     context
488     |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
489     |> restrict_visibility(%{visibility: "direct"})
490     |> limit(1)
491     |> select([a], a.id)
492     |> Repo.one()
493   end
494
495   defp fetch_paginated_optimized(query, opts, pagination) do
496     # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
497     #   and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
498     opts = Map.put(opts, :skip_extra_order, true)
499
500     Pagination.fetch_paginated(query, opts, pagination)
501   end
502
503   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
504     list_memberships = Pleroma.List.memberships(opts[:user])
505
506     fetch_activities_query(recipients ++ list_memberships, opts)
507     |> fetch_paginated_optimized(opts, pagination)
508     |> Enum.reverse()
509     |> maybe_update_cc(list_memberships, opts[:user])
510   end
511
512   @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
513   def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
514     includes_local_public = Map.get(opts, :includes_local_public, false)
515
516     opts = Map.delete(opts, :user)
517
518     intended_recipients =
519       if includes_local_public do
520         [Constants.as_public(), as_local_public()]
521       else
522         [Constants.as_public()]
523       end
524
525     intended_recipients
526     |> fetch_activities_query(opts)
527     |> restrict_unlisted(opts)
528     |> fetch_paginated_optimized(opts, pagination)
529   end
530
531   @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
532   def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
533     opts
534     |> Map.put(:restrict_unlisted, true)
535     |> fetch_public_or_unlisted_activities(pagination)
536   end
537
538   @valid_visibilities ~w[direct unlisted public private]
539
540   defp restrict_visibility(query, %{visibility: visibility})
541        when is_list(visibility) do
542     if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
543       from(
544         a in query,
545         where:
546           fragment(
547             "activity_visibility(?, ?, ?) = ANY (?)",
548             a.actor,
549             a.recipients,
550             a.data,
551             ^visibility
552           )
553       )
554     else
555       Logger.error("Could not restrict visibility to #{visibility}")
556     end
557   end
558
559   defp restrict_visibility(query, %{visibility: visibility})
560        when visibility in @valid_visibilities do
561     from(
562       a in query,
563       where:
564         fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
565     )
566   end
567
568   defp restrict_visibility(_query, %{visibility: visibility})
569        when visibility not in @valid_visibilities do
570     Logger.error("Could not restrict visibility to #{visibility}")
571   end
572
573   defp restrict_visibility(query, _visibility), do: query
574
575   defp exclude_visibility(query, %{exclude_visibilities: visibility})
576        when is_list(visibility) do
577     if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
578       from(
579         a in query,
580         where:
581           not fragment(
582             "activity_visibility(?, ?, ?) = ANY (?)",
583             a.actor,
584             a.recipients,
585             a.data,
586             ^visibility
587           )
588       )
589     else
590       Logger.error("Could not exclude visibility to #{visibility}")
591       query
592     end
593   end
594
595   defp exclude_visibility(query, %{exclude_visibilities: visibility})
596        when visibility in @valid_visibilities do
597     from(
598       a in query,
599       where:
600         not fragment(
601           "activity_visibility(?, ?, ?) = ?",
602           a.actor,
603           a.recipients,
604           a.data,
605           ^visibility
606         )
607     )
608   end
609
610   defp exclude_visibility(query, %{exclude_visibilities: visibility})
611        when visibility not in [nil | @valid_visibilities] do
612     Logger.error("Could not exclude visibility to #{visibility}")
613     query
614   end
615
616   defp exclude_visibility(query, _visibility), do: query
617
618   defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
619     do: query
620
621   defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
622     do: query
623
624   defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
625     local_public = as_local_public()
626
627     from(
628       a in query,
629       where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
630     )
631   end
632
633   defp restrict_thread_visibility(query, _, _), do: query
634
635   def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
636     params =
637       params
638       |> Map.put(:user, reading_user)
639       |> Map.put(:actor_id, user.ap_id)
640
641     %{
642       godmode: params[:godmode],
643       reading_user: reading_user
644     }
645     |> user_activities_recipients()
646     |> fetch_activities(params)
647     |> Enum.reverse()
648   end
649
650   def fetch_user_activities(user, reading_user, params \\ %{})
651
652   def fetch_user_activities(user, reading_user, %{total: true} = params) do
653     result = fetch_activities_for_user(user, reading_user, params)
654
655     Keyword.put(result, :items, Enum.reverse(result[:items]))
656   end
657
658   def fetch_user_activities(user, reading_user, params) do
659     user
660     |> fetch_activities_for_user(reading_user, params)
661     |> Enum.reverse()
662   end
663
664   defp fetch_activities_for_user(user, reading_user, params) do
665     params =
666       params
667       |> Map.put(:type, ["Create", "Announce"])
668       |> Map.put(:user, reading_user)
669       |> Map.put(:actor_id, user.ap_id)
670       |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
671
672     params =
673       if User.blocks?(reading_user, user) do
674         params
675       else
676         params
677         |> Map.put(:blocking_user, reading_user)
678         |> Map.put(:muting_user, reading_user)
679       end
680
681     pagination_type = Map.get(params, :pagination_type) || :keyset
682
683     %{
684       godmode: params[:godmode],
685       reading_user: reading_user
686     }
687     |> user_activities_recipients()
688     |> fetch_activities(params, pagination_type)
689   end
690
691   def fetch_statuses(reading_user, %{total: true} = params) do
692     result = fetch_activities_for_reading_user(reading_user, params)
693     Keyword.put(result, :items, Enum.reverse(result[:items]))
694   end
695
696   def fetch_statuses(reading_user, params) do
697     reading_user
698     |> fetch_activities_for_reading_user(params)
699     |> Enum.reverse()
700   end
701
702   defp fetch_activities_for_reading_user(reading_user, params) do
703     params = Map.put(params, :type, ["Create", "Announce"])
704
705     %{
706       godmode: params[:godmode],
707       reading_user: reading_user
708     }
709     |> user_activities_recipients()
710     |> fetch_activities(params, :offset)
711   end
712
713   defp user_activities_recipients(%{godmode: true}), do: []
714
715   defp user_activities_recipients(%{reading_user: reading_user}) do
716     if not is_nil(reading_user) and reading_user.local do
717       [
718         Constants.as_public(),
719         as_local_public(),
720         reading_user.ap_id | User.following(reading_user)
721       ]
722     else
723       [Constants.as_public()]
724     end
725   end
726
727   defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
728     raise "Can't use the child object without preloading!"
729   end
730
731   defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
732     from(
733       [activity, object] in query,
734       where:
735         fragment(
736           "?->>'type' != ? or ?->>'actor' != ?",
737           activity.data,
738           "Announce",
739           object.data,
740           ^actor
741         )
742     )
743   end
744
745   defp restrict_announce_object_actor(query, _), do: query
746
747   defp restrict_since(query, %{since_id: ""}), do: query
748
749   defp restrict_since(query, %{since_id: since_id}) do
750     from(activity in query, where: activity.id > ^since_id)
751   end
752
753   defp restrict_since(query, _), do: query
754
755   defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
756     raise_on_missing_preload()
757   end
758
759   defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
760     from(
761       [_activity, object] in query,
762       where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
763     )
764   end
765
766   defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
767     restrict_embedded_tag_any(query, %{tag: tag})
768   end
769
770   defp restrict_embedded_tag_all(query, _), do: query
771
772   defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
773     raise_on_missing_preload()
774   end
775
776   defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
777     from(
778       [_activity, object] in query,
779       where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
780     )
781   end
782
783   defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
784     restrict_embedded_tag_any(query, %{tag: [tag]})
785   end
786
787   defp restrict_embedded_tag_any(query, _), do: query
788
789   defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
790     raise_on_missing_preload()
791   end
792
793   defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
794     from(
795       [_activity, object] in query,
796       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
797     )
798   end
799
800   defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
801        when is_binary(tag_reject) do
802     restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
803   end
804
805   defp restrict_embedded_tag_reject_any(query, _), do: query
806
807   defp object_ids_query_for_tags(tags) do
808     from(hto in "hashtags_objects")
809     |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
810     |> where([hto, ht], ht.name in ^tags)
811     |> select([hto], hto.object_id)
812     |> distinct([hto], true)
813   end
814
815   defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
816     raise_on_missing_preload()
817   end
818
819   defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
820     restrict_hashtag_any(query, %{tag: single_tag})
821   end
822
823   defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
824     from(
825       [_activity, object] in query,
826       where:
827         fragment(
828           """
829           (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
830             ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
831               AND hashtags_objects.object_id = ?) @> ?
832           """,
833           ^tags,
834           object.id,
835           ^tags
836         )
837     )
838   end
839
840   defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
841     restrict_hashtag_all(query, %{tag_all: [tag]})
842   end
843
844   defp restrict_hashtag_all(query, _), do: query
845
846   defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
847     raise_on_missing_preload()
848   end
849
850   defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
851     hashtag_ids =
852       from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
853       |> Repo.all()
854
855     # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
856     from(
857       [_activity, object] in query,
858       join: hto in "hashtags_objects",
859       on: hto.object_id == object.id,
860       where: hto.hashtag_id in ^hashtag_ids,
861       distinct: [desc: object.id],
862       order_by: [desc: object.id]
863     )
864   end
865
866   defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
867     restrict_hashtag_any(query, %{tag: [tag]})
868   end
869
870   defp restrict_hashtag_any(query, _), do: query
871
872   defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
873     raise_on_missing_preload()
874   end
875
876   defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
877     from(
878       [_activity, object] in query,
879       where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
880     )
881   end
882
883   defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
884     restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
885   end
886
887   defp restrict_hashtag_reject_any(query, _), do: query
888
889   defp raise_on_missing_preload do
890     raise "Can't use the child object without preloading!"
891   end
892
893   defp restrict_recipients(query, [], _user), do: query
894
895   defp restrict_recipients(query, recipients, nil) do
896     from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
897   end
898
899   defp restrict_recipients(query, recipients, user) do
900     from(
901       activity in query,
902       where: fragment("? && ?", ^recipients, activity.recipients),
903       or_where: activity.actor == ^user.ap_id
904     )
905   end
906
907   defp restrict_local(query, %{local_only: true}) do
908     from(activity in query, where: activity.local == true)
909   end
910
911   defp restrict_local(query, _), do: query
912
913   defp restrict_remote(query, %{remote: true}) do
914     from(activity in query, where: activity.local == false)
915   end
916
917   defp restrict_remote(query, _), do: query
918
919   defp restrict_actor(query, %{actor_id: actor_id}) do
920     from(activity in query, where: activity.actor == ^actor_id)
921   end
922
923   defp restrict_actor(query, _), do: query
924
925   defp restrict_type(query, %{type: type}) when is_binary(type) do
926     from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
927   end
928
929   defp restrict_type(query, %{type: type}) do
930     from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
931   end
932
933   defp restrict_type(query, _), do: query
934
935   defp restrict_state(query, %{state: state}) do
936     from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
937   end
938
939   defp restrict_state(query, _), do: query
940
941   defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
942     from(
943       [_activity, object] in query,
944       where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
945     )
946   end
947
948   defp restrict_favorited_by(query, _), do: query
949
950   defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
951     raise "Can't use the child object without preloading!"
952   end
953
954   defp restrict_media(query, %{only_media: true}) do
955     from(
956       [activity, object] in query,
957       where: fragment("(?)->>'type' = ?", activity.data, "Create"),
958       where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
959     )
960   end
961
962   defp restrict_media(query, _), do: query
963
  # Reply filtering. Three modes are supported, selected by the options map;
  # anything else leaves the query unchanged.

  # `exclude_replies: true`: keep only rows whose object has no "inReplyTo".
  defp restrict_replies(query, %{exclude_replies: true}) do
    from(
      [_activity, object] in query,
      where: fragment("?->>'inReplyTo' is null", object.data)
    )
  end

  # `reply_visibility: "self"`: keep non-replies, plus anything that lists the
  # filtering user's ap_id among the activity recipients.
  defp restrict_replies(query, %{
         reply_filtering_user: %User{} = user,
         reply_visibility: "self"
       }) do
    from(
      [activity, object] in query,
      where:
        fragment(
          "?->>'inReplyTo' is null OR ? = ANY(?)",
          object.data,
          ^user.ap_id,
          activity.recipients
        )
    )
  end

  # `reply_visibility: "following"`: see the inline SQL comments for each
  # alternative. The fragment arguments below are positional and must stay in
  # the same order as the `?` placeholders.
  defp restrict_replies(query, %{
         reply_filtering_user: %User{} = user,
         reply_visibility: "following"
       }) do
    from(
      [activity, object] in query,
      where:
        fragment(
          """
          ?->>'type' != 'Create'     -- This isn't a Create
          OR ?->>'inReplyTo' is null -- this isn't a reply
          OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
                                     -- unless they are the author (because authors
                                     -- are also part of the recipients). This leads
                                     -- to a bug that self-replies by friends won't
                                     -- show up.
          OR ? = ?                   -- The actor is us
          """,
          activity.data,
          object.data,
          ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
          activity.recipients,
          activity.actor,
          activity.actor,
          ^user.ap_id
        )
    )
  end

  defp restrict_replies(query, _), do: query
1017
1018   defp restrict_reblogs(query, %{exclude_reblogs: true}) do
1019     from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
1020   end
1021
1022   defp restrict_reblogs(query, _), do: query
1023
1024   defp restrict_muted(query, %{with_muted: true}), do: query
1025
1026   defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
1027     mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
1028
1029     query =
1030       from([activity] in query,
1031         where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
1032         where:
1033           fragment(
1034             "not (?->'to' \\?| ?) or ? = ?",
1035             activity.data,
1036             ^mutes,
1037             activity.actor,
1038             ^user.ap_id
1039           )
1040       )
1041
1042     unless opts[:skip_preload] do
1043       from([thread_mute: tm] in query, where: is_nil(tm.user_id))
1044     else
1045       query
1046     end
1047   end
1048
1049   defp restrict_muted(query, _), do: query
1050
  # Applies the blocking user's blocks to the query.
  #
  # Blocked ap_ids are taken from `opts[:blocked_users_ap_ids]` when present,
  # otherwise looked up for the user; blocked domains come from the user record.
  # The object is joined here if the query does not already carry an `:object`
  # named binding. Each `where` below is ANDed with the others.
  defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
    blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
    domain_blocks = user.domain_blocks || []

    following_ap_ids = User.get_friends_ap_ids(user)

    query =
      if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)

    from(
      [activity, object: o] in query,
      # You don't block the author
      where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),

      # You don't block any recipients, or you authored the post yourself
      where:
        fragment(
          "((not (? && ?)) or ? = ?)",
          activity.recipients,
          ^blocked_ap_ids,
          activity.actor,
          ^user.ap_id
        ),

      # You don't block the domain of any recipients, or you authored the post yourself
      where:
        fragment(
          "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
          activity.recipients,
          ^domain_blocks,
          activity.actor,
          ^user.ap_id
        ),

      # It's not a boost of a user you block
      where:
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocked_ap_ids
        ),

      # You don't block the author's domain, or you follow the author
      # (a follow overrides a domain block)
      where:
        fragment(
          "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
          activity.actor,
          ^domain_blocks,
          activity.actor,
          ^following_ap_ids
        ),

      # Same as above, but checks the Object's actor
      where:
        fragment(
          "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
          o.data,
          ^domain_blocks,
          o.data,
          ^following_ap_ids
        )
    )
  end

  # No blocking user in the options: nothing to restrict.
  defp restrict_blocked(query, _), do: query
1117
1118   defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
1119     if Config.get([:activitypub, :blockers_visible]) == true do
1120       query
1121     else
1122       blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
1123
1124       from(
1125         activity in query,
1126         # The author doesn't block you
1127         where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
1128
1129         # It's not a boost of a user that blocks you
1130         where:
1131           fragment(
1132             "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
1133             activity.data,
1134             activity.data,
1135             ^blocker_ap_ids
1136           )
1137       )
1138     end
1139   end
1140
1141   defp restrict_blockers_visibility(query, _), do: query
1142
1143   defp restrict_unlisted(query, %{restrict_unlisted: true}) do
1144     from(
1145       activity in query,
1146       where:
1147         fragment(
1148           "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
1149           activity.data,
1150           ^[Constants.as_public()]
1151         )
1152     )
1153   end
1154
1155   defp restrict_unlisted(query, _), do: query
1156
1157   defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
1158     from(
1159       [activity, object: o] in query,
1160       where:
1161         fragment(
1162           "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
1163           activity.data,
1164           activity.data,
1165           ^ids
1166         )
1167     )
1168   end
1169
1170   defp restrict_pinned(query, _), do: query
1171
1172   defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
1173     muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
1174
1175     from(
1176       activity in query,
1177       where:
1178         fragment(
1179           "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
1180           activity.data,
1181           activity.actor,
1182           ^muted_reblogs
1183         )
1184     )
1185   end
1186
1187   defp restrict_muted_reblogs(query, _), do: query
1188
1189   defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
1190     from(
1191       activity in query,
1192       where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
1193     )
1194   end
1195
1196   defp restrict_instance(query, _), do: query
1197
1198   defp restrict_filtered(query, %{user: %User{} = user}) do
1199     case Filter.compose_regex(user) do
1200       nil ->
1201         query
1202
1203       regex ->
1204         from([activity, object] in query,
1205           where:
1206             fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
1207               activity.actor == ^user.ap_id
1208         )
1209     end
1210   end
1211
1212   defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
1213     restrict_filtered(query, %{user: user})
1214   end
1215
1216   defp restrict_filtered(query, _), do: query
1217
1218   defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
1219
1220   defp exclude_poll_votes(query, _) do
1221     if has_named_binding?(query, :object) do
1222       from([activity, object: o] in query,
1223         where: fragment("not(?->>'type' = ?)", o.data, "Answer")
1224       )
1225     else
1226       query
1227     end
1228   end
1229
1230   defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
1231
1232   defp exclude_chat_messages(query, _) do
1233     if has_named_binding?(query, :object) do
1234       from([activity, object: o] in query,
1235         where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
1236       )
1237     else
1238       query
1239     end
1240   end
1241
1242   defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
1243   defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
1244
1245   defp exclude_invisible_actors(query, _opts) do
1246     query
1247     |> join(:inner, [activity], u in User,
1248       as: :u,
1249       on: activity.actor == u.ap_id and u.invisible == false
1250     )
1251   end
1252
1253   defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
1254     from(activity in query, where: activity.id != ^id)
1255   end
1256
1257   defp exclude_id(query, _), do: query
1258
1259   defp maybe_preload_objects(query, %{skip_preload: true}), do: query
1260
1261   defp maybe_preload_objects(query, _) do
1262     query
1263     |> Activity.with_preloaded_object()
1264   end
1265
1266   defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
1267
1268   defp maybe_preload_bookmarks(query, opts) do
1269     query
1270     |> Activity.with_preloaded_bookmark(opts[:user])
1271   end
1272
1273   defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
1274     query
1275     |> Activity.with_preloaded_report_notes()
1276   end
1277
1278   defp maybe_preload_report_notes(query, _), do: query
1279
1280   defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
1281
1282   defp maybe_set_thread_muted_field(query, opts) do
1283     query
1284     |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
1285   end
1286
1287   defp maybe_order(query, %{order: :desc}) do
1288     query
1289     |> order_by(desc: :id)
1290   end
1291
1292   defp maybe_order(query, %{order: :asc}) do
1293     query
1294     |> order_by(asc: :id)
1295   end
1296
1297   defp maybe_order(query, _), do: query
1298
1299   defp normalize_fetch_activities_query_opts(opts) do
1300     Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
1301       case opts[key] do
1302         value when is_bitstring(value) ->
1303           Map.put(opts, key, Hashtag.normalize_name(value))
1304
1305         value when is_list(value) ->
1306           normalized_value =
1307             value
1308             |> Enum.map(&Hashtag.normalize_name/1)
1309             |> Enum.uniq()
1310
1311           Map.put(opts, key, normalized_value)
1312
1313         _ ->
1314           opts
1315       end
1316     end)
1317   end
1318
1319   defp fetch_activities_query_ap_ids_ops(opts) do
1320     source_user = opts[:muting_user]
1321     ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
1322
1323     ap_id_relationships =
1324       if opts[:blocking_user] && opts[:blocking_user] == source_user do
1325         [:block | ap_id_relationships]
1326       else
1327         ap_id_relationships
1328       end
1329
1330     preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
1331
1332     restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
1333     restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
1334
1335     restrict_muted_reblogs_opts =
1336       Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
1337
1338     {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
1339   end
1340
1341   def fetch_activities_query(recipients, opts \\ %{}) do
1342     opts = normalize_fetch_activities_query_opts(opts)
1343
1344     {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
1345       fetch_activities_query_ap_ids_ops(opts)
1346
1347     config = %{
1348       skip_thread_containment: Config.get([:instance, :skip_thread_containment])
1349     }
1350
1351     query =
1352       Activity
1353       |> maybe_preload_objects(opts)
1354       |> maybe_preload_bookmarks(opts)
1355       |> maybe_preload_report_notes(opts)
1356       |> maybe_set_thread_muted_field(opts)
1357       |> maybe_order(opts)
1358       |> restrict_recipients(recipients, opts[:user])
1359       |> restrict_replies(opts)
1360       |> restrict_since(opts)
1361       |> restrict_local(opts)
1362       |> restrict_remote(opts)
1363       |> restrict_actor(opts)
1364       |> restrict_type(opts)
1365       |> restrict_state(opts)
1366       |> restrict_favorited_by(opts)
1367       |> restrict_blocked(restrict_blocked_opts)
1368       |> restrict_blockers_visibility(opts)
1369       |> restrict_muted(restrict_muted_opts)
1370       |> restrict_filtered(opts)
1371       |> restrict_media(opts)
1372       |> restrict_visibility(opts)
1373       |> restrict_thread_visibility(opts, config)
1374       |> restrict_reblogs(opts)
1375       |> restrict_pinned(opts)
1376       |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
1377       |> restrict_instance(opts)
1378       |> restrict_announce_object_actor(opts)
1379       |> restrict_filtered(opts)
1380       |> maybe_restrict_deactivated_users(opts)
1381       |> exclude_poll_votes(opts)
1382       |> exclude_chat_messages(opts)
1383       |> exclude_invisible_actors(opts)
1384       |> exclude_visibility(opts)
1385
1386     if Config.feature_enabled?(:improved_hashtag_timeline) do
1387       query
1388       |> restrict_hashtag_any(opts)
1389       |> restrict_hashtag_all(opts)
1390       |> restrict_hashtag_reject_any(opts)
1391     else
1392       query
1393       |> restrict_embedded_tag_any(opts)
1394       |> restrict_embedded_tag_all(opts)
1395       |> restrict_embedded_tag_reject_any(opts)
1396     end
1397   end
1398
1399   @doc """
1400   Fetch favorites activities of user with order by sort adds to favorites
1401   """
1402   @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
1403   def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
1404     user.ap_id
1405     |> Activity.Queries.by_actor()
1406     |> Activity.Queries.by_type("Like")
1407     |> Activity.with_joined_object()
1408     |> Object.with_joined_activity()
1409     |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
1410     |> order_by([like, _, _], desc_nulls_last: like.id)
1411     |> Pagination.fetch_paginated(
1412       Map.merge(params, %{skip_order: true}),
1413       pagination
1414     )
1415   end
1416
1417   defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
1418     Enum.map(activities, fn
1419       %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
1420         if Enum.any?(bcc, &(&1 in list_memberships)) do
1421           update_in(activity.data["cc"], &[user_ap_id | &1])
1422         else
1423           activity
1424         end
1425
1426       activity ->
1427         activity
1428     end)
1429   end
1430
1431   defp maybe_update_cc(activities, _, _), do: activities
1432
1433   defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
1434     from(activity in query,
1435       where:
1436         fragment("? && ?", activity.recipients, ^recipients) or
1437           (fragment("? && ?", activity.recipients, ^recipients_with_public) and
1438              ^Constants.as_public() in activity.recipients)
1439     )
1440   end
1441
1442   def fetch_activities_bounded(
1443         recipients,
1444         recipients_with_public,
1445         opts \\ %{},
1446         pagination \\ :keyset
1447       ) do
1448     fetch_activities_query([], opts)
1449     |> fetch_activities_bounded_query(recipients, recipients_with_public)
1450     |> Pagination.fetch_paginated(opts, pagination)
1451     |> Enum.reverse()
1452   end
1453
1454   @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
1455   def upload(file, opts \\ []) do
1456     with {:ok, data} <- Upload.store(sanitize_upload_file(file), opts) do
1457       obj_data = Maps.put_if_present(data, "actor", opts[:actor])
1458
1459       Repo.insert(%Object{data: obj_data})
1460     end
1461   end
1462
1463   defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
1464     %Plug.Upload{
1465       upload
1466       | filename: Path.basename(filename)
1467     }
1468   end
1469
1470   defp sanitize_upload_file(upload), do: upload
1471
1472   @spec get_actor_url(any()) :: binary() | nil
1473   defp get_actor_url(url) when is_binary(url), do: url
1474   defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
1475
1476   defp get_actor_url(url) when is_list(url) do
1477     url
1478     |> List.first()
1479     |> get_actor_url()
1480   end
1481
1482   defp get_actor_url(_url), do: nil
1483
1484   defp normalize_image(%{"url" => url}) do
1485     %{
1486       "type" => "Image",
1487       "url" => [%{"href" => url}]
1488     }
1489   end
1490
1491   defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
1492   defp normalize_image(_), do: nil
1493
1494   defp object_to_user_data(data, additional) do
1495     fields =
1496       data
1497       |> Map.get("attachment", [])
1498       |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
1499       |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
1500
1501     emojis =
1502       data
1503       |> Map.get("tag", [])
1504       |> Enum.filter(fn
1505         %{"type" => "Emoji"} -> true
1506         _ -> false
1507       end)
1508       |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
1509         {String.trim(name, ":"), url}
1510       end)
1511
1512     is_locked = data["manuallyApprovesFollowers"] || false
1513     capabilities = data["capabilities"] || %{}
1514     accepts_chat_messages = capabilities["acceptsChatMessages"]
1515     data = Transmogrifier.maybe_fix_user_object(data)
1516     is_discoverable = data["discoverable"] || false
1517     invisible = data["invisible"] || false
1518     actor_type = data["type"] || "Person"
1519
1520     featured_address = data["featured"]
1521     {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
1522
1523     public_key =
1524       if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
1525         data["publicKey"]["publicKeyPem"]
1526       end
1527
1528     shared_inbox =
1529       if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
1530         data["endpoints"]["sharedInbox"]
1531       end
1532
1533     birthday =
1534       if is_binary(data["vcard:bday"]) do
1535         case Date.from_iso8601(data["vcard:bday"]) do
1536           {:ok, date} -> date
1537           {:error, _} -> nil
1538         end
1539       end
1540
1541     show_birthday = !!birthday
1542
1543     # if WebFinger request was already done, we probably have acct, otherwise
1544     # we request WebFinger here
1545     nickname = additional[:nickname_from_acct] || generate_nickname(data)
1546
1547     %{
1548       ap_id: data["id"],
1549       uri: get_actor_url(data["url"]),
1550       ap_enabled: true,
1551       banner: normalize_image(data["image"]),
1552       fields: fields,
1553       emoji: emojis,
1554       is_locked: is_locked,
1555       is_discoverable: is_discoverable,
1556       invisible: invisible,
1557       avatar: normalize_image(data["icon"]),
1558       name: data["name"],
1559       follower_address: data["followers"],
1560       following_address: data["following"],
1561       featured_address: featured_address,
1562       bio: data["summary"] || "",
1563       actor_type: actor_type,
1564       also_known_as: Map.get(data, "alsoKnownAs", []),
1565       public_key: public_key,
1566       inbox: data["inbox"],
1567       shared_inbox: shared_inbox,
1568       accepts_chat_messages: accepts_chat_messages,
1569       birthday: birthday,
1570       show_birthday: show_birthday,
1571       pinned_objects: pinned_objects,
1572       nickname: nickname
1573     }
1574   end
1575
1576   defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
1577     generated = "#{username}@#{URI.parse(data["id"]).host}"
1578
1579     if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
1580       case WebFinger.finger(generated) do
1581         {:ok, %{"subject" => "acct:" <> acct}} -> acct
1582         _ -> generated
1583       end
1584     else
1585       generated
1586     end
1587   end
1588
1589   # nickname can be nil because of virtual actors
1590   defp generate_nickname(_), do: nil
1591
1592   def fetch_follow_information_for_user(user) do
1593     with {:ok, following_data} <-
1594            Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
1595          {:ok, hide_follows} <- collection_private(following_data),
1596          {:ok, followers_data} <-
1597            Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
1598          {:ok, hide_followers} <- collection_private(followers_data) do
1599       {:ok,
1600        %{
1601          hide_follows: hide_follows,
1602          follower_count: normalize_counter(followers_data["totalItems"]),
1603          following_count: normalize_counter(following_data["totalItems"]),
1604          hide_followers: hide_followers
1605        }}
1606     else
1607       {:error, _} = e -> e
1608       e -> {:error, e}
1609     end
1610   end
1611
1612   defp normalize_counter(counter) when is_integer(counter), do: counter
1613   defp normalize_counter(_), do: 0
1614
1615   def maybe_update_follow_information(user_data) do
1616     with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
1617          {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
1618          {_, true} <-
1619            {:collections_available,
1620             !!(user_data[:following_address] && user_data[:follower_address])},
1621          {:ok, info} <-
1622            fetch_follow_information_for_user(user_data) do
1623       info = Map.merge(user_data[:info] || %{}, info)
1624
1625       user_data
1626       |> Map.put(:info, info)
1627     else
1628       {:user_type_check, false} ->
1629         user_data
1630
1631       {:collections_available, false} ->
1632         user_data
1633
1634       {:enabled, false} ->
1635         user_data
1636
1637       e ->
1638         Logger.error(
1639           "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
1640         )
1641
1642         user_data
1643     end
1644   end
1645
1646   defp collection_private(%{"first" => %{"type" => type}})
1647        when type in ["CollectionPage", "OrderedCollectionPage"],
1648        do: {:ok, false}
1649
1650   defp collection_private(%{"first" => first}) do
1651     with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
1652            Fetcher.fetch_and_contain_remote_object_from_id(first) do
1653       {:ok, false}
1654     else
1655       {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
1656       {:error, _} = e -> e
1657       e -> {:error, e}
1658     end
1659   end
1660
1661   defp collection_private(_data), do: {:ok, true}
1662
1663   def user_data_from_user_object(data, additional \\ []) do
1664     with {:ok, data} <- MRF.filter(data) do
1665       {:ok, object_to_user_data(data, additional)}
1666     else
1667       e -> {:error, e}
1668     end
1669   end
1670
1671   def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
1672     with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
1673          {:ok, data} <- user_data_from_user_object(data, additional) do
1674       {:ok, maybe_update_follow_information(data)}
1675     else
1676       # If this has been deleted, only log a debug and not an error
1677       {:error, "Object has been deleted" = e} ->
1678         Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1679         {:error, e}
1680
1681       {:error, {:reject, reason} = e} ->
1682         Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
1683         {:error, e}
1684
1685       {:error, e} ->
1686         Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
1687         {:error, e}
1688     end
1689   end
1690
1691   def maybe_handle_clashing_nickname(data) do
1692     with nickname when is_binary(nickname) <- data[:nickname],
1693          %User{} = old_user <- User.get_by_nickname(nickname),
1694          {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
1695       Logger.info(
1696         "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
1697       )
1698
1699       old_user
1700       |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
1701       |> User.update_and_set_cache()
1702     else
1703       {:ap_id_comparison, true} ->
1704         Logger.info(
1705           "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
1706         )
1707
1708       _ ->
1709         nil
1710     end
1711   end
1712
1713   def pin_data_from_featured_collection(%{
1714         "type" => type,
1715         "orderedItems" => objects
1716       })
1717       when type in ["OrderedCollection", "Collection"] do
1718     Map.new(objects, fn
1719       %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
1720       object_ap_id when is_binary(object_ap_id) -> {object_ap_id, NaiveDateTime.utc_now()}
1721     end)
1722   end
1723
1724   def fetch_and_prepare_featured_from_ap_id(nil) do
1725     {:ok, %{}}
1726   end
1727
1728   def fetch_and_prepare_featured_from_ap_id(ap_id) do
1729     with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
1730       {:ok, pin_data_from_featured_collection(data)}
1731     else
1732       e ->
1733         Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
1734         {:ok, %{}}
1735     end
1736   end
1737
1738   def pinned_fetch_task(nil), do: nil
1739
1740   def pinned_fetch_task(%{pinned_objects: pins}) do
1741     if Enum.all?(pins, fn {ap_id, _} ->
1742          Object.get_cached_by_ap_id(ap_id) ||
1743            match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
1744        end) do
1745       :ok
1746     else
1747       :error
1748     end
1749   end
1750
1751   def make_user_from_ap_id(ap_id, additional \\ []) do
1752     user = User.get_cached_by_ap_id(ap_id)
1753
1754     if user && !User.ap_enabled?(user) do
1755       Transmogrifier.upgrade_user_from_ap_id(ap_id)
1756     else
1757       with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
1758         {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
1759
1760         if user do
1761           user
1762           |> User.remote_user_changeset(data)
1763           |> User.update_and_set_cache()
1764         else
1765           maybe_handle_clashing_nickname(data)
1766
1767           data
1768           |> User.remote_user_changeset()
1769           |> Repo.insert()
1770           |> User.set_cache()
1771         end
1772       end
1773     end
1774   end
1775
1776   def make_user_from_nickname(nickname) do
1777     with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
1778            WebFinger.finger(nickname) do
1779       make_user_from_ap_id(ap_id, nickname_from_acct: acct)
1780     else
1781       _e -> {:error, "No AP id in WebFinger"}
1782     end
1783   end
1784
1785   # filter out broken threads
1786   defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
1787     entire_thread_visible_for_user?(activity, user)
1788   end
1789
1790   # do post-processing on a specific activity
1791   def contain_activity(%Activity{} = activity, %User{} = user) do
1792     contain_broken_threads(activity, user)
1793   end
1794
1795   def fetch_direct_messages_query do
1796     Activity
1797     |> restrict_type(%{type: "Create"})
1798     |> restrict_visibility(%{visibility: "direct"})
1799     |> order_by([activity], asc: activity.id)
1800   end
1801
1802   defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
1803
1804   defp maybe_restrict_deactivated_users(activity, _opts),
1805     do: Activity.restrict_deactivated_users(activity)
1806 end