mirror of
https://github.com/Kozea/WeasyPrint.git
synced 2024-10-05 00:21:15 +03:00
New PDF: add support for (over)writing objects.
This commit is contained in:
parent
5da826520b
commit
6e7ef6b62a
@ -34,12 +34,15 @@ import os
|
||||
import re
|
||||
import itertools
|
||||
|
||||
from .compat import xrange
|
||||
from . import VERSION_STRING
|
||||
from .compat import xrange, iteritems
|
||||
|
||||
|
||||
TRAILER_RE = re.compile(b'\ntrailer\n(.+)\nstartxref\n(\d+)\n%%EOF\n$',
|
||||
re.DOTALL)
|
||||
DICT_TYPE_RE = re.compile(b'/Type /(\w+)') # + default to greedy
|
||||
|
||||
# No end delimiter, + defaults to greedy
|
||||
DICT_TYPE_RE = re.compile(b'/Type /(\w+)')
|
||||
|
||||
|
||||
class PDFDictionary(object):
|
||||
@ -47,20 +50,6 @@ class PDFDictionary(object):
|
||||
self.object_number = object_number
|
||||
self.byte_string = byte_string
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, pdf_file, object_number):
|
||||
fileobj = pdf_file.fileobj
|
||||
fileobj.seek(pdf_file.object_offsets[object_number])
|
||||
line = next(fileobj)
|
||||
assert line.endswith(b' 0 obj\n')
|
||||
assert int(line[:-7]) == object_number # len(b' 0 obj\n') == 7
|
||||
object_lines = []
|
||||
for line in fileobj:
|
||||
object_lines.append(line)
|
||||
if line == b'>>\n':
|
||||
assert next(fileobj) == b'endobj\n'
|
||||
return cls(object_number, b''.join(object_lines))
|
||||
|
||||
def __repr__(self):
|
||||
return self.__class__.__name__ + repr(
|
||||
(self.object_number, self.byte_string))
|
||||
@ -68,9 +57,12 @@ class PDFDictionary(object):
|
||||
def get_type(self):
|
||||
return DICT_TYPE_RE.search(self.byte_string).group(1).decode('ascii')
|
||||
|
||||
# __cache is a shared mutable, not an actual parameter.
|
||||
def get_indirect_object_number(self, key, __cache={}):
|
||||
"""
|
||||
"""Read the value for `key`, assuming it is an indirect object.
|
||||
|
||||
:return: (int) the object number
|
||||
|
||||
"""
|
||||
regex = __cache.get(key)
|
||||
if not regex:
|
||||
@ -78,16 +70,12 @@ class PDFDictionary(object):
|
||||
__cache[key] = regex
|
||||
return int(regex.search(self.byte_string).group(1))
|
||||
|
||||
def get_indirect_dict(self, key, pdf_file):
|
||||
"""
|
||||
:return: a new PDFDictionary instance
|
||||
"""
|
||||
return self.from_file(
|
||||
pdf_file, self.get_indirect_object_number(key))
|
||||
|
||||
# __cache is a shared mutable, not an actual parameter.
|
||||
def get_array(self, key, __cache={}):
|
||||
"""
|
||||
:return: (bytes) the unparsed array content
|
||||
"""Read the value for `key`, assuming it is an array.
|
||||
|
||||
:return: (bytes) the unparsed array content.
|
||||
|
||||
"""
|
||||
regex = __cache.get(key)
|
||||
if not regex:
|
||||
@ -95,28 +83,49 @@ class PDFDictionary(object):
|
||||
__cache[key] = regex
|
||||
return regex.search(self.byte_string).group(1)
|
||||
|
||||
def get_indirect_dict(self, key, pdf_file):
    """Resolve the indirect reference stored under `key`.

    The value for `key` is assumed to reference an indirect dictionary
    object. Its number is extracted from this dictionary and its content
    is then read through `pdf_file.read_object`.

    :return: a new instance of the same class as `self`, built from the
        referenced object number and its content.

    """
    number = self.get_indirect_object_number(key)
    content = pdf_file.read_object(number)
    return type(self)(number, content)
|
||||
|
||||
def get_indirect_dict_array(self, key, pdf_file):
|
||||
"""Read the value for `key` and follow the references, assuming
|
||||
it is an array of indirect dictionary objects.
|
||||
|
||||
:return: a list of new PDFDictionary instance.
|
||||
|
||||
"""
|
||||
:return: a list of new PDFDictionary instance
|
||||
"""
|
||||
parts = self.get_array(key).split(b' 0 R ')
|
||||
parts = self.get_array(key).split(b' 0 R')
|
||||
# The array looks like this: ' <a> 0 R <b> 0 R <c> 0 R '
|
||||
# so `parts` ends up like this [' <a>', ' <b>', ' <c>', ' ']
|
||||
# With the trailing white space in the list.
|
||||
trail = parts.pop()
|
||||
assert not trail.strip()
|
||||
return [self.from_file(pdf_file, int(n)) for n in parts]
|
||||
class_ = type(self)
|
||||
read = pdf_file.read_object
|
||||
return [class_(n, read(n)) for n in map(int, parts)]
|
||||
|
||||
|
||||
class PDFFile(object):
|
||||
"""
|
||||
:param fileobj:
|
||||
A seekable binary file-like object for a PDF generated by cairo.
|
||||
|
||||
"""
|
||||
def __init__(self, fileobj):
|
||||
# cairo’s trailer + startxref + EOF is typically under 100 bytes
|
||||
# cairo’s trailer only has Size, Root and Info.
|
||||
# The trailer + startxref + EOF is typically under 100 bytes
|
||||
fileobj.seek(-200, os.SEEK_END)
|
||||
trailer, startxref = TRAILER_RE.search(fileobj.read()).groups()
|
||||
trailer = PDFDictionary(None, trailer)
|
||||
startxref = int(startxref)
|
||||
|
||||
fileobj.seek(int(startxref))
|
||||
fileobj.seek(startxref)
|
||||
line = next(fileobj)
|
||||
assert line == b'xref\n'
|
||||
|
||||
@ -128,15 +137,15 @@ class PDFFile(object):
|
||||
line = next(fileobj)
|
||||
assert line == b'0000000000 65535 f \n'
|
||||
|
||||
object_offsets = [None]
|
||||
objects_offsets = [None]
|
||||
for object_number in xrange(1, total_objects):
|
||||
line = next(fileobj)
|
||||
assert line[10:] == b' 00000 n \n'
|
||||
object_offsets.append(int(line[:10]))
|
||||
objects_offsets.append(int(line[:10]))
|
||||
|
||||
self.fileobj = fileobj
|
||||
#: Maps object number -> bytes from the start of the file
|
||||
self.object_offsets = object_offsets
|
||||
self.objects_offsets = objects_offsets
|
||||
|
||||
info = trailer.get_indirect_dict('Info', self)
|
||||
catalog = trailer.get_indirect_dict('Root', self)
|
||||
@ -146,25 +155,148 @@ class PDFFile(object):
|
||||
assert all(p.get_type() == 'Page' for p in pages)
|
||||
|
||||
self.startxref = startxref
|
||||
self.trailer = trailer
|
||||
self.info = info
|
||||
self.catalog = catalog
|
||||
self.page_tree = page_tree
|
||||
self.pages = pages
|
||||
|
||||
self.finished = False
|
||||
self.overwritten_objects_offsets = {}
|
||||
self.new_objects_offsets = []
|
||||
|
||||
def read_object(self, object_number):
    """Read the content of an object from the original part of the file.

    The object is located through ``self.objects_offsets``. It is expected
    to start with a ``<N> 0 obj`` line and to end with a line holding only
    ``>>`` that is immediately followed by an ``endobj`` line.

    :param object_number:
        An integer N so that 1 <= N < len(self.objects_offsets)
    :returns:
        The object content as a byte string: every line after the
        ``<N> 0 obj`` header, up to and including the closing ``>>`` line.
    :raises ValueError:
        If the file ends before a closing ``>>`` line is found.

    """
    fileobj = self.fileobj
    fileobj.seek(self.objects_offsets[object_number])

    header_suffix = b' 0 obj\n'
    # The b'' default turns an unexpected end of file into a failed
    # assertion instead of a bare StopIteration.
    header = next(fileobj, b'')
    assert header.endswith(header_suffix)
    assert int(header[:-len(header_suffix)]) == object_number

    object_lines = []
    for line in fileobj:
        object_lines.append(line)
        if line == b'>>\n':
            assert next(fileobj, b'') == b'endobj\n'
            return b''.join(object_lines)

    # Falling off the loop used to return None silently, which only failed
    # later and far away from the actual cause.
    raise ValueError(
        'Unterminated PDF object {0}: no closing ">>" line found'.format(
            object_number))
|
||||
|
||||
def overwrite_object(self, object_number, byte_string):
    """Append replacement content for an already existing object.

    The new content is written through ``self._write_object`` and the
    offset it returns is recorded in ``self.overwritten_objects_offsets``
    under the object number.

    :param object_number:
        An integer N so that 1 <= N < len(self.objects_offsets)
    :param byte_string:
        The new object content as a byte string.

    """
    offset = self._write_object(object_number, byte_string)
    self.overwritten_objects_offsets[object_number] = offset
|
||||
|
||||
def write_new_object(self, byte_string):
    """Append a brand new object and register its offset.

    The object number is the count of original objects (the length of
    ``self.objects_offsets``) plus the count of objects already added
    through this method.

    :param byte_string:
        The object content as a byte string.
    :return:
        The new object number.

    """
    added_offsets = self.new_objects_offsets
    number = len(self.objects_offsets) + len(added_offsets)
    offset = self._write_object(number, byte_string)
    added_offsets.append(offset)
    return number
|
||||
|
||||
def finish(self):
    """Write the cross-reference section and trailer of the update.

    The section covers the overwritten and new objects, and the trailer
    points back to the original cross-reference table through ``/Prev``.
    This makes `fileobj` a valid (updated) PDF file. No further writing
    is allowed afterwards: ``self.finished`` is set.

    """
    new_startxref, write = self._start_writing()
    self.finished = True
    write(b'xref\n')

    # Do not bother finding contiguous numbers: write a one-entry
    # sub-section for each overwritten object. Sorting only makes the
    # output deterministic.
    for object_number, offset in sorted(
            self.overwritten_objects_offsets.items()):
        write('{0} 1\n{1:010} 00000 n \n'.format(
            object_number, offset).encode('ascii'))

    # New objects are numbered contiguously right after the original ones,
    # so a single sub-section describes all of them.
    new_offsets = self.new_objects_offsets
    first_new_object = len(self.objects_offsets)
    if new_offsets:
        write('{0} {1}\n'.format(
            first_new_object, len(new_offsets)).encode('ascii'))
        for offset in new_offsets:
            write('{0:010} 00000 n \n'.format(offset).encode('ascii'))

    # /Size is the total number of objects (original and new), i.e. one
    # more than the highest object number. It used to be derived from a
    # leftover loop variable, which was wrong when only overwritten
    # objects existed and undefined when nothing had been written.
    size = first_new_object + len(new_offsets)
    write(
        'trailer\n<< /Size {0} /Root {1} 0 R /Info {2} 0 R /Prev {3} >>\n'
        'startxref\n{4}\n%%EOF\n'.format(
            size, self.catalog.object_number, self.info.object_number,
            self.startxref, new_startxref).encode('ascii'))
|
||||
|
||||
def _write_object(self, object_number, byte_string):
|
||||
offset, write = self._start_writing()
|
||||
write('{} 0 obj\n'.format(object_number).encode('ascii'))
|
||||
write(byte_string)
|
||||
write(b'\nendobj\n')
|
||||
return offset
|
||||
|
||||
def _start_writing(self):
|
||||
assert not self.finished
|
||||
fileobj = self.fileobj
|
||||
fileobj.seek(0, os.SEEK_END)
|
||||
return fileobj.tell(), fileobj.write
|
||||
|
||||
|
||||
def encode_pdf_string(unicode_string):
    """Encode as UTF-16 BE with a BOM, then backslash-escape the result
    for use inside a PDF literal string.

    Parentheses and backslashes are escaped because they are significant
    in literal strings. Carriage return bytes are escaped too: PDF readers
    treat an unescaped end-of-line marker in a literal string as a single
    line feed, which would corrupt any UTF-16 code unit containing a 0x0D
    byte (U+010D for example).

    :param unicode_string: The text to encode, as a Unicode string.
    :returns: a Unicode string that needs to be wrapped in parentheses and
        encoded to latin1

    """
    byte_string = ('\ufeff' + unicode_string).encode('utf-16-be')
    # Make a round-trip back through Unicode for the .translate() method.
    # (bytes.translate only maps to single bytes.)
    # Use latin1 to map all byte values.
    return byte_string.decode('latin1').translate(
        {40: r'\(', 41: r'\)', 92: r'\\', 13: r'\r'})
|
||||
|
||||
|
||||
def add_pdf_metadata(fileobj):
    """Replace the Info dictionary of the PDF in `fileobj` with one that
    only holds a ``/Producer`` entry built from ``VERSION_STRING``, then
    finish the update and print the resulting file.

    :param fileobj: Passed to ``PDFFile``; must also provide
        ``getvalue()`` because of the final print.

    """
    pdf = PDFFile(fileobj)
    producer = encode_pdf_string(VERSION_STRING)
    info_content = '<< /Producer ({0}) >>'.format(producer).encode('latin1')
    pdf.overwrite_object(pdf.info.object_number, info_content)
    # NOTE(review): b'foo' looks like a scratch object used to exercise
    # write_new_object, not real metadata -- confirm before release.
    pdf.write_new_object(b'foo')
    pdf.finish()
    # NOTE(review): debugging output; getvalue() only exists on in-memory
    # file objects such as io.BytesIO.
    print(pdf.fileobj.getvalue().decode('latin1'))
|
||||
|
||||
|
||||
def test():
|
||||
import cairo
|
||||
import io
|
||||
fileobj = io.BytesIO()
|
||||
surface = cairo.PDFSurface(fileobj, 100, 100)
|
||||
for i in xrange(20):
|
||||
surface.show_page()
|
||||
# for i in xrange(20):
|
||||
# surface.show_page()
|
||||
surface.finish()
|
||||
add_pdf_metadata(fileobj)
|
||||
|
||||
pdf = PDFFile(fileobj)
|
||||
print(pdf.page_tree)
|
||||
print(len(pdf.pages))
|
||||
# pdf = PDFFile(fileobj)
|
||||
# print(pdf.page_tree)
|
||||
# print(len(pdf.pages))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
Reference in New Issue
Block a user