diff options
Diffstat (limited to 'lib/pleroma/workers')
| -rw-r--r-- | lib/pleroma/workers/attachments_cleanup_worker.ex | 112 | ||||
| -rw-r--r-- | lib/pleroma/workers/background_worker.ex | 49 | ||||
| -rw-r--r-- | lib/pleroma/workers/backup_worker.ex | 74 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/digest_emails_worker.ex | 61 | ||||
| -rw-r--r-- | lib/pleroma/workers/cron/new_users_digest_worker.ex | 63 | ||||
| -rw-r--r-- | lib/pleroma/workers/mailer_worker.ex | 18 | ||||
| -rw-r--r-- | lib/pleroma/workers/mute_expire_worker.ex | 23 | ||||
| -rw-r--r-- | lib/pleroma/workers/poll_worker.ex | 48 | ||||
| -rw-r--r-- | lib/pleroma/workers/publisher_worker.ex | 28 | ||||
| -rw-r--r-- | lib/pleroma/workers/purge_expired_activity.ex | 75 | ||||
| -rw-r--r-- | lib/pleroma/workers/purge_expired_filter.ex | 46 | ||||
| -rw-r--r-- | lib/pleroma/workers/purge_expired_token.ex | 32 | ||||
| -rw-r--r-- | lib/pleroma/workers/receiver_worker.ex | 26 | ||||
| -rw-r--r-- | lib/pleroma/workers/remote_fetcher_worker.ex | 17 | ||||
| -rw-r--r-- | lib/pleroma/workers/scheduled_activity_worker.ex | 61 | ||||
| -rw-r--r-- | lib/pleroma/workers/transmogrifier_worker.ex | 18 | ||||
| -rw-r--r-- | lib/pleroma/workers/web_pusher_worker.ex | 23 | ||||
| -rw-r--r-- | lib/pleroma/workers/worker_helper.ex | 48 |
18 files changed, 822 insertions, 0 deletions
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex new file mode 100644 index 0000000..4c17640 --- /dev/null +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -0,0 +1,112 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.AttachmentsCleanupWorker do + import Ecto.Query + + alias Pleroma.Object + alias Pleroma.Repo + + use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup" + + @impl Oban.Worker + def perform(%Job{ + args: %{ + "op" => "cleanup_attachments", + "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}} + } + }) do + if Pleroma.Config.get([:instance, :cleanup_attachments], false) do + attachments + |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end) + |> fetch_objects + |> prepare_objects(actor, Enum.map(attachments, & &1["name"])) + |> filter_objects + |> do_clean + end + + {:ok, :success} + end + + def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip} + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(900) + + defp do_clean({object_ids, attachment_urls}) do + uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) + + base_url = + String.trim_trailing( + Pleroma.Upload.base_url(), + "/" + ) + + Enum.each(attachment_urls, fn href -> + href + |> String.trim_leading("#{base_url}") + |> uploader.delete_file() + end) + + delete_objects(object_ids) + end + + defp delete_objects([_ | _] = object_ids) do + Repo.delete_all(from(o in Object, where: o.id in ^object_ids)) + end + + defp delete_objects(_), do: :ok + + # we should delete 1 object for any given attachment, but don't delete + # files if there are more than 1 object for it + defp filter_objects(objects) do + Enum.reduce(objects, {[], []}, fn {href, %{id: id, count: count}}, {ids, hrefs} -> + with 1 <- count do + {ids ++ [id], hrefs ++ [href]} + else + _ -> {ids ++ [id], hrefs} + end + end) + end + + defp prepare_objects(objects, actor, names) do + objects + |> Enum.reduce(%{}, fn %{ + id: id, + data: %{ + "url" => [%{"href" => href}], + "actor" => obj_actor, + "name" => name + } + }, + acc -> + Map.update(acc, href, %{id: id, count: 1}, fn val -> + case obj_actor == actor and name in names do + true -> + # set id of the actor's object that will be deleted + %{val | id: id, count: val.count + 1} + + false -> + # another actor's object, just increase count to not delete file + %{val | count: val.count + 1} + end + end) + end) + end + + defp fetch_objects(hrefs) do + from(o in Object, + where: + fragment( + "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)", + o.data, + o.data, + ^hrefs + ) + ) + # The query above can be time consumptive on large instances until we + # refactor how uploads are stored + |> Repo.all(timeout: :infinity) + end +end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex new file mode 100644 index 0000000..7944176 --- /dev/null +++ b/lib/pleroma/workers/background_worker.ex @@ -0,0 +1,49 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do + alias Pleroma.Instances.Instance + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "background" + + @impl Oban.Worker + + def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do + user = User.get_cached_by_id(user_id) + User.perform(:set_activation_async, user, status) + end + + def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do + user = User.get_cached_by_id(user_id) + User.perform(:delete, user) + end + + def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do + user = User.get_cached_by_id(user_id) + User.perform(:force_password_reset, user) + end + + def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}}) + when op in ["blocks_import", "follow_import", "mutes_import"] do + user = User.get_cached_by_id(user_id) + {:ok, User.Import.perform(String.to_atom(op), user, identifiers)} + end + + def perform(%Job{ + args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} + }) do + origin = User.get_cached_by_id(origin_id) + target = User.get_cached_by_id(target_id) + + Pleroma.FollowingRelationship.move_following(origin, target) + end + + def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do + Instance.perform(:delete_instance, host) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(900) +end diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex new file mode 100644 index 0000000..12ee70f --- /dev/null +++ b/lib/pleroma/workers/backup_worker.ex @@ -0,0 +1,74 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackupWorker do + use Oban.Worker, queue: :backup, max_attempts: 1 + + alias Oban.Job + alias Pleroma.User.Backup + + def process(backup, admin_user_id \\ nil) do + %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id} + |> new() + |> Oban.insert() + end + + def schedule_deletion(backup) do + days = Pleroma.Config.get([Backup, :purge_after_days]) + time = 60 * 60 * 24 * days + scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) + + %{"op" => "delete", "backup_id" => backup.id} + |> new(scheduled_at: scheduled_at) + |> Oban.insert() + end + + def delete(backup) do + %{"op" => "delete", "backup_id" => backup.id} + |> new() + |> Oban.insert() + end + + @impl Oban.Worker + def perform(%Job{ + args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id} + }) do + with {:ok, %Backup{} = backup} <- + backup_id |> Backup.get() |> Backup.process(), + {:ok, _job} <- schedule_deletion(backup), + :ok <- Backup.remove_outdated(backup), + :ok <- maybe_deliver_email(backup, admin_user_id) do + {:ok, backup} + end + end + + def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do + case Backup.get(backup_id) do + %Backup{} = backup -> Backup.delete(backup) + nil -> :ok + end + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(900) + + defp has_email?(user) do + not is_nil(user.email) and user.email != "" + end + + defp maybe_deliver_email(backup, admin_user_id) do + has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled]) + backup = backup |> Pleroma.Repo.preload(:user) + + if has_email?(backup.user) and has_mailer do + backup + |> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id) + |> Pleroma.Emails.Mailer.deliver() + + :ok + else + :ok + end + end +end diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex new file mode 100644 index 0000000..1540c16 --- /dev/null +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -0,0 +1,61 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.DigestEmailsWorker do + @moduledoc """ + The worker to send digest emails. + """ + + use Oban.Worker, queue: "digest_emails" + + alias Pleroma.Config + alias Pleroma.Emails + alias Pleroma.Repo + alias Pleroma.User + + import Ecto.Query + + require Logger + + @impl Oban.Worker + def perform(_job) do + config = Config.get([:email_notifications, :digest]) + + if config[:active] do + negative_interval = -Map.fetch!(config, :interval) + inactivity_threshold = Map.fetch!(config, :inactivity_threshold) + inactive_users_query = User.list_inactive_users_query(inactivity_threshold) + + now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) + + from(u in inactive_users_query, + where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications), + where: not is_nil(u.email), + where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), + select: u + ) + |> Repo.all() + |> send_emails + end + + :ok + end + + def send_emails(users) do + Enum.each(users, &send_email/1) + end + + @doc """ + Send digest email to the given user. + Updates `last_digest_emailed_at` field for the user and returns the updated user. + """ + @spec send_email(User.t()) :: User.t() + def send_email(user) do + with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do + Emails.Mailer.deliver_async(email) + end + + User.touch_last_digest_emailed_at(user) + end +end diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex new file mode 100644 index 0000000..267fe28 --- /dev/null +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -0,0 +1,63 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do + alias Pleroma.Activity + alias Pleroma.Repo + alias Pleroma.User + + import Ecto.Query + + use Pleroma.Workers.WorkerHelper, queue: "new_users_digest" + + @impl Oban.Worker + def perform(_job) do + if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do + today = NaiveDateTime.utc_now() |> Timex.beginning_of_day() + + a_day_ago = + today + |> Timex.shift(days: -1) + |> Timex.beginning_of_day() + + users_and_statuses = + %{ + local: true, + order_by: :inserted_at + } + |> User.Query.build() + |> where([u], u.inserted_at >= ^a_day_ago and u.inserted_at < ^today) + |> Repo.all() + |> Enum.map(fn user -> + latest_status = + Activity + |> Activity.Queries.by_actor(user.ap_id) + |> Activity.Queries.by_type("Create") + |> Activity.with_preloaded_object() + |> order_by(desc: :inserted_at) + |> limit(1) + |> Repo.one() + + total_statuses = + Activity + |> Activity.Queries.by_actor(user.ap_id) + |> Activity.Queries.by_type("Create") + |> Repo.aggregate(:count, :id) + + {user, total_statuses, latest_status} + end) + + if users_and_statuses != [] do + %{is_admin: true} + |> User.Query.build() + |> where([u], not is_nil(u.email)) + |> Repo.all() + |> Enum.map(&Pleroma.Emails.NewUsersDigestEmail.new_users(&1, users_and_statuses)) + |> Enum.each(&Pleroma.Emails.Mailer.deliver/1) + end + end + + :ok + end +end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex new file mode 100644 index 0000000..9407165 --- /dev/null +++ b/lib/pleroma/workers/mailer_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.MailerWorker do + use Pleroma.Workers.WorkerHelper, queue: "mailer" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do + encoded_email + |> Base.decode64!() + |> :erlang.binary_to_term() + |> Pleroma.Emails.Mailer.deliver(config) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/mute_expire_worker.ex b/lib/pleroma/workers/mute_expire_worker.ex new file mode 100644 index 0000000..8ce458d --- /dev/null +++ b/lib/pleroma/workers/mute_expire_worker.ex @@ -0,0 +1,23 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.MuteExpireWorker do + use Pleroma.Workers.WorkerHelper, queue: "mute_expire" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do + Pleroma.User.unmute(muter_id, mutee_id) + :ok + end + + def perform(%Job{ + args: %{"op" => "unmute_conversation", "user_id" => user_id, "activity_id" => activity_id} + }) do + Pleroma.Web.CommonAPI.remove_mute(user_id, activity_id) + :ok + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex new file mode 100644 index 0000000..022d026 --- /dev/null +++ b/lib/pleroma/workers/poll_worker.ex @@ -0,0 +1,48 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PollWorker do + @moduledoc """ + Generates notifications when a poll ends. + """ + use Pleroma.Workers.WorkerHelper, queue: "poll_notifications" + + alias Pleroma.Activity + alias Pleroma.Notification + alias Pleroma.Object + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do + with %Activity{} = activity <- find_poll_activity(activity_id) do + Notification.create_poll_notifications(activity) + end + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) + + defp find_poll_activity(activity_id) do + with nil <- Activity.get_by_id(activity_id) do + {:error, :poll_activity_not_found} + end + end + + def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do + with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <- + Object.normalize(activity), + {:ok, end_time} <- NaiveDateTime.from_iso8601(closed), + :gt <- NaiveDateTime.compare(end_time, NaiveDateTime.utc_now()) do + %{ + op: "poll_end", + activity_id: activity_id + } + |> new(scheduled_at: end_time) + |> Oban.insert() + else + _ -> {:error, activity} + end + end + + def schedule_poll_end(activity), do: {:error, activity} +end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex new file mode 100644 index 0000000..598ae37 --- /dev/null +++ b/lib/pleroma/workers/publisher_worker.ex @@ -0,0 +1,28 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PublisherWorker do + alias Pleroma.Activity + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + + def backoff(%Job{attempt: attempt}) when is_integer(attempt) do + Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) + end + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do + activity = Activity.get_by_id(activity_id) + Federator.perform(:publish, activity) + end + + def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do + params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) + Federator.perform(:publish_one, String.to_atom(module_name), params) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(10) +end diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex new file mode 100644 index 0000000..e554684 --- /dev/null +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -0,0 +1,75 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PurgeExpiredActivity do + @moduledoc """ + Worker which purges expired activity. + """ + + use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity] + + import Ecto.Query + + alias Pleroma.Activity + + @spec enqueue(map()) :: + {:ok, Oban.Job.t()} + | {:error, :expired_activities_disabled} + | {:error, :expiration_too_close} + def enqueue(args) do + with true <- enabled?() do + {scheduled_at, args} = Map.pop(args, :expires_at) + + args + |> new(scheduled_at: scheduled_at) + |> Oban.insert() + end + end + + @impl true + def perform(%Oban.Job{args: %{"activity_id" => id}}) do + with %Activity{} = activity <- find_activity(id), + %Pleroma.User{} = user <- find_user(activity.object.data["actor"]) do + Pleroma.Web.CommonAPI.delete(activity.id, user) + end + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) + + defp enabled? do + with false <- Pleroma.Config.get([__MODULE__, :enabled], false) do + {:error, :expired_activities_disabled} + end + end + + defp find_activity(id) do + with nil <- Activity.get_by_id_with_object(id) do + {:error, :activity_not_found} + end + end + + defp find_user(ap_id) do + with nil <- Pleroma.User.get_by_ap_id(ap_id) do + {:error, :user_not_found} + end + end + + def get_expiration(id) do + from(j in Oban.Job, + where: j.state == "scheduled", + where: j.queue == "activity_expiration", + where: fragment("?->>'activity_id' = ?", j.args, ^id) + ) + |> Pleroma.Repo.one() + end + + @spec expires_late_enough?(DateTime.t()) :: boolean() + def expires_late_enough?(scheduled_at) do + now = DateTime.utc_now() + diff = DateTime.diff(scheduled_at, now, :millisecond) + min_lifetime = Pleroma.Config.get([__MODULE__, :min_lifetime], 600) + diff > :timer.seconds(min_lifetime) + end +end diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex new file mode 100644 index 0000000..9114aeb --- /dev/null +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PurgeExpiredFilter do + @moduledoc """ + Worker which purges expired filters + """ + + use Oban.Worker, queue: :filter_expiration, max_attempts: 1, unique: [period: :infinity] + + import Ecto.Query + + alias Oban.Job + alias Pleroma.Repo + + @spec enqueue(%{filter_id: integer(), expires_at: DateTime.t()}) :: + {:ok, Job.t()} | {:error, Ecto.Changeset.t()} + def enqueue(args) do + {scheduled_at, args} = Map.pop(args, :expires_at) + + args + |> new(scheduled_at: scheduled_at) + |> Oban.insert() + end + + @impl true + def perform(%Job{args: %{"filter_id" => id}}) do + Pleroma.Filter + |> Repo.get(id) + |> Repo.delete() + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) + + @spec get_expiration(pos_integer()) :: Job.t() | nil + def get_expiration(id) do + from(j in Job, + where: j.state == "scheduled", + where: j.queue == "filter_expiration", + where: fragment("?->'filter_id' = ?", j.args, ^id) + ) + |> Repo.one() + end +end diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex new file mode 100644 index 0000000..2ccd9e8 --- /dev/null +++ b/lib/pleroma/workers/purge_expired_token.ex @@ -0,0 +1,32 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PurgeExpiredToken do + @moduledoc """ + Worker which purges expired OAuth tokens + """ + + use Oban.Worker, queue: :token_expiration, max_attempts: 1 + + @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) :: + {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()} + def enqueue(args) do + {scheduled_at, args} = Map.pop(args, :valid_until) + + args + |> __MODULE__.new(scheduled_at: scheduled_at) + |> Oban.insert() + end + + @impl true + def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do + module + |> String.to_existing_atom() + |> Pleroma.Repo.get(id) + |> Pleroma.Repo.delete() + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex new file mode 100644 index 0000000..cf1bb62 --- /dev/null +++ b/lib/pleroma/workers/receiver_worker.ex @@ -0,0 +1,26 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReceiverWorker do + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do + with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do + {:ok, res} + else + {:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed} + {:error, :already_present} -> {:cancel, :already_present} + {:error, {:validate_object, reason}} -> {:cancel, reason} + {:error, {:error, {:validate, reason}}} -> {:cancel, reason} + {:error, {:reject, reason}} -> {:cancel, reason} + e -> e + end + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex new file mode 100644 index 0000000..d2a77aa --- /dev/null +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -0,0 +1,17 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.RemoteFetcherWorker do + alias Pleroma.Object.Fetcher + + use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do + {:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"]) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(10) +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 0000000..4df84d0 --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,61 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do + @moduledoc """ + The worker to post scheduled activity. + """ + + use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + + alias Pleroma.Repo + alias Pleroma.ScheduledActivity + alias Pleroma.User + + require Logger + + @impl Oban.Worker + def perform(%Job{args: %{"activity_id" => activity_id}}) do + with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id), + %User{} = user <- find_user(scheduled_activity.user_id) do + params = atomize_keys(scheduled_activity.params) + + Repo.transaction(fn -> + {:ok, activity} = Pleroma.Web.CommonAPI.post(user, params) + {:ok, _} = ScheduledActivity.delete(scheduled_activity) + activity + end) + else + {:error, :scheduled_activity_not_found} = error -> + Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}") + error + + {:error, :user_not_found} = error -> + Logger.error("#{__MODULE__} Couldn't find user for scheduled activity: #{activity_id}") + error + end + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) + + defp find_scheduled_activity(id) do + with nil <- Repo.get(ScheduledActivity, id) do + {:error, :scheduled_activity_not_found} + end + end + + defp find_user(id) do + with nil <- User.get_cached_by_id(id) do + {:error, :user_not_found} + end + end + + defp atomize_keys(map) do + Map.new(map, fn + {key, value} when is_map(value) -> {String.to_existing_atom(key), atomize_keys(value)} + {key, value} -> {String.to_existing_atom(key), value} + end) + end +end diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex new file mode 100644 index 0000000..1f3f538 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.TransmogrifierWorker do + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "transmogrifier" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do + user = User.get_cached_by_id(user_id) + Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex new file mode 100644 index 0000000..67e84b0 --- /dev/null +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -0,0 +1,23 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusherWorker do + alias Pleroma.Notification + alias Pleroma.Repo + + use Pleroma.Workers.WorkerHelper, queue: "web_push" + + @impl Oban.Worker + def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do + notification = + Notification + |> Repo.get(notification_id) + |> Repo.preload([:activity, :user]) + + Pleroma.Web.Push.Impl.perform(notification) + end + + @impl Oban.Worker + def timeout(_job), do: :timer.seconds(5) +end diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex new file mode 100644 index 0000000..1d20cbd --- /dev/null +++ b/lib/pleroma/workers/worker_helper.ex @@ -0,0 +1,48 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WorkerHelper do + alias Pleroma.Config + alias Pleroma.Workers.WorkerHelper + + def worker_args(queue) do + case Config.get([:workers, :retries, queue]) do + nil -> [] + max_attempts -> [max_attempts: max_attempts] + end + end + + def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do + backoff = + :math.pow(attempt, pow) + + base_backoff + + :rand.uniform(2 * base_backoff) * attempt + + trunc(backoff) + end + + defmacro __using__(opts) do + caller_module = __CALLER__.module + queue = Keyword.fetch!(opts, :queue) + + quote do + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: unquote(queue), + max_attempts: 1 + + alias Oban.Job + + def enqueue(op, params, worker_args \\ []) do + params = Map.merge(%{"op" => op}, params) + queue_atom = String.to_atom(unquote(queue)) + worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom) + + unquote(caller_module) + |> apply(:new, [params, worker_args]) + |> Oban.insert() + end + end + end +end |
