move to 2.5.5
[anni] / lib / pleroma / scheduled_activity.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ScheduledActivity do
6   use Ecto.Schema
7
8   alias Ecto.Multi
9   alias Pleroma.Config
10   alias Pleroma.Repo
11   alias Pleroma.ScheduledActivity
12   alias Pleroma.User
13   alias Pleroma.Web.CommonAPI.Utils
14   alias Pleroma.Workers.ScheduledActivityWorker
15
16   import Ecto.Query
17   import Ecto.Changeset
18
19   @type t :: %__MODULE__{}
20
21   @min_offset :timer.minutes(5)
22
23   schema "scheduled_activities" do
24     belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
25     field(:scheduled_at, :naive_datetime)
26     field(:params, :map)
27
28     timestamps()
29   end
30
31   def changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
32     scheduled_activity
33     |> cast(attrs, [:scheduled_at, :params])
34     |> validate_required([:scheduled_at, :params])
35     |> validate_scheduled_at()
36     |> with_media_attachments()
37   end
38
39   defp with_media_attachments(
40          %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
41        )
42        when is_list(media_ids) do
43     media_attachments =
44       Utils.attachments_from_ids(
45         %{media_ids: media_ids},
46         User.get_cached_by_id(changeset.data.user_id)
47       )
48
49     params =
50       params
51       |> Map.put("media_attachments", media_attachments)
52       |> Map.put("media_ids", media_ids)
53
54     put_change(changeset, :params, params)
55   end
56
57   defp with_media_attachments(changeset), do: changeset
58
59   def update_changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
60     scheduled_activity
61     |> cast(attrs, [:scheduled_at])
62     |> validate_required([:scheduled_at])
63     |> validate_scheduled_at()
64   end
65
66   def validate_scheduled_at(changeset) do
67     validate_change(changeset, :scheduled_at, fn _, scheduled_at ->
68       cond do
69         not far_enough?(scheduled_at) ->
70           [scheduled_at: "must be at least 5 minutes from now"]
71
72         exceeds_daily_user_limit?(changeset.data.user_id, scheduled_at) ->
73           [scheduled_at: "daily limit exceeded"]
74
75         exceeds_total_user_limit?(changeset.data.user_id) ->
76           [scheduled_at: "total limit exceeded"]
77
78         true ->
79           []
80       end
81     end)
82   end
83
84   def exceeds_daily_user_limit?(user_id, scheduled_at) do
85     ScheduledActivity
86     |> where(user_id: ^user_id)
87     |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
88     |> select([sa], count(sa.id))
89     |> Repo.one()
90     |> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
91   end
92
93   def exceeds_total_user_limit?(user_id) do
94     ScheduledActivity
95     |> where(user_id: ^user_id)
96     |> select([sa], count(sa.id))
97     |> Repo.one()
98     |> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
99   end
100
101   def far_enough?(scheduled_at) when is_binary(scheduled_at) do
102     with {:ok, scheduled_at} <- Ecto.Type.cast(:naive_datetime, scheduled_at) do
103       far_enough?(scheduled_at)
104     else
105       _ -> false
106     end
107   end
108
109   def far_enough?(scheduled_at) do
110     now = NaiveDateTime.utc_now()
111     diff = NaiveDateTime.diff(scheduled_at, now, :millisecond)
112     diff > @min_offset
113   end
114
115   def new(%User{} = user, attrs) do
116     changeset(%ScheduledActivity{user_id: user.id}, attrs)
117   end
118
119   @doc """
120   Creates ScheduledActivity and add to queue to perform at scheduled_at date
121   """
122   @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
123   def create(%User{} = user, attrs) do
124     Multi.new()
125     |> Multi.insert(:scheduled_activity, new(user, attrs))
126     |> maybe_add_jobs(Config.get([ScheduledActivity, :enabled]))
127     |> Repo.transaction()
128     |> transaction_response
129   end
130
131   defp maybe_add_jobs(multi, true) do
132     multi
133     |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
134       %{activity_id: activity.id}
135       |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
136       |> Oban.insert()
137     end)
138   end
139
140   defp maybe_add_jobs(multi, _), do: multi
141
142   def get(%User{} = user, scheduled_activity_id) do
143     ScheduledActivity
144     |> where(user_id: ^user.id)
145     |> where(id: ^scheduled_activity_id)
146     |> Repo.one()
147   end
148
149   @spec update(ScheduledActivity.t(), map()) ::
150           {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
151   def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
152     with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
153            {:error, update_changeset(scheduled_activity, attrs)} do
154       Multi.new()
155       |> Multi.update(:scheduled_activity, changeset)
156       |> Multi.update_all(:scheduled_job, job_query(id),
157         set: [scheduled_at: get_field(changeset, :scheduled_at)]
158       )
159       |> Repo.transaction()
160       |> transaction_response
161     end
162   end
163
164   @doc "Deletes a ScheduledActivity and linked jobs."
165   @spec delete(ScheduledActivity.t() | binary() | integer) ::
166           {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
167   def delete(%ScheduledActivity{id: id} = scheduled_activity) do
168     Multi.new()
169     |> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id)
170     |> Multi.delete_all(:jobs, job_query(id))
171     |> Repo.transaction()
172     |> transaction_response
173   end
174
175   def delete(id) when is_binary(id) or is_integer(id) do
176     delete(%__MODULE__{id: id})
177   end
178
179   defp transaction_response(result) do
180     case result do
181       {:ok, %{scheduled_activity: scheduled_activity}} ->
182         {:ok, scheduled_activity}
183
184       {:error, _, changeset, _} ->
185         {:error, changeset}
186     end
187   end
188
189   def for_user_query(%User{} = user) do
190     ScheduledActivity
191     |> where(user_id: ^user.id)
192   end
193
194   def due_activities(offset \\ 0) do
195     naive_datetime =
196       NaiveDateTime.utc_now()
197       |> NaiveDateTime.add(offset, :millisecond)
198
199     ScheduledActivity
200     |> where([sa], sa.scheduled_at < ^naive_datetime)
201     |> Repo.all()
202   end
203
204   def job_query(scheduled_activity_id) do
205     from(j in Oban.Job,
206       where: j.queue == "scheduled_activities",
207       where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
208     )
209   end
210 end