std.parallelism
std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.
std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.
After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.
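To make the Task/TaskPool workflow above concrete, here is a minimal future/promise sketch using the documented task, TaskPool.put, and yieldForce API (the fib function is just an illustrative workload):

```d
import std.parallelism : task, taskPool;

int fib(int n) { return n < 2 ? n : fib(n - 1) + fib(n - 2); }

void main()
{
    // Create a Task that will compute fib(30) and submit it to the global
    // TaskPool.  The current thread is free to do other work while the
    // Task waits in the queue or runs on a worker thread.
    auto t = task!fib(30);
    taskPool.put(t);

    auto other = fib(20);  // Work done concurrently in this thread.

    // yieldForce blocks until the Task is done and returns its result.
    // If the Task has not been started yet, it is executed right here,
    // in the thread that needs the result.
    immutable r = t.yieldForce;
    assert(r == 832_040);  // fib(30)
}
```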
Warning: Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low-level data races.
Source: std/parallelism.d Author: David Simcha
Task

Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.
The task and scopedTask functions can be used to create an instance of this struct. See task for usage examples.
Function results are returned from yieldForce, spinForce and workForce by ref. If fun returns by ref, the reference will point to the returned reference of fun. Otherwise it will point to a field in this struct.
Copying of this struct is disabled, since it would provide no useful semantics. If you want to pass this struct around, you should do so by reference or pointer.
Bugs: ref and out arguments are not propagated to the call site, only to args in this struct.
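For the single-background-thread case, a minimal sketch using executeInNewThread and spinForce (the delegate here is just an illustrative workload):

```d
import std.math : sqrt;
import std.parallelism : task;

void main()
{
    // Run a delegate in its own short-lived thread rather than a pool.
    auto t = task({ return sqrt(2.0); });
    t.executeInNewThread();

    // spinForce busy-spins until the Task is done; appropriate when the
    // wait is expected to be very short.
    immutable r = t.spinForce;
    assert(r > 1.414 && r < 1.415);
}
```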
Task members:

- ReturnType spinForce() @property ref @trusted: If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value. If it threw an exception, rethrow that exception.
- ReturnType yieldForce() @property ref @trusted: If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable. If it threw an exception, rethrow that exception.
- ReturnType workForce() @property ref @trusted: If this Task was not started yet, execute it in the current thread. If it is finished, return its result. If it is in progress, execute any other Task from the TaskPool instance that this Task was submitted to until this one is finished. If it threw an exception, rethrow that exception.
- void executeInNewThread() @trusted: Create a new thread for executing this Task, execute it in the newly created thread, then terminate the thread. This can be used for future/promise parallelism. An explicit priority may be given to the Task.

immutable uint totalCPUs: The total number of CPU cores available on the current machine, as reported by the operating system.
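As a sketch of the difference between the force variants, workForce keeps the waiting thread productive by executing other queued Tasks instead of blocking idly (triple is just an illustrative function):

```d
import std.parallelism : task, taskPool;

int triple(int x) { return 3 * x; }

void main()
{
    auto a = task!triple(1);
    auto b = task!triple(2);
    taskPool.put(a);
    taskPool.put(b);

    // workForce waits for `a`, but while waiting it executes other Tasks
    // queued in the same pool (such as `b`) in the current thread.
    assert(a.workForce == 3);
    assert(b.yieldForce == 6);
}
```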
This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.
This class should usually be used via the global instantiation available via the taskPool property. Occasionally it is useful to explicitly instantiate a TaskPool:
- When you want TaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.
- When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.
Note: The worker threads in this pool will not stop until stop or finish is called, even if the main thread has finished already. This may lead to programs that never end. If you do not want this behaviour, you can set isDaemon to true.
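A short sketch of an explicitly instantiated pool, showing why finish (or isDaemon) matters for program termination:

```d
import std.parallelism : TaskPool, task;

void main()
{
    // An explicitly instantiated pool with 2 worker threads (the global
    // taskPool would normally suffice).
    auto pool = new TaskPool(2);

    auto t = task!((int x) => x * x)(21);
    pool.put(t);
    assert(t.yieldForce == 441);

    // Without stop/finish (or isDaemon = true), these non-daemon worker
    // threads would keep the program alive forever.  finish(true) blocks
    // until the queue is empty, then terminates the workers.
    pool.finish(true);
}
```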
TaskPool members:

- ParallelForeach!R parallel(R)(R range, size_t workUnitSize): Implements a parallel foreach loop over a range. This works by implicitly creating and submitting one Task to the TaskPool for each worker thread. A work unit is a set of consecutive elements of range to be processed by a worker thread between communication with any other thread.
- ParallelForeach!R parallel(R)(R range): Ditto.
- auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S): Given a source range that is expensive to iterate over, returns an input range that asynchronously buffers the contents of source into a buffer of bufSize elements in a worker thread, while making previously buffered elements from a second buffer of the same size available via the range interface.
- auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) if (is(typeof(C2.init()) : bool) && Parameters!C1.length == 1 && Parameters!C2.length == 0 && isArray!(Parameters!C1[0])): Given a callable object next that writes to a user-provided buffer and a second callable object empty that determines whether more data is available to write via next, returns an input range that asynchronously fills a set of nBuffers buffers via next in a worker thread.
- size_t workerIndex() @property @safe const nothrow: Gets the index of the current thread relative to this TaskPool. Any thread not in this pool will receive an index of 0. The worker threads in this pool receive unique indices of 1 through this.size.
- WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init): Creates an instance of worker-local storage, initialized with a given value. The value is lazy so that you can, for example, easily create one instance of a class for each worker. For usage examples, see WorkerLocalStorage.
- void stop() @trusted: Signals to all worker threads to terminate as soon as they are finished with their current Task, or immediately if they are not executing a Task. Tasks that were in queue will not be executed.
- void finish(bool blocking = false) @trusted: Signals worker threads to terminate when the queue becomes empty.
- void put(alias fun, Args...)(ref Task!(fun, Args) task) if (!isSafeReturn!(typeof(task))): Put a Task object on the back of the task queue. The Task object may be passed by pointer or reference.
- bool isDaemon() @property @trusted: These properties control whether the worker threads are daemon threads. A daemon thread is automatically terminated when all non-daemon threads have terminated. A non-daemon thread will prevent a program from terminating as long as it has not terminated.
- int priority() @property @trusted: These functions allow getting and setting the OS scheduling priority of the worker threads in this TaskPool. They forward to core.thread.Thread.priority, so a given priority value here means the same thing as an identical priority value in core.thread.
- this(): Default constructor that initializes a TaskPool with totalCPUs - 1 worker threads. The minus 1 is included because the main thread will also be available to do work.
- this(size_t nWorkers): Allows for custom number of worker threads.
- amap(functions...), map(functions...), reduce(functions...), fold(functions...): Parallel eager map, lazy map, reduce, and fold over ranges.
- WorkerLocalStorage: Struct for creating worker-local storage. Worker-local storage is thread-local storage that exists only for worker threads in a given TaskPool plus a single thread outside the pool.
- WorkerLocalStorageRange: Range primitives for worker-local storage. The purpose of this is to access results produced by each worker thread from a single thread once you are no longer using the worker-local storage from multiple threads.

(Internal, undocumented members include the work-loop and queue primitives such as doJob, pop, abstractPut, tryDeleteExecute, and the queue/waiter mutexes and condition variables.)
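A brief sketch of the data-parallel members listed above (parallel, amap, asyncBuf), using only the documented API:

```d
import std.parallelism : taskPool;

void main()
{
    import std.algorithm.iteration : map, sum;
    import std.math : log;
    import std.range : iota;

    // parallel foreach: worker threads grab work units of 20 consecutive
    // elements; `ref` writes back into the underlying array.
    auto logs = new double[100];
    foreach (i, ref elem; taskPool.parallel(logs, 20))
    {
        elem = log(i + 1.0);
    }
    assert(logs[0] == 0.0);  // log(1)

    // amap: parallel eager map; results come back in an array.
    auto squares = taskPool.amap!((int x) => x * x)([1, 2, 3, 4]);
    assert(squares == [1, 4, 9, 16]);

    // asyncBuf: a worker thread pre-fetches the range into buffers of 64
    // elements while this thread consumes it.
    auto buffered = taskPool.asyncBuf(iota(1_000).map!(x => 2 * x), 64);
    assert(buffered.sum == 999_000);
}
```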
Functions:

- auto task(alias fun, Args...)(Args args): Creates a Task on the GC heap that calls an alias. This may be executed via Task.executeInNewThread or by submitting to a TaskPool. A globally accessible instance of TaskPool is provided by taskPool.
- auto task(F, Args...)(F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F): Creates a Task on the GC heap that calls a function pointer, delegate, or class/struct with overloaded opCall.
- @trusted auto task(F, Args...)(F fun, Args args) if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F): Version of task usable from @safe code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions.
- auto scopedTask(alias fun, Args...)(Args args): These functions allow the creation of Task objects on the stack rather than the GC heap. The lifetime of a Task created by scopedTask cannot exceed the lifetime of the scope it was created in.
- auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F): Ditto.
- @trusted auto scopedTask(F, Args...)(F fun, Args args) if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F): Ditto.
- TaskPool taskPool() @property @trusted: Returns a lazily initialized global instantiation of TaskPool. This function can safely be called concurrently from multiple non-worker threads. The worker threads in this pool are daemon threads, so it is not necessary to call TaskPool.stop or TaskPool.finish before terminating the main thread.
- uint defaultPoolThreads() @property @trusted: These properties get and set the number of worker threads in the TaskPool instance returned by taskPool. The default value is totalCPUs - 1. Calling the setter after the first call to taskPool does not change the number of worker threads in the instance returned by taskPool.
- ParallelForeach!R parallel(R)(R range): Convenience function that forwards to taskPool.parallel. The purpose of this is to make parallel foreach less verbose and more readable.

(Internal, undocumented helpers: __lazilyInitializedConstant, atomicSetUbyte, atomicCasUbyte, run, foreachErr, doSizeZeroCase.)

Variables
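The convenience function in action; a foreach loop is parallelized simply by wrapping the range:

```d
import std.parallelism : parallel;

void main()
{
    auto squares = new int[10];

    // Equivalent to `foreach (i, ref x; taskPool.parallel(squares))`,
    // but less verbose.
    foreach (i, ref x; parallel(squares))
    {
        x = cast(int)(i * i);
    }
    assert(squares[9] == 81);
}
```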
uint _defaultPoolThreads

parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
}

parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex.
    size_t nPopped = 0;

    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
                "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns: The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }
        else
        {
            alias Temp = ElementType!R[];
            Temp temp;

            // Returns: The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
}