QpThread Library
for C++

Designed and Implemented by Pavel Krauz Copyright (C) 1999

 

 

Table of Contents

History
Overview
Hello World
Compilation and Linking
QpThread Initialization
QpThread Basics
Timeout Basics
Exceptions

QpTimeoutException
QpSysException
QpUnsupportedException
QpOutOfMemoryException
QpErrorException
QpClonedException
QpUserException
QpCancelException

Synchronization

Locks
QpAsyncSafeSem
QpSpinLock
QpMutex
QpMutexEx
QpSem
Condition Variable
QpCond
Critical Section - QpSLock
Monitor

Read/Write Locks
QpRWLock
QpRWLockEx
Read/Write critical section protection
QpReadSLock
QpWriteSLock
QpReadForWriteSLock

QpRunnable
QpThread
QpThreadPool
QpOnce
QpTSD
QpTSDT
Signal Support
Producent/Consumer support – QpQueue/QpStack

QpQueue
QpStack

Timer
Supported Platforms
Download
Patches, Bug Reports, Suggestions

 

History

There are available (commercial or free) several thread libraries for C++ that provide object oriented framework on top of system level threads. So why did I make another one? Well, I had to use proprietary thread library in our project but I needed comparison/evaluation of stability so I wrote straightforward re-implementation of that library interface as inline functions. It was really lot of fun to do it as functions in single header file (I was little bit crazy :-). My primary goal was simplicity and reliability. Some time later I started to think how could I use this library further. I decided to rewrite it completely, including the interface and I implemented some unique features that distinguish this library from the others. It was more work than I wanted to put into the library but now you can evaluate, use and look into it as it is under GNU LGPL.

 

Overview

What are the main features of this library? First of all I wanted to do it as simple as possible but still implement advanced features. You should be able to read the source code and verify it yourself. It will give you better understanding of the threads and in general you will be able to write better threaded code. The library supports basic synchronization that has to be presented in every thread library. The synchronization primitives include spinlock, mutex, condition variable, monitor and semaphore. The difference is that some synchronization primitives support timeouts so now you can have mutex which could timeout while it is waiting for the lock. Object oriented abstraction of a thread is necessary in such library. Moreover QpThread library supports configurable pool of threads. The threads in the pool can be allocated on demand in order to do the assigned work and after some time the threads could be removed from the pool. Support for inter-thread message queues (producent(s)/consumer(s) like) are also presented. Have you ever tried to do something with signal handlers? It is not an easy task and I think that it is more difficult than thread programming. It is interesting that I haven't ever seen some abstraction of it. Have you ever tried to use signals in threaded program? Now you can. This library has support for signal handlers and you should be able to use signals in your multithreaded program. The good news is that you will be allowed to use library functions in asynchronous signal handlers. So you will be allowed to use malloc/free - new/delete, printf/cout, etc. in signal handler. Wow! Furthermore the library supports registering functions that will run at scheduled time intervals. Unique is deferred cancellation through C++ exception so the stack is properly unwound. Also some sort of exception catching and re-throwing for threads is supported. You can learn more about these cool features, so keep reading.

 

Hello World

Lets start with hello world program, the program every library starts. The first version defines new class HelloWorld that inherits from QpThread. That way the HelloWorld class is complete thread.

#include <iostream.h>
#include <qpthr/qp.h>

using namespace qpthr;

class HelloWorld: public QpThread {
    public:
	 ~HelloWorld() {Join();}
	virtual void Main() {
		cout << "Hello World" << endl;
	}
};

int main()
{
	QpInit qp_init;
	HelloWorld ht;

	ht.Start(); 
	return 0; 
}

 

The second version defines class HelloWorld which is inherited from QpRunnable interface. When the class implements QpRunnable interface it can be run in thread.

#include <iostream.h>
#include <qpthr/qp.h>

using namespace qpthr;

class HelloWorld: public QpRunnable { 
    public: 
	virtual void Main() { 
		cout << "Hello World" << endl; 
	}
};
int main() 
{ 
	QpInit qp_init; 
	HelloWorld hw;
	QpThread t(&hw);

	t.Start();
	t.Join();
	return 0; 
}

 

The third version uses thread pool. The HelloWorld is now the function which is an argument to the template QpWork_p0. These templates are in general wrappers that inherit from QpRunnable (so can be executed by threads or thread pools)

#include <iostream.h>
#include <qpthr/qp.h>
#include <qpthr/qp_work.h>

using namespace qpthr;

void HelloWorld() { 
	cout << "Hello World" << endl; 
}

int main()
{ 
	QpInit qp_init;
	QpThreadPool tpool; 
	QpWork_p0 *w = new QpWork_p0(&HelloWorld);

	tpool.Do(w); 
	tpool.Wait(); 
	delete w; 
	return 0;
}

 

Compilation and Linking

QpThread library class prototypes are located in several header files but they are automatically included when you include the base QpThread header file qp.h. The only thing that is not included through qp.h are templates for QpRunnable wrappers (see further) these are included through qp_work.h header.

#include <qpthr/qp.h>
#include <qpthr/qp_work.h>

The QpThread library is compiled to single library file (name depends on the platform) usually the names are libqpthr.so and libqpthr.a. This library depends on standard C++ library and POSIX thread library so all these libraries have to be linked to the application.

-lqpthr [-lposix4] -lpthread [-lstdc++]

I would like to remind you to compile threaded programs with special flags designed to switch the compiler to include and generate proper thread safe code. At least for all platforms you have to compile your source code with -D_REENTRANT.

 

QpThread Initialization

The QpThread library has to be initialized before it can be used. The initialization is done through creation of QpInit object. This object takes some parameters that describe some capabilities that QpThread library will offer. The signal and timer capability are options and are by default disabled. That means you do not have to pay for it if you do not use it (for signal or timer support separate threads are created to handle them).

The QpInit class has this constructor:

QpInit(QP_CAP capability = QP_NONE, QP_SCOPE scope = QP_SCOPE_SYSTEM, QP_SCHED sched = QP_SCHED_OTHER);

The capability is binary ORed value of QpInit::QP_SIGNAL, QpInit::QP_TIMER The scope is provided only for Solaris users who wanted to have thread creation scope other than SYSTEM (the constant for process wide thread scope is QpInit::QP_SCOPE_PROCESS). It is recommended to use default value QP_SCOPE_SYSTEM for the scope argument. The thierd argument QP_SCHED selects desired scheduling policy. Available options are QP_SCHED_OTHER, QP_SCHED_FIFO and QP_SCHED_RR). The FIFO and RR sched policy are real time scheduling policies and it is highly recommended to use default QP_SCHED_OTHER unless you really need real time scheduler.

For example to enable all capabilities QpThread library has you need to use:

QpInit init(QpInit::QP_SIGNAL | QpInit::QP_TIMER);

When the QpInit object that initialized the library is going to be destroyed it waits for all threads that are running until they finish. After the QpInit object is destroyed the QpThread library can’t be used.

Don't initialize QpThread library this way:

WRONG!!!:   QpInit init();

because you did not created the QpInit object. Instead you declared function that return QpInit object. Without arguments you have to do it as this:

QpInit init;

 

QpThread Basics

All QpThread classes and constants are in qpthr namespace (for compilers that support namespaces). If you do not want to prefix all QpThread library classes with qpthr:: prefix just use C++ construct

using namespace qpthr;

The basic class in QpThread library is class QpBase. Classes in QpThread library inherit from QpBase class. QpBase provides only the capability of setting and getting a name. That way every object could have their associated name. The Name of the object is usually initialized by constructor (the last argument) and default value is NULL.

The methods are:

QpBase(const char *name = NULL);
const char *GetName();
void SetName(const char *name); 

 

Timeout Basics

One of the main concepts in QpThread are timeouts. Timeouts can be specified relative to current time or as absolute time. Every function that ends with Abs takes the time with absolute format. Other functions without Abs at the end of the name takes relative timeout. Absolute timeout is specified through struct timespec and the relative timeout with milliseconds or struct timespec also. Why do we have relative and absolute timeouts? Well the absolute would be enough but for the simplicity of using the relative timeout is also provided. The value 0 for milliseconds or NULL for struct timespec * means wait forever.

#include <iostream.h>
#include <qpthr/qp.h>

using namespace qpthr;

class HelloWorld: public QpThread {
    public:
	 ~HelloWorld() {Join();}
	virtual void Main() {
		...
	}
};

int main()
{
	QpInit qp_init;
	HelloWorld ht;

	ht.Start();
	try {
		ht.Join(1000);
	}
	catch (QpTimeoutException &e) {
	}

	struct timespec ts;
	ts.tv_sec = 1;
	ts.tv_nsec = 0;
	try {
		ht.Join(&ts);
	}
	catch (QpTimeoutException &e) {
	}

	struct timeval tv;
	struct timespec tsabs;
	gettimeofday(&tv, NULL);
	tsabs.tv_sec = tv.tv_sec + 1;
	tsabs.tv_nsec = tv.tv_usec * 1000;
	try {
		ht.JoinAbs(&tsabs);
	}
	catch (QpTimeoutException &e) {
	}

	return 0; 
}

 

Exceptions

Exceptions are very useful when dealing with errors. That is why QpThread library uses them extensively. The QpException class is base class of all exceptions that QpThread library uses. Every exception has error code and associated text and can also return its type (like timeout, sys, out-of-memory exception).

Exceptions have following methods:

int ErrCode() const;
const char *ErrText() const; 
virtual const char *ErrType();

There are several Exception classes that are specialized for particular purpose.

QpTimeoutException

This exception is thrown from function that could timeout. Every function that could timeout has the last argument of type bool. If this argument is set to true (the default value) the exception is thrown on timeout otherwise (false value) the return value of false is returned to the application and no exception is thrown.

QpMutex mutex;
try {
	mutex.Lock(1000);
}
catch (QpTimeoutException &e) {
	// timeout – lock is not acquired
	...
}
if (mutex.Lock(1000, false) == false) {
	// last argument is false - do not throw exception
	// timeout - lock is not acquired
	...
}

QpSysException

This exception is responsible of reporting failed system call that is executed in QpThread library. That means if this exception is thrown some problem occurred and the library received error from the underlying libraries or operating system. This exception means that the application run into some serious problem.

QpUnsupportedException

This exception is thrown when the feature is not supported on the HW platform. Currently only QpSpinLock throws this exception on unsupported platform.

QpOutOfMemoryException

This exception indicates that the library wasn't able to allocate needed memory.

QpErrorException

This exception is thrown when the library is used badly. For example you specified bad argument to the library method, you called the library method in not permitted order, the deadlock was detected or the subsystem (signal/timer) was not initialized and you use methods from that subsystem.

Error codes for error cases are:

QP_ERROR_STATE		// bad state
QP_ERROR_ARG		// bad argument
QP_ERROR_DEADLOCK	// deadlock detected
QP_ERROR_DISABLED	// subsystem disabled

QpClonedException

QpThread library has support of catching exception that is propagated out of the Main function of the thread or object that implements QpRunnable interface. These exceptions can be (re-thrown) in another thread. Exceptions that can be successfully propagated from the Main function of the thread and later reported must be inherited from QpClonedException. The QpUserException class is derived from QpClonedException and it is dedicated as base class for user defined exceptions that could be caught by QpThread library and further reported after thread termination. The cancellation of thread is supported through exception that is derived from QpClonedException. This exception is QpCancelException and is thrown in the thread and is caught by the QpThread library. In general QpClonedException is base class for exceptions that could be caught by the QpThread library.

QpUserException

Base class for user defined exceptions that can be caught by thread or QpRunnable and after thread termination can be re-thrown. This exceptions must define the methods Clone() and Raise(). These methods will be defined usually this way:

using namespace qpthr;
    
class MyException: public QpUserException { 
    public:
	MyException(int code = 0, const char *text = NULL):
		QpUserException(code, text) {} 
	virtual QpClonedException *MyException::Clone() const {
		return new MyException(e_code, e_text);
	}
	virtual void Raise() const { throw *this; }
};

class HelloWorld: public QpThread {
    public:
	virtual void Main() {
		...
		throw MyException();
		...
	}
};

int main()
{
	QpInit init;
	HelloWorld hw;

	hw.Start();
	hw.Join();
	try {
		hw.Raise();
	}
	catch (MyException &e) {
		// my exception
		...
	}
	catch (QpClonedException &e) {
		// other exceptions
		...
	}
}

QpCancelException

This exception is thrown when the thread is canceled (the QpThread::TestCancel() is called in the thread and the cancel request was delivered to the thread through Cancel() method). This way destructors of objects on the stack are called and you can define your own catch handlers for QpCancelException. But if you want that cancellation will work you have to re-throw QpCancelException when you catch it.

using namespace qpthr;

class My: public QpThread {
	virtual void Main() { 
		try {
			...
			QpThread::TestCancel();
			...
			throw MyException(); // derived from QpUserException 
			...
		} 
		catch (QpCancelException &e) { 
			...
			throw; // re-throw it 
		}
		catch (QpUserException &e) {
			throw; // re-throw it
		}
		catch (QpException &e) {
			// other QpThread exceptions
		}
		catch (...) { 
			// others 
		}
	}
};

void main() 
{ 
	QpInit init; 
	My my;

	my.Start();
	my.Cancel(); 
	my.Join(); 
	try { 
		my.Raise(); // raise the exception that was caught by the thread 
	} 
	catch (QpCancelException &e) { 
		// was canceled 
	} 
	catch (QpUserException &e) {
		// user defined exceptions
 	}
}

 

Synchronization

Synchronization is one of the basic concepts thread library must support. Without synchronization it will not be possible to write multithreaded program where the threads cooperate on data. You have several options in QpThread library how to do synchronization. The basic synchronization primitives in QpThread library are locks and condition variables.

All locks implemented in QpThread library support at least these methods:

Lock(); 
Unlock(); 
TryLock();

moreover there are extended locks that add timeouts to Lock() operation. Methods that end with Abs take absolute time and without Abs take relative time (milliseconds or struct timespec). The timeout versions of methods have these signatures:

bool LockAbs(const struct timespec *ts, bool throw_ex = true);
bool Lock(const struct timespec *ts, bool throw_ex = true);
bool Lock(unsigned int msec, bool throw_ex = true);

The functions return true if the lock was acquired or false if timeout occurs (only when the last argument – throw_ex is false, if it is true (default value) the QpTimeoutException is thrown to the application).

Locks

QpAsyncSafeSem

Unsupported on Win32

QpAsyncSafeSem provides platform independent asynchronous safe semaphore thus it is possible to use it in synchronous signal handlers (look at QpThread signal support). This semaphore is quite expensive so use it only when you need its feature - async-safe and portable.

QpSpinLock

Unsupported on Win32

QpSpinLock is the lock that supports locking by spinning in the loop until the lock is acquired. That means, if the lock isn't available the processor time is wasted in the loop. Why this lock is useful? Well, you will probably use other synchronization primitives but this spinlock is useful when you need for example lock that could be acquired safely in synchronous signal handler and is cheep in terms of memory consumption (you can have thousands of them). This lock is Async-Signal safe. QpSpinLock feature does not come at zero cost. QpSpinLock depends on hardware so it is not supported on some platforms. Currently Intel, Alpha and Sparc platforms supports QpSpinLock. If the platform does not support QpSpinLock then it throws QpUnsupportedException. Use QpSpinLock only when the benefits of using this lock outweighs the portability issue.

QpMutex

This is the main synchronization primitive in QpThread library. It supports acquiring and releasing the lock but without timeouts. So if you need simple fast lock then this should be your choice.

QpMutex has two restrictions: you cannot call the Unlock() method before you have acquired the lock through Lock()/TryLock(). It is undefined how the lock will behave when you call the Unlock() method before the Lock()/TryLock().Furthermore you cannot acquire the mutex in one thread and then release it in another thread. The thread which acquired the mutex have to release it also. It is not possible to call the Lock() method in the thread 1 and then call Unlock() method in thread 2. The thread 1 has to do both Lock() and Unlock(), and the thread 2 can also do Lock() and Unlock() on the same thread.

QpMutexEx

This mutex is extension of QpMutex. It adds relative and absolute timeouts to Lock method. Moreover it supports identification of the lock owner through Owner() method.

QpSem

This class represents the semaphore with associated value. The value is assigned to QpSem by constructor. The lock is acquired if the value of the semaphore is greater than or equal the requested lock value (usually 1) and the semaphore value is then decreased by lock requested value. Upon Unlock the unlock value is added to the semaphore value. Additionally to the standard methods for lock the QpSem has these methods:

bool LockValAbs(int val, const struct timespec *ts, bool throw_ex = true);
bool LockVal(int val, const struct timespec *ts, bool throw_ex = true);
bool LockVal(int val, unsigned int msec, bool throw_ex = true);
void UnlockVal(int val); 
bool TryLockVal(int val); 
int CurVal();

Condition Variable

QpThread support the condition variable paradigm. Locks can have associated so called condition variables. The user acquires the lock and then calls the Wait() method of associated condition variable. The Wait() method unlocks the lock and waits until it is signaled/broadcasted or timeouts. In either case it re-acquires the lock.

QpCond

The class QpCond implements the condition variable for all types of locks and its constructor takes the lock object. That lock object will be associated with condition variable. The methods of condition variable are:

void Wait(); 
bool Wait(const struct timespec *ts, bool throw_ex = true); 
bool Wait(unsigned int msec, bool throw_ex = true); 
bool WaitAbs(const struct timespec *ts, bool throw_ex = true); 
void Signal();
void Broadcast();

 

QpMutex mutex;
QpCond cond(mutex);
int x = 1;

/* thread 1 */
void T1::Main() {
	mutex.Lock();
	...
	while (x != 0)
		cond.Wait();
	...
	mutex.Unlock();
} 

/* thread 2 */
void T2::Main() {
	mutex.Lock();
	x = 0;
	mutex.Unlock(); 
	cond.Signal(); 
}   

 

Critical Section - QpSLock

The C++ destructors make our life much easier when we work with locks. The class QpSLock is designed to take advantage of the destructors. The QpSLock takes the lock object in constructor and acquires the lock and the destructor of QpSLock class is designed to release the lock. Why this is useful? Lets imagine this code:

QpMutex mutex;

void test() { 
	QpSLock lock(&mutex);
	...
	if (...) 
		return;
	if (...)
		throw ...;
	QpThread::TestCancel();
	switch (...) {
	    case ...: 
		throw ...; 
	    case ...;
		QpThread::TestCancel();
		break;
	}
	...
	return; 
}

In this example the mutex is unlocked when the function returns or throws some exception. The compiler will do all the work for releasing the mutex through destructor and the mutex will be released. Without this concept how many times will you have to release the mutex? And how many times will someone else forget to do that when he is modifying the code? This concept could be generalized as protecting critical section with the QpSLock

using namespace qpthr;

QpMutex mutex1, mutex2;

void test() 
{ 
	{ // critical section protected by mutex1 
		QpSLock lock(&mutex1);
		... 
	}
	// no locks are held
	...
	{ // critical section protected by mutex1 
		QpSLock lock(&mutex1);
		...
	}
	{ // critical section protected by mutex2 
		QpSLock lock(&mutex2); 
		...
	}
}

 

Monitor

The concept of monitor is to provide further simplification of locks and condition variables. The monitor represents both, the lock and the condition variable. QpThread library has three monitors:

QpMonitor is composed of QpMutex and condition variable
QpMonitorEx is composed of QpMutexEx (so with timeouts on locks) and condition variable
QpMonitorSL is composed of QpSpinLock and condition variable.

Monitor supports locking through:

Lock(), 
Unlock(), 
TryLock() 

and waiting and signaling of the condition through:

void Wait(); 
bool Wait(const struct timespec *ts, bool throw_ex = true); 
bool Wait(unsigned int msec, bool throw_ex = true); 
bool WaitAbs(const struct timespec *ts, bool throw_ex = true); 
void Signal();
void Broadcast();

 

QpMonitor monitor; 
int x = 1;

// thread 1 
void T1::Main() { 
	QpSLock lock(&monitor); 
	while (x != 0) 
	monitor.Wait();
	... 
}

// thread 2 void 
T2::Main() { 
	QpSLock lock(&monitor); 
	x = 0; 
	monitor.Signal();
}

 

Read/Write Locks

Read/Write locks provide useful locking primitive when we have several threads that are only reading and need to be synchronized with thread(s) that modify data. This can be accomplished with normal mutexes also but the important difference is that with read/write locks there can be several readers in protected region and they are synchronized with writers.

QpRWLock

QpRWLock is the lock that implements simple, fast read/write lock It supports basic operations that are needed for read/write locks. These operations are:

void ReadLock();
void WriteLock();
bool TryReadLock();
bool TryWriteLock();
void Unlock();

QpRWLockEx

QpRWLockEx is extended read/write lock. The extension is in timeout that can be specified in read/write lock operations. Priority of the lock can be set also. Default priority is that writers have priority over readers (so when writer is waiting for the lock other readers that may come are not allowed to acquire the lock and have to wait until writer acquires the lock and finishes). Furthermore QpRWLockEx supports concept of promoting (upgrading) read lock to write lock.

Why promoting/upgrading is it useful? Let suppose that we have many readers of some long queue that is protected by QpRWLockEx. Then suppose we have two writers. One writer pushes new items at the end and the second checks whole queue if it contains some item and if the item is found then it removes that item from the queue. If you acquire the write lock for this task, then all readers would be blocked to operate on that queue. But most of the time the second writer also only reads the queue. But on the other hand for the second writer you cannot acquire read lock and then if the item is found unlock it, acquire write lock and remove the item because between unlock and acquiring the write lock someone else (the first writer that performs simple task) could alter that queue. Here comes the possibility of upgrading read lock to write lock through PromoteWriteLock(). With this operation you promote/upgrade read lock to write lock and this way you avoid race condition for the unlocking and reacquiring the write lock.

It is important to mention that if you call PromoteWriteLock() in two readers at the same time the deadlock occurs. That is because before acquiring the write lock all readers have to leave protected critical section. But if you have two readers that wants to upgrade read locks to write lock each one is waiting for the other to leave the critical section. This is detected by PromoteWriteLock() and QpErrorException is thrown with the cause of QP_ERROR_DEADLOCK. To avoid this from happening special ReadForWriteLock() and corresponding ReadForWriteUnlock() operations are defined for acquiring the read lock at the beginning. These are used instead of ReadLock() and Unlock() when you plan to promote/upgrade that read lock to write lock. The protection is accomplished the way that if another thread calls ReadForWriteLock() it is blocked until the lock is released by the thread who acquired the lock with ReadForWriteLock() first. Because ReadForWriteLock() acquires read lock the threads that call ReadLock() are not blocked.Of course you can use only ReadLock() and then PromoteWriteLock() but then you have to deal with deadlock exception if you are using PromoteWriteLock in more than one thread.

Note that when you use ReadForWriteLock() you must call ReadForWriteUnlock() for releasing that lock Or better, use QpReadForWriteSLock helper class.

QpRWLockEx    rw_lock_ex;

/*
 * this code leads to deadlock if run in two parallel threads
 */
rw_lock_ex.ReadLock();
...
rw_lock_ex.PromoteWriteLock();
...
rw_lock_ex.Unlock();

/*
 * this code is OK when run in two (or more) parallel threads
 */
rw_lock_ex.ReadForWriteLock();
...
rw_lock_ex.PromoteWriteLock();
...
rw_lock_ex.ReadForWriteUnlock();


/*
 * this code is OK
 */
/* thread 1 */
rw_lock_ex.ReadLock();
...
rw_lock_ex.Unlock();

/* thread 2 */
rw_lock_ex.ReadLock();
...
rw_lock_ex.PromoteWriteLock();
...
rw_lock_ex.Unlock();

QpRWLockEx supports these operations:

Constructor

enum RW_PRIO {RW_PRIO_WRITE, RW_PRIO_READ };
QpRWLockEx(RW_PRIO prio = RW_PRIO_WRITE, const char *name = NULL);

Read/Write locking operations with timeout:

bool ReadLockAbs(const struct timespec *ts, bool throw_ex = true);
bool WriteLockAbs(const struct timespec *ts, bool throw_ex = true);

void ReadLock();
bool ReadLock(const struct timespec *ts, bool throw_ex = true);
bool ReadLock(unsigned int msec, bool throw_ex = true);
bool TryReadLock();

void WriteLock();
bool WriteLock(const struct timespec *ts, bool throw_ex = true);
bool WriteLock(unsigned int msec, bool throw_ex = true);
bool TryWriteLock();

void Unlock();

ReadForWrite lock/unlock operations with timeout:

bool ReadForWriteLockAbs(const struct timespec *ts, bool throw_ex = true);
bool TryReadForWriteLock();

void ReadForWriteUnlock();

void ReadForWriteLock();
bool ReadForWriteLock(const struct timespec *ts, bool throw_ex = true);
bool ReadForWriteLock(unsigned int msec, bool throw_ex = true);

Promoting read lock to write lock operations:

bool PromoteWriteLockAbs(const struct timespec *ts, bool throw_ex = true);
bool TryPromoteWriteLock();

void PromoteWriteLock();
bool PromoteWriteLock(const struct timespec *ts, bool throw_ex = true);
bool PromoteWriteLock(unsigned int msec, bool throw_ex = true);

Read/Write critical section protection with Qp...SLock

As read/write locks have different capabilities than normal mutexes and semaphores they need little bit different helper classes to provide automatic locking and unlocking. Three new classes are defined for this purpose. All with prefix Qp and suffix SLock

QpReadSLock

Constructor of QpReadSLock acquires read lock of QpRWLock(Ex) lock and destructor unlocks it.

QpWriteSLock

Constructor of QpWriteSLock acquires write lock of QpRWLock(Ex) lock and destructor unlocks it.

QpReadForWriteSLock

Constructor of QpReadForWriteSLock acquires read for write lock of QpRWLockEx with ReadForWriteLock() and destructor unlocks it properly with ReadForWriteUnlock().

 

QpRWLockEx    rw_lock_ex;

/* thread T1 - reader */
void T1::Main() {
	for (...) {
		QpReadSLock lock(rw_lock_ex);
		...
	}
}

/* thread T2  - writer */
void T2::Main() {
	for (...) {
		QpWriteSLock lock(rw_lock_ex);
		...
	}
}

/* thread T3 - reader/writer */
void T3::Main() {
	for (...) {
		QpReadForWriteSLock lock(rw_lock_ex);
		...
		if (...) {
			rw_lock_ex.PromoteWriteLock();
			...
		}
	}
}

 

QpRunnable

This class is designed as abstract class that is used as interface definition. Users class must implement (be derived from) QpRunnable interface if it wants to be executed in thread, thread pool or invoked by signal handler or timer. The method Main() has to be re-implemented to do the desired work. The Main() method can throw exceptions derived from QpClonedException outside the Main() method (these are QpCancelException and QpUserException and derived classes). This exception is caught by QpRunnable and is stored for further processing. Other exceptions will cause application to terminate. The QpClonedException can be re-thrown from the QpRunnable class with Raise() method.

virtual void Main() = 0;
void Raise();

QpThread library has predefined wrappers that implement QpRunnable interface and these wrappers could be used as adapters for functions or member methods of objects that do not implement QpRunnable interface. These wrappers are templates and are located in qp_work.h header

The naming of wrappers always start with QpWork_ prefix and ends with number that specifies how many arguments function/member method takes. Naming convention is as follows:

QpWork_p0 ... QpWork_p5 functions that return void (procedures)
QpWork_f0 ... QpWork_f5 functions
QpWork_mp0 ... QpWork_mp5 object member functions that return void (procedures)
QpWork_mf0 ... QpWork_mf5 object member functions
QpWork_mcp0 ... QpWork_mcp5 object member const functions that return void (procedures)
QpWork_mcf0 ... QpWork_mcf5 object member const functions 

 

 

void p0() {...}
void p2(int, char) { ... }
int f2(char, char) { ... }

QpWork_p0 work_p0(p0);
QpWork_p2<int, char> work_p2(p2, 2, 'a');
QpWork_f2<int, char, char> work_f2(f2, 'a', 'b');
QpThread t0(&work_p0);
QpThread t1(&work_p2);
QpThread t2(&work_f2);

t0.Start();
t1.Start();
t2.Start();
t0.Join();
t1.Join();
t2.Join();
int result = work_f2.Result();
try {
	work_f2.Raise();
}
catch (QpCancelException &e) {
	...
}
catch (QpUserException &e) {
	...
}

 

class MyClass {
	void P2(int, int) {...}
	int F2(char, char) {...}
};

MyClass my_class;
QpWork_mp2<MyClass, int, int> work_mp2(my_class, &MyClass::P2, 10, 20);
QpWork_mf2<int, MyClass, char, char> work_mf2(my_class, &MyClass::F2, 'x', 'y');
QpThreadPool pool;

pool.Do(&work_mp2);
pool.Do(&work_mf2);
pool.Wait(&work_mp2);
pool.Wait(&work_mf2);
int result = work_mf2.Result();

 

QpThread

QpThread library like other libraries supports the concept of thread abstraction through C++ class. In this library the thread is represented by class QpThread. Every thread can have associated priority and stack size. The priorities are defined by symbolic names:

QP_PRIO_MIN
QP_PRIO_AVGMIN
QP_PRIO_AVG
QP_PRIO_AVGMAX
QP_PRIO_MAX

QP_PRIO_DFL

Special constant QP_PRIO_DFL represents default priority and this priority is inherited from parent thread.

Thread priority can be fine tunned through exact priority values. To do that you pass the value of desired priority increased with constant QP_PRIO_OFFSET. Be sure that the value (before you increase it with QP_PRIO_OFFSET) is between min and max priority values. These min/max values can be obtained through:

QpInit::PrioValue(QP_PRIO_MIN)
QpInit::PrioValue(QP_PRIO_MAX)
I recommend to fine tunne priorities of threads with direct values only if you really need it and only if you select real time scheduler.

The stack size is by default 0 which means default stack that will be assigned by system (underlying library). The thread constructor takes all configurable parameters of QpThread. It is possible to create new class which is derived from QpThread class and redefines the Main() method. Another way is to let the constructor of QpThread class to take object that implements QpRunnable interface. That objects Main() method will be executed in the thread. The first constructor is used for derived classes and the second one is used for QpRunnable enabled objects.

protected:
	QpThread(unsigned int prio = QP_PRIO_DFL,
		 unsigned int stack_size = 0,
		 const char *name = NULL,
		 bool thr_created = false);
public:
	QpThread(QpRunnable *r,
		 unsigned int prio = QP_PRIO_DFL,
		 unsigned int stack_size = 0,
		 const char *name = NULL);

After the thread is created by constructor it has to be started by Start() method. When the method Start() is called again then it waits for Main() method or QpRunnable object to finish and after that it executes it again.

void Start();

The Join methods return when the thread finishes or after there is timeout. More than one Join can wait for thread termination.

bool Join(unsigned int msec, bool throw_ex = true);
bool Join(const struct timespec *ts = NULL, bool throw_ex = true);
bool JoinAbs(const struct timespec *ts = NULL, bool throw_ex = true);

Methods for sleep and wakeup of thread are defined. The WakeUp() method interrupts the Sleep().

static bool Sleep(unsigned int msec);
static bool Sleep(const struct timespec *ts = NULL);
static bool SleepAbs(const struct timespec *ts = NULL);
void WakeUp();

Setting and getting priority is supported through these methods that takes priority symbolic constants:

unsigned int SetPriority(unsigned int prio);
unsigned int GetPriority();

The cancellation is supported through deferred approach – the TestCancel() function is called when the thread tests if the cancellation occurred. If the cancellation occurs then the QpCancelException is thrown to the thread. The function Cancel() is used to deliver cancel request to the thread object.

void Cancel();
static void TestCancel();

The exception that was caught by QpThread can be re-thrown by function Raise(). Only exceptions derived from QpClonedException can be re-thrown.

void Raise();

For testing if the thread is still a live there is method:

bool IsAlive();

For the main() function (the entry function of C/C++ program) is created special adapter object of type QpThread. Pointer to this object is available as return value of function:

static QpThread *MainThread();

To identify current thread there is method Current(). It is also possible to get thread sequences (some sort of identifiers) through CurrentSeq() or ThreadSeq().

static QpThread *Current();
static unsigned long CurrentSeq();
unsigned long ThreadSeq();

 

using namespace qpthr;
    
class T1: public QpRunnable {
	virtual void Main() { 
		...
		QpThread::TestCancel();
		QpThread::Sleep(1000);
		...
	}
};

class T2: public QpThread {
	T2(): QpThread() {};
	virtual void Main() {
		...
		Sleep();
		...
	}
};

int main() 
{ 
	QpInit init; 
	T1 t1_runnable; 
	QpThread t1(&t1_runnable);
	T2 t2(QP_PRIO_MAX);
	T2 *t22;

	cout << QpThread::Current()->GetName() << endl;
	cout << QpThread::CurrentSeq() << endl;
	t22 = new T2;

	t1.Start(); 
	t2.Start(); 
	t22->Start();

	QpThread::Current()->Sleep(2000);

	t1.Cancel(); 
	t2.Wakeup();
	t22->WakeUp();
	...
	t1.Join(); 
	t2.Join();
	t22->Join();
	delete t22;
}

 

QpThreadPool

Thread pool is designed to run several jobs (objects that implement QpRunnable interface) that are inserted to the job queue of QpThreadPool through Do() method. The jobs are then executed on available threads that are in QpThreadPool. The method Wait() then returns objects that finished execution. It is also possible to wait for particular job object. The QpThreadPool can contain configurable amount of threads that can be allocated on demand and can exit if they don't have nothing to do. The constructor of QpThreadPool takes all configurable parameters: priority, stack size of the threads, how many threads will be available in QpThreadPool (default 1), if the threads will be allocated on QpThreadPool creation or on demand (default is on thread pool creation), the life time - the amount of time the thread is permitted to be idle in ThreadPool, after this time the thread exits and is removed form the pool (default: it will stay in QpThread forever). The constructor looks like this:

QpThreadPool(unsigned int prio = QP_PRIO_DFL,
	     unsigned int stack_size = 0, 
	     unsigned int max_threads = 1, 
	     bool on_demand = false,
	     unsigned int msec_life_time = 0,
	     const char *name = NULL); 

Do() method insert the object in the queue of jobs that will be executed.

void Do(QpRunnable *j);

Wait() methods wait for any (first parameter NULL) or particular job object to be finished. They returns the object that finished execution.

QpRunnable *Wait(QpRunnable *what, unsigned int msec, bool throw_ex = true); 
QpRunnable *Wait(QpRunnable *what = NULL, const struct timespec *ts_rel = NULL, bool throw_ex = true); 
QpRunnable *WaitAbs(QpRunnable *what = NULL, const struct timespec *ts_abs = NULL, bool throw_ex = true);  

CrrrentThreads() method reports how many threads are allocated in the thread pool.

int CurrentThreads();

CurrentJob() method reports current job object the thread is executing.

QpRunnable *CurrentJob(unsigned int thr_nr = 0);

 

using namespace qpthr;
    
class J1: public QpRunnable { 
	virtual void Main() {
		...
	}
};

class J2: public QpRunnable { 
	virtual void Main() {
		...
	}
};

int main()
{
	QpInit init;
	QpThreadPool pool;

	pool.Do(new J1());
	pool.Do(new J2());
	pool.Do(new J1());
	delete pool.Wait();
	delete pool.Wait();
	delete pool.Wait();

	int i;
	QpThreadPool pool2(QP_PRIO_DFL, 0, 5, true, 10000);
	for (i = 0; i < 100; i++)
		pool2.Do(new J1());
	for (i = 0; i < 100; i++)
		delete pool2.Wait();
}   

 

QpOnce

Class QpOnce provides interface to execute QpOnce derived object or QpRunnable derived object at most once. To invoke Main method you call Do() method of that object in several threads (or several times). It is ensured that the objects Main method will be executed only once. It is completely safe to call Do() method from several threads without synchronization. As usual it supports catching and re-throwing of QpClonedException that is propagated outside Main function.

Method that invokes user defined operation:

void Do();

Method that re-throw QpClonedException:

void Raise();

 

using namespace qpthr;

class MyOnce: public QpOnce {
    public:
	virtual void Main() {
		... // will be executed only once
	}
};

MyOnce once;

class T1: public QpThread {
	virtual void Main() {
		...
		once.Do();
		...
	}
};

class T2: public QpThread {
	virtual void Main() {
		...
		once.Do();
		...
	}
};

int main()
{
	QpInit init;
    
	T1 *t1 = new T1;
	T1 *t11 = new T1;
	T2 *t2 = new T2;

	t1->Start();
	t11->Start();
	t2->Start();

	t1->Join();
	t11->Join();
	t2->Join();

	delete t1;
	delete t11;
	delete t2;
}

 

QpTSD

QpTSD class supports the concept of thread specific data (TSD). This way threads have private storage for data and these data are not shared among other threads. Every thread can set its own value through the Set(void *) method and get the value through Get(). The value stored by one thread is not affected with value stored by another thread. So every thread has its own stored value.

This method sets thread specific data:

void Set(void *);

This method retrieves thread specific data:

void *Get();

 

using namespace qpthr;

QpTSD tsd1, tsd2;

class T1: public QpThread {
	virtual void Main() {
		...
		tsd1.Set((void *) 1);
		tsd2.Set((void *) 11);
		...
		cout << (int) tsd1.Get() << " " << (int) tsd2.Get() << endl;
	}
};

int main()
{
	QpInit init;

	T1 *t1 = new T1;
	t1->Start();

	tsd1.Set((void *) 9);
	tsd2.Set((void *) 99);
	...
	t1->Join();
	delete t1;
}

 

QpTSDT

QpTSTD template provides object oriented approach to thread specific data. It main purpose is to allocate for the thread the object which is thread specific only for that thread. When the thread exits then the object is automatically destroyed. The object is for every thread constructed through "new" operation on the heap and destroyed through the "delete". For QpTSDT template object is defined operator -> which provides access to thread specific object. The object is allocated for the thread on demand so it is allocated only for threads that use it.

using namespace qpthr;

class PerThreadObject {
    public:
	int per_thread_data;
	void Invoke() {
		cout << "I am only for" << QpThread::CurrentSeq() << endl;
	}
};

QpTSDT<PerThreadObject> tsd;

class T1: public QpThread {
	virtual void Main() {
		tsd->per_thread_data = 1;
		tsd->Invoke();
	}
};

int main()
{
	QpInit init;

	T1 *t1 = new T1;
	T1 *t1a = new T1;
	t1->Start();
	t1a->Start();

	t1->Join();
	t1a->Join();
	delete t1;
	delete t1a;
}

 

Signal Support

Unsupported on Win32

Signal support in QpThread library provides unique support for signals and per thread signal handlers that are platform independent. It is necessary to understand the difference between synchronous and asynchronous signals in order to successfully use provided signal support. Synchronous signal is delivered to the application as an response to some action the application triggered. That means by example when the application dereferences unknown memory it receives SIGSEGV or when the application will divide something with floating point zero value it will receive SIGFPE or when the application will try to write data to closed pipe it receive SIGPIPE. The synchronous signals are:

    SIGILL, SIGIOT, SIGABRT, SIGEMT, SIGFPE, SIGSEGV, SIGBUS, SIGSYS, SIGPIPE

From the synchronous signal handler you can call only Async-signal safe functions. It means you are pretty restricted. In general almost all complex library functions are not Async-signal safe so you can not call them (like functions from stdio, malloc/free etc). Do not be confused that for synchronous signals you are allowed to call only Async-signal safe functions – this is the terminology.
It is common to use the sigsetjmp and siglongjmp in synchronous signal handler. QpThread library does not restrict you to use sigsetjmp / siglongjmp so feel free to do it even though it is harder to use it than in plain C because you have to take care of C++ objects on the stack (their destructors will not be called if you unwind the stack with siglongjmp).
The synchronous signal is delivered to the thread that caused the generation of the signal. Thus it is important to handle synchronous signals in thread which received the signal.

The asynchronous signals are signals that are delivered to application independently by external event. For example when user press Ctrl-C then to the application is sent SIGINT signal or when the child exits the SIGCHLD is delivered.
Asynchronous signal have different nature than synchronous. They are delivered to any one and only one of threads in application that can receive the signal (signal is unmasked). That is why the QpThread library has special thread dedicated only for handling asynchronous signal handlers. All asynchronous signals are delivered to this special thread and the signal handlers will be executed there. In asynchronous signal handlers you can use fully all libraries you want to (when you use QpThread library signal support of course). The only important thing to remember is that these signal handlers are running in separate thread, so you can not use sigsetjmp/siglongjmp to other thread or thread specific data from thread that registered signal handler. On the other hand you can write complex signal handlers and you can use any library function you want to.

It is also important to realize that the QpThread library does not’t protect you against EINTR error which is returned by system functions when they are interrupted by signal handler. Unfortunately threads are not protected by masking asynchronous signals due to portability reasons of handling asynchronous signals so the functions can return EINTR even though the registered jobs and functions for signal handler will be executed in other thread. To understand why this is done this way it is important to understand implementation of threads and handling signals for threads on various platforms. In QpThread library special signal handler is invoked in thread when asynchronous signal is delivered. That special signal handler sends request to thread dedicated to process asynchronous signals and user registered signal handlers are executed there. QpThread library supports registering objects that implement QpRunnable interface as signal handlers for both synchronous and also for asynchronous signals. Furthermore it is possible to register plain functions. Every thread can have its own set of jobs that will be executed when the signal is delivered. On thread termination the synchronous signal handlers associated with terminated thread are unregistered (they are meaningful only in the terminated thread context) but the asynchronous signals are not associated with particular thread so they are not removed. They have to be explicitly unregistered even by another thread.

In order to use signal support the QpThread library has to be initialized with the signal subsystem this way:

QpInit init(QpInit::QP_SIGNAL);

The actions needed to do before using signal handlers are these: You have to install signal handler (with particular policy) and then you register QpRunnable enabled objects or functions that will be executed in that handler when signal is delivered to the application/thread. Synchronous signals will be executed in the thread the signal was delivered to. Asynchronous signals will be executed in special thread designed to handle these signals. The types of signal handlers are these:

This method installs signal handler:

static void InstallHandler(int sig, SIG_HANDLER handler); 

Method that returns the original signal handler is:

static void (*OrigHandler(int sig))(int); 

These methods register job objects or functions for executing in signal handler. They return handler that is used for un-registering.

typedef void (*signal_func)(int sig_nr, void *arg);

static QpRegItem RegisterSignalFunc(int sig, signal_func func, void *arg = NULL); 
static QpRegItem RegisterSignalFunc(int sig, QpRunnable *j) 

Method UnregisterSignalFunc() is used for removing registered object or function from the signal handler:

static void UnregisterSignalFunc(QpRegItem);

This function is used for exiting application within signal handler:

static void Exit();

 

using namespace qpthr;

class IntHandler: public QpRunnable {
	virtual void Main() {
		if (...) {
			cout << "exiting" << endl;
			QpSignal::Exit();
		}
	}
};

class My: public QpThread {
	virtual void Main() {
		IntHandler ih3;
		QpRegItem ri3 = QpSignal::RegisterSignalFunc(SIGINT, &ih3);
		...	
		QpSignal::UnregisterSignalFunc(ri3);
	}
};

int main() 
{ 
	QpInit init(QpInit::QP_SIGNAL);
	My t1;
	IntHandler ih1, ih2;

	QpSignal::InstallHandler(SIGINT, HANDLER);
	QpRegItem ri1 = QpSignal::RegisterSignalFunc(SIGINT, &ih1);
	QpRegItem ri2 = QpSignal::RegisterSignalFunc(SIGINT, &ih2);
	t1.Start();
	...
	t1.Join();
	QpSignal::UnregisterSignalFunc(ri2);
	QpSignal::UnregisterSignalFunc(ri1);
}

 

void segv_func(int signr, void *arg)
{
	sigjmp_buf *buf = (sigjmp_buf *) arg;
	siglongjmp(*buf, 1);
}

int main()
{
	QpInit init(QpInit::QP_SIGNAL);
	sigjmp_buf jmp;
	QpRegItem ri;

	QpSignal::InstallHandler(SIGSEGV, HANDLER);

	if (sigsetjmp(jmp, 1) == 0) {
		ri = QpSignal::RegisterSignalFunc(SIGSEGV, segv_func, &jmp);
		int *ptr = NULL;
		*ptr = 1;	// segmentation fault
	}
	QpSignal::UnregisterSignalFunc(ri);
}

 

Producent/Consumer support – QpQueue/QpStack

The QpThread library supports the concept of message containers producer(s)/consumer(s) like. These containers are prepared to handle concurrent access of threads and have associated capacity. Several producers can put messages and several consumers can get them. Methods for storing and retrieving messages have timeouts and supported is also concept of closing the container so the consumer(s) is informed that no more data will be put into the container. Containers have configurable size in order to protect resources when the producer(s) is(are) faster than consumer(s). The containers are STL based so you can chose the best internal representation that will be used.

QpQueue

The QpQueue implements FIFO order of messages. As STL container is recommended to use list container. The methods of QpQueue are:

QpQueue(unsigned int capacity = 1024, const char *name = NULL) 
bool Send(T &p, const struct timespec *ts = NULL, bool throw_ex = true); 
bool Send(T &p, unsigned int msec, bool throw_ex = true); 
bool SendAbs(T &p, const struct timespec *ts_abs = NULL, bool throw_ex = true);
void SendDone();
bool CanReceive();
bool Receive(T *retval, const struct timespec *ts = NULL, bool throw_ex = true);
bool Receive(T *retval, unsigned int msec, bool throw_ex = true);
bool ReceiveAbs(T *retval, const struct timespec *ts_abs, bool throw_ex = true);
bool TryReceive(T *retval);
unsigned int Size();

 

#include <list>

QpQueue < list < int > > queue;

/* thread 1 */
T1::Main() {
	int i;
	for (i = 0; i < 100; i++)
		queue.Send(i);
	queue.SendDone();
} 

/* thread 2 */
T2::Main() {
	int i;
	while (queue.Receive(&i))
		cout << i << endl;
}

 

QpStack

The QpStack implements LIFO order of messages. As STL container is recommended to use list or vector containers. The QpStack has similar methods like QpQueue except that Send is Push and Receive is Pop. The methods of QpStack are:

QpStack(unsigned int capacity = 1024, const char *name = NULL);
bool Push(T &p, const struct timespec *ts = NULL, bool throw_ex = true);
bool Push(T &p, unsigned int msec, bool throw_ex = true);
bool PushAbs(T &p, const struct timespec *ts_abs = NULL, bool throw_ex = true);
void PushDone() bool CanPop() bool Pop(T *retval, unsigned int msec, bool throw_ex = true);
bool CanPop();
bool Pop(T *retval, const struct timespec *ts = NULL, bool throw_ex = true);
bool Pop(T *retval, unsigned int msec, bool throw_ex = true);
bool PopAbs(T *retval, const struct timespec *ts_abs = NULL, bool throw_ex = true);
bool TryPop(T *retval);
unsigned int Size()

 

Timer

Timer support allows user to register jobs that will be run at particular time and interval with configurable number of invocations. It is possible to register only objects that implement QpRunnable interface. The Register methods returns handle that identifies registered job and through this handle the job could be unregistered. It is important to realize that these jobs are running in one separate thread that is created for these jobs when the QpThread library is initialized. In order to use Timer capability it is important to initialize the library with QpInit::QP_TIMER capability:

QpInit init(QpInit::QP_TIMER);

These methods register object that will be run at relative time to the current time. This time is also used as the period if the object will be run more than once (shots > 1). The constant QpTimer::INFINITE_SHOTS represents infinite number of shots.

QpTimer::INFINITE_SHOTS;
QpRegItem QpTimer::Register(QpRunnable *r, const struct timespec *ts, int shots = 1);
QpRegItem QpTimer::Register(QpRunnable *r, unsigned int msec, int shots = 1);

These methods register object that will be executed at absolute time with optional period and number of invocations:

QpRegItem QpTimer::RegisterAbs(QpRunnable *r, const struct timespec *ts_abs, const struct timespec *ts_rel_period = NULL, int shots = 1);
QpRegItem QpTimer::RegisterAbs(QpRunnable *r, const struct timespec *ts_abs, unsigned int msec_period, int shots);

This function is used to remove the job object.

void QpTimer::Unregister(QpRegItem arg);

 

using namespace qpthr;

class Scan: public QpRunnable {
	virtual void Main() { 
		cout << "Scanning" << endl;
		... 
	}
};

int main()
{
	QpInit init(QpInit::QP_TIMER); 
	Scan scan; 

	QpRegItem scan_id = QpTimer::Register(&scan, 1000, 50); 
	QpThread::Sleep(10000); 
	QpTimer::Unregister(scan_id);
}

 

Supported Platforms

Currently supported platforms are:

Linux 2.0,2.2/Intel/Alpha/Sparc EGCS at least 1.1.2 with GLIBC-2.1.1
Solaris 2.5/Intel/Sparc SUNWspro 4.2 with STLport-3.12.3
Digital OSF1 4.0/Alpha DEC C++ 6.0
Windows NT Visual C++ 6.0

QpSpinLock is currently supported on Intel, Alpha and Sparc hardware platforms

You are more than welcome to port it to other platforms and/or compilers.

Win32 notes
The QpSignal, QpSpinLock and QpAsyncSafeSem classes are not supported on Win32 platform

 

Download

You can download QpThread source code or binary code from ftp://ftp.gncz.cz/pub/qpthread/

 

Patches, Bug Reports, Suggestions

Please send bug descriptions, patches, suggestions or successful stories to kra@gncz.cz