total rebase
[anni] / lib / pleroma / web / streamer.ex
old mode 100755 (executable)
new mode 100644 (file)
index b9a04cc..7065fda
@@ -4,6 +4,7 @@
 
 defmodule Pleroma.Web.Streamer do
   require Logger
+  require Pleroma.Constants
 
   alias Pleroma.Activity
   alias Pleroma.Chat.MessageReference
@@ -19,12 +20,11 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.Web.Plugs.OAuthScopesPlug
   alias Pleroma.Web.StreamerView
 
-  @mix_env Mix.env()
   @registry Pleroma.Web.StreamerRegistry
 
   def registry, do: @registry
 
-  @public_streams ["public", "public:local", "public:media", "public:local:media"]
+  @public_streams Pleroma.Constants.public_streams()
   @local_streams ["public:local", "public:local:media"]
   @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
 
@@ -33,7 +33,7 @@ defmodule Pleroma.Web.Streamer do
           stream :: String.t(),
           User.t() | nil,
           Token.t() | nil,
-          Map.t() | nil
+          map() | nil
         ) ::
           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
   def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
@@ -59,10 +59,14 @@ defmodule Pleroma.Web.Streamer do
   end
 
   @doc "Expand and authorizes a stream"
-  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
-          {:ok, topic :: String.t()} | {:error, :bad_topic}
+  @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) ::
+          {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
   def get_topic(stream, user, oauth_token, params \\ %{})
 
+  def get_topic(nil = _stream, _user, _oauth_token, _params) do
+    {:ok, nil}
+  end
+
   # Allow all public steams if the instance allows unauthenticated access.
   # Otherwise, only allow users with valid oauth tokens.
   def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
@@ -219,8 +223,8 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp do_stream("follow_relationship", item) do
-    text = StreamerView.render("follow_relationships_update.json", item)
     user_topic = "user:#{item.follower.id}"
+    text = StreamerView.render("follow_relationships_update.json", item, user_topic)
 
     Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
 
@@ -241,7 +245,7 @@ defmodule Pleroma.Web.Streamer do
   defp do_stream("list", item) do
     # filter the recipient list if the activity is not public, see #270.
     recipient_lists =
-      case Visibility.is_public?(item) do
+      case Visibility.public?(item) do
         true ->
           Pleroma.List.get_lists_from_activity(item)
 
@@ -266,9 +270,11 @@ defmodule Pleroma.Web.Streamer do
 
   defp do_stream(topic, %Notification{} = item)
        when topic in ["user", "user:notification"] do
-    Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+    user_topic = "#{topic}:#{item.user_id}"
+
+    Registry.dispatch(@registry, user_topic, fn list ->
       Enum.each(list, fn {pid, _auth} ->
-        send(pid, {:render_with_user, StreamerView, "notification.json", item})
+        send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic})
       end)
     end)
   end
@@ -277,7 +283,7 @@ defmodule Pleroma.Web.Streamer do
        when topic in ["user", "user:pleroma_chat"] do
     topic = "#{topic}:#{user.id}"
 
-    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+    text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, _auth} ->
@@ -305,7 +311,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp push_to_socket(topic, %Participation{} = participation) do
-    rendered = StreamerView.render("conversation.json", participation)
+    rendered = StreamerView.render("conversation.json", participation, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, _} ->
@@ -333,12 +339,15 @@ defmodule Pleroma.Web.Streamer do
       Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
       |> Map.put(:object, item.object)
 
-    anon_render = StreamerView.render("status_update.json", create_activity)
+    anon_render = StreamerView.render("status_update.json", create_activity, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, auth?} ->
         if auth? do
-          send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
+          send(
+            pid,
+            {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
+          )
         else
           send(pid, {:text, anon_render})
         end
@@ -347,12 +356,12 @@ defmodule Pleroma.Web.Streamer do
   end
 
   defp push_to_socket(topic, item) do
-    anon_render = StreamerView.render("update.json", item)
+    anon_render = StreamerView.render("update.json", item, topic)
 
     Registry.dispatch(@registry, topic, fn list ->
       Enum.each(list, fn {pid, auth?} ->
         if auth? do
-          send(pid, {:render_with_user, StreamerView, "update.json", item})
+          send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
         else
           send(pid, {:text, anon_render})
         end
@@ -386,25 +395,20 @@ defmodule Pleroma.Web.Streamer do
     end
   end
 
-  # In test environement, only return true if the registry is started.
-  # In benchmark environment, returns false.
-  # In any other environment, always returns true.
-  cond do
-    @mix_env == :test ->
-      def should_env_send? do
-        case Process.whereis(@registry) do
-          nil ->
-            false
+  # In dev/prod the streamer registry is expected to be started, so return true
+  # In test it is possible to have the registry started for a test so it will check
+  # In benchmark it will never find the process alive and return false
+  def should_env_send? do
+    if Application.get_env(:pleroma, Pleroma.Application)[:streamer_registry] do
+      true
+    else
+      case Process.whereis(@registry) do
+        nil ->
+          false
 
-          pid ->
-            Process.alive?(pid)
-        end
+        pid ->
+          Process.alive?(pid)
       end
-
-    @mix_env == :benchmark ->
-      def should_env_send?, do: false
-
-    true ->
-      def should_env_send?, do: true
+    end
   end
 end