From 61b489ab3de4c7befe841c91f34c415d40262e71 Mon Sep 17 00:00:00 2001 From: "Jerry (Xinyu Hou)" Date: Wed, 24 Aug 2016 17:09:42 +0100 Subject: [PATCH] Refactor write and read into functions --- src/lib/net/TCPSocket.cpp | 339 ++++++++++++++++++++------------------ src/lib/net/TCPSocket.h | 8 + 2 files changed, 185 insertions(+), 162 deletions(-) diff --git a/src/lib/net/TCPSocket.cpp b/src/lib/net/TCPSocket.cpp index e0ec92a7..cf5a81d0 100644 --- a/src/lib/net/TCPSocket.cpp +++ b/src/lib/net/TCPSocket.cpp @@ -323,6 +323,179 @@ TCPSocket::init() } } +TCPSocket::EJobResult +TCPSocket::doRead() +{ + try { + static UInt8 buffer[4096]; + memset(buffer, 0, sizeof(buffer)); + int bytesRead = 0; + int status = 0; + + if (isSecure()) { + if (isSecureReady()) { + status = secureRead(buffer, sizeof(buffer), bytesRead); + if (status < 0) { + return kBreak; + } + else if (status == 0) { + return kNew; + } + } + else { + return kRetry; + } + } + else { + bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer)); + } + + if (bytesRead > 0) { + bool wasEmpty = (m_inputBuffer.getSize() == 0); + + // slurp up as much as possible + do { + m_inputBuffer.write(buffer, bytesRead); + + if (isSecure() && isSecureReady()) { + status = secureRead(buffer, sizeof(buffer), bytesRead); + if (status < 0) { + return kBreak; + } + } + else { + bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer)); + } + + } while (bytesRead > 0 || status > 0); + + // send input ready if input buffer was empty + if (wasEmpty) { + sendEvent(m_events->forIStream().inputReady()); + } + } + else { + // remote write end of stream hungup. our input side + // has therefore shutdown but don't flush our buffer + // since there's still data to be read. + sendEvent(m_events->forIStream().inputShutdown()); + if (!m_writable && m_inputBuffer.getSize() == 0) { + sendEvent(m_events->forISocket().disconnected()); + m_connected = false; + } + m_readable = false; + return kNew; + } + } + catch (XArchNetworkDisconnected&) { + // stream hungup + sendEvent(m_events->forISocket().disconnected()); + onDisconnected(); + return kNew; + } + catch (XArchNetwork& e) { + // ignore other read error + LOG((CLOG_WARN "error reading socket: %s", e.what())); + } + + return kRetry; +} + +TCPSocket::EJobResult +TCPSocket::doWrite() +{ + static bool s_retry = false; + static int s_retrySize = 0; + static void* s_staticBuffer = NULL; + + try { + // write data + int bufferSize = 0; + int bytesWrote = 0; + int status = 0; + + if (s_retry) { + bufferSize = s_retrySize; + } + else { + bufferSize = m_outputBuffer.getSize(); + s_staticBuffer = malloc(bufferSize); + memcpy(s_staticBuffer, m_outputBuffer.peek(bufferSize), bufferSize); + } + + if (bufferSize == 0) { + return kRetry; + } + + if (isSecure()) { + if (isSecureReady()) { + status = secureWrite(s_staticBuffer, bufferSize, bytesWrote); + if (status > 0) { + s_retry = false; + bufferSize = 0; + free(s_staticBuffer); + s_staticBuffer = NULL; + } + else if (status < 0) { + return kBreak; + } + else if (status == 0) { + s_retry = true; + s_retrySize = bufferSize; + return kNew; + } + } + else { + return kRetry; + } + } + else { + bytesWrote = (UInt32)ARCH->writeSocket(m_socket, s_staticBuffer, bufferSize); + bufferSize = 0; + free(s_staticBuffer); + s_staticBuffer = NULL; + } + + // discard written data + if (bytesWrote > 0) { + m_outputBuffer.pop(bytesWrote); + if (m_outputBuffer.getSize() == 0) { + sendEvent(m_events->forIStream().outputFlushed()); + m_flushed = true; + m_flushed.broadcast(); + return kNew; + } + } + } + catch (XArchNetworkShutdown&) { + // remote read end of stream hungup. our output side + // has therefore shutdown. + onOutputShutdown(); + sendEvent(m_events->forIStream().outputShutdown()); + if (!m_readable && m_inputBuffer.getSize() == 0) { + sendEvent(m_events->forISocket().disconnected()); + m_connected = false; + } + return kNew; + } + catch (XArchNetworkDisconnected&) { + // stream hungup + onDisconnected(); + sendEvent(m_events->forISocket().disconnected()); + return kNew; + } + catch (XArchNetwork& e) { + // other write error + LOG((CLOG_WARN "error writing socket: %s", e.what())); + onDisconnected(); + sendEvent(m_events->forIStream().outputError()); + sendEvent(m_events->forISocket().disconnected()); + return kNew; + } + + return kRetry; +} + void TCPSocket::setJob(ISocketMultiplexerJob* job) { @@ -468,172 +641,14 @@ TCPSocket::serviceConnected(ISocketMultiplexerJob* job, return newJob(); } - bool needNewJob = false; - - static bool s_retry = false; - static int s_retrySize = 0; - static void* s_staticBuffer = NULL; - + EJobResult result = kRetry; if (write) { - try { - // write data - int bufferSize = 0; - int bytesWrote = 0; - int status = 0; - - if (s_retry) { - bufferSize = s_retrySize; - } - else { - bufferSize = m_outputBuffer.getSize(); - s_staticBuffer = malloc(bufferSize); - memcpy(s_staticBuffer, m_outputBuffer.peek(bufferSize), bufferSize); - } - - if (bufferSize == 0) { - return job; - } - - if (isSecure()) { - if (isSecureReady()) { - status = secureWrite(s_staticBuffer, bufferSize, bytesWrote); - if (status > 0) { - s_retry = false; - bufferSize = 0; - free(s_staticBuffer); - s_staticBuffer = NULL; - } - else if (status < 0) { - return NULL; - } - else if (status == 0) { - s_retry = true; - s_retrySize = bufferSize; - return newJob(); - } - } - else { - return job; - } - } - else { - bytesWrote = (UInt32)ARCH->writeSocket(m_socket, s_staticBuffer, bufferSize); - bufferSize = 0; - free(s_staticBuffer); - s_staticBuffer = NULL; - } - - // discard written data - if (bytesWrote > 0) { - m_outputBuffer.pop(bytesWrote); - if (m_outputBuffer.getSize() == 0) { - sendEvent(m_events->forIStream().outputFlushed()); - m_flushed = true; - m_flushed.broadcast(); - needNewJob = true; - } - } - } - catch (XArchNetworkShutdown&) { - // remote read end of stream hungup. our output side - // has therefore shutdown. - onOutputShutdown(); - sendEvent(m_events->forIStream().outputShutdown()); - if (!m_readable && m_inputBuffer.getSize() == 0) { - sendEvent(m_events->forISocket().disconnected()); - m_connected = false; - } - needNewJob = true; - } - catch (XArchNetworkDisconnected&) { - // stream hungup - onDisconnected(); - sendEvent(m_events->forISocket().disconnected()); - needNewJob = true; - } - catch (XArchNetwork& e) { - // other write error - LOG((CLOG_WARN "error writing socket: %s", e.what())); - onDisconnected(); - sendEvent(m_events->forIStream().outputError()); - sendEvent(m_events->forISocket().disconnected()); - needNewJob = true; - } + result = doWrite(); } if (read && m_readable) { - try { - static UInt8 buffer[4096]; - memset(buffer, 0, sizeof(buffer)); - int bytesRead = 0; - int status = 0; - - if (isSecure()) { - if (isSecureReady()) { - status = secureRead(buffer, sizeof(buffer), bytesRead); - if (status < 0) { - return NULL; - } - else if (status == 0) { - return newJob(); - } - } - else { - return job; - } - } - else { - bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer)); - } - - if (bytesRead > 0) { - bool wasEmpty = (m_inputBuffer.getSize() == 0); - - // slurp up as much as possible - do { - m_inputBuffer.write(buffer, bytesRead); - - if (isSecure() && isSecureReady()) { - status = secureRead(buffer, sizeof(buffer), bytesRead); - if (status < 0) { - return NULL; - } - } - else { - bytesRead = (int) ARCH->readSocket(m_socket, buffer, sizeof(buffer)); - } - - } while (bytesRead > 0 || status > 0); - - // send input ready if input buffer was empty - if (wasEmpty) { - sendEvent(m_events->forIStream().inputReady()); - } - } - else { - // remote write end of stream hungup. our input side - // has therefore shutdown but don't flush our buffer - // since there's still data to be read. - sendEvent(m_events->forIStream().inputShutdown()); - if (!m_writable && m_inputBuffer.getSize() == 0) { - sendEvent(m_events->forISocket().disconnected()); - m_connected = false; - } - m_readable = false; - needNewJob = true; - } - } - catch (XArchNetworkDisconnected&) { - // stream hungup - sendEvent(m_events->forISocket().disconnected()); - onDisconnected(); - needNewJob = true; - } - catch (XArchNetwork& e) { - // ignore other read error - LOG((CLOG_WARN "error reading socket: %s", e.what())); - } + result = doRead(); } - return needNewJob ? newJob() : job; + return result == kBreak ? NULL : result == kNew ? newJob() : job; } diff --git a/src/lib/net/TCPSocket.h b/src/lib/net/TCPSocket.h index a84dace0..e4565385 100644 --- a/src/lib/net/TCPSocket.h +++ b/src/lib/net/TCPSocket.h @@ -66,12 +66,20 @@ public: virtual void setFingerprintFilename(String& f) {} protected: + enum EJobResult { + kBreak = -1, //!< Break the Job chain + kRetry, //!< Retry the same job + kNew //!< Require a new job + }; + ArchSocket getSocket() { return m_socket; } IEventQueue* getEvents() { return m_events; } virtual bool isSecureReady() { return false; } virtual bool isSecure() { return false; } virtual int secureRead(void* buffer, int, int& ) { return 0; } virtual int secureWrite(const void*, int, int& ) { return 0; } + virtual EJobResult doRead(); + virtual EJobResult doWrite(); void setJob(ISocketMultiplexerJob*);