bundle2: lazy unbundle of part payload

The `unbundle` part gains a `read` method to retrieve payload content.
This method behaves as a python file-like read method.

The bundle-processing code is updated to make sure a part is fully consumed before
another one is extracted.

Test output changes because the debug output is even more interleaved now.
This commit is contained in:
Pierre-Yves David 2014-04-11 16:05:22 -04:00
parent a04ebd710a
commit acbb6d4494
2 changed files with 53 additions and 31 deletions

View File

@ -288,6 +288,7 @@ def processbundle(repo, unbundler, transactiongetter=_notransaction):
# - exception catching
unbundler.params
iterparts = iter(unbundler)
part = None
try:
for part in iterparts:
parttype = part.type
@ -302,8 +303,8 @@ def processbundle(repo, unbundler, transactiongetter=_notransaction):
# - use a more precise exception
raise
op.ui.debug('ignoring unknown advisory part %r\n' % key)
# todo:
# - consume the part once we use streaming
# consuming the part
part.read()
continue
# handler is called outside the above try block so that we don't
@ -311,9 +312,14 @@ def processbundle(repo, unbundler, transactiongetter=_notransaction):
# parthandlermapping lookup (any KeyError raised by handler()
# itself represents a defect of a different variety).
handler(op, part)
part.read()
except Exception:
if part is not None:
# consume the bundle content
part.read()
for part in iterparts:
pass # consume the bundle content
# consume the bundle content
part.read()
raise
return op
@ -544,19 +550,21 @@ class unbundlepart(unpackermixin):
# unbundle state attr
self._headerdata = header
self._headeroffset = 0
self._initialized = False
self.consumed = False
# part data
self.id = None
self.type = None
self.mandatoryparams = None
self.advisoryparams = None
self.data = None
self._readdata()
self._payloadstream = None
self._readheader()
def _fromheader(self, size):
"""return the next <size> byte from the header"""
offset = self._headeroffset
data = self._headerdata[offset:(offset + size)]
self._headeroffset += size
self._headeroffset = offset + size
return data
def _unpackheader(self, format):
@ -566,10 +574,8 @@ class unbundlepart(unpackermixin):
data = self._fromheader(struct.calcsize(format))
return _unpack(format, data)
def _readdata(self):
def _readheader(self):
"""read the header and setup the object"""
# some utility to help reading from the header block
typesize = self._unpackheader(_fparttypesize)[0]
self.type = self._fromheader(typesize)
self.ui.debug('part type: "%s"\n' % self.type)
@ -597,14 +603,29 @@ class unbundlepart(unpackermixin):
self.mandatoryparams = manparams
self.advisoryparams = advparams
## part payload
payload = []
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
while payloadsize:
payload.append(self._readexact(payloadsize))
def payloadchunks():
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
self.data = ''.join(payload)
while payloadsize:
yield self._readexact(payloadsize)
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
self._payloadstream = util.chunkbuffer(payloadchunks())
# we read the data, tell it
self._initialized = True
def read(self, size=None):
"""read payload data"""
if not self._initialized:
self._readheader()
if size is None:
data = self._payloadstream.read()
else:
data = self._payloadstream.read(size)
if size is None or len(data) < size:
self.consumed = True
return data
@parthandler('changegroup')
def handlechangegroup(op, inpart):
@ -619,7 +640,7 @@ def handlechangegroup(op, inpart):
# we need to make sure we trigger the creation of a transaction object used
# for the whole processing scope.
op.gettransaction()
data = StringIO.StringIO(inpart.data)
data = StringIO.StringIO(inpart.read())
data.seek(0)
cg = changegroup.readbundle(data, 'bundle2part')
ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@ -631,6 +652,7 @@ def handlechangegroup(op, inpart):
[('in-reply-to', str(inpart.id)),
('return', '%i' % ret)])
op.reply.addpart(part)
assert not inpart.read()
@parthandler('reply:changegroup')
def handlechangegroup(op, inpart):

View File

@ -28,7 +28,7 @@ Create an extension to test bundle2 API
> """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
> op.ui.write('The choir starts singing:\n')
> verses = 0
> for line in part.data.split('\n'):
> for line in part.read().split('\n'):
> op.ui.write(' %s\n' % line)
> verses += 1
> op.records.add('song', {'verses': verses})
@ -152,7 +152,7 @@ Create an extension to test bundle2 API
> ui.write(' :%s:\n' % p.type)
> ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
> ui.write(' advisory: %i\n' % len(p.advisoryparams))
> ui.write(' payload: %i bytes\n' % len(p.data))
> ui.write(' payload: %i bytes\n' % len(p.read()))
> ui.write('parts count: %i\n' % count)
> EOF
$ cat >> $HGRCPATH << EOF
@ -378,48 +378,48 @@ Test part
part type: "test:empty"
part id: "0"
part parameters: 0
payload chunk size: 0
:test:empty:
mandatory: 0
advisory: 0
payload chunk size: 0
payload: 0 bytes
part header size: 17
part type: "test:empty"
part id: "1"
part parameters: 0
payload chunk size: 0
:test:empty:
mandatory: 0
advisory: 0
payload chunk size: 0
payload: 0 bytes
part header size: 16
part type: "test:song"
part id: "2"
part parameters: 0
payload chunk size: 178
payload chunk size: 0
:test:song:
mandatory: 0
advisory: 0
payload chunk size: 178
payload chunk size: 0
payload: 178 bytes
part header size: 43
part type: "test:math"
part id: "3"
part parameters: 3
payload chunk size: 2
payload chunk size: 0
:test:math:
mandatory: 2
advisory: 1
payload chunk size: 2
payload chunk size: 0
payload: 2 bytes
part header size: 16
part type: "test:ping"
part id: "4"
part parameters: 0
payload chunk size: 0
:test:ping:
mandatory: 0
advisory: 0
payload chunk size: 0
payload: 0 bytes
part header size: 0
end of bundle2 stream
@ -438,22 +438,22 @@ Process the bundle
part type: "test:empty"
part id: "0"
part parameters: 0
payload chunk size: 0
ignoring unknown advisory part 'test:empty'
payload chunk size: 0
part header size: 17
part type: "test:empty"
part id: "1"
part parameters: 0
payload chunk size: 0
ignoring unknown advisory part 'test:empty'
payload chunk size: 0
part header size: 16
part type: "test:song"
part id: "2"
part parameters: 0
payload chunk size: 178
payload chunk size: 0
found a handler for part 'test:song'
The choir starts singing:
payload chunk size: 178
payload chunk size: 0
Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
@ -461,16 +461,16 @@ Process the bundle
part type: "test:math"
part id: "3"
part parameters: 3
ignoring unknown advisory part 'test:math'
payload chunk size: 2
payload chunk size: 0
ignoring unknown advisory part 'test:math'
part header size: 16
part type: "test:ping"
part id: "4"
part parameters: 0
payload chunk size: 0
found a handler for part 'test:ping'
received ping request (id 4)
payload chunk size: 0
part header size: 0
end of bundle2 stream
0 unread bytes