First
[anni] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6   require Logger
7
8   alias Pleroma.Activity
9   alias Pleroma.Chat.MessageReference
10   alias Pleroma.Config
11   alias Pleroma.Conversation.Participation
12   alias Pleroma.Notification
13   alias Pleroma.Object
14   alias Pleroma.User
15   alias Pleroma.Web.ActivityPub.ActivityPub
16   alias Pleroma.Web.ActivityPub.Visibility
17   alias Pleroma.Web.CommonAPI
18   alias Pleroma.Web.OAuth.Token
19   alias Pleroma.Web.Plugs.OAuthScopesPlug
20   alias Pleroma.Web.StreamerView
21
22   @mix_env Mix.env()
23   @registry Pleroma.Web.StreamerRegistry
24
25   def registry, do: @registry
26
27   @public_streams ["public", "public:local", "public:media", "public:local:media"]
28   @local_streams ["public:local", "public:local:media"]
29   @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
30
31   @doc "Expands and authorizes a stream, and registers the process for streaming."
32   @spec get_topic_and_add_socket(
33           stream :: String.t(),
34           User.t() | nil,
35           Token.t() | nil,
36           Map.t() | nil
37         ) ::
38           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
39   def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
40     with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
41       add_socket(topic, oauth_token)
42     end
43   end
44
45   defp can_access_stream(user, oauth_token, kind) do
46     with {_, true} <- {:restrict?, Config.restrict_unauthenticated_access?(:timelines, kind)},
47          {_, %User{id: user_id}, %Token{user_id: user_id}} <- {:user, user, oauth_token},
48          {_, true} <-
49            {:scopes,
50             OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []} do
51       true
52     else
53       {:restrict?, _} ->
54         true
55
56       _ ->
57         false
58     end
59   end
60
61   @doc "Expand and authorizes a stream"
62   @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
63           {:ok, topic :: String.t()} | {:error, :bad_topic}
64   def get_topic(stream, user, oauth_token, params \\ %{})
65
66   # Allow all public steams if the instance allows unauthenticated access.
67   # Otherwise, only allow users with valid oauth tokens.
68   def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
69     kind = if stream in @local_streams, do: :local, else: :federated
70
71     if can_access_stream(user, oauth_token, kind) do
72       {:ok, stream}
73     else
74       {:error, :unauthorized}
75     end
76   end
77
78   # Allow all hashtags streams.
79   def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
80     {:ok, "hashtag:" <> tag}
81   end
82
83   # Allow remote instance streams.
84   def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params) do
85     if can_access_stream(user, oauth_token, :federated) do
86       {:ok, "public:remote:" <> instance}
87     else
88       {:error, :unauthorized}
89     end
90   end
91
92   def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params) do
93     if can_access_stream(user, oauth_token, :federated) do
94       {:ok, "public:remote:media:" <> instance}
95     else
96       {:error, :unauthorized}
97     end
98   end
99
100   # Expand user streams.
101   def get_topic(
102         stream,
103         %User{id: user_id} = user,
104         %Token{user_id: user_id} = oauth_token,
105         _params
106       )
107       when stream in @user_streams do
108     # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
109     required_scopes =
110       if stream == "user:notification" do
111         ["read:notifications"]
112       else
113         ["read:statuses"]
114       end
115
116     if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
117       {:error, :unauthorized}
118     else
119       {:ok, stream <> ":" <> to_string(user.id)}
120     end
121   end
122
123   def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
124     {:error, :unauthorized}
125   end
126
127   # List streams.
128   def get_topic(
129         "list",
130         %User{id: user_id} = user,
131         %Token{user_id: user_id} = oauth_token,
132         %{"list" => id}
133       ) do
134     cond do
135       OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
136         {:error, :unauthorized}
137
138       Pleroma.List.get(id, user) ->
139         {:ok, "list:" <> to_string(id)}
140
141       true ->
142         {:error, :bad_topic}
143     end
144   end
145
146   def get_topic("list", _user, _oauth_token, _params) do
147     {:error, :unauthorized}
148   end
149
150   def get_topic(_stream, _user, _oauth_token, _params) do
151     {:error, :bad_topic}
152   end
153
154   @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
155   def add_socket(topic, oauth_token) do
156     if should_env_send?() do
157       oauth_token_id = if oauth_token, do: oauth_token.id, else: false
158       Registry.register(@registry, topic, oauth_token_id)
159     end
160
161     {:ok, topic}
162   end
163
164   def remove_socket(topic) do
165     if should_env_send?(), do: Registry.unregister(@registry, topic)
166   end
167
168   def stream(topics, items) do
169     if should_env_send?() do
170       for topic <- List.wrap(topics), item <- List.wrap(items) do
171         spawn(fn -> do_stream(topic, item) end)
172       end
173     end
174   end
175
176   def filtered_by_user?(user, item, streamed_type \\ :activity)
177
178   def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
179     %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
180       User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
181
182     recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
183     recipients = MapSet.new(item.recipients)
184     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
185
186     with parent <- Object.normalize(item, fetch: false) || item,
187          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
188          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
189          true <-
190            !(streamed_type == :activity && item.data["type"] == "Announce" &&
191                parent.data["actor"] == user.ap_id),
192          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
193          true <- MapSet.disjoint?(recipients, recipient_blocks),
194          %{host: item_host} <- URI.parse(item.actor),
195          %{host: parent_host} <- URI.parse(parent.data["actor"]),
196          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
197          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
198          true <- thread_containment(item, user),
199          false <- CommonAPI.thread_muted?(user, parent) do
200       false
201     else
202       _ -> true
203     end
204   end
205
206   def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
207     filtered_by_user?(user, activity, :notification)
208   end
209
210   defp do_stream("direct", item) do
211     recipient_topics =
212       User.get_recipients_from_activity(item)
213       |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
214
215     Enum.each(recipient_topics, fn user_topic ->
216       Logger.debug("Trying to push direct message to #{user_topic}\n\n")
217       push_to_socket(user_topic, item)
218     end)
219   end
220
221   defp do_stream("follow_relationship", item) do
222     text = StreamerView.render("follow_relationships_update.json", item)
223     user_topic = "user:#{item.follower.id}"
224
225     Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
226
227     Registry.dispatch(@registry, user_topic, fn list ->
228       Enum.each(list, fn {pid, _auth} ->
229         send(pid, {:text, text})
230       end)
231     end)
232   end
233
234   defp do_stream("participation", participation) do
235     user_topic = "direct:#{participation.user_id}"
236     Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
237
238     push_to_socket(user_topic, participation)
239   end
240
241   defp do_stream("list", item) do
242     # filter the recipient list if the activity is not public, see #270.
243     recipient_lists =
244       case Visibility.is_public?(item) do
245         true ->
246           Pleroma.List.get_lists_from_activity(item)
247
248         _ ->
249           Pleroma.List.get_lists_from_activity(item)
250           |> Enum.filter(fn list ->
251             owner = User.get_cached_by_id(list.user_id)
252
253             Visibility.visible_for_user?(item, owner)
254           end)
255       end
256
257     recipient_topics =
258       recipient_lists
259       |> Enum.map(fn %{id: id} -> "list:#{id}" end)
260
261     Enum.each(recipient_topics, fn list_topic ->
262       Logger.debug("Trying to push message to #{list_topic}\n\n")
263       push_to_socket(list_topic, item)
264     end)
265   end
266
267   defp do_stream(topic, %Notification{} = item)
268        when topic in ["user", "user:notification"] do
269     Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
270       Enum.each(list, fn {pid, _auth} ->
271         send(pid, {:render_with_user, StreamerView, "notification.json", item})
272       end)
273     end)
274   end
275
276   defp do_stream(topic, {user, %MessageReference{} = cm_ref})
277        when topic in ["user", "user:pleroma_chat"] do
278     topic = "#{topic}:#{user.id}"
279
280     text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
281
282     Registry.dispatch(@registry, topic, fn list ->
283       Enum.each(list, fn {pid, _auth} ->
284         send(pid, {:text, text})
285       end)
286     end)
287   end
288
289   defp do_stream("user", item) do
290     Logger.debug("Trying to push to users")
291
292     recipient_topics =
293       User.get_recipients_from_activity(item)
294       |> Enum.map(fn %{id: id} -> "user:#{id}" end)
295
296     Enum.each(recipient_topics, fn topic ->
297       push_to_socket(topic, item)
298     end)
299   end
300
301   defp do_stream(topic, item) do
302     Logger.debug("Trying to push to #{topic}")
303     Logger.debug("Pushing item to #{topic}")
304     push_to_socket(topic, item)
305   end
306
307   defp push_to_socket(topic, %Participation{} = participation) do
308     rendered = StreamerView.render("conversation.json", participation)
309
310     Registry.dispatch(@registry, topic, fn list ->
311       Enum.each(list, fn {pid, _} ->
312         send(pid, {:text, rendered})
313       end)
314     end)
315   end
316
317   defp push_to_socket(topic, %Activity{
318          data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
319        }) do
320     rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
321
322     Registry.dispatch(@registry, topic, fn list ->
323       Enum.each(list, fn {pid, _} ->
324         send(pid, {:text, rendered})
325       end)
326     end)
327   end
328
329   defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
330
331   defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
332     create_activity =
333       Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
334       |> Map.put(:object, item.object)
335
336     anon_render = StreamerView.render("status_update.json", create_activity)
337
338     Registry.dispatch(@registry, topic, fn list ->
339       Enum.each(list, fn {pid, auth?} ->
340         if auth? do
341           send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
342         else
343           send(pid, {:text, anon_render})
344         end
345       end)
346     end)
347   end
348
349   defp push_to_socket(topic, item) do
350     anon_render = StreamerView.render("update.json", item)
351
352     Registry.dispatch(@registry, topic, fn list ->
353       Enum.each(list, fn {pid, auth?} ->
354         if auth? do
355           send(pid, {:render_with_user, StreamerView, "update.json", item})
356         else
357           send(pid, {:text, anon_render})
358         end
359       end)
360     end)
361   end
362
363   defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
364
365   defp thread_containment(activity, user) do
366     if Config.get([:instance, :skip_thread_containment]) do
367       true
368     else
369       ActivityPub.contain_activity(activity, user)
370     end
371   end
372
373   def close_streams_by_oauth_token(oauth_token) do
374     if should_env_send?() do
375       Registry.select(
376         @registry,
377         [
378           {
379             {:"$1", :"$2", :"$3"},
380             [{:==, :"$3", oauth_token.id}],
381             [:"$2"]
382           }
383         ]
384       )
385       |> Enum.each(fn pid -> send(pid, :close) end)
386     end
387   end
388
389   # In test environement, only return true if the registry is started.
390   # In benchmark environment, returns false.
391   # In any other environment, always returns true.
392   cond do
393     @mix_env == :test ->
394       def should_env_send? do
395         case Process.whereis(@registry) do
396           nil ->
397             false
398
399           pid ->
400             Process.alive?(pid)
401         end
402       end
403
404     @mix_env == :benchmark ->
405       def should_env_send?, do: false
406
407     true ->
408       def should_env_send?, do: true
409   end
410 end