2007-10-04 Marcus Brinkmann <marcus@g10code.de>
* kdpipeiodevice.h, kdpipeiodevice.cpp, kdpipeiodevice.moc, w32-qt-io.cpp: New versions from Frank Osterfeld.
This commit is contained in:
parent
228ca8fab2
commit
a70248939f
@ -1,3 +1,8 @@
|
|||||||
|
2007-10-04 Marcus Brinkmann <marcus@g10code.de>
|
||||||
|
|
||||||
|
* kdpipeiodevice.h, kdpipeiodevice.cpp, kdpipeiodevice.moc,
|
||||||
|
w32-qt-io.cpp: New versions from Frank Osterfeld.
|
||||||
|
|
||||||
2007-10-02 Marcus Brinkmann <marcus@g10code.de>
|
2007-10-02 Marcus Brinkmann <marcus@g10code.de>
|
||||||
|
|
||||||
* kdpipeiodevice.cpp, kdpipeiodevice.moc: New versions.
|
* kdpipeiodevice.cpp, kdpipeiodevice.moc: New versions.
|
||||||
|
@ -99,14 +99,14 @@ public:
|
|||||||
QWaitCondition bufferNotEmptyCondition;
|
QWaitCondition bufferNotEmptyCondition;
|
||||||
QWaitCondition hasStarted;
|
QWaitCondition hasStarted;
|
||||||
QWaitCondition readyReadSentCondition;
|
QWaitCondition readyReadSentCondition;
|
||||||
QWaitCondition notInReadDataCondition;
|
QWaitCondition blockedConsumerIsDoneCondition;
|
||||||
bool cancel;
|
bool cancel;
|
||||||
bool eof;
|
bool eof;
|
||||||
bool error;
|
bool error;
|
||||||
bool eofShortCut;
|
bool eofShortCut;
|
||||||
int errorCode;
|
int errorCode;
|
||||||
bool inReadData;
|
bool consumerBlocksOnUs;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
unsigned int rptr, wptr;
|
unsigned int rptr, wptr;
|
||||||
char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
|
char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
|
||||||
@ -127,7 +127,7 @@ Reader::Reader( int fd_, Qt::HANDLE handle_ )
|
|||||||
eofShortCut( false ),
|
eofShortCut( false ),
|
||||||
errorCode( 0 ),
|
errorCode( 0 ),
|
||||||
rptr( 0 ), wptr( 0 ),
|
rptr( 0 ), wptr( 0 ),
|
||||||
inReadData( false )
|
consumerBlocksOnUs( false )
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -226,13 +226,13 @@ KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
|
|||||||
handle( 0 ),
|
handle( 0 ),
|
||||||
reader( 0 ),
|
reader( 0 ),
|
||||||
writer( 0 ),
|
writer( 0 ),
|
||||||
triedToStartReader( false ), triedToStartWriter( false )
|
triedToStartReader( false ), triedToStartWriter( false )
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
KDPipeIODevice::Private::~Private() {
|
KDPipeIODevice::Private::~Private() {
|
||||||
qDebug( "KDPipeIODevice::~Private(): Destroying %p", this );
|
qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
|
||||||
}
|
}
|
||||||
|
|
||||||
KDPipeIODevice::KDPipeIODevice( QObject * p )
|
KDPipeIODevice::KDPipeIODevice( QObject * p )
|
||||||
@ -290,11 +290,15 @@ bool KDPipeIODevice::Private::startReaderThread()
|
|||||||
return true;
|
return true;
|
||||||
triedToStartReader = true;
|
triedToStartReader = true;
|
||||||
if ( reader && !reader->isRunning() && !reader->isFinished() ) {
|
if ( reader && !reader->isRunning() && !reader->isFinished() ) {
|
||||||
|
qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
|
||||||
LOCKED( reader );
|
LOCKED( reader );
|
||||||
|
qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
|
||||||
reader->start( QThread::HighestPriority );
|
reader->start( QThread::HighestPriority );
|
||||||
if ( !reader->hasStarted.wait( &reader->mutex, 1000 ) )
|
qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
|
||||||
return false;
|
const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
|
||||||
|
qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
|
||||||
|
|
||||||
|
return hasStarted;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -320,11 +324,16 @@ void KDPipeIODevice::Private::emitReadyRead()
|
|||||||
const int counter = s_counter++;
|
const int counter = s_counter++;
|
||||||
QPointer<Private> thisPointer( this );
|
QPointer<Private> thisPointer( this );
|
||||||
qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d", this, counter );
|
qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d", this, counter );
|
||||||
|
|
||||||
emit q->readyRead();
|
emit q->readyRead();
|
||||||
|
|
||||||
if ( !thisPointer )
|
if ( !thisPointer )
|
||||||
return;
|
return;
|
||||||
LOCKED( reader );
|
qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locking reader (CONSUMER THREAD)", this, counter );
|
||||||
reader->readyReadSentCondition.wakeAll();
|
synchronized( reader ) {
|
||||||
|
qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locked reader (CONSUMER THREAD)", this, counter );
|
||||||
|
reader->readyReadSentCondition.wakeAll();
|
||||||
|
}
|
||||||
qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving %d", this, counter );
|
qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving %d", this, counter );
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -342,8 +351,6 @@ bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode
|
|||||||
if ( !(mode_ & ReadWrite) )
|
if ( !(mode_ & ReadWrite) )
|
||||||
return false; // need to have at least read -or- write
|
return false; // need to have at least read -or- write
|
||||||
|
|
||||||
fd = fd_;
|
|
||||||
handle = handle_;
|
|
||||||
|
|
||||||
std::auto_ptr<Reader> reader_;
|
std::auto_ptr<Reader> reader_;
|
||||||
std::auto_ptr<Writer> writer_;
|
std::auto_ptr<Writer> writer_;
|
||||||
@ -387,7 +394,10 @@ qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
|
|||||||
return base;
|
return base;
|
||||||
}
|
}
|
||||||
if ( d->reader )
|
if ( d->reader )
|
||||||
synchronized( d->reader ) return base + d->reader->bytesInBuffer();
|
synchronized( d->reader ) {
|
||||||
|
const qint64 inBuffer = d->reader->bytesInBuffer();
|
||||||
|
return base + inBuffer;
|
||||||
|
}
|
||||||
return base;
|
return base;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -445,6 +455,7 @@ bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
|
bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
|
||||||
|
qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
|
||||||
d->startReaderThread();
|
d->startReaderThread();
|
||||||
if ( ALLOW_QIODEVICE_BUFFERING ) {
|
if ( ALLOW_QIODEVICE_BUFFERING ) {
|
||||||
if ( bytesAvailable() > 0 )
|
if ( bytesAvailable() > 0 )
|
||||||
@ -454,7 +465,10 @@ bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
|
|||||||
if ( !r || r->eofShortCut )
|
if ( !r || r->eofShortCut )
|
||||||
return true;
|
return true;
|
||||||
LOCKED( r );
|
LOCKED( r );
|
||||||
return r->bytesInBuffer() != 0 || r->eof || r->error || r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
|
if ( r->bytesInBuffer() != 0 || r->eof || r->error )
|
||||||
|
return true;
|
||||||
|
assert( false );
|
||||||
|
return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -467,6 +481,22 @@ private:
|
|||||||
const T oldValue;
|
const T oldValue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
bool KDPipeIODevice::readWouldBlock() const
|
||||||
|
{
|
||||||
|
d->startReaderThread();
|
||||||
|
LOCKED( d->reader );
|
||||||
|
return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool KDPipeIODevice::writeWouldBlock() const
|
||||||
|
{
|
||||||
|
d->startWriterThread();
|
||||||
|
LOCKED( d->writer );
|
||||||
|
return d->writer->bufferFull() && !d->writer->error;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
|
qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
|
||||||
qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
|
qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
|
||||||
d->startReaderThread();
|
d->startReaderThread();
|
||||||
@ -491,15 +521,17 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
|
|||||||
if ( bytesAvailable() > 0 )
|
if ( bytesAvailable() > 0 )
|
||||||
maxSize = std::min( maxSize, bytesAvailable() ); // don't block
|
maxSize = std::min( maxSize, bytesAvailable() ); // don't block
|
||||||
}
|
}
|
||||||
|
qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)" );
|
||||||
LOCKED( r );
|
LOCKED( r );
|
||||||
const TemporaryValue<bool> tmp( d->reader->inReadData, true );
|
qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)" );
|
||||||
assert( d->reader->inReadData );
|
|
||||||
while ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
|
r->readyReadSentCondition.wakeAll();
|
||||||
qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition", this );
|
if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
|
||||||
r->readyReadSentCondition.wakeAll();
|
qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
|
||||||
r->notInReadDataCondition.wakeAll();
|
const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
|
||||||
r->bufferNotEmptyCondition.wait( &r->mutex );
|
r->bufferNotEmptyCondition.wait( &r->mutex );
|
||||||
|
r->blockedConsumerIsDoneCondition.wakeAll();
|
||||||
|
qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this );
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( r->bufferEmpty() ) {
|
if ( r->bufferEmpty() ) {
|
||||||
@ -507,7 +539,6 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
|
|||||||
// woken with an empty buffer must mean either EOF or error:
|
// woken with an empty buffer must mean either EOF or error:
|
||||||
assert( r->eof || r->error );
|
assert( r->eof || r->error );
|
||||||
r->eofShortCut = true;
|
r->eofShortCut = true;
|
||||||
r->notInReadDataCondition.wakeAll();
|
|
||||||
return r->eof ? 0 : -1 ;
|
return r->eof ? 0 : -1 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,7 +546,7 @@ qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS
|
|||||||
const qint64 bytesRead = r->readData( data, maxSize );
|
const qint64 bytesRead = r->readData( data, maxSize );
|
||||||
qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
|
qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
|
||||||
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
|
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
|
||||||
r->notInReadDataCondition.wakeAll();
|
|
||||||
return bytesRead;
|
return bytesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -590,13 +621,14 @@ void KDPipeIODevice::Private::stopThreads()
|
|||||||
assert( q->bytesToWrite() == 0 );
|
assert( q->bytesToWrite() == 0 );
|
||||||
}
|
}
|
||||||
if ( Reader * & r = reader ) {
|
if ( Reader * & r = reader ) {
|
||||||
|
disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) );
|
||||||
synchronized( r ) {
|
synchronized( r ) {
|
||||||
// tell thread to cancel:
|
// tell thread to cancel:
|
||||||
r->cancel = true;
|
r->cancel = true;
|
||||||
// and wake it, so it can terminate:
|
// and wake it, so it can terminate:
|
||||||
r->bufferNotFullCondition.wakeAll();
|
r->bufferNotFullCondition.wakeAll();
|
||||||
r->readyReadSentCondition.wakeAll();
|
r->readyReadSentCondition.wakeAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ( Writer * & w = writer ) {
|
if ( Writer * & w = writer ) {
|
||||||
synchronized( w ) {
|
synchronized( w ) {
|
||||||
@ -617,10 +649,14 @@ void KDPipeIODevice::close() { KDAB_CHECK_THIS;
|
|||||||
emit aboutToClose();
|
emit aboutToClose();
|
||||||
d->stopThreads();
|
d->stopThreads();
|
||||||
|
|
||||||
#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* t2 = t; t = 0; delete t2; }
|
#define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
|
||||||
qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
|
qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
|
||||||
waitAndDelete( d->writer );
|
waitAndDelete( d->writer );
|
||||||
qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
|
qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
|
||||||
|
{
|
||||||
|
LOCKED( d->reader );
|
||||||
|
d->reader->readyReadSentCondition.wakeAll();
|
||||||
|
}
|
||||||
waitAndDelete( d->reader );
|
waitAndDelete( d->reader );
|
||||||
#undef waitAndDelete
|
#undef waitAndDelete
|
||||||
#ifdef Q_OS_WIN32
|
#ifdef Q_OS_WIN32
|
||||||
@ -645,115 +681,102 @@ void Reader::run() {
|
|||||||
qDebug( "%p: Reader::run: started", this );
|
qDebug( "%p: Reader::run: started", this );
|
||||||
|
|
||||||
while ( true ) {
|
while ( true ) {
|
||||||
|
if ( !cancel && ( eof || error ) ) {
|
||||||
if ( !bufferFull() && !bufferEmpty() ) {
|
qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
|
||||||
|
notifyReadyRead();
|
||||||
|
} else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
|
||||||
qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
|
qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
|
||||||
notifyReadyRead();
|
notifyReadyRead();
|
||||||
}
|
}
|
||||||
|
|
||||||
while ( !cancel && bufferFull() ) {
|
while ( !error && !cancel && bufferFull() ) {
|
||||||
bufferNotEmptyCondition.wakeAll();
|
|
||||||
notifyReadyRead();
|
notifyReadyRead();
|
||||||
if ( !bufferFull() )
|
if ( bufferFull() ) {
|
||||||
break;
|
qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
|
||||||
qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
|
bufferNotFullCondition.wait( &mutex );
|
||||||
bufferNotFullCondition.wait( &mutex );
|
}
|
||||||
qDebug( "%p: Reader::run: woke up", this );
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if ( cancel ) {
|
if ( cancel ) {
|
||||||
qDebug( "%p: Reader::run: detected cancel", this );
|
qDebug( "%p: Reader::run: detected cancel", this );
|
||||||
goto leave;
|
goto leave;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ( !eof && !error ) {
|
||||||
|
if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
|
||||||
|
rptr = wptr = 0;
|
||||||
|
|
||||||
if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
|
unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
|
||||||
rptr = wptr = 0;
|
if ( numBytes > sizeof buffer - wptr )
|
||||||
|
numBytes = sizeof buffer - wptr;
|
||||||
|
|
||||||
unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
|
qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
|
||||||
if ( numBytes > sizeof buffer - wptr )
|
|
||||||
numBytes = sizeof buffer - wptr;
|
|
||||||
|
|
||||||
qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
|
assert( numBytes > 0 );
|
||||||
|
|
||||||
assert( numBytes > 0 );
|
qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
|
||||||
|
|
||||||
qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
|
|
||||||
#ifdef Q_OS_WIN32
|
#ifdef Q_OS_WIN32
|
||||||
DWORD numRead;
|
mutex.unlock();
|
||||||
mutex.unlock();
|
DWORD numRead;
|
||||||
const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
|
const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
|
||||||
mutex.lock();
|
mutex.lock();
|
||||||
if ( !ok ) {
|
if ( !ok ) {
|
||||||
errorCode = static_cast<int>( GetLastError() );
|
errorCode = static_cast<int>( GetLastError() );
|
||||||
if ( errorCode == ERROR_BROKEN_PIPE ) {
|
if ( errorCode == ERROR_BROKEN_PIPE ) {
|
||||||
assert( numRead == 0 );
|
assert( numRead == 0 );
|
||||||
qDebug( "%p: Reader::run: got eof (broken pipe)", this );
|
qDebug( "%p: Reader::run: got eof (broken pipe)", this );
|
||||||
eof = true;
|
eof = true;
|
||||||
} else {
|
} else {
|
||||||
assert( numRead == 0 );
|
assert( numRead == 0 );
|
||||||
qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
|
qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
|
||||||
error = true;
|
error = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
goto leave;
|
|
||||||
}
|
|
||||||
#else
|
#else
|
||||||
qint64 numRead;
|
qint64 numRead;
|
||||||
mutex.unlock();
|
mutex.unlock();
|
||||||
do {
|
do {
|
||||||
numRead = ::read( fd, buffer + wptr, numBytes );
|
numRead = ::read( fd, buffer + wptr, numBytes );
|
||||||
} while ( numRead == -1 && errno == EINTR );
|
} while ( numRead == -1 && errno == EINTR );
|
||||||
mutex.lock();
|
mutex.lock();
|
||||||
|
|
||||||
if ( numRead < 0 ) {
|
if ( numRead < 0 ) {
|
||||||
errorCode = errno;
|
errorCode = errno;
|
||||||
error = true;
|
error = true;
|
||||||
qDebug( "%p: Reader::run: got error: %d", this, errorCode );
|
qDebug( "%p: Reader::run: got error: %d", this, errorCode );
|
||||||
goto leave;
|
} else if ( numRead == 0 ) {
|
||||||
}
|
qDebug( "%p: Reader::run: eof detected", this );
|
||||||
|
eof = true;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
|
qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
|
||||||
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer );
|
qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, fd, buffer );
|
||||||
if ( numRead == 0 ) {
|
|
||||||
qDebug( "%p: Reader::run: eof detected", this );
|
|
||||||
eof = true;
|
|
||||||
goto leave;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( cancel ) {
|
if ( numRead > 0 ) {
|
||||||
qDebug( "%p: Reader::run: detected cancel", this );
|
qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
|
||||||
goto leave;
|
wptr = ( wptr + numRead ) % sizeof buffer;
|
||||||
}
|
qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
|
||||||
qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
|
}
|
||||||
wptr = ( wptr + numRead ) % sizeof buffer;
|
}
|
||||||
qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", this, rptr, wptr );
|
|
||||||
}
|
}
|
||||||
leave:
|
leave:
|
||||||
qDebug( "%p: Reader::run: terminating: loop while not canceled and not empty", this );
|
|
||||||
while ( !cancel && !bufferEmpty() ) {
|
|
||||||
notifyReadyRead();
|
|
||||||
}
|
|
||||||
notifyReadyRead();
|
|
||||||
qDebug( "%p: Reader::run: terminated", this );
|
qDebug( "%p: Reader::run: terminated", this );
|
||||||
}
|
}
|
||||||
|
|
||||||
void Reader::notifyReadyRead()
|
void Reader::notifyReadyRead()
|
||||||
{
|
{
|
||||||
qDebug( "notifyReadyRead" );
|
qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
|
||||||
if ( cancel )
|
assert( !cancel );
|
||||||
return;
|
|
||||||
bufferNotEmptyCondition.wakeAll();
|
if ( consumerBlocksOnUs ) {
|
||||||
if ( inReadData ) {
|
bufferNotEmptyCondition.wakeAll();
|
||||||
qDebug( "notifyReadyRead: inReadData: waiting" );
|
blockedConsumerIsDoneCondition.wait( &mutex );
|
||||||
notInReadDataCondition.wait( &mutex );
|
return;
|
||||||
}
|
}
|
||||||
if ( cancel || ( !eof && !error && bufferEmpty() ) )
|
qDebug( "notifyReadyRead: emit signal" );
|
||||||
return;
|
|
||||||
qDebug( "readyReadData: actually emit signal" );
|
|
||||||
emit readyRead();
|
emit readyRead();
|
||||||
bufferNotEmptyCondition.wakeAll();
|
|
||||||
readyReadSentCondition.wait( &mutex );
|
readyReadSentCondition.wait( &mutex );
|
||||||
bufferNotEmptyCondition.wakeAll();
|
qDebug( "notifyReadyRead: returning from waiting, leave" );
|
||||||
}
|
}
|
||||||
|
|
||||||
void Writer::run() {
|
void Writer::run() {
|
||||||
@ -796,7 +819,7 @@ void Writer::run() {
|
|||||||
errorCode = static_cast<int>( GetLastError() );
|
errorCode = static_cast<int>( GetLastError() );
|
||||||
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
|
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
|
||||||
error = true;
|
error = true;
|
||||||
goto leave;
|
goto leave;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
qint64 numWritten;
|
qint64 numWritten;
|
||||||
@ -805,17 +828,17 @@ void Writer::run() {
|
|||||||
} while ( numWritten == -1 && errno == EINTR );
|
} while ( numWritten == -1 && errno == EINTR );
|
||||||
|
|
||||||
if ( numWritten < 0 ) {
|
if ( numWritten < 0 ) {
|
||||||
mutex.lock();
|
mutex.lock();
|
||||||
errorCode = errno;
|
errorCode = errno;
|
||||||
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
|
qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
|
||||||
error = true;
|
error = true;
|
||||||
goto leave;
|
goto leave;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer,
|
qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer,
|
||||||
buffer );
|
buffer );
|
||||||
totalWritten += numWritten;
|
totalWritten += numWritten;
|
||||||
mutex.lock();
|
mutex.lock();
|
||||||
} while ( totalWritten < numBytesInBuffer );
|
} while ( totalWritten < numBytesInBuffer );
|
||||||
|
|
||||||
qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
|
qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
|
||||||
|
@ -45,6 +45,9 @@ public:
|
|||||||
Qt::HANDLE handle() const;
|
Qt::HANDLE handle() const;
|
||||||
int descriptor() const;
|
int descriptor() const;
|
||||||
|
|
||||||
|
bool readWouldBlock() const;
|
||||||
|
bool writeWouldBlock() const;
|
||||||
|
|
||||||
/* reimp */ qint64 bytesAvailable() const;
|
/* reimp */ qint64 bytesAvailable() const;
|
||||||
/* reimp */ qint64 bytesToWrite() const;
|
/* reimp */ qint64 bytesToWrite() const;
|
||||||
/* reimp */ bool canReadLine() const;
|
/* reimp */ bool canReadLine() const;
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/****************************************************************************
|
/****************************************************************************
|
||||||
** Meta object code from reading C++ file 'kdpipeiodevice.cpp'
|
** Meta object code from reading C++ file 'kdpipeiodevice.cpp'
|
||||||
**
|
**
|
||||||
** Created: Mon Oct 1 16:08:44 2007
|
** Created: Tue Oct 2 19:30:13 2007
|
||||||
** by: The Qt Meta Object Compiler version 59 (Qt 4.3.1)
|
** by: The Qt Meta Object Compiler version 59 (Qt 4.3.1)
|
||||||
**
|
**
|
||||||
** WARNING! All changes made in this file will be lost!
|
** WARNING! All changes made in this file will be lost!
|
||||||
|
@ -82,30 +82,29 @@ using _gpgme_::KDPipeIODevice;
|
|||||||
really nice callback interfaces to let the user control all this at
|
really nice callback interfaces to let the user control all this at
|
||||||
a per-context level. */
|
a per-context level. */
|
||||||
|
|
||||||
#define MAX_SLAFD 50000
|
#define MAX_SLAFD 1024
|
||||||
|
|
||||||
struct DeviceEntry {
|
struct DeviceEntry {
|
||||||
DeviceEntry() : iodev( 0 ), actual_fd( -1 ), refCount( 1 ) {}
|
DeviceEntry() : iodev( 0 ), refCount( 1 ) {}
|
||||||
QIODevice* iodev;
|
KDPipeIODevice* iodev;
|
||||||
int actual_fd;
|
|
||||||
mutable int refCount;
|
mutable int refCount;
|
||||||
void ref() const { ++refCount; }
|
void ref() const { ++refCount; }
|
||||||
int unref() const { return --refCount; }
|
int unref() const { assert( refCount > 0 ); return --refCount; }
|
||||||
};
|
};
|
||||||
|
|
||||||
DeviceEntry* iodevice_table[MAX_SLAFD];
|
DeviceEntry* iodevice_table[MAX_SLAFD];
|
||||||
|
|
||||||
|
|
||||||
static QIODevice *
|
static KDPipeIODevice *
|
||||||
find_channel (int fd, int create)
|
find_channel (int fd, int create)
|
||||||
{
|
{
|
||||||
|
assert( fd < MAX_SLAFD );
|
||||||
if (fd < 0 || fd >= MAX_SLAFD)
|
if (fd < 0 || fd >= MAX_SLAFD)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
if (create && !iodevice_table[fd])
|
if (create && !iodevice_table[fd])
|
||||||
{
|
{
|
||||||
DeviceEntry* entry = new DeviceEntry;
|
DeviceEntry* entry = new DeviceEntry;
|
||||||
entry->actual_fd = fd;
|
|
||||||
entry->iodev = new KDPipeIODevice
|
entry->iodev = new KDPipeIODevice
|
||||||
(fd, QIODevice::ReadWrite|QIODevice::Unbuffered);
|
(fd, QIODevice::ReadWrite|QIODevice::Unbuffered);
|
||||||
iodevice_table[fd] = entry;
|
iodevice_table[fd] = entry;
|
||||||
@ -117,11 +116,9 @@ find_channel (int fd, int create)
|
|||||||
BUFLEN. The printable version is the representation on the command
|
BUFLEN. The printable version is the representation on the command
|
||||||
line that the child process expects. */
|
line that the child process expects. */
|
||||||
int
|
int
|
||||||
_gpgme_io_fd2str (char *buf, int buflen, int fd_)
|
_gpgme_io_fd2str (char *buf, int buflen, int fd)
|
||||||
{
|
{
|
||||||
const int actual_fd = iodevice_table[fd_] ? iodevice_table[fd_]->actual_fd : fd_;
|
return snprintf (buf, buflen, "%d", (long)_get_osfhandle( fd ) );
|
||||||
return snprintf (buf, buflen, "%ld", (long) _get_osfhandle (actual_fd));
|
|
||||||
// return snprintf (buf, buflen, "%d", fd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -143,7 +140,7 @@ _gpgme_io_read (int fd, void *buffer, size_t count)
|
|||||||
{
|
{
|
||||||
int saved_errno = 0;
|
int saved_errno = 0;
|
||||||
qint64 nread;
|
qint64 nread;
|
||||||
QIODevice *chan;
|
KDPipeIODevice *chan;
|
||||||
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_read", fd,
|
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_read", fd,
|
||||||
"buffer=%p, count=%u", buffer, count);
|
"buffer=%p, count=%u", buffer, count);
|
||||||
|
|
||||||
@ -175,7 +172,7 @@ int
|
|||||||
_gpgme_io_write (int fd, const void *buffer, size_t count)
|
_gpgme_io_write (int fd, const void *buffer, size_t count)
|
||||||
{
|
{
|
||||||
qint64 nwritten;
|
qint64 nwritten;
|
||||||
QIODevice *chan;
|
KDPipeIODevice *chan;
|
||||||
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_write", fd,
|
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_write", fd,
|
||||||
"buffer=%p, count=%u", buffer, count);
|
"buffer=%p, count=%u", buffer, count);
|
||||||
TRACE_LOGBUF ((char *) buffer, count);
|
TRACE_LOGBUF ((char *) buffer, count);
|
||||||
@ -204,7 +201,7 @@ _gpgme_io_write (int fd, const void *buffer, size_t count)
|
|||||||
int
|
int
|
||||||
_gpgme_io_pipe (int filedes[2], int inherit_idx)
|
_gpgme_io_pipe (int filedes[2], int inherit_idx)
|
||||||
{
|
{
|
||||||
QIODevice *chan;
|
KDPipeIODevice *chan;
|
||||||
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_pipe", filedes,
|
TRACE_BEG2 (DEBUG_SYSIO, "_gpgme_io_pipe", filedes,
|
||||||
"inherit_idx=%i (GPGME uses it for %s)",
|
"inherit_idx=%i (GPGME uses it for %s)",
|
||||||
inherit_idx, inherit_idx ? "reading" : "writing");
|
inherit_idx, inherit_idx ? "reading" : "writing");
|
||||||
@ -266,7 +263,7 @@ _gpgme_io_pipe (int filedes[2], int inherit_idx)
|
|||||||
int
|
int
|
||||||
_gpgme_io_close (int fd)
|
_gpgme_io_close (int fd)
|
||||||
{
|
{
|
||||||
QIODevice *chan;
|
KDPipeIODevice *chan;
|
||||||
TRACE_BEG (DEBUG_SYSIO, "_gpgme_io_close", fd);
|
TRACE_BEG (DEBUG_SYSIO, "_gpgme_io_close", fd);
|
||||||
|
|
||||||
if (fd < 0 || fd >= MAX_SLAFD)
|
if (fd < 0 || fd >= MAX_SLAFD)
|
||||||
@ -286,27 +283,19 @@ _gpgme_io_close (int fd)
|
|||||||
/* Then do the close. */
|
/* Then do the close. */
|
||||||
|
|
||||||
DeviceEntry* const entry = iodevice_table[fd];
|
DeviceEntry* const entry = iodevice_table[fd];
|
||||||
if ( entry )
|
if ( entry ) {
|
||||||
{
|
if ( entry->unref() == 0 ) {
|
||||||
assert( entry->refCount > 0 );
|
entry->iodev->close();
|
||||||
const int actual_fd = entry->actual_fd;
|
delete entry->iodev;
|
||||||
assert( actual_fd > 0 );
|
delete entry;
|
||||||
if ( !entry->unref() ) {
|
iodevice_table[fd] = 0;
|
||||||
entry->iodev->close();
|
|
||||||
delete entry->iodev;
|
|
||||||
delete entry;
|
|
||||||
for ( int i = 0; i < MAX_SLAFD; ++i ) {
|
|
||||||
if ( iodevice_table[i] == entry )
|
|
||||||
iodevice_table[i] = 0;
|
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
_close( fd );
|
||||||
if ( fd != actual_fd )
|
|
||||||
_close( fd );
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
_close (fd);
|
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,7 +326,6 @@ _gpgme_io_set_nonblocking (int fd)
|
|||||||
/* Qt always uses non-blocking IO, except for files, maybe, but who
|
/* Qt always uses non-blocking IO, except for files, maybe, but who
|
||||||
uses that? */
|
uses that? */
|
||||||
TRACE_BEG (DEBUG_SYSIO, "_gpgme_io_set_nonblocking", fd);
|
TRACE_BEG (DEBUG_SYSIO, "_gpgme_io_set_nonblocking", fd);
|
||||||
|
|
||||||
return TRACE_SYSRES (0);
|
return TRACE_SYSRES (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -538,6 +526,7 @@ _gpgme_io_spawn (const char *path, char **argv,
|
|||||||
/* Close the other ends of the pipes. */
|
/* Close the other ends of the pipes. */
|
||||||
for (i = 0; fd_parent_list[i].fd != -1; i++)
|
for (i = 0; fd_parent_list[i].fd != -1; i++)
|
||||||
_gpgme_io_close (fd_parent_list[i].fd);
|
_gpgme_io_close (fd_parent_list[i].fd);
|
||||||
|
|
||||||
|
|
||||||
TRACE_LOG4 ("CreateProcess ready: hProcess=%p, hThread=%p, "
|
TRACE_LOG4 ("CreateProcess ready: hProcess=%p, hThread=%p, "
|
||||||
"dwProcessID=%d, dwThreadId=%d",
|
"dwProcessID=%d, dwThreadId=%d",
|
||||||
@ -565,8 +554,6 @@ _gpgme_io_spawn (const char *path, char **argv,
|
|||||||
int
|
int
|
||||||
_gpgme_io_select (struct io_select_fd_s *fds, size_t nfds, int nonblock)
|
_gpgme_io_select (struct io_select_fd_s *fds, size_t nfds, int nonblock)
|
||||||
{
|
{
|
||||||
int i;
|
|
||||||
int count;
|
|
||||||
/* Use a 1s timeout. */
|
/* Use a 1s timeout. */
|
||||||
|
|
||||||
void *dbg_help = NULL;
|
void *dbg_help = NULL;
|
||||||
@ -576,30 +563,32 @@ _gpgme_io_select (struct io_select_fd_s *fds, size_t nfds, int nonblock)
|
|||||||
/* We only implement the special case of nonblock == true. */
|
/* We only implement the special case of nonblock == true. */
|
||||||
assert (nonblock);
|
assert (nonblock);
|
||||||
|
|
||||||
count = 0;
|
int count = 0;
|
||||||
|
|
||||||
TRACE_SEQ (dbg_help, "select on [ ");
|
TRACE_SEQ (dbg_help, "select on [ ");
|
||||||
for (i = 0; i < nfds; i++)
|
for (int i = 0; i < nfds; i++)
|
||||||
{
|
{
|
||||||
if (fds[i].fd == -1)
|
if (fds[i].fd == -1)
|
||||||
{
|
{
|
||||||
fds[i].signaled = 0;
|
fds[i].signaled = 0;
|
||||||
}
|
}
|
||||||
else if (fds[i].for_read)
|
else if (fds[i].for_read )
|
||||||
{
|
{
|
||||||
const QIODevice * const chan = find_channel (fds[i].fd, 0);
|
const KDPipeIODevice * const chan = find_channel (fds[i].fd, 0);
|
||||||
assert (chan);
|
assert (chan);
|
||||||
fds[i].signaled = chan->bytesAvailable() > 0 ? 1 : 0 ;
|
fds[i].signaled = chan->readWouldBlock() ? 0 : 1;
|
||||||
TRACE_ADD1 (dbg_help, "w0x%x ", fds[i].fd);
|
TRACE_ADD1 (dbg_help, "w0x%x ", fds[i].fd);
|
||||||
count++;
|
if ( fds[i].signaled )
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
else if (fds[i].for_write)
|
else if (fds[i].for_write)
|
||||||
{
|
{
|
||||||
const QIODevice * const chan = find_channel (fds[i].fd, 0);
|
const KDPipeIODevice * const chan = find_channel (fds[i].fd, 0);
|
||||||
assert (chan);
|
assert (chan);
|
||||||
fds[i].signaled = chan->bytesToWrite() > 0 ? 0 : 1 ;
|
fds[i].signaled = chan->writeWouldBlock() ? 0 : 1;
|
||||||
TRACE_ADD1 (dbg_help, "w0x%x ", fds[i].fd);
|
TRACE_ADD1 (dbg_help, "w0x%x ", fds[i].fd);
|
||||||
count++;
|
if ( fds[i].signaled )
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TRACE_END (dbg_help, "]");
|
TRACE_END (dbg_help, "]");
|
||||||
@ -629,21 +618,8 @@ gpgme_get_giochannel (int fd)
|
|||||||
int
|
int
|
||||||
_gpgme_io_dup (int fd)
|
_gpgme_io_dup (int fd)
|
||||||
{
|
{
|
||||||
DeviceEntry* const existing = iodevice_table[fd];
|
assert( iodevice_table[fd] );
|
||||||
if ( existing )
|
iodevice_table[fd]->ref();
|
||||||
existing->ref();
|
|
||||||
else
|
|
||||||
find_channel( fd, /*create=*/1 );
|
|
||||||
return fd;
|
return fd;
|
||||||
|
|
||||||
#if 0
|
|
||||||
const int new_fd = _dup( fd );
|
|
||||||
iodevice_table[new_fd] = iodevice_table[fd];
|
|
||||||
if ( iodevice_table[new_fd] )
|
|
||||||
iodevice_table[new_fd]->ref();
|
|
||||||
else
|
|
||||||
find_channel( new_fd, /*create=*/1 );
|
|
||||||
return new_fd;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user