diff options
Diffstat (limited to 'src/kdpipeiodevice.cpp')
| -rw-r--r-- | src/kdpipeiodevice.cpp | 951 | 
1 files changed, 951 insertions, 0 deletions
| diff --git a/src/kdpipeiodevice.cpp b/src/kdpipeiodevice.cpp new file mode 100644 index 00000000..5661790a --- /dev/null +++ b/src/kdpipeiodevice.cpp @@ -0,0 +1,951 @@ +/* +  Copyright (C) 2007 Klar�lvdalens Datakonsult AB + +  KDPipeIODevice is free software; you can redistribute it and/or +  modify it under the terms of the GNU Library General Public +  License as published by the Free Software Foundation; either +  version 2 of the License, or (at your option) any later version. + +  KDPipeIODevice is distributed in the hope that it will be useful, +  but WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +  GNU Library General Public License for more details. + +  You should have received a copy of the GNU Library General Public License +  along with KDPipeIODevice; see the file COPYING.LIB.  If not, write to the +  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, +  Boston, MA 02110-1301, USA. +*/ + +#include "kdpipeiodevice.h" + +#include <QtCore> + +#include <cassert> +#include <memory> +#include <algorithm> + +#ifdef Q_OS_WIN32 +# ifndef NOMINMAX +#  define NOMINMAX +# endif +# include <windows.h> +# include <io.h> +#else +# include <unistd.h> +# include <errno.h> +#endif + +using namespace _gpgme_; + +#ifndef KDAB_CHECK_THIS +# define KDAB_CHECK_CTOR (void)1 +# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR +# define KDAB_CHECK_THIS KDAB_CHECK_CTOR +#endif + +#define LOCKED( d ) const QMutexLocker locker( &d->mutex ) +#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i ) + +const unsigned int BUFFER_SIZE = 4096; +const bool ALLOW_QIODEVICE_BUFFERING = true; + +// comment to get trace output: +//#define qDebug if(1){}else qDebug + +namespace { +class Reader : public QThread { +    Q_OBJECT +public: +    Reader( int fd, Qt::HANDLE handle ); +    ~Reader(); + +    qint64 readData( char * data, qint64 maxSize ); + +    unsigned int bytesInBuffer() const { +        return ( wptr + sizeof buffer - rptr ) % sizeof buffer ; +    } + +    bool bufferFull() const { +        return bytesInBuffer() == sizeof buffer - 1; +    } + +    bool bufferEmpty() const { +	return bytesInBuffer() == 0; +    } + +    bool bufferContains( char ch ) { +	const unsigned int bib = bytesInBuffer(); +	for ( unsigned int i = rptr ; i < rptr + bib ; ++i ) +	    if ( buffer[i%sizeof buffer] == ch ) +		return true; +	return false; +    } +         +    void notifyReadyRead(); + +Q_SIGNALS: +    void readyRead(); + +protected: +    /* reimp */ void run(); + +private: +    int fd; +    Qt::HANDLE handle; +public: +    QMutex mutex; +    QWaitCondition waitForCancelCondition; +    QWaitCondition bufferNotFullCondition; +    QWaitCondition bufferNotEmptyCondition; +    QWaitCondition hasStarted; +    QWaitCondition readyReadSentCondition; +    QWaitCondition blockedConsumerIsDoneCondition; +    bool cancel; +    bool eof; +    bool error; +    bool eofShortCut; +    int errorCode; +    bool isReading; +    bool consumerBlocksOnUs; +    +private: +    unsigned int rptr, wptr; +    char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state +}; + + +Reader::Reader( int fd_, Qt::HANDLE handle_ ) +    : QThread(), +      fd( fd_ ), +      handle( handle_ ), +      mutex(), +      bufferNotFullCondition(), +      bufferNotEmptyCondition(), +      hasStarted(), +      cancel( false ), +      eof( false ), +      error( false ), +      eofShortCut( false ), +      errorCode( 0 ), +      isReading( false ), +      consumerBlocksOnUs( false ), +      rptr( 0 ), wptr( 0 ) +{ +     +} + +Reader::~Reader() {} + + +class Writer : public QThread { +    Q_OBJECT +public: +    Writer( int fd, Qt::HANDLE handle ); +    ~Writer(); + +    qint64 writeData( const char * data, qint64 size ); + +    unsigned int bytesInBuffer() const { return numBytesInBuffer; } + +    bool bufferFull() const { +        return numBytesInBuffer == sizeof buffer; +    } + +    bool bufferEmpty() const { +	return numBytesInBuffer == 0; +    } + +Q_SIGNALS: +    void bytesWritten( qint64 ); + +protected: +    /* reimp */ void run(); + +private: +    int fd; +    Qt::HANDLE handle; +public: +    QMutex mutex; +    QWaitCondition bufferEmptyCondition; +    QWaitCondition bufferNotEmptyCondition; +    QWaitCondition hasStarted; +    bool cancel; +    bool error; +    int errorCode; +private: +    unsigned int numBytesInBuffer; +    char buffer[BUFFER_SIZE]; +}; +} + +Writer::Writer( int fd_, Qt::HANDLE handle_ ) +    : QThread(), +      fd( fd_ ), +      handle( handle_ ), +      mutex(), +      bufferEmptyCondition(), +      bufferNotEmptyCondition(), +      hasStarted(), +      cancel( false ), +      error( false ), +      errorCode( 0 ), +      numBytesInBuffer( 0 ) +{ + +} + +Writer::~Writer() {} + + +class KDPipeIODevice::Private : public QObject { +Q_OBJECT +    friend class ::KDPipeIODevice; +    KDPipeIODevice * const q; +public: +    explicit Private( KDPipeIODevice * qq ); +    ~Private(); + +    bool doOpen( int, Qt::HANDLE, OpenMode ); +    bool startReaderThread();  +    bool startWriterThread();  +    void stopThreads(); + +public Q_SLOTS: +    void emitReadyRead(); +  +private: +    int fd; +    Qt::HANDLE handle; +    Reader * reader; +    Writer * writer; +    bool triedToStartReader; +    bool triedToStartWriter; +}; + +KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) +    : QObject( qq ), q( qq ), +      fd( -1 ), +      handle( 0 ), +      reader( 0 ), +      writer( 0 ), +      triedToStartReader( false ), triedToStartWriter( false )  +{ + +} + +KDPipeIODevice::Private::~Private() { +    qDebug( "KDPipeIODevice::~Private(): Destroying %p", q ); +} + +KDPipeIODevice::KDPipeIODevice( QObject * p ) +    : QIODevice( p ), d( new Private( this ) ) +{ +    KDAB_CHECK_CTOR; +} + +KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p ) +    : QIODevice( p ), d( new Private( this ) ) +{ +    KDAB_CHECK_CTOR; +    open( fd, mode ); +} + +KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p ) +    : QIODevice( p ), d( new Private( this ) ) +{ +    KDAB_CHECK_CTOR; +    open( handle, mode ); +} + +KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR; +    if ( isOpen() ) +	close(); +    delete d; d = 0; +} + + +bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS; + +#ifdef Q_OS_WIN32 +    return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode ); +#else +    return d->doOpen( fd, 0, mode ); +#endif + +} + +bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS; + +#ifdef Q_OS_WIN32 +    return d->doOpen( -1, h, mode ); +#else +    Q_UNUSED( h ); +    Q_UNUSED( mode ); +    assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." ); +#endif + +} + +bool KDPipeIODevice::Private::startReaderThread() +{ +   if ( triedToStartReader ) +       return true; +   triedToStartReader = true;     +   if ( reader && !reader->isRunning() && !reader->isFinished() ) { +       qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" ); +       LOCKED( reader ); +       qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" ); +       reader->start( QThread::HighestPriority ); +       qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" ); +       const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 ); +       qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" ); + +       return hasStarted; +   } +   return true; +} + +bool KDPipeIODevice::Private::startWriterThread() +{ +   if ( triedToStartWriter ) +       return true; +   triedToStartWriter = true;     +   if ( writer && !writer->isRunning() && !writer->isFinished() ) { +       LOCKED( writer ); +        +       writer->start( QThread::HighestPriority ); +       if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) ) +            return false; +   } +   return true; +} + +void KDPipeIODevice::Private::emitReadyRead() +{ +    QPointer<Private> thisPointer( this ); +    qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this ); + +    emit q->readyRead(); + +    if ( !thisPointer ) +        return; + +    bool mustNotify = false; + +    if ( reader ) { +        qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this ); +        synchronized( reader ) { +            qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this ); +            reader->readyReadSentCondition.wakeAll(); +            mustNotify = !reader->bufferEmpty() && reader->isReading; +            qDebug( "KDPipeIODevice::emitReadyRead %p: bufferEmpty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading ); +        } +    } +    if ( mustNotify ) +        QTimer::singleShot( 100, this, SLOT( emitReadyRead() ) );   +    qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this ); + +} + +bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) { + +    if ( q->isOpen() ) +	return false; + +#ifdef Q_OS_WIN32 +    if ( !handle_ ) +	return false; +#else +    if ( fd_ < 0 ) +	return false; +#endif + +    if ( !(mode_ & ReadWrite) ) +	return false; // need to have at least read -or- write + + +    std::auto_ptr<Reader> reader_; +    std::auto_ptr<Writer> writer_; + +    if ( mode_ & ReadOnly ) { +	reader_.reset( new Reader( fd_, handle_ ) ); +        qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ );  +	connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),  +Qt::QueuedConnection ); +    } +    if ( mode_ & WriteOnly ) { +	writer_.reset( new Writer( fd_, handle_ ) ); +        qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ );  +        connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),  +Qt::QueuedConnection ); +    } + +    // commit to *this: +    fd = fd_; +    handle = handle_; +    reader = reader_.release(); +    writer = writer_.release(); + +    q->setOpenMode( mode_|Unbuffered ); +    return true; +} + +int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS; +    return d->fd; +} + + +Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS; +    return d->handle; +} + +qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS; +    const qint64 base = QIODevice::bytesAvailable(); +    if ( !d->triedToStartReader ) { +         d->startReaderThread(); +         return base; +    } +    if ( d->reader ) +	synchronized( d->reader ) { +            const qint64 inBuffer = d->reader->bytesInBuffer();      +            return base + inBuffer; +       } +    return base; +} + +qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS; +    d->startWriterThread(); +    const qint64 base = QIODevice::bytesToWrite(); +    if ( d->writer ) +	synchronized( d->writer ) return base + d->writer->bytesInBuffer(); +    return base; +} + +bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS; +    d->startReaderThread(); +   if ( QIODevice::canReadLine() ) +	return true; +    if ( d->reader ) +	synchronized( d->reader ) return d->reader->bufferContains( '\n' ); +    return true; +} + +bool KDPipeIODevice::isSequential() const { +    return true; +} + +bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS; +    d->startReaderThread(); +    if ( !QIODevice::atEnd() ) { +	qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) ); +	return false; +    } +    if ( !isOpen() ) +	return true; +    if ( d->reader->eofShortCut ) +	return true; +    LOCKED( d->reader ); +    const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty(); +    if ( !eof ) { +	if ( !d->reader->error && !d->reader->eof ) +	    qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this ); +	if ( !d->reader->bufferEmpty() ) +	    qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this ); +    } +    return eof; +} + +bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS; +    d->startWriterThread(); +    Writer * const w = d->writer; +    if ( !w ) +	return true; +    LOCKED( w ); +    qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w  +);  +    return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ; +} + +bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS; +    qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this); +    d->startReaderThread(); +    if ( ALLOW_QIODEVICE_BUFFERING ) { +	if ( bytesAvailable() > 0 ) +	    return true; +    } +    Reader * const r = d->reader; +    if ( !r || r->eofShortCut ) +	return true; +    LOCKED( r ); +    if ( r->bytesInBuffer() != 0 || r->eof || r->error ) +        return true; + +    return msecs >= 0 ? r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) : r->bufferNotEmptyCondition.wait( &r->mutex ); +} + +template <typename T> +class TemporaryValue { +public: +   TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; } +   ~TemporaryValue() { var = oldValue; } +private:    +   T& var; +   const T oldValue; +};  + + +bool KDPipeIODevice::readWouldBlock() const +{ +   d->startReaderThread(); +   LOCKED( d->reader ); +   return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error; +}   + +bool KDPipeIODevice::writeWouldBlock() const +{ +   d->startWriterThread(); +   LOCKED( d->writer ); +   return !d->writer->bufferEmpty() && !d->writer->error; +}   + + +qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS; +    qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize ); +    d->startReaderThread(); +    Reader * const r = d->reader; + +    assert( r ); + + +    //assert( r->isRunning() ); // wrong (might be eof, error) +    assert( data || maxSize == 0 ); +    assert( maxSize >= 0 ); + +    if ( r->eofShortCut ) { +	qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this ); +	return 0; +    } + +    if ( maxSize < 0 ) +	maxSize = 0; + +    if ( ALLOW_QIODEVICE_BUFFERING ) { +	if ( bytesAvailable() > 0 ) +	    maxSize = std::min( maxSize, bytesAvailable() ); // don't block +    } +    qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this ); +    LOCKED( r ); +    qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this ); + +    r->readyReadSentCondition.wakeAll(); +    if ( /* maxSize > 0 && */ r->bufferEmpty() &&  !r->error && !r->eof ) { // ### block on maxSize == 0? +	qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this ); +        const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true ); +	r->bufferNotEmptyCondition.wait( &r->mutex ); +        r->blockedConsumerIsDoneCondition.wakeAll(); +	qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );  +    } + +    if ( r->bufferEmpty() ) { +	qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this ); +	// woken with an empty buffer must mean either EOF or error: +	assert( r->eof || r->error ); +	r->eofShortCut = true; +	return r->eof ? 0 : -1 ; +    } + +    qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize ); +    const qint64 bytesRead = r->readData( data, maxSize ); +    qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead ); +    qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data ); +  +    return bytesRead; +} + +qint64 Reader::readData( char * data, qint64 maxSize ) { +    qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ; +    if ( numRead > maxSize ) +	numRead = maxSize; + +    qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this, +	    data, maxSize, rptr, wptr, bytesInBuffer(), numRead ); + +    std::memcpy( data, buffer + rptr, numRead ); + +    rptr = ( rptr + numRead ) % sizeof buffer ; + +    if ( !bufferFull() ) { +	qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this ); +	bufferNotFullCondition.wakeAll(); +    } + +    return numRead; +} + +qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS; +    d->startWriterThread(); +    Writer * const w = d->writer; + +    assert( w ); +    assert( w->error || w->isRunning() ); +    assert( data || size == 0 ); +    assert( size >= 0 ); + +    LOCKED( w ); + +    while ( !w->error && !w->bufferEmpty() ) {  +	qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this ); +	w->bufferEmptyCondition.wait( &w->mutex ); +	qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this ); + +    } +    if ( w->error ) +	return -1; + +    assert( w->bufferEmpty() ); + +    return w->writeData( data, size ); +} + +qint64 Writer::writeData( const char * data, qint64 size ) { +    assert( bufferEmpty() ); + +    if ( size > static_cast<qint64>( sizeof buffer ) ) +	size = sizeof buffer; + +    std::memcpy( buffer, data, size ); +     +    numBytesInBuffer = size; + +    if ( !bufferEmpty() ) { +	bufferNotEmptyCondition.wakeAll(); +    } +   return size; +} + +void KDPipeIODevice::Private::stopThreads() +{ +    if ( triedToStartWriter ) +    { +        if ( writer && q->bytesToWrite() > 0 ) +	    q->waitForBytesWritten( -1 ); + +        assert( q->bytesToWrite() == 0 ); +    } +    if ( Reader * & r = reader ) { +        disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );  +	synchronized( r ) { +	    // tell thread to cancel: +	    r->cancel = true; +	    // and wake it, so it can terminate: +            r->waitForCancelCondition.wakeAll(); +	    r->bufferNotFullCondition.wakeAll(); +            r->readyReadSentCondition.wakeAll(); +      	} +    } +    if ( Writer * & w = writer ) { +	synchronized( w ) { +	    // tell thread to cancel: +	    w->cancel = true; +	    // and wake it, so it can terminate: +	    w->bufferNotEmptyCondition.wakeAll(); +	} +    } +} + +void KDPipeIODevice::close() { KDAB_CHECK_THIS; +    qDebug( "KDPipeIODevice::close(%p)", this ); +    if ( !isOpen() ) +	return; + +    // tell clients we're about to close: +    emit aboutToClose(); +    d->stopThreads(); + +#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; } +    qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer ); +    waitAndDelete( d->writer ); +    qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader ); +    if ( d->reader ) { +        LOCKED( d->reader ); +        d->reader->readyReadSentCondition.wakeAll(); +    } +    waitAndDelete( d->reader ); +#undef waitAndDelete +#ifdef Q_OS_WIN32 +    if ( d->fd != -1 ) +        _close( d->fd ); +    else +        CloseHandle( d->handle ); +#else +    ::close( d->fd ); +#endif + +    setOpenMode( NotOpen ); +    d->fd = -1; +    d->handle = 0; +} + +void Reader::run() { + +    LOCKED( this ); + +    // too bad QThread doesn't have that itself; a signal isn't enough +    hasStarted.wakeAll(); + +    qDebug( "%p: Reader::run: started", this ); + +    while ( true ) { +        if ( !cancel && ( eof || error ) ) { +            //notify the client until the buffer is empty and then once  +            //again so he receives eof/error. After that, wait for him  +            //to cancel  +            const bool wasEmpty = bufferEmpty(); +	    qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error ); +            notifyReadyRead(); +            if ( !cancel && wasEmpty )  +                waitForCancelCondition.wait( &mutex ); +        } else if ( !cancel && !bufferFull() && !bufferEmpty() ) { +	    qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this ); +            notifyReadyRead(); +        }  +  +        while ( !cancel && !error && bufferFull() ) { +            notifyReadyRead(); +            if ( !cancel && bufferFull() ) { +                qDebug( "%p: Reader::run: buffer is full, going to sleep", this ); +	        bufferNotFullCondition.wait( &mutex ); +	    } +        } +         +	if ( cancel ) { +            qDebug( "%p: Reader::run: detected cancel", this ); +	    goto leave; +	} + +        if ( !eof && !error ) { +            if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty +	        rptr = wptr = 0; + +            unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer; +	    if ( numBytes > sizeof buffer - wptr ) +	        numBytes = sizeof buffer - wptr; + +	    qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes ); + +	    assert( numBytes > 0 ); + +	    qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes ); +#ifdef Q_OS_WIN32 +            isReading = true; +	    mutex.unlock(); +            DWORD numRead; +	    const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 ); +	    mutex.lock(); +            isReading = false; +	    if ( ok ) { +                if ( numRead == 0 ) { +                    qDebug( "%p: Reader::run: got eof (numRead==0)", this ); +                    eof = true; +                }  +            } else { // !ok +	        errorCode = static_cast<int>( GetLastError() ); +	        if ( errorCode == ERROR_BROKEN_PIPE ) { +                    assert( numRead == 0 ); +                    qDebug( "%p: Reader::run: got eof (broken pipe)", this ); +		    eof = true; +	        } else { +                    assert( numRead == 0 ); +		    qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode ); +		    error = true; +	        } +	    } +#else +	    qint64 numRead; +	    mutex.unlock(); +	    do { +	        numRead = ::read( fd, buffer + wptr, numBytes ); +	    } while ( numRead == -1 && errno == EINTR ); +	    mutex.lock(); + +	    if ( numRead < 0 ) { +	        errorCode = errno; +	        error = true; +	        qDebug( "%p: Reader::run: got error: %d", this, errorCode ); +            } else if ( numRead == 0 ) { +	        qDebug( "%p: Reader::run: eof detected", this );   +                eof = true; +            } +#endif +	    qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) ); +	    qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer ); + +	    if ( numRead > 0 ) { +	        qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr ); +	        wptr = ( wptr + numRead ) % sizeof buffer; +	        qDebug( "%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", this, rptr, wptr ); +            } +        } +    } + leave: +    qDebug( "%p: Reader::run: terminated", this ); +} + +void Reader::notifyReadyRead() +{ +    qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() ); +    assert( !cancel ); + +    if ( consumerBlocksOnUs ) { +        bufferNotEmptyCondition.wakeAll(); +        blockedConsumerIsDoneCondition.wait( &mutex ); +        return; +    } +    qDebug( "notifyReadyRead: emit signal" ); +    emit readyRead(); +    readyReadSentCondition.wait( &mutex ); +    qDebug( "notifyReadyRead: returning from waiting, leave" ); +} + +void Writer::run() { + +    LOCKED( this ); + +    // too bad QThread doesn't have that itself; a signal isn't enough +    hasStarted.wakeAll(); + +    qDebug( "%p: Writer::run: started", this ); + +    while ( true ) { + +	while ( !cancel && bufferEmpty() ) { +	    qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); +            bufferEmptyCondition.wakeAll(); +            emit bytesWritten( 0 ); +	    qDebug( "%p: Writer::run: buffer is empty, going to sleep", this ); +            bufferNotEmptyCondition.wait( &mutex ); +	    qDebug( "%p: Writer::run: woke up", this ); +	} + +	if ( cancel ) { +	    qDebug( "%p: Writer::run: detected cancel", this ); +	    goto leave; +	} + +	assert( numBytesInBuffer > 0 ); + +	qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer ); +	qint64 totalWritten = 0; +	do {   +            mutex.unlock(); +#ifdef Q_OS_WIN32 +            DWORD numWritten; +            qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer );  +            qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd ); +	    if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) { +		mutex.lock(); +		errorCode = static_cast<int>( GetLastError() ); +		qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); +		error = true; +                goto leave; +	    } +#else +	    qint64 numWritten; +	    do { +		numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten ); +	    } while ( numWritten == -1 && errno == EINTR ); + +	    if ( numWritten < 0 ) { +	        mutex.lock(); +		errorCode = errno; +		qDebug( "%p: Writer::run: got error code: %d", this, errorCode ); +		error = true; +                goto leave; +	    } +#endif +            qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer ); +	    totalWritten += numWritten; +            mutex.lock(); +	} while ( totalWritten < numBytesInBuffer ); + +	qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten ); + +	numBytesInBuffer = 0; + +	qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); +	bufferEmptyCondition.wakeAll(); +	emit bytesWritten( totalWritten ); +    } + leave: +    qDebug( "%p: Writer::run: terminating", this ); +    numBytesInBuffer = 0; +    qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this ); +    bufferEmptyCondition.wakeAll(); +    emit bytesWritten( 0 ); +} + +// static  +std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() { +    KDPipeIODevice * read = 0; +    KDPipeIODevice * write = 0; +#ifdef Q_OS_WIN32 +    HANDLE rh; +    HANDLE wh; +    SECURITY_ATTRIBUTES sa; +    memset( &sa, 0, sizeof(sa) ); +    sa.nLength = sizeof(sa); +    sa.bInheritHandle = TRUE; +	if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) { +	read = new KDPipeIODevice; +	read->open( rh, ReadOnly ); +	write = new KDPipeIODevice; +	write->open( wh, WriteOnly ); +    } +#else +    int fds[2]; +    if ( pipe( fds ) == 0 ) { +	read = new KDPipeIODevice; +	read->open( fds[0], ReadOnly ); +	write = new KDPipeIODevice; +	write->open( fds[1], WriteOnly ); +    } +#endif +    return std::make_pair( read, write ); +} + +#ifdef KDAB_DEFINE_CHECKS +KDAB_DEFINE_CHECKS( KDPipeIODevice ) { +    if ( !isOpen() ) { +	assert( openMode() == NotOpen ); +	assert( !d->reader ); +	assert( !d->writer ); +#ifdef Q_OS_WIN32 +	assert( !d->handle ); +#else +	assert( d->fd < 0 ); +#endif +    } else { +	assert( openMode() != NotOpen ); +	assert( openMode() & ReadWrite ); +	if ( openMode() & ReadOnly ) { +	    assert( d->reader ); +	    synchronized( d->reader ) +		assert( d->reader->eof || d->reader->error || d->reader->isRunning() ); +	} +	if ( openMode() & WriteOnly ) { +	    assert( d->writer ); +	    synchronized( d->writer ) +		assert( d->writer->error || d->writer->isRunning() ); +	} +#ifdef Q_OS_WIN32 +	assert( d->handle ); +#else +	assert( d->fd >= 0 ); +#endif +    } +} +#endif // KDAB_DEFINE_CHECKS + +#include "moc_kdpipeiodevice.cpp" +#include "kdpipeiodevice.moc" | 
