1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.Streamer do
9 alias Pleroma.Chat.MessageReference
11 alias Pleroma.Conversation.Participation
12 alias Pleroma.Notification
15 alias Pleroma.Web.ActivityPub.ActivityPub
16 alias Pleroma.Web.ActivityPub.Visibility
17 alias Pleroma.Web.CommonAPI
18 alias Pleroma.Web.OAuth.Token
19 alias Pleroma.Web.Plugs.OAuthScopesPlug
20 alias Pleroma.Web.StreamerView
23 @registry Pleroma.Web.StreamerRegistry
25 def registry, do: @registry
27 @public_streams ["public", "public:local", "public:media", "public:local:media"]
28 @local_streams ["public:local", "public:local:media"]
29 @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
31 @doc "Expands and authorizes a stream, and registers the process for streaming."
32 @spec get_topic_and_add_socket(
38 {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
39 def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
40 with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
41 add_socket(topic, oauth_token)
45 defp can_access_stream(user, oauth_token, kind) do
46 with {_, true} <- {:restrict?, Config.restrict_unauthenticated_access?(:timelines, kind)},
47 {_, %User{id: user_id}, %Token{user_id: user_id}} <- {:user, user, oauth_token},
50 OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []} do
61 @doc "Expand and authorizes a stream"
62 @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
63 {:ok, topic :: String.t()} | {:error, :bad_topic}
64 def get_topic(stream, user, oauth_token, params \\ %{})
66 # Allow all public steams if the instance allows unauthenticated access.
67 # Otherwise, only allow users with valid oauth tokens.
68 def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
69 kind = if stream in @local_streams, do: :local, else: :federated
71 if can_access_stream(user, oauth_token, kind) do
74 {:error, :unauthorized}
78 # Allow all hashtags streams.
79 def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
80 {:ok, "hashtag:" <> tag}
83 # Allow remote instance streams.
84 def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params) do
85 if can_access_stream(user, oauth_token, :federated) do
86 {:ok, "public:remote:" <> instance}
88 {:error, :unauthorized}
92 def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params) do
93 if can_access_stream(user, oauth_token, :federated) do
94 {:ok, "public:remote:media:" <> instance}
96 {:error, :unauthorized}
100 # Expand user streams.
103 %User{id: user_id} = user,
104 %Token{user_id: user_id} = oauth_token,
107 when stream in @user_streams do
108 # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
110 if stream == "user:notification" do
111 ["read:notifications"]
116 if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
117 {:error, :unauthorized}
119 {:ok, stream <> ":" <> to_string(user.id)}
123 def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
124 {:error, :unauthorized}
130 %User{id: user_id} = user,
131 %Token{user_id: user_id} = oauth_token,
135 OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
136 {:error, :unauthorized}
138 Pleroma.List.get(id, user) ->
139 {:ok, "list:" <> to_string(id)}
146 def get_topic("list", _user, _oauth_token, _params) do
147 {:error, :unauthorized}
150 def get_topic(_stream, _user, _oauth_token, _params) do
154 @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
155 def add_socket(topic, oauth_token) do
156 if should_env_send?() do
157 oauth_token_id = if oauth_token, do: oauth_token.id, else: false
158 Registry.register(@registry, topic, oauth_token_id)
164 def remove_socket(topic) do
165 if should_env_send?(), do: Registry.unregister(@registry, topic)
168 def stream(topics, items) do
169 if should_env_send?() do
170 for topic <- List.wrap(topics), item <- List.wrap(items) do
171 spawn(fn -> do_stream(topic, item) end)
176 def filtered_by_user?(user, item, streamed_type \\ :activity)
178 def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
179 %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
180 User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
182 recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
183 recipients = MapSet.new(item.recipients)
184 domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
186 with parent <- Object.normalize(item, fetch: false) || item,
187 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
188 true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
190 !(streamed_type == :activity && item.data["type"] == "Announce" &&
191 parent.data["actor"] == user.ap_id),
192 true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
193 true <- MapSet.disjoint?(recipients, recipient_blocks),
194 %{host: item_host} <- URI.parse(item.actor),
195 %{host: parent_host} <- URI.parse(parent.data["actor"]),
196 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
197 false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
198 true <- thread_containment(item, user),
199 false <- CommonAPI.thread_muted?(user, parent) do
206 def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
207 filtered_by_user?(user, activity, :notification)
210 defp do_stream("direct", item) do
212 User.get_recipients_from_activity(item)
213 |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
215 Enum.each(recipient_topics, fn user_topic ->
216 Logger.debug("Trying to push direct message to #{user_topic}\n\n")
217 push_to_socket(user_topic, item)
221 defp do_stream("follow_relationship", item) do
222 text = StreamerView.render("follow_relationships_update.json", item)
223 user_topic = "user:#{item.follower.id}"
225 Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
227 Registry.dispatch(@registry, user_topic, fn list ->
228 Enum.each(list, fn {pid, _auth} ->
229 send(pid, {:text, text})
234 defp do_stream("participation", participation) do
235 user_topic = "direct:#{participation.user_id}"
236 Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
238 push_to_socket(user_topic, participation)
241 defp do_stream("list", item) do
242 # filter the recipient list if the activity is not public, see #270.
244 case Visibility.is_public?(item) do
246 Pleroma.List.get_lists_from_activity(item)
249 Pleroma.List.get_lists_from_activity(item)
250 |> Enum.filter(fn list ->
251 owner = User.get_cached_by_id(list.user_id)
253 Visibility.visible_for_user?(item, owner)
259 |> Enum.map(fn %{id: id} -> "list:#{id}" end)
261 Enum.each(recipient_topics, fn list_topic ->
262 Logger.debug("Trying to push message to #{list_topic}\n\n")
263 push_to_socket(list_topic, item)
267 defp do_stream(topic, %Notification{} = item)
268 when topic in ["user", "user:notification"] do
269 Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
270 Enum.each(list, fn {pid, _auth} ->
271 send(pid, {:render_with_user, StreamerView, "notification.json", item})
276 defp do_stream(topic, {user, %MessageReference{} = cm_ref})
277 when topic in ["user", "user:pleroma_chat"] do
278 topic = "#{topic}:#{user.id}"
280 text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
282 Registry.dispatch(@registry, topic, fn list ->
283 Enum.each(list, fn {pid, _auth} ->
284 send(pid, {:text, text})
289 defp do_stream("user", item) do
290 Logger.debug("Trying to push to users")
293 User.get_recipients_from_activity(item)
294 |> Enum.map(fn %{id: id} -> "user:#{id}" end)
296 Enum.each(recipient_topics, fn topic ->
297 push_to_socket(topic, item)
301 defp do_stream(topic, item) do
302 Logger.debug("Trying to push to #{topic}")
303 Logger.debug("Pushing item to #{topic}")
304 push_to_socket(topic, item)
307 defp push_to_socket(topic, %Participation{} = participation) do
308 rendered = StreamerView.render("conversation.json", participation)
310 Registry.dispatch(@registry, topic, fn list ->
311 Enum.each(list, fn {pid, _} ->
312 send(pid, {:text, rendered})
317 defp push_to_socket(topic, %Activity{
318 data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
320 rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
322 Registry.dispatch(@registry, topic, fn list ->
323 Enum.each(list, fn {pid, _} ->
324 send(pid, {:text, rendered})
329 defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
331 defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
333 Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
334 |> Map.put(:object, item.object)
336 anon_render = StreamerView.render("status_update.json", create_activity)
338 Registry.dispatch(@registry, topic, fn list ->
339 Enum.each(list, fn {pid, auth?} ->
341 send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
343 send(pid, {:text, anon_render})
349 defp push_to_socket(topic, item) do
350 anon_render = StreamerView.render("update.json", item)
352 Registry.dispatch(@registry, topic, fn list ->
353 Enum.each(list, fn {pid, auth?} ->
355 send(pid, {:render_with_user, StreamerView, "update.json", item})
357 send(pid, {:text, anon_render})
363 defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
365 defp thread_containment(activity, user) do
366 if Config.get([:instance, :skip_thread_containment]) do
369 ActivityPub.contain_activity(activity, user)
373 def close_streams_by_oauth_token(oauth_token) do
374 if should_env_send?() do
379 {:"$1", :"$2", :"$3"},
380 [{:==, :"$3", oauth_token.id}],
385 |> Enum.each(fn pid -> send(pid, :close) end)
389 # In test environement, only return true if the registry is started.
390 # In benchmark environment, returns false.
391 # In any other environment, always returns true.
394 def should_env_send? do
395 case Process.whereis(@registry) do
404 @mix_env == :benchmark ->
405 def should_env_send?, do: false
408 def should_env_send?, do: true