First
[anni] / lib / pleroma / marker.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Marker do
6   use Ecto.Schema
7
8   import Ecto.Changeset
9   import Ecto.Query
10
11   alias Ecto.Multi
12   alias Pleroma.Notification
13   alias Pleroma.Repo
14   alias Pleroma.User
15   alias __MODULE__
16
17   @timelines ["notifications"]
18   @type t :: %__MODULE__{}
19
20   schema "markers" do
21     field(:last_read_id, :string, default: "")
22     field(:timeline, :string, default: "")
23     field(:lock_version, :integer, default: 0)
24     field(:unread_count, :integer, default: 0, virtual: true)
25
26     belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
27     timestamps()
28   end
29
30   @doc "Gets markers by user and timeline."
31   @spec get_markers(User.t(), list(String)) :: list(t())
32   def get_markers(user, timelines \\ []) do
33     user
34     |> get_query(timelines)
35     |> unread_count_query()
36     |> Repo.all()
37   end
38
39   @spec upsert(User.t(), map()) :: {:ok | :error, any()}
40   def upsert(%User{} = user, attrs) do
41     attrs
42     |> Map.take(@timelines)
43     |> Enum.reduce(Multi.new(), fn {timeline, timeline_attrs}, multi ->
44       marker =
45         user
46         |> get_marker(timeline)
47         |> changeset(timeline_attrs)
48
49       Multi.insert(multi, timeline, marker,
50         returning: true,
51         on_conflict: {:replace, [:last_read_id]},
52         conflict_target: [:user_id, :timeline]
53       )
54     end)
55     |> Repo.transaction()
56   end
57
58   @spec multi_set_last_read_id(Multi.t(), User.t(), String.t()) :: Multi.t()
59   def multi_set_last_read_id(multi, %User{} = user, "notifications") do
60     multi
61     |> Multi.run(:counters, fn _repo, _changes ->
62       {:ok, %{last_read_id: Repo.one(Notification.last_read_query(user))}}
63     end)
64     |> Multi.insert(
65       :marker,
66       fn %{counters: attrs} ->
67         %Marker{timeline: "notifications", user_id: user.id}
68         |> struct(attrs)
69         |> Ecto.Changeset.change()
70       end,
71       returning: true,
72       on_conflict: {:replace, [:last_read_id]},
73       conflict_target: [:user_id, :timeline]
74     )
75   end
76
77   def multi_set_last_read_id(multi, _, _), do: multi
78
79   defp get_marker(user, timeline) do
80     case Repo.find_resource(get_query(user, timeline)) do
81       {:ok, marker} -> %__MODULE__{marker | user: user}
82       _ -> %__MODULE__{timeline: timeline, user_id: user.id}
83     end
84   end
85
86   @doc false
87   defp changeset(marker, attrs) do
88     marker
89     |> cast(attrs, [:last_read_id])
90     |> validate_required([:user_id, :timeline, :last_read_id])
91     |> validate_inclusion(:timeline, @timelines)
92   end
93
94   defp by_timeline(query, timeline) do
95     from(m in query, where: m.timeline in ^List.wrap(timeline))
96   end
97
98   defp by_user_id(query, id), do: from(m in query, where: m.user_id == ^id)
99
100   defp get_query(user, timelines) do
101     __MODULE__
102     |> by_user_id(user.id)
103     |> by_timeline(timelines)
104   end
105
106   defp unread_count_query(query) do
107     from(
108       q in query,
109       left_join: n in "notifications",
110       on: n.user_id == q.user_id and n.seen == false,
111       group_by: [:id],
112       select_merge: %{
113         unread_count: fragment("count(?)", n.id)
114       }
115     )
116   end
117 end