total rebase
[anni] / test / support / data_case.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.DataCase do
6   @moduledoc """
7   This module defines the setup for tests requiring
8   access to the application's data layer.
9
10   You may define functions here to be used as helpers in
11   your tests.
12
13   Finally, if the test case interacts with the database,
14   it cannot be async. For this reason, every test runs
15   inside a transaction which is reset at the beginning
16   of the test unless the test case is marked as async.
17   """
18
19   use ExUnit.CaseTemplate
20
21   import Pleroma.Tests.Helpers, only: [clear_config: 2]
22
23   using do
24     quote do
25       alias Pleroma.Repo
26
27       import Ecto
28       import Ecto.Changeset
29       import Ecto.Query
30       import Pleroma.DataCase
31       use Pleroma.Tests.Helpers
32
33       # Sets up OAuth access with specified scopes
34       defp oauth_access(scopes, opts \\ []) do
35         user =
36           Keyword.get_lazy(opts, :user, fn ->
37             Pleroma.Factory.insert(:user)
38           end)
39
40         token =
41           Keyword.get_lazy(opts, :oauth_token, fn ->
42             Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
43           end)
44
45         %{user: user, token: token}
46       end
47     end
48   end
49
50   def clear_cachex do
51     Pleroma.Supervisor
52     |> Supervisor.which_children()
53     |> Enum.each(fn
54       {name, _, _, [Cachex]} ->
55         name
56         |> to_string
57         |> String.trim_leading("cachex_")
58         |> Kernel.<>("_cache")
59         |> String.to_existing_atom()
60         |> Cachex.clear()
61
62       _ ->
63         nil
64     end)
65   end
66
67   def setup_multi_process_mode(tags) do
68     :ok = Ecto.Adapters.SQL.Sandbox.checkout(Pleroma.Repo)
69
70     if tags[:async] do
71       Mox.stub_with(Pleroma.CachexMock, Pleroma.NullCache)
72       Mox.set_mox_private()
73     else
74       Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
75
76       Mox.set_mox_global()
77       Mox.stub_with(Pleroma.CachexMock, Pleroma.CachexProxy)
78       clear_cachex()
79     end
80
81     :ok
82   end
83
84   def setup_streamer(tags) do
85     if tags[:needs_streamer] do
86       start_supervised(%{
87         id: Pleroma.Web.Streamer.registry(),
88         start:
89           {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
90       })
91     end
92
93     :ok
94   end
95
96   setup tags do
97     setup_multi_process_mode(tags)
98     setup_streamer(tags)
99     stub_pipeline()
100
101     Mox.verify_on_exit!()
102
103     :ok
104   end
105
106   def stub_pipeline do
107     Mox.stub_with(Pleroma.Web.ActivityPub.SideEffectsMock, Pleroma.Web.ActivityPub.SideEffects)
108
109     Mox.stub_with(
110       Pleroma.Web.ActivityPub.ObjectValidatorMock,
111       Pleroma.Web.ActivityPub.ObjectValidator
112     )
113
114     Mox.stub_with(Pleroma.Web.ActivityPub.MRFMock, Pleroma.Web.ActivityPub.MRF)
115     Mox.stub_with(Pleroma.Web.ActivityPub.ActivityPubMock, Pleroma.Web.ActivityPub.ActivityPub)
116     Mox.stub_with(Pleroma.Web.FederatorMock, Pleroma.Web.Federator)
117     Mox.stub_with(Pleroma.ConfigMock, Pleroma.Config)
118     Mox.stub_with(Pleroma.StaticStubbedConfigMock, Pleroma.Test.StaticConfig)
119   end
120
121   def ensure_local_uploader(context) do
122     test_uploader = Map.get(context, :uploader) || Pleroma.Uploaders.Local
123
124     clear_config([Pleroma.Upload, :uploader], test_uploader)
125     clear_config([Pleroma.Upload, :filters], [])
126
127     :ok
128   end
129
130   @doc """
131   A helper that transform changeset errors to a map of messages.
132
133       changeset = Accounts.create_user(%{password: "short"})
134       assert "password is too short" in errors_on(changeset).password
135       assert %{password: ["password is too short"]} = errors_on(changeset)
136
137   """
138   def errors_on(changeset) do
139     Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
140       Enum.reduce(opts, message, fn {key, value}, acc ->
141         String.replace(acc, "%{#{key}}", to_string(value))
142       end)
143     end)
144   end
145 end