Krzysztof Szukiełojć | 15b62b7 | 2017-02-15 08:58:18 +0100 | [diff] [blame] | 1 | # Copyright 2012 Canonical Ltd. This software is licensed under the |
| 2 | # GNU Affero General Public License version 3 (see the file LICENSE). |
| 3 | |
| 4 | """Encoding of MIME multipart data.""" |
| 5 | |
| 6 | from __future__ import ( |
| 7 | absolute_import, |
| 8 | print_function, |
| 9 | unicode_literals, |
| 10 | ) |
| 11 | |
| 12 | str = None |
| 13 | |
| 14 | __metaclass__ = type |
| 15 | __all__ = [ |
| 16 | 'encode_multipart_data', |
| 17 | ] |
| 18 | |
| 19 | from collections import ( |
| 20 | Iterable, |
| 21 | Mapping, |
| 22 | ) |
| 23 | from email.generator import Generator |
| 24 | from email.mime.application import MIMEApplication |
| 25 | from email.mime.multipart import MIMEMultipart |
| 26 | from io import ( |
| 27 | BytesIO, |
| 28 | IOBase, |
| 29 | ) |
| 30 | from itertools import chain |
| 31 | import mimetypes |
| 32 | |
| 33 | |
| 34 | def get_content_type(*names): |
| 35 | """Return the MIME content type for the file with the given name.""" |
| 36 | for name in names: |
| 37 | if name is not None: |
| 38 | mimetype, encoding = mimetypes.guess_type(name) |
| 39 | if mimetype is not None: |
| 40 | return mimetype |
| 41 | else: |
| 42 | return "application/octet-stream" |
| 43 | |
| 44 | |
| 45 | def make_bytes_payload(name, content): |
| 46 | payload = MIMEApplication(content) |
| 47 | payload.add_header("Content-Disposition", "form-data", name=name) |
| 48 | return payload |
| 49 | |
| 50 | |
| 51 | def make_string_payload(name, content): |
| 52 | payload = MIMEApplication(content.encode("utf-8"), charset="utf-8") |
| 53 | payload.add_header("Content-Disposition", "form-data", name=name) |
| 54 | payload.set_type("text/plain") |
| 55 | return payload |
| 56 | |
| 57 | |
| 58 | def make_file_payload(name, content): |
Krzysztof Szukiełojć | c4b3309 | 2017-02-15 13:25:38 +0100 | [diff] [blame] | 59 | print('********************************') |
| 60 | print('content %s' % content) |
Krzysztof Szukiełojć | 15b62b7 | 2017-02-15 08:58:18 +0100 | [diff] [blame] | 61 | payload = MIMEApplication(content.read()) |
| 62 | payload.add_header( |
| 63 | "Content-Disposition", "form-data", name=name, filename=name) |
| 64 | names = name, getattr(content, "name", None) |
| 65 | payload.set_type(get_content_type(*names)) |
| 66 | return payload |
| 67 | |
| 68 | |
| 69 | def make_payloads(name, content): |
| 70 | if isinstance(content, bytes): |
| 71 | yield make_bytes_payload(name, content) |
| 72 | elif isinstance(content, unicode): |
| 73 | yield make_string_payload(name, content) |
| 74 | elif isinstance(content, IOBase): |
| 75 | yield make_file_payload(name, content) |
| 76 | elif callable(content): |
| 77 | with content() as content: |
| 78 | for payload in make_payloads(name, content): |
| 79 | yield payload |
| 80 | elif isinstance(content, Iterable): |
| 81 | for part in content: |
| 82 | for payload in make_payloads(name, part): |
| 83 | yield payload |
| 84 | else: |
| 85 | raise AssertionError( |
| 86 | "%r is unrecognised: %r" % (name, content)) |
| 87 | |
| 88 | |
| 89 | def build_multipart_message(data): |
| 90 | message = MIMEMultipart("form-data") |
Gabor Toth | 36bc028 | 2018-08-18 10:39:51 +0200 | [diff] [blame] | 91 | for name, contents in data: |
| 92 | if not isinstance(contents, list): |
| 93 | contents = [contents] |
| 94 | for content in contents: |
| 95 | for payload in make_payloads(name, content): |
| 96 | message.attach(payload) |
Krzysztof Szukiełojć | 15b62b7 | 2017-02-15 08:58:18 +0100 | [diff] [blame] | 97 | return message |
| 98 | |
| 99 | |
| 100 | def encode_multipart_message(message): |
| 101 | # The message must be multipart. |
| 102 | assert message.is_multipart() |
| 103 | # The body length cannot yet be known. |
| 104 | assert "Content-Length" not in message |
| 105 | # So line-endings can be fixed-up later on, component payloads must have |
| 106 | # no Content-Length and their Content-Transfer-Encoding must be base64 |
| 107 | # (and not quoted-printable, which Django doesn't appear to understand). |
| 108 | for part in message.get_payload(): |
| 109 | assert "Content-Length" not in part |
| 110 | assert part["Content-Transfer-Encoding"] == "base64" |
| 111 | # Flatten the message without headers. |
| 112 | buf = BytesIO() |
| 113 | generator = Generator(buf, False) # Don't mangle "^From". |
| 114 | generator._write_headers = lambda self: None # Ignore. |
| 115 | generator.flatten(message) |
| 116 | # Ensure the body has CRLF-delimited lines. See |
| 117 | # http://bugs.python.org/issue1349106. |
| 118 | body = b"\r\n".join(buf.getvalue().splitlines()) |
| 119 | # Only now is it safe to set the content length. |
| 120 | message.add_header("Content-Length", "%d" % len(body)) |
| 121 | return message.items(), body |
| 122 | |
| 123 | |
| 124 | def encode_multipart_data(data=(), files=()): |
| 125 | """Create a MIME multipart payload from L{data} and L{files}. |
| 126 | |
| 127 | **Note** that this function is deprecated. Use `build_multipart_message` |
| 128 | and `encode_multipart_message` instead. |
| 129 | |
| 130 | @param data: A mapping of names (ASCII strings) to data (byte string). |
| 131 | @param files: A mapping of names (ASCII strings) to file objects ready to |
| 132 | be read. |
| 133 | @return: A 2-tuple of C{(body, headers)}, where C{body} is a a byte string |
| 134 | and C{headers} is a dict of headers to add to the enclosing request in |
| 135 | which this payload will travel. |
| 136 | """ |
| 137 | if isinstance(data, Mapping): |
| 138 | data = data.items() |
| 139 | if isinstance(files, Mapping): |
| 140 | files = files.items() |
| 141 | message = build_multipart_message(chain(data, files)) |
| 142 | headers, body = encode_multipart_message(message) |
| 143 | return body, dict(headers) |