1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
5 defmodule Pleroma.Web.StreamerTest do
11 alias Pleroma.Chat.MessageReference
12 alias Pleroma.Conversation.Participation
16 alias Pleroma.Web.CommonAPI
17 alias Pleroma.Web.Streamer
18 alias Pleroma.Web.StreamerView
20 @moduletag needs_streamer: true, capture_log: true
22 setup do: clear_config([:instance, :skip_thread_containment])
24 describe "get_topic/_ (unauthenticated)" do
25 test "allows no stream" do
26 assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
29 test "allows public" do
30 assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
31 assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
32 assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
33 assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
36 test "rejects local public streams if restricted_unauthenticated is on" do
37 clear_config([:restrict_unauthenticated, :timelines, :local], true)
39 assert {:error, :unauthorized} = Streamer.get_topic("public:local", nil, nil)
40 assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", nil, nil)
43 test "rejects remote public streams if restricted_unauthenticated is on" do
44 clear_config([:restrict_unauthenticated, :timelines, :federated], true)
46 assert {:error, :unauthorized} = Streamer.get_topic("public", nil, nil)
47 assert {:error, :unauthorized} = Streamer.get_topic("public:media", nil, nil)
49 assert {:error, :unauthorized} =
50 Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
52 assert {:error, :unauthorized} =
53 Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
56 test "allows instance streams" do
57 assert {:ok, "public:remote:lain.com"} =
58 Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
60 assert {:ok, "public:remote:media:lain.com"} =
61 Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
64 test "allows hashtag streams" do
65 assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
68 test "disallows user streams" do
69 assert {:error, _} = Streamer.get_topic("user", nil, nil)
70 assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
71 assert {:error, _} = Streamer.get_topic("direct", nil, nil)
74 test "disallows list streams" do
75 assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
79 describe "get_topic/_ (authenticated)" do
80 setup do: oauth_access(["read"])
82 test "allows public streams (regardless of OAuth token scopes)", %{
84 token: read_oauth_token
86 with oauth_token <- [nil, read_oauth_token] do
87 assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
88 assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
89 assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
91 assert {:ok, "public:local:media"} =
92 Streamer.get_topic("public:local:media", user, oauth_token)
96 test "allows local public streams if restricted_unauthenticated is on", %{
100 clear_config([:restrict_unauthenticated, :timelines, :local], true)
102 %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
103 %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
105 assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
107 assert {:ok, "public:local:media"} =
108 Streamer.get_topic("public:local:media", user, oauth_token)
110 for token <- [read_notifications_token, badly_scoped_token] do
111 assert {:error, :unauthorized} = Streamer.get_topic("public:local", user, token)
113 assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", user, token)
117 test "allows remote public streams if restricted_unauthenticated is on", %{
121 clear_config([:restrict_unauthenticated, :timelines, :federated], true)
123 %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
124 %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
126 assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
127 assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
129 assert {:ok, "public:remote:lain.com"} =
130 Streamer.get_topic("public:remote", user, oauth_token, %{"instance" => "lain.com"})
132 assert {:ok, "public:remote:media:lain.com"} =
133 Streamer.get_topic("public:remote:media", user, oauth_token, %{
134 "instance" => "lain.com"
137 for token <- [read_notifications_token, badly_scoped_token] do
138 assert {:error, :unauthorized} = Streamer.get_topic("public", user, token)
139 assert {:error, :unauthorized} = Streamer.get_topic("public:media", user, token)
141 assert {:error, :unauthorized} =
142 Streamer.get_topic("public:remote", user, token, %{
143 "instance" => "lain.com"
146 assert {:error, :unauthorized} =
147 Streamer.get_topic("public:remote:media", user, token, %{
148 "instance" => "lain.com"
153 test "allows user streams (with proper OAuth token scopes)", %{
155 token: read_oauth_token
157 %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
158 %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
159 %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
161 expected_user_topic = "user:#{user.id}"
162 expected_notification_topic = "user:notification:#{user.id}"
163 expected_direct_topic = "direct:#{user.id}"
164 expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
166 for valid_user_token <- [read_oauth_token, read_statuses_token] do
167 assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
169 assert {:ok, ^expected_direct_topic} =
170 Streamer.get_topic("direct", user, valid_user_token)
172 assert {:ok, ^expected_pleroma_chat_topic} =
173 Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
176 for invalid_user_token <- [read_notifications_token, badly_scoped_token],
177 user_topic <- ["user", "direct", "user:pleroma_chat"] do
178 assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
181 for valid_notification_token <- [read_oauth_token, read_notifications_token] do
182 assert {:ok, ^expected_notification_topic} =
183 Streamer.get_topic("user:notification", user, valid_notification_token)
186 for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
187 assert {:error, :unauthorized} =
188 Streamer.get_topic("user:notification", user, invalid_notification_token)
192 test "allows hashtag streams (regardless of OAuth token scopes)", %{
194 token: read_oauth_token
196 for oauth_token <- [nil, read_oauth_token] do
197 assert {:ok, "hashtag:cofe"} =
198 Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
202 test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
203 another_user = insert(:user)
204 assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
207 Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
209 assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
212 test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
214 token: read_oauth_token
216 %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
217 %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
218 {:ok, list} = List.create("Test", user)
220 assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
222 for valid_token <- [read_oauth_token, read_lists_token] do
223 assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
226 assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
229 test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
230 another_user = insert(:user)
231 {:ok, list} = List.create("Test", another_user)
233 assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
234 assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
238 describe "user streams" do
240 %{user: user, token: token} = oauth_access(["read"])
241 notify = insert(:notification, user: user, activity: build(:note_activity))
242 {:ok, %{user: user, notify: notify, token: token}}
245 test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
246 Streamer.get_topic_and_add_socket("user", user, oauth_token)
247 {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
249 assert_receive {:render_with_user, _, _, ^activity, _}
250 refute Streamer.filtered_by_user?(user, activity)
253 test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
254 Streamer.get_topic_and_add_socket("user", user, oauth_token)
256 other_user = insert(:user)
257 {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
258 {:ok, announce} = CommonAPI.repeat(activity.id, user)
260 assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
261 refute Streamer.filtered_by_user?(user, announce)
264 test "it does not stream announces of the user's own posts in the 'user' stream", %{
268 Streamer.get_topic_and_add_socket("user", user, oauth_token)
270 other_user = insert(:user)
271 {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
272 {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
274 assert Streamer.filtered_by_user?(user, announce)
277 test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
281 Streamer.get_topic_and_add_socket("user", user, oauth_token)
283 other_user = insert(:user)
284 {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
285 {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
289 |> Repo.get_by(%{user_id: user.id, activity_id: announce.id})
290 |> Repo.preload(:activity)
292 refute Streamer.filtered_by_user?(user, notification)
295 test "it streams boosts of mastodon user in the 'user' stream", %{
299 Streamer.get_topic_and_add_socket("user", user, oauth_token)
301 other_user = insert(:user)
302 {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
305 File.read!("test/fixtures/mastodon-announce.json")
307 |> Map.put("object", activity.data["object"])
308 |> Map.put("actor", user.ap_id)
310 {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
311 Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
313 assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
314 refute Streamer.filtered_by_user?(user, announce)
317 test "it sends notify to in the 'user' stream", %{
322 Streamer.get_topic_and_add_socket("user", user, oauth_token)
323 Streamer.stream("user", notify)
325 assert_receive {:render_with_user, _, _, ^notify, _}
326 refute Streamer.filtered_by_user?(user, notify)
329 test "it sends notify to in the 'user:notification' stream", %{
334 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
335 Streamer.stream("user:notification", notify)
337 assert_receive {:render_with_user, _, _, ^notify, _}
338 refute Streamer.filtered_by_user?(user, notify)
341 test "it sends chat messages to the 'user:pleroma_chat' stream", %{
345 other_user = insert(:user)
347 {:ok, create_activity} =
348 CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123")
350 object = Object.normalize(create_activity, fetch: false)
351 chat = Chat.get(user.id, other_user.ap_id)
352 cm_ref = MessageReference.for_chat_and_object(chat, object)
353 cm_ref = %{cm_ref | chat: chat, object: object}
355 Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
356 Streamer.stream("user:pleroma_chat", {user, cm_ref})
361 %{chat_message_reference: cm_ref},
362 "user:pleroma_chat:#{user.id}"
365 assert text =~ "hey cirno"
366 assert_receive {:text, ^text}
369 test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
370 other_user = insert(:user)
372 {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
373 object = Object.normalize(create_activity, fetch: false)
374 chat = Chat.get(user.id, other_user.ap_id)
375 cm_ref = MessageReference.for_chat_and_object(chat, object)
376 cm_ref = %{cm_ref | chat: chat, object: object}
378 Streamer.get_topic_and_add_socket("user", user, oauth_token)
379 Streamer.stream("user", {user, cm_ref})
384 %{chat_message_reference: cm_ref},
388 assert text =~ "hey cirno"
389 assert_receive {:text, ^text}
392 test "it sends chat message notifications to the 'user:notification' stream", %{
396 other_user = insert(:user)
398 {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
401 Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
402 |> Repo.preload(:activity)
404 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
405 Streamer.stream("user:notification", notify)
407 assert_receive {:render_with_user, _, _, ^notify, _}
408 refute Streamer.filtered_by_user?(user, notify)
411 test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
415 blocked = insert(:user)
416 {:ok, _user_relationship} = User.block(user, blocked)
418 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
420 {:ok, activity} = CommonAPI.post(user, %{status: ":("})
421 {:ok, _} = CommonAPI.favorite(blocked, activity.id)
426 test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
430 user2 = insert(:user)
432 {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
433 {:ok, _} = CommonAPI.add_mute(user, activity)
435 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
437 {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
440 assert Streamer.filtered_by_user?(user, favorite_activity)
443 test "it sends favorite to 'user:notification' stream'", %{
447 user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
449 {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
450 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
451 {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
453 assert_receive {:render_with_user, _, "notification.json", notif, _}
454 assert notif.activity.id == favorite_activity.id
455 refute Streamer.filtered_by_user?(user, notif)
458 test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
462 user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
464 {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
465 {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
466 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
467 {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
470 assert Streamer.filtered_by_user?(user, favorite_activity)
473 test "it sends follow activities to the 'user:notification' stream", %{
477 user2 = insert(:user)
479 Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
480 {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
482 assert_receive {:render_with_user, _, "notification.json", notif, _}
483 assert notif.activity.id == follow_activity.id
484 refute Streamer.filtered_by_user?(user, notif)
487 test "it sends follow relationships updates to the 'user' stream", %{
492 other_user = insert(:user)
493 other_user_id = other_user.id
495 Streamer.get_topic_and_add_socket("user", user, oauth_token)
496 {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user, other_user)
498 assert_receive {:text, event}
500 assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
505 "follower_count" => 0,
506 "following_count" => 0,
510 "follower_count" => 0,
511 "following_count" => 0,
512 "id" => ^other_user_id
514 "state" => "follow_pending"
515 } = Jason.decode!(payload)
517 assert_receive {:text, event}
519 assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
524 "follower_count" => 0,
525 "following_count" => 1,
529 "follower_count" => 1,
530 "following_count" => 0,
531 "id" => ^other_user_id
533 "state" => "follow_accept"
534 } = Jason.decode!(payload)
537 test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do
538 sender = insert(:user)
539 {:ok, _, _, _} = CommonAPI.follow(user, sender)
541 {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
543 Streamer.get_topic_and_add_socket("user", user, oauth_token)
544 {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
545 create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
547 assert_receive {:render_with_user, _, "status_update.json", ^create, _}
548 refute Streamer.filtered_by_user?(user, edited)
551 test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do
552 {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
554 Streamer.get_topic_and_add_socket("user", user, oauth_token)
555 {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})
556 create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
558 assert_receive {:render_with_user, _, "status_update.json", ^create, _}
559 refute Streamer.filtered_by_user?(user, edited)
563 describe "public streams" do
564 test "it sends to public (authenticated)" do
565 %{user: user, token: oauth_token} = oauth_access(["read"])
566 other_user = insert(:user)
568 Streamer.get_topic_and_add_socket("public", user, oauth_token)
570 {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
571 assert_receive {:render_with_user, _, _, ^activity, _}
572 refute Streamer.filtered_by_user?(other_user, activity)
575 test "it sends to public (unauthenticated)" do
578 Streamer.get_topic_and_add_socket("public", nil, nil)
580 {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
581 activity_id = activity.id
582 assert_receive {:text, event}
583 assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
584 assert %{"id" => ^activity_id} = Jason.decode!(payload)
586 {:ok, _} = CommonAPI.delete(activity.id, user)
587 assert_receive {:text, event}
588 assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
591 test "handles deletions" do
592 %{user: user, token: oauth_token} = oauth_access(["read"])
593 other_user = insert(:user)
594 {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
596 Streamer.get_topic_and_add_socket("public", user, oauth_token)
598 {:ok, _} = CommonAPI.delete(activity.id, other_user)
599 activity_id = activity.id
600 assert_receive {:text, event}
601 assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
604 test "it streams edits in the 'public' stream" do
605 sender = insert(:user)
607 Streamer.get_topic_and_add_socket("public", nil, nil)
608 {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
609 assert_receive {:text, _}
611 {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
613 edited = Pleroma.Activity.normalize(edited)
615 %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
617 assert_receive {:text, event}
618 assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
619 assert %{"id" => ^activity_id} = Jason.decode!(payload)
620 refute Streamer.filtered_by_user?(sender, edited)
623 test "it streams multiple edits in the 'public' stream correctly" do
624 sender = insert(:user)
626 Streamer.get_topic_and_add_socket("public", nil, nil)
627 {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
628 assert_receive {:text, _}
630 {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
632 edited = Pleroma.Activity.normalize(edited)
634 %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
636 assert_receive {:text, event}
637 assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
638 assert %{"id" => ^activity_id} = Jason.decode!(payload)
639 refute Streamer.filtered_by_user?(sender, edited)
641 {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew 2"})
643 edited = Pleroma.Activity.normalize(edited)
645 %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
646 assert_receive {:text, event}
647 assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
648 assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload)
649 refute Streamer.filtered_by_user?(sender, edited)
653 describe "thread_containment/2" do
654 test "it filters to user if recipients invalid and thread containment is enabled" do
655 clear_config([:instance, :skip_thread_containment], false)
656 author = insert(:user)
657 %{user: user, token: oauth_token} = oauth_access(["read"])
658 User.follow(user, author, :follow_accept)
661 insert(:note_activity,
665 data: %{"to" => ["TEST-FFF"]}
669 Streamer.get_topic_and_add_socket("public", user, oauth_token)
670 Streamer.stream("public", activity)
671 assert_receive {:render_with_user, _, _, ^activity, _}
672 assert Streamer.filtered_by_user?(user, activity)
675 test "it sends message if recipients invalid and thread containment is disabled" do
676 clear_config([:instance, :skip_thread_containment], true)
677 author = insert(:user)
678 %{user: user, token: oauth_token} = oauth_access(["read"])
679 User.follow(user, author, :follow_accept)
682 insert(:note_activity,
686 data: %{"to" => ["TEST-FFF"]}
690 Streamer.get_topic_and_add_socket("public", user, oauth_token)
691 Streamer.stream("public", activity)
693 assert_receive {:render_with_user, _, _, ^activity, _}
694 refute Streamer.filtered_by_user?(user, activity)
697 test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
698 clear_config([:instance, :skip_thread_containment], false)
699 author = insert(:user)
700 user = insert(:user, skip_thread_containment: true)
701 %{token: oauth_token} = oauth_access(["read"], user: user)
702 User.follow(user, author, :follow_accept)
705 insert(:note_activity,
709 data: %{"to" => ["TEST-FFF"]}
713 Streamer.get_topic_and_add_socket("public", user, oauth_token)
714 Streamer.stream("public", activity)
716 assert_receive {:render_with_user, _, _, ^activity, _}
717 refute Streamer.filtered_by_user?(user, activity)
722 setup do: oauth_access(["read"])
724 test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
725 blocked_user = insert(:user)
726 {:ok, _user_relationship} = User.block(user, blocked_user)
728 Streamer.get_topic_and_add_socket("public", user, oauth_token)
729 {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
730 assert_receive {:render_with_user, _, _, ^activity, _}
731 assert Streamer.filtered_by_user?(user, activity)
734 test "it filters messages transitively involving blocked users", %{
738 blockee = insert(:user)
739 friend = insert(:user)
741 Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
743 {:ok, _user_relationship} = User.block(blocker, blockee)
745 {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
747 assert_receive {:render_with_user, _, _, ^activity_one, _}
748 assert Streamer.filtered_by_user?(blocker, activity_one)
750 {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
752 assert_receive {:render_with_user, _, _, ^activity_two, _}
753 assert Streamer.filtered_by_user?(blocker, activity_two)
755 {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
757 assert_receive {:render_with_user, _, _, ^activity_three, _}
758 assert Streamer.filtered_by_user?(blocker, activity_three)
763 setup do: oauth_access(["read"])
765 test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
766 user_b = insert(:user)
767 user_c = insert(:user)
769 {:ok, user_a, user_b} = User.follow(user_a, user_b)
771 {:ok, list} = List.create("Test", user_a)
772 {:ok, list} = List.follow(list, user_b)
774 Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
777 CommonAPI.post(user_b, %{
778 status: "@#{user_c.nickname} Test",
785 test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
786 user_b = insert(:user)
788 {:ok, list} = List.create("Test", user_a)
789 {:ok, list} = List.follow(list, user_b)
791 Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
794 CommonAPI.post(user_b, %{
796 visibility: "private"
802 test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
803 user_b = insert(:user)
805 {:ok, user_a, user_b} = User.follow(user_a, user_b)
807 {:ok, list} = List.create("Test", user_a)
808 {:ok, list} = List.follow(list, user_b)
810 Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
813 CommonAPI.post(user_b, %{
815 visibility: "private"
818 assert_receive {:render_with_user, _, _, ^activity, _}
819 refute Streamer.filtered_by_user?(user_a, activity)
823 describe "muted reblogs" do
824 setup do: oauth_access(["read"])
826 test "it filters muted reblogs", %{user: user1, token: user1_token} do
827 user2 = insert(:user)
828 user3 = insert(:user)
829 CommonAPI.follow(user1, user2)
830 CommonAPI.hide_reblogs(user1, user2)
832 {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
834 Streamer.get_topic_and_add_socket("user", user1, user1_token)
835 {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
836 assert_receive {:render_with_user, _, _, ^announce_activity, _}
837 assert Streamer.filtered_by_user?(user1, announce_activity)
840 test "it filters reblog notification for reblog-muted actors", %{
844 user2 = insert(:user)
845 CommonAPI.follow(user1, user2)
846 CommonAPI.hide_reblogs(user1, user2)
848 {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
849 Streamer.get_topic_and_add_socket("user", user1, user1_token)
850 {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
852 assert_receive {:render_with_user, _, "notification.json", notif, _}
853 assert Streamer.filtered_by_user?(user1, notif)
856 test "it send non-reblog notification for reblog-muted actors", %{
860 user2 = insert(:user)
861 CommonAPI.follow(user1, user2)
862 CommonAPI.hide_reblogs(user1, user2)
864 {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
865 Streamer.get_topic_and_add_socket("user", user1, user1_token)
866 {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
868 assert_receive {:render_with_user, _, "notification.json", notif, _}
869 refute Streamer.filtered_by_user?(user1, notif)
873 describe "muted threads" do
874 test "it filters posts from muted threads" do
876 %{user: user2, token: user2_token} = oauth_access(["read"])
877 Streamer.get_topic_and_add_socket("user", user2, user2_token)
879 {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
880 {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
881 {:ok, _} = CommonAPI.add_mute(user2, activity)
883 assert_receive {:render_with_user, _, _, ^activity, _}
884 assert Streamer.filtered_by_user?(user2, activity)
888 describe "direct streams" do
889 setup do: oauth_access(["read"])
891 test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
892 another_user = insert(:user)
894 Streamer.get_topic_and_add_socket("direct", user, oauth_token)
896 {:ok, _create_activity} =
897 CommonAPI.post(another_user, %{
898 status: "hey @#{user.nickname}",
902 assert_receive {:text, received_event}
904 assert %{"event" => "conversation", "payload" => received_payload} =
905 Jason.decode!(received_event)
907 assert %{"last_status" => last_status} = Jason.decode!(received_payload)
908 [participation] = Participation.for_user(user)
909 assert last_status["pleroma"]["direct_conversation_id"] == participation.id
912 test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
913 %{user: user, token: oauth_token} do
914 another_user = insert(:user)
916 Streamer.get_topic_and_add_socket("direct", user, oauth_token)
918 {:ok, create_activity} =
919 CommonAPI.post(another_user, %{
920 status: "hi @#{user.nickname}",
924 create_activity_id = create_activity.id
925 assert_receive {:render_with_user, _, _, ^create_activity, _}
926 assert_receive {:text, received_conversation1}
927 assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
929 {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
931 assert_receive {:text, received_event}
933 assert %{"event" => "delete", "payload" => ^create_activity_id} =
934 Jason.decode!(received_event)
940 test "it sends conversation update to the 'direct' stream when a message is deleted", %{
944 another_user = insert(:user)
945 Streamer.get_topic_and_add_socket("direct", user, oauth_token)
947 {:ok, create_activity} =
948 CommonAPI.post(another_user, %{
949 status: "hi @#{user.nickname}",
953 {:ok, create_activity2} =
954 CommonAPI.post(another_user, %{
955 status: "hi @#{user.nickname} 2",
956 in_reply_to_status_id: create_activity.id,
960 assert_receive {:render_with_user, _, _, ^create_activity, _}
961 assert_receive {:render_with_user, _, _, ^create_activity2, _}
962 assert_receive {:text, received_conversation1}
963 assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
964 assert_receive {:text, received_conversation1}
965 assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
967 {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
969 assert_receive {:text, received_event}
970 assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
972 assert_receive {:text, received_event}
974 assert %{"event" => "conversation", "payload" => received_payload} =
975 Jason.decode!(received_event)
977 assert %{"last_status" => last_status} = Jason.decode!(received_payload)
978 assert last_status["id"] == to_string(create_activity.id)
982 describe "stop streaming if token got revoked" do
984 child_proc = fn start, finalize ->
989 {StreamerTest, :ready} ->
990 assert_receive {:render_with_user, _, "update.json", _, _}
993 {StreamerTest, :revoked} -> finalize.()
999 starter = fn user, token ->
1000 fn -> Streamer.get_topic_and_add_socket("user", user, token) end
1003 hit = fn -> assert_receive :close end
1004 miss = fn -> refute_receive :close end
1006 send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end
1009 child_proc: child_proc,
1017 test "do not revoke other tokens", %{
1018 child_proc: child_proc,
1024 %{user: user, token: token} = oauth_access(["read"])
1025 %{token: token2} = oauth_access(["read"], user: user)
1026 %{user: user2, token: user2_token} = oauth_access(["read"])
1028 post_user = insert(:user)
1029 CommonAPI.follow(user, post_user)
1030 CommonAPI.follow(user2, post_user)
1033 Task.async(child_proc.(starter.(user, token), hit)),
1034 Task.async(child_proc.(starter.(user, token2), miss)),
1035 Task.async(child_proc.(starter.(user2, user2_token), miss))
1039 CommonAPI.post(post_user, %{
1043 send_all.(tasks, {StreamerTest, :ready})
1045 Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1047 send_all.(tasks, {StreamerTest, :revoked})
1049 Enum.each(tasks, &Task.await/1)
1052 test "revoke all streams for this token", %{
1053 child_proc: child_proc,
1058 %{user: user, token: token} = oauth_access(["read"])
1060 post_user = insert(:user)
1061 CommonAPI.follow(user, post_user)
1064 Task.async(child_proc.(starter.(user, token), hit)),
1065 Task.async(child_proc.(starter.(user, token), hit))
1069 CommonAPI.post(post_user, %{
1073 send_all.(tasks, {StreamerTest, :ready})
1075 Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1077 send_all.(tasks, {StreamerTest, :revoked})
1079 Enum.each(tasks, &Task.await/1)