First
[anni] / lib / pleroma / reverse_proxy.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.ReverseProxy do
6   @range_headers ~w(range if-range)
7   @keep_req_headers ~w(accept accept-encoding cache-control if-modified-since) ++
8                       ~w(if-unmodified-since if-none-match) ++ @range_headers
9   @resp_cache_headers ~w(etag date last-modified)
10   @keep_resp_headers @resp_cache_headers ++
11                        ~w(content-length content-type content-disposition content-encoding) ++
12                        ~w(content-range accept-ranges vary)
13   @default_cache_control_header "public, max-age=1209600"
14   @valid_resp_codes [200, 206, 304]
15   @max_read_duration :timer.seconds(30)
16   @max_body_length :infinity
17   @failed_request_ttl :timer.seconds(60)
18   @methods ~w(GET HEAD)
19
20   @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
21
22   def max_read_duration_default, do: @max_read_duration
23   def default_cache_control_header, do: @default_cache_control_header
24
25   @moduledoc """
26   A reverse proxy.
27
28       Pleroma.ReverseProxy.call(conn, url, options)
29
30   It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
31
32   Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
33
34   Responses are chunked to the client while downloading from the upstream.
35
36   Some request / responses headers are preserved:
37
38   * request: `#{inspect(@keep_req_headers)}`
39   * response: `#{inspect(@keep_resp_headers)}`
40
41   Options:
42
43   * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there's any HTTP
44   errors. Any error during body processing will not be redirected as the response is chunked. This may expose
45   remote URL, clients IPs, ….
46
47   * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
48   specified length. It is validated with the `content-length` header and also verified when proxying.
49
50   * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
51   read from the remote upstream.
52
53   * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
54
55   * `inline_content_types`:
56     * `true` will not alter `content-disposition` (up to the upstream),
57     * `false` will add `content-disposition: attachment` to any request,
58     * a list of whitelisted content types
59
60   * `req_headers`, `resp_headers` additional headers.
61
62   * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
63
64   """
65   @default_options [pool: :media]
66
67   @inline_content_types [
68     "image/gif",
69     "image/jpeg",
70     "image/jpg",
71     "image/png",
72     "image/svg+xml",
73     "audio/mpeg",
74     "audio/mp3",
75     "video/webm",
76     "video/mp4",
77     "video/quicktime"
78   ]
79
80   require Logger
81   import Plug.Conn
82
83   @type option() ::
84           {:max_read_duration, :timer.time() | :infinity}
85           | {:max_body_length, non_neg_integer() | :infinity}
86           | {:failed_request_ttl, :timer.time() | :infinity}
87           | {:http, []}
88           | {:req_headers, [{String.t(), String.t()}]}
89           | {:resp_headers, [{String.t(), String.t()}]}
90           | {:inline_content_types, boolean() | [String.t()]}
91           | {:redirect_on_failure, boolean()}
92
93   @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
94   def call(_conn, _url, _opts \\ [])
95
96   def call(conn = %{method: method}, url, opts) when method in @methods do
97     client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
98
99     req_headers = build_req_headers(conn.req_headers, opts)
100
101     opts =
102       if filename = Pleroma.Web.MediaProxy.filename(url) do
103         Keyword.put_new(opts, :attachment_name, filename)
104       else
105         opts
106       end
107
108     with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
109          {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
110          :ok <-
111            header_length_constraint(
112              headers,
113              Keyword.get(opts, :max_body_length, @max_body_length)
114            ) do
115       response(conn, client, url, code, headers, opts)
116     else
117       {:ok, true} ->
118         conn
119         |> error_or_redirect(url, 500, "Request failed", opts)
120         |> halt()
121
122       {:ok, code, headers} ->
123         head_response(conn, url, code, headers, opts)
124         |> halt()
125
126       {:error, {:invalid_http_response, code}} ->
127         Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
128         track_failed_url(url, code, opts)
129
130         conn
131         |> error_or_redirect(
132           url,
133           code,
134           "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
135           opts
136         )
137         |> halt()
138
139       {:error, error} ->
140         Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
141         track_failed_url(url, error, opts)
142
143         conn
144         |> error_or_redirect(url, 500, "Request failed", opts)
145         |> halt()
146     end
147   end
148
149   def call(conn, _, _) do
150     conn
151     |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
152     |> halt()
153   end
154
155   defp request(method, url, headers, opts) do
156     Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
157     method = method |> String.downcase() |> String.to_existing_atom()
158
159     case client().request(method, url, headers, "", opts) do
160       {:ok, code, headers, client} when code in @valid_resp_codes ->
161         {:ok, code, downcase_headers(headers), client}
162
163       {:ok, code, headers} when code in @valid_resp_codes ->
164         {:ok, code, downcase_headers(headers)}
165
166       {:ok, code, _, _} ->
167         {:error, {:invalid_http_response, code}}
168
169       {:ok, code, _} ->
170         {:error, {:invalid_http_response, code}}
171
172       {:error, error} ->
173         {:error, error}
174     end
175   end
176
177   defp response(conn, client, url, status, headers, opts) do
178     Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")
179
180     result =
181       conn
182       |> put_resp_headers(build_resp_headers(headers, opts))
183       |> send_chunked(status)
184       |> chunk_reply(client, opts)
185
186     case result do
187       {:ok, conn} ->
188         halt(conn)
189
190       {:error, :closed, conn} ->
191         client().close(client)
192         halt(conn)
193
194       {:error, error, conn} ->
195         Logger.warn(
196           "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
197         )
198
199         client().close(client)
200         halt(conn)
201     end
202   end
203
204   defp chunk_reply(conn, client, opts) do
205     chunk_reply(conn, client, opts, 0, 0)
206   end
207
208   defp chunk_reply(conn, client, opts, sent_so_far, duration) do
209     with {:ok, duration} <-
210            check_read_duration(
211              duration,
212              Keyword.get(opts, :max_read_duration, @max_read_duration)
213            ),
214          {:ok, data, client} <- client().stream_body(client),
215          {:ok, duration} <- increase_read_duration(duration),
216          sent_so_far = sent_so_far + byte_size(data),
217          :ok <-
218            body_size_constraint(
219              sent_so_far,
220              Keyword.get(opts, :max_body_length, @max_body_length)
221            ),
222          {:ok, conn} <- chunk(conn, data) do
223       chunk_reply(conn, client, opts, sent_so_far, duration)
224     else
225       :done -> {:ok, conn}
226       {:error, error} -> {:error, error, conn}
227     end
228   end
229
230   defp head_response(conn, url, code, headers, opts) do
231     Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")
232
233     conn
234     |> put_resp_headers(build_resp_headers(headers, opts))
235     |> send_resp(code, "")
236   end
237
238   defp error_or_redirect(conn, url, code, body, opts) do
239     if Keyword.get(opts, :redirect_on_failure, false) do
240       conn
241       |> Phoenix.Controller.redirect(external: url)
242       |> halt()
243     else
244       conn
245       |> send_resp(code, body)
246       |> halt
247     end
248   end
249
250   defp downcase_headers(headers) do
251     Enum.map(headers, fn {k, v} ->
252       {String.downcase(k), v}
253     end)
254   end
255
256   defp get_content_type(headers) do
257     {_, content_type} =
258       List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})
259
260     [content_type | _] = String.split(content_type, ";")
261     content_type
262   end
263
264   defp put_resp_headers(conn, headers) do
265     Enum.reduce(headers, conn, fn {k, v}, conn ->
266       put_resp_header(conn, k, v)
267     end)
268   end
269
270   defp build_req_headers(headers, opts) do
271     headers
272     |> downcase_headers()
273     |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
274     |> build_req_range_or_encoding_header(opts)
275     |> build_req_user_agent_header(opts)
276     |> Keyword.merge(Keyword.get(opts, :req_headers, []))
277   end
278
279   # Disable content-encoding if any @range_headers are requested (see #1823).
280   defp build_req_range_or_encoding_header(headers, _opts) do
281     range? = Enum.any?(headers, fn {header, _} -> Enum.member?(@range_headers, header) end)
282
283     if range? && List.keymember?(headers, "accept-encoding", 0) do
284       List.keydelete(headers, "accept-encoding", 0)
285     else
286       headers
287     end
288   end
289
290   defp build_req_user_agent_header(headers, _opts) do
291     List.keystore(
292       headers,
293       "user-agent",
294       0,
295       {"user-agent", Pleroma.Application.user_agent()}
296     )
297   end
298
299   defp build_resp_headers(headers, opts) do
300     headers
301     |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
302     |> build_resp_cache_headers(opts)
303     |> build_resp_content_disposition_header(opts)
304     |> Keyword.merge(Keyword.get(opts, :resp_headers, []))
305   end
306
307   defp build_resp_cache_headers(headers, _opts) do
308     has_cache? = Enum.any?(headers, fn {k, _} -> k in @resp_cache_headers end)
309
310     cond do
311       has_cache? ->
312         # There's caching header present but no cache-control -- we need to set our own
313         # as Plug defaults to "max-age=0, private, must-revalidate"
314         List.keystore(
315           headers,
316           "cache-control",
317           0,
318           {"cache-control", @default_cache_control_header}
319         )
320
321       true ->
322         List.keystore(
323           headers,
324           "cache-control",
325           0,
326           {"cache-control", @default_cache_control_header}
327         )
328     end
329   end
330
331   defp build_resp_content_disposition_header(headers, opts) do
332     opt = Keyword.get(opts, :inline_content_types, @inline_content_types)
333
334     content_type = get_content_type(headers)
335
336     attachment? =
337       cond do
338         is_list(opt) && !Enum.member?(opt, content_type) -> true
339         opt == false -> true
340         true -> false
341       end
342
343     if attachment? do
344       name =
345         try do
346           {{"content-disposition", content_disposition_string}, _} =
347             List.keytake(headers, "content-disposition", 0)
348
349           [name | _] =
350             Regex.run(
351               ~r/filename="((?:[^"\\]|\\.)*)"/u,
352               content_disposition_string || "",
353               capture: :all_but_first
354             )
355
356           name
357         rescue
358           MatchError -> Keyword.get(opts, :attachment_name, "attachment")
359         end
360
361       disposition = "attachment; filename=\"#{name}\""
362
363       List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
364     else
365       headers
366     end
367   end
368
369   defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
370     with {_, size} <- List.keyfind(headers, "content-length", 0),
371          {size, _} <- Integer.parse(size),
372          true <- size <= limit do
373       :ok
374     else
375       false ->
376         {:error, :body_too_large}
377
378       _ ->
379         :ok
380     end
381   end
382
383   defp header_length_constraint(_, _), do: :ok
384
385   defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do
386     {:error, :body_too_large}
387   end
388
389   defp body_size_constraint(_, _), do: :ok
390
391   defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)
392
393   defp check_read_duration(duration, max)
394        when is_integer(duration) and is_integer(max) and max > 0 do
395     if duration > max do
396       {:error, :read_duration_exceeded}
397     else
398       {:ok, {duration, :erlang.system_time(:millisecond)}}
399     end
400   end
401
402   defp check_read_duration(_, _), do: {:ok, :no_duration_limit, :no_duration_limit}
403
404   defp increase_read_duration({previous_duration, started})
405        when is_integer(previous_duration) and is_integer(started) do
406     duration = :erlang.system_time(:millisecond) - started
407     {:ok, previous_duration + duration}
408   end
409
410   defp increase_read_duration(_) do
411     {:ok, :no_duration_limit, :no_duration_limit}
412   end
413
414   defp client, do: Pleroma.ReverseProxy.Client.Wrapper
415
416   defp track_failed_url(url, error, opts) do
417     ttl =
418       unless error in [:body_too_large, 400, 204] do
419         Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
420       else
421         nil
422       end
423
424     @cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
425   end
426 end