gpgme/src/kdpipeiodevice.cpp
Marcus Brinkmann 66d0fa1973 008-11-03 Marcus Brinkmann <marcus@g10code.com>
* configure.ac: Replace gpgme paths with src.
        * gpgme: Move to ...
        * src: ... this new directory.

assuan/
2008-11-03  Marcus Brinkmann  <marcus@g10code.com>

	* Makefile.am (INCLUDES): Replace gpgme path with src.

tests/
2008-11-03  Marcus Brinkmann  <marcus@g10code.com>

        * gpgsm/Makefile.am (INCLUDES, LDADD): Replace gpgme path with src.
        * gpg/Makefile.am (INCLUDES, LDADD, t_thread1_LDADD): Likewise.
	* Makefile.am (LDADD): Likewise.
2008-11-03 17:24:09 +00:00

952 lines
26 KiB
C++

/*
Copyright (C) 2007 Klarälvdalens Datakonsult AB
KDPipeIODevice is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
KDPipeIODevice is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#include "kdpipeiodevice.h"
#include <QtCore>
#include <cassert>
#include <memory>
#include <algorithm>
#ifdef Q_OS_WIN32
# ifndef NOMINMAX
# define NOMINMAX
# endif
# include <windows.h>
# include <io.h>
#else
# include <unistd.h>
# include <errno.h>
#endif
using namespace _gpgme_;
#ifndef KDAB_CHECK_THIS
# define KDAB_CHECK_CTOR (void)1
# define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
# define KDAB_CHECK_THIS KDAB_CHECK_CTOR
#endif
#define LOCKED( d ) const QMutexLocker locker( &d->mutex )
#define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
const unsigned int BUFFER_SIZE = 4096;
const bool ALLOW_QIODEVICE_BUFFERING = true;
// comment to get trace output:
//#define qDebug if(1){}else qDebug
namespace {
class Reader : public QThread {
Q_OBJECT
public:
Reader( int fd, Qt::HANDLE handle );
~Reader();
qint64 readData( char * data, qint64 maxSize );
unsigned int bytesInBuffer() const {
return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
}
bool bufferFull() const {
return bytesInBuffer() == sizeof buffer - 1;
}
bool bufferEmpty() const {
return bytesInBuffer() == 0;
}
bool bufferContains( char ch ) {
const unsigned int bib = bytesInBuffer();
for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
if ( buffer[i%sizeof buffer] == ch )
return true;
return false;
}
void notifyReadyRead();
Q_SIGNALS:
void readyRead();
protected:
/* reimp */ void run();
private:
int fd;
Qt::HANDLE handle;
public:
QMutex mutex;
QWaitCondition waitForCancelCondition;
QWaitCondition bufferNotFullCondition;
QWaitCondition bufferNotEmptyCondition;
QWaitCondition hasStarted;
QWaitCondition readyReadSentCondition;
QWaitCondition blockedConsumerIsDoneCondition;
bool cancel;
bool eof;
bool error;
bool eofShortCut;
int errorCode;
bool isReading;
bool consumerBlocksOnUs;
private:
unsigned int rptr, wptr;
char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
};
Reader::Reader( int fd_, Qt::HANDLE handle_ )
: QThread(),
fd( fd_ ),
handle( handle_ ),
mutex(),
bufferNotFullCondition(),
bufferNotEmptyCondition(),
hasStarted(),
cancel( false ),
eof( false ),
error( false ),
eofShortCut( false ),
errorCode( 0 ),
isReading( false ),
consumerBlocksOnUs( false ),
rptr( 0 ), wptr( 0 )
{
}
Reader::~Reader() {}
class Writer : public QThread {
Q_OBJECT
public:
Writer( int fd, Qt::HANDLE handle );
~Writer();
qint64 writeData( const char * data, qint64 size );
unsigned int bytesInBuffer() const { return numBytesInBuffer; }
bool bufferFull() const {
return numBytesInBuffer == sizeof buffer;
}
bool bufferEmpty() const {
return numBytesInBuffer == 0;
}
Q_SIGNALS:
void bytesWritten( qint64 );
protected:
/* reimp */ void run();
private:
int fd;
Qt::HANDLE handle;
public:
QMutex mutex;
QWaitCondition bufferEmptyCondition;
QWaitCondition bufferNotEmptyCondition;
QWaitCondition hasStarted;
bool cancel;
bool error;
int errorCode;
private:
unsigned int numBytesInBuffer;
char buffer[BUFFER_SIZE];
};
}
Writer::Writer( int fd_, Qt::HANDLE handle_ )
: QThread(),
fd( fd_ ),
handle( handle_ ),
mutex(),
bufferEmptyCondition(),
bufferNotEmptyCondition(),
hasStarted(),
cancel( false ),
error( false ),
errorCode( 0 ),
numBytesInBuffer( 0 )
{
}
Writer::~Writer() {}
class KDPipeIODevice::Private : public QObject {
Q_OBJECT
friend class ::KDPipeIODevice;
KDPipeIODevice * const q;
public:
explicit Private( KDPipeIODevice * qq );
~Private();
bool doOpen( int, Qt::HANDLE, OpenMode );
bool startReaderThread();
bool startWriterThread();
void stopThreads();
public Q_SLOTS:
void emitReadyRead();
private:
int fd;
Qt::HANDLE handle;
Reader * reader;
Writer * writer;
bool triedToStartReader;
bool triedToStartWriter;
};
KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
: QObject( qq ), q( qq ),
fd( -1 ),
handle( 0 ),
reader( 0 ),
writer( 0 ),
triedToStartReader( false ), triedToStartWriter( false )
{
}
KDPipeIODevice::Private::~Private() {
qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
}
KDPipeIODevice::KDPipeIODevice( QObject * p )
: QIODevice( p ), d( new Private( this ) )
{
KDAB_CHECK_CTOR;
}
KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
: QIODevice( p ), d( new Private( this ) )
{
KDAB_CHECK_CTOR;
open( fd, mode );
}
KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
: QIODevice( p ), d( new Private( this ) )
{
KDAB_CHECK_CTOR;
open( handle, mode );
}
KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
if ( isOpen() )
close();
delete d; d = 0;
}
bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
#ifdef Q_OS_WIN32
return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
#else
return d->doOpen( fd, 0, mode );
#endif
}
bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
#ifdef Q_OS_WIN32
return d->doOpen( -1, h, mode );
#else
Q_UNUSED( h );
Q_UNUSED( mode );
assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
#endif
}
bool KDPipeIODevice::Private::startReaderThread()
{
if ( triedToStartReader )
return true;
triedToStartReader = true;
if ( reader && !reader->isRunning() && !reader->isFinished() ) {
qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
LOCKED( reader );
qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
reader->start( QThread::HighestPriority );
qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
return hasStarted;
}
return true;
}
bool KDPipeIODevice::Private::startWriterThread()
{
if ( triedToStartWriter )
return true;
triedToStartWriter = true;
if ( writer && !writer->isRunning() && !writer->isFinished() ) {
LOCKED( writer );
writer->start( QThread::HighestPriority );
if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
return false;
}
return true;
}
void KDPipeIODevice::Private::emitReadyRead()
{
QPointer<Private> thisPointer( this );
qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this );
emit q->readyRead();
if ( !thisPointer )
return;
bool mustNotify = false;
if ( reader ) {
qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this );
synchronized( reader ) {
qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this );
reader->readyReadSentCondition.wakeAll();
mustNotify = !reader->bufferEmpty() && reader->isReading;
qDebug( "KDPipeIODevice::emitReadyRead %p: bufferEmpty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading );
}
}
if ( mustNotify )
QTimer::singleShot( 100, this, SLOT( emitReadyRead() ) );
qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this );
}
bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
if ( q->isOpen() )
return false;
#ifdef Q_OS_WIN32
if ( !handle_ )
return false;
#else
if ( fd_ < 0 )
return false;
#endif
if ( !(mode_ & ReadWrite) )
return false; // need to have at least read -or- write
std::auto_ptr<Reader> reader_;
std::auto_ptr<Writer> writer_;
if ( mode_ & ReadOnly ) {
reader_.reset( new Reader( fd_, handle_ ) );
qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ );
connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
Qt::QueuedConnection );
}
if ( mode_ & WriteOnly ) {
writer_.reset( new Writer( fd_, handle_ ) );
qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ );
connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),
Qt::QueuedConnection );
}
// commit to *this:
fd = fd_;
handle = handle_;
reader = reader_.release();
writer = writer_.release();
q->setOpenMode( mode_|Unbuffered );
return true;
}
int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
return d->fd;
}
Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
return d->handle;
}
qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
const qint64 base = QIODevice::bytesAvailable();
if ( !d->triedToStartReader ) {
d->startReaderThread();
return base;
}
if ( d->reader )
synchronized( d->reader ) {
const qint64 inBuffer = d->reader->bytesInBuffer();
return base + inBuffer;
}
return base;
}
qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
d->startWriterThread();
const qint64 base = QIODevice::bytesToWrite();
if ( d->writer )
synchronized( d->writer ) return base + d->writer->bytesInBuffer();
return base;
}
bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
d->startReaderThread();
if ( QIODevice::canReadLine() )
return true;
if ( d->reader )
synchronized( d->reader ) return d->reader->bufferContains( '\n' );
return true;
}
bool KDPipeIODevice::isSequential() const {
return true;
}
bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
d->startReaderThread();
if ( !QIODevice::atEnd() ) {
qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
return false;
}
if ( !isOpen() )
return true;
if ( d->reader->eofShortCut )
return true;
LOCKED( d->reader );
const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
if ( !eof ) {
if ( !d->reader->error && !d->reader->eof )
qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
if ( !d->reader->bufferEmpty() )
qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
}
return eof;
}
bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
d->startWriterThread();
Writer * const w = d->writer;
if ( !w )
return true;
LOCKED( w );
qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w
);
return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
}
bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
d->startReaderThread();
if ( ALLOW_QIODEVICE_BUFFERING ) {
if ( bytesAvailable() > 0 )
return true;
}
Reader * const r = d->reader;
if ( !r || r->eofShortCut )
return true;
LOCKED( r );
if ( r->bytesInBuffer() != 0 || r->eof || r->error )
return true;
return msecs >= 0 ? r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) : r->bufferNotEmptyCondition.wait( &r->mutex );
}
template <typename T>
class TemporaryValue {
public:
TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
~TemporaryValue() { var = oldValue; }
private:
T& var;
const T oldValue;
};
bool KDPipeIODevice::readWouldBlock() const
{
d->startReaderThread();
LOCKED( d->reader );
return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
}
bool KDPipeIODevice::writeWouldBlock() const
{
d->startWriterThread();
LOCKED( d->writer );
return !d->writer->bufferEmpty() && !d->writer->error;
}
qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
d->startReaderThread();
Reader * const r = d->reader;
assert( r );
//assert( r->isRunning() ); // wrong (might be eof, error)
assert( data || maxSize == 0 );
assert( maxSize >= 0 );
if ( r->eofShortCut ) {
qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
return 0;
}
if ( maxSize < 0 )
maxSize = 0;
if ( ALLOW_QIODEVICE_BUFFERING ) {
if ( bytesAvailable() > 0 )
maxSize = std::min( maxSize, bytesAvailable() ); // don't block
}
qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
LOCKED( r );
qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
r->readyReadSentCondition.wakeAll();
if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
r->bufferNotEmptyCondition.wait( &r->mutex );
r->blockedConsumerIsDoneCondition.wakeAll();
qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );
}
if ( r->bufferEmpty() ) {
qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
// woken with an empty buffer must mean either EOF or error:
assert( r->eof || r->error );
r->eofShortCut = true;
return r->eof ? 0 : -1 ;
}
qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
const qint64 bytesRead = r->readData( data, maxSize );
qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
return bytesRead;
}
qint64 Reader::readData( char * data, qint64 maxSize ) {
qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
if ( numRead > maxSize )
numRead = maxSize;
qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
std::memcpy( data, buffer + rptr, numRead );
rptr = ( rptr + numRead ) % sizeof buffer ;
if ( !bufferFull() ) {
qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
bufferNotFullCondition.wakeAll();
}
return numRead;
}
qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
d->startWriterThread();
Writer * const w = d->writer;
assert( w );
assert( w->error || w->isRunning() );
assert( data || size == 0 );
assert( size >= 0 );
LOCKED( w );
while ( !w->error && !w->bufferEmpty() ) {
qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
w->bufferEmptyCondition.wait( &w->mutex );
qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
}
if ( w->error )
return -1;
assert( w->bufferEmpty() );
return w->writeData( data, size );
}
qint64 Writer::writeData( const char * data, qint64 size ) {
assert( bufferEmpty() );
if ( size > static_cast<qint64>( sizeof buffer ) )
size = sizeof buffer;
std::memcpy( buffer, data, size );
numBytesInBuffer = size;
if ( !bufferEmpty() ) {
bufferNotEmptyCondition.wakeAll();
}
return size;
}
void KDPipeIODevice::Private::stopThreads()
{
if ( triedToStartWriter )
{
if ( writer && q->bytesToWrite() > 0 )
q->waitForBytesWritten( -1 );
assert( q->bytesToWrite() == 0 );
}
if ( Reader * & r = reader ) {
disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );
synchronized( r ) {
// tell thread to cancel:
r->cancel = true;
// and wake it, so it can terminate:
r->waitForCancelCondition.wakeAll();
r->bufferNotFullCondition.wakeAll();
r->readyReadSentCondition.wakeAll();
}
}
if ( Writer * & w = writer ) {
synchronized( w ) {
// tell thread to cancel:
w->cancel = true;
// and wake it, so it can terminate:
w->bufferNotEmptyCondition.wakeAll();
}
}
}
void KDPipeIODevice::close() { KDAB_CHECK_THIS;
qDebug( "KDPipeIODevice::close(%p)", this );
if ( !isOpen() )
return;
// tell clients we're about to close:
emit aboutToClose();
d->stopThreads();
#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
waitAndDelete( d->writer );
qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
if ( d->reader ) {
LOCKED( d->reader );
d->reader->readyReadSentCondition.wakeAll();
}
waitAndDelete( d->reader );
#undef waitAndDelete
#ifdef Q_OS_WIN32
if ( d->fd != -1 )
_close( d->fd );
else
CloseHandle( d->handle );
#else
::close( d->fd );
#endif
setOpenMode( NotOpen );
d->fd = -1;
d->handle = 0;
}
void Reader::run() {
LOCKED( this );
// too bad QThread doesn't have that itself; a signal isn't enough
hasStarted.wakeAll();
qDebug( "%p: Reader::run: started", this );
while ( true ) {
if ( !cancel && ( eof || error ) ) {
//notify the client until the buffer is empty and then once
//again so he receives eof/error. After that, wait for him
//to cancel
const bool wasEmpty = bufferEmpty();
qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
notifyReadyRead();
if ( !cancel && wasEmpty )
waitForCancelCondition.wait( &mutex );
} else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
notifyReadyRead();
}
while ( !cancel && !error && bufferFull() ) {
notifyReadyRead();
if ( !cancel && bufferFull() ) {
qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
bufferNotFullCondition.wait( &mutex );
}
}
if ( cancel ) {
qDebug( "%p: Reader::run: detected cancel", this );
goto leave;
}
if ( !eof && !error ) {
if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
rptr = wptr = 0;
unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
if ( numBytes > sizeof buffer - wptr )
numBytes = sizeof buffer - wptr;
qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
assert( numBytes > 0 );
qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
#ifdef Q_OS_WIN32
isReading = true;
mutex.unlock();
DWORD numRead;
const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
mutex.lock();
isReading = false;
if ( ok ) {
if ( numRead == 0 ) {
qDebug( "%p: Reader::run: got eof (numRead==0)", this );
eof = true;
}
} else { // !ok
errorCode = static_cast<int>( GetLastError() );
if ( errorCode == ERROR_BROKEN_PIPE ) {
assert( numRead == 0 );
qDebug( "%p: Reader::run: got eof (broken pipe)", this );
eof = true;
} else {
assert( numRead == 0 );
qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
error = true;
}
}
#else
qint64 numRead;
mutex.unlock();
do {
numRead = ::read( fd, buffer + wptr, numBytes );
} while ( numRead == -1 && errno == EINTR );
mutex.lock();
if ( numRead < 0 ) {
errorCode = errno;
error = true;
qDebug( "%p: Reader::run: got error: %d", this, errorCode );
} else if ( numRead == 0 ) {
qDebug( "%p: Reader::run: eof detected", this );
eof = true;
}
#endif
qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer );
if ( numRead > 0 ) {
qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
wptr = ( wptr + numRead ) % sizeof buffer;
qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
}
}
}
leave:
qDebug( "%p: Reader::run: terminated", this );
}
void Reader::notifyReadyRead()
{
qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
assert( !cancel );
if ( consumerBlocksOnUs ) {
bufferNotEmptyCondition.wakeAll();
blockedConsumerIsDoneCondition.wait( &mutex );
return;
}
qDebug( "notifyReadyRead: emit signal" );
emit readyRead();
readyReadSentCondition.wait( &mutex );
qDebug( "notifyReadyRead: returning from waiting, leave" );
}
void Writer::run() {
LOCKED( this );
// too bad QThread doesn't have that itself; a signal isn't enough
hasStarted.wakeAll();
qDebug( "%p: Writer::run: started", this );
while ( true ) {
while ( !cancel && bufferEmpty() ) {
qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
bufferEmptyCondition.wakeAll();
emit bytesWritten( 0 );
qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
bufferNotEmptyCondition.wait( &mutex );
qDebug( "%p: Writer::run: woke up", this );
}
if ( cancel ) {
qDebug( "%p: Writer::run: detected cancel", this );
goto leave;
}
assert( numBytesInBuffer > 0 );
qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
qint64 totalWritten = 0;
do {
mutex.unlock();
#ifdef Q_OS_WIN32
DWORD numWritten;
qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer );
qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
mutex.lock();
errorCode = static_cast<int>( GetLastError() );
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
error = true;
goto leave;
}
#else
qint64 numWritten;
do {
numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
} while ( numWritten == -1 && errno == EINTR );
if ( numWritten < 0 ) {
mutex.lock();
errorCode = errno;
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
error = true;
goto leave;
}
#endif
qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
totalWritten += numWritten;
mutex.lock();
} while ( totalWritten < numBytesInBuffer );
qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
numBytesInBuffer = 0;
qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
bufferEmptyCondition.wakeAll();
emit bytesWritten( totalWritten );
}
leave:
qDebug( "%p: Writer::run: terminating", this );
numBytesInBuffer = 0;
qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
bufferEmptyCondition.wakeAll();
emit bytesWritten( 0 );
}
// static
std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
KDPipeIODevice * read = 0;
KDPipeIODevice * write = 0;
#ifdef Q_OS_WIN32
HANDLE rh;
HANDLE wh;
SECURITY_ATTRIBUTES sa;
memset( &sa, 0, sizeof(sa) );
sa.nLength = sizeof(sa);
sa.bInheritHandle = TRUE;
if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
read = new KDPipeIODevice;
read->open( rh, ReadOnly );
write = new KDPipeIODevice;
write->open( wh, WriteOnly );
}
#else
int fds[2];
if ( pipe( fds ) == 0 ) {
read = new KDPipeIODevice;
read->open( fds[0], ReadOnly );
write = new KDPipeIODevice;
write->open( fds[1], WriteOnly );
}
#endif
return std::make_pair( read, write );
}
#ifdef KDAB_DEFINE_CHECKS
KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
if ( !isOpen() ) {
assert( openMode() == NotOpen );
assert( !d->reader );
assert( !d->writer );
#ifdef Q_OS_WIN32
assert( !d->handle );
#else
assert( d->fd < 0 );
#endif
} else {
assert( openMode() != NotOpen );
assert( openMode() & ReadWrite );
if ( openMode() & ReadOnly ) {
assert( d->reader );
synchronized( d->reader )
assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
}
if ( openMode() & WriteOnly ) {
assert( d->writer );
synchronized( d->writer )
assert( d->writer->error || d->writer->isRunning() );
}
#ifdef Q_OS_WIN32
assert( d->handle );
#else
assert( d->fd >= 0 );
#endif
}
}
#endif // KDAB_DEFINE_CHECKS
#include "moc_kdpipeiodevice.cpp"
#include "kdpipeiodevice.moc"