1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
7 require Pleroma.Constants
10 alias Pleroma.Chat.MessageReference
12 alias Pleroma.Conversation.Participation
13 alias Pleroma.Notification
16 alias Pleroma.Web.ActivityPub.ActivityPub
17 alias Pleroma.Web.ActivityPub.Visibility
18 alias Pleroma.Web.CommonAPI
19 alias Pleroma.Web.OAuth.Token
20 alias Pleroma.Web.Plugs.OAuthScopesPlug
21 alias Pleroma.Web.StreamerView
23 @registry Pleroma.Web.StreamerRegistry
@doc "Returns the name of the `Registry` that streaming processes are registered under."
@spec registry() :: module()
def registry do
  @registry
end
27 @public_streams Pleroma.Constants.public_streams()
28 @local_streams ["public:local", "public:local:media"]
29 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
31 @doc "Expands and authorizes a stream, and registers the process for streaming."
32 @spec get_topic_and_add_socket(
38 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
  # Resolve and authorize the topic first. Anything other than `{:ok, topic}`
  # is handed back to the caller unchanged and no registration takes place.
  case get_topic(stream, user, oauth_token, params) do
    {:ok, topic} -> add_socket(topic, oauth_token)
    rejected -> rejected
  end
end
45 defp can_access_stream(user, oauth_token, kind) do
46 with {_, true} <- {:restrict?, Config.restrict_unauthenticated_access?(:timelines, kind)},
47 {_, %User{id: user_id}, %Token{user_id: user_id}} <- {:user, user, oauth_token},
50 OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []} do
61 @doc "Expand and authorizes a stream"
62 @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) ::
63 {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
64 def get_topic(stream, user, oauth_token, params \\ %{})
66 def get_topic(nil = _stream, _user, _oauth_token, _params) do
70 # Allow all public streams if the instance allows unauthenticated access.
71 # Otherwise, only allow users with valid oauth tokens.
72 def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
73 kind = if stream in @local_streams, do: :local, else: :federated
75 if can_access_stream(user, oauth_token, kind) do
78 {:error, :unauthorized}
# Allow all hashtag streams.
#
# The topic is "hashtag:" followed by the requested tag. The guard keeps a
# non-binary "tag" value away from `<>`, which would raise ArgumentError;
# such a request does not match here and falls through to the later
# `get_topic/4` clauses instead.
def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
    when is_binary(tag) do
  {:ok, "hashtag:" <> tag}
end
87 # Allow remote instance streams.
88 def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params) do
89 if can_access_stream(user, oauth_token, :federated) do
90 {:ok, "public:remote:" <> instance}
92 {:error, :unauthorized}
96 def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params) do
97 if can_access_stream(user, oauth_token, :federated) do
98 {:ok, "public:remote:media:" <> instance}
100 {:error, :unauthorized}
104 # Expand user streams.
107 %User{id: user_id} = user,
108 %Token{user_id: user_id} = oauth_token,
111 when stream in @user_streams do
112 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
114 if stream == "user:notification" do
115 ["read:notifications"]
120 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
121 {:error, :unauthorized}
123 {:ok, stream <> ":" <> to_string(user.id)}
# Any user-stream request that an earlier clause did not accept is refused.
def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams,
  do: {:error, :unauthorized}
134 %User{id: user_id} = user,
135 %Token{user_id: user_id} = oauth_token,
139 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
140 {:error, :unauthorized}
142 Pleroma.List.get(id, user) ->
143 {:ok, "list:" <> to_string(id)}
# A "list" request that an earlier clause did not accept is refused.
def get_topic("list", _user, _oauth_token, _params), do: {:error, :unauthorized}
154 def get_topic(_stream, _user, _oauth_token, _params) do
158 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
159 def add_socket(topic, oauth_token) do
160 if should_env_send?() do
161 oauth_token_id = if oauth_token, do: oauth_token.id, else: false
162 Registry.register(@registry, topic, oauth_token_id)
@doc """
Unregisters the calling process from `topic` in the streamer registry.

Does nothing (and returns `nil`) when `should_env_send?/0` is false.
"""
def remove_socket(topic) do
  if should_env_send?() do
    Registry.unregister(@registry, topic)
  end
end
@doc """
Streams every item to every topic, each topic/item pair in its own spawned process.

`topics` and `items` may each be a single value or a list. Returns the list of
spawned pids, or `nil` when `should_env_send?/0` is false.
"""
def stream(topics, items) do
  if should_env_send?() do
    topic_list = List.wrap(topics)
    item_list = List.wrap(items)

    # Topic-major order: all items for the first topic, then the next topic.
    Enum.flat_map(topic_list, fn topic ->
      Enum.map(item_list, fn item ->
        spawn(fn -> do_stream(topic, item) end)
      end)
    end)
  end
end
180 def filtered_by_user?(user, item, streamed_type \\ :activity)
182 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
183 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
184 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
186 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
187 recipients = MapSet.new(item.recipients)
188 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
190 with parent <- Object.normalize(item, fetch: false) || item,
191 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
192 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
194 !(streamed_type == :activity && item.data["type"] == "Announce" &&
195 parent.data["actor"] == user.ap_id),
196 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
197 true <- MapSet.disjoint?(recipients, recipient_blocks),
198 %{host: item_host} <- URI.parse(item.actor),
199 %{host: parent_host} <- URI.parse(parent.data["actor"]),
200 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
201 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
202 true <- thread_containment(item, user),
203 false <- CommonAPI.thread_muted?(user, parent) do
# A notification is filtered by the activity it carries; the streamed type is
# forced to `:notification` regardless of what the caller passed.
def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type),
  do: filtered_by_user?(user, activity, :notification)
214 defp do_stream("direct", item) do
216 User.get_recipients_from_activity(item)
217 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
219 Enum.each(recipient_topics, fn user_topic ->
220 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
221 push_to_socket(user_topic, item)
# Renders the follow relationship update once and sends the resulting text to
# every process registered on the follower's "user:<id>" topic.
defp do_stream("follow_relationship", item) do
  follower_topic = "user:#{item.follower.id}"
  payload = StreamerView.render("follow_relationships_update.json", item, follower_topic)

  Logger.debug("Trying to push follow relationship update to #{follower_topic}\n\n")

  Registry.dispatch(@registry, follower_topic, fn subscribers ->
    for {pid, _auth} <- subscribers, do: send(pid, {:text, payload})
  end)
end
# A conversation participation goes to the "direct:<user_id>" topic of the
# user it belongs to.
defp do_stream("participation", participation) do
  direct_topic = "direct:" <> to_string(participation.user_id)
  Logger.debug("Trying to push a conversation participation to #{direct_topic}\n\n")

  push_to_socket(direct_topic, participation)
end
245 defp do_stream("list", item) do
246 # filter the recipient list if the activity is not public, see #270.
248 case Visibility.public?(item) do
250 Pleroma.List.get_lists_from_activity(item)
253 Pleroma.List.get_lists_from_activity(item)
254 |> Enum.filter(fn list ->
255 owner = User.get_cached_by_id(list.user_id)
257 Visibility.visible_for_user?(item, owner)
263 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
265 Enum.each(recipient_topics, fn list_topic ->
266 Logger.debug("Trying to push message to #{list_topic}\n\n")
267 push_to_socket(list_topic, item)
# Notifications are not pre-rendered here: each registered process receives a
# `:render_with_user` request for "notification.json" instead of finished text.
defp do_stream(topic, %Notification{} = item)
     when topic in ["user", "user:notification"] do
  recipient_topic = "#{topic}:#{item.user_id}"
  request = {:render_with_user, StreamerView, "notification.json", item, recipient_topic}

  Registry.dispatch(@registry, recipient_topic, fn subscribers ->
    Enum.each(subscribers, fn {pid, _auth} -> send(pid, request) end)
  end)
end
# Chat message references are rendered once as a chat update and the text is
# sent to every process registered on the user's "<topic>:<user id>" key.
defp do_stream(topic, {user, %MessageReference{} = cm_ref})
     when topic in ["user", "user:pleroma_chat"] do
  user_topic = "#{topic}:#{user.id}"

  payload =
    StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, user_topic)

  Registry.dispatch(@registry, user_topic, fn subscribers ->
    for {pid, _auth} <- subscribers, do: send(pid, {:text, payload})
  end)
end
295 defp do_stream("user", item) do
296 Logger.debug("Trying to push to users")
299 User.get_recipients_from_activity(item)
300 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
302 Enum.each(recipient_topics, fn topic ->
303 push_to_socket(topic, item)
# Fallback for every topic without a dedicated clause: the topic is used
# unchanged as the registry key.
defp do_stream(topic, item) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(topic, item)
end
# Conversation participations are rendered once; every process registered on
# `topic` receives the same text.
defp push_to_socket(topic, %Participation{} = participation) do
  payload = StreamerView.render("conversation.json", participation, topic)

  Registry.dispatch(@registry, topic, fn subscribers ->
    for {pid, _} <- subscribers, do: send(pid, {:text, payload})
  end)
end
323 defp push_to_socket(topic, %Activity{
324 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
326 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
328 Registry.dispatch(@registry, topic, fn list ->
329 Enum.each(list, fn {pid, _} ->
330 send(pid, {:text, rendered})
# "Delete" activities that reach this clause (the preceding clause takes those
# carrying a "deleted_activity_id") are not pushed to anyone.
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}) do
  :noop
end
337 defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
339 Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
340 |> Map.put(:object, item.object)
342 anon_render = StreamerView.render("status_update.json", create_activity, topic)
344 Registry.dispatch(@registry, topic, fn list ->
345 Enum.each(list, fn {pid, auth?} ->
349 {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
352 send(pid, {:text, anon_render})
358 defp push_to_socket(topic, item) do
359 anon_render = StreamerView.render("update.json", item, topic)
361 Registry.dispatch(@registry, topic, fn list ->
362 Enum.each(list, fn {pid, auth?} ->
364 send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
366 send(pid, {:text, anon_render})
# A user with `skip_thread_containment: true` always passes the containment check.
defp thread_containment(_activity, %User{skip_thread_containment: true}) do
  true
end
374 defp thread_containment(activity, user) do
375 if Config.get([:instance, :skip_thread_containment]) do
378 ActivityPub.contain_activity(activity, user)
382 def close_streams_by_oauth_token(oauth_token) do
383 if should_env_send?() do
388 {:"$1", :"$2", :"$3"},
389 [{:==, :"$3", oauth_token.id}],
394 |> Enum.each(fn pid -> send(pid, :close) end)
398 # In dev/prod the streamer registry is expected to be started, so return true
399 # In test it is possible to have the registry started for a test so it will check
400 # In benchmark it will never find the process alive and return false
401 def should_env_send? do
402 if Application.get_env(:pleroma, Pleroma.Application)[:streamer_registry] do
405 case Process.whereis(@registry) do