From 3a4773c3c2bd0bbef244eb519b07208da9108e49 Mon Sep 17 00:00:00 2001 From: dcc Date: Sat, 2 Sep 2023 00:52:52 -0700 Subject: First --- benchmarks/load_testing/activities.ex | 595 ++++++++++++++++++++ benchmarks/load_testing/fetcher.ex | 624 +++++++++++++++++++++ benchmarks/load_testing/helper.ex | 14 + benchmarks/load_testing/users.ex | 189 +++++++ benchmarks/mix/tasks/pleroma/benchmarks/tags.ex | 114 ++++ .../mix/tasks/pleroma/benchmarks/timelines.ex | 70 +++ benchmarks/mix/tasks/pleroma/load_testing.ex | 67 +++ 7 files changed, 1673 insertions(+) create mode 100644 benchmarks/load_testing/activities.ex create mode 100644 benchmarks/load_testing/fetcher.ex create mode 100644 benchmarks/load_testing/helper.ex create mode 100644 benchmarks/load_testing/users.ex create mode 100644 benchmarks/mix/tasks/pleroma/benchmarks/tags.ex create mode 100644 benchmarks/mix/tasks/pleroma/benchmarks/timelines.ex create mode 100644 benchmarks/mix/tasks/pleroma/load_testing.ex (limited to 'benchmarks') diff --git a/benchmarks/load_testing/activities.ex b/benchmarks/load_testing/activities.ex new file mode 100644 index 0000000..7f262d2 --- /dev/null +++ b/benchmarks/load_testing/activities.ex @@ -0,0 +1,595 @@ +defmodule Pleroma.LoadTesting.Activities do + @moduledoc """ + Module for generating different activities. + """ + import Ecto.Query + import Pleroma.LoadTesting.Helper, only: [to_sec: 1] + + alias Ecto.UUID + alias Pleroma.Constants + alias Pleroma.LoadTesting.Users + alias Pleroma.Repo + alias Pleroma.Web.CommonAPI + + require Constants + + @defaults [ + iterations: 170, + friends_used: 20, + non_friends_used: 20 + ] + + @max_concurrency 10 + + @visibility ~w(public private direct unlisted) + @types [ + :simple, + :simple_filtered, + :emoji, + :mentions, + :hell_thread, + :attachment, + :tag, + :like, + :reblog, + :simple_thread + ] + @groups [:friends_local, :friends_remote, :non_friends_local, :non_friends_local] + @remote_groups [:friends_remote, :non_friends_remote] + @friends_groups [:friends_local, :friends_remote] + @non_friends_groups [:non_friends_local, :non_friends_remote] + + @spec generate(User.t(), keyword()) :: :ok + def generate(user, opts \\ []) do + {:ok, _} = + Agent.start_link(fn -> %{} end, + name: :benchmark_state + ) + + opts = Keyword.merge(@defaults, opts) + + users = Users.prepare_users(user, opts) + + {:ok, _} = Agent.start_link(fn -> users[:non_friends_remote] end, name: :non_friends_remote) + + task_data = + for visibility <- @visibility, + type <- @types, + group <- [:user | @groups], + do: {visibility, type, group} + + IO.puts("Starting generating #{opts[:iterations]} iterations of activities...") + + public_long_thread = fn -> + generate_long_thread("public", users, opts) + end + + private_long_thread = fn -> + generate_long_thread("private", users, opts) + end + + iterations = opts[:iterations] + + {time, _} = + :timer.tc(fn -> + Enum.each( + 1..iterations, + fn + i when i == iterations - 2 -> + spawn(public_long_thread) + spawn(private_long_thread) + generate_activities(users, Enum.shuffle(task_data), opts) + + _ -> + generate_activities(users, Enum.shuffle(task_data), opts) + end + ) + end) + + IO.puts("Generating iterations of activities took #{to_sec(time)} sec.\n") + :ok + end + + def generate_power_intervals(opts \\ []) do + count = Keyword.get(opts, :count, 20) + power = Keyword.get(opts, :power, 2) + IO.puts("Generating #{count} intervals for a power #{power} series...") + counts = Enum.map(1..count, fn n -> :math.pow(n, power) end) + sum = Enum.sum(counts) + + densities = + Enum.map(counts, fn c -> + c / sum + end) + + densities + |> Enum.reduce(0, fn density, acc -> + if acc == 0 do + [{0, density}] + else + [{_, lower} | _] = acc + [{lower, lower + density} | acc] + end + end) + |> Enum.reverse() + end + + def generate_tagged_activities(opts \\ []) do + tag_count = Keyword.get(opts, :tag_count, 20) + users = Keyword.get(opts, :users, Repo.all(Pleroma.User)) + activity_count = Keyword.get(opts, :count, 200_000) + + intervals = generate_power_intervals(count: tag_count) + + IO.puts( + "Generating #{activity_count} activities using #{tag_count} different tags of format `tag_n`, starting at tag_0" + ) + + Enum.each(1..activity_count, fn _ -> + random = :rand.uniform() + i = Enum.find_index(intervals, fn {lower, upper} -> lower <= random && upper > random end) + CommonAPI.post(Enum.random(users), %{status: "a post with the tag #tag_#{i}"}) + end) + end + + defp generate_long_thread(visibility, users, _opts) do + group = + if visibility == "public", + do: :friends_local, + else: :user + + tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50) + + {:ok, activity} = + CommonAPI.post(users[:user], %{ + status: "Start of #{visibility} long thread", + visibility: visibility + }) + + Agent.update(:benchmark_state, fn state -> + key = + if visibility == "public", + do: :public_thread, + else: :private_thread + + Map.put(state, key, activity) + end) + + acc = {activity.id, ["@" <> users[:user].nickname, "reply to long thread"]} + insert_replies_for_long_thread(tasks, visibility, users, acc) + IO.puts("Generating #{visibility} long thread ended\n") + end + + defp insert_replies_for_long_thread(tasks, visibility, users, acc) do + Enum.reduce(tasks, acc, fn + :user, {id, data} -> + user = users[:user] + insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility) + + group, {id, data} -> + replier = Enum.random(users[group]) + insert_reply(replier, List.delete(data, "@" <> replier.nickname), id, visibility) + end) + end + + defp generate_activities(users, task_data, opts) do + Task.async_stream( + task_data, + fn {visibility, type, group} -> + insert_activity(type, visibility, group, users, opts) + end, + max_concurrency: @max_concurrency, + timeout: 30_000 + ) + |> Stream.run() + end + + defp insert_local_activity(visibility, group, users, status) do + {:ok, _} = + group + |> get_actor(users) + |> CommonAPI.post(%{status: status, visibility: visibility}) + end + + defp insert_remote_activity(visibility, group, users, status) do + actor = get_actor(group, users) + {act_data, obj_data} = prepare_activity_data(actor, visibility, users[:user]) + {activity_data, object_data} = other_data(actor, status) + + activity_data + |> Map.merge(act_data) + |> Map.put("object", Map.merge(object_data, obj_data)) + |> Pleroma.Web.ActivityPub.ActivityPub.insert(false) + end + + defp user_mentions(users) do + user_mentions = + Enum.reduce( + @groups, + [], + fn group, acc -> + acc ++ get_random_mentions(users[group], Enum.random(0..2)) + end + ) + + if Enum.random([true, false]), + do: ["@" <> users[:user].nickname | user_mentions], + else: user_mentions + end + + defp hell_thread_mentions(users) do + with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do + cached = + @groups + |> Enum.reduce([users[:user]], fn group, acc -> + acc ++ Enum.take(users[group], 5) + end) + |> Enum.map(&"@#{&1.nickname}") + |> Enum.join(", ") + + Cachex.put(:user_cache, "hell_thread_mentions", cached) + cached + else + {:ok, cached} -> cached + end + end + + defp insert_activity(:simple, visibility, group, users, _opts) + when group in @remote_groups do + insert_remote_activity(visibility, group, users, "Remote status") + end + + defp insert_activity(:simple, visibility, group, users, _opts) do + insert_local_activity(visibility, group, users, "Simple status") + end + + defp insert_activity(:simple_filtered, visibility, group, users, _opts) + when group in @remote_groups do + insert_remote_activity(visibility, group, users, "Remote status which must be filtered") + end + + defp insert_activity(:simple_filtered, visibility, group, users, _opts) do + insert_local_activity(visibility, group, users, "Simple status which must be filtered") + end + + defp insert_activity(:emoji, visibility, group, users, _opts) + when group in @remote_groups do + insert_remote_activity(visibility, group, users, "Remote status with emoji :firefox:") + end + + defp insert_activity(:emoji, visibility, group, users, _opts) do + insert_local_activity(visibility, group, users, "Simple status with emoji :firefox:") + end + + defp insert_activity(:mentions, visibility, group, users, _opts) + when group in @remote_groups do + mentions = user_mentions(users) + + status = Enum.join(mentions, ", ") <> " remote status with mentions" + + insert_remote_activity(visibility, group, users, status) + end + + defp insert_activity(:mentions, visibility, group, users, _opts) do + mentions = user_mentions(users) + + status = Enum.join(mentions, ", ") <> " simple status with mentions" + insert_remote_activity(visibility, group, users, status) + end + + defp insert_activity(:hell_thread, visibility, group, users, _) + when group in @remote_groups do + mentions = hell_thread_mentions(users) + insert_remote_activity(visibility, group, users, mentions <> " remote hell thread status") + end + + defp insert_activity(:hell_thread, visibility, group, users, _opts) do + mentions = hell_thread_mentions(users) + + insert_local_activity(visibility, group, users, mentions <> " hell thread status") + end + + defp insert_activity(:attachment, visibility, group, users, _opts) do + actor = get_actor(group, users) + + obj_data = %{ + "actor" => actor.ap_id, + "name" => "4467-11.jpg", + "type" => "Document", + "url" => [ + %{ + "href" => + "#{Pleroma.Web.Endpoint.url()}/media/b1b873552422a07bf53af01f3c231c841db4dfc42c35efde681abaf0f2a4eab7.jpg", + "mediaType" => "image/jpeg", + "type" => "Link" + } + ] + } + + object = Repo.insert!(%Pleroma.Object{data: obj_data}) + + {:ok, _activity} = + CommonAPI.post(actor, %{ + status: "Post with attachment", + visibility: visibility, + media_ids: [object.id] + }) + end + + defp insert_activity(:tag, visibility, group, users, _opts) do + insert_local_activity(visibility, group, users, "Status with #tag") + end + + defp insert_activity(:like, visibility, group, users, opts) do + actor = get_actor(group, users) + + with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(), + {:ok, _activity} <- CommonAPI.favorite(actor, activity_id) do + :ok + else + {:error, _} -> + insert_activity(:like, visibility, group, users, opts) + + nil -> + Process.sleep(15) + insert_activity(:like, visibility, group, users, opts) + end + end + + defp insert_activity(:reblog, visibility, group, users, opts) do + actor = get_actor(group, users) + + with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(), + {:ok, _activity} <- CommonAPI.repeat(activity_id, actor) do + :ok + else + {:error, _} -> + insert_activity(:reblog, visibility, group, users, opts) + + nil -> + Process.sleep(15) + insert_activity(:reblog, visibility, group, users, opts) + end + end + + defp insert_activity(:simple_thread, "direct", group, users, _opts) do + actor = get_actor(group, users) + tasks = get_reply_tasks("direct", group) + + list = + case group do + :user -> + group = Enum.random(@friends_groups) + Enum.take(users[group], 3) + + _ -> + Enum.take(users[group], 3) + end + + data = Enum.map(list, &("@" <> &1.nickname)) + + {:ok, activity} = + CommonAPI.post(actor, %{ + status: Enum.join(data, ", ") <> "simple status", + visibility: "direct" + }) + + acc = {activity.id, ["@" <> users[:user].nickname | data] ++ ["reply to status"]} + insert_direct_replies(tasks, users[:user], list, acc) + end + + defp insert_activity(:simple_thread, visibility, group, users, _opts) do + actor = get_actor(group, users) + tasks = get_reply_tasks(visibility, group) + + {:ok, activity} = + CommonAPI.post(users[:user], %{status: "Simple status", visibility: visibility}) + + acc = {activity.id, ["@" <> actor.nickname, "reply to status"]} + insert_replies(tasks, visibility, users, acc) + end + + defp get_actor(:user, %{user: user}), do: user + defp get_actor(group, users), do: Enum.random(users[group]) + + defp other_data(actor, content) do + %{host: host} = URI.parse(actor.ap_id) + datetime = DateTime.utc_now() |> to_string() + context_id = "https://#{host}/contexts/#{UUID.generate()}" + activity_id = "https://#{host}/activities/#{UUID.generate()}" + object_id = "https://#{host}/objects/#{UUID.generate()}" + + activity_data = %{ + "actor" => actor.ap_id, + "context" => context_id, + "id" => activity_id, + "published" => datetime, + "type" => "Create", + "directMessage" => false + } + + object_data = %{ + "actor" => actor.ap_id, + "attachment" => [], + "attributedTo" => actor.ap_id, + "bcc" => [], + "bto" => [], + "content" => content, + "context" => context_id, + "conversation" => context_id, + "emoji" => %{}, + "id" => object_id, + "published" => datetime, + "sensitive" => false, + "summary" => "", + "tag" => [], + "to" => ["https://www.w3.org/ns/activitystreams#Public"], + "type" => "Note" + } + + {activity_data, object_data} + end + + defp prepare_activity_data(actor, "public", _mention) do + obj_data = %{ + "cc" => [actor.follower_address], + "to" => [Constants.as_public()] + } + + act_data = %{ + "cc" => [actor.follower_address], + "to" => [Constants.as_public()] + } + + {act_data, obj_data} + end + + defp prepare_activity_data(actor, "private", _mention) do + obj_data = %{ + "cc" => [], + "to" => [actor.follower_address] + } + + act_data = %{ + "cc" => [], + "to" => [actor.follower_address] + } + + {act_data, obj_data} + end + + defp prepare_activity_data(actor, "unlisted", _mention) do + obj_data = %{ + "cc" => [Constants.as_public()], + "to" => [actor.follower_address] + } + + act_data = %{ + "cc" => [Constants.as_public()], + "to" => [actor.follower_address] + } + + {act_data, obj_data} + end + + defp prepare_activity_data(_actor, "direct", mention) do + %{host: mentioned_host} = URI.parse(mention.ap_id) + + obj_data = %{ + "cc" => [], + "content" => + "@#{ + mention.nickname + } direct message", + "tag" => [ + %{ + "href" => mention.ap_id, + "name" => "@#{mention.nickname}@#{mentioned_host}", + "type" => "Mention" + } + ], + "to" => [mention.ap_id] + } + + act_data = %{ + "cc" => [], + "directMessage" => true, + "to" => [mention.ap_id] + } + + {act_data, obj_data} + end + + defp get_reply_tasks("public", :user) do + [:friends_local, :friends_remote, :non_friends_local, :non_friends_remote, :user] + end + + defp get_reply_tasks("public", group) when group in @friends_groups do + [:non_friends_local, :non_friends_remote, :user, :friends_local, :friends_remote] + end + + defp get_reply_tasks("public", group) when group in @non_friends_groups do + [:user, :friends_local, :friends_remote, :non_friends_local, :non_friends_remote] + end + + defp get_reply_tasks(visibility, :user) when visibility in ["unlisted", "private"] do + [:friends_local, :friends_remote, :user, :friends_local, :friends_remote] + end + + defp get_reply_tasks(visibility, group) + when visibility in ["unlisted", "private"] and group in @friends_groups do + [:user, :friends_remote, :friends_local, :user] + end + + defp get_reply_tasks(visibility, group) + when visibility in ["unlisted", "private"] and + group in @non_friends_groups, + do: [] + + defp get_reply_tasks("direct", :user), do: [:friends_local, :user, :friends_remote] + + defp get_reply_tasks("direct", group) when group in @friends_groups, + do: [:user, group, :user] + + defp get_reply_tasks("direct", group) when group in @non_friends_groups do + [:user, :non_friends_remote, :user, :non_friends_local] + end + + defp insert_replies(tasks, visibility, users, acc) do + Enum.reduce(tasks, acc, fn + :user, {id, data} -> + insert_reply(users[:user], data, id, visibility) + + group, {id, data} -> + replier = Enum.random(users[group]) + insert_reply(replier, data, id, visibility) + end) + end + + defp insert_direct_replies(tasks, user, list, acc) do + Enum.reduce(tasks, acc, fn + :user, {id, data} -> + {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct") + {reply_id, data} + + _, {id, data} -> + actor = Enum.random(list) + + {reply_id, _} = + insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct") + + {reply_id, data} + end) + end + + defp insert_reply(actor, data, activity_id, visibility) do + {:ok, reply} = + CommonAPI.post(actor, %{ + status: Enum.join(data, ", "), + visibility: visibility, + in_reply_to_status_id: activity_id + }) + + {reply.id, ["@" <> actor.nickname | data]} + end + + defp get_random_mentions(_users, count) when count == 0, do: [] + + defp get_random_mentions(users, count) do + users + |> Enum.shuffle() + |> Enum.take(count) + |> Enum.map(&"@#{&1.nickname}") + end + + defp get_random_create_activity_id do + Repo.one( + from(a in Pleroma.Activity, + where: fragment("(?)->>'type' = ?", a.data, ^"Create"), + order_by: fragment("RANDOM()"), + limit: 1, + select: a.id + ) + ) + end +end diff --git a/benchmarks/load_testing/fetcher.ex b/benchmarks/load_testing/fetcher.ex new file mode 100644 index 0000000..607b7d4 --- /dev/null +++ b/benchmarks/load_testing/fetcher.ex @@ -0,0 +1,624 @@ +defmodule Pleroma.LoadTesting.Fetcher do + alias Pleroma.Activity + alias Pleroma.Pagination + alias Pleroma.Repo + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.MastodonAPI.MastodonAPI + alias Pleroma.Web.MastodonAPI.StatusView + + @spec run_benchmarks(User.t()) :: any() + def run_benchmarks(user) do + fetch_user(user) + fetch_timelines(user) + render_views(user) + end + + defp formatters do + [ + Benchee.Formatters.Console + ] + end + + defp fetch_user(user) do + Benchee.run( + %{ + "By id" => fn -> Repo.get_by(User, id: user.id) end, + "By ap_id" => fn -> Repo.get_by(User, ap_id: user.ap_id) end, + "By email" => fn -> Repo.get_by(User, email: user.email) end, + "By nickname" => fn -> Repo.get_by(User, nickname: user.nickname) end + }, + formatters: formatters() + ) + end + + defp create_filter(user) do + Pleroma.Filter.create(%{ + user_id: user.id, + phrase: "must be filtered", + hide: true, + context: ["home"] + }) + end + + defp delete_filter(filter), do: Repo.delete(filter) + + defp fetch_timelines(user) do + fetch_home_timeline(user) + fetch_home_timeline_with_filter(user) + fetch_direct_timeline(user) + fetch_public_timeline(user) + fetch_public_timeline_with_filter(user) + fetch_public_timeline(user, :with_blocks) + fetch_public_timeline(user, :local) + fetch_public_timeline(user, :tag) + fetch_notifications(user) + fetch_favourites(user) + fetch_long_thread(user) + fetch_timelines_with_reply_filtering(user) + end + + defp render_views(user) do + render_timelines(user) + render_long_thread(user) + end + + defp opts_for_home_timeline(user) do + %{ + blocking_user: user, + count: "20", + muting_user: user, + type: ["Create", "Announce"], + user: user, + with_muted: true + } + end + + defp fetch_home_timeline(user, title_end \\ "") do + opts = opts_for_home_timeline(user) + + recipients = [user.ap_id | User.following(user)] + + first_page_last = + ActivityPub.fetch_activities(recipients, opts) |> Enum.reverse() |> List.last() + + second_page_last = + ActivityPub.fetch_activities(recipients, Map.put(opts, :max_id, first_page_last.id)) + |> Enum.reverse() + |> List.last() + + third_page_last = + ActivityPub.fetch_activities(recipients, Map.put(opts, :max_id, second_page_last.id)) + |> Enum.reverse() + |> List.last() + + forth_page_last = + ActivityPub.fetch_activities(recipients, Map.put(opts, :max_id, third_page_last.id)) + |> Enum.reverse() + |> List.last() + + title = "home timeline " <> title_end + + Benchee.run( + %{ + title => fn opts -> ActivityPub.fetch_activities(recipients, opts) end + }, + inputs: %{ + "1 page" => opts, + "2 page" => Map.put(opts, :max_id, first_page_last.id), + "3 page" => Map.put(opts, :max_id, second_page_last.id), + "4 page" => Map.put(opts, :max_id, third_page_last.id), + "5 page" => Map.put(opts, :max_id, forth_page_last.id), + "1 page only media" => Map.put(opts, :only_media, true), + "2 page only media" => + Map.put(opts, :max_id, first_page_last.id) |> Map.put(:only_media, true), + "3 page only media" => + Map.put(opts, :max_id, second_page_last.id) |> Map.put(:only_media, true), + "4 page only media" => + Map.put(opts, :max_id, third_page_last.id) |> Map.put(:only_media, true), + "5 page only media" => + Map.put(opts, :max_id, forth_page_last.id) |> Map.put(:only_media, true) + }, + formatters: formatters() + ) + end + + defp fetch_home_timeline_with_filter(user) do + {:ok, filter} = create_filter(user) + + fetch_home_timeline(user, "with filters") + + delete_filter(filter) + end + + defp opts_for_direct_timeline(user) do + %{ + visibility: "direct", + blocking_user: user, + count: "20", + type: "Create", + user: user, + with_muted: true + } + end + + defp fetch_direct_timeline(user) do + recipients = [user.ap_id] + + opts = opts_for_direct_timeline(user) + + first_page_last = + recipients + |> ActivityPub.fetch_activities_query(opts) + |> Pagination.fetch_paginated(opts) + |> List.last() + + opts2 = Map.put(opts, :max_id, first_page_last.id) + + second_page_last = + recipients + |> ActivityPub.fetch_activities_query(opts2) + |> Pagination.fetch_paginated(opts2) + |> List.last() + + opts3 = Map.put(opts, :max_id, second_page_last.id) + + third_page_last = + recipients + |> ActivityPub.fetch_activities_query(opts3) + |> Pagination.fetch_paginated(opts3) + |> List.last() + + opts4 = Map.put(opts, :max_id, third_page_last.id) + + forth_page_last = + recipients + |> ActivityPub.fetch_activities_query(opts4) + |> Pagination.fetch_paginated(opts4) + |> List.last() + + Benchee.run( + %{ + "direct timeline" => fn opts -> + ActivityPub.fetch_activities_query(recipients, opts) |> Pagination.fetch_paginated(opts) + end + }, + inputs: %{ + "1 page" => opts, + "2 page" => opts2, + "3 page" => opts3, + "4 page" => opts4, + "5 page" => Map.put(opts4, :max_id, forth_page_last.id) + }, + formatters: formatters() + ) + end + + defp opts_for_public_timeline(user) do + %{ + type: ["Create", "Announce"], + local_only: false, + blocking_user: user, + muting_user: user + } + end + + defp opts_for_public_timeline(user, :local) do + %{ + type: ["Create", "Announce"], + local_only: true, + blocking_user: user, + muting_user: user + } + end + + defp opts_for_public_timeline(user, :tag) do + %{ + blocking_user: user, + count: "20", + local_only: nil, + muting_user: user, + tag: ["tag"], + tag_all: [], + tag_reject: [], + type: "Create", + user: user, + with_muted: true + } + end + + defp fetch_public_timeline(user) do + opts = opts_for_public_timeline(user) + + fetch_public_timeline(opts, "public timeline") + end + + defp fetch_public_timeline_with_filter(user) do + {:ok, filter} = create_filter(user) + opts = opts_for_public_timeline(user) + + fetch_public_timeline(opts, "public timeline with filters") + delete_filter(filter) + end + + defp fetch_public_timeline(user, :local) do + opts = opts_for_public_timeline(user, :local) + + fetch_public_timeline(opts, "public timeline only local") + end + + defp fetch_public_timeline(user, :tag) do + opts = opts_for_public_timeline(user, :tag) + + fetch_public_timeline(opts, "hashtag timeline") + end + + defp fetch_public_timeline(user, :only_media) do + opts = opts_for_public_timeline(user) |> Map.put(:only_media, true) + + fetch_public_timeline(opts, "public timeline only media") + end + + defp fetch_public_timeline(user, :with_blocks) do + opts = opts_for_public_timeline(user) + + remote_non_friends = Agent.get(:non_friends_remote, & &1) + + Benchee.run(%{ + "public timeline without blocks" => fn -> + ActivityPub.fetch_public_activities(opts) + end + }) + + Enum.each(remote_non_friends, fn non_friend -> + {:ok, _} = User.block(user, non_friend) + end) + + user = User.get_by_id(user.id) + + opts = Map.put(opts, :blocking_user, user) + + Benchee.run(%{ + "public timeline with user block" => fn -> + ActivityPub.fetch_public_activities(opts) + end + }) + + domains = + Enum.reduce(remote_non_friends, [], fn non_friend, domains -> + {:ok, _user} = User.unblock(user, non_friend) + %{host: host} = URI.parse(non_friend.ap_id) + [host | domains] + end) + + domains = Enum.uniq(domains) + + Enum.each(domains, fn domain -> + {:ok, _} = User.block_domain(user, domain) + end) + + user = User.get_by_id(user.id) + opts = Map.put(opts, :blocking_user, user) + + Benchee.run(%{ + "public timeline with domain block" => fn -> + ActivityPub.fetch_public_activities(opts) + end + }) + end + + defp fetch_public_timeline(opts, title) when is_binary(title) do + first_page_last = ActivityPub.fetch_public_activities(opts) |> List.last() + + second_page_last = + ActivityPub.fetch_public_activities(Map.put(opts, :max_id, first_page_last.id)) + |> List.last() + + third_page_last = + ActivityPub.fetch_public_activities(Map.put(opts, :max_id, second_page_last.id)) + |> List.last() + + forth_page_last = + ActivityPub.fetch_public_activities(Map.put(opts, :max_id, third_page_last.id)) + |> List.last() + + Benchee.run( + %{ + title => fn opts -> + ActivityPub.fetch_public_activities(opts) + end + }, + inputs: %{ + "1 page" => opts, + "2 page" => Map.put(opts, :max_id, first_page_last.id), + "3 page" => Map.put(opts, :max_id, second_page_last.id), + "4 page" => Map.put(opts, :max_id, third_page_last.id), + "5 page" => Map.put(opts, :max_id, forth_page_last.id) + }, + formatters: formatters() + ) + end + + defp opts_for_notifications do + %{count: "20", with_muted: true} + end + + defp fetch_notifications(user) do + opts = opts_for_notifications() + + first_page_last = MastodonAPI.get_notifications(user, opts) |> List.last() + + second_page_last = + MastodonAPI.get_notifications(user, Map.put(opts, :max_id, first_page_last.id)) + |> List.last() + + third_page_last = + MastodonAPI.get_notifications(user, Map.put(opts, :max_id, second_page_last.id)) + |> List.last() + + forth_page_last = + MastodonAPI.get_notifications(user, Map.put(opts, :max_id, third_page_last.id)) + |> List.last() + + Benchee.run( + %{ + "Notifications" => fn opts -> + MastodonAPI.get_notifications(user, opts) + end + }, + inputs: %{ + "1 page" => opts, + "2 page" => Map.put(opts, :max_id, first_page_last.id), + "3 page" => Map.put(opts, :max_id, second_page_last.id), + "4 page" => Map.put(opts, :max_id, third_page_last.id), + "5 page" => Map.put(opts, :max_id, forth_page_last.id) + }, + formatters: formatters() + ) + end + + defp fetch_favourites(user) do + first_page_last = ActivityPub.fetch_favourites(user) |> List.last() + + second_page_last = + ActivityPub.fetch_favourites(user, %{:max_id => first_page_last.id}) |> List.last() + + third_page_last = + ActivityPub.fetch_favourites(user, %{:max_id => second_page_last.id}) |> List.last() + + forth_page_last = + ActivityPub.fetch_favourites(user, %{:max_id => third_page_last.id}) |> List.last() + + Benchee.run( + %{ + "Favourites" => fn opts -> + ActivityPub.fetch_favourites(user, opts) + end + }, + inputs: %{ + "1 page" => %{}, + "2 page" => %{:max_id => first_page_last.id}, + "3 page" => %{:max_id => second_page_last.id}, + "4 page" => %{:max_id => third_page_last.id}, + "5 page" => %{:max_id => forth_page_last.id} + }, + formatters: formatters() + ) + end + + defp opts_for_long_thread(user) do + %{ + blocking_user: user, + user: user + } + end + + defp fetch_long_thread(user) do + %{public_thread: public, private_thread: private} = + Agent.get(:benchmark_state, fn state -> state end) + + opts = opts_for_long_thread(user) + + private_input = {private.data["context"], Map.put(opts, :exclude_id, private.id)} + + public_input = {public.data["context"], Map.put(opts, :exclude_id, public.id)} + + Benchee.run( + %{ + "fetch context" => fn {context, opts} -> + ActivityPub.fetch_activities_for_context(context, opts) + end + }, + inputs: %{ + "Private long thread" => private_input, + "Public long thread" => public_input + }, + formatters: formatters() + ) + end + + defp render_timelines(user) do + opts = opts_for_home_timeline(user) + + recipients = [user.ap_id | User.following(user)] + + home_activities = ActivityPub.fetch_activities(recipients, opts) |> Enum.reverse() + + recipients = [user.ap_id] + + opts = opts_for_direct_timeline(user) + + direct_activities = + recipients + |> ActivityPub.fetch_activities_query(opts) + |> Pagination.fetch_paginated(opts) + + opts = opts_for_public_timeline(user) + + public_activities = ActivityPub.fetch_public_activities(opts) + + opts = opts_for_public_timeline(user, :tag) + + tag_activities = ActivityPub.fetch_public_activities(opts) + + opts = opts_for_notifications() + + notifications = MastodonAPI.get_notifications(user, opts) + + favourites = ActivityPub.fetch_favourites(user) + + Benchee.run( + %{ + "Rendering home timeline" => fn -> + StatusView.render("index.json", %{ + activities: home_activities, + for: user, + as: :activity + }) + end, + "Rendering direct timeline" => fn -> + StatusView.render("index.json", %{ + activities: direct_activities, + for: user, + as: :activity + }) + end, + "Rendering public timeline" => fn -> + StatusView.render("index.json", %{ + activities: public_activities, + for: user, + as: :activity + }) + end, + "Rendering tag timeline" => fn -> + StatusView.render("index.json", %{ + activities: tag_activities, + for: user, + as: :activity + }) + end, + "Rendering notifications" => fn -> + Pleroma.Web.MastodonAPI.NotificationView.render("index.json", %{ + notifications: notifications, + for: user + }) + end, + "Rendering favourites timeline" => fn -> + StatusView.render("index.json", %{ + activities: favourites, + for: user, + as: :activity + }) + end + }, + formatters: formatters() + ) + end + + defp render_long_thread(user) do + %{public_thread: public, private_thread: private} = + Agent.get(:benchmark_state, fn state -> state end) + + opts = %{for: user} + public_activity = Activity.get_by_id_with_object(public.id) + private_activity = Activity.get_by_id_with_object(private.id) + + Benchee.run( + %{ + "render" => fn opts -> + StatusView.render("show.json", opts) + end + }, + inputs: %{ + "Public root" => Map.put(opts, :activity, public_activity), + "Private root" => Map.put(opts, :activity, private_activity) + }, + formatters: formatters() + ) + + fetch_opts = opts_for_long_thread(user) + + public_context = + ActivityPub.fetch_activities_for_context( + public.data["context"], + Map.put(fetch_opts, :exclude_id, public.id) + ) + + private_context = + ActivityPub.fetch_activities_for_context( + private.data["context"], + Map.put(fetch_opts, :exclude_id, private.id) + ) + + Benchee.run( + %{ + "render" => fn opts -> + StatusView.render("context.json", opts) + end + }, + inputs: %{ + "Public context" => %{user: user, activity: public_activity, activities: public_context}, + "Private context" => %{ + user: user, + activity: private_activity, + activities: private_context + } + }, + formatters: formatters() + ) + end + + defp fetch_timelines_with_reply_filtering(user) do + public_params = opts_for_public_timeline(user) + + Benchee.run( + %{ + "Public timeline without reply filtering" => fn -> + ActivityPub.fetch_public_activities(public_params) + end, + "Public timeline with reply filtering - following" => fn -> + public_params + |> Map.put(:reply_visibility, "following") + |> Map.put(:reply_filtering_user, user) + |> ActivityPub.fetch_public_activities() + end, + "Public timeline with reply filtering - self" => fn -> + public_params + |> Map.put(:reply_visibility, "self") + |> Map.put(:reply_filtering_user, user) + |> ActivityPub.fetch_public_activities() + end + }, + formatters: formatters() + ) + + private_params = opts_for_home_timeline(user) + + recipients = [user.ap_id | User.following(user)] + + Benchee.run( + %{ + "Home timeline without reply filtering" => fn -> + ActivityPub.fetch_activities(recipients, private_params) + end, + "Home timeline with reply filtering - following" => fn -> + private_params = + private_params + |> Map.put(:reply_filtering_user, user) + |> Map.put(:reply_visibility, "following") + + ActivityPub.fetch_activities(recipients, private_params) + end, + "Home timeline with reply filtering - self" => fn -> + private_params = + private_params + |> Map.put(:reply_filtering_user, user) + |> Map.put(:reply_visibility, "self") + + ActivityPub.fetch_activities(recipients, private_params) + end + }, + formatters: formatters() + ) + end +end diff --git a/benchmarks/load_testing/helper.ex b/benchmarks/load_testing/helper.ex new file mode 100644 index 0000000..cab60ac --- /dev/null +++ b/benchmarks/load_testing/helper.ex @@ -0,0 +1,14 @@ +defmodule Pleroma.LoadTesting.Helper do + alias Ecto.Adapters.SQL + alias Pleroma.Repo + + def to_sec(microseconds), do: microseconds / 1_000_000 + + def clean_tables do + IO.puts("Deleting old data...\n") + SQL.query!(Repo, "TRUNCATE users CASCADE;") + SQL.query!(Repo, "TRUNCATE activities CASCADE;") + SQL.query!(Repo, "TRUNCATE objects CASCADE;") + SQL.query!(Repo, "TRUNCATE oban_jobs CASCADE;") + end +end diff --git a/benchmarks/load_testing/users.ex b/benchmarks/load_testing/users.ex new file mode 100644 index 0000000..0a33cbf --- /dev/null +++ b/benchmarks/load_testing/users.ex @@ -0,0 +1,189 @@ +defmodule Pleroma.LoadTesting.Users do + @moduledoc """ + Module for generating users with friends. + """ + import Ecto.Query + import Pleroma.LoadTesting.Helper, only: [to_sec: 1] + + alias Pleroma.Repo + alias Pleroma.User + alias Pleroma.User.Query + + @defaults [ + users: 20_000, + friends: 100 + ] + + @max_concurrency 10 + + @spec generate(keyword()) :: User.t() + def generate(opts \\ []) do + opts = Keyword.merge(@defaults, opts) + + generate_users(opts[:users]) + + main_user = + Repo.one(from(u in User, where: u.local == true, order_by: fragment("RANDOM()"), limit: 1)) + + make_friends(main_user, opts[:friends]) + + User.get_by_id(main_user.id) + end + + def generate_users(max) do + IO.puts("Starting generating #{max} users...") + + {time, users} = + :timer.tc(fn -> + Task.async_stream( + 1..max, + &generate_user(&1), + max_concurrency: @max_concurrency, + timeout: 30_000 + ) + |> Enum.to_list() + end) + + IO.puts("Generating users took #{to_sec(time)} sec.\n") + users + end + + defp generate_user(i) do + remote = Enum.random([true, false]) + + %User{ + name: "Test ใƒ†ใ‚นใƒˆ User #{i}", + email: "user#{i}@example.com", + nickname: "nick#{i}", + password_hash: Pleroma.Password.Pbkdf2.hash_pwd_salt("test"), + bio: "Tester Number #{i}", + local: !remote + } + |> user_urls() + |> Repo.insert!() + end + + defp user_urls(%{local: true} = user) do + urls = %{ + ap_id: User.ap_id(user), + follower_address: User.ap_followers(user), + following_address: User.ap_following(user) + } + + Map.merge(user, urls) + end + + defp user_urls(%{local: false} = user) do + base_domain = Enum.random(["domain1.com", "domain2.com", "domain3.com"]) + + ap_id = "https://#{base_domain}/users/#{user.nickname}" + + urls = %{ + ap_id: ap_id, + follower_address: ap_id <> "/followers", + following_address: ap_id <> "/following" + } + + Map.merge(user, urls) + end + + def make_friends(main_user, max) when is_integer(max) do + IO.puts("Starting making friends for #{max} users...") + + {time, _} = + :timer.tc(fn -> + number_of_users = + (max / 2) + |> Kernel.trunc() + + main_user + |> get_users(%{limit: number_of_users, local: :local}) + |> run_stream(main_user) + + main_user + |> get_users(%{limit: number_of_users, local: :external}) + |> run_stream(main_user) + end) + + IO.puts("Making friends took #{to_sec(time)} sec.\n") + end + + def make_friends(%User{} = main_user, %User{} = user) do + {:ok, _, _} = User.follow(main_user, user) + {:ok, _, _} = User.follow(user, main_user) + end + + @spec get_users(User.t(), keyword()) :: [User.t()] + def get_users(user, opts) do + criteria = %{limit: opts[:limit]} + + criteria = + if opts[:local] do + Map.put(criteria, opts[:local], true) + else + criteria + end + + criteria = + if opts[:friends?] do + Map.put(criteria, :friends, user) + else + criteria + end + + query = + criteria + |> Query.build() + |> random_without_user(user) + + query = + if opts[:friends?] == false do + friends_ids = + %{friends: user} + |> Query.build() + |> Repo.all() + |> Enum.map(& &1.id) + + from(u in query, where: u.id not in ^friends_ids) + else + query + end + + Repo.all(query) + end + + defp random_without_user(query, user) do + from(u in query, + where: u.id != ^user.id, + order_by: fragment("RANDOM()") + ) + end + + defp run_stream(users, main_user) do + Task.async_stream(users, &make_friends(main_user, &1), + max_concurrency: @max_concurrency, + timeout: 30_000 + ) + |> Stream.run() + end + + @spec prepare_users(User.t(), keyword()) :: map() + def prepare_users(user, opts) do + friends_limit = opts[:friends_used] + non_friends_limit = opts[:non_friends_used] + + %{ + user: user, + friends_local: fetch_users(user, friends_limit, :local, true), + friends_remote: fetch_users(user, friends_limit, :external, true), + non_friends_local: fetch_users(user, non_friends_limit, :local, false), + non_friends_remote: fetch_users(user, non_friends_limit, :external, false) + } + end + + defp fetch_users(user, limit, local, friends?) do + user + |> get_users(limit: limit, local: local, friends?: friends?) + |> Enum.shuffle() + end +end diff --git a/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex b/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex new file mode 100644 index 0000000..a32de2d --- /dev/null +++ b/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex @@ -0,0 +1,114 @@ +defmodule Mix.Tasks.Pleroma.Benchmarks.Tags do + use Mix.Task + + import Pleroma.LoadTesting.Helper, only: [clean_tables: 0] + import Ecto.Query + + alias Pleroma.Repo + + def run(_args) do + Mix.Pleroma.start_pleroma() + activities_count = Repo.aggregate(from(a in Pleroma.Activity), :count, :id) + + if activities_count == 0 do + IO.puts("Did not find any activities, cleaning and generating") + clean_tables() + Pleroma.LoadTesting.Users.generate_users(10) + Pleroma.LoadTesting.Activities.generate_tagged_activities() + else + IO.puts("Found #{activities_count} activities, won't generate new ones") + end + + tags = Enum.map(0..20, fn i -> {"For #tag_#{i}", "tag_#{i}"} end) + + Enum.each(tags, fn {_, tag} -> + query = + from(o in Pleroma.Object, + where: fragment("(?)->'tag' \\? (?)", o.data, ^tag) + ) + + count = Repo.aggregate(query, :count, :id) + IO.puts("Database contains #{count} posts tagged with #{tag}") + end) + + user = Repo.all(Pleroma.User) |> List.first() + + Benchee.run( + %{ + "Hashtag fetching, any" => fn tags -> + hashtag_fetching( + %{ + "any" => tags + }, + user, + false + ) + end, + # Will always return zero results because no overlapping hashtags are generated. + "Hashtag fetching, all" => fn tags -> + hashtag_fetching( + %{ + "all" => tags + }, + user, + false + ) + end + }, + inputs: + tags + |> Enum.map(fn {_, v} -> v end) + |> Enum.chunk_every(2) + |> Enum.map(fn tags -> {"For #{inspect(tags)}", tags} end), + time: 5 + ) + + Benchee.run( + %{ + "Hashtag fetching" => fn tag -> + hashtag_fetching( + %{ + "tag" => tag + }, + user, + false + ) + end + }, + inputs: tags, + time: 5 + ) + end + + defp hashtag_fetching(params, user, local_only) do + tags = + [params["tag"], params["any"]] + |> List.flatten() + |> Enum.uniq() + |> Enum.filter(& &1) + |> Enum.map(&String.downcase(&1)) + + tag_all = + params + |> Map.get("all", []) + |> Enum.map(&String.downcase(&1)) + + tag_reject = + params + |> Map.get("none", []) + |> Enum.map(&String.downcase(&1)) + + _activities = + %{ + type: "Create", + local_only: local_only, + blocking_user: user, + muting_user: user, + user: user, + tag: tags, + tag_all: tag_all, + tag_reject: tag_reject, + } + |> Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities() + end +end diff --git a/benchmarks/mix/tasks/pleroma/benchmarks/timelines.ex b/benchmarks/mix/tasks/pleroma/benchmarks/timelines.ex new file mode 100644 index 0000000..3770ca1 --- /dev/null +++ b/benchmarks/mix/tasks/pleroma/benchmarks/timelines.ex @@ -0,0 +1,70 @@ +defmodule Mix.Tasks.Pleroma.Benchmarks.Timelines do + use Mix.Task + + import Pleroma.LoadTesting.Helper, only: [clean_tables: 0] + + alias Pleroma.Web.CommonAPI + alias Plug.Conn + + def run(_args) do + Mix.Pleroma.start_pleroma() + + # Cleaning tables + clean_tables() + + [{:ok, user} | users] = Pleroma.LoadTesting.Users.generate_users(1000) + + # Let the user make 100 posts + + 1..100 + |> Enum.each(fn i -> CommonAPI.post(user, %{status: to_string(i)}) end) + + # Let 10 random users post + posts = + users + |> Enum.take_random(10) + |> Enum.map(fn {:ok, random_user} -> + {:ok, activity} = CommonAPI.post(random_user, %{status: "."}) + activity + end) + + # let our user repeat them + posts + |> Enum.each(fn activity -> + CommonAPI.repeat(activity.id, user) + end) + + Benchee.run( + %{ + "user timeline, no followers" => fn reading_user -> + conn = + Phoenix.ConnTest.build_conn() + |> Conn.assign(:user, reading_user) + |> Conn.assign(:skip_link_headers, true) + + Pleroma.Web.MastodonAPI.AccountController.statuses(conn, %{id: user.id}) + end + }, + inputs: %{"user" => user, "no user" => nil}, + time: 60 + ) + + users + |> Enum.each(fn {:ok, follower} -> Pleroma.User.follow(follower, user) end) + + Benchee.run( + %{ + "user timeline, all following" => fn reading_user -> + conn = + Phoenix.ConnTest.build_conn() + |> Conn.assign(:user, reading_user) + |> Conn.assign(:skip_link_headers, true) + + Pleroma.Web.MastodonAPI.AccountController.statuses(conn, %{id: user.id}) + end + }, + inputs: %{"user" => user, "no user" => nil}, + time: 60 + ) + end +end diff --git a/benchmarks/mix/tasks/pleroma/load_testing.ex b/benchmarks/mix/tasks/pleroma/load_testing.ex new file mode 100644 index 0000000..3888832 --- /dev/null +++ b/benchmarks/mix/tasks/pleroma/load_testing.ex @@ -0,0 +1,67 @@ +defmodule Mix.Tasks.Pleroma.LoadTesting do + use Mix.Task + import Ecto.Query + import Pleroma.LoadTesting.Helper, only: [clean_tables: 0] + + alias Pleroma.Repo + alias Pleroma.User + + @shortdoc "Factory for generation data" + @moduledoc """ + Generates data like: + - local/remote users + - local/remote activities with differrent visibility: + - simple activiities + - with emoji + - with mentions + - hellthreads + - with attachments + - with tags + - likes + - reblogs + - simple threads + - long threads + + ## Generate data + MIX_ENV=benchmark mix pleroma.load_testing --users 20000 --friends 1000 --iterations 170 --friends_used 20 --non_friends_used 20 + MIX_ENV=benchmark mix pleroma.load_testing -u 20000 -f 1000 -i 170 -fu 20 -nfu 20 + + Options: + - `--users NUMBER` - number of users to generate. Defaults to: 20000. Alias: `-u` + - `--friends NUMBER` - number of friends for main user. Defaults to: 1000. Alias: `-f` + - `--iterations NUMBER` - number of iterations to generate activities. For each iteration in database is inserted about 120+ activities with different visibility, actors and types.Defaults to: 170. Alias: `-i` + - `--friends_used NUMBER` - number of main user friends used in activity generation. Defaults to: 20. Alias: `-fu` + - `--non_friends_used NUMBER` - number of non friends used in activity generation. Defaults to: 20. Alias: `-nfu` + """ + + @aliases [u: :users, f: :friends, i: :iterations, fu: :friends_used, nfu: :non_friends_used] + @switches [ + users: :integer, + friends: :integer, + iterations: :integer, + friends_used: :integer, + non_friends_used: :integer + ] + + def run(args) do + Logger.configure(level: :error) + Mix.Pleroma.start_pleroma() + clean_tables() + {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases) + + user = Pleroma.LoadTesting.Users.generate(opts) + Pleroma.LoadTesting.Activities.generate(user, opts) + + IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}") + + IO.puts("Activities in DB: #{Repo.aggregate(from(a in Pleroma.Activity), :count, :id)}") + + IO.puts("Objects in DB: #{Repo.aggregate(from(o in Pleroma.Object), :count, :id)}") + + IO.puts( + "Notifications in DB: #{Repo.aggregate(from(n in Pleroma.Notification), :count, :id)}" + ) + + Pleroma.LoadTesting.Fetcher.run_benchmarks(user) + end +end -- cgit v1.2.3