First
[anni] / test / support / oban_helpers.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Tests.ObanHelpers do
6   @moduledoc """
7   Oban test helpers.
8   """
9
10   require Ecto.Query
11
12   alias Pleroma.Repo
13
14   def wipe_all do
15     Repo.delete_all(Oban.Job)
16   end
17
18   def perform_all do
19     Oban.Job
20     |> Ecto.Query.where(state: "available")
21     |> Repo.all()
22     |> perform()
23   end
24
25   def perform(%Oban.Job{} = job) do
26     res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
27     Repo.delete(job)
28     res
29   end
30
31   def perform(jobs) when is_list(jobs) do
32     for job <- jobs, do: perform(job)
33   end
34
35   def member?(%{} = job_args, jobs) when is_list(jobs) do
36     Enum.any?(jobs, fn job ->
37       member?(job_args, job.args)
38     end)
39   end
40
41   def member?(%{} = test_attrs, %{} = attrs) do
42     Enum.all?(
43       test_attrs,
44       fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
45     )
46   end
47
48   def member?(x, y), do: x == y
49 end