First
[anni] / lib / pleroma / reverse_proxy / client / tesla.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy.Client.Tesla do
6   @behaviour Pleroma.ReverseProxy.Client
7
8   alias Pleroma.Gun.ConnectionPool
9
10   @type headers() :: [{String.t(), String.t()}]
11   @type status() :: pos_integer()
12
13   @spec request(atom(), String.t(), headers(), String.t(), keyword()) ::
14           {:ok, status(), headers}
15           | {:ok, status(), headers, map()}
16           | {:error, atom() | String.t()}
17           | no_return()
18
19   @impl true
20   def request(method, url, headers, body, opts \\ []) do
21     check_adapter()
22
23     opts = Keyword.put(opts, :body_as, :chunks)
24
25     with {:ok, response} <-
26            Pleroma.HTTP.request(
27              method,
28              url,
29              body,
30              headers,
31              opts
32            ) do
33       if is_map(response.body) and method != :head do
34         {:ok, response.status, response.headers, response.body}
35       else
36         conn_pid = response.opts[:adapter][:conn]
37         ConnectionPool.release_conn(conn_pid)
38         {:ok, response.status, response.headers}
39       end
40     else
41       {:error, error} -> {:error, error}
42     end
43   end
44
45   @impl true
46   @spec stream_body(map()) ::
47           {:ok, binary(), map()} | {:error, atom() | String.t()} | :done | no_return()
48   def stream_body(%{pid: pid, fin: true}) do
49     ConnectionPool.release_conn(pid)
50     :done
51   end
52
53   def stream_body(client) do
54     case read_chunk!(client) do
55       {:fin, body} ->
56         {:ok, body, Map.put(client, :fin, true)}
57
58       {:nofin, part} ->
59         {:ok, part, client}
60
61       {:error, error} ->
62         {:error, error}
63     end
64   end
65
66   defp read_chunk!(%{pid: pid, stream: stream, opts: opts}) do
67     adapter = check_adapter()
68     adapter.read_chunk(pid, stream, opts)
69   end
70
71   @impl true
72   @spec close(map) :: :ok | no_return()
73   def close(%{pid: pid}) do
74     ConnectionPool.release_conn(pid)
75   end
76
77   defp check_adapter do
78     adapter = Application.get_env(:tesla, :adapter)
79
80     unless adapter == Tesla.Adapter.Gun do
81       raise "#{adapter} doesn't support reading body in chunks"
82     end
83
84     adapter
85   end
86 end