1
1
mirror of https://github.com/Kozea/WeasyPrint.git synced 2024-10-04 07:57:52 +03:00

Fix #86: Support gzip and deflate encoding in HTTP responses

This commit is contained in:
Simon Sapin 2014-04-21 23:52:58 +01:00
parent a6a9f15826
commit 9404375d3e
5 changed files with 129 additions and 6 deletions

11
CHANGES
View File

@ -2,6 +2,17 @@ WeasyPrint changelog
====================
Version 0.22
------------
Not released yet.
New features:
* `#86 <https://github.com/Kozea/WeasyPrint/pull/86>`_:
Support gzip and deflate encoding in HTTP responses
Version 0.21
------------

View File

@ -96,3 +96,23 @@ else:
def ints_from_bytes(byte_string):
    """Return the integer value of each item of a byte string.

    The result is whatever ``imap(ord, byte_string)`` produces.

    """
    # NOTE(review): if ``imap`` is ``itertools.imap`` this is a lazy iterator
    # rather than a list -- confirm against the import at the top of the file.
    return imap(ord, byte_string)
if sys.version_info >= (3, 2):
    from gzip import GzipFile

    class StreamingGzipFile(GzipFile):
        """Gzip reader for a forward-only stream that also owns that stream.

        :param fileobj:
            The file-like object to read compressed data from. It is closed
            together with this object.

        """
        def __init__(self, fileobj):
            GzipFile.__init__(self, fileobj=fileobj)
            # GzipFile does not close a file object that was handed to it,
            # so keep our own reference in order to close it ourselves.
            self.fileobj_to_close = fileobj

        def close(self):
            """Close the gzip reader, then the underlying file object."""
            try:
                GzipFile.close(self)
            finally:
                # Release the underlying stream even if the gzip layer
                # failed while closing.
                self.fileobj_to_close.close()

        # Inform html5lib to not rely on these:
        seek = tell = None
else:
    # On older Python versions, GzipFile requires .seek() and .tell()
    # which file-like objects for HTTP response do not have.
    StreamingGzipFile = None

View File

@ -20,6 +20,8 @@ import contextlib
import threading
import shutil
import tempfile
import gzip
import zlib
import lxml.html
import lxml.etree
@ -27,7 +29,7 @@ import cairocffi as cairo
import pytest
from .testing_utils import (
resource_filename, assert_no_logs, capture_logs, TestHTML)
resource_filename, assert_no_logs, capture_logs, TestHTML, http_server)
from .test_draw import image_to_pixels
from ..compat import urljoin, urlencode, urlparse_uses_relative, iteritems
from ..urls import path2url
@ -988,3 +990,36 @@ def test_html_meta():
title='One',
authors=['', 'Me'])
@assert_no_logs
def test_http():
    """Check that gzip and deflate encoded HTTP responses are decoded."""
    def gzip_compress(data):
        """Return ``data`` as a complete gzip stream."""
        buffer = io.BytesIO()
        compressor = gzip.GzipFile(fileobj=buffer, mode='wb')
        compressor.write(data)
        compressor.close()
        return buffer.getvalue()

    def raw_deflate(data):
        """Return ``data`` deflated, without zlib header and checksum."""
        return zlib.compress(data)[2:-4]

    def encoded_handler(encoding, encode):
        """Build a handler that serves the encoded document only when the
        request advertises ``encoding`` in its Accept-Encoding header."""
        def handler(env):
            if encoding not in env.get('HTTP_ACCEPT_ENCODING', ''):
                return b'<html test=accept-encoding-header-fail>', []
            return encode(b'<html test=ok>'), [('Content-Encoding', encoding)]
        return handler

    handlers = {
        '/gzip': encoded_handler('gzip', gzip_compress),
        '/deflate': encoded_handler('deflate', zlib.compress),
        '/raw-deflate': encoded_handler('deflate', raw_deflate),
    }
    with http_server(handlers) as root_url:
        for path in ('/gzip', '/deflate', '/raw-deflate'):
            assert HTML(root_url + path).root_element.get('test') == 'ok'

View File

@ -17,6 +17,8 @@ import os.path
import logging
import contextlib
import functools
import wsgiref.simple_server
import threading
from .. import HTML, CSS
from ..logger import LOGGER
@ -97,3 +99,31 @@ def almost_equal(a, b):
if isinstance(a, float) or isinstance(b, float):
return round(abs(a - b), 6) == 0
return a == b
@contextlib.contextmanager
def http_server(handlers):
    """Run a throw-away HTTP server on 127.0.0.1 for the ``with`` body.

    :param handlers:
        A dict mapping a request path (``PATH_INFO``) to a callable. The
        callable receives the WSGI environ and returns a
        ``(response_body, headers)`` tuple where ``headers`` is a list of
        ``(name, value)`` pairs. Paths without a handler get an empty
        404 response.
    :returns:
        A context manager yielding the root URL of the server,
        without a trailing slash.

    """
    def wsgi_app(environ, start_response):
        handler = handlers.get(environ['PATH_INFO'])
        if handler:
            status = str('200 OK')
            response, headers = handler(environ)
            # Coerce header names and values with str() before handing
            # them to the WSGI server.
            headers = [(str(name), str(value)) for name, value in headers]
        else:
            status = str('404 Not Found')
            response = b''
            headers = []
        start_response(status, headers)
        return [response]

    # Port 0: let the OS pick an available port number
    # http://stackoverflow.com/a/1365284/1162888
    server = wsgiref.simple_server.make_server('127.0.0.1', 0, wsgi_app)
    try:
        _host, port = server.socket.getsockname()
        thread = threading.Thread(target=server.serve_forever)
        thread.start()
        try:
            yield 'http://127.0.0.1:%s' % port
        finally:
            server.shutdown()
            thread.join()
    finally:
        # shutdown() only stops the serve_forever() loop; the listening
        # socket has to be released explicitly.
        server.server_close()

View File

@ -12,18 +12,22 @@
from __future__ import division, unicode_literals
import io
import re
import sys
import codecs
import os.path
import mimetypes
import contextlib
import gzip
import zlib
from . import VERSION_STRING
from .logger import LOGGER
from .compat import (
urljoin, urlsplit, quote, unquote, unquote_to_bytes, urlopen_contenttype,
Request, parse_email, pathname2url, unicode, base64_decode)
Request, parse_email, pathname2url, unicode, base64_decode,
StreamingGzipFile)
# Unlike HTML, CSS and PNG, the SVG MIME type is not always builtin
@ -227,6 +231,11 @@ def open_data_url(url):
redirected_url=url)
# Headers sent with the HTTP requests made by default_url_fetcher.
# Accept-Encoding lists the content encodings that the fetcher decodes.
HTTP_HEADERS = {
    'User-Agent': VERSION_STRING,
    'Accept-Encoding': 'gzip, deflate',
}
def default_url_fetcher(url):
"""Fetch an external resource such as an image or stylesheet.
@ -259,10 +268,28 @@ def default_url_fetcher(url):
return open_data_url(url)
elif UNICODE_SCHEME_RE.match(url):
url = iri_to_uri(url)
result, mime_type, charset = urlopen_contenttype(Request(
url, headers={'User-Agent': VERSION_STRING}))
return dict(file_obj=result, redirected_url=result.geturl(),
mime_type=mime_type, encoding=charset)
response, mime_type, charset = urlopen_contenttype(Request(
url, headers=HTTP_HEADERS))
result = dict(redirected_url=response.geturl(),
mime_type=mime_type, encoding=charset)
content_encoding = response.info().get('Content-Encoding')
if content_encoding == 'gzip':
if StreamingGzipFile is None:
result['string'] = gzip.GzipFile(
fileobj=io.BytesIO(response.read())).read()
response.close()
else:
result['file_obj'] = StreamingGzipFile(fileobj=response)
elif content_encoding == 'deflate':
data = response.read()
try:
result['string'] = zlib.decompress(data)
except zlib.error:
# Try without zlib header or checksum
result['string'] = zlib.decompress(data, -15)
else:
result['file_obj'] = response
return result
else:
raise ValueError('Not an absolute URI: %r' % url)