aboutsummaryrefslogtreecommitdiffstats
path: root/gpgme/kdpipeiodevice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'gpgme/kdpipeiodevice.cpp')
-rw-r--r--gpgme/kdpipeiodevice.cpp241
1 files changed, 132 insertions, 109 deletions
diff --git a/gpgme/kdpipeiodevice.cpp b/gpgme/kdpipeiodevice.cpp
index 0cd4ef8e..1fd0ef02 100644
--- a/gpgme/kdpipeiodevice.cpp
+++ b/gpgme/kdpipeiodevice.cpp
@@ -99,14 +99,14 @@ public:
QWaitCondition bufferNotEmptyCondition;
QWaitCondition hasStarted;
QWaitCondition readyReadSentCondition;
- QWaitCondition notInReadDataCondition;
+ QWaitCondition blockedConsumerIsDoneCondition;
bool cancel;
bool eof;
bool error;
bool eofShortCut;
int errorCode;
- bool inReadData;
-
+ bool consumerBlocksOnUs;
+
private:
unsigned int rptr, wptr;
char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
@@ -127,7 +127,7 @@ Reader::Reader( int fd_, Qt::HANDLE handle_ )
eofShortCut( false ),
errorCode( 0 ),
rptr( 0 ), wptr( 0 ),
- inReadData( false )
+ consumerBlocksOnUs( false )
{
}
@@ -226,13 +226,13 @@ KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
handle( 0 ),
reader( 0 ),
writer( 0 ),
- triedToStartReader( false ), triedToStartWriter( false )
+ triedToStartReader( false ), triedToStartWriter( false )
{
}
KDPipeIODevice::Private::~Private() {
- qDebug( "KDPipeIODevice::~Private(): Destroying %p", this );
+ qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
}
KDPipeIODevice::KDPipeIODevice( QObject * p )
@@ -290,11 +290,15 @@ bool KDPipeIODevice::Private::startReaderThread()
return true;
triedToStartReader = true;
if ( reader && !reader->isRunning() && !reader->isFinished() ) {
+ qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
LOCKED( reader );
-
+ qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
reader->start( QThread::HighestPriority );
- if ( !reader->hasStarted.wait( &reader->mutex, 1000 ) )
- return false;
+ qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
+ const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
+ qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
+
+ return hasStarted;
}
return true;
}
@@ -320,11 +324,16 @@ void KDPipeIODevice::Private::emitReadyRead()
const int counter = s_counter++;
QPointer<Private> thisPointer( this );
qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d", this, counter );
+
emit q->readyRead();
+
if ( !thisPointer )
return;
- LOCKED( reader );
- reader->readyReadSentCondition.wakeAll();
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locking reader (CONSUMER THREAD)", this, counter );
+ synchronized( reader ) {
+ qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locked reader (CONSUMER THREAD)", this, counter );
+ reader->readyReadSentCondition.wakeAll();
+ }
qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving %d", this, counter );
}
@@ -342,8 +351,6 @@ bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode
if ( !(mode_ & ReadWrite) )
return false; // need to have at least read -or- write
- fd = fd_;
- handle = handle_;
std::auto_ptr<Reader> reader_;
std::auto_ptr<Writer> writer_;
@@ -387,7 +394,10 @@ qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
return base;
}
if ( d->reader )
- synchronized( d->reader ) return base + d->reader->bytesInBuffer();
+ synchronized( d->reader ) {
+ const qint64 inBuffer = d->reader->bytesInBuffer();
+ return base + inBuffer;
+ }
return base;
}
@@ -445,6 +455,7 @@ bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
}
bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
+ qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
d->startReaderThread();
if ( ALLOW_QIODEVICE_BUFFERING ) {
if ( bytesAvailable() > 0 )
@@ -454,7 +465,10 @@ bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
if ( !r || r->eofShortCut )
return true;
LOCKED( r );
- return r->bytesInBuffer() != 0 || r->eof || r->error || r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
+ if ( r->bytesInBuffer() != 0 || r->eof || r->error )
+ return true;
+ assert( false );
+ return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
}
template <typename T>
@@ -467,6 +481,22 @@ private:
const T oldValue;
};
+
+bool KDPipeIODevice::readWouldBlock() const
+{
+ d->startReaderThread();
+ LOCKED( d->reader );
+ return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
+}
+
+bool KDPipeIODevice::writeWouldBlock() const
+{
+ d->startWriterThread();
+ LOCKED( d->writer );
+ return d->writer->bufferFull() && !d->writer->error;
+}
+
+
qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
d->startReaderThread();
@@ -491,15 +521,17 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
if ( bytesAvailable() > 0 )
maxSize = std::min( maxSize, bytesAvailable() ); // don't block
}
-
+ qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)" );
LOCKED( r );
- const TemporaryValue<bool> tmp( d->reader->inReadData, true );
- assert( d->reader->inReadData );
- while ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
- qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition", this );
- r->readyReadSentCondition.wakeAll();
- r->notInReadDataCondition.wakeAll();
+ qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)" );
+
+ r->readyReadSentCondition.wakeAll();
+ if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
+ qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
+ const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
r->bufferNotEmptyCondition.wait( &r->mutex );
+ r->blockedConsumerIsDoneCondition.wakeAll();
+ qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );
}
if ( r->bufferEmpty() ) {
@@ -507,7 +539,6 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
// woken with an empty buffer must mean either EOF or error:
assert( r->eof || r->error );
r->eofShortCut = true;
- r->notInReadDataCondition.wakeAll();
return r->eof ? 0 : -1 ;
}
@@ -515,7 +546,7 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
const qint64 bytesRead = r->readData( data, maxSize );
qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
- r->notInReadDataCondition.wakeAll();
+
return bytesRead;
}
@@ -590,13 +621,14 @@ void KDPipeIODevice::Private::stopThreads()
assert( q->bytesToWrite() == 0 );
}
if ( Reader * & r = reader ) {
+ disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );
synchronized( r ) {
// tell thread to cancel:
r->cancel = true;
// and wake it, so it can terminate:
r->bufferNotFullCondition.wakeAll();
r->readyReadSentCondition.wakeAll();
- }
+ }
}
if ( Writer * & w = writer ) {
synchronized( w ) {
@@ -617,10 +649,14 @@ void KDPipeIODevice::close() { KDAB_CHECK_THIS;
emit aboutToClose();
d->stopThreads();
-#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* t2 = t; t = 0; delete t2; }
+#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
waitAndDelete( d->writer );
qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
+ {
+ LOCKED( d->reader );
+ d->reader->readyReadSentCondition.wakeAll();
+ }
waitAndDelete( d->reader );
#undef waitAndDelete
#ifdef Q_OS_WIN32
@@ -645,115 +681,102 @@ void Reader::run() {
qDebug( "%p: Reader::run: started", this );
while ( true ) {
-
- if ( !bufferFull() && !bufferEmpty() ) {
+ if ( !cancel && ( eof || error ) ) {
+ qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
+ notifyReadyRead();
+ } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
notifyReadyRead();
}
- while ( !cancel && bufferFull() ) {
- bufferNotEmptyCondition.wakeAll();
+ while ( !error && !cancel && bufferFull() ) {
notifyReadyRead();
- if ( !bufferFull() )
- break;
- qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
- bufferNotFullCondition.wait( &mutex );
- qDebug( "%p: Reader::run: woke up", this );
- }
+ if ( bufferFull() ) {
+ qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
+ bufferNotFullCondition.wait( &mutex );
+ }
+ }
if ( cancel ) {
- qDebug( "%p: Reader::run: detected cancel", this );
+ qDebug( "%p: Reader::run: detected cancel", this );
goto leave;
}
+ if ( !eof && !error ) {
+ if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
+ rptr = wptr = 0;
- if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
- rptr = wptr = 0;
+ unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
+ if ( numBytes > sizeof buffer - wptr )
+ numBytes = sizeof buffer - wptr;
- unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
- if ( numBytes > sizeof buffer - wptr )
- numBytes = sizeof buffer - wptr;
+ qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
- qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
+ assert( numBytes > 0 );
- assert( numBytes > 0 );
-
- qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
+ qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
#ifdef Q_OS_WIN32
- DWORD numRead;
- mutex.unlock();
- const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
- mutex.lock();
- if ( !ok ) {
- errorCode = static_cast<int>( GetLastError() );
- if ( errorCode == ERROR_BROKEN_PIPE ) {
- assert( numRead == 0 );
- qDebug( "%p: Reader::run: got eof (broken pipe)", this );
- eof = true;
- } else {
- assert( numRead == 0 );
- qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
- error = true;
+ mutex.unlock();
+ DWORD numRead;
+ const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
+ mutex.lock();
+ if ( !ok ) {
+ errorCode = static_cast<int>( GetLastError() );
+ if ( errorCode == ERROR_BROKEN_PIPE ) {
+ assert( numRead == 0 );
+ qDebug( "%p: Reader::run: got eof (broken pipe)", this );
+ eof = true;
+ } else {
+ assert( numRead == 0 );
+ qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
+ error = true;
+ }
}
- goto leave;
- }
#else
- qint64 numRead;
- mutex.unlock();
- do {
- numRead = ::read( fd, buffer + wptr, numBytes );
- } while ( numRead == -1 && errno == EINTR );
- mutex.lock();
+ qint64 numRead;
+ mutex.unlock();
+ do {
+ numRead = ::read( fd, buffer + wptr, numBytes );
+ } while ( numRead == -1 && errno == EINTR );
+ mutex.lock();
- if ( numRead < 0 ) {
- errorCode = errno;
- error = true;
- qDebug( "%p: Reader::run: got error: %d", this, errorCode );
- goto leave;
- }
+ if ( numRead < 0 ) {
+ errorCode = errno;
+ error = true;
+ qDebug( "%p: Reader::run: got error: %d", this, errorCode );
+ } else if ( numRead == 0 ) {
+ qDebug( "%p: Reader::run: eof detected", this );
+ eof = true;
+ }
#endif
- qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
- qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer );
- if ( numRead == 0 ) {
- qDebug( "%p: Reader::run: eof detected", this );
- eof = true;
- goto leave;
- }
+ qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
+ qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer );
- if ( cancel ) {
- qDebug( "%p: Reader::run: detected cancel", this );
- goto leave;
- }
- qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
- wptr = ( wptr + numRead ) % sizeof buffer;
- qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
+ if ( numRead > 0 ) {
+ qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
+ wptr = ( wptr + numRead ) % sizeof buffer;
+ qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
+ }
+ }
}
leave:
- qDebug( "%p: Reader::run: terminating: loop while not canceled and not empty", this );
- while ( !cancel && !bufferEmpty() ) {
- notifyReadyRead();
- }
- notifyReadyRead();
qDebug( "%p: Reader::run: terminated", this );
}
void Reader::notifyReadyRead()
{
- qDebug( "notifyReadyRead" );
- if ( cancel )
- return;
- bufferNotEmptyCondition.wakeAll();
- if ( inReadData ) {
- qDebug( "notifyReadyRead: inReadData: waiting" );
- notInReadDataCondition.wait( &mutex );
+ qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
+ assert( !cancel );
+
+ if ( consumerBlocksOnUs ) {
+ bufferNotEmptyCondition.wakeAll();
+ blockedConsumerIsDoneCondition.wait( &mutex );
+ return;
}
- if ( cancel || ( !eof && !error && bufferEmpty() ) )
- return;
- qDebug( "readyReadData: actually emit signal" );
+ qDebug( "notifyReadyRead: emit signal" );
emit readyRead();
- bufferNotEmptyCondition.wakeAll();
readyReadSentCondition.wait( &mutex );
- bufferNotEmptyCondition.wakeAll();
+ qDebug( "notifyReadyRead: returning from waiting, leave" );
}
void Writer::run() {
@@ -796,7 +819,7 @@ void Writer::run() {
errorCode = static_cast<int>( GetLastError() );
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
error = true;
- goto leave;
+ goto leave;
}
#else
qint64 numWritten;
@@ -805,17 +828,17 @@ void Writer::run() {
} while ( numWritten == -1 && errno == EINTR );
if ( numWritten < 0 ) {
- mutex.lock();
+ mutex.lock();
errorCode = errno;
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
error = true;
- goto leave;
+ goto leave;
}
#endif
qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer,
buffer );
totalWritten += numWritten;
- mutex.lock();
+ mutex.lock();
} while ( totalWritten < numBytesInBuffer );
qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );