barrier/lib/synergy/CInputPacketStream.cpp
crs fee4095624 Reorganized source tree. Moved client.cpp into cmd/synergy as
synergy.cpp and server.cpp into cmd/synergyd as synergyd.cpp.
Moved and renamed related files.  Moved remaining source files
into lib/....  Modified and added makefiles as appropriate.
Result is that library files are under lib with each library
in its own directory and program files are under cmd with each
command in its own directory.
2002-07-30 16:52:46 +00:00

168 lines
3.0 KiB
C++

#include "CInputPacketStream.h"
#include "CLock.h"
#include "CStopwatch.h"
//
// CInputPacketStream
//
CInputPacketStream::CInputPacketStream(IInputStream* stream, bool adopt) :
CInputStreamFilter(stream, adopt),
m_mutex(),
m_size(0),
m_buffer(&m_mutex, NULL)
{
// do nothing
}
CInputPacketStream::~CInputPacketStream()
{
// do nothing
}
void
CInputPacketStream::close()
{
getStream()->close();
}
UInt32
CInputPacketStream::read(void* buffer, UInt32 n, double timeout)
{
CLock lock(&m_mutex);
// wait for entire message to be read. return if stream
// hungup or timeout.
switch (waitForFullMessage(timeout)) {
case kData:
break;
case kHungup:
return 0;
case kTimedout:
return (UInt32)-1;
}
// limit number of bytes to read to the number of bytes left in the
// current message.
if (n > m_size) {
n = m_size;
}
// now read from our buffer
n = m_buffer.readNoLock(buffer, n, -1.0);
assert(n <= m_size);
m_size -= n;
return n;
}
UInt32
CInputPacketStream::getSize() const
{
CLock lock(&m_mutex);
return getSizeNoLock();
}
UInt32
CInputPacketStream::getSizeNoLock() const
{
CStopwatch timer(true);
while (!hasFullMessage() && getStream()->getSize() > 0) {
// read more data
if (getMoreMessage(-1.0) != kData) {
// stream hungup
return 0;
}
}
return m_size;
}
CInputPacketStream::EResult
CInputPacketStream::waitForFullMessage(double timeout) const
{
CStopwatch timer(true);
while (!hasFullMessage()) {
// compute remaining timeout
double t = timeout - timer.getTime();
if (timeout >= 0.0 && t <= 0.0) {
// timeout
return kTimedout;
}
// read more data
switch (getMoreMessage(t)) {
case kData:
break;
case kHungup:
// stream hungup
return kHungup;
case kTimedout:
// stream timed out
return kTimedout;
}
}
return kData;
}
CInputPacketStream::EResult
CInputPacketStream::getMoreMessage(double timeout) const
{
// read more data
char buffer[4096];
UInt32 n = getStream()->read(buffer, sizeof(buffer), timeout);
// return if stream timed out
if (n == (UInt32)-1) {
return kTimedout;
}
// return if stream hungup
if (n == 0) {
m_buffer.hangup();
return kHungup;
}
// append to our buffer
m_buffer.write(buffer, n);
return kData;
}
bool
CInputPacketStream::hasFullMessage() const
{
// get payload length if we don't have it yet
if (m_size == 0) {
// check if length field has been read yet
if (m_buffer.getSizeNoLock() < 4) {
// not enough data for payload length
return false;
}
// save payload length
UInt8 buffer[4];
UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer), -1.0);
assert(n == 4);
m_size = ((UInt32)buffer[0] << 24) |
((UInt32)buffer[1] << 16) |
((UInt32)buffer[2] << 8) |
(UInt32)buffer[3];
// if payload length is zero then discard null message
if (m_size == 0) {
return false;
}
}
assert(m_size > 0);
// we have the full message when we have at least m_size bytes in
// the buffer
return (m_buffer.getSizeNoLock() >= m_size);
}