total rebase
[anni] / lib / pleroma / web / streamer.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Streamer do
6   require Logger
7   require Pleroma.Constants
8
9   alias Pleroma.Activity
10   alias Pleroma.Chat.MessageReference
11   alias Pleroma.Config
12   alias Pleroma.Conversation.Participation
13   alias Pleroma.Notification
14   alias Pleroma.Object
15   alias Pleroma.User
16   alias Pleroma.Web.ActivityPub.ActivityPub
17   alias Pleroma.Web.ActivityPub.Visibility
18   alias Pleroma.Web.CommonAPI
19   alias Pleroma.Web.OAuth.Token
20   alias Pleroma.Web.Plugs.OAuthScopesPlug
21   alias Pleroma.Web.StreamerView
22
23   @registry Pleroma.Web.StreamerRegistry
24
25   def registry, do: @registry
26
27   @public_streams Pleroma.Constants.public_streams()
28   @local_streams ["public:local", "public:local:media"]
29   @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
30
31   @doc "Expands and authorizes a stream, and registers the process for streaming."
32   @spec get_topic_and_add_socket(
33           stream :: String.t(),
34           User.t() | nil,
35           Token.t() | nil,
36           map() | nil
37         ) ::
38           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
39   def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
40     with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
41       add_socket(topic, oauth_token)
42     end
43   end
44
45   defp can_access_stream(user, oauth_token, kind) do
46     with {_, true} <- {:restrict?, Config.restrict_unauthenticated_access?(:timelines, kind)},
47          {_, %User{id: user_id}, %Token{user_id: user_id}} <- {:user, user, oauth_token},
48          {_, true} <-
49            {:scopes,
50             OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []} do
51       true
52     else
53       {:restrict?, _} ->
54         true
55
56       _ ->
57         false
58     end
59   end
60
61   @doc "Expand and authorizes a stream"
62   @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) ::
63           {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
64   def get_topic(stream, user, oauth_token, params \\ %{})
65
66   def get_topic(nil = _stream, _user, _oauth_token, _params) do
67     {:ok, nil}
68   end
69
70   # Allow all public steams if the instance allows unauthenticated access.
71   # Otherwise, only allow users with valid oauth tokens.
72   def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
73     kind = if stream in @local_streams, do: :local, else: :federated
74
75     if can_access_stream(user, oauth_token, kind) do
76       {:ok, stream}
77     else
78       {:error, :unauthorized}
79     end
80   end
81
82   # Allow all hashtags streams.
83   def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
84     {:ok, "hashtag:" <> tag}
85   end
86
87   # Allow remote instance streams.
88   def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params) do
89     if can_access_stream(user, oauth_token, :federated) do
90       {:ok, "public:remote:" <> instance}
91     else
92       {:error, :unauthorized}
93     end
94   end
95
96   def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params) do
97     if can_access_stream(user, oauth_token, :federated) do
98       {:ok, "public:remote:media:" <> instance}
99     else
100       {:error, :unauthorized}
101     end
102   end
103
104   # Expand user streams.
105   def get_topic(
106         stream,
107         %User{id: user_id} = user,
108         %Token{user_id: user_id} = oauth_token,
109         _params
110       )
111       when stream in @user_streams do
112     # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
113     required_scopes =
114       if stream == "user:notification" do
115         ["read:notifications"]
116       else
117         ["read:statuses"]
118       end
119
120     if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
121       {:error, :unauthorized}
122     else
123       {:ok, stream <> ":" <> to_string(user.id)}
124     end
125   end
126
127   def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
128     {:error, :unauthorized}
129   end
130
131   # List streams.
132   def get_topic(
133         "list",
134         %User{id: user_id} = user,
135         %Token{user_id: user_id} = oauth_token,
136         %{"list" => id}
137       ) do
138     cond do
139       OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
140         {:error, :unauthorized}
141
142       Pleroma.List.get(id, user) ->
143         {:ok, "list:" <> to_string(id)}
144
145       true ->
146         {:error, :bad_topic}
147     end
148   end
149
150   def get_topic("list", _user, _oauth_token, _params) do
151     {:error, :unauthorized}
152   end
153
154   def get_topic(_stream, _user, _oauth_token, _params) do
155     {:error, :bad_topic}
156   end
157
158   @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
159   def add_socket(topic, oauth_token) do
160     if should_env_send?() do
161       oauth_token_id = if oauth_token, do: oauth_token.id, else: false
162       Registry.register(@registry, topic, oauth_token_id)
163     end
164
165     {:ok, topic}
166   end
167
168   def remove_socket(topic) do
169     if should_env_send?(), do: Registry.unregister(@registry, topic)
170   end
171
172   def stream(topics, items) do
173     if should_env_send?() do
174       for topic <- List.wrap(topics), item <- List.wrap(items) do
175         spawn(fn -> do_stream(topic, item) end)
176       end
177     end
178   end
179
180   def filtered_by_user?(user, item, streamed_type \\ :activity)
181
182   def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
183     %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
184       User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
185
186     recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
187     recipients = MapSet.new(item.recipients)
188     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
189
190     with parent <- Object.normalize(item, fetch: false) || item,
191          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
192          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
193          true <-
194            !(streamed_type == :activity && item.data["type"] == "Announce" &&
195                parent.data["actor"] == user.ap_id),
196          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
197          true <- MapSet.disjoint?(recipients, recipient_blocks),
198          %{host: item_host} <- URI.parse(item.actor),
199          %{host: parent_host} <- URI.parse(parent.data["actor"]),
200          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
201          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
202          true <- thread_containment(item, user),
203          false <- CommonAPI.thread_muted?(user, parent) do
204       false
205     else
206       _ -> true
207     end
208   end
209
210   def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
211     filtered_by_user?(user, activity, :notification)
212   end
213
214   defp do_stream("direct", item) do
215     recipient_topics =
216       User.get_recipients_from_activity(item)
217       |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
218
219     Enum.each(recipient_topics, fn user_topic ->
220       Logger.debug("Trying to push direct message to #{user_topic}\n\n")
221       push_to_socket(user_topic, item)
222     end)
223   end
224
225   defp do_stream("follow_relationship", item) do
226     user_topic = "user:#{item.follower.id}"
227     text = StreamerView.render("follow_relationships_update.json", item, user_topic)
228
229     Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
230
231     Registry.dispatch(@registry, user_topic, fn list ->
232       Enum.each(list, fn {pid, _auth} ->
233         send(pid, {:text, text})
234       end)
235     end)
236   end
237
238   defp do_stream("participation", participation) do
239     user_topic = "direct:#{participation.user_id}"
240     Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
241
242     push_to_socket(user_topic, participation)
243   end
244
245   defp do_stream("list", item) do
246     # filter the recipient list if the activity is not public, see #270.
247     recipient_lists =
248       case Visibility.public?(item) do
249         true ->
250           Pleroma.List.get_lists_from_activity(item)
251
252         _ ->
253           Pleroma.List.get_lists_from_activity(item)
254           |> Enum.filter(fn list ->
255             owner = User.get_cached_by_id(list.user_id)
256
257             Visibility.visible_for_user?(item, owner)
258           end)
259       end
260
261     recipient_topics =
262       recipient_lists
263       |> Enum.map(fn %{id: id} -> "list:#{id}" end)
264
265     Enum.each(recipient_topics, fn list_topic ->
266       Logger.debug("Trying to push message to #{list_topic}\n\n")
267       push_to_socket(list_topic, item)
268     end)
269   end
270
271   defp do_stream(topic, %Notification{} = item)
272        when topic in ["user", "user:notification"] do
273     user_topic = "#{topic}:#{item.user_id}"
274
275     Registry.dispatch(@registry, user_topic, fn list ->
276       Enum.each(list, fn {pid, _auth} ->
277         send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic})
278       end)
279     end)
280   end
281
282   defp do_stream(topic, {user, %MessageReference{} = cm_ref})
283        when topic in ["user", "user:pleroma_chat"] do
284     topic = "#{topic}:#{user.id}"
285
286     text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic)
287
288     Registry.dispatch(@registry, topic, fn list ->
289       Enum.each(list, fn {pid, _auth} ->
290         send(pid, {:text, text})
291       end)
292     end)
293   end
294
295   defp do_stream("user", item) do
296     Logger.debug("Trying to push to users")
297
298     recipient_topics =
299       User.get_recipients_from_activity(item)
300       |> Enum.map(fn %{id: id} -> "user:#{id}" end)
301
302     Enum.each(recipient_topics, fn topic ->
303       push_to_socket(topic, item)
304     end)
305   end
306
307   defp do_stream(topic, item) do
308     Logger.debug("Trying to push to #{topic}")
309     Logger.debug("Pushing item to #{topic}")
310     push_to_socket(topic, item)
311   end
312
313   defp push_to_socket(topic, %Participation{} = participation) do
314     rendered = StreamerView.render("conversation.json", participation, topic)
315
316     Registry.dispatch(@registry, topic, fn list ->
317       Enum.each(list, fn {pid, _} ->
318         send(pid, {:text, rendered})
319       end)
320     end)
321   end
322
323   defp push_to_socket(topic, %Activity{
324          data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
325        }) do
326     rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
327
328     Registry.dispatch(@registry, topic, fn list ->
329       Enum.each(list, fn {pid, _} ->
330         send(pid, {:text, rendered})
331       end)
332     end)
333   end
334
335   defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
336
337   defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
338     create_activity =
339       Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
340       |> Map.put(:object, item.object)
341
342     anon_render = StreamerView.render("status_update.json", create_activity, topic)
343
344     Registry.dispatch(@registry, topic, fn list ->
345       Enum.each(list, fn {pid, auth?} ->
346         if auth? do
347           send(
348             pid,
349             {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
350           )
351         else
352           send(pid, {:text, anon_render})
353         end
354       end)
355     end)
356   end
357
358   defp push_to_socket(topic, item) do
359     anon_render = StreamerView.render("update.json", item, topic)
360
361     Registry.dispatch(@registry, topic, fn list ->
362       Enum.each(list, fn {pid, auth?} ->
363         if auth? do
364           send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
365         else
366           send(pid, {:text, anon_render})
367         end
368       end)
369     end)
370   end
371
372   defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
373
374   defp thread_containment(activity, user) do
375     if Config.get([:instance, :skip_thread_containment]) do
376       true
377     else
378       ActivityPub.contain_activity(activity, user)
379     end
380   end
381
382   def close_streams_by_oauth_token(oauth_token) do
383     if should_env_send?() do
384       Registry.select(
385         @registry,
386         [
387           {
388             {:"$1", :"$2", :"$3"},
389             [{:==, :"$3", oauth_token.id}],
390             [:"$2"]
391           }
392         ]
393       )
394       |> Enum.each(fn pid -> send(pid, :close) end)
395     end
396   end
397
398   # In dev/prod the streamer registry is expected to be started, so return true
399   # In test it is possible to have the registry started for a test so it will check
400   # In benchmark it will never find the process alive and return false
401   def should_env_send? do
402     if Application.get_env(:pleroma, Pleroma.Application)[:streamer_registry] do
403       true
404     else
405       case Process.whereis(@registry) do
406         nil ->
407           false
408
409         pid ->
410           Process.alive?(pid)
411       end
412     end
413   end
414 end