|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import io |
| 4 | +import mimetypes |
4 | 5 | import os |
| 6 | +import re |
5 | 7 | import typing |
6 | 8 | from pathlib import Path |
7 | 9 |
|
|
14 | 16 | SyncByteStream, |
15 | 17 | ) |
16 | 18 | from ._utils import ( |
17 | | - format_form_param, |
18 | | - guess_content_type, |
19 | 19 | peek_filelike_length, |
20 | 20 | primitive_value_to_str, |
21 | 21 | to_bytes, |
22 | 22 | ) |
23 | 23 |
|
| 24 | +_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"} |
| 25 | +_HTML5_FORM_ENCODING_REPLACEMENTS.update( |
| 26 | + {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B} |
| 27 | +) |
| 28 | +_HTML5_FORM_ENCODING_RE = re.compile( |
| 29 | + r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()]) |
| 30 | +) |
| 31 | + |
| 32 | + |
| 33 | +def _format_form_param(name: str, value: str) -> bytes: |
| 34 | + """ |
| 35 | + Encode a name/value pair within a multipart form. |
| 36 | + """ |
| 37 | + |
| 38 | + def replacer(match: typing.Match[str]) -> str: |
| 39 | + return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)] |
| 40 | + |
| 41 | + value = _HTML5_FORM_ENCODING_RE.sub(replacer, value) |
| 42 | + return f'{name}="{value}"'.encode() |
| 43 | + |
| 44 | + |
| 45 | +def _guess_content_type(filename: str | None) -> str | None: |
| 46 | + """ |
| 47 | + Guesses the mimetype based on a filename. Defaults to `application/octet-stream`. |
| 48 | +
|
| 49 | + Returns `None` if `filename` is `None` or empty. |
| 50 | + """ |
| 51 | + if filename: |
| 52 | + return mimetypes.guess_type(filename)[0] or "application/octet-stream" |
| 53 | + return None |
| 54 | + |
24 | 55 |
|
25 | 56 | def get_multipart_boundary_from_content_type( |
26 | 57 | content_type: bytes | None, |
@@ -58,7 +89,7 @@ def __init__(self, name: str, value: str | bytes | int | float | None) -> None: |
58 | 89 |
|
59 | 90 | def render_headers(self) -> bytes: |
60 | 91 | if not hasattr(self, "_headers"): |
61 | | - name = format_form_param("name", self.name) |
| 92 | + name = _format_form_param("name", self.name) |
62 | 93 | self._headers = b"".join( |
63 | 94 | [b"Content-Disposition: form-data; ", name, b"\r\n\r\n"] |
64 | 95 | ) |
@@ -115,7 +146,7 @@ def __init__(self, name: str, value: FileTypes) -> None: |
115 | 146 | fileobj = value |
116 | 147 |
|
117 | 148 | if content_type is None: |
118 | | - content_type = guess_content_type(filename) |
| 149 | + content_type = _guess_content_type(filename) |
119 | 150 |
|
120 | 151 | has_content_type_header = any("content-type" in key.lower() for key in headers) |
121 | 152 | if content_type is not None and not has_content_type_header: |
@@ -156,10 +187,10 @@ def render_headers(self) -> bytes: |
156 | 187 | if not hasattr(self, "_headers"): |
157 | 188 | parts = [ |
158 | 189 | b"Content-Disposition: form-data; ", |
159 | | - format_form_param("name", self.name), |
| 190 | + _format_form_param("name", self.name), |
160 | 191 | ] |
161 | 192 | if self.filename: |
162 | | - filename = format_form_param("filename", self.filename) |
| 193 | + filename = _format_form_param("filename", self.filename) |
163 | 194 | parts.extend([b"; ", filename]) |
164 | 195 | for header_name, header_value in self.headers.items(): |
165 | 196 | key, val = f"\r\n{header_name}: ".encode(), header_value.encode() |
|
0 commit comments