aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/workers
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r--lib/pleroma/workers/attachments_cleanup_worker.ex112
-rw-r--r--lib/pleroma/workers/background_worker.ex49
-rw-r--r--lib/pleroma/workers/backup_worker.ex74
-rw-r--r--lib/pleroma/workers/cron/digest_emails_worker.ex61
-rw-r--r--lib/pleroma/workers/cron/new_users_digest_worker.ex63
-rw-r--r--lib/pleroma/workers/mailer_worker.ex18
-rw-r--r--lib/pleroma/workers/mute_expire_worker.ex23
-rw-r--r--lib/pleroma/workers/poll_worker.ex48
-rw-r--r--lib/pleroma/workers/publisher_worker.ex28
-rw-r--r--lib/pleroma/workers/purge_expired_activity.ex75
-rw-r--r--lib/pleroma/workers/purge_expired_filter.ex46
-rw-r--r--lib/pleroma/workers/purge_expired_token.ex32
-rw-r--r--lib/pleroma/workers/receiver_worker.ex26
-rw-r--r--lib/pleroma/workers/remote_fetcher_worker.ex17
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex61
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex18
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex23
-rw-r--r--lib/pleroma/workers/worker_helper.ex48
18 files changed, 822 insertions, 0 deletions
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex
new file mode 100644
index 0000000..4c17640
--- /dev/null
+++ b/lib/pleroma/workers/attachments_cleanup_worker.ex
@@ -0,0 +1,112 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.AttachmentsCleanupWorker do
+ import Ecto.Query
+
+ alias Pleroma.Object
+ alias Pleroma.Repo
+
+ use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
+
+ @impl Oban.Worker
+ def perform(%Job{
+ args: %{
+ "op" => "cleanup_attachments",
+ "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
+ }
+ }) do
+ if Pleroma.Config.get([:instance, :cleanup_attachments], false) do
+ attachments
+ |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
+ |> fetch_objects
+ |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
+ |> filter_objects
+ |> do_clean
+ end
+
+ {:ok, :success}
+ end
+
+ def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(900)
+
+ defp do_clean({object_ids, attachment_urls}) do
+ uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
+
+ base_url =
+ String.trim_trailing(
+ Pleroma.Upload.base_url(),
+ "/"
+ )
+
+ Enum.each(attachment_urls, fn href ->
+ href
+ |> String.trim_leading("#{base_url}")
+ |> uploader.delete_file()
+ end)
+
+ delete_objects(object_ids)
+ end
+
+ defp delete_objects([_ | _] = object_ids) do
+ Repo.delete_all(from(o in Object, where: o.id in ^object_ids))
+ end
+
+ defp delete_objects(_), do: :ok
+
+ # we should delete 1 object for any given attachment, but don't delete
+ # files if there are more than 1 object for it
+ defp filter_objects(objects) do
+ Enum.reduce(objects, {[], []}, fn {href, %{id: id, count: count}}, {ids, hrefs} ->
+ with 1 <- count do
+ {ids ++ [id], hrefs ++ [href]}
+ else
+ _ -> {ids ++ [id], hrefs}
+ end
+ end)
+ end
+
+ defp prepare_objects(objects, actor, names) do
+ objects
+ |> Enum.reduce(%{}, fn %{
+ id: id,
+ data: %{
+ "url" => [%{"href" => href}],
+ "actor" => obj_actor,
+ "name" => name
+ }
+ },
+ acc ->
+ Map.update(acc, href, %{id: id, count: 1}, fn val ->
+ case obj_actor == actor and name in names do
+ true ->
+ # set id of the actor's object that will be deleted
+ %{val | id: id, count: val.count + 1}
+
+ false ->
+ # another actor's object, just increase count to not delete file
+ %{val | count: val.count + 1}
+ end
+ end)
+ end)
+ end
+
+ defp fetch_objects(hrefs) do
+ from(o in Object,
+ where:
+ fragment(
+ "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
+ o.data,
+ o.data,
+ ^hrefs
+ )
+ )
+ # The query above can be time consumptive on large instances until we
+ # refactor how uploads are stored
+ |> Repo.all(timeout: :infinity)
+ end
+end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644
index 0000000..7944176
--- /dev/null
+++ b/lib/pleroma/workers/background_worker.ex
@@ -0,0 +1,49 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Instances.Instance
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
+ @impl Oban.Worker
+
+ def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:set_activation_async, user, status)
+ end
+
+ def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:force_password_reset, user)
+ end
+
+ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
+ when op in ["blocks_import", "follow_import", "mutes_import"] do
+ user = User.get_cached_by_id(user_id)
+ {:ok, User.Import.perform(String.to_atom(op), user, identifiers)}
+ end
+
+ def perform(%Job{
+ args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}
+ }) do
+ origin = User.get_cached_by_id(origin_id)
+ target = User.get_cached_by_id(target_id)
+
+ Pleroma.FollowingRelationship.move_following(origin, target)
+ end
+
+ def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do
+ Instance.perform(:delete_instance, host)
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(900)
+end
diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex
new file mode 100644
index 0000000..12ee70f
--- /dev/null
+++ b/lib/pleroma/workers/backup_worker.ex
@@ -0,0 +1,74 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackupWorker do
+ use Oban.Worker, queue: :backup, max_attempts: 1
+
+ alias Oban.Job
+ alias Pleroma.User.Backup
+
+ def process(backup, admin_user_id \\ nil) do
+ %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id}
+ |> new()
+ |> Oban.insert()
+ end
+
+ def schedule_deletion(backup) do
+ days = Pleroma.Config.get([Backup, :purge_after_days])
+ time = 60 * 60 * 24 * days
+ scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time)
+
+ %{"op" => "delete", "backup_id" => backup.id}
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ def delete(backup) do
+ %{"op" => "delete", "backup_id" => backup.id}
+ |> new()
+ |> Oban.insert()
+ end
+
+ @impl Oban.Worker
+ def perform(%Job{
+ args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
+ }) do
+ with {:ok, %Backup{} = backup} <-
+ backup_id |> Backup.get() |> Backup.process(),
+ {:ok, _job} <- schedule_deletion(backup),
+ :ok <- Backup.remove_outdated(backup),
+ :ok <- maybe_deliver_email(backup, admin_user_id) do
+ {:ok, backup}
+ end
+ end
+
+ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
+ case Backup.get(backup_id) do
+ %Backup{} = backup -> Backup.delete(backup)
+ nil -> :ok
+ end
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(900)
+
+ defp has_email?(user) do
+ not is_nil(user.email) and user.email != ""
+ end
+
+ defp maybe_deliver_email(backup, admin_user_id) do
+ has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled])
+ backup = backup |> Pleroma.Repo.preload(:user)
+
+ if has_email?(backup.user) and has_mailer do
+ backup
+ |> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id)
+ |> Pleroma.Emails.Mailer.deliver()
+
+ :ok
+ else
+ :ok
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex
new file mode 100644
index 0000000..1540c16
--- /dev/null
+++ b/lib/pleroma/workers/cron/digest_emails_worker.ex
@@ -0,0 +1,61 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
+ @moduledoc """
+ The worker to send digest emails.
+ """
+
+ use Oban.Worker, queue: "digest_emails"
+
+ alias Pleroma.Config
+ alias Pleroma.Emails
+ alias Pleroma.Repo
+ alias Pleroma.User
+
+ import Ecto.Query
+
+ require Logger
+
+ @impl Oban.Worker
+ def perform(_job) do
+ config = Config.get([:email_notifications, :digest])
+
+ if config[:active] do
+ negative_interval = -Map.fetch!(config, :interval)
+ inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
+ inactive_users_query = User.list_inactive_users_query(inactivity_threshold)
+
+ now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+ from(u in inactive_users_query,
+ where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
+ where: not is_nil(u.email),
+ where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
+ select: u
+ )
+ |> Repo.all()
+ |> send_emails
+ end
+
+ :ok
+ end
+
+ def send_emails(users) do
+ Enum.each(users, &send_email/1)
+ end
+
+ @doc """
+ Send digest email to the given user.
+ Updates `last_digest_emailed_at` field for the user and returns the updated user.
+ """
+ @spec send_email(User.t()) :: User.t()
+ def send_email(user) do
+ with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do
+ Emails.Mailer.deliver_async(email)
+ end
+
+ User.touch_last_digest_emailed_at(user)
+ end
+end
diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex
new file mode 100644
index 0000000..267fe28
--- /dev/null
+++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex
@@ -0,0 +1,63 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
+ alias Pleroma.Activity
+ alias Pleroma.Repo
+ alias Pleroma.User
+
+ import Ecto.Query
+
+ use Pleroma.Workers.WorkerHelper, queue: "new_users_digest"
+
+ @impl Oban.Worker
+ def perform(_job) do
+ if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do
+ today = NaiveDateTime.utc_now() |> Timex.beginning_of_day()
+
+ a_day_ago =
+ today
+ |> Timex.shift(days: -1)
+ |> Timex.beginning_of_day()
+
+ users_and_statuses =
+ %{
+ local: true,
+ order_by: :inserted_at
+ }
+ |> User.Query.build()
+ |> where([u], u.inserted_at >= ^a_day_ago and u.inserted_at < ^today)
+ |> Repo.all()
+ |> Enum.map(fn user ->
+ latest_status =
+ Activity
+ |> Activity.Queries.by_actor(user.ap_id)
+ |> Activity.Queries.by_type("Create")
+ |> Activity.with_preloaded_object()
+ |> order_by(desc: :inserted_at)
+ |> limit(1)
+ |> Repo.one()
+
+ total_statuses =
+ Activity
+ |> Activity.Queries.by_actor(user.ap_id)
+ |> Activity.Queries.by_type("Create")
+ |> Repo.aggregate(:count, :id)
+
+ {user, total_statuses, latest_status}
+ end)
+
+ if users_and_statuses != [] do
+ %{is_admin: true}
+ |> User.Query.build()
+ |> where([u], not is_nil(u.email))
+ |> Repo.all()
+ |> Enum.map(&Pleroma.Emails.NewUsersDigestEmail.new_users(&1, users_and_statuses))
+ |> Enum.each(&Pleroma.Emails.Mailer.deliver/1)
+ end
+ end
+
+ :ok
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
new file mode 100644
index 0000000..9407165
--- /dev/null
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MailerWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+ |> Pleroma.Emails.Mailer.deliver(config)
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/mute_expire_worker.ex b/lib/pleroma/workers/mute_expire_worker.ex
new file mode 100644
index 0000000..8ce458d
--- /dev/null
+++ b/lib/pleroma/workers/mute_expire_worker.ex
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MuteExpireWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mute_expire"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do
+ Pleroma.User.unmute(muter_id, mutee_id)
+ :ok
+ end
+
+ def perform(%Job{
+ args: %{"op" => "unmute_conversation", "user_id" => user_id, "activity_id" => activity_id}
+ }) do
+ Pleroma.Web.CommonAPI.remove_mute(user_id, activity_id)
+ :ok
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex
new file mode 100644
index 0000000..022d026
--- /dev/null
+++ b/lib/pleroma/workers/poll_worker.ex
@@ -0,0 +1,48 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PollWorker do
+ @moduledoc """
+ Generates notifications when a poll ends.
+ """
+ use Pleroma.Workers.WorkerHelper, queue: "poll_notifications"
+
+ alias Pleroma.Activity
+ alias Pleroma.Notification
+ alias Pleroma.Object
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
+ with %Activity{} = activity <- find_poll_activity(activity_id) do
+ Notification.create_poll_notifications(activity)
+ end
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+
+ defp find_poll_activity(activity_id) do
+ with nil <- Activity.get_by_id(activity_id) do
+ {:error, :poll_activity_not_found}
+ end
+ end
+
+ def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do
+ with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <-
+ Object.normalize(activity),
+ {:ok, end_time} <- NaiveDateTime.from_iso8601(closed),
+ :gt <- NaiveDateTime.compare(end_time, NaiveDateTime.utc_now()) do
+ %{
+ op: "poll_end",
+ activity_id: activity_id
+ }
+ |> new(scheduled_at: end_time)
+ |> Oban.insert()
+ else
+ _ -> {:error, activity}
+ end
+ end
+
+ def schedule_poll_end(activity), do: {:error, activity}
+end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
new file mode 100644
index 0000000..598ae37
--- /dev/null
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -0,0 +1,28 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PublisherWorker do
+ alias Pleroma.Activity
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
+ Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+ end
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do
+ activity = Activity.get_by_id(activity_id)
+ Federator.perform(:publish, activity)
+ end
+
+ def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do
+ params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
+ Federator.perform(:publish_one, String.to_atom(module_name), params)
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(10)
+end
diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex
new file mode 100644
index 0000000..e554684
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_activity.ex
@@ -0,0 +1,75 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredActivity do
+ @moduledoc """
+ Worker which purges expired activity.
+ """
+
+ use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity]
+
+ import Ecto.Query
+
+ alias Pleroma.Activity
+
+ @spec enqueue(map()) ::
+ {:ok, Oban.Job.t()}
+ | {:error, :expired_activities_disabled}
+ | {:error, :expiration_too_close}
+ def enqueue(args) do
+ with true <- enabled?() do
+ {scheduled_at, args} = Map.pop(args, :expires_at)
+
+ args
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"activity_id" => id}}) do
+ with %Activity{} = activity <- find_activity(id),
+ %Pleroma.User{} = user <- find_user(activity.object.data["actor"]) do
+ Pleroma.Web.CommonAPI.delete(activity.id, user)
+ end
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+
+ defp enabled? do
+ with false <- Pleroma.Config.get([__MODULE__, :enabled], false) do
+ {:error, :expired_activities_disabled}
+ end
+ end
+
+ defp find_activity(id) do
+ with nil <- Activity.get_by_id_with_object(id) do
+ {:error, :activity_not_found}
+ end
+ end
+
+ defp find_user(ap_id) do
+ with nil <- Pleroma.User.get_by_ap_id(ap_id) do
+ {:error, :user_not_found}
+ end
+ end
+
+ def get_expiration(id) do
+ from(j in Oban.Job,
+ where: j.state == "scheduled",
+ where: j.queue == "activity_expiration",
+ where: fragment("?->>'activity_id' = ?", j.args, ^id)
+ )
+ |> Pleroma.Repo.one()
+ end
+
+ @spec expires_late_enough?(DateTime.t()) :: boolean()
+ def expires_late_enough?(scheduled_at) do
+ now = DateTime.utc_now()
+ diff = DateTime.diff(scheduled_at, now, :millisecond)
+ min_lifetime = Pleroma.Config.get([__MODULE__, :min_lifetime], 600)
+ diff > :timer.seconds(min_lifetime)
+ end
+end
diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex
new file mode 100644
index 0000000..9114aeb
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_filter.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredFilter do
+ @moduledoc """
+ Worker which purges expired filters
+ """
+
+ use Oban.Worker, queue: :filter_expiration, max_attempts: 1, unique: [period: :infinity]
+
+ import Ecto.Query
+
+ alias Oban.Job
+ alias Pleroma.Repo
+
+ @spec enqueue(%{filter_id: integer(), expires_at: DateTime.t()}) ::
+ {:ok, Job.t()} | {:error, Ecto.Changeset.t()}
+ def enqueue(args) do
+ {scheduled_at, args} = Map.pop(args, :expires_at)
+
+ args
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ @impl true
+ def perform(%Job{args: %{"filter_id" => id}}) do
+ Pleroma.Filter
+ |> Repo.get(id)
+ |> Repo.delete()
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+
+ @spec get_expiration(pos_integer()) :: Job.t() | nil
+ def get_expiration(id) do
+ from(j in Job,
+ where: j.state == "scheduled",
+ where: j.queue == "filter_expiration",
+ where: fragment("?->'filter_id' = ?", j.args, ^id)
+ )
+ |> Repo.one()
+ end
+end
diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex
new file mode 100644
index 0000000..2ccd9e8
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_token.ex
@@ -0,0 +1,32 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredToken do
+ @moduledoc """
+ Worker which purges expired OAuth tokens
+ """
+
+ use Oban.Worker, queue: :token_expiration, max_attempts: 1
+
+ @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) ::
+ {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}
+ def enqueue(args) do
+ {scheduled_at, args} = Map.pop(args, :valid_until)
+
+ args
+ |> __MODULE__.new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
+ module
+ |> String.to_existing_atom()
+ |> Pleroma.Repo.get(id)
+ |> Pleroma.Repo.delete()
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
new file mode 100644
index 0000000..cf1bb62
--- /dev/null
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -0,0 +1,26 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ReceiverWorker do
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
+ with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
+ {:ok, res}
+ else
+ {:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
+ {:error, :already_present} -> {:cancel, :already_present}
+ {:error, {:validate_object, reason}} -> {:cancel, reason}
+ {:error, {:error, {:validate, reason}}} -> {:cancel, reason}
+ {:error, {:reject, reason}} -> {:cancel, reason}
+ e -> e
+ end
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex
new file mode 100644
index 0000000..d2a77aa
--- /dev/null
+++ b/lib/pleroma/workers/remote_fetcher_worker.ex
@@ -0,0 +1,17 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.RemoteFetcherWorker do
+ alias Pleroma.Object.Fetcher
+
+ use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
+ {:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(10)
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644
index 0000000..4df84d0
--- /dev/null
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -0,0 +1,61 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+ @moduledoc """
+ The worker to post scheduled activity.
+ """
+
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
+ alias Pleroma.Repo
+ alias Pleroma.ScheduledActivity
+ alias Pleroma.User
+
+ require Logger
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"activity_id" => activity_id}}) do
+ with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id),
+ %User{} = user <- find_user(scheduled_activity.user_id) do
+ params = atomize_keys(scheduled_activity.params)
+
+ Repo.transaction(fn ->
+ {:ok, activity} = Pleroma.Web.CommonAPI.post(user, params)
+ {:ok, _} = ScheduledActivity.delete(scheduled_activity)
+ activity
+ end)
+ else
+ {:error, :scheduled_activity_not_found} = error ->
+ Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+ error
+
+ {:error, :user_not_found} = error ->
+ Logger.error("#{__MODULE__} Couldn't find user for scheduled activity: #{activity_id}")
+ error
+ end
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+
+ defp find_scheduled_activity(id) do
+ with nil <- Repo.get(ScheduledActivity, id) do
+ {:error, :scheduled_activity_not_found}
+ end
+ end
+
+ defp find_user(id) do
+ with nil <- User.get_cached_by_id(id) do
+ {:error, :user_not_found}
+ end
+ end
+
+ defp atomize_keys(map) do
+ Map.new(map, fn
+ {key, value} when is_map(value) -> {String.to_existing_atom(key), atomize_keys(value)}
+ {key, value} -> {String.to_existing_atom(key), value}
+ end)
+ end
+end
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
new file mode 100644
index 0000000..1f3f538
--- /dev/null
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.TransmogrifierWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do
+ user = User.get_cached_by_id(user_id)
+ Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
new file mode 100644
index 0000000..67e84b0
--- /dev/null
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusherWorker do
+ alias Pleroma.Notification
+ alias Pleroma.Repo
+
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do
+ notification =
+ Notification
+ |> Repo.get(notification_id)
+ |> Repo.preload([:activity, :user])
+
+ Pleroma.Web.Push.Impl.perform(notification)
+ end
+
+ @impl Oban.Worker
+ def timeout(_job), do: :timer.seconds(5)
+end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644
index 0000000..1d20cbd
--- /dev/null
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -0,0 +1,48 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+ alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
+
+ def worker_args(queue) do
+ case Config.get([:workers, :retries, queue]) do
+ nil -> []
+ max_attempts -> [max_attempts: max_attempts]
+ end
+ end
+
+ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+ backoff =
+ :math.pow(attempt, pow) +
+ base_backoff +
+ :rand.uniform(2 * base_backoff) * attempt
+
+ trunc(backoff)
+ end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: unquote(queue),
+ max_attempts: 1
+
+ alias Oban.Job
+
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Oban.insert()
+ end
+ end
+ end
+end