First
[anni] / test / pleroma / web / plugs / o_auth_plug_test.exs
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Web.Plugs.OAuthPlugTest do
6   use Pleroma.Web.ConnCase, async: true
7
8   alias Pleroma.Helpers.AuthHelper
9   alias Pleroma.Web.OAuth.Token
10   alias Pleroma.Web.OAuth.Token.Strategy.Revoke
11   alias Pleroma.Web.Plugs.OAuthPlug
12   alias Plug.Session
13
14   import Pleroma.Factory
15
16   setup %{conn: conn} do
17     user = insert(:user)
18     {:ok, oauth_token} = Token.create(insert(:oauth_app), user)
19     %{user: user, token: oauth_token, conn: conn}
20   end
21
22   test "it does nothing if a user is assigned", %{conn: conn} do
23     conn = assign(conn, :user, %Pleroma.User{})
24     ret_conn = OAuthPlug.call(conn, %{})
25
26     assert ret_conn == conn
27   end
28
29   test "with valid token (uppercase) in auth header, it assigns the user", %{conn: conn} = opts do
30     conn =
31       conn
32       |> put_req_header("authorization", "BEARER #{opts[:token].token}")
33       |> OAuthPlug.call(%{})
34
35     assert conn.assigns[:user] == opts[:user]
36   end
37
38   test "with valid token (downcase) in auth header, it assigns the user", %{conn: conn} = opts do
39     conn =
40       conn
41       |> put_req_header("authorization", "bearer #{opts[:token].token}")
42       |> OAuthPlug.call(%{})
43
44     assert conn.assigns[:user] == opts[:user]
45   end
46
47   test "with valid token (downcase) in url parameters, it assigns the user", opts do
48     conn =
49       :get
50       |> build_conn("/?access_token=#{opts[:token].token}")
51       |> put_req_header("content-type", "application/json")
52       |> fetch_query_params()
53       |> OAuthPlug.call(%{})
54
55     assert conn.assigns[:user] == opts[:user]
56   end
57
58   test "with valid token (downcase) in body parameters, it assigns the user", opts do
59     conn =
60       :post
61       |> build_conn("/api/v1/statuses", access_token: opts[:token].token, status: "test")
62       |> OAuthPlug.call(%{})
63
64     assert conn.assigns[:user] == opts[:user]
65   end
66
67   test "with invalid token, it does not assign the user", %{conn: conn} do
68     conn =
69       conn
70       |> put_req_header("authorization", "bearer TTTTT")
71       |> OAuthPlug.call(%{})
72
73     refute conn.assigns[:user]
74   end
75
76   describe "with :oauth_token in session, " do
77     setup %{token: oauth_token, conn: conn} do
78       session_opts = [
79         store: :cookie,
80         key: "_test",
81         signing_salt: "cooldude"
82       ]
83
84       conn =
85         conn
86         |> Session.call(Session.init(session_opts))
87         |> fetch_session()
88         |> AuthHelper.put_session_token(oauth_token.token)
89
90       %{conn: conn}
91     end
92
93     test "if session-stored token matches a valid OAuth token, assigns :user and :token", %{
94       conn: conn,
95       user: user,
96       token: oauth_token
97     } do
98       conn = OAuthPlug.call(conn, %{})
99
100       assert conn.assigns.user && conn.assigns.user.id == user.id
101       assert conn.assigns.token && conn.assigns.token.id == oauth_token.id
102     end
103
104     test "if session-stored token matches an expired OAuth token, does nothing", %{
105       conn: conn,
106       token: oauth_token
107     } do
108       expired_valid_until = NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24, :second)
109
110       oauth_token
111       |> Ecto.Changeset.change(valid_until: expired_valid_until)
112       |> Pleroma.Repo.update()
113
114       ret_conn = OAuthPlug.call(conn, %{})
115       assert ret_conn == conn
116     end
117
118     test "if session-stored token matches a revoked OAuth token, does nothing", %{
119       conn: conn,
120       token: oauth_token
121     } do
122       Revoke.revoke(oauth_token)
123
124       ret_conn = OAuthPlug.call(conn, %{})
125       assert ret_conn == conn
126     end
127   end
128 end