QTrk
threads.h
Go to the documentation of this file.
#pragma once
#include <algorithm>
#include <list>
#include <stdexcept>
#include <string>
#include <vector>
#ifdef USE_PTHREADS
#include <time.h>
#include <unistd.h>
#endif
4 #ifdef USE_PTHREADS
5 
6 #include "pthread.h"
7 
/// POSIX (pthreads) implementation of the thread abstraction.
///
/// Exposes the same static interface as the Win32 variant (ThreadEntryPoint,
/// Handle, Mutex, Create, WaitAndClose, Sleep, GetCPUCount) so that code
/// written against "Threads" builds with either backend.
struct Threads
{
	/// Signature of a thread body: receives the opaque pointer given to Create().
	typedef void (*ThreadEntryPoint)(void* param);

	/// Bookkeeping for one running thread. Allocated by Create(), released by WaitAndClose().
	struct Handle {
		pthread_t thread;
		ThreadEntryPoint callback;
		void* param;
	};

	// Not used by any of the static functions below; kept so that code which
	// refers to this member continues to compile.
	pthread_attr_t joinable_attr;

	/// Thin wrapper around a default-initialised pthread mutex.
	/// Copying a Mutex is not supported: both copies would destroy the same pthread_mutex_t.
	struct Mutex {
		pthread_mutex_t h;
		/// The optional name is accepted (and ignored) for signature parity with the Win32 variant.
		Mutex(const char* /*name*/ = 0) { pthread_mutex_init(&h, 0); }
		~Mutex() { pthread_mutex_destroy(&h); }
		void lock() { pthread_mutex_lock(&h); }
		void unlock() { pthread_mutex_unlock(&h); }
	};

	/// Adapter with the signature pthread_create expects; forwards to the user callback.
	static void* ThreadCaller(void* param) {
		Handle* hdl = static_cast<Handle*>(param);
		hdl->callback(hdl->param);
		return 0;
	}

	/// Starts a joinable thread running method(param).
	/// @return a heap-allocated handle that must be passed to WaitAndClose().
	/// @throws std::runtime_error if the thread could not be created.
	static Handle* Create(ThreadEntryPoint method, void* param) {
		Handle* hdl = new Handle;
		hdl->callback = method;
		hdl->param = param;

		pthread_attr_t attr;
		pthread_attr_init(&attr);
		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
		// pthread_create reports failure through its return value; the pthread_t
		// is left unspecified in that case, so it must not be inspected.
		const int err = pthread_create(&hdl->thread, &attr, &ThreadCaller, hdl);
		pthread_attr_destroy(&attr); // released on both the success and the failure path

		if (err != 0) {
			delete hdl;
			throw std::runtime_error("Failed to create processing thread.");
		}
		return hdl;
	}

	/// Blocks until the thread has finished, then frees the handle. A null handle is ignored.
	static void WaitAndClose(Handle* h) {
		if (!h)
			return;
		pthread_join(h->thread, 0);
		delete h;
	}

	/// Suspends the calling thread for roughly ms milliseconds (negative values are treated as 0).
	static void Sleep(int ms) {
		if (ms < 0)
			ms = 0;
		timespec ts;
		ts.tv_sec = ms / 1000;
		ts.tv_nsec = (ms % 1000) * 1000000L;
		nanosleep(&ts, 0);
	}

	/// Number of processors currently online; falls back to 1 if the system cannot tell.
	static int GetCPUCount() {
		const long n = sysconf(_SC_NPROCESSORS_ONLN);
		return n > 0 ? static_cast<int>(n) : 1;
	}
};
41 
42 
43 #else
44 
45 #include <Windows.h>
46 #undef AddJob
47 #undef Sleep
48 #undef max
49 #undef min
50 
51 struct Threads
52 {
53  typedef void (*ThreadEntryPoint)(void* param);
54  struct Handle {
55  DWORD threadID;
57  HANDLE winhdl;
58  void* param;
59  };
60 
61  struct Mutex {
62  HANDLE h;
63  std::string name;
64  bool trace;
65  int lockCount;
66 
67  Mutex(const char*name=0) : name(name?name:"") {
68  msg("create");
69  h=CreateMutex(0,FALSE,0);
70  trace=false;
71  lockCount=0;
72  }
73  ~Mutex() { msg("end"); CloseHandle(h); }
74  void lock() {
75  msg("lock");
76  WaitForSingleObject(h, INFINITE);
77  lockCount++;
78  }
79  void unlock() {
80  msg("unlock");
81  lockCount--;
82  ReleaseMutex(h);
83  }
84  void msg(const char* m) {
85  if(name.length()>0 && trace) {
86  char buf[32];
87  SNPRINTF(buf, sizeof(buf), "mutex %s: %s\n", name.c_str(), m);
88  OutputDebugString(buf);
89  }
90  }
91  };
92 
93  static DWORD WINAPI ThreadCaller (void *param) {
94  Handle* hdl = (Handle*)param;
95  hdl->callback (hdl->param);
96  return 0;
97  }
98 
99  static Handle* Create(ThreadEntryPoint method, void* param) {
100  Handle* hdl = new Handle;
101  hdl->param = param;
102  hdl->callback = method;
103  hdl->winhdl = CreateThread(0, 0, ThreadCaller, hdl, 0, &hdl->threadID);
104 
105  if (!hdl->winhdl) {
106  throw std::runtime_error("Failed to create processing thread.");
107  }
108  return hdl;
109  }
110 
111  static bool RunningVistaOrBetter ()
112  {
113  OSVERSIONINFO v;
114  GetVersionEx(&v);
115  return v.dwMajorVersion >= 6;
116  }
117 
118  static void SetBackgroundPriority(Handle* thread, bool bg)
119  {
120  HANDLE h = (HANDLE)thread;
121  // >= Windows Vista
122  if (RunningVistaOrBetter())
123  SetThreadPriority(h, bg ? THREAD_MODE_BACKGROUND_BEGIN : THREAD_MODE_BACKGROUND_END);
124  else
125  SetThreadPriority(h, bg ? THREAD_PRIORITY_BELOW_NORMAL : THREAD_PRIORITY_NORMAL);
126  }
127 
128  static void WaitAndClose(Handle* h) {
129  WaitForSingleObject(h->winhdl, INFINITE);
130  CloseHandle(h->winhdl);
131  delete h;
132  }
133 
134  static void Sleep(int ms) {
135  ::Sleep(ms);
136  }
137 
138  static int GetCPUCount() {
139  // preferably
140  #ifdef WIN32
141  SYSTEM_INFO sysInfo;
142  GetSystemInfo(&sysInfo);
143  return sysInfo.dwNumberOfProcessors;
144  #else
145  return 4;
146  #endif
147  }
148 };
149 
151 
152 
153 #endif
154 
155 template<typename T>
156 class Atomic {
157  mutable Threads::Mutex m;
158  T data;
159 public:
160  Atomic(const T& o=T()) : data(o) {} // no need for locking: the object is not allowed to be used before the constructor is done anyway
161  operator T() const { return get(); };
162  Atomic& operator=(const T& x) { set(x); return *this; }
163  void set(const T& x) {
164  m.lock();
165  data=x;
166  m.unlock();
167  }
168  T get() const {
169  m.lock();
170  T x=data;
171  m.unlock();
172  return x;
173  }
174 };
175 
176 template<typename TWorkItem, typename TFunctor>
177 class ThreadPool {
178 public:
179  ThreadPool(TFunctor f, int Nthreads=-1) : worker(f) {
180  if (Nthreads<0)
181  Nthreads = Threads::GetCPUCount();
182  threads.resize(Nthreads);
183  quit=false;
184  inProgress=0;
185  for (int i=0;i<Nthreads;i++)
186  threads[i]=Threads::Create(&ThreadEntryPoint,this);
187  }
189  Quit();
190  }
191  void ProcessArray(TWorkItem* items, int n) {
192  for(int i=0;i<n;i++)
193  AddWork(items[i]);
194  }
195  void AddWork(TWorkItem w) {
196  workMutex.lock();
197  work.push_back(w);
198  workMutex.unlock();
199  }
200  void WaitUntilDone() {
201  while(!IsDone()) Threads::Sleep(1);
202  }
203  bool IsDone() {
204  workMutex.lock();
205  bool r=work.empty() && inProgress==0;
206  workMutex.unlock();
207  return r;
208  }
209  void Quit() {
210  quit=true;
211  for(uint i=0;i<threads.size();i++)
212  Threads::WaitAndClose(threads[i]);
213  threads.clear();
214  }
215 protected:
216  static void ThreadEntryPoint(void *param) {
217  ThreadPool* pool = ( ThreadPool *)param;
218  TWorkItem item;
219  while (!pool->quit) {
220  if ( pool->GetNewItem(item) ) {
221  pool->worker(item);
222  pool->ItemDone();
223  } else Threads::Sleep(1);
224  }
225  }
226  void ItemDone() {
227  workMutex.lock();
228  inProgress--;
229  workMutex.unlock();
230  }
231  bool GetNewItem(TWorkItem& item) {
232  workMutex.lock();
233  bool r = !work.empty();
234  if (r) {
235  item = work.front();
236  work.pop_front();
237  inProgress++;
238  }
239  workMutex.unlock();
240  return r;
241  }
242  std::vector<Threads::Handle*> threads;
244  std::list<TWorkItem> work;
247  TFunctor worker;
248 };
249 
250 template<typename TF>
251 void parallel_for(int count, TF f) {
252 
253  if (count == 1)
254  f(0);
255  else {
256  ThreadPool<int, TF> threadPool(f, std::min (count, Threads::GetCPUCount()) );
257  for (int i=0;i<count;i++) threadPool.AddWork(i);
258  threadPool.WaitUntilDone();
259  }
260 }
static bool RunningVistaOrBetter()
Definition: threads.h:111
Atomic & operator=(const T &x)
Definition: threads.h:162
ThreadPool(TFunctor f, int Nthreads=-1)
Definition: threads.h:179
int inProgress
Definition: threads.h:245
Mutex(const char *name=0)
Definition: threads.h:67
#define SNPRINTF
Definition: std_incl.h:149
void lock()
Definition: threads.h:74
unsigned int uint
Definition: std_incl.h:127
std::list< TWorkItem > work
Definition: threads.h:244
void WaitUntilDone()
Definition: threads.h:200
void ProcessArray(TWorkItem *items, int n)
Definition: threads.h:191
bool trace
Definition: threads.h:64
HANDLE winhdl
Definition: threads.h:57
void(* ThreadEntryPoint)(void *param)
Definition: threads.h:53
static Handle * Create(ThreadEntryPoint method, void *param)
Definition: threads.h:99
std::string name
Definition: threads.h:63
bool IsDone()
Definition: threads.h:203
static void ThreadEntryPoint(void *param)
Definition: threads.h:216
DWORD threadID
Definition: threads.h:55
TFunctor worker
Definition: threads.h:247
void parallel_for(int count, TF f)
Definition: threads.h:251
void Quit()
Definition: threads.h:209
std::vector< Threads::Handle * > threads
Definition: threads.h:242
static int GetCPUCount()
Definition: threads.h:138
bool GetNewItem(TWorkItem &item)
Definition: threads.h:231
OS-specific threading code is abstracted into a simple "Threads" struct.
Definition: threads.h:51
static DWORD WINAPI ThreadCaller(void *param)
Definition: threads.h:93
Atomic< bool > quit
Definition: threads.h:246
static void WaitAndClose(Handle *h)
Definition: threads.h:128
Threads::Handle ThreadHandle
Definition: threads.h:150
void * param
Definition: threads.h:58
HANDLE h
Definition: threads.h:62
void AddWork(TWorkItem w)
Definition: threads.h:195
void msg(const char *m)
Definition: threads.h:84
Atomic(const T &o=T())
Definition: threads.h:160
~ThreadPool()
Definition: threads.h:188
static void SetBackgroundPriority(Handle *thread, bool bg)
Definition: threads.h:118
T data
Definition: threads.h:158
static void Sleep(int ms)
Definition: threads.h:134
Threads::Mutex workMutex
Definition: threads.h:243
void unlock()
Definition: threads.h:79
int lockCount
Definition: threads.h:65
void ItemDone()
Definition: threads.h:226
ThreadEntryPoint callback
Definition: threads.h:56
Threads::Mutex m
Definition: threads.h:157