diff options
author | Andre Heinecke <[email protected]> | 2016-05-10 14:43:36 +0000 |
---|---|---|
committer | Andre Heinecke <[email protected]> | 2016-05-10 14:43:36 +0000 |
commit | 34e5c5b33de3608bedc2826be27a06e6eec0ea44 (patch) | |
tree | b04ce687c5dcb72d84af230e05335da0213bf6de /src/kdpipeiodevice.cpp | |
parent | Allow cc to detect missing cases in a switch. (diff) | |
parent | Qt / Cpp: Fix make dist (diff) | |
download | gpgme-34e5c5b33de3608bedc2826be27a06e6eec0ea44.tar.gz gpgme-34e5c5b33de3608bedc2826be27a06e6eec0ea44.zip |
Merge branch 'gpgmepp'
Diffstat (limited to 'src/kdpipeiodevice.cpp')
-rw-r--r-- | src/kdpipeiodevice.cpp | 951 |
1 files changed, 0 insertions, 951 deletions
diff --git a/src/kdpipeiodevice.cpp b/src/kdpipeiodevice.cpp deleted file mode 100644 index 5661790a..00000000 --- a/src/kdpipeiodevice.cpp +++ /dev/null @@ -1,951 +0,0 @@ -/* - Copyright (C) 2007 Klar�lvdalens Datakonsult AB - - KDPipeIODevice is free software; you can redistribute it and/or - modify it under the terms of the GNU Library General Public - License as published by the Free Software Foundation; either - version 2 of the License, or (at your option) any later version. - - KDPipeIODevice is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Library General Public License for more details. - - You should have received a copy of the GNU Library General Public License - along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. -*/ - -#include "kdpipeiodevice.h" - -#include <QtCore> - -#include <cassert> -#include <memory> -#include <algorithm> - -#ifdef Q_OS_WIN32 -# ifndef NOMINMAX -# define NOMINMAX -# endif -# include <windows.h> -# include <io.h> -#else -# include <unistd.h> -# include <errno.h> -#endif - -using namespace _gpgme_; - -#ifndef KDAB_CHECK_THIS -# define KDAB_CHECK_CTOR (void)1 -# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR -# define KDAB_CHECK_THIS KDAB_CHECK_CTOR -#endif - -#define LOCKED( d ) const QMutexLocker locker( &d->mutex ) -#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i ) - -const unsigned int BUFFER_SIZE = 4096; -const bool ALLOW_QIODEVICE_BUFFERING = true; - -// comment to get trace output: -//#define qDebug if(1){}else qDebug - -namespace { -class Reader : public QThread { - Q_OBJECT -public: - Reader( int fd, Qt::HANDLE handle ); - ~Reader(); - - qint64 readData( char * data, qint64 maxSize ); - - unsigned int bytesInBuffer() const { - return ( wptr + sizeof buffer - rptr ) % sizeof buffer ; - } - - bool bufferFull() const { - return bytesInBuffer() == sizeof buffer - 1; - } - - bool bufferEmpty() const { - return bytesInBuffer() == 0; - } - - bool bufferContains( char ch ) { - const unsigned int bib = bytesInBuffer(); - for ( unsigned int i = rptr ; i < rptr + bib ; ++i ) - if ( buffer[i%sizeof buffer] == ch ) - return true; - return false; - } - - void notifyReadyRead(); - -Q_SIGNALS: - void readyRead(); - -protected: - /* reimp */ void run(); - -private: - int fd; - Qt::HANDLE handle; -public: - QMutex mutex; - QWaitCondition waitForCancelCondition; - QWaitCondition bufferNotFullCondition; - QWaitCondition bufferNotEmptyCondition; - QWaitCondition hasStarted; - QWaitCondition readyReadSentCondition; - QWaitCondition blockedConsumerIsDoneCondition; - bool cancel; - bool eof; - bool error; - bool eofShortCut; - int errorCode; - bool isReading; - bool consumerBlocksOnUs; - -private: - unsigned int rptr, wptr; - char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state -}; - - -Reader::Reader( int fd_, Qt::HANDLE handle_ ) - : QThread(), - fd( fd_ ), - handle( handle_ ), - mutex(), - bufferNotFullCondition(), - bufferNotEmptyCondition(), - hasStarted(), - cancel( false ), - eof( false ), - error( false ), - eofShortCut( false ), - errorCode( 0 ), - isReading( false ), - consumerBlocksOnUs( false ), - rptr( 0 ), wptr( 0 ) -{ - -} - -Reader::~Reader() {} - - -class Writer : public QThread { - Q_OBJECT -public: - Writer( int fd, Qt::HANDLE handle ); - ~Writer(); - - qint64 writeData( const char * data, qint64 size ); - - unsigned int bytesInBuffer() const { return numBytesInBuffer; } - - bool bufferFull() const { - return numBytesInBuffer == sizeof buffer; - } - - bool bufferEmpty() const { - return numBytesInBuffer == 0; - } - -Q_SIGNALS: - void bytesWritten( qint64 ); - -protected: - /* reimp */ void run(); - -private: - int fd; - Qt::HANDLE handle; -public: - QMutex mutex; - QWaitCondition bufferEmptyCondition; - QWaitCondition bufferNotEmptyCondition; - QWaitCondition hasStarted; - bool cancel; - bool error; - int errorCode; -private: - unsigned int numBytesInBuffer; - char buffer[BUFFER_SIZE]; -}; -} - -Writer::Writer( int fd_, Qt::HANDLE handle_ ) - : QThread(), - fd( fd_ ), - handle( handle_ ), - mutex(), - bufferEmptyCondition(), - bufferNotEmptyCondition(), - hasStarted(), - cancel( false ), - error( false ), - errorCode( 0 ), - numBytesInBuffer( 0 ) -{ - -} - -Writer::~Writer() {} - - -class KDPipeIODevice::Private : public QObject { -Q_OBJECT - friend class ::KDPipeIODevice; - KDPipeIODevice * const q; -public: - explicit Private( KDPipeIODevice * qq ); - ~Private(); - - bool doOpen( int, Qt::HANDLE, OpenMode ); - bool startReaderThread(); - bool startWriterThread(); - void stopThreads(); - -public Q_SLOTS: - void emitReadyRead(); - -private: - int fd; - Qt::HANDLE handle; - Reader * reader; - Writer * writer; - bool triedToStartReader; - bool triedToStartWriter; -}; - -KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) - : QObject( qq ), q( qq ), - fd( -1 ), - handle( 0 ), - reader( 0 ), - writer( 0 ), - triedToStartReader( false ), triedToStartWriter( false ) -{ - -} - -KDPipeIODevice::Private::~Private() { - qDebug( "KDPipeIODevice::~Private(): Destroying %p", q ); -} - -KDPipeIODevice::KDPipeIODevice( QObject * p ) - : QIODevice( p ), d( new Private( this ) ) -{ - KDAB_CHECK_CTOR; -} - -KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p ) - : QIODevice( p ), d( new Private( this ) ) -{ - KDAB_CHECK_CTOR; - open( fd, mode ); -} - -KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p ) - : QIODevice( p ), d( new Private( this ) ) -{ - KDAB_CHECK_CTOR; - open( handle, mode ); -} - -KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR; - if ( isOpen() ) - close(); - delete d; d = 0; -} - - -bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS; - -#ifdef Q_OS_WIN32 - return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode ); -#else - return d->doOpen( fd, 0, mode ); -#endif - -} - -bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS; - -#ifdef Q_OS_WIN32 - return d->doOpen( -1, h, mode ); -#else - Q_UNUSED( h ); - Q_UNUSED( mode ); - assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." ); -#endif - -} - -bool KDPipeIODevice::Private::startReaderThread() -{ - if ( triedToStartReader ) - return true; - triedToStartReader = true; - if ( reader && !reader->isRunning() && !reader->isFinished() ) { - qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" ); - LOCKED( reader ); - qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" ); - reader->start( QThread::HighestPriority ); - qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" ); - const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 ); - qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" ); - - return hasStarted; - } - return true; -} - -bool KDPipeIODevice::Private::startWriterThread() -{ - if ( triedToStartWriter ) - return true; - triedToStartWriter = true; - if ( writer && !writer->isRunning() && !writer->isFinished() ) { - LOCKED( writer ); - - writer->start( QThread::HighestPriority ); - if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) ) - return false; - } - return true; -} - -void KDPipeIODevice::Private::emitReadyRead() -{ - QPointer<Private> thisPointer( this ); - qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this ); - - emit q->readyRead(); - - if ( !thisPointer ) - return; - - bool mustNotify = false; - - if ( reader ) { - qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this ); - synchronized( reader ) { - qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this ); - reader->readyReadSentCondition.wakeAll(); - mustNotify = !reader->bufferEmpty() && reader->isReading; - qDebug( "KDPipeIODevice::emitReadyRead %p: bufferEmpty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading ); - } - } - if ( mustNotify ) - QTimer::singleShot( 100, this, SLOT( emitReadyRead() ) ); - qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this ); - -} - -bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) { - - if ( q->isOpen() ) - return false; - -#ifdef Q_OS_WIN32 - if ( !handle_ ) - return false; -#else - if ( fd_ < 0 ) - return false; -#endif - - if ( !(mode_ & ReadWrite) ) - return false; // need to have at least read -or- write - - - std::auto_ptr<Reader> reader_; - std::auto_ptr<Writer> writer_; - - if ( mode_ & ReadOnly ) { - reader_.reset( new Reader( fd_, handle_ ) ); - qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ ); - connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()), -Qt::QueuedConnection ); - } - if ( mode_ & WriteOnly ) { - writer_.reset( new Writer( fd_, handle_ ) ); - qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ ); - connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), -Qt::QueuedConnection ); - } - - // commit to *this: - fd = fd_; - handle = handle_; - reader = reader_.release(); - writer = writer_.release(); - - q->setOpenMode( mode_|Unbuffered ); - return true; -} - -int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS; - return d->fd; -} - - -Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS; - return d->handle; -} - -qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS; - const qint64 base = QIODevice::bytesAvailable(); - if ( !d->triedToStartReader ) { - d->startReaderThread(); - return base; - } - if ( d->reader ) - synchronized( d->reader ) { - const qint64 inBuffer = d->reader->bytesInBuffer(); - return base + inBuffer; - } - return base; -} - -qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS; - d->startWriterThread(); - const qint64 base = QIODevice::bytesToWrite(); - if ( d->writer ) - synchronized( d->writer ) return base + d->writer->bytesInBuffer(); - return base; -} - -bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS; - d->startReaderThread(); - if ( QIODevice::canReadLine() ) - return true; - if ( d->reader ) - synchronized( d->reader ) return d->reader->bufferContains( '\n' ); - return true; -} - -bool KDPipeIODevice::isSequential() const { - return true; -} - -bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS; - d->startReaderThread(); - if ( !QIODevice::atEnd() ) { - qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) ); - return false; - } - if ( !isOpen() ) - return true; - if ( d->reader->eofShortCut ) - return true; - LOCKED( d->reader ); - const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty(); - if ( !eof ) { - if ( !d->reader->error && !d->reader->eof ) - qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this ); - if ( !d->reader->bufferEmpty() ) - qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this ); - } - return eof; -} - -bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS; - d->startWriterThread(); - Writer * const w = d->writer; - if ( !w ) - return true; - LOCKED( w ); - qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w -); - return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ; -} - -bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; - qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this); - d->startReaderThread(); - if ( ALLOW_QIODEVICE_BUFFERING ) { - if ( bytesAvailable() > 0 ) - return true; - } - Reader * const r = d->reader; - if ( !r || r->eofShortCut ) - return true; - LOCKED( r ); - if ( r->bytesInBuffer() != 0 || r->eof || r->error ) - return true; - - return msecs >= 0 ? r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) : r->bufferNotEmptyCondition.wait( &r->mutex ); -} - -template <typename T> -class TemporaryValue { -public: - TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; } - ~TemporaryValue() { var = oldValue; } -private: - T& var; - const T oldValue; -}; - - -bool KDPipeIODevice::readWouldBlock() const -{ - d->startReaderThread(); - LOCKED( d->reader ); - return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error; -} - -bool KDPipeIODevice::writeWouldBlock() const -{ - d->startWriterThread(); - LOCKED( d->writer ); - return !d->writer->bufferEmpty() && !d->writer->error; -} - - -qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS; - qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize ); - d->startReaderThread(); - Reader * const r = d->reader; - - assert( r ); - - - //assert( r->isRunning() ); // wrong (might be eof, error) - assert( data || maxSize == 0 ); - assert( maxSize >= 0 ); - - if ( r->eofShortCut ) { - qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this ); - return 0; - } - - if ( maxSize < 0 ) - maxSize = 0; - - if ( ALLOW_QIODEVICE_BUFFERING ) { - if ( bytesAvailable() > 0 ) - maxSize = std::min( maxSize, bytesAvailable() ); // don't block - } - qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this ); - LOCKED( r ); - qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this ); - - r->readyReadSentCondition.wakeAll(); - if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0? - qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this ); - const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true ); - r->bufferNotEmptyCondition.wait( &r->mutex ); - r->blockedConsumerIsDoneCondition.wakeAll(); - qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this ); - } - - if ( r->bufferEmpty() ) { - qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this ); - // woken with an empty buffer must mean either EOF or error: - assert( r->eof || r->error ); - r->eofShortCut = true; - return r->eof ? 0 : -1 ; - } - - qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize ); - const qint64 bytesRead = r->readData( data, maxSize ); - qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead ); - qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data ); - - return bytesRead; -} - -qint64 Reader::readData( char * data, qint64 maxSize ) { - qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ; - if ( numRead > maxSize ) - numRead = maxSize; - - qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this, - data, maxSize, rptr, wptr, bytesInBuffer(), numRead ); - - std::memcpy( data, buffer + rptr, numRead ); - - rptr = ( rptr + numRead ) % sizeof buffer ; - - if ( !bufferFull() ) { - qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this ); - bufferNotFullCondition.wakeAll(); - } - - return numRead; -} - -qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS; - d->startWriterThread(); - Writer * const w = d->writer; - - assert( w ); - assert( w->error || w->isRunning() ); - assert( data || size == 0 ); - assert( size >= 0 ); - - LOCKED( w ); - - while ( !w->error && !w->bufferEmpty() ) { - qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this ); - w->bufferEmptyCondition.wait( &w->mutex ); - qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this ); - - } - if ( w->error ) - return -1; - - assert( w->bufferEmpty() ); - - return w->writeData( data, size ); -} - -qint64 Writer::writeData( const char * data, qint64 size ) { - assert( bufferEmpty() ); - - if ( size > static_cast<qint64>( sizeof buffer ) ) - size = sizeof buffer; - - std::memcpy( buffer, data, size ); - - numBytesInBuffer = size; - - if ( !bufferEmpty() ) { - bufferNotEmptyCondition.wakeAll(); - } - return size; -} - -void KDPipeIODevice::Private::stopThreads() -{ - if ( triedToStartWriter ) - { - if ( writer && q->bytesToWrite() > 0 ) - q->waitForBytesWritten( -1 ); - - assert( q->bytesToWrite() == 0 ); - } - if ( Reader * & r = reader ) { - disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) ); - synchronized( r ) { - // tell thread to cancel: - r->cancel = true; - // and wake it, so it can terminate: - r->waitForCancelCondition.wakeAll(); - r->bufferNotFullCondition.wakeAll(); - r->readyReadSentCondition.wakeAll(); - } - } - if ( Writer * & w = writer ) { - synchronized( w ) { - // tell thread to cancel: - w->cancel = true; - // and wake it, so it can terminate: - w->bufferNotEmptyCondition.wakeAll(); - } - } -} - -void KDPipeIODevice::close() { KDAB_CHECK_THIS; - qDebug( "KDPipeIODevice::close(%p)", this ); - if ( !isOpen() ) - return; - - // tell clients we're about to close: - emit aboutToClose(); - d->stopThreads(); - -#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; } - qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer ); - waitAndDelete( d->writer ); - qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader ); - if ( d->reader ) { - LOCKED( d->reader ); - d->reader->readyReadSentCondition.wakeAll(); - } - waitAndDelete( d->reader ); -#undef waitAndDelete -#ifdef Q_OS_WIN32 - if ( d->fd != -1 ) - _close( d->fd ); - else - CloseHandle( d->handle ); -#else - ::close( d->fd ); -#endif - - setOpenMode( NotOpen ); - d->fd = -1; - d->handle = 0; -} - -void Reader::run() { - - LOCKED( this ); - - // too bad QThread doesn't have that itself; a signal isn't enough - hasStarted.wakeAll(); - - qDebug( "%p: Reader::run: started", this ); - - while ( true ) { - if ( !cancel && ( eof || error ) ) { - //notify the client until the buffer is empty and then once - //again so he receives eof/error. After that, wait for him - //to cancel - const bool wasEmpty = bufferEmpty(); - qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error ); - notifyReadyRead(); - if ( !cancel && wasEmpty ) - waitForCancelCondition.wait( &mutex ); - } else if ( !cancel && !bufferFull() && !bufferEmpty() ) { - qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this ); - notifyReadyRead(); - } - - while ( !cancel && !error && bufferFull() ) { - notifyReadyRead(); - if ( !cancel && bufferFull() ) { - qDebug( "%p: Reader::run: buffer is full, going to sleep", this ); - bufferNotFullCondition.wait( &mutex ); - } - } - - if ( cancel ) { - qDebug( "%p: Reader::run: detected cancel", this ); - goto leave; - } - - if ( !eof && !error ) { - if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty - rptr = wptr = 0; - - unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer; - if ( numBytes > sizeof buffer - wptr ) - numBytes = sizeof buffer - wptr; - - qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes ); - - assert( numBytes > 0 ); - - qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes ); -#ifdef Q_OS_WIN32 - isReading = true; - mutex.unlock(); - DWORD numRead; - const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 ); - mutex.lock(); - isReading = false; - if ( ok ) { - if ( numRead == 0 ) { - qDebug( "%p: Reader::run: got eof (numRead==0)", this ); - eof = true; - } - } else { // !ok - errorCode = static_cast<int>( GetLastError() ); - if ( errorCode == ERROR_BROKEN_PIPE ) { - assert( numRead == 0 ); - qDebug( "%p: Reader::run: got eof (broken pipe)", this ); - eof = true; - } else { - assert( numRead == 0 ); - qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode ); - error = true; - } - } -#else - qint64 numRead; - mutex.unlock(); - do { - numRead = ::read( fd, buffer + wptr, numBytes ); - } while ( numRead == -1 && errno == EINTR ); - mutex.lock(); - - if ( numRead < 0 ) { - errorCode = errno; - error = true; - qDebug( "%p: Reader::run: got error: %d", this, errorCode ); - } else if ( numRead == 0 ) { - qDebug( "%p: Reader::run: eof detected", this ); - eof = true; - } -#endif - qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) ); - qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer ); - - if ( numRead > 0 ) { - qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr ); - wptr = ( wptr + numRead ) % sizeof buffer; - qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr ); - } - } - } - leave: - qDebug( "%p: Reader::run: terminated", this ); -} - -void Reader::notifyReadyRead() -{ - qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() ); - assert( !cancel ); - - if ( consumerBlocksOnUs ) { - bufferNotEmptyCondition.wakeAll(); - blockedConsumerIsDoneCondition.wait( &mutex ); - return; - } - qDebug( "notifyReadyRead: emit signal" ); - emit readyRead(); - readyReadSentCondition.wait( &mutex ); - qDebug( "notifyReadyRead: returning from waiting, leave" ); -} - -void Writer::run() { - - LOCKED( this ); - - // too bad QThread doesn't have that itself; a signal isn't enough - hasStarted.wakeAll(); - - qDebug( "%p: Writer::run: started", this ); - - while ( true ) { - - while ( !cancel && bufferEmpty() ) { - qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); - bufferEmptyCondition.wakeAll(); - emit bytesWritten( 0 ); - qDebug( "%p: Writer::run: buffer is empty, going to sleep", this ); - bufferNotEmptyCondition.wait( &mutex ); - qDebug( "%p: Writer::run: woke up", this ); - } - - if ( cancel ) { - qDebug( "%p: Writer::run: detected cancel", this ); - goto leave; - } - - assert( numBytesInBuffer > 0 ); - - qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer ); - qint64 totalWritten = 0; - do { - mutex.unlock(); -#ifdef Q_OS_WIN32 - DWORD numWritten; - qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer ); - qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd ); - if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) { - mutex.lock(); - errorCode = static_cast<int>( GetLastError() ); - qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); - error = true; - goto leave; - } -#else - qint64 numWritten; - do { - numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten ); - } while ( numWritten == -1 && errno == EINTR ); - - if ( numWritten < 0 ) { - mutex.lock(); - errorCode = errno; - qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); - error = true; - goto leave; - } -#endif - qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer ); - totalWritten += numWritten; - mutex.lock(); - } while ( totalWritten < numBytesInBuffer ); - - qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten ); - - numBytesInBuffer = 0; - - qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); - bufferEmptyCondition.wakeAll(); - emit bytesWritten( totalWritten ); - } - leave: - qDebug( "%p: Writer::run: terminating", this ); - numBytesInBuffer = 0; - qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); - bufferEmptyCondition.wakeAll(); - emit bytesWritten( 0 ); -} - -// static -std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() { - KDPipeIODevice * read = 0; - KDPipeIODevice * write = 0; -#ifdef Q_OS_WIN32 - HANDLE rh; - HANDLE wh; - SECURITY_ATTRIBUTES sa; - memset( &sa, 0, sizeof(sa) ); - sa.nLength = sizeof(sa); - sa.bInheritHandle = TRUE; - if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) { - read = new KDPipeIODevice; - read->open( rh, ReadOnly ); - write = new KDPipeIODevice; - write->open( wh, WriteOnly ); - } -#else - int fds[2]; - if ( pipe( fds ) == 0 ) { - read = new KDPipeIODevice; - read->open( fds[0], ReadOnly ); - write = new KDPipeIODevice; - write->open( fds[1], WriteOnly ); - } -#endif - return std::make_pair( read, write ); -} - -#ifdef KDAB_DEFINE_CHECKS -KDAB_DEFINE_CHECKS( KDPipeIODevice ) { - if ( !isOpen() ) { - assert( openMode() == NotOpen ); - assert( !d->reader ); - assert( !d->writer ); -#ifdef Q_OS_WIN32 - assert( !d->handle ); -#else - assert( d->fd < 0 ); -#endif - } else { - assert( openMode() != NotOpen ); - assert( openMode() & ReadWrite ); - if ( openMode() & ReadOnly ) { - assert( d->reader ); - synchronized( d->reader ) - assert( d->reader->eof || d->reader->error || d->reader->isRunning() ); - } - if ( openMode() & WriteOnly ) { - assert( d->writer ); - synchronized( d->writer ) - assert( d->writer->error || d->writer->isRunning() ); - } -#ifdef Q_OS_WIN32 - assert( d->handle ); -#else - assert( d->fd >= 0 ); -#endif - } -} -#endif // KDAB_DEFINE_CHECKS - -#include "moc_kdpipeiodevice.cpp" -#include "kdpipeiodevice.moc" |