diff options
Diffstat (limited to 'gpgme/kdpipeiodevice.cpp')
| -rw-r--r-- | gpgme/kdpipeiodevice.cpp | 241 |
1 files changed, 132 insertions, 109 deletions
diff --git a/gpgme/kdpipeiodevice.cpp b/gpgme/kdpipeiodevice.cpp index 0cd4ef8e..1fd0ef02 100644 --- a/gpgme/kdpipeiodevice.cpp +++ b/gpgme/kdpipeiodevice.cpp @@ -99,14 +99,14 @@ public: QWaitCondition bufferNotEmptyCondition; QWaitCondition hasStarted; QWaitCondition readyReadSentCondition; - QWaitCondition notInReadDataCondition; + QWaitCondition blockedConsumerIsDoneCondition; bool cancel; bool eof; bool error; bool eofShortCut; int errorCode; - bool inReadData; - + bool consumerBlocksOnUs; + private: unsigned int rptr, wptr; char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state @@ -127,7 +127,7 @@ Reader::Reader( int fd_, Qt::HANDLE handle_ ) eofShortCut( false ), errorCode( 0 ), rptr( 0 ), wptr( 0 ), - inReadData( false ) + consumerBlocksOnUs( false ) { } @@ -226,13 +226,13 @@ KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) handle( 0 ), reader( 0 ), writer( 0 ), - triedToStartReader( false ), triedToStartWriter( false ) + triedToStartReader( false ), triedToStartWriter( false ) { } KDPipeIODevice::Private::~Private() { - qDebug( "KDPipeIODevice::~Private(): Destroying %p", this ); + qDebug( "KDPipeIODevice::~Private(): Destroying %p", q ); } KDPipeIODevice::KDPipeIODevice( QObject * p ) @@ -290,11 +290,15 @@ bool KDPipeIODevice::Private::startReaderThread() return true; triedToStartReader = true; if ( reader && !reader->isRunning() && !reader->isFinished() ) { + qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" ); LOCKED( reader ); - + qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" ); reader->start( QThread::HighestPriority ); - if ( !reader->hasStarted.wait( &reader->mutex, 1000 ) ) - return false; + qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" ); + const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 ); + qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" ); + + return hasStarted; } return true; } @@ -320,11 +324,16 @@ void KDPipeIODevice::Private::emitReadyRead() const int counter = s_counter++; QPointer<Private> thisPointer( this ); qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d", this, counter ); + emit q->readyRead(); + if ( !thisPointer ) return; - LOCKED( reader ); - reader->readyReadSentCondition.wakeAll(); + qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locking reader (CONSUMER THREAD)", this, counter ); + synchronized( reader ) { + qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locked reader (CONSUMER THREAD)", this, counter ); + reader->readyReadSentCondition.wakeAll(); + } qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving %d", this, counter ); } @@ -342,8 +351,6 @@ bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode if ( !(mode_ & ReadWrite) ) return false; // need to have at least read -or- write - fd = fd_; - handle = handle_; std::auto_ptr<Reader> reader_; std::auto_ptr<Writer> writer_; @@ -387,7 +394,10 @@ qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS; return base; } if ( d->reader ) - synchronized( d->reader ) return base + d->reader->bytesInBuffer(); + synchronized( d->reader ) { + const qint64 inBuffer = d->reader->bytesInBuffer(); + return base + inBuffer; + } return base; } @@ -445,6 +455,7 @@ bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS; } bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; + qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this); d->startReaderThread(); if ( ALLOW_QIODEVICE_BUFFERING ) { if ( bytesAvailable() > 0 ) @@ -454,7 +465,10 @@ bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; if ( !r || r->eofShortCut ) return true; LOCKED( r ); - return r->bytesInBuffer() != 0 || r->eof || r->error || r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ; + if ( r->bytesInBuffer() != 0 || r->eof || r->error ) + return true; + assert( false ); + return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ; } template <typename T> @@ -467,6 +481,22 @@ private: const T oldValue; }; + +bool KDPipeIODevice::readWouldBlock() const +{ + d->startReaderThread(); + LOCKED( d->reader ); + return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error; +} + +bool KDPipeIODevice::writeWouldBlock() const +{ + d->startWriterThread(); + LOCKED( d->writer ); + return d->writer->bufferFull() && !d->writer->error; +} + + qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS; qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize ); d->startReaderThread(); @@ -491,15 +521,17 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS if ( bytesAvailable() > 0 ) maxSize = std::min( maxSize, bytesAvailable() ); // don't block } - + qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)" ); LOCKED( r ); - const TemporaryValue<bool> tmp( d->reader->inReadData, true ); - assert( d->reader->inReadData ); - while ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0? - qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition", this ); - r->readyReadSentCondition.wakeAll(); - r->notInReadDataCondition.wakeAll(); + qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)" ); + + r->readyReadSentCondition.wakeAll(); + if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0? + qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this ); + const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true ); r->bufferNotEmptyCondition.wait( &r->mutex ); + r->blockedConsumerIsDoneCondition.wakeAll(); + qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this ); } if ( r->bufferEmpty() ) { @@ -507,7 +539,6 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS // woken with an empty buffer must mean either EOF or error: assert( r->eof || r->error ); r->eofShortCut = true; - r->notInReadDataCondition.wakeAll(); return r->eof ? 0 : -1 ; } @@ -515,7 +546,7 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS const qint64 bytesRead = r->readData( data, maxSize ); qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead ); qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data ); - r->notInReadDataCondition.wakeAll(); + return bytesRead; } @@ -590,13 +621,14 @@ void KDPipeIODevice::Private::stopThreads() assert( q->bytesToWrite() == 0 ); } if ( Reader * & r = reader ) { + disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) ); synchronized( r ) { // tell thread to cancel: r->cancel = true; // and wake it, so it can terminate: r->bufferNotFullCondition.wakeAll(); r->readyReadSentCondition.wakeAll(); - } + } } if ( Writer * & w = writer ) { synchronized( w ) { @@ -617,10 +649,14 @@ void KDPipeIODevice::close() { KDAB_CHECK_THIS; emit aboutToClose(); d->stopThreads(); -#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* t2 = t; t = 0; delete t2; } +#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; } qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer ); waitAndDelete( d->writer ); qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader ); + { + LOCKED( d->reader ); + d->reader->readyReadSentCondition.wakeAll(); + } waitAndDelete( d->reader ); #undef waitAndDelete #ifdef Q_OS_WIN32 @@ -645,115 +681,102 @@ void Reader::run() { qDebug( "%p: Reader::run: started", this ); while ( true ) { - - if ( !bufferFull() && !bufferEmpty() ) { + if ( !cancel && ( eof || error ) ) { + qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error ); + notifyReadyRead(); + } else if ( !cancel && !bufferFull() && !bufferEmpty() ) { qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this ); notifyReadyRead(); } - while ( !cancel && bufferFull() ) { - bufferNotEmptyCondition.wakeAll(); + while ( !error && !cancel && bufferFull() ) { notifyReadyRead(); - if ( !bufferFull() ) - break; - qDebug( "%p: Reader::run: buffer is full, going to sleep", this ); - bufferNotFullCondition.wait( &mutex ); - qDebug( "%p: Reader::run: woke up", this ); - } + if ( bufferFull() ) { + qDebug( "%p: Reader::run: buffer is full, going to sleep", this ); + bufferNotFullCondition.wait( &mutex ); + } + } if ( cancel ) { - qDebug( "%p: Reader::run: detected cancel", this ); + qDebug( "%p: Reader::run: detected cancel", this ); goto leave; } + if ( !eof && !error ) { + if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty + rptr = wptr = 0; - if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty - rptr = wptr = 0; + unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer; + if ( numBytes > sizeof buffer - wptr ) + numBytes = sizeof buffer - wptr; - unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer; - if ( numBytes > sizeof buffer - wptr ) - numBytes = sizeof buffer - wptr; + qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes ); - qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes ); + assert( numBytes > 0 ); - assert( numBytes > 0 ); - - qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes ); + qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes ); #ifdef Q_OS_WIN32 - DWORD numRead; - mutex.unlock(); - const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 ); - mutex.lock(); - if ( !ok ) { - errorCode = static_cast<int>( GetLastError() ); - if ( errorCode == ERROR_BROKEN_PIPE ) { - assert( numRead == 0 ); - qDebug( "%p: Reader::run: got eof (broken pipe)", this ); - eof = true; - } else { - assert( numRead == 0 ); - qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode ); - error = true; + mutex.unlock(); + DWORD numRead; + const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 ); + mutex.lock(); + if ( !ok ) { + errorCode = static_cast<int>( GetLastError() ); + if ( errorCode == ERROR_BROKEN_PIPE ) { + assert( numRead == 0 ); + qDebug( "%p: Reader::run: got eof (broken pipe)", this ); + eof = true; + } else { + assert( numRead == 0 ); + qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode ); + error = true; + } } - goto leave; - } #else - qint64 numRead; - mutex.unlock(); - do { - numRead = ::read( fd, buffer + wptr, numBytes ); - } while ( numRead == -1 && errno == EINTR ); - mutex.lock(); + qint64 numRead; + mutex.unlock(); + do { + numRead = ::read( fd, buffer + wptr, numBytes ); + } while ( numRead == -1 && errno == EINTR ); + mutex.lock(); - if ( numRead < 0 ) { - errorCode = errno; - error = true; - qDebug( "%p: Reader::run: got error: %d", this, errorCode ); - goto leave; - } + if ( numRead < 0 ) { + errorCode = errno; + error = true; + qDebug( "%p: Reader::run: got error: %d", this, errorCode ); + } else if ( numRead == 0 ) { + qDebug( "%p: Reader::run: eof detected", this ); + eof = true; + } #endif - qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) ); - qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer ); - if ( numRead == 0 ) { - qDebug( "%p: Reader::run: eof detected", this ); - eof = true; - goto leave; - } + qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) ); + qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer ); - if ( cancel ) { - qDebug( "%p: Reader::run: detected cancel", this ); - goto leave; - } - qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr ); - wptr = ( wptr + numRead ) % sizeof buffer; - qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr ); + if ( numRead > 0 ) { + qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr ); + wptr = ( wptr + numRead ) % sizeof buffer; + qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr ); + } + } } leave: - qDebug( "%p: Reader::run: terminating: loop while not canceled and not empty", this ); - while ( !cancel && !bufferEmpty() ) { - notifyReadyRead(); - } - notifyReadyRead(); qDebug( "%p: Reader::run: terminated", this ); } void Reader::notifyReadyRead() { - qDebug( "notifyReadyRead" ); - if ( cancel ) - return; - bufferNotEmptyCondition.wakeAll(); - if ( inReadData ) { - qDebug( "notifyReadyRead: inReadData: waiting" ); - notInReadDataCondition.wait( &mutex ); + qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() ); + assert( !cancel ); + + if ( consumerBlocksOnUs ) { + bufferNotEmptyCondition.wakeAll(); + blockedConsumerIsDoneCondition.wait( &mutex ); + return; } - if ( cancel || ( !eof && !error && bufferEmpty() ) ) - return; - qDebug( "readyReadData: actually emit signal" ); + qDebug( "notifyReadyRead: emit signal" ); emit readyRead(); - bufferNotEmptyCondition.wakeAll(); readyReadSentCondition.wait( &mutex ); - bufferNotEmptyCondition.wakeAll(); + qDebug( "notifyReadyRead: returning from waiting, leave" ); } void Writer::run() { @@ -796,7 +819,7 @@ void Writer::run() { errorCode = static_cast<int>( GetLastError() ); qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); error = true; - goto leave; + goto leave; } #else qint64 numWritten; @@ -805,17 +828,17 @@ void Writer::run() { } while ( numWritten == -1 && errno == EINTR ); if ( numWritten < 0 ) { - mutex.lock(); + mutex.lock(); errorCode = errno; qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); error = true; - goto leave; + goto leave; } #endif qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer ); totalWritten += numWritten; - mutex.lock(); + mutex.lock(); } while ( totalWritten < numBytesInBuffer ); qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten ); |
