OpenShot Library | libopenshot-audio 0.2.0
juce_ThreadPool.cpp
1/*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2017 - ROLI Ltd.
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
// Body of the pool's worker-thread type, ThreadPoolThread (named by the macro at
// the bottom). NOTE: the type's declaration line and its constructor signature
// line are elided from this listing; only the constructor's initialiser list is
// visible. It passes the thread name "Pool" and `stackSize` to the Thread base and
// binds `pool` to `p`.
27{
29 : Thread ("Pool", stackSize), pool (p)
30 {
31 }
32
// Thread entry point: keeps asking the owning pool to run a job until this
// thread is asked to exit. When runNextJob() returns false (it found nothing to
// run), the thread blocks in wait() for up to 500 ms before trying again.
// ThreadPool::addJob() calls notify() on every thread after queuing a job,
// presumably to cut this wait short.
33 void run() override
34 {
35 while (! threadShouldExit())
36 if (! pool.runNextJob (*this))
37 wait (500);
38 }
39
// Job this thread is currently executing, or nullptr. Written by
// ThreadPool::runNextJob() around the runJob() call and read by
// ThreadPoolJob::getCurrentThreadPoolJob().
40 std::atomic<ThreadPoolJob*> currentJob { nullptr };
// The pool that this thread takes its jobs from.
41 ThreadPool& pool;
42
43 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
44};
45
46//==============================================================================
// Constructs a job, initialising its jobName member from `name`.
47ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
48{
49}
50
// Body presumably belonging to the ThreadPoolJob destructor (its signature line
// is elided from this listing). Asserts that the job either has no owning pool
// or is no longer contained in that pool.
52{
53 // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54 // to remove it first!
55 jassert (pool == nullptr || ! pool->contains (this));
56}
57
// Body presumably belonging to ThreadPoolJob::getJobName() (signature line elided
// from this listing): returns the jobName member.
59{
60 return jobName;
61}
62
// Replaces the job's name with `newName`.
63void ThreadPoolJob::setJobName (const String& newName)
64{
65 jobName = newName;
66}
67
// Body presumably belonging to ThreadPoolJob::signalJobShouldExit() (signature
// line elided from this listing). Sets the shouldStop flag first, then calls
// exitSignalSent() on every registered Thread::Listener.
69{
70 shouldStop = true;
71 listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72}
73
// Body presumably belonging to ThreadPoolJob::addListener() (signature line elided
// from this listing): registers `listener` in the job's listener list, whose
// members are called back by the exit-signal code above.
75{
76 listeners.add (listener);
77}
78
// Body presumably belonging to ThreadPoolJob::removeListener() (signature line
// elided from this listing): removes `listener` from the job's listener list.
80{
81 listeners.remove (listener);
82}
83
// Body presumably belonging to ThreadPoolJob::getCurrentThreadPoolJob() (signature
// line elided from this listing). If the calling thread is one of a pool's
// ThreadPoolThread workers, returns whatever that worker's currentJob holds
// (nullptr when it is between jobs); for any other thread returns nullptr.
85{
86 if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87 return t->currentJob.load();
88
89 return nullptr;
90}
91
92//==============================================================================
// Creates a pool and immediately creates and starts its worker threads.
//   numThreads      - requested number of workers; asserted to be positive, and
//                     createThreads() clamps it to at least 1 in any case.
//   threadStackSize - forwarded to each worker thread's constructor.
93ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94{
95 jassert (numThreads > 0); // not much point having a pool without any threads!
96
97 createThreads (numThreads, threadStackSize);
98}
99
// Body presumably belonging to the default ThreadPool constructor (signature line
// elided from this listing): creates one worker per CPU reported by
// SystemStats::getNumCpus(). Only the thread count is passed, so the stack-size
// parameter of createThreads() presumably has a default in its declaration.
101{
102 createThreads (SystemStats::getNumCpus());
103}
104
// Body presumably belonging to the ThreadPool destructor (signature line elided
// from this listing). Removes all jobs, passing `true` and a 5000 ms timeout
// (interruptRunningJobs / timeOutMilliseconds per the removeAllJobs declaration),
// ignores whether that succeeded, and then stops the worker threads.
106{
107 removeAllJobs (true, 5000);
108 stopThreads();
109}
110
// Creates the pool's worker threads and starts them.
// At least one thread is always created (jmax (1, numThreads)). All thread
// objects are added to `threads` before any of them is started.
111void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112{
113 for (int i = jmax (1, numThreads); --i >= 0;)
114 threads.add (new ThreadPoolThread (*this, threadStackSize));
115
116 for (auto* t : threads)
117 t->startThread();
118}
119
// Shuts down every worker thread in two passes: first every thread is signalled
// to exit, then stopThread (500) is called on each one in turn. Signalling all of
// them up front presumably lets them wind down concurrently rather than one
// after another; 500 is presumably a timeout in milliseconds.
120void ThreadPool::stopThreads()
121{
122 for (auto* t : threads)
123 t->signalThreadShouldExit();
124
125 for (auto* t : threads)
126 t->stopThread (500);
127}
128
// Body presumably belonging to ThreadPool::addJob (ThreadPoolJob* job,
// bool deleteJobWhenFinished) (signature line elided from this listing).
// Queues `job` on this pool, but only if it does not already belong to a pool;
// otherwise nothing happens beyond the assertion.
// NOTE(review): `job->pool` is dereferenced right after the null assertion, so a
// null `job` is only guarded if jassert halts execution - confirm.
130{
131 jassert (job != nullptr);
132 jassert (job->pool == nullptr);
133
134 if (job->pool == nullptr)
135 {
// Reset the job's bookkeeping before it becomes visible to the workers.
136 job->pool = this;
137 job->shouldStop = false;
138 job->isActive = false;
139 job->shouldBeDeleted = deleteJobWhenFinished;
140
// The lock is held only while appending to the job list.
141 {
142 const ScopedLock sl (lock);
143 jobs.add (job);
144 }
145
// Notify every worker thread now that there is new work in the queue.
146 for (auto* t : threads)
147 t->notify();
148 }
149}
150
// Body of an addJob overload whose signature line is elided from this listing;
// judging by the wrapper below it takes a callable returning
// ThreadPoolJob::JobStatus, named `jobToRun`.
// Wraps the callable in a heap-allocated job named "lambda" whose runJob()
// returns the callable's result, and queues it with deleteJobWhenFinished = true.
// NOTE(review): the std::function is copied (not moved) into the wrapper.
152{
153 struct LambdaJobWrapper : public ThreadPoolJob
154 {
155 LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156 JobStatus runJob() override { return job(); }
157
158 std::function<ThreadPoolJob::JobStatus()> job;
159 };
160
161 addJob (new LambdaJobWrapper (jobToRun), true);
162}
163
// Queues a plain callable as a one-shot job.
// The callable is wrapped in a heap-allocated job named "lambda" whose runJob()
// invokes it once and then reports jobHasFinished. The wrapper is queued with
// deleteJobWhenFinished = true.
// NOTE(review): the std::function is copied (not moved) into the wrapper.
164void ThreadPool::addJob (std::function<void()> jobToRun)
165{
166 struct LambdaJobWrapper : public ThreadPoolJob
167 {
168 LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169 JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
170
171 std::function<void()> job;
172 };
173
174 addJob (new LambdaJobWrapper (jobToRun), true);
175}
176
// Body presumably belonging to ThreadPool::getNumJobs() (signature line elided
// from this listing): returns the current size of the job list.
// NOTE(review): unlike getJob(), contains() and isJobRunning(), this reads `jobs`
// without taking `lock`, while worker threads modify the list under that lock.
// Confirm whether an unlocked read is acceptable here.
178{
179 return jobs.size();
180}
181
// Body presumably belonging to ThreadPool::getNumThreads() (signature line elided
// from this listing): returns the number of entries in `threads`.
183{
184 return threads.size();
185}
186
// Returns the entry at position `index` of the job list, read while holding the
// pool's lock.
187ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
188{
189 const ScopedLock sl (lock);
190 return jobs [index];
191}
192
// Returns true if `job` is currently in this pool's job list (checked under the
// pool's lock). The const_cast only adapts the pointer type for the lookup; the
// job itself is not modified.
193bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
194{
195 const ScopedLock sl (lock);
196 return jobs.contains (const_cast<ThreadPoolJob*> (job));
197}
198
// Returns true only if `job` is in this pool's job list and its isActive flag is
// set. Membership is tested first, so isActive is not read for a job that is not
// in the list. Both checks happen under the pool's lock.
199bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
200{
201 const ScopedLock sl (lock);
202 return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
203}
204
// Body presumably belonging to ThreadPool::moveJobToFront (const ThreadPoolJob*)
// (signature line elided from this listing). Under the pool's lock, moves `job`
// to index 0 of the job list. Nothing happens if the job is not in the list
// (index < 0), is already first (index == 0), or is currently active.
206{
207 const ScopedLock sl (lock);
208
209 auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
210
211 if (index > 0 && ! job->isActive)
212 jobs.move (index, 0);
213}
214
// Body presumably belonging to ThreadPool::waitForJobToFinish (signature line
// elided from this listing; the body uses the names `job` and `timeOutMs`).
// Polls until `job` is no longer contained in the pool.
//   Returns true  - if `job` is null, or once the pool no longer contains it.
//   Returns false - if timeOutMs is non-negative and that many milliseconds
//                   pass while the pool still contains the job.
// A negative timeOutMs disables the timeout check, so the loop runs until the
// job has left the pool.
216{
217 if (job != nullptr)
218 {
219 auto start = Time::getMillisecondCounter();
220
221 while (contains (job))
222 {
// Elapsed time is computed by unsigned subtraction so that the comparison stays
// correct when the 32-bit millisecond counter wraps around; comparing against
// `start + timeout` would misfire whenever that sum or the counter overflows.
223 if (timeOutMs >= 0 && Time::getMillisecondCounter() - start >= (uint32) timeOutMs)
224 return false;
225
// Sleep until a job-finished signal arrives or 2 ms pass, then re-check.
226 jobFinishedSignal.wait (2);
227 }
228 }
229
230 return true;
231}
232
// Body presumably belonging to ThreadPool::removeJob (signature line elided from
// this listing). NOTE: several lines inside this body (listing lines 236, 246,
// 253 and 259, including the declaration of `deletionList` and the final return
// statement) are also elided, so only the visible logic is described here.
// Under the pool's lock, if `job` is in the job list:
//   - an active job is told to exit and `dontWait` is cleared;
//   - an inactive job is handed to addToDeleteList().
// NOTE(review): any condition on the signalJobShouldExit() call, the removal of
// the job from `jobs`, and how `dontWait` feeds the return value are not visible.
234{
235 bool dontWait = true;
237
238 if (job != nullptr)
239 {
240 const ScopedLock sl (lock);
241
242 if (jobs.contains (job))
243 {
244 if (job->isActive)
245 {
247 job->signalJobShouldExit();
248
249 dontWait = false;
250 }
251 else
252 {
254 addToDeleteList (deletionList, job);
255 }
256 }
257 }
258
260}
261
// Body presumably belonging to ThreadPool::removeAllJobs (signature line elided
// from this listing; the body uses the names `timeOutMs` and
// `selectedJobsToRemove`). NOTE: several lines inside this body (listing lines
// 265, 268, 281, 283 and 305, including the declarations of `jobsToWaitFor` and
// `deletionList`) are also elided, so only the visible logic is described here.
//
// Phase 1 (under the pool's lock): walks the job list and, for every job that
// matches the selector (or every job when no selector is given), either signals
// an active job to exit or removes an inactive job from the list and hands it
// to addToDeleteList().
// Phase 2: polls `jobsToWaitFor` until it is empty (returns true) or until a
// non-negative timeOutMs has elapsed (returns false).
264{
266
267 {
269
270 {
271 const ScopedLock sl (lock);
272
// Iterate backwards so that removing entry i does not shift the entries that
// have not been visited yet.
273 for (int i = jobs.size(); --i >= 0;)
274 {
275 auto* job = jobs.getUnchecked(i);
276
277 if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
278 {
279 if (job->isActive)
280 {
282
// NOTE(review): the lines around this call are elided, so any condition guarding
// it cannot be confirmed from this listing.
284 job->signalJobShouldExit();
285 }
286 else
287 {
288 jobs.remove (i);
289 addToDeleteList (deletionList, job);
290 }
291 }
292 }
293 }
294 }
295
296 auto start = Time::getMillisecondCounter();
297
298 for (;;)
299 {
300 for (int i = jobsToWaitFor.size(); --i >= 0;)
301 {
302 auto* job = jobsToWaitFor.getUnchecked (i);
303
// The statement controlled by this `if` (listing line 305) is elided.
304 if (! isJobRunning (job))
306 }
307
308 if (jobsToWaitFor.size() == 0)
309 break;
310
// Elapsed time is computed by unsigned subtraction so that the comparison stays
// correct when the 32-bit millisecond counter wraps around; comparing against
// `start + timeout` would misfire whenever that sum or the counter overflows.
311 if (timeOutMs >= 0 && Time::getMillisecondCounter() - start >= (uint32) timeOutMs)
312 return false;
313
// Sleep until a job-finished signal arrives or 20 ms pass, then re-check.
314 jobFinishedSignal.wait (20);
315 }
316
317 return true;
318}
319
// Body presumably belonging to ThreadPool::getNamesOfAllJobs (bool
// onlyReturnActiveJobs) (signature line elided from this listing; the
// declaration of the result `s` on listing line 322 is elided too).
// Under the pool's lock, collects the name of every job in the list, or only of
// the jobs whose isActive flag is set when onlyReturnActiveJobs is true.
321{
323 const ScopedLock sl (lock);
324
325 for (auto* job : jobs)
326 if (job->isActive || ! onlyReturnActiveJobs)
327 s.add (job->getJobName());
328
329 return s;
330}
331
// Body presumably belonging to ThreadPool::setThreadPriorities (int newPriority)
// (signature line elided from this listing). Applies newPriority to every worker
// thread. It does not stop at the first failure: all threads are attempted, and
// the result is true only if every setPriority() call succeeded.
333{
334 bool ok = true;
335
336 for (auto* t : threads)
337 if (! t->setPriority (newPriority))
338 ok = false;
339
340 return ok;
341}
342
// Chooses the next job for a worker thread to execute.
// Under the pool's lock, scans the job list from the front and returns the first
// non-null job that is not already active, marking it active before returning.
// Inactive jobs that have been told to stop are dropped from the list and handed
// to addToDeleteList() instead of being run. Returns nullptr when nothing is
// runnable.
// NOTE: the declaration of `deletionList` (listing line 345) is elided from this
// listing.
343ThreadPoolJob* ThreadPool::pickNextJobToRun()
344{
346
347 {
348 const ScopedLock sl (lock);
349
350 for (int i = 0; i < jobs.size(); ++i)
351 {
352 if (auto* job = jobs[i])
353 {
354 if (! job->isActive)
355 {
356 if (job->shouldStop)
357 {
358 jobs.remove (i);
359 addToDeleteList (deletionList, job);
// Step back so the entry that shifted into slot i is examined next.
360 --i;
361 continue;
362 }
363
364 job->isActive = true;
365 return job;
366 }
367 }
368 }
369 }
370
371 return nullptr;
372}
373
// Runs one job on the calling worker thread.
// Returns true if a job was picked and run, false if pickNextJobToRun() found
// nothing to run (the worker's run() loop then waits before retrying).
374bool ThreadPool::runNextJob (ThreadPoolThread& thread)
375{
376 if (auto* job = pickNextJobToRun())
377 {
// `result` starts as jobHasFinished, so a job whose runJob() throws is treated as
// finished and is removed below.
378 auto result = ThreadPoolJob::jobHasFinished;
// Publish the job on the thread so getCurrentThreadPoolJob() can find it while
// runJob() executes. The job runs without the pool's lock held.
379 thread.currentJob = job;
380
381 try
382 {
383 result = job->runJob();
384 }
385 catch (...)
386 {
387 jassertfalse; // Your runJob() method mustn't throw any exceptions!
388 }
389
390 thread.currentJob = nullptr;
391
// Declared outside the locked scope below; presumably this lets any job added to
// it be destroyed after the lock has been released - confirm OwnedArray semantics.
392 OwnedArray<ThreadPoolJob> deletionList;
393
394 {
395 const ScopedLock sl (lock);
396
// Only touch the job if it is still in the list.
397 if (jobs.contains (job))
398 {
399 job->isActive = false;
400
// The job leaves the pool unless it asked to run again AND has not been told to
// stop in the meantime.
401 if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
402 {
403 jobs.removeFirstMatchingValue (job);
404 addToDeleteList (deletionList, job);
405
// Wake anything blocked on jobFinishedSignal (the wait/remove polling loops).
406 jobFinishedSignal.signal();
407 }
408 else
409 {
410 // move the job to the end of the queue if it wants another go
411 jobs.move (jobs.indexOf (job), -1);
412 }
413 }
414 }
415
416 return true;
417 }
418
419 return false;
420}
421
// Detaches `job` from this pool: marks it as stopped and clears its pool pointer.
// The job is appended to `deletionList` only if it was queued with its
// shouldBeDeleted flag set (deleteJobWhenFinished in addJob()); otherwise it is
// just detached. This function does not remove the job from `jobs`; the callers
// visible in this file do that themselves.
422void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
423{
424 job->shouldStop = true;
425 job->pool = nullptr;
426
427 if (job->shouldBeDeleted)
428 deletionList.add (job);
429}
430
431} // namespace juce
Holds a resizable array of primitive or copy-by-value objects.
Definition juce_Array.h:60
ElementType getUnchecked(int index) const
Returns one of the elements in the array, without checking the index passed in.
Definition juce_Array.h:256
int size() const noexcept
Returns the current number of elements in the array.
Definition juce_Array.h:219
void removeFirstMatchingValue(ParameterType valueToRemove)
Removes an item from the array.
Definition juce_Array.h:791
void remove(int indexToRemove)
Removes an element from the array.
Definition juce_Array.h:724
int indexOf(ParameterType elementToLookFor) const
Finds the index of the first element which matches the value passed in.
Definition juce_Array.h:339
void add(const ElementType &newElement)
Appends a new element at the end of the array.
Definition juce_Array.h:375
bool contains(ParameterType elementToLookFor) const
Returns true if the array contains at least one occurrence of an object.
Definition juce_Array.h:357
void move(int currentIndex, int newIndex) noexcept
Moves one of the values to a different position.
Definition juce_Array.h:991
A special array for holding a list of strings.
The JUCE String class!
Definition juce_String.h:43
static int getNumCpus() noexcept
Returns the number of logical CPU cores.
A task that is executed by a ThreadPool object.
void signalJobShouldExit()
Calling this will cause the shouldExit() method to return true, and the job should (if it's been impl...
JobStatus
These are the values that can be returned by the runJob() method.
@ jobHasFinished
indicates that the job has finished and can be removed from the pool.
@ jobNeedsRunningAgain
indicates that the job would like to be called again when a thread is free.
void setJobName(const String &newName)
Changes the job's name.
String getJobName() const
Returns the name of this job.
void addListener(Thread::Listener *)
Add a listener to this thread job which will receive a callback when signalJobShouldExit was called o...
virtual ~ThreadPoolJob()
Destructor.
static ThreadPoolJob * getCurrentThreadPoolJob()
If the calling thread is being invoked inside a runJob() method, this will return the ThreadPoolJob t...
void removeListener(Thread::Listener *)
Removes a listener added with addListener.
ThreadPoolJob(const String &name)
Creates a thread pool job object.
A callback class used when you need to select which ThreadPoolJob objects are suitable for some kind ...
A set of threads that will run a list of jobs.
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
If the given job is in the queue, this will move it to the front so that it is the next one to be exe...
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Adds a job to the queue.
int getNumThreads() const noexcept
Returns the number of threads assigned to this thread pool.
ThreadPoolJob * getJob(int index) const noexcept
Returns one of the jobs in the queue.
int getNumJobs() const noexcept
Returns the number of jobs currently running or queued.
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Tries to remove all jobs from the pool.
~ThreadPool()
Destructor.
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Returns a list of the names of all the jobs currently running or queued.
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently being run by a thread.
bool setThreadPriorities(int newPriority)
Changes the priority of all the threads.
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Tries to remove a job from the pool.
bool contains(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently queued or running.
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Waits until a job has finished running and has been removed from the pool.
ThreadPool()
Creates a thread pool with one thread per CPU core.
Used to receive callbacks for thread exit calls.
Encapsulates a thread.
Definition juce_Thread.h:47
static Thread *JUCE_CALLTYPE getCurrentThread()
Finds the thread object that is currently running.
bool wait(int timeOutMilliseconds) const
Suspends the execution of this thread until either the specified timeout period has elapsed,...
bool threadShouldExit() const
Checks whether the thread has been told to stop running.
static uint32 getMillisecondCounter() noexcept
Returns the number of millisecs since a fixed event (usually system startup).
bool wait(int timeOutMilliseconds=-1) const noexcept
Suspends the calling thread until the event has been signalled.
void signal() const noexcept
Wakes up any threads that are currently waiting on this object.
void run() override
Must be implemented to perform the thread's actual code.