diff options
Diffstat (limited to 'gpgme/kdpipeiodevice.cpp')
| -rw-r--r-- | gpgme/kdpipeiodevice.cpp | 203 |
1 files changed, 127 insertions, 76 deletions
diff --git a/gpgme/kdpipeiodevice.cpp b/gpgme/kdpipeiodevice.cpp index 6f48292a..c0515e36 100644 --- a/gpgme/kdpipeiodevice.cpp +++ b/gpgme/kdpipeiodevice.cpp @@ -51,7 +51,7 @@ const unsigned int BUFFER_SIZE = 4096; const bool ALLOW_QIODEVICE_BUFFERING = true; // comment to get trace output: -#define qDebug if(1){}else qDebug +//#define qDebug if(1){}else qDebug namespace { class Reader : public QThread { @@ -81,7 +81,7 @@ public: return true; return false; } - + Q_SIGNALS: void readyRead(); @@ -188,7 +188,8 @@ Writer::Writer( int fd_, Qt::HANDLE handle_ ) Writer::~Writer() {} -class KDPipeIODevice::Private { +class KDPipeIODevice::Private : public QObject { +Q_OBJECT friend class ::KDPipeIODevice; KDPipeIODevice * const q; public: @@ -196,6 +197,11 @@ public: ~Private(); bool doOpen( int, Qt::HANDLE, OpenMode ); + bool startReaderThread(); + bool startWriterThread(); + void stopThreads(); + bool triedToStartReader; + bool triedToStartWriter; private: int fd; @@ -205,19 +211,18 @@ private: }; KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) - : q( qq ), + : QObject( qq ), q( qq ), fd( -1 ), handle( 0 ), reader( 0 ), - writer( 0 ) + writer( 0 ), + triedToStartReader( false ), triedToStartWriter( false ) { } - KDPipeIODevice::Private::~Private() {} - KDPipeIODevice::KDPipeIODevice( QObject * p ) : QIODevice( p ), d( new Private( this ) ) { @@ -267,6 +272,36 @@ bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS; } +bool KDPipeIODevice::Private::startReaderThread() +{ + if ( triedToStartReader ) + return true; + triedToStartReader = true; + if ( reader && !reader->isRunning() && !reader->isFinished() ) { + LOCKED( reader ); + + reader->start( QThread::HighestPriority ); + if ( !reader->hasStarted.wait( &reader->mutex, 1000 ) ) + return false; + } + return true; +} + +bool KDPipeIODevice::Private::startWriterThread() +{ + if ( triedToStartWriter ) + return true; + triedToStartWriter = true; + if ( writer && !writer->isRunning() && !writer->isFinished() ) { + LOCKED( writer ); + + writer->start( QThread::HighestPriority ); + if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) ) + return false; + } + return true; +} + bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) { if ( q->isOpen() || fd_ < 0 ) @@ -288,19 +323,15 @@ bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode if ( mode_ & ReadOnly ) { reader_.reset( new Reader( fd_, handle_ ) ); - LOCKED( reader_ ); - reader_->start( QThread::HighestPriority ); - if ( !reader_->hasStarted.wait( &reader_->mutex, 1000 ) ) - return false; - connect( reader_.get(), SIGNAL(readyRead()), q, SIGNAL(readyRead()), Qt::QueuedConnection ); + qDebug( "KDPipeIODevice::doOpen: created reader for fd %d", fd_ ); + connect( reader_.get(), SIGNAL(readyRead()), q, +SIGNAL(readyRead()), Qt::QueuedConnection ); } if ( mode_ & WriteOnly ) { writer_.reset( new Writer( fd_, handle_ ) ); - LOCKED( writer_ ); - writer_->start( QThread::HighestPriority ); - if ( !writer_->hasStarted.wait( &writer_->mutex, 1000 ) ) - return false; - connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), Qt::QueuedConnection ); + qDebug( "KDPipeIODevice::doOpen: created writer for fd %d", fd_ ); + connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), +Qt::QueuedConnection ); } // commit to *this: @@ -310,7 +341,6 @@ bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode writer = writer_.release(); q->setOpenMode( mode_|Unbuffered ); - return true; } @@ -318,18 +348,24 @@ int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS; return d->fd; } + Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS; return d->handle; } qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS; const qint64 base = QIODevice::bytesAvailable(); + if ( !d->triedToStartReader ) { + d->startReaderThread(); + return base; + } if ( d->reader ) synchronized( d->reader ) return base + d->reader->bytesInBuffer(); return base; } qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS; + d->startWriterThread(); const qint64 base = QIODevice::bytesToWrite(); if ( d->writer ) synchronized( d->writer ) return base + d->writer->bytesInBuffer(); @@ -337,7 +373,8 @@ qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS; } bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS; - if ( QIODevice::canReadLine() ) + d->startReaderThread(); + if ( QIODevice::canReadLine() ) return true; if ( d->reader ) synchronized( d->reader ) return d->reader->bufferContains( '\n' ); @@ -349,8 +386,9 @@ bool KDPipeIODevice::isSequential() const { } bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS; + d->startReaderThread(); if ( !QIODevice::atEnd() ) { - qDebug( "KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", static_cast<long>(bytesAvailable()) ); + qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) ); return false; } if ( !isOpen() ) @@ -361,14 +399,15 @@ bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS; const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty(); if ( !eof ) { if ( !d->reader->error && !d->reader->eof ) - qDebug( "KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof" ); + qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this ); if ( !d->reader->bufferEmpty() ) - qDebug( "KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()" ); + qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this ); } return eof; } bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS; + d->startWriterThread(); Writer * const w = d->writer; if ( !w ) return true; @@ -377,6 +416,7 @@ bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS; } bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; + d->startReaderThread(); if ( ALLOW_QIODEVICE_BUFFERING ) { if ( bytesAvailable() > 0 ) return true; @@ -389,18 +429,22 @@ bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; } qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS; + qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize ); - qDebug( "KDPipeIODevice::readData: data=%p, maxSize=%lld", data, maxSize ); + if ( maxSize == 0 ) + return 0; + d->startReaderThread(); Reader * const r = d->reader; assert( r ); + //assert( r->isRunning() ); // wrong (might be eof, error) assert( data || maxSize == 0 ); assert( maxSize >= 0 ); if ( r->eofShortCut ) { - qDebug( "KDPipeIODevice::readData: hit eofShortCut, returning 0" ); + qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this ); return 0; } @@ -414,31 +458,31 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS LOCKED( r ); if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0? - qDebug( "KDPipeIODevice::readData: waiting for bufferNotEmptyCondition" ); + qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition", this ); r->bufferNotEmptyCondition.wait( &r->mutex ); } if ( r->bufferEmpty() ) { - qDebug( "KDPipeIODevice::readData: got empty buffer, signal eof" ); + qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this ); // woken with an empty buffer must mean either EOF or error: assert( r->eof || r->error ); r->eofShortCut = true; return r->eof ? 0 : -1 ; } - qDebug( "KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", maxSize ); + qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize ); const qint64 bytesRead = r->readData( data, maxSize ); - qDebug( "KDPipeIODevice::readData: read %lld bytes", bytesRead ); + qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead ); + qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data ); return bytesRead; } qint64 Reader::readData( char * data, qint64 maxSize ) { - qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ; if ( numRead > maxSize ) numRead = maxSize; - qDebug( "KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", + qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead ); std::memcpy( data, buffer + rptr, numRead ); @@ -446,7 +490,7 @@ qint64 Reader::readData( char * data, qint64 maxSize ) { rptr = ( rptr + numRead ) % sizeof buffer ; if ( !bufferFull() ) { - qDebug( "KDPipeIODevice::readData: signal bufferNotFullCondition" ); + qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this ); bufferNotFullCondition.wakeAll(); } @@ -454,7 +498,7 @@ qint64 Reader::readData( char * data, qint64 maxSize ) { } qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS; - + d->startWriterThread(); Writer * const w = d->writer; assert( w ); @@ -476,7 +520,6 @@ qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_ } qint64 Writer::writeData( const char * data, qint64 size ) { - assert( bufferEmpty() ); if ( size > static_cast<qint64>( sizeof buffer ) ) @@ -492,39 +535,46 @@ qint64 Writer::writeData( const char * data, qint64 size ) { return size; } -void KDPipeIODevice::close() { KDAB_CHECK_THIS; - - if ( !isOpen() ) - return; - - // tell clients we're about to close: - emit aboutToClose(); - - if ( d->writer && bytesToWrite() > 0 ) - waitForBytesWritten( -1 ); - - assert( bytesToWrite() == 0 ); +void KDPipeIODevice::Private::stopThreads() +{ + if ( triedToStartWriter ) + { + if ( writer && q->bytesToWrite() > 0 ) + q->waitForBytesWritten( -1 ); - if ( Reader * & r = d->reader ) { + assert( q->bytesToWrite() == 0 ); + } + if ( Reader * & r = reader ) { synchronized( r ) { // tell thread to cancel: r->cancel = true; // and wake it, so it can terminate: r->bufferNotFullCondition.wakeAll(); } - r->wait(); - delete r; r = 0; } - if ( Writer * & w = d->writer ) { + if ( Writer * & w = writer ) { synchronized( w ) { // tell thread to cancel: w->cancel = true; // and wake it, so it can terminate: w->bufferNotEmptyCondition.wakeAll(); } - w->wait(); - delete w; w = 0; } +} + +void KDPipeIODevice::close() { KDAB_CHECK_THIS; + + if ( !isOpen() ) + return; + + // tell clients we're about to close: + emit aboutToClose(); + d->stopThreads(); + +#define waitAndDelete( t ) if ( t ) { t->wait(); delete t; t = 0; } + waitAndDelete( d->writer ); + waitAndDelete( d->reader ); +#undef waitAndDelete #ifdef Q_OS_WIN32 CloseHandle( d->handle ); @@ -544,19 +594,19 @@ void Reader::run() { // too bad QThread doesn't have that itself; a signal isn't enough hasStarted.wakeAll(); - qDebug( "Reader::run: started" ); + qDebug( "%p: Reader::run: started", this ); while ( true ) { while ( !cancel && bufferFull() ) { bufferNotEmptyCondition.wakeAll(); - qDebug( "Reader::run: buffer is full, going to sleep" ); + qDebug( "%p: Reader::run: buffer is full, going to sleep", this ); bufferNotFullCondition.wait( &mutex ); - qDebug( "Reader::run: woke up" ); + qDebug( "%p: Reader::run: woke up", this ); } if ( cancel ) { - qDebug( "Reader::run: detected cancel" ); + qDebug( "%p: Reader::run: detected cancel", this ); goto leave; } @@ -567,11 +617,11 @@ void Reader::run() { if ( numBytes > sizeof buffer - wptr ) numBytes = sizeof buffer - wptr; - qDebug( "Reader::run: rptr=%d, wptr=%d -> numBytes=%d", rptr, wptr, numBytes ); + qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes ); assert( numBytes > 0 ); - qDebug( "Reader::run: trying to read %d bytes", numBytes ); + qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes ); #ifdef Q_OS_WIN32 DWORD numRead; mutex.unlock(); @@ -580,10 +630,10 @@ void Reader::run() { if ( !ok ) { errorCode = static_cast<int>( GetLastError() ); if ( errorCode == ERROR_BROKEN_PIPE ) { - qDebug( "Reader::run: got eof" ); + qDebug( "%p: Reader::run: got eof (broken pipe)", this ); eof = true; } else { - qDebug( "Reader::run: got error: %d", errorCode ); + qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode ); error = true; } goto leave; @@ -599,32 +649,33 @@ void Reader::run() { if ( numRead < 0 ) { errorCode = errno; error = true; - qDebug( "Reader::run: got error: %d", errorCode ); + qDebug( "%p: Reader::run: got error: %d", this, errorCode ); goto leave; } #endif - qDebug( "Reader::run: read %ld bytes", static_cast<long>(numRead) ); + qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) ); + qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer ); if ( numRead == 0 ) { - qDebug( "Reader::run: eof detected" ); + qDebug( "%p: Reader::run: eof detected", this ); eof = true; goto leave; } if ( cancel ) { - qDebug( "Reader::run: detected cancel" ); + qDebug( "%p: Reader::run: detected cancel", this ); goto leave; } - qDebug( "Reader::run: buffer before: rptr=%4d, wptr=%4d", rptr, wptr ); + qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr ); wptr = ( wptr + numRead ) % sizeof buffer; - qDebug( "Reader::run: buffer after: rptr=%4d, wptr=%4d", rptr, wptr ); + qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr ); if ( !bufferEmpty() ) { - qDebug( "Reader::run: buffer no longer empty, waking everyone" ); + qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this ); bufferNotEmptyCondition.wakeAll(); emit readyRead(); } } leave: - qDebug( "Reader::run: terminating" ); + qDebug( "%p: Reader::run: terminating", this ); bufferNotEmptyCondition.wakeAll(); emit readyRead(); } @@ -636,25 +687,25 @@ void Writer::run() { // too bad QThread doesn't have that itself; a signal isn't enough hasStarted.wakeAll(); - qDebug( "Writer::run: started" ); + qDebug( "%p: Writer::run: started", this ); while ( true ) { while ( !cancel && bufferEmpty() ) { bufferEmptyCondition.wakeAll(); - qDebug( "Writer::run: buffer is empty, going to sleep" ); + qDebug( "%p: Writer::run: buffer is empty, going to sleep", this ); bufferNotEmptyCondition.wait( &mutex ); - qDebug( "Writer::run: woke up" ); + qDebug( "%p: Writer::run: woke up", this ); } if ( cancel ) { - qDebug( "Writer::run: detected cancel" ); + qDebug( "%p: Writer::run: detected cancel", this ); goto leave; } assert( numBytesInBuffer > 0 ); - qDebug( "Writer::run: Trying to write %u bytes", numBytesInBuffer ); + qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer ); qint64 totalWritten = 0; do { mutex.unlock(); @@ -663,7 +714,7 @@ void Writer::run() { if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) { mutex.lock(); errorCode = static_cast<int>( GetLastError() ); - qDebug( "Writer::run: got error code: %d", errorCode ); + qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); error = true; goto leave; } @@ -676,7 +727,7 @@ void Writer::run() { if ( numWritten < 0 ) { mutex.lock(); errorCode = errno; - qDebug( "Writer::run: got error code: %d", errorCode ); + qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); error = true; goto leave; } @@ -685,14 +736,14 @@ void Writer::run() { mutex.lock(); } while ( totalWritten < numBytesInBuffer ); - qDebug( "Writer::run: wrote %lld bytes", totalWritten ); + qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten ); numBytesInBuffer = 0; bufferEmptyCondition.wakeAll(); emit bytesWritten( totalWritten ); } leave: - qDebug( "Writer::run: terminating" ); + qDebug( "%p: Writer::run: terminating", this ); numBytesInBuffer = 0; bufferEmptyCondition.wakeAll(); emit bytesWritten( 0 ); |
