ACE notes
From OriWiki
Basic multithreading
The following code creates a thread that runs in parallel to the main thread. Both the main and the additional thread repeatedly print a message and fall asleep.
#include <iostream>
#include "ace/Task.h"
class MyThread : public ACE_Task_Base {
public:
virtual int svc() {
for(int i=0; i<7; ++i) {
std::cerr << "MyThread::svc() is running\n";
ACE_OS::sleep(1);
}
return 0;
}
};
int main() {
std::cout << "main stats..\n";
MyThread mthrd;
int result = mthrd.activate();
std::cerr << "result: " << result << "\n";
for(int i=0; i<7; ++i) {
std::cerr << "main() is running\n";
ACE_OS::sleep(2);
}
mthrd.wait();
std::cout << "main done.\n";
}
Thread Mutex
The function foo has a critical section. It keeps track of the threads that runs it and the threads that are in the critical section (should be only one at a time)
#include <iostream>
#include <set>
#include <algorithm>
#include <iterator>
#include "ace/Task.h"
typedef std::set<int> Set;
void foo(int id) {
static ACE_Thread_Mutex mtx; // or: static ACE_Mutex mtx; (requires #include "ace/Mutex.h")
static Set whoInFoo,whoInSection;
whoInFoo.insert(id);
std::cerr << "threads currently in foo: ";
copy(whoInFoo.begin(),whoInFoo.end(), std::ostream_iterator<int>(std::cerr,", ")); std::cerr << "\n";
std::cerr << "\n";
mtx.acquire(); // -- begin critical section
whoInSection.insert(id);
std::cerr << "threads currently in critical section: ";
copy(whoInSection.begin(),whoInSection.end(), std::ostream_iterator<int>(std::cerr,", ")); std::cerr << "\n";
std::cerr << "\n";
ACE_OS::sleep(1);
whoInSection.erase(id);
mtx.release(); // -- end critical section
whoInFoo.erase(id);
}
class MyThread : public ACE_Task_Base {
int _id;
public:
MyThread(int id) : _id(id) {}
virtual int svc() {
foo(_id);
}
};
int main() {
MyThread t1(1),t2(2),t3(3);
t1.activate();
t2.activate();
t3.activate();
t1.wait();
t2.wait();
t3.wait();
}
Output:
threads currently in foo: 1, threads currently in critical section: 1, threads currently in foo: 1, 2, threads currently in foo: 1, 2, 3, threads currently in critical section: 2, threads currently in critical section: 3,
Alternative main:
#include <vector>
int main() {
std::vector<MyThread*> v;
for(int i=0; i<7; ++i) {
v.push_back(new MyThread(i));
v[i]->activate();
}
for(int i=0; i<7; ++i) {
v[i]->wait();
delete v[i];
}
}
Guard
A Guard acquires the mutex upon construction and releases it upon destruction.
#include "ace/Mutex.h"
void foo(int id) {
static ACE_Mutex mtx;
ACE_Guard<ACE_Mutex> guard(mtx);
...
}
Signal & wait example
#include <iostream>
#include "ace/Task.h"
#include "ace/Mutex.h"
#include "ace/Condition_T.h"
class MyThread : public ACE_Task_Base {
ACE_Condition<ACE_Mutex>& _cond;
public:
MyThread(ACE_Condition<ACE_Mutex>& cond): _cond(cond) {}
virtual int svc() {
std::cerr << "Mythread sleeping...\n";
ACE_OS::sleep(1);
std::cerr << "Mythread awoked!\n";
_cond.signal();
ACE_OS::sleep(1);
std::cerr << "Mythread still alive\n";
}
};
int main() {
ACE_Mutex mtx;
ACE_Condition<ACE_Mutex> cond(mtx);
MyThread t(cond);
t.activate();
std::cerr << "main: waiting for a signal..\n";
cond.wait();
std::cerr << "main: got a signal\n";
t.wait();
}
Signal chain
The following code creates a serial of threads, each one waiting to its predecessor to signal it. The first thread is waiting to a signal from main. Note that if the signal from main is sent before it is being listen to - the signal is lost (at least that what appears to be happening)
#include <iostream>
#include <vector>
#include "ace/Task.h"
#include "ace/Mutex.h"
#include "ace/Condition_T.h"
class MyThread : public ACE_Task_Base {
unsigned int m_tid;
ACE_Condition<ACE_Mutex> *m_cond1, *m_cond2;
public:
MyThread(unsigned int tid, ACE_Condition<ACE_Mutex> *cond1,
ACE_Condition<ACE_Mutex> *cond2):
m_tid(tid), m_cond1(cond1), m_cond2(cond2) {}
virtual int svc() {
std::cerr << "Thread#" << m_tid << " activated and waiting..\n";
m_cond1->wait();
std::cerr << "Thread#" << m_tid << " got a signal\n";
m_cond2->signal();
std::cerr << "Thread#" << m_tid << "done.\n";
}
};
typedef ACE_Condition<ACE_Mutex> Cond;
int main() {
unsigned int threadNums = 10; //100
std::vector<ACE_Mutex*> mtxs;
std::vector<Cond*> conds;
std::vector<MyThread*> threads;
ACE_Mutex mtx;
Cond mainCond(mtx);
for(unsigned int i=0; i<threadNums; ++i) {
mtxs.push_back(new ACE_Mutex());
conds.push_back(new Cond(*(mtxs[i])));
Cond *prevCond;
if(i==0)
prevCond = &mainCond;
else
prevCond = conds[i-1];
threads.push_back(new MyThread(i,prevCond,conds[i]));
threads[i]->activate();
}
std::cerr << "main: all threads are activated\n";
ACE_OS::sleep(1); // This sleep seems essential.. if the signal is emitted before the
// wait, the signal is lost!
mainCond.signal();
for(unsigned int i=0; i<threadNums; ++i)
threads[i]->wait();
for(unsigned int i=0; i<threadNums; ++i) {
delete mtxs[i];
delete conds[i];
delete threads[i];
}
}
Signal Before Wait - problem
In the following code one thread send a signal before the other one starts to wait for it. It seems like the signal is lost and the wait stuck forever. This suggest a need for a quing mecahnism in which the signal is not lost but rather kept in a queue for a future wait() to take it.
#include <iostream>
#include "ace/Task.h"
#include "ace/Mutex.h"
#include "ace/Condition_T.h"
class MyThread : public ACE_Task_Base {
ACE_Condition<ACE_Mutex>& _cond;
public:
MyThread(ACE_Condition<ACE_Mutex>& cond): _cond(cond) {}
virtual int svc() {
_cond.signal();
std::cerr << "Mythread: signal sent!\n";
}
};
int main() {
ACE_Mutex mtx;
ACE_Condition<ACE_Mutex> cond(mtx);
MyThread t(cond);
t.activate();
ACE_OS::sleep(4);
std::cerr << "main: waiting for a signal..\n";
cond.wait();
std::cerr << "main: got a signal\n";
t.wait();
}
Time limited wait
ACE_Time_Value current_time = ACE_OS::gettimeofday (); ACE_Time_Value deadline = current_time + ACE_Time_Value(4); // 4 seconds to timeout int res = m_cond.wait(&deadline); std::cerr << "*** wait return value: " << res << "\n"; // 0: - got a signal, -1: timeout
Alternative multi threading technique
The following code was tested on visual studio (all the above were tested on g++). The technique is taken from here.
#include <iostream>
#include <ace/Task.h>
class MyThread: public ACE_Task <ACE_MT_SYNCH> {
public:
MyThread() {}
int svc() {
std::cerr << "MyThread::svc()\n";
return 0;
}
};
int main(int argc,char **argv)
{
MyThread trd;
trd.activate();
std::cerr << "starts...\n" << std::endl;
ACE_Thread_Manager::instance()->wait();
std::cerr << "main - done.\n";
std::cin.get();
return 0;
}
Signaling in windows
The following code get stuck in window (probably so does the previous signal/wait example)
#include <iostream>
#include <ace/Task.h>
#include <ace/Mutex.h>
#include <ace/Condition_T.h>
class MyThread: public ACE_Task <ACE_MT_SYNCH> {
ACE_Mutex m_mtx;
ACE_Condition<ACE_Mutex>* m_pCond;
public:
MyThread(ACE_Condition<ACE_Mutex>* cond) : m_pCond(cond) {}
int svc() {
ACE_Guard<ACE_Mutex> guard(m_mtx);
std::cerr << "MyThread::svc()\n";
m_pCond->signal();
return 0;
}
};
int main(int argc,char **argv)
{
ACE_Mutex mtx;
ACE_Condition<ACE_Mutex> cond(mtx);
MyThread trd(&cond);
trd.activate();
std::cerr << "starts...\n" << std::endl;
cond.wait();
std::cerr << "out of wait!\n" << std::endl;
ACE_Thread_Manager::instance()->wait();
std::cerr << "main - done.\n";
std::cin.get();
return 0;
}
Consumer Producer example
The following example is taken from here with several minor modifications. I've checked it on VS and it seems to work fine.
#include <iostream>
#include <queue>
#include "ace/Synch.h"
#include "ace/OS.h"
#include "ace/Task.h"
using std::queue;
using std::cout;
using std::endl;
const long DELAY=6000000;
class Queue{
private:
queue<int> q;
ACE_Thread_Mutex _lock;
ACE_Condition<ACE_Thread_Mutex> _cond;
public:
Queue();
void enqueue(int val);
int dequeue();
};
Queue::Queue():_cond(_lock) {}
void Queue::enqueue(int val)
{
//asquering lock
ACE_Guard<ACE_Thread_Mutex> guard(_lock);
q.push(val);
//notifying threads waiting for a lock
if(q.size() == 1)
_cond.signal();
}
int Queue::dequeue()
{
//asquering lock
ACE_Guard<ACE_Thread_Mutex> guard(_lock);
while (q.empty())
//wait till the queue is not empty
//will be awaken when some thread will
//execute _cond.signal() or _cond.broadcast()
_cond.wait();
int ret = q.front();
q.pop();
return ret;
}
class Producer : public ACE_Task<ACE_MT_SYNCH>
{
private:
Queue *_q;
public:
Producer(Queue* _q);
int svc();
};
Producer::Producer(Queue* q):_q(q){}
int Producer::svc()
{
for (int i = 0 ; i<50 ; ++i){
if (i%5==0)
//sumulating slow processing
for (long j=0; j<DELAY; j++);
cout<<"Producer sent: "<<i<<endl;
cout.flush();
_q->enqueue(i);
}
return 0;
}
class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
private:
Queue *_q;
public:
Consumer(Queue *_q);
int svc();
};
Consumer::Consumer(Queue *q):_q(q){}
int Consumer::svc()
{
for (int i =0; i<50; i++) {
cout<<"Consumer got: "<<_q->dequeue()<<endl;
cout.flush();
}
return 0;
}
int main(int argc,char*argv[])
{
Queue q;
Producer producer(&q);
Consumer consumer(&q);
producer.activate();
consumer.activate();
//ACE_Thread_Manager::instance() -- equvalent to Thread.currentTread
//blocks till all treads started from this thread (main thread) are done
ACE_Thread_Manager::instance()->wait();
std::cout<<"consumer and producer finished"<<std::endl;
std::cin.get();
return 0;
}
Subtle point in using mtx and condition
The following example demonstrates a subtle point. Try choosing the other definition of the type Cond and see that the waiter is stuck forever (only on Windows, on Gnu/Linux it runs fine). It seems there is importance to the fact that the guard and the condition are using the same mutex. .. still need to think about it.
#include <iostream>
#include "ace/Synch.h"
#include "ace/Task.h"
class MyCond;
// #########################################
// Choose one:
typedef MyCond Cond; // Use my home made condition
// typedef ACE_Condition<ACE_Thread_Mutex> Cond; // Use the built in condition
// #########################################
using std::cout;
using std::endl;
class MyCond {
private:
ACE_Thread_Mutex m_mtx;
ACE_Condition<ACE_Thread_Mutex> m_cond;
public:
MyCond(ACE_Thread_Mutex& dammy) // the dammy is just to have same interface as ACE_Condition<T>
: m_cond(m_mtx) {}
void signal() {
ACE_Guard<ACE_Thread_Mutex> guard(m_mtx);
m_cond.signal();
}
void wait() {
ACE_Guard<ACE_Thread_Mutex> guard(m_mtx);
m_cond.wait();
}
};
class Signaler : public ACE_Task<ACE_MT_SYNCH>
{
private:
Cond *_q;
public:
Signaler(Cond* q):_q(q){}
int svc() {
std::cerr << "Signaler: starting..\n";
_q->signal();
std::cerr << "Signaler: sent a signal.\n";
return 0;
}
};
class Waiter : public ACE_Task<ACE_MT_SYNCH>
{
private:
Cond *_q;
public:
Waiter(Cond* q):_q(q){}
int svc() {
std::cerr << "Waiter: starting..(waiting for a signal)\n";
_q->wait();
std::cerr << "Waiter: got the signal!\n";
return 0;
}
};
int main(int argc,char*argv[])
{
ACE_Thread_Mutex mtx;
Cond cond(mtx);
Signaler signaler(&cond);
Waiter waiter(&cond);
waiter.activate();
ACE_OS::sleep(1);
signaler.activate();
ACE_Thread_Manager::instance()->wait();
std::cout<<"please press enter."<<std::endl;
std::cin.get();
return 0;
}
It seems like the mutex of the condition is actually refers to the wrapper scope.. the mutex is assumed to be acquired before calling wait and is released by wait when it starts and reacquired when it is done .. this mechanism is to avoid deadlocks .. still need to think about it..
Consider the following:
{
acquire mutex
awake signal
wait #need to release before wait and acquire after it..
something
release mutex
}
If two processes are running this code, one might be waiting and blocking the scope for the awaking signal to wake him up - this is a deadlock. Thus, the wait needs to release the wrapping scope mutex to enable the signal but after the waiting is over, it needs to acquire the mutex again..
Overcoming the problem of signal-before-wait
The signal/wait mechanism of ACE's condition get stack when the signal occurs before the the wait - the signal gets lost. The following condition wrapper aims to address this problem by keeping track of the signals-without-wait.
#include <iostream>
#include <ace/Synch.h>
#include <ace/Task.h>
using std::cout;
using std::endl;
class Cond {
private:
ACE_Thread_Mutex m_mtx;
ACE_Condition<ACE_Thread_Mutex> m_cond;
int m_signalCounter;
public:
Cond() : m_cond(m_mtx),m_signalCounter(0) {}
void signal() {
ACE_Guard<ACE_Thread_Mutex> guard(m_mtx);
m_cond.signal();
++m_signalCounter;
}
void wait() {
ACE_Guard<ACE_Thread_Mutex> guard(m_mtx);
if(m_signalCounter == 0) {
cout << "real wait!\n";
m_cond.wait();
}
--m_signalCounter;
}
};
class Signaler : public ACE_Task_Base
{
private:
Cond *_q;
public:
Signaler(Cond* q):_q(q){}
int svc() {
std::cerr << "Signaler: starting..\n";
_q->signal();
std::cerr << "Signaler: sent a signal.\n";
return 0;
}
};
class Waiter : public ACE_Task<ACE_MT_SYNCH>
{
private:
Cond *_q;
public:
Waiter(Cond* q):_q(q){}
int svc() {
std::cerr << "Waiter: starting..(waiting for a signal)\n";
_q->wait();
std::cerr << "Waiter: got the signal!\n";
return 0;
}
};
int main(int argc,char*argv[])
{
ACE_Thread_Mutex mtx;
Cond cond;
Signaler signaler(&cond);
Waiter waiter(&cond);
signaler.activate();
ACE_OS::sleep(1);
waiter.activate(); // try to switch the order
signaler.wait();
waiter.wait();
std::cout<<"please press enter."<<std::endl;
std::cin.get();
return 0;
}

