total rebase
[anni] / test / pleroma / web / streamer_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.StreamerTest do
6   use Pleroma.DataCase
7
8   import Pleroma.Factory
9
10   alias Pleroma.Chat
11   alias Pleroma.Chat.MessageReference
12   alias Pleroma.Conversation.Participation
13   alias Pleroma.List
14   alias Pleroma.Object
15   alias Pleroma.User
16   alias Pleroma.Web.CommonAPI
17   alias Pleroma.Web.Streamer
18   alias Pleroma.Web.StreamerView
19
20   @moduletag needs_streamer: true, capture_log: true
21
22   setup do: clear_config([:instance, :skip_thread_containment])
23
24   describe "get_topic/_ (unauthenticated)" do
25     test "allows no stream" do
26       assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
27     end
28
29     test "allows public" do
30       assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
31       assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
32       assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
33       assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
34     end
35
36     test "rejects local public streams if restricted_unauthenticated is on" do
37       clear_config([:restrict_unauthenticated, :timelines, :local], true)
38
39       assert {:error, :unauthorized} = Streamer.get_topic("public:local", nil, nil)
40       assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", nil, nil)
41     end
42
43     test "rejects remote public streams if restricted_unauthenticated is on" do
44       clear_config([:restrict_unauthenticated, :timelines, :federated], true)
45
46       assert {:error, :unauthorized} = Streamer.get_topic("public", nil, nil)
47       assert {:error, :unauthorized} = Streamer.get_topic("public:media", nil, nil)
48
49       assert {:error, :unauthorized} =
50                Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
51
52       assert {:error, :unauthorized} =
53                Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
54     end
55
56     test "allows instance streams" do
57       assert {:ok, "public:remote:lain.com"} =
58                Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
59
60       assert {:ok, "public:remote:media:lain.com"} =
61                Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
62     end
63
64     test "allows hashtag streams" do
65       assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
66     end
67
68     test "disallows user streams" do
69       assert {:error, _} = Streamer.get_topic("user", nil, nil)
70       assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
71       assert {:error, _} = Streamer.get_topic("direct", nil, nil)
72     end
73
74     test "disallows list streams" do
75       assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
76     end
77   end
78
79   describe "get_topic/_ (authenticated)" do
80     setup do: oauth_access(["read"])
81
82     test "allows public streams (regardless of OAuth token scopes)", %{
83       user: user,
84       token: read_oauth_token
85     } do
86       with oauth_token <- [nil, read_oauth_token] do
87         assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
88         assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
89         assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
90
91         assert {:ok, "public:local:media"} =
92                  Streamer.get_topic("public:local:media", user, oauth_token)
93       end
94     end
95
96     test "allows local public streams if restricted_unauthenticated is on", %{
97       user: user,
98       token: oauth_token
99     } do
100       clear_config([:restrict_unauthenticated, :timelines, :local], true)
101
102       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
103       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
104
105       assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
106
107       assert {:ok, "public:local:media"} =
108                Streamer.get_topic("public:local:media", user, oauth_token)
109
110       for token <- [read_notifications_token, badly_scoped_token] do
111         assert {:error, :unauthorized} = Streamer.get_topic("public:local", user, token)
112
113         assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", user, token)
114       end
115     end
116
117     test "allows remote public streams if restricted_unauthenticated is on", %{
118       user: user,
119       token: oauth_token
120     } do
121       clear_config([:restrict_unauthenticated, :timelines, :federated], true)
122
123       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
124       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
125
126       assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
127       assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
128
129       assert {:ok, "public:remote:lain.com"} =
130                Streamer.get_topic("public:remote", user, oauth_token, %{"instance" => "lain.com"})
131
132       assert {:ok, "public:remote:media:lain.com"} =
133                Streamer.get_topic("public:remote:media", user, oauth_token, %{
134                  "instance" => "lain.com"
135                })
136
137       for token <- [read_notifications_token, badly_scoped_token] do
138         assert {:error, :unauthorized} = Streamer.get_topic("public", user, token)
139         assert {:error, :unauthorized} = Streamer.get_topic("public:media", user, token)
140
141         assert {:error, :unauthorized} =
142                  Streamer.get_topic("public:remote", user, token, %{
143                    "instance" => "lain.com"
144                  })
145
146         assert {:error, :unauthorized} =
147                  Streamer.get_topic("public:remote:media", user, token, %{
148                    "instance" => "lain.com"
149                  })
150       end
151     end
152
153     test "allows user streams (with proper OAuth token scopes)", %{
154       user: user,
155       token: read_oauth_token
156     } do
157       %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
158       %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
159       %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
160
161       expected_user_topic = "user:#{user.id}"
162       expected_notification_topic = "user:notification:#{user.id}"
163       expected_direct_topic = "direct:#{user.id}"
164       expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
165
166       for valid_user_token <- [read_oauth_token, read_statuses_token] do
167         assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
168
169         assert {:ok, ^expected_direct_topic} =
170                  Streamer.get_topic("direct", user, valid_user_token)
171
172         assert {:ok, ^expected_pleroma_chat_topic} =
173                  Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
174       end
175
176       for invalid_user_token <- [read_notifications_token, badly_scoped_token],
177           user_topic <- ["user", "direct", "user:pleroma_chat"] do
178         assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
179       end
180
181       for valid_notification_token <- [read_oauth_token, read_notifications_token] do
182         assert {:ok, ^expected_notification_topic} =
183                  Streamer.get_topic("user:notification", user, valid_notification_token)
184       end
185
186       for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
187         assert {:error, :unauthorized} =
188                  Streamer.get_topic("user:notification", user, invalid_notification_token)
189       end
190     end
191
192     test "allows hashtag streams (regardless of OAuth token scopes)", %{
193       user: user,
194       token: read_oauth_token
195     } do
196       for oauth_token <- [nil, read_oauth_token] do
197         assert {:ok, "hashtag:cofe"} =
198                  Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
199       end
200     end
201
202     test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
203       another_user = insert(:user)
204       assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
205
206       assert {:error, _} =
207                Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
208
209       assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
210     end
211
212     test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
213       user: user,
214       token: read_oauth_token
215     } do
216       %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
217       %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
218       {:ok, list} = List.create("Test", user)
219
220       assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
221
222       for valid_token <- [read_oauth_token, read_lists_token] do
223         assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
224       end
225
226       assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
227     end
228
229     test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
230       another_user = insert(:user)
231       {:ok, list} = List.create("Test", another_user)
232
233       assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
234       assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
235     end
236   end
237
238   describe "user streams" do
239     setup do
240       %{user: user, token: token} = oauth_access(["read"])
241       notify = insert(:notification, user: user, activity: build(:note_activity))
242       {:ok, %{user: user, notify: notify, token: token}}
243     end
244
245     test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
246       Streamer.get_topic_and_add_socket("user", user, oauth_token)
247       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
248
249       assert_receive {:render_with_user, _, _, ^activity, _}
250       refute Streamer.filtered_by_user?(user, activity)
251     end
252
253     test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
254       Streamer.get_topic_and_add_socket("user", user, oauth_token)
255
256       other_user = insert(:user)
257       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
258       {:ok, announce} = CommonAPI.repeat(activity.id, user)
259
260       assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
261       refute Streamer.filtered_by_user?(user, announce)
262     end
263
264     test "it does not stream announces of the user's own posts in the 'user' stream", %{
265       user: user,
266       token: oauth_token
267     } do
268       Streamer.get_topic_and_add_socket("user", user, oauth_token)
269
270       other_user = insert(:user)
271       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
272       {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
273
274       assert Streamer.filtered_by_user?(user, announce)
275     end
276
277     test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
278       user: user,
279       token: oauth_token
280     } do
281       Streamer.get_topic_and_add_socket("user", user, oauth_token)
282
283       other_user = insert(:user)
284       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
285       {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
286
287       notification =
288         Pleroma.Notification
289         |> Repo.get_by(%{user_id: user.id, activity_id: announce.id})
290         |> Repo.preload(:activity)
291
292       refute Streamer.filtered_by_user?(user, notification)
293     end
294
295     test "it streams boosts of mastodon user in the 'user' stream", %{
296       user: user,
297       token: oauth_token
298     } do
299       Streamer.get_topic_and_add_socket("user", user, oauth_token)
300
301       other_user = insert(:user)
302       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
303
304       data =
305         File.read!("test/fixtures/mastodon-announce.json")
306         |> Jason.decode!()
307         |> Map.put("object", activity.data["object"])
308         |> Map.put("actor", user.ap_id)
309
310       {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
311         Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
312
313       assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
314       refute Streamer.filtered_by_user?(user, announce)
315     end
316
317     test "it sends notify to in the 'user' stream", %{
318       user: user,
319       token: oauth_token,
320       notify: notify
321     } do
322       Streamer.get_topic_and_add_socket("user", user, oauth_token)
323       Streamer.stream("user", notify)
324
325       assert_receive {:render_with_user, _, _, ^notify, _}
326       refute Streamer.filtered_by_user?(user, notify)
327     end
328
329     test "it sends notify to in the 'user:notification' stream", %{
330       user: user,
331       token: oauth_token,
332       notify: notify
333     } do
334       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
335       Streamer.stream("user:notification", notify)
336
337       assert_receive {:render_with_user, _, _, ^notify, _}
338       refute Streamer.filtered_by_user?(user, notify)
339     end
340
341     test "it sends chat messages to the 'user:pleroma_chat' stream", %{
342       user: user,
343       token: oauth_token
344     } do
345       other_user = insert(:user)
346
347       {:ok, create_activity} =
348         CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123")
349
350       object = Object.normalize(create_activity, fetch: false)
351       chat = Chat.get(user.id, other_user.ap_id)
352       cm_ref = MessageReference.for_chat_and_object(chat, object)
353       cm_ref = %{cm_ref | chat: chat, object: object}
354
355       Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
356       Streamer.stream("user:pleroma_chat", {user, cm_ref})
357
358       text =
359         StreamerView.render(
360           "chat_update.json",
361           %{chat_message_reference: cm_ref},
362           "user:pleroma_chat:#{user.id}"
363         )
364
365       assert text =~ "hey cirno"
366       assert_receive {:text, ^text}
367     end
368
369     test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
370       other_user = insert(:user)
371
372       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
373       object = Object.normalize(create_activity, fetch: false)
374       chat = Chat.get(user.id, other_user.ap_id)
375       cm_ref = MessageReference.for_chat_and_object(chat, object)
376       cm_ref = %{cm_ref | chat: chat, object: object}
377
378       Streamer.get_topic_and_add_socket("user", user, oauth_token)
379       Streamer.stream("user", {user, cm_ref})
380
381       text =
382         StreamerView.render(
383           "chat_update.json",
384           %{chat_message_reference: cm_ref},
385           "user:#{user.id}"
386         )
387
388       assert text =~ "hey cirno"
389       assert_receive {:text, ^text}
390     end
391
392     test "it sends chat message notifications to the 'user:notification' stream", %{
393       user: user,
394       token: oauth_token
395     } do
396       other_user = insert(:user)
397
398       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
399
400       notify =
401         Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
402         |> Repo.preload(:activity)
403
404       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
405       Streamer.stream("user:notification", notify)
406
407       assert_receive {:render_with_user, _, _, ^notify, _}
408       refute Streamer.filtered_by_user?(user, notify)
409     end
410
411     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
412       user: user,
413       token: oauth_token
414     } do
415       blocked = insert(:user)
416       {:ok, _user_relationship} = User.block(user, blocked)
417
418       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
419
420       {:ok, activity} = CommonAPI.post(user, %{status: ":("})
421       {:ok, _} = CommonAPI.favorite(blocked, activity.id)
422
423       refute_receive _
424     end
425
426     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
427       user: user,
428       token: oauth_token
429     } do
430       user2 = insert(:user)
431
432       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
433       {:ok, _} = CommonAPI.add_mute(user, activity)
434
435       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
436
437       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
438
439       refute_receive _
440       assert Streamer.filtered_by_user?(user, favorite_activity)
441     end
442
443     test "it sends favorite to 'user:notification' stream'", %{
444       user: user,
445       token: oauth_token
446     } do
447       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
448
449       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
450       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
451       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
452
453       assert_receive {:render_with_user, _, "notification.json", notif, _}
454       assert notif.activity.id == favorite_activity.id
455       refute Streamer.filtered_by_user?(user, notif)
456     end
457
458     test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
459       user: user,
460       token: oauth_token
461     } do
462       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
463
464       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
465       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
466       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
467       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
468
469       refute_receive _
470       assert Streamer.filtered_by_user?(user, favorite_activity)
471     end
472
473     test "it sends follow activities to the 'user:notification' stream", %{
474       user: user,
475       token: oauth_token
476     } do
477       user2 = insert(:user)
478
479       Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
480       {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
481
482       assert_receive {:render_with_user, _, "notification.json", notif, _}
483       assert notif.activity.id == follow_activity.id
484       refute Streamer.filtered_by_user?(user, notif)
485     end
486
487     test "it sends follow relationships updates to the 'user' stream", %{
488       user: user,
489       token: oauth_token
490     } do
491       user_id = user.id
492       other_user = insert(:user)
493       other_user_id = other_user.id
494
495       Streamer.get_topic_and_add_socket("user", user, oauth_token)
496       {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user, other_user)
497
498       assert_receive {:text, event}
499
500       assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
501                Jason.decode!(event)
502
503       assert %{
504                "follower" => %{
505                  "follower_count" => 0,
506                  "following_count" => 0,
507                  "id" => ^user_id
508                },
509                "following" => %{
510                  "follower_count" => 0,
511                  "following_count" => 0,
512                  "id" => ^other_user_id
513                },
514                "state" => "follow_pending"
515              } = Jason.decode!(payload)
516
517       assert_receive {:text, event}
518
519       assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
520                Jason.decode!(event)
521
522       assert %{
523                "follower" => %{
524                  "follower_count" => 0,
525                  "following_count" => 1,
526                  "id" => ^user_id
527                },
528                "following" => %{
529                  "follower_count" => 1,
530                  "following_count" => 0,
531                  "id" => ^other_user_id
532                },
533                "state" => "follow_accept"
534              } = Jason.decode!(payload)
535     end
536
537     test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do
538       sender = insert(:user)
539       {:ok, _, _, _} = CommonAPI.follow(user, sender)
540
541       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
542
543       Streamer.get_topic_and_add_socket("user", user, oauth_token)
544       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
545       create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
546
547       assert_receive {:render_with_user, _, "status_update.json", ^create, _}
548       refute Streamer.filtered_by_user?(user, edited)
549     end
550
551     test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do
552       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
553
554       Streamer.get_topic_and_add_socket("user", user, oauth_token)
555       {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})
556       create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
557
558       assert_receive {:render_with_user, _, "status_update.json", ^create, _}
559       refute Streamer.filtered_by_user?(user, edited)
560     end
561   end
562
563   describe "public streams" do
564     test "it sends to public (authenticated)" do
565       %{user: user, token: oauth_token} = oauth_access(["read"])
566       other_user = insert(:user)
567
568       Streamer.get_topic_and_add_socket("public", user, oauth_token)
569
570       {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
571       assert_receive {:render_with_user, _, _, ^activity, _}
572       refute Streamer.filtered_by_user?(other_user, activity)
573     end
574
575     test "it sends to public (unauthenticated)" do
576       user = insert(:user)
577
578       Streamer.get_topic_and_add_socket("public", nil, nil)
579
580       {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
581       activity_id = activity.id
582       assert_receive {:text, event}
583       assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
584       assert %{"id" => ^activity_id} = Jason.decode!(payload)
585
586       {:ok, _} = CommonAPI.delete(activity.id, user)
587       assert_receive {:text, event}
588       assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
589     end
590
591     test "handles deletions" do
592       %{user: user, token: oauth_token} = oauth_access(["read"])
593       other_user = insert(:user)
594       {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
595
596       Streamer.get_topic_and_add_socket("public", user, oauth_token)
597
598       {:ok, _} = CommonAPI.delete(activity.id, other_user)
599       activity_id = activity.id
600       assert_receive {:text, event}
601       assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
602     end
603
604     test "it streams edits in the 'public' stream" do
605       sender = insert(:user)
606
607       Streamer.get_topic_and_add_socket("public", nil, nil)
608       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
609       assert_receive {:text, _}
610
611       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
612
613       edited = Pleroma.Activity.normalize(edited)
614
615       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
616
617       assert_receive {:text, event}
618       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
619       assert %{"id" => ^activity_id} = Jason.decode!(payload)
620       refute Streamer.filtered_by_user?(sender, edited)
621     end
622
623     test "it streams multiple edits in the 'public' stream correctly" do
624       sender = insert(:user)
625
626       Streamer.get_topic_and_add_socket("public", nil, nil)
627       {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
628       assert_receive {:text, _}
629
630       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
631
632       edited = Pleroma.Activity.normalize(edited)
633
634       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
635
636       assert_receive {:text, event}
637       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
638       assert %{"id" => ^activity_id} = Jason.decode!(payload)
639       refute Streamer.filtered_by_user?(sender, edited)
640
641       {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew 2"})
642
643       edited = Pleroma.Activity.normalize(edited)
644
645       %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
646       assert_receive {:text, event}
647       assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
648       assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload)
649       refute Streamer.filtered_by_user?(sender, edited)
650     end
651   end
652
653   describe "thread_containment/2" do
654     test "it filters to user if recipients invalid and thread containment is enabled" do
655       clear_config([:instance, :skip_thread_containment], false)
656       author = insert(:user)
657       %{user: user, token: oauth_token} = oauth_access(["read"])
658       User.follow(user, author, :follow_accept)
659
660       activity =
661         insert(:note_activity,
662           note:
663             insert(:note,
664               user: author,
665               data: %{"to" => ["TEST-FFF"]}
666             )
667         )
668
669       Streamer.get_topic_and_add_socket("public", user, oauth_token)
670       Streamer.stream("public", activity)
671       assert_receive {:render_with_user, _, _, ^activity, _}
672       assert Streamer.filtered_by_user?(user, activity)
673     end
674
675     test "it sends message if recipients invalid and thread containment is disabled" do
676       clear_config([:instance, :skip_thread_containment], true)
677       author = insert(:user)
678       %{user: user, token: oauth_token} = oauth_access(["read"])
679       User.follow(user, author, :follow_accept)
680
681       activity =
682         insert(:note_activity,
683           note:
684             insert(:note,
685               user: author,
686               data: %{"to" => ["TEST-FFF"]}
687             )
688         )
689
690       Streamer.get_topic_and_add_socket("public", user, oauth_token)
691       Streamer.stream("public", activity)
692
693       assert_receive {:render_with_user, _, _, ^activity, _}
694       refute Streamer.filtered_by_user?(user, activity)
695     end
696
697     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
698       clear_config([:instance, :skip_thread_containment], false)
699       author = insert(:user)
700       user = insert(:user, skip_thread_containment: true)
701       %{token: oauth_token} = oauth_access(["read"], user: user)
702       User.follow(user, author, :follow_accept)
703
704       activity =
705         insert(:note_activity,
706           note:
707             insert(:note,
708               user: author,
709               data: %{"to" => ["TEST-FFF"]}
710             )
711         )
712
713       Streamer.get_topic_and_add_socket("public", user, oauth_token)
714       Streamer.stream("public", activity)
715
716       assert_receive {:render_with_user, _, _, ^activity, _}
717       refute Streamer.filtered_by_user?(user, activity)
718     end
719   end
720
721   describe "blocks" do
722     setup do: oauth_access(["read"])
723
724     test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
725       blocked_user = insert(:user)
726       {:ok, _user_relationship} = User.block(user, blocked_user)
727
728       Streamer.get_topic_and_add_socket("public", user, oauth_token)
729       {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
730       assert_receive {:render_with_user, _, _, ^activity, _}
731       assert Streamer.filtered_by_user?(user, activity)
732     end
733
734     test "it filters messages transitively involving blocked users", %{
735       user: blocker,
736       token: blocker_token
737     } do
738       blockee = insert(:user)
739       friend = insert(:user)
740
741       Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
742
743       {:ok, _user_relationship} = User.block(blocker, blockee)
744
745       {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
746
747       assert_receive {:render_with_user, _, _, ^activity_one, _}
748       assert Streamer.filtered_by_user?(blocker, activity_one)
749
750       {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
751
752       assert_receive {:render_with_user, _, _, ^activity_two, _}
753       assert Streamer.filtered_by_user?(blocker, activity_two)
754
755       {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
756
757       assert_receive {:render_with_user, _, _, ^activity_three, _}
758       assert Streamer.filtered_by_user?(blocker, activity_three)
759     end
760   end
761
762   describe "lists" do
763     setup do: oauth_access(["read"])
764
765     test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
766       user_b = insert(:user)
767       user_c = insert(:user)
768
769       {:ok, user_a, user_b} = User.follow(user_a, user_b)
770
771       {:ok, list} = List.create("Test", user_a)
772       {:ok, list} = List.follow(list, user_b)
773
774       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
775
776       {:ok, _activity} =
777         CommonAPI.post(user_b, %{
778           status: "@#{user_c.nickname} Test",
779           visibility: "direct"
780         })
781
782       refute_receive _
783     end
784
785     test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
786       user_b = insert(:user)
787
788       {:ok, list} = List.create("Test", user_a)
789       {:ok, list} = List.follow(list, user_b)
790
791       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
792
793       {:ok, _activity} =
794         CommonAPI.post(user_b, %{
795           status: "Test",
796           visibility: "private"
797         })
798
799       refute_receive _
800     end
801
802     test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
803       user_b = insert(:user)
804
805       {:ok, user_a, user_b} = User.follow(user_a, user_b)
806
807       {:ok, list} = List.create("Test", user_a)
808       {:ok, list} = List.follow(list, user_b)
809
810       Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
811
812       {:ok, activity} =
813         CommonAPI.post(user_b, %{
814           status: "Test",
815           visibility: "private"
816         })
817
818       assert_receive {:render_with_user, _, _, ^activity, _}
819       refute Streamer.filtered_by_user?(user_a, activity)
820     end
821   end
822
823   describe "muted reblogs" do
824     setup do: oauth_access(["read"])
825
826     test "it filters muted reblogs", %{user: user1, token: user1_token} do
827       user2 = insert(:user)
828       user3 = insert(:user)
829       CommonAPI.follow(user1, user2)
830       CommonAPI.hide_reblogs(user1, user2)
831
832       {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
833
834       Streamer.get_topic_and_add_socket("user", user1, user1_token)
835       {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
836       assert_receive {:render_with_user, _, _, ^announce_activity, _}
837       assert Streamer.filtered_by_user?(user1, announce_activity)
838     end
839
840     test "it filters reblog notification for reblog-muted actors", %{
841       user: user1,
842       token: user1_token
843     } do
844       user2 = insert(:user)
845       CommonAPI.follow(user1, user2)
846       CommonAPI.hide_reblogs(user1, user2)
847
848       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
849       Streamer.get_topic_and_add_socket("user", user1, user1_token)
850       {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
851
852       assert_receive {:render_with_user, _, "notification.json", notif, _}
853       assert Streamer.filtered_by_user?(user1, notif)
854     end
855
856     test "it send non-reblog notification for reblog-muted actors", %{
857       user: user1,
858       token: user1_token
859     } do
860       user2 = insert(:user)
861       CommonAPI.follow(user1, user2)
862       CommonAPI.hide_reblogs(user1, user2)
863
864       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
865       Streamer.get_topic_and_add_socket("user", user1, user1_token)
866       {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
867
868       assert_receive {:render_with_user, _, "notification.json", notif, _}
869       refute Streamer.filtered_by_user?(user1, notif)
870     end
871   end
872
873   describe "muted threads" do
874     test "it filters posts from muted threads" do
875       user = insert(:user)
876       %{user: user2, token: user2_token} = oauth_access(["read"])
877       Streamer.get_topic_and_add_socket("user", user2, user2_token)
878
879       {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
880       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
881       {:ok, _} = CommonAPI.add_mute(user2, activity)
882
883       assert_receive {:render_with_user, _, _, ^activity, _}
884       assert Streamer.filtered_by_user?(user2, activity)
885     end
886   end
887
888   describe "direct streams" do
889     setup do: oauth_access(["read"])
890
891     test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
892       another_user = insert(:user)
893
894       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
895
896       {:ok, _create_activity} =
897         CommonAPI.post(another_user, %{
898           status: "hey @#{user.nickname}",
899           visibility: "direct"
900         })
901
902       assert_receive {:text, received_event}
903
904       assert %{"event" => "conversation", "payload" => received_payload} =
905                Jason.decode!(received_event)
906
907       assert %{"last_status" => last_status} = Jason.decode!(received_payload)
908       [participation] = Participation.for_user(user)
909       assert last_status["pleroma"]["direct_conversation_id"] == participation.id
910     end
911
912     test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
913          %{user: user, token: oauth_token} do
914       another_user = insert(:user)
915
916       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
917
918       {:ok, create_activity} =
919         CommonAPI.post(another_user, %{
920           status: "hi @#{user.nickname}",
921           visibility: "direct"
922         })
923
924       create_activity_id = create_activity.id
925       assert_receive {:render_with_user, _, _, ^create_activity, _}
926       assert_receive {:text, received_conversation1}
927       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
928
929       {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
930
931       assert_receive {:text, received_event}
932
933       assert %{"event" => "delete", "payload" => ^create_activity_id} =
934                Jason.decode!(received_event)
935
936       refute_receive _
937     end
938
939     @tag :erratic
940     test "it sends conversation update to the 'direct' stream when a message is deleted", %{
941       user: user,
942       token: oauth_token
943     } do
944       another_user = insert(:user)
945       Streamer.get_topic_and_add_socket("direct", user, oauth_token)
946
947       {:ok, create_activity} =
948         CommonAPI.post(another_user, %{
949           status: "hi @#{user.nickname}",
950           visibility: "direct"
951         })
952
953       {:ok, create_activity2} =
954         CommonAPI.post(another_user, %{
955           status: "hi @#{user.nickname} 2",
956           in_reply_to_status_id: create_activity.id,
957           visibility: "direct"
958         })
959
960       assert_receive {:render_with_user, _, _, ^create_activity, _}
961       assert_receive {:render_with_user, _, _, ^create_activity2, _}
962       assert_receive {:text, received_conversation1}
963       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
964       assert_receive {:text, received_conversation1}
965       assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
966
967       {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
968
969       assert_receive {:text, received_event}
970       assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
971
972       assert_receive {:text, received_event}
973
974       assert %{"event" => "conversation", "payload" => received_payload} =
975                Jason.decode!(received_event)
976
977       assert %{"last_status" => last_status} = Jason.decode!(received_payload)
978       assert last_status["id"] == to_string(create_activity.id)
979     end
980   end
981
982   describe "stop streaming if token got revoked" do
983     setup do
984       child_proc = fn start, finalize ->
985         fn ->
986           start.()
987
988           receive do
989             {StreamerTest, :ready} ->
990               assert_receive {:render_with_user, _, "update.json", _, _}
991
992               receive do
993                 {StreamerTest, :revoked} -> finalize.()
994               end
995           end
996         end
997       end
998
999       starter = fn user, token ->
1000         fn -> Streamer.get_topic_and_add_socket("user", user, token) end
1001       end
1002
1003       hit = fn -> assert_receive :close end
1004       miss = fn -> refute_receive :close end
1005
1006       send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end
1007
1008       %{
1009         child_proc: child_proc,
1010         starter: starter,
1011         hit: hit,
1012         miss: miss,
1013         send_all: send_all
1014       }
1015     end
1016
1017     test "do not revoke other tokens", %{
1018       child_proc: child_proc,
1019       starter: starter,
1020       hit: hit,
1021       miss: miss,
1022       send_all: send_all
1023     } do
1024       %{user: user, token: token} = oauth_access(["read"])
1025       %{token: token2} = oauth_access(["read"], user: user)
1026       %{user: user2, token: user2_token} = oauth_access(["read"])
1027
1028       post_user = insert(:user)
1029       CommonAPI.follow(user, post_user)
1030       CommonAPI.follow(user2, post_user)
1031
1032       tasks = [
1033         Task.async(child_proc.(starter.(user, token), hit)),
1034         Task.async(child_proc.(starter.(user, token2), miss)),
1035         Task.async(child_proc.(starter.(user2, user2_token), miss))
1036       ]
1037
1038       {:ok, _} =
1039         CommonAPI.post(post_user, %{
1040           status: "hi"
1041         })
1042
1043       send_all.(tasks, {StreamerTest, :ready})
1044
1045       Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1046
1047       send_all.(tasks, {StreamerTest, :revoked})
1048
1049       Enum.each(tasks, &Task.await/1)
1050     end
1051
1052     test "revoke all streams for this token", %{
1053       child_proc: child_proc,
1054       starter: starter,
1055       hit: hit,
1056       send_all: send_all
1057     } do
1058       %{user: user, token: token} = oauth_access(["read"])
1059
1060       post_user = insert(:user)
1061       CommonAPI.follow(user, post_user)
1062
1063       tasks = [
1064         Task.async(child_proc.(starter.(user, token), hit)),
1065         Task.async(child_proc.(starter.(user, token), hit))
1066       ]
1067
1068       {:ok, _} =
1069         CommonAPI.post(post_user, %{
1070           status: "hi"
1071         })
1072
1073       send_all.(tasks, {StreamerTest, :ready})
1074
1075       Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
1076
1077       send_all.(tasks, {StreamerTest, :revoked})
1078
1079       Enum.each(tasks, &Task.await/1)
1080     end
1081   end
1082 end