diff options
Diffstat (limited to 'lib/pleroma/web/streamer.ex')
| -rw-r--r--[-rwxr-xr-x] | lib/pleroma/web/streamer.ex | 70 |
1 files changed, 37 insertions, 33 deletions
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index b9a04cc..7065fda 100755..100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.Streamer do require Logger + require Pleroma.Constants alias Pleroma.Activity alias Pleroma.Chat.MessageReference @@ -19,12 +20,11 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.StreamerView - @mix_env Mix.env() @registry Pleroma.Web.StreamerRegistry def registry, do: @registry - @public_streams ["public", "public:local", "public:media", "public:local:media"] + @public_streams Pleroma.Constants.public_streams() @local_streams ["public:local", "public:local:media"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @@ -33,7 +33,7 @@ defmodule Pleroma.Web.Streamer do stream :: String.t(), User.t() | nil, Token.t() | nil, - Map.t() | nil + map() | nil ) :: {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do @@ -59,10 +59,14 @@ defmodule Pleroma.Web.Streamer do end @doc "Expand and authorizes a stream" - @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) :: - {:ok, topic :: String.t()} | {:error, :bad_topic} + @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) :: + {:ok, topic :: String.t() | nil} | {:error, :bad_topic} def get_topic(stream, user, oauth_token, params \\ %{}) + def get_topic(nil = _stream, _user, _oauth_token, _params) do + {:ok, nil} + end + # Allow all public steams if the instance allows unauthenticated access. # Otherwise, only allow users with valid oauth tokens. def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do @@ -219,8 +223,8 @@ defmodule Pleroma.Web.Streamer do end defp do_stream("follow_relationship", item) do - text = StreamerView.render("follow_relationships_update.json", item) user_topic = "user:#{item.follower.id}" + text = StreamerView.render("follow_relationships_update.json", item, user_topic) Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") @@ -241,7 +245,7 @@ defmodule Pleroma.Web.Streamer do defp do_stream("list", item) do # filter the recipient list if the activity is not public, see #270. recipient_lists = - case Visibility.is_public?(item) do + case Visibility.public?(item) do true -> Pleroma.List.get_lists_from_activity(item) @@ -266,9 +270,11 @@ defmodule Pleroma.Web.Streamer do defp do_stream(topic, %Notification{} = item) when topic in ["user", "user:notification"] do - Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> + user_topic = "#{topic}:#{item.user_id}" + + Registry.dispatch(@registry, user_topic, fn list -> Enum.each(list, fn {pid, _auth} -> - send(pid, {:render_with_user, StreamerView, "notification.json", item}) + send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic}) end) end) end @@ -277,7 +283,7 @@ defmodule Pleroma.Web.Streamer do when topic in ["user", "user:pleroma_chat"] do topic = "#{topic}:#{user.id}" - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _auth} -> @@ -305,7 +311,7 @@ defmodule Pleroma.Web.Streamer do end defp push_to_socket(topic, %Participation{} = participation) do - rendered = StreamerView.render("conversation.json", participation) + rendered = StreamerView.render("conversation.json", participation, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _} -> @@ -333,12 +339,15 @@ defmodule Pleroma.Web.Streamer do Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"]) |> Map.put(:object, item.object) - anon_render = StreamerView.render("status_update.json", create_activity) + anon_render = StreamerView.render("status_update.json", create_activity, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity}) + send( + pid, + {:render_with_user, StreamerView, "status_update.json", create_activity, topic} + ) else send(pid, {:text, anon_render}) end @@ -347,12 +356,12 @@ defmodule Pleroma.Web.Streamer do end defp push_to_socket(topic, item) do - anon_render = StreamerView.render("update.json", item) + anon_render = StreamerView.render("update.json", item, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "update.json", item}) + send(pid, {:render_with_user, StreamerView, "update.json", item, topic}) else send(pid, {:text, anon_render}) end @@ -386,25 +395,20 @@ defmodule Pleroma.Web.Streamer do end end - # In test environement, only return true if the registry is started. - # In benchmark environment, returns false. - # In any other environment, always returns true. - cond do - @mix_env == :test -> - def should_env_send? do - case Process.whereis(@registry) do - nil -> - false + # In dev/prod the streamer registry is expected to be started, so return true + # In test it is possible to have the registry started for a test so it will check + # In benchmark it will never find the process alive and return false + def should_env_send? do + if Application.get_env(:pleroma, Pleroma.Application)[:streamer_registry] do + true + else + case Process.whereis(@registry) do + nil -> + false - pid -> - Process.alive?(pid) - end + pid -> + Process.alive?(pid) end - - @mix_env == :benchmark -> - def should_env_send?, do: false - - true -> - def should_env_send?, do: true + end end end |
