mirror of
https://github.com/debauchee/barrier.git
synced 2024-11-24 06:15:57 +03:00
ed8ed72f26
init() it while it's already running. fixed an uninitialized pointer bug in CServer and some cleanup-on-error code in CMSWindowsPrimaryScreen. also added timeout to read() on IInputStream and a heartbeat sent by clients so the server can disconnect clients that are dead but never reset the TCP connection. previously the server would keep these dead clients around forever and if the user was locked on the client screen for some reason then the server would have to be rebooted (or the server would have to be killed via a remote login).
168 lines
3.0 KiB
C++
168 lines
3.0 KiB
C++
#include "CInputPacketStream.h"
|
|
#include "CLock.h"
|
|
#include "CStopwatch.h"
|
|
|
|
//
|
|
// CInputPacketStream
|
|
//
|
|
|
|
CInputPacketStream::CInputPacketStream(IInputStream* stream, bool adopt) :
|
|
CInputStreamFilter(stream, adopt),
|
|
m_mutex(),
|
|
m_size(0),
|
|
m_buffer(&m_mutex, NULL)
|
|
{
|
|
// do nothing
|
|
}
|
|
|
|
CInputPacketStream::~CInputPacketStream()
|
|
{
|
|
// do nothing
|
|
}
|
|
|
|
void
|
|
CInputPacketStream::close()
|
|
{
|
|
getStream()->close();
|
|
}
|
|
|
|
UInt32
|
|
CInputPacketStream::read(void* buffer, UInt32 n, double timeout)
|
|
{
|
|
CLock lock(&m_mutex);
|
|
|
|
// wait for entire message to be read. return if stream
|
|
// hungup or timeout.
|
|
switch (waitForFullMessage(timeout)) {
|
|
case kData:
|
|
break;
|
|
|
|
case kHungup:
|
|
return 0;
|
|
|
|
case kTimedout:
|
|
return (UInt32)-1;
|
|
}
|
|
|
|
// limit number of bytes to read to the number of bytes left in the
|
|
// current message.
|
|
if (n > m_size) {
|
|
n = m_size;
|
|
}
|
|
|
|
// now read from our buffer
|
|
n = m_buffer.readNoLock(buffer, n, -1.0);
|
|
assert(n <= m_size);
|
|
m_size -= n;
|
|
|
|
return n;
|
|
}
|
|
|
|
UInt32
|
|
CInputPacketStream::getSize() const
|
|
{
|
|
CLock lock(&m_mutex);
|
|
return getSizeNoLock();
|
|
}
|
|
|
|
UInt32
|
|
CInputPacketStream::getSizeNoLock() const
|
|
{
|
|
CStopwatch timer(true);
|
|
while (!hasFullMessage() && getStream()->getSize() > 0) {
|
|
// read more data
|
|
if (getMoreMessage(-1.0) != kData) {
|
|
// stream hungup
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
return m_size;
|
|
}
|
|
|
|
CInputPacketStream::EResult
|
|
CInputPacketStream::waitForFullMessage(double timeout) const
|
|
{
|
|
CStopwatch timer(true);
|
|
while (!hasFullMessage()) {
|
|
// compute remaining timeout
|
|
double t = timeout - timer.getTime();
|
|
if (timeout >= 0.0 && t <= 0.0) {
|
|
// timeout
|
|
return kTimedout;
|
|
}
|
|
|
|
// read more data
|
|
switch (getMoreMessage(t)) {
|
|
case kData:
|
|
break;
|
|
|
|
case kHungup:
|
|
// stream hungup
|
|
return kHungup;
|
|
|
|
case kTimedout:
|
|
// stream timed out
|
|
return kTimedout;
|
|
}
|
|
}
|
|
|
|
return kData;
|
|
}
|
|
|
|
CInputPacketStream::EResult
|
|
CInputPacketStream::getMoreMessage(double timeout) const
|
|
{
|
|
// read more data
|
|
char buffer[4096];
|
|
UInt32 n = getStream()->read(buffer, sizeof(buffer), timeout);
|
|
|
|
// return if stream timed out
|
|
if (n == (UInt32)-1) {
|
|
return kTimedout;
|
|
}
|
|
|
|
// return if stream hungup
|
|
if (n == 0) {
|
|
m_buffer.hangup();
|
|
return kHungup;
|
|
}
|
|
|
|
// append to our buffer
|
|
m_buffer.write(buffer, n);
|
|
|
|
return kData;
|
|
}
|
|
|
|
bool
|
|
CInputPacketStream::hasFullMessage() const
|
|
{
|
|
// get payload length if we don't have it yet
|
|
if (m_size == 0) {
|
|
// check if length field has been read yet
|
|
if (m_buffer.getSizeNoLock() < 4) {
|
|
// not enough data for payload length
|
|
return false;
|
|
}
|
|
|
|
// save payload length
|
|
UInt8 buffer[4];
|
|
UInt32 n = m_buffer.readNoLock(buffer, sizeof(buffer), -1.0);
|
|
assert(n == 4);
|
|
m_size = ((UInt32)buffer[0] << 24) |
|
|
((UInt32)buffer[1] << 16) |
|
|
((UInt32)buffer[2] << 8) |
|
|
(UInt32)buffer[3];
|
|
|
|
// if payload length is zero then discard null message
|
|
if (m_size == 0) {
|
|
return false;
|
|
}
|
|
}
|
|
assert(m_size > 0);
|
|
|
|
// we have the full message when we have at least m_size bytes in
|
|
// the buffer
|
|
return (m_buffer.getSizeNoLock() >= m_size);
|
|
}
|