7f262d228cc9ebf8a5222df45446702b8342e855
[anni] / benchmarks / load_testing / activities.ex
1 defmodule Pleroma.LoadTesting.Activities do
2   @moduledoc """
3   Module for generating different activities.
4   """
5   import Ecto.Query
6   import Pleroma.LoadTesting.Helper, only: [to_sec: 1]
7
8   alias Ecto.UUID
9   alias Pleroma.Constants
10   alias Pleroma.LoadTesting.Users
11   alias Pleroma.Repo
12   alias Pleroma.Web.CommonAPI
13
14   require Constants
15
16   @defaults [
17     iterations: 170,
18     friends_used: 20,
19     non_friends_used: 20
20   ]
21
22   @max_concurrency 10
23
24   @visibility ~w(public private direct unlisted)
25   @types [
26     :simple,
27     :simple_filtered,
28     :emoji,
29     :mentions,
30     :hell_thread,
31     :attachment,
32     :tag,
33     :like,
34     :reblog,
35     :simple_thread
36   ]
37   @groups [:friends_local, :friends_remote, :non_friends_local, :non_friends_local]
38   @remote_groups [:friends_remote, :non_friends_remote]
39   @friends_groups [:friends_local, :friends_remote]
40   @non_friends_groups [:non_friends_local, :non_friends_remote]
41
42   @spec generate(User.t(), keyword()) :: :ok
43   def generate(user, opts \\ []) do
44     {:ok, _} =
45       Agent.start_link(fn -> %{} end,
46         name: :benchmark_state
47       )
48
49     opts = Keyword.merge(@defaults, opts)
50
51     users = Users.prepare_users(user, opts)
52
53     {:ok, _} = Agent.start_link(fn -> users[:non_friends_remote] end, name: :non_friends_remote)
54
55     task_data =
56       for visibility <- @visibility,
57           type <- @types,
58           group <- [:user | @groups],
59           do: {visibility, type, group}
60
61     IO.puts("Starting generating #{opts[:iterations]} iterations of activities...")
62
63     public_long_thread = fn ->
64       generate_long_thread("public", users, opts)
65     end
66
67     private_long_thread = fn ->
68       generate_long_thread("private", users, opts)
69     end
70
71     iterations = opts[:iterations]
72
73     {time, _} =
74       :timer.tc(fn ->
75         Enum.each(
76           1..iterations,
77           fn
78             i when i == iterations - 2 ->
79               spawn(public_long_thread)
80               spawn(private_long_thread)
81               generate_activities(users, Enum.shuffle(task_data), opts)
82
83             _ ->
84               generate_activities(users, Enum.shuffle(task_data), opts)
85           end
86         )
87       end)
88
89     IO.puts("Generating iterations of activities took #{to_sec(time)} sec.\n")
90     :ok
91   end
92
93   def generate_power_intervals(opts \\ []) do
94     count = Keyword.get(opts, :count, 20)
95     power = Keyword.get(opts, :power, 2)
96     IO.puts("Generating #{count} intervals for a power #{power} series...")
97     counts = Enum.map(1..count, fn n -> :math.pow(n, power) end)
98     sum = Enum.sum(counts)
99
100     densities =
101       Enum.map(counts, fn c ->
102         c / sum
103       end)
104
105     densities
106     |> Enum.reduce(0, fn density, acc ->
107       if acc == 0 do
108         [{0, density}]
109       else
110         [{_, lower} | _] = acc
111         [{lower, lower + density} | acc]
112       end
113     end)
114     |> Enum.reverse()
115   end
116
117   def generate_tagged_activities(opts \\ []) do
118     tag_count = Keyword.get(opts, :tag_count, 20)
119     users = Keyword.get(opts, :users, Repo.all(Pleroma.User))
120     activity_count = Keyword.get(opts, :count, 200_000)
121
122     intervals = generate_power_intervals(count: tag_count)
123
124     IO.puts(
125       "Generating #{activity_count} activities using #{tag_count} different tags of format `tag_n`, starting at tag_0"
126     )
127
128     Enum.each(1..activity_count, fn _ ->
129       random = :rand.uniform()
130       i = Enum.find_index(intervals, fn {lower, upper} -> lower <= random && upper > random end)
131       CommonAPI.post(Enum.random(users), %{status: "a post with the tag #tag_#{i}"})
132     end)
133   end
134
135   defp generate_long_thread(visibility, users, _opts) do
136     group =
137       if visibility == "public",
138         do: :friends_local,
139         else: :user
140
141     tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50)
142
143     {:ok, activity} =
144       CommonAPI.post(users[:user], %{
145         status: "Start of #{visibility} long thread",
146         visibility: visibility
147       })
148
149     Agent.update(:benchmark_state, fn state ->
150       key =
151         if visibility == "public",
152           do: :public_thread,
153           else: :private_thread
154
155       Map.put(state, key, activity)
156     end)
157
158     acc = {activity.id, ["@" <> users[:user].nickname, "reply to long thread"]}
159     insert_replies_for_long_thread(tasks, visibility, users, acc)
160     IO.puts("Generating #{visibility} long thread ended\n")
161   end
162
163   defp insert_replies_for_long_thread(tasks, visibility, users, acc) do
164     Enum.reduce(tasks, acc, fn
165       :user, {id, data} ->
166         user = users[:user]
167         insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility)
168
169       group, {id, data} ->
170         replier = Enum.random(users[group])
171         insert_reply(replier, List.delete(data, "@" <> replier.nickname), id, visibility)
172     end)
173   end
174
175   defp generate_activities(users, task_data, opts) do
176     Task.async_stream(
177       task_data,
178       fn {visibility, type, group} ->
179         insert_activity(type, visibility, group, users, opts)
180       end,
181       max_concurrency: @max_concurrency,
182       timeout: 30_000
183     )
184     |> Stream.run()
185   end
186
187   defp insert_local_activity(visibility, group, users, status) do
188     {:ok, _} =
189       group
190       |> get_actor(users)
191       |> CommonAPI.post(%{status: status, visibility: visibility})
192   end
193
194   defp insert_remote_activity(visibility, group, users, status) do
195     actor = get_actor(group, users)
196     {act_data, obj_data} = prepare_activity_data(actor, visibility, users[:user])
197     {activity_data, object_data} = other_data(actor, status)
198
199     activity_data
200     |> Map.merge(act_data)
201     |> Map.put("object", Map.merge(object_data, obj_data))
202     |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
203   end
204
205   defp user_mentions(users) do
206     user_mentions =
207       Enum.reduce(
208         @groups,
209         [],
210         fn group, acc ->
211           acc ++ get_random_mentions(users[group], Enum.random(0..2))
212         end
213       )
214
215     if Enum.random([true, false]),
216       do: ["@" <> users[:user].nickname | user_mentions],
217       else: user_mentions
218   end
219
220   defp hell_thread_mentions(users) do
221     with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
222       cached =
223         @groups
224         |> Enum.reduce([users[:user]], fn group, acc ->
225           acc ++ Enum.take(users[group], 5)
226         end)
227         |> Enum.map(&"@#{&1.nickname}")
228         |> Enum.join(", ")
229
230       Cachex.put(:user_cache, "hell_thread_mentions", cached)
231       cached
232     else
233       {:ok, cached} -> cached
234     end
235   end
236
237   defp insert_activity(:simple, visibility, group, users, _opts)
238        when group in @remote_groups do
239     insert_remote_activity(visibility, group, users, "Remote status")
240   end
241
242   defp insert_activity(:simple, visibility, group, users, _opts) do
243     insert_local_activity(visibility, group, users, "Simple status")
244   end
245
246   defp insert_activity(:simple_filtered, visibility, group, users, _opts)
247        when group in @remote_groups do
248     insert_remote_activity(visibility, group, users, "Remote status which must be filtered")
249   end
250
251   defp insert_activity(:simple_filtered, visibility, group, users, _opts) do
252     insert_local_activity(visibility, group, users, "Simple status which must be filtered")
253   end
254
255   defp insert_activity(:emoji, visibility, group, users, _opts)
256        when group in @remote_groups do
257     insert_remote_activity(visibility, group, users, "Remote status with emoji :firefox:")
258   end
259
260   defp insert_activity(:emoji, visibility, group, users, _opts) do
261     insert_local_activity(visibility, group, users, "Simple status with emoji :firefox:")
262   end
263
264   defp insert_activity(:mentions, visibility, group, users, _opts)
265        when group in @remote_groups do
266     mentions = user_mentions(users)
267
268     status = Enum.join(mentions, ", ") <> " remote status with mentions"
269
270     insert_remote_activity(visibility, group, users, status)
271   end
272
273   defp insert_activity(:mentions, visibility, group, users, _opts) do
274     mentions = user_mentions(users)
275
276     status = Enum.join(mentions, ", ") <> " simple status with mentions"
277     insert_remote_activity(visibility, group, users, status)
278   end
279
280   defp insert_activity(:hell_thread, visibility, group, users, _)
281        when group in @remote_groups do
282     mentions = hell_thread_mentions(users)
283     insert_remote_activity(visibility, group, users, mentions <> " remote hell thread status")
284   end
285
286   defp insert_activity(:hell_thread, visibility, group, users, _opts) do
287     mentions = hell_thread_mentions(users)
288
289     insert_local_activity(visibility, group, users, mentions <> " hell thread status")
290   end
291
292   defp insert_activity(:attachment, visibility, group, users, _opts) do
293     actor = get_actor(group, users)
294
295     obj_data = %{
296       "actor" => actor.ap_id,
297       "name" => "4467-11.jpg",
298       "type" => "Document",
299       "url" => [
300         %{
301           "href" =>
302             "#{Pleroma.Web.Endpoint.url()}/media/b1b873552422a07bf53af01f3c231c841db4dfc42c35efde681abaf0f2a4eab7.jpg",
303           "mediaType" => "image/jpeg",
304           "type" => "Link"
305         }
306       ]
307     }
308
309     object = Repo.insert!(%Pleroma.Object{data: obj_data})
310
311     {:ok, _activity} =
312       CommonAPI.post(actor, %{
313         status: "Post with attachment",
314         visibility: visibility,
315         media_ids: [object.id]
316       })
317   end
318
319   defp insert_activity(:tag, visibility, group, users, _opts) do
320     insert_local_activity(visibility, group, users, "Status with #tag")
321   end
322
323   defp insert_activity(:like, visibility, group, users, opts) do
324     actor = get_actor(group, users)
325
326     with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
327          {:ok, _activity} <- CommonAPI.favorite(actor, activity_id) do
328       :ok
329     else
330       {:error, _} ->
331         insert_activity(:like, visibility, group, users, opts)
332
333       nil ->
334         Process.sleep(15)
335         insert_activity(:like, visibility, group, users, opts)
336     end
337   end
338
339   defp insert_activity(:reblog, visibility, group, users, opts) do
340     actor = get_actor(group, users)
341
342     with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
343          {:ok, _activity} <- CommonAPI.repeat(activity_id, actor) do
344       :ok
345     else
346       {:error, _} ->
347         insert_activity(:reblog, visibility, group, users, opts)
348
349       nil ->
350         Process.sleep(15)
351         insert_activity(:reblog, visibility, group, users, opts)
352     end
353   end
354
355   defp insert_activity(:simple_thread, "direct", group, users, _opts) do
356     actor = get_actor(group, users)
357     tasks = get_reply_tasks("direct", group)
358
359     list =
360       case group do
361         :user ->
362           group = Enum.random(@friends_groups)
363           Enum.take(users[group], 3)
364
365         _ ->
366           Enum.take(users[group], 3)
367       end
368
369     data = Enum.map(list, &("@" <> &1.nickname))
370
371     {:ok, activity} =
372       CommonAPI.post(actor, %{
373         status: Enum.join(data, ", ") <> "simple status",
374         visibility: "direct"
375       })
376
377     acc = {activity.id, ["@" <> users[:user].nickname | data] ++ ["reply to status"]}
378     insert_direct_replies(tasks, users[:user], list, acc)
379   end
380
381   defp insert_activity(:simple_thread, visibility, group, users, _opts) do
382     actor = get_actor(group, users)
383     tasks = get_reply_tasks(visibility, group)
384
385     {:ok, activity} =
386       CommonAPI.post(users[:user], %{status: "Simple status", visibility: visibility})
387
388     acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
389     insert_replies(tasks, visibility, users, acc)
390   end
391
392   defp get_actor(:user, %{user: user}), do: user
393   defp get_actor(group, users), do: Enum.random(users[group])
394
395   defp other_data(actor, content) do
396     %{host: host} = URI.parse(actor.ap_id)
397     datetime = DateTime.utc_now() |> to_string()
398     context_id = "https://#{host}/contexts/#{UUID.generate()}"
399     activity_id = "https://#{host}/activities/#{UUID.generate()}"
400     object_id = "https://#{host}/objects/#{UUID.generate()}"
401
402     activity_data = %{
403       "actor" => actor.ap_id,
404       "context" => context_id,
405       "id" => activity_id,
406       "published" => datetime,
407       "type" => "Create",
408       "directMessage" => false
409     }
410
411     object_data = %{
412       "actor" => actor.ap_id,
413       "attachment" => [],
414       "attributedTo" => actor.ap_id,
415       "bcc" => [],
416       "bto" => [],
417       "content" => content,
418       "context" => context_id,
419       "conversation" => context_id,
420       "emoji" => %{},
421       "id" => object_id,
422       "published" => datetime,
423       "sensitive" => false,
424       "summary" => "",
425       "tag" => [],
426       "to" => ["https://www.w3.org/ns/activitystreams#Public"],
427       "type" => "Note"
428     }
429
430     {activity_data, object_data}
431   end
432
433   defp prepare_activity_data(actor, "public", _mention) do
434     obj_data = %{
435       "cc" => [actor.follower_address],
436       "to" => [Constants.as_public()]
437     }
438
439     act_data = %{
440       "cc" => [actor.follower_address],
441       "to" => [Constants.as_public()]
442     }
443
444     {act_data, obj_data}
445   end
446
447   defp prepare_activity_data(actor, "private", _mention) do
448     obj_data = %{
449       "cc" => [],
450       "to" => [actor.follower_address]
451     }
452
453     act_data = %{
454       "cc" => [],
455       "to" => [actor.follower_address]
456     }
457
458     {act_data, obj_data}
459   end
460
461   defp prepare_activity_data(actor, "unlisted", _mention) do
462     obj_data = %{
463       "cc" => [Constants.as_public()],
464       "to" => [actor.follower_address]
465     }
466
467     act_data = %{
468       "cc" => [Constants.as_public()],
469       "to" => [actor.follower_address]
470     }
471
472     {act_data, obj_data}
473   end
474
475   defp prepare_activity_data(_actor, "direct", mention) do
476     %{host: mentioned_host} = URI.parse(mention.ap_id)
477
478     obj_data = %{
479       "cc" => [],
480       "content" =>
481         "<span class=\"h-card\"><a class=\"u-url mention\" href=\"#{mention.ap_id}\" rel=\"ugc\">@<span>#{
482           mention.nickname
483         }</span></a></span> direct message",
484       "tag" => [
485         %{
486           "href" => mention.ap_id,
487           "name" => "@#{mention.nickname}@#{mentioned_host}",
488           "type" => "Mention"
489         }
490       ],
491       "to" => [mention.ap_id]
492     }
493
494     act_data = %{
495       "cc" => [],
496       "directMessage" => true,
497       "to" => [mention.ap_id]
498     }
499
500     {act_data, obj_data}
501   end
502
503   defp get_reply_tasks("public", :user) do
504     [:friends_local, :friends_remote, :non_friends_local, :non_friends_remote, :user]
505   end
506
507   defp get_reply_tasks("public", group) when group in @friends_groups do
508     [:non_friends_local, :non_friends_remote, :user, :friends_local, :friends_remote]
509   end
510
511   defp get_reply_tasks("public", group) when group in @non_friends_groups do
512     [:user, :friends_local, :friends_remote, :non_friends_local, :non_friends_remote]
513   end
514
515   defp get_reply_tasks(visibility, :user) when visibility in ["unlisted", "private"] do
516     [:friends_local, :friends_remote, :user, :friends_local, :friends_remote]
517   end
518
519   defp get_reply_tasks(visibility, group)
520        when visibility in ["unlisted", "private"] and group in @friends_groups do
521     [:user, :friends_remote, :friends_local, :user]
522   end
523
524   defp get_reply_tasks(visibility, group)
525        when visibility in ["unlisted", "private"] and
526               group in @non_friends_groups,
527        do: []
528
529   defp get_reply_tasks("direct", :user), do: [:friends_local, :user, :friends_remote]
530
531   defp get_reply_tasks("direct", group) when group in @friends_groups,
532     do: [:user, group, :user]
533
534   defp get_reply_tasks("direct", group) when group in @non_friends_groups do
535     [:user, :non_friends_remote, :user, :non_friends_local]
536   end
537
538   defp insert_replies(tasks, visibility, users, acc) do
539     Enum.reduce(tasks, acc, fn
540       :user, {id, data} ->
541         insert_reply(users[:user], data, id, visibility)
542
543       group, {id, data} ->
544         replier = Enum.random(users[group])
545         insert_reply(replier, data, id, visibility)
546     end)
547   end
548
549   defp insert_direct_replies(tasks, user, list, acc) do
550     Enum.reduce(tasks, acc, fn
551       :user, {id, data} ->
552         {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
553         {reply_id, data}
554
555       _, {id, data} ->
556         actor = Enum.random(list)
557
558         {reply_id, _} =
559           insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct")
560
561         {reply_id, data}
562     end)
563   end
564
565   defp insert_reply(actor, data, activity_id, visibility) do
566     {:ok, reply} =
567       CommonAPI.post(actor, %{
568         status: Enum.join(data, ", "),
569         visibility: visibility,
570         in_reply_to_status_id: activity_id
571       })
572
573     {reply.id, ["@" <> actor.nickname | data]}
574   end
575
576   defp get_random_mentions(_users, count) when count == 0, do: []
577
578   defp get_random_mentions(users, count) do
579     users
580     |> Enum.shuffle()
581     |> Enum.take(count)
582     |> Enum.map(&"@#{&1.nickname}")
583   end
584
585   defp get_random_create_activity_id do
586     Repo.one(
587       from(a in Pleroma.Activity,
588         where: fragment("(?)->>'type' = ?", a.data, ^"Create"),
589         order_by: fragment("RANDOM()"),
590         limit: 1,
591         select: a.id
592       )
593     )
594   end
595 end