First
[anni] / test / pleroma / web / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6   use Pleroma.DataCase
7
8   import Pleroma.Factory
9
10   alias Pleroma.Chat
11   alias Pleroma.Chat.MessageReference
12   alias Pleroma.Conversation.Participation
13   alias Pleroma.List
14   alias Pleroma.Object
15   alias Pleroma.User
16   alias Pleroma.Web.CommonAPI
17   alias Pleroma.Web.Streamer
18   alias Pleroma.Web.StreamerView
19
20   @moduletag needs_streamer: true, capture_log: true
21
22   setup do: clear_config([:instance, :skip_thread_containment])
23
24   describe "get_topic/_ (unauthenticated)" do
25     test "allows public" do
26       assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
27       assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
28       assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
29       assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
30     end
31
32     test "rejects local public streams if restricted_unauthenticated is on" do
33       clear_config([:restrict_unauthenticated, :timelines, :local], true)
34
35       assert {:error, :unauthorized} = Streamer.get_topic("public:local", nil, nil)
36       assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", nil, nil)
37     end
38
39     test "rejects remote public streams if restricted_unauthenticated is on" do
40       clear_config([:restrict_unauthenticated, :timelines, :federated], true)
41
42       assert {:error, :unauthorized} = Streamer.get_topic("public", nil, nil)
43       assert {:error, :unauthorized} = Streamer.get_topic("public:media", nil, nil)
44
45       assert {:error, :unauthorized} =
46                Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
47
48       assert {:error, :unauthorized} =
49                Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
50     end
51
52     test "allows instance streams" do
53       assert {:ok, "public:remote:lain.com"} =
54                Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
55
56       assert {:ok, "public:remote:media:lain.com"} =
57                Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
58     end
59
60     test "allows hashtag streams" do
61       assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
62     end
63
64     test "disallows user streams" do
65       assert {:error, _} = Streamer.get_topic("user", nil, nil)
66       assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
67       assert {:error, _} = Streamer.get_topic("direct", nil, nil)
68     end
69
70     test "disallows list streams" do
71       assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
72     end
73   end
74
75   describe "get_topic/_ (authenticated)" do
76     setup do: oauth_access(["read"])
77
78     test "allows public streams (regardless of OAuth token scopes)", %{
79       user: user,
80       token: read_oauth_token
81     } do
82       with oauth_token <- [nil, read_oauth_token] do
83         assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
84         assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
85         assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
86
87         assert {:ok, "public:local:media"} =
88                  Streamer.get_topic("public:local:media", user, oauth_token)
89       end
90     end
91
92     test "allows local public streams if restricted_unauthenticated is on", %{
93       user: user,
94       token: oauth_token
95     } do
96       clear_config([:restrict_unauthenticated, :timelines, :local], true)
97
98       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
99       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
100
101       assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
102
103       assert {:ok, "public:local:media"} =
104                Streamer.get_topic("public:local:media", user, oauth_token)
105
106       for token <- [read_notifications_token, badly_scoped_token] do
107         assert {:error, :unauthorized} = Streamer.get_topic("public:local", user, token)
108
109         assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", user, token)
110       end
111     end
112
113     test "allows remote public streams if restricted_unauthenticated is on", %{
114       user: user,
115       token: oauth_token
116     } do
117       clear_config([:restrict_unauthenticated, :timelines, :federated], true)
118
119       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
120       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
121
122       assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
123       assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
124
125       assert {:ok, "public:remote:lain.com"} =
126                Streamer.get_topic("public:remote", user, oauth_token, %{"instance" => "lain.com"})
127
128       assert {:ok, "public:remote:media:lain.com"} =
129                Streamer.get_topic("public:remote:media", user, oauth_token, %{
130                  "instance" => "lain.com"
131                })
132
133       for token <- [read_notifications_token, badly_scoped_token] do
134         assert {:error, :unauthorized} = Streamer.get_topic("public", user, token)
135         assert {:error, :unauthorized} = Streamer.get_topic("public:media", user, token)
136
137         assert {:error, :unauthorized} =
138                  Streamer.get_topic("public:remote", user, token, %{
139                    "instance" => "lain.com"
140                  })
141
142         assert {:error, :unauthorized} =
143                  Streamer.get_topic("public:remote:media", user, token, %{
144                    "instance" => "lain.com"
145                  })
146       end
147     end
148
149     test "allows user streams (with proper OAuth token scopes)", %{
150       user: user,
151       token: read_oauth_token
152     } do
153       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
154       %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
155       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
156
157       expected_user_topic = "user:#{user.id}"
158       expected_notification_topic = "user:notification:#{user.id}"
159       expected_direct_topic = "direct:#{user.id}"
160       expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
161
162       for valid_user_token <- [read_oauth_token, read_statuses_token] do
163         assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
164
165         assert {:ok, ^expected_direct_topic} =
166                  Streamer.get_topic("direct", user, valid_user_token)
167
168         assert {:ok, ^expected_pleroma_chat_topic} =
169                  Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
170       end
171
172       for invalid_user_token <- [read_notifications_token, badly_scoped_token],
173           user_topic <- ["user", "direct", "user:pleroma_chat"] do
174         assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
175       end
176
177       for valid_notification_token <- [read_oauth_token, read_notifications_token] do
178         assert {:ok, ^expected_notification_topic} =
179                  Streamer.get_topic("user:notification", user, valid_notification_token)
180       end
181
182       for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
183         assert {:error, :unauthorized} =
184                  Streamer.get_topic("user:notification", user, invalid_notification_token)
185       end
186     end
187
188     test "allows hashtag streams (regardless of OAuth token scopes)", %{
189       user: user,
190       token: read_oauth_token
191     } do
192       for oauth_token <- [nil, read_oauth_token] do
193         assert {:ok, "hashtag:cofe"} =
194                  Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
195       end
196     end
197
198     test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
199       another_user = insert(:user)
200       assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
201
202       assert {:error, _} =
203                Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
204
205       assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
206     end
207
208     test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
209       user: user,
210       token: read_oauth_token
211     } do
212       %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
213       %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
214       {:ok, list} = List.create("Test", user)
215
216       assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
217
218       for valid_token <- [read_oauth_token, read_lists_token] do
219         assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
220       end
221
222       assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
223     end
224
225     test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
226       another_user = insert(:user)
227       {:ok, list} = List.create("Test", another_user)
228
229       assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
230       assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
231     end
232   end
233
234   describe "user streams" do
235     setup do
236       %{user: user, token: token} = oauth_access(["read"])
237       notify = insert(:notification, user: user, activity: build(:note_activity))
238       {:ok, %{user: user, notify: notify, token: token}}
239     end
240
241     test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
242       Streamer.get_topic_and_add_socket("user", user, oauth_token)
243       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
244
245       assert_receive {:render_with_user, _, _, ^activity}
246       refute Streamer.filtered_by_user?(user, activity)
247     end
248
249     test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
250       Streamer.get_topic_and_add_socket("user", user, oauth_token)
251
252       other_user = insert(:user)
253       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
254       {:ok, announce} = CommonAPI.repeat(activity.id, user)
255
256       assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
257       refute Streamer.filtered_by_user?(user, announce)
258     end
259
260     test "it does not stream announces of the user's own posts in the 'user' stream", %{
261       user: user,
262       token: oauth_token
263     } do
264       Streamer.get_topic_and_add_socket("user", user, oauth_token)
265
266       other_user = insert(:user)
267       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
268       {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
269
270       assert Streamer.filtered_by_user?(user, announce)
271     end
272
273     test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
274       user: user,
275       token: oauth_token
276     } do
277       Streamer.get_topic_and_add_socket("user", user, oauth_token)
278
279       other_user = insert(:user)
280       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
281       {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
282
283       notification =
284         Pleroma.Notification
285         |> Repo.get_by(%{user_id: user.id, activity_id: announce.id})
286         |> Repo.preload(:activity)
287
288       refute Streamer.filtered_by_user?(user, notification)
289     end
290
291     test "it streams boosts of mastodon user in the 'user' stream", %{
292       user: user,
293       token: oauth_token
294     } do
295       Streamer.get_topic_and_add_socket("user", user, oauth_token)
296
297       other_user = insert(:user)
298       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
299
300       data =
301         File.read!("test/fixtures/mastodon-announce.json")
302         |> Jason.decode!()
303         |> Map.put("object", activity.data["object"])
304         |> Map.put("actor", user.ap_id)
305
306       {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
307         Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
308
309       assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
310       refute Streamer.filtered_by_user?(user, announce)
311     end
312
313     test "it sends notify to in the 'user' stream", %{
314       user: user,
315       token: oauth_token,
316       notify: notify
317     } do
318       Streamer.get_topic_and_add_socket("user", user, oauth_token)
319       Streamer.stream("user", notify)
320
321       assert_receive {:render_with_user, _, _, ^notify}
322       refute Streamer.filtered_by_user?(user, notify)
323     end
324
325     test "it sends notify to in the 'user:notification' stream", %{
326       user: user,
327       token: oauth_token,
328       notify: notify
329     } do
330       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
331       Streamer.stream("user:notification", notify)
332
333       assert_receive {:render_with_user, _, _, ^notify}
334       refute Streamer.filtered_by_user?(user, notify)
335     end
336
337     test "it sends chat messages to the 'user:pleroma_chat' stream", %{
338       user: user,
339       token: oauth_token
340     } do
341       other_user = insert(:user)
342
343       {:ok, create_activity} =
344         CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123")
345
346       object = Object.normalize(create_activity, fetch: false)
347       chat = Chat.get(user.id, other_user.ap_id)
348       cm_ref = MessageReference.for_chat_and_object(chat, object)
349       cm_ref = %{cm_ref | chat: chat, object: object}
350
351       Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
352       Streamer.stream("user:pleroma_chat", {user, cm_ref})
353
354       text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
355
356       assert text =~ "hey cirno"
357       assert_receive {:text, ^text}
358     end
359
360     test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
361       other_user = insert(:user)
362
363       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
364       object = Object.normalize(create_activity, fetch: false)
365       chat = Chat.get(user.id, other_user.ap_id)
366       cm_ref = MessageReference.for_chat_and_object(chat, object)
367       cm_ref = %{cm_ref | chat: chat, object: object}
368
369       Streamer.get_topic_and_add_socket("user", user, oauth_token)
370       Streamer.stream("user", {user, cm_ref})
371
372       text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
373
374       assert text =~ "hey cirno"
375       assert_receive {:text, ^text}
376     end
377
378     test "it sends chat message notifications to the 'user:notification' stream", %{
379       user: user,
380       token: oauth_token
381     } do
382       other_user = insert(:user)
383
384       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
385
386       notify =
387         Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
388         |> Repo.preload(:activity)
389
390       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
391       Streamer.stream("user:notification", notify)
392
393       assert_receive {:render_with_user, _, _, ^notify}
394       refute Streamer.filtered_by_user?(user, notify)
395     end
396
397     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
398       user: user,
399       token: oauth_token
400     } do
401       blocked = insert(:user)
402       {:ok, _user_relationship} = User.block(user, blocked)
403
404       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
405
406       {:ok, activity} = CommonAPI.post(user, %{status: ":("})
407       {:ok, _} = CommonAPI.favorite(blocked, activity.id)
408
409       refute_receive _
410     end
411
412     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
413       user: user,
414       token: oauth_token
415     } do
416       user2 = insert(:user)
417
418       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
419       {:ok, _} = CommonAPI.add_mute(user, activity)
420
421       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
422
423       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
424
425       refute_receive _
426       assert Streamer.filtered_by_user?(user, favorite_activity)
427     end
428
429     test "it sends favorite to 'user:notification' stream'", %{
430       user: user,
431       token: oauth_token
432     } do
433       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
434
435       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
436       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
437       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
438
439       assert_receive {:render_with_user, _, "notification.json", notif}
440       assert notif.activity.id == favorite_activity.id
441       refute Streamer.filtered_by_user?(user, notif)
442     end
443
444     test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
445       user: user,
446       token: oauth_token
447     } do
448       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
449
450       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
451       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
452       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
453       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
454
455       refute_receive _
456       assert Streamer.filtered_by_user?(user, favorite_activity)
457     end
458
459     test "it sends follow activities to the 'user:notification' stream", %{
460       user: user,
461       token: oauth_token
462     } do
463       user2 = insert(:user)
464
465       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
466       {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
467
468       assert_receive {:render_with_user, _, "notification.json", notif}
469       assert notif.activity.id == follow_activity.id
470       refute Streamer.filtered_by_user?(user, notif)
471     end
472
473     test "it sends follow relationships updates to the 'user' stream", %{
474       user: user,
475       token: oauth_token
476     } do
477       user_id = user.id
478       other_user = insert(:user)
479       other_user_id = other_user.id
480
481       Streamer.get_topic_and_add_socket("user", user, oauth_token)
482       {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user, other_user)
483
484       assert_receive {:text, event}
485
486       assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
487                Jason.decode!(event)
488
489       assert %{
490                "follower" => %{
491                  "follower_count" => 0,
492                  "following_count" => 0,
493                  "id" => ^user_id
494                },
495                "following" => %{
496                  "follower_count" => 0,
497                  "following_count" => 0,
498                  "id" => ^other_user_id
499                },
500                "state" => "follow_pending"
501              } = Jason.decode!(payload)
502
503       assert_receive {:text, event}
504
505       assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
506                Jason.decode!(event)
507
508       assert %{
509                "follower" => %{
510                  "follower_count" => 0,
511                  "following_count" => 1,
512                  "id" => ^user_id
513                },
514                "following" => %{
515                  "follower_count" => 1,
516                  "following_count" => 0,
517                  "id" => ^other_user_id
518                },
519                "state" => "follow_accept"
520              } = Jason.decode!(payload)
521     end
522
523     test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do
524       sender = insert(:user)
525       {:ok, _, _, _} = CommonAPI.follow(user, sender)
526
527       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
528
529       Streamer.get_topic_and_add_socket("user", user, oauth_token)
530       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
531       create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
532
533       assert_receive {:render_with_user, _, "status_update.json", ^create}
534       refute Streamer.filtered_by_user?(user, edited)
535     end
536
537     test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do
538       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
539
540       Streamer.get_topic_and_add_socket("user", user, oauth_token)
541       {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})
542       create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
543
544       assert_receive {:render_with_user, _, "status_update.json", ^create}
545       refute Streamer.filtered_by_user?(user, edited)
546     end
547   end
548
549   describe "public streams" do
550     test "it sends to public (authenticated)" do
551       %{user: user, token: oauth_token} = oauth_access(["read"])
552       other_user = insert(:user)
553
554       Streamer.get_topic_and_add_socket("public", user, oauth_token)
555
556       {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
557       assert_receive {:render_with_user, _, _, ^activity}
558       refute Streamer.filtered_by_user?(other_user, activity)
559     end
560
561     test "it sends to public (unauthenticated)" do
562       user = insert(:user)
563
564       Streamer.get_topic_and_add_socket("public", nil, nil)
565
566       {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
567       activity_id = activity.id
568       assert_receive {:text, event}
569       assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
570       assert %{"id" => ^activity_id} = Jason.decode!(payload)
571
572       {:ok, _} = CommonAPI.delete(activity.id, user)
573       assert_receive {:text, event}
574       assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
575     end
576
577     test "handles deletions" do
578       %{user: user, token: oauth_token} = oauth_access(["read"])
579       other_user = insert(:user)
580       {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
581
582       Streamer.get_topic_and_add_socket("public", user, oauth_token)
583
584       {:ok, _} = CommonAPI.delete(activity.id, other_user)
585       activity_id = activity.id
586       assert_receive {:text, event}
587       assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
588     end
589
590     test "it streams edits in the 'public' stream" do
591       sender = insert(:user)
592
593       Streamer.get_topic_and_add_socket("public", nil, nil)
594       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
595       assert_receive {:text, _}
596
597       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
598
599       edited = Pleroma.Activity.normalize(edited)
600
601       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
602
603       assert_receive {:text, event}
604       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
605       assert %{"id" => ^activity_id} = Jason.decode!(payload)
606       refute Streamer.filtered_by_user?(sender, edited)
607     end
608
609     test "it streams multiple edits in the 'public' stream correctly" do
610       sender = insert(:user)
611
612       Streamer.get_topic_and_add_socket("public", nil, nil)
613       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
614       assert_receive {:text, _}
615
616       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
617
618       edited = Pleroma.Activity.normalize(edited)
619
620       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
621
622       assert_receive {:text, event}
623       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
624       assert %{"id" => ^activity_id} = Jason.decode!(payload)
625       refute Streamer.filtered_by_user?(sender, edited)
626
627       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew 2"})
628
629       edited = Pleroma.Activity.normalize(edited)
630
631       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
632       assert_receive {:text, event}
633       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
634       assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload)
635       refute Streamer.filtered_by_user?(sender, edited)
636     end
637   end
638
639   describe "thread_containment/2" do
640     test "it filters to user if recipients invalid and thread containment is enabled" do
641       clear_config([:instance, :skip_thread_containment], false)
642       author = insert(:user)
643       %{user: user, token: oauth_token} = oauth_access(["read"])
644       User.follow(user, author, :follow_accept)
645
646       activity =
647         insert(:note_activity,
648           note:
649             insert(:note,
650               user: author,
651               data: %{"to" => ["TEST-FFF"]}
652             )
653         )
654
655       Streamer.get_topic_and_add_socket("public", user, oauth_token)
656       Streamer.stream("public", activity)
657       assert_receive {:render_with_user, _, _, ^activity}
658       assert Streamer.filtered_by_user?(user, activity)
659     end
660
661     test "it sends message if recipients invalid and thread containment is disabled" do
662       clear_config([:instance, :skip_thread_containment], true)
663       author = insert(:user)
664       %{user: user, token: oauth_token} = oauth_access(["read"])
665       User.follow(user, author, :follow_accept)
666
667       activity =
668         insert(:note_activity,
669           note:
670             insert(:note,
671               user: author,
672               data: %{"to" => ["TEST-FFF"]}
673             )
674         )
675
676       Streamer.get_topic_and_add_socket("public", user, oauth_token)
677       Streamer.stream("public", activity)
678
679       assert_receive {:render_with_user, _, _, ^activity}
680       refute Streamer.filtered_by_user?(user, activity)
681     end
682
683     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
684       clear_config([:instance, :skip_thread_containment], false)
685       author = insert(:user)
686       user = insert(:user, skip_thread_containment: true)
687       %{token: oauth_token} = oauth_access(["read"], user: user)
688       User.follow(user, author, :follow_accept)
689
690       activity =
691         insert(:note_activity,
692           note:
693             insert(:note,
694               user: author,
695               data: %{"to" => ["TEST-FFF"]}
696             )
697         )
698
699       Streamer.get_topic_and_add_socket("public", user, oauth_token)
700       Streamer.stream("public", activity)
701
702       assert_receive {:render_with_user, _, _, ^activity}
703       refute Streamer.filtered_by_user?(user, activity)
704     end
705   end
706
707   describe "blocks" do
708     setup do: oauth_access(["read"])
709
710     test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
711       blocked_user = insert(:user)
712       {:ok, _user_relationship} = User.block(user, blocked_user)
713
714       Streamer.get_topic_and_add_socket("public", user, oauth_token)
715       {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
716       assert_receive {:render_with_user, _, _, ^activity}
717       assert Streamer.filtered_by_user?(user, activity)
718     end
719
720     test "it filters messages transitively involving blocked users", %{
721       user: blocker,
722       token: blocker_token
723     } do
724       blockee = insert(:user)
725       friend = insert(:user)
726
727       Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
728
729       {:ok, _user_relationship} = User.block(blocker, blockee)
730
731       {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
732
733       assert_receive {:render_with_user, _, _, ^activity_one}
734       assert Streamer.filtered_by_user?(blocker, activity_one)
735
736       {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
737
738       assert_receive {:render_with_user, _, _, ^activity_two}
739       assert Streamer.filtered_by_user?(blocker, activity_two)
740
741       {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
742
743       assert_receive {:render_with_user, _, _, ^activity_three}
744       assert Streamer.filtered_by_user?(blocker, activity_three)
745     end
746   end
747
748   describe "lists" do
749     setup do: oauth_access(["read"])
750
751     test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
752       user_b = insert(:user)
753       user_c = insert(:user)
754
755       {:ok, user_a, user_b} = User.follow(user_a, user_b)
756
757       {:ok, list} = List.create("Test", user_a)
758       {:ok, list} = List.follow(list, user_b)
759
760       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
761
762       {:ok, _activity} =
763         CommonAPI.post(user_b, %{
764           status: "@#{user_c.nickname} Test",
765           visibility: "direct"
766         })
767
768       refute_receive _
769     end
770
771     test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
772       user_b = insert(:user)
773
774       {:ok, list} = List.create("Test", user_a)
775       {:ok, list} = List.follow(list, user_b)
776
777       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
778
779       {:ok, _activity} =
780         CommonAPI.post(user_b, %{
781           status: "Test",
782           visibility: "private"
783         })
784
785       refute_receive _
786     end
787
788     test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
789       user_b = insert(:user)
790
791       {:ok, user_a, user_b} = User.follow(user_a, user_b)
792
793       {:ok, list} = List.create("Test", user_a)
794       {:ok, list} = List.follow(list, user_b)
795
796       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
797
798       {:ok, activity} =
799         CommonAPI.post(user_b, %{
800           status: "Test",
801           visibility: "private"
802         })
803
804       assert_receive {:render_with_user, _, _, ^activity}
805       refute Streamer.filtered_by_user?(user_a, activity)
806     end
807   end
808
809   describe "muted reblogs" do
810     setup do: oauth_access(["read"])
811
812     test "it filters muted reblogs", %{user: user1, token: user1_token} do
813       user2 = insert(:user)
814       user3 = insert(:user)
815       CommonAPI.follow(user1, user2)
816       CommonAPI.hide_reblogs(user1, user2)
817
818       {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
819
820       Streamer.get_topic_and_add_socket("user", user1, user1_token)
821       {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
822       assert_receive {:render_with_user, _, _, ^announce_activity}
823       assert Streamer.filtered_by_user?(user1, announce_activity)
824     end
825
826     test "it filters reblog notification for reblog-muted actors", %{
827       user: user1,
828       token: user1_token
829     } do
830       user2 = insert(:user)
831       CommonAPI.follow(user1, user2)
832       CommonAPI.hide_reblogs(user1, user2)
833
834       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
835       Streamer.get_topic_and_add_socket("user", user1, user1_token)
836       {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
837
838       assert_receive {:render_with_user, _, "notification.json", notif}
839       assert Streamer.filtered_by_user?(user1, notif)
840     end
841
842     test "it send non-reblog notification for reblog-muted actors", %{
843       user: user1,
844       token: user1_token
845     } do
846       user2 = insert(:user)
847       CommonAPI.follow(user1, user2)
848       CommonAPI.hide_reblogs(user1, user2)
849
850       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
851       Streamer.get_topic_and_add_socket("user", user1, user1_token)
852       {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
853
854       assert_receive {:render_with_user, _, "notification.json", notif}
855       refute Streamer.filtered_by_user?(user1, notif)
856     end
857   end
858
859   describe "muted threads" do
860     test "it filters posts from muted threads" do
861       user = insert(:user)
862       %{user: user2, token: user2_token} = oauth_access(["read"])
863       Streamer.get_topic_and_add_socket("user", user2, user2_token)
864
865       {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
866       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
867       {:ok, _} = CommonAPI.add_mute(user2, activity)
868
869       assert_receive {:render_with_user, _, _, ^activity}
870       assert Streamer.filtered_by_user?(user2, activity)
871     end
872   end
873
874   describe "direct streams" do
875     setup do: oauth_access(["read"])
876
877     test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
878       another_user = insert(:user)
879
880       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
881
882       {:ok, _create_activity} =
883         CommonAPI.post(another_user, %{
884           status: "hey @#{user.nickname}",
885           visibility: "direct"
886         })
887
888       assert_receive {:text, received_event}
889
890       assert %{"event" => "conversation", "payload" => received_payload} =
891                Jason.decode!(received_event)
892
893       assert %{"last_status" => last_status} = Jason.decode!(received_payload)
894       [participation] = Participation.for_user(user)
895       assert last_status["pleroma"]["direct_conversation_id"] == participation.id
896     end
897
898     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
899          %{user: user, token: oauth_token} do
900       another_user = insert(:user)
901
902       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
903
904       {:ok, create_activity} =
905         CommonAPI.post(another_user, %{
906           status: "hi @#{user.nickname}",
907           visibility: "direct"
908         })
909
910       create_activity_id = create_activity.id
911       assert_receive {:render_with_user, _, _, ^create_activity}
912       assert_receive {:text, received_conversation1}
913       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
914
915       {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
916
917       assert_receive {:text, received_event}
918
919       assert %{"event" => "delete", "payload" => ^create_activity_id} =
920                Jason.decode!(received_event)
921
922       refute_receive _
923     end
924
925     @tag :erratic
926     test "it sends conversation update to the 'direct' stream when a message is deleted", %{
927       user: user,
928       token: oauth_token
929     } do
930       another_user = insert(:user)
931       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
932
933       {:ok, create_activity} =
934         CommonAPI.post(another_user, %{
935           status: "hi @#{user.nickname}",
936           visibility: "direct"
937         })
938
939       {:ok, create_activity2} =
940         CommonAPI.post(another_user, %{
941           status: "hi @#{user.nickname} 2",
942           in_reply_to_status_id: create_activity.id,
943           visibility: "direct"
944         })
945
946       assert_receive {:render_with_user, _, _, ^create_activity}
947       assert_receive {:render_with_user, _, _, ^create_activity2}
948       assert_receive {:text, received_conversation1}
949       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
950       assert_receive {:text, received_conversation1}
951       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
952
953       {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
954
955       assert_receive {:text, received_event}
956       assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
957
958       assert_receive {:text, received_event}
959
960       assert %{"event" => "conversation", "payload" => received_payload} =
961                Jason.decode!(received_event)
962
963       assert %{"last_status" => last_status} = Jason.decode!(received_payload)
964       assert last_status["id"] == to_string(create_activity.id)
965     end
966   end
967
968   describe "stop streaming if token got revoked" do
969     setup do
970       child_proc = fn start, finalize ->
971         fn ->
972           start.()
973
974           receive do
975             {StreamerTest, :ready} ->
976               assert_receive {:render_with_user, _, "update.json", _}
977
978               receive do
979                 {StreamerTest, :revoked} -> finalize.()
980               end
981           end
982         end
983       end
984
985       starter = fn user, token ->
986         fn -> Streamer.get_topic_and_add_socket("user", user, token) end
987       end
988
989       hit = fn -> assert_receive :close end
990       miss = fn -> refute_receive :close end
991
992       send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end
993
994       %{
995         child_proc: child_proc,
996         starter: starter,
997         hit: hit,
998         miss: miss,
999         send_all: send_all
1000       }
1001     end
1002
1003     test "do not revoke other tokens", %{
1004       child_proc: child_proc,
1005       starter: starter,
1006       hit: hit,
1007       miss: miss,
1008       send_all: send_all
1009     } do
1010       %{user: user, token: token} = oauth_access(["read"])
1011       %{token: token2} = oauth_access(["read"], user: user)
1012       %{user: user2, token: user2_token} = oauth_access(["read"])
1013
1014       post_user = insert(:user)
1015       CommonAPI.follow(user, post_user)
1016       CommonAPI.follow(user2, post_user)
1017
1018       tasks = [
1019         Task.async(child_proc.(starter.(user, token), hit)),
1020         Task.async(child_proc.(starter.(user, token2), miss)),
1021         Task.async(child_proc.(starter.(user2, user2_token), miss))
1022       ]
1023
1024       {:ok, _} =
1025         CommonAPI.post(post_user, %{
1026           status: "hi"
1027         })
1028
1029       send_all.(tasks, {StreamerTest, :ready})
1030
1031       Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1032
1033       send_all.(tasks, {StreamerTest, :revoked})
1034
1035       Enum.each(tasks, &Task.await/1)
1036     end
1037
1038     test "revoke all streams for this token", %{
1039       child_proc: child_proc,
1040       starter: starter,
1041       hit: hit,
1042       send_all: send_all
1043     } do
1044       %{user: user, token: token} = oauth_access(["read"])
1045
1046       post_user = insert(:user)
1047       CommonAPI.follow(user, post_user)
1048
1049       tasks = [
1050         Task.async(child_proc.(starter.(user, token), hit)),
1051         Task.async(child_proc.(starter.(user, token), hit))
1052       ]
1053
1054       {:ok, _} =
1055         CommonAPI.post(post_user, %{
1056           status: "hi"
1057         })
1058
1059       send_all.(tasks, {StreamerTest, :ready})
1060
1061       Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1062
1063       send_all.(tasks, {StreamerTest, :revoked})
1064
1065       Enum.each(tasks, &Task.await/1)
1066     end
1067   end
1068 end