barrier/lib/io/CBufferedInputStream.cpp

119 lines
1.9 KiB
C++
Raw Normal View History

2001-10-06 18:13:28 +04:00
#include "CBufferedInputStream.h"
#include "CLock.h"
#include "CMutex.h"
#include "CThread.h"
#include "CStopwatch.h"
2001-10-06 18:13:28 +04:00
#include "IJob.h"
#include "XIO.h"
#include <cstring>
2001-10-06 18:13:28 +04:00
//
// CBufferedInputStream
//
CBufferedInputStream::CBufferedInputStream(
CMutex* mutex, IJob* adoptedCloseCB) :
m_mutex(mutex),
m_empty(mutex, true),
m_closeCB(adoptedCloseCB),
m_closed(false),
m_hungup(false)
2001-10-06 18:13:28 +04:00
{
assert(m_mutex != NULL);
}
CBufferedInputStream::~CBufferedInputStream()
{
delete m_closeCB;
}
void
CBufferedInputStream::write(const void* buffer, UInt32 n)
2001-10-06 18:13:28 +04:00
{
if (!m_hungup && n > 0) {
m_buffer.write(buffer, n);
2001-10-06 18:13:28 +04:00
m_empty = (m_buffer.getSize() == 0);
m_empty.broadcast();
}
}
void
CBufferedInputStream::hangup()
2001-10-06 18:13:28 +04:00
{
m_hungup = true;
m_empty.broadcast();
}
UInt32
CBufferedInputStream::readNoLock(void* buffer, UInt32 n, double timeout)
2001-10-06 18:13:28 +04:00
{
if (m_closed) {
throw XIOClosed();
}
// wait for data, hangup, or timeout
CStopwatch timer(true);
2001-10-06 18:13:28 +04:00
while (!m_hungup && m_empty == true) {
if (!m_empty.wait(timer, timeout)) {
// timed out
return (UInt32)-1;
}
2001-10-06 18:13:28 +04:00
}
// read data
const UInt32 count = m_buffer.getSize();
if (n > count) {
n = count;
}
if (n > 0) {
if (buffer != NULL) {
memcpy(buffer, m_buffer.peek(n), n);
2001-10-06 18:13:28 +04:00
}
m_buffer.pop(n);
}
// update empty state
if (m_buffer.getSize() == 0) {
m_empty = true;
m_empty.broadcast();
}
return n;
}
UInt32
CBufferedInputStream::getSizeNoLock() const
2001-10-06 18:13:28 +04:00
{
return m_buffer.getSize();
}
void
CBufferedInputStream::close()
2001-10-06 18:13:28 +04:00
{
CLock lock(m_mutex);
if (m_closed) {
throw XIOClosed();
}
m_closed = true;
2001-10-21 04:21:02 +04:00
m_hungup = true;
m_buffer.pop(m_buffer.getSize());
m_empty.broadcast();
if (m_closeCB != NULL) {
2001-10-06 18:13:28 +04:00
m_closeCB->run();
}
}
UInt32
CBufferedInputStream::read(void* buffer, UInt32 n, double timeout)
2001-10-06 18:13:28 +04:00
{
CLock lock(m_mutex);
return readNoLock(buffer, n, timeout);
2001-10-06 18:13:28 +04:00
}
UInt32
CBufferedInputStream::getSize() const
2001-10-06 18:13:28 +04:00
{
CLock lock(m_mutex);
return getSizeNoLock();
}