barrier/lib/synergy/CPacketStreamFilter.cpp
crs 48908242d2 Checkpoint. Conversion to event driven system complete for Unix.
Still need to convert win32 platform specific files.
2004-02-15 17:32:11 +00:00

224 lines
4.6 KiB
C++

/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2004 Chris Schoeneman
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#include "CPacketStreamFilter.h"
#include "IEventQueue.h"
#include "CLock.h"
#include "TMethodEventJob.h"
//
// CPacketStreamFilter
//
CPacketStreamFilter::CPacketStreamFilter(IStream* stream, bool adoptStream) :
CStreamFilter(stream, adoptStream),
m_size(0),
m_eventFilter(NULL),
m_inputShutdown(false)
{
// install event filter
getStream()->setEventFilter(new TMethodEventJob<CPacketStreamFilter>(
this, &CPacketStreamFilter::filterEvent, NULL));
}
CPacketStreamFilter::~CPacketStreamFilter()
{
IEventJob* job = getStream()->getEventFilter();
getStream()->setEventFilter(NULL);
delete job;
}
void
CPacketStreamFilter::close()
{
CLock lock(&m_mutex);
m_size = 0;
m_buffer.pop(m_buffer.getSize());
CStreamFilter::close();
}
UInt32
CPacketStreamFilter::read(void* buffer, UInt32 n)
{
if (n == 0) {
return 0;
}
CLock lock(&m_mutex);
// if not enough data yet then give up
if (!isReadyNoLock()) {
return 0;
}
// read no more than what's left in the buffered packet
if (n > m_size) {
n = m_size;
}
// read it
if (buffer != NULL) {
memcpy(buffer, m_buffer.peek(n), n);
}
m_buffer.pop(n);
m_size -= n;
// get next packet's size if we've finished with this packet and
// there's enough data to do so.
readPacketSize();
if (m_inputShutdown && m_size == 0) {
sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL));
}
return n;
}
void
CPacketStreamFilter::write(const void* buffer, UInt32 count)
{
// write the length of the payload
UInt8 length[4];
length[0] = (UInt8)((count >> 24) & 0xff);
length[1] = (UInt8)((count >> 16) & 0xff);
length[2] = (UInt8)((count >> 8) & 0xff);
length[3] = (UInt8)( count & 0xff);
getStream()->write(length, sizeof(length));
// write the payload
getStream()->write(buffer, count);
}
void
CPacketStreamFilter::shutdownInput()
{
CLock lock(&m_mutex);
m_size = 0;
m_buffer.pop(m_buffer.getSize());
CStreamFilter::shutdownInput();
}
void
CPacketStreamFilter::setEventFilter(IEventJob* filter)
{
CLock lock(&m_mutex);
m_eventFilter = filter;
}
bool
CPacketStreamFilter::isReady() const
{
CLock lock(&m_mutex);
return isReadyNoLock();
}
UInt32
CPacketStreamFilter::getSize() const
{
CLock lock(&m_mutex);
return isReadyNoLock() ? m_size : 0;
}
IEventJob*
CPacketStreamFilter::getEventFilter() const
{
CLock lock(&m_mutex);
return m_eventFilter;
}
bool
CPacketStreamFilter::isReadyNoLock() const
{
return (m_size != 0 && m_buffer.getSize() >= m_size);
}
void
CPacketStreamFilter::readPacketSize()
{
// note -- m_mutex must be locked on entry
if (m_size == 0 && m_buffer.getSize() >= 4) {
UInt8 buffer[4];
memcpy(buffer, m_buffer.peek(sizeof(buffer)), sizeof(buffer));
m_buffer.pop(sizeof(buffer));
m_size = ((UInt32)buffer[0] << 24) |
((UInt32)buffer[1] << 16) |
((UInt32)buffer[2] << 8) |
(UInt32)buffer[3];
}
}
void
CPacketStreamFilter::readMore()
{
// note -- m_mutex must be locked on entry
// note if we have whole packet
bool wasReady = isReadyNoLock();
// read more data
char buffer[4096];
UInt32 n = getStream()->read(buffer, sizeof(buffer));
while (n > 0) {
m_buffer.write(buffer, n);
n = getStream()->read(buffer, sizeof(buffer));
}
// if we don't yet have the next packet size then get it,
// if possible.
readPacketSize();
// note if we now have a whole packet
bool isReady = isReadyNoLock();
// if we weren't ready before but now we are then send a
// input ready event apparently from the filtered stream.
if (wasReady != isReady) {
sendEvent(CEvent(getInputReadyEvent(), getEventTarget(), NULL));
}
}
void
CPacketStreamFilter::sendEvent(const CEvent& event)
{
if (m_eventFilter != NULL) {
m_eventFilter->run(event);
}
else {
EVENTQUEUE->addEvent(event);
}
}
void
CPacketStreamFilter::filterEvent(const CEvent& event, void*)
{
CLock lock(&m_mutex);
if (event.getType() == getInputReadyEvent()) {
readMore();
return;
}
else if (event.getType() == getInputShutdownEvent()) {
// discard this if we have buffered data
m_inputShutdown = true;
if (m_size == 0) {
sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL));
}
return;
}
// pass event
sendEvent(event);
}