1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
# Integration tests that open websocket client connections against the
# endpoint's "/api/v1/streaming" path and check which stream names and
# credentials are accepted or rejected.
5 defmodule Pleroma.Integration.MastodonWebsocketTest do
6 # Needs a streamer, needs to stay synchronous
9 import ExUnit.CaptureLog
10 import Pleroma.Factory
12 alias Pleroma.Integration.WebsocketClient
13 alias Pleroma.Web.CommonAPI
14 alias Pleroma.Web.OAuth
# Tags applied to every test in this module. `capture_log: true` is the ExUnit
# tag that captures log output emitted while a test runs. `needs_streamer` is a
# project-specific tag — presumably used to include/exclude these tests
# depending on whether a streamer is running; confirm in the test helper.
16 @moduletag needs_streamer: true, capture_log: true
# Base URL used by the websocket client: derived from the endpoint URL, with the
# scheme forced to "ws" and the path set to the streaming API route.
# NOTE(review): the steps before/after the two Map.put/3 calls (e.g. parsing the
# URL into a map/struct and converting it back to a string) are not visible in
# this excerpt — verify against the full file.
18 @path Pleroma.Web.Endpoint.url()
20 |> Map.put(:scheme, "ws")
21 |> Map.put(:path, "/api/v1/streaming")
# Starts a websocket client for the streaming endpoint.
#
#   qs      - optional query string such as "?stream=public" (default: nil)
#   headers - extra request headers as {name, value} tuples (default: [])
#
# Returns the result of WebsocketClient.start_link/3; callers in this file match
# it against {:ok, _} or {:error, %WebSockex.RequestError{code: code}}.
# The calling process (self()) is passed to the client; tests later
# assert_receive {:text, _} / {:close, _} messages, so presumably the client
# forwards received frames to that pid — confirm in WebsocketClient.
# NOTE(review): the lines that build `path` from `qs` are not visible in this
# excerpt.
24 def start_socket(qs \\ nil, headers \\ []) do
31 WebsocketClient.start_link(self(), path, headers)
# A connection with no query string at all, or with an unrecognised stream name,
# must fail the websocket handshake with HTTP 404.
34 test "refuses invalid requests" do
# No `stream` parameter.
36 assert {:error, %WebSockex.RequestError{code: 404}} = start_socket()
# `stream` parameter present but not a known stream name.
37 assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
# The `user` stream is protected: both a bogus access token and a missing token
# must be rejected with HTTP 401.
42 test "requires authentication and a valid token for protected streams" do
# Token supplied, but it does not correspond to a real token.
44 assert {:error, %WebSockex.RequestError{code: 401}} =
45 start_socket("?stream=user&access_token=aaaaaaaaaaaa")
# No token supplied at all.
47 assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
# Public timelines can be subscribed to without any access token. Covers the
# plain public stream, the local and remote variants (the remote one carries an
# `instance` parameter), and the hashtag stream (which carries a `tag`
# parameter).
52 test "allows public streams without authentication" do
53 assert {:ok, _} = start_socket("?stream=public")
54 assert {:ok, _} = start_socket("?stream=public:local")
55 assert {:ok, _} = start_socket("?stream=public:remote&instance=lain.com")
56 assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
# Subscribes to the public stream, creates a post, and checks the shape of the
# message delivered over the socket: an outer JSON object whose "event" is
# "update" and whose "payload" is itself a JSON-encoded string that decodes to
# the same data StatusView renders for the activity.
# NOTE(review): the bindings for `user` and `view_json` are not visible in this
# excerpt; presumably `user` comes from a factory insert and `view_json` is the
# rendered status after a JSON round-trip — confirm against the full file.
59 test "receives well formatted events" do
61 {:ok, _} = start_socket("?stream=public")
62 {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
# Wait up to one second for the streamed text frame.
64 assert_receive {:text, raw_json}, 1_000
65 assert {:ok, json} = Jason.decode(raw_json)
67 assert "update" == json["event"]
68 assert json["payload"]
# The payload is a JSON string nested inside the outer JSON, so decode again
# (this rebinds `json` to the decoded payload).
69 assert {:ok, json} = Jason.decode(json["payload"])
# Render the same activity with no viewing user (`for: nil`) to get the
# expected representation.
72 Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
76 assert json == view_json
# Tests in this group connect using a real OAuth token.
# The visible setup lines build an OAuth app changeset, create an authorization
# for `user` on that app, exchange the authorization for a token, and return
# `app`, `user` and `token` as the test context.
# NOTE(review): the `setup` header, the creation of `user`, the remaining
# app attributes, and the step that persists the changeset into `app` are not
# visible in this excerpt.
79 describe "with a valid user token" do
83 OAuth.App.register_changeset(%OAuth.App{}, %{
84 client_name: "client",
92 {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
94 {:ok, token} = OAuth.Token.exchange_token(app, auth)
# Context map consumed by the tests in this describe block.
96 %{app: app, user: user, token: token}
# A token produced by the setup, passed as the `access_token` query parameter,
# allows connecting to the protected `user` stream.
99 test "accepts valid tokens", state do
100 assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
# The `user` stream connects with a valid token and is rejected with HTTP 401
# when the token is omitted.
103 test "accepts the 'user' stream", %{token: token} = _state do
104 assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
# Same stream without a token must be refused.
107 assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
# The `user:notification` stream connects with a valid token and is rejected
# with HTTP 401 when the token is omitted.
112 test "accepts the 'user:notification' stream", %{token: token} = _state do
113 assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
# Same stream without a token must be refused.
116 assert {:error, %WebSockex.RequestError{code: 401}} =
117 start_socket("?stream=user:notification")
# The access token may be supplied in the Sec-WebSocket-Protocol request header
# instead of the query string. A valid token connects; an arbitrary string in
# that header is rejected with HTTP 401.
123 test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
124 assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
# Header present but its value is not a valid token.
127 assert {:error, %WebSockex.RequestError{code: 401}} =
128 start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
# Revoking a token must close the sockets that were opened with it, and only
# those: two sockets are opened with `token`, one with a second token for the
# same app and user, and after revoking `token` exactly two close messages are
# expected.
134 test "disconnect when token is revoked", %{app: app, user: user, token: token} do
# Two connections authenticated with the token that will be revoked.
135 assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
136 assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
# A third connection authenticated with a different, freshly issued token.
138 {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
140 {:ok, token2} = OAuth.Token.exchange_token(app, auth)
141 assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")
143 OAuth.Token.Strategy.Revoke.revoke(token)
# One close per socket opened with the revoked token...
145 assert_receive {:close, _}
146 assert_receive {:close, _}
# ...and no third close: the socket using `token2` must not be closed.
147 refute_receive {:close, _}