std.parallelism

std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.
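The common case looks like an ordinary foreach loop. A minimal sketch (the data and the `sqrt` computation are illustrative):

```d
import std.math : sqrt;
import std.parallelism;

void main()
{
    auto nums = new double[1_000];
    foreach (i, ref n; nums)
        n = i;

    // Each element is visited exactly once, but the iteration is
    // distributed across the worker threads of the global task pool.
    foreach (ref n; taskPool.parallel(nums))
    {
        n = sqrt(n);
    }
}
```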

std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.
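A future/promise sketch using `task` (the `fib` helper below is illustrative, not part of the module):

```d
import std.parallelism;

// A deliberately recursive function worth running in the background.
long fib(long n)
{
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

void main()
{
    // Start fib(30) in another thread; the calling thread does not block.
    auto t = task!fib(30);
    t.executeInNewThread();

    // ... do other work here ...

    // Block until the result is ready, then fetch it.
    immutable result = t.yieldForce;
    assert(result == 832_040);
}
```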

After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.
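This "reclaim from the queue" behavior can be seen with `spinForce` on a just-submitted `Task`; a minimal sketch (the `square` helper is illustrative):

```d
import std.parallelism;

int square(int x) { return x * x; }

void main()
{
    // Create a Task and submit it to the global pool's task queue.
    auto t = task!square(21);
    taskPool.put(t);

    // If no worker has started the Task yet, spinForce removes it from
    // the queue and executes it immediately in the calling thread.
    assert(t.spinForce == 441);
}
```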

Warning: Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low level data races.

Source: std/parallelism.d
Author: David Simcha

Module Deinitializers 1

shared static ~this()

Types 10

alias cacheLineSize = __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl)
private enum TaskStatus : ubyte
notStarted
inProgress
done
private struct AbstractTask
Fields
void function(void *) runTask
Throwable exception
ubyte taskStatus
Methods
bool done() @property
void job()
struct Task(alias fun, Args...)

Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.

The task and scopedTask functions can be used to create an instance of this struct. See task for usage examples.

Function results are returned from yieldForce, spinForce and workForce by ref. If fun returns by ref, the reference will point to the returned reference of fun. Otherwise it will point to a field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics. If you want to pass this struct around, you should do so by reference or pointer.
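Because copying is disabled, a `Task` is typically kept where it was created and forced in place; a sketch using `scopedTask` (the `add` helper is illustrative):

```d
import std.parallelism;

int add(int a, int b) { return a + b; }

void main()
{
    // scopedTask places the Task on the stack; copying is disabled,
    // so put takes it by reference rather than by value.
    auto t = scopedTask!add(2, 3);
    taskPool.put(t);

    // yieldForce returns the result by ref; since add returns by value,
    // the reference points to a field inside the Task struct. The force
    // must happen before the Task goes out of scope.
    assert(t.yieldForce == 5);
}
```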

Bugs

Changes to ref and out arguments are not propagated to the call site, only to args in this struct.

Fields
private AbstractTask base
private TaskPool pool
private bool isScoped
Args _args
Methods
private AbstractTask * basePtr() @property
private void impl(void * myTask)
private void enforcePool()
ReturnType spinForce() @property ref @trusted If the `Task` isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value. If ...
ReturnType yieldForce() @property ref @trusted If the `Task` isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable. If it threw an exception, reth...
ReturnType workForce() @property ref @trusted If this `Task` was not started yet, execute it in the current thread. If it is finished, return its result. If it is in progress, execute any other `Task` from the `TaskPool` instance that this `...
bool done() @property @trusted Returns `true` if the `Task` is finished executing.
void executeInNewThread() @trusted Create a new thread for executing this `Task`, execute it in the newly created thread, then terminate the thread. This can be used for future/promise parallelism. An explicit priority may be give...
void executeInNewThread(int priority) @trusted Ditto
Destructors
alias totalCPUs = __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl)

The total number of CPU cores available on the current machine, as reported by the operating system.

private class ParallelismThread : Thread
Fields
Constructors
this(void delegate() dg)

class TaskPool

This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.

This class should usually be used via the global instantiation available via the taskPool property. Occasionally it is useful to explicitly instantiate a TaskPool:

  1. When you want TaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.

  2. When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.

Note

The worker threads in this pool will not stop until stop or finish is called, even if the main thread has finished already. This may lead to programs that never end. If you do not want this behaviour, you can set isDaemon to true.
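A sketch of an explicitly instantiated pool (the worker count of 4 is arbitrary):

```d
import std.parallelism;

void main()
{
    // Create a pool with 4 worker threads instead of using the
    // global taskPool.
    auto pool = new TaskPool(4);

    // Daemon workers will not keep the program alive after main returns.
    pool.isDaemon = true;

    int[] data = [1, 2, 3, 4];
    foreach (ref x; pool.parallel(data))
        x *= 2;

    // finish(true) blocks until queued work is done, then lets the
    // worker threads terminate.
    pool.finish(true);
    assert(data == [2, 4, 6, 8]);
}
```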

Fields
bool isSingleTask
Thread singleTaskThread
PoolState status
Condition workerCondition
Condition waiterCondition
Mutex queueMutex
Mutex waiterMutex
size_t nextInstanceIndex
size_t threadIndex
size_t instanceStartIndex
size_t nextThreadIndex
Methods
void doJob(AbstractTask * job)
void tryDeleteExecute(AbstractTask * toExecute)
void queueLock()
void waiterLock()
void wait()
void notify()
void notifyAll()
size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
ParallelForeach!R parallel(R)(R range, size_t workUnitSize) Implements a parallel foreach loop over a range. This works by implicitly creating and submitting one `Task` to the `TaskPool` for each worker thread. A work unit is a set of consecutive elements...
auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) Given a `source` range that is expensive to iterate over, returns an input range that asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, while m...
auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) if (is(typeof(C2.init()) : bool) && Parameters!C1.length == 1 && Parameters!C2.length == 0 && isArray!(Parameters!C1[0])) Given a callable object `next` that writes to a user-provided buffer and a second callable object `empty` that determines whether more data is available to write via `next`, returns an input range ...
size_t workerIndex() @property @safe const nothrow Gets the index of the current thread relative to this `TaskPool`. Any thread not in this pool will receive an index of 0. The worker threads in this pool receive unique indices of 1 through `this...
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) Creates an instance of worker-local storage, initialized with a given value. The value is `lazy` so that you can, for example, easily create one instance of a class for each worker. For usage exa...
void stop() @trusted Signals to all worker threads to terminate as soon as they are finished with their current `Task`, or immediately if they are not executing a `Task`. `Task`s that were in queue will not be execute...
void finish(bool blocking = false) @trusted Signals worker threads to terminate when the queue becomes empty.
size_t size() @property @safe const pure nothrow Returns the number of worker threads in the pool.
void put(alias fun, Args...)(ref Task!(fun, Args) task) if (!isSafeReturn!(typeof(task))) Put a `Task` object on the back of the task queue. The `Task` object may be passed by pointer or reference.
void put(alias fun, Args...)(Task!(fun, Args) * task) if (!isSafeReturn!(typeof(* task))) Ditto
void put(alias fun, Args...)(ref Task!(fun, Args) task) if (isSafeReturn!(typeof(task))) @trusted
void put(alias fun, Args...)(Task!(fun, Args) * task) if (isSafeReturn!(typeof(* task))) @trusted
bool isDaemon() @property @trusted These properties control whether the worker threads are daemon threads. A daemon thread is automatically terminated when all non-daemon threads have terminated. A non-daemon thread will prevent a ...
void isDaemon(bool newVal) @property @trusted Ditto
int priority() @property @trusted These functions allow getting and setting the OS scheduling priority of the worker threads in this `TaskPool`. They forward to `core.thread.Thread.priority`, so a given priority value here means t...
void priority(int newPriority) @property @trusted Ditto
Constructors
this(AbstractTask * task, int priority = int.max)
this() Default constructor that initializes a `TaskPool` with `totalCPUs` - 1 worker threads. The minus 1 is included because the main thread will also be available to do work.
this(size_t nWorkers) Allows for a custom number of worker threads.
Nested Templates
PoolState
amap(functions...)
map(functions...)
reduce(functions...)
fold(functions...)
WorkerLocalStorage Struct for creating worker-local storage. Worker-local storage is thread-local storage that exists only for worker threads in a given `TaskPool` plus a single thread outside the pool. It is alloc...
WorkerLocalStorageRange Range primitives for worker-local storage. The purpose of this is to access results produced by each worker thread from a single thread once you are no longer using the worker-local storage from m...
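A sketch of the `amap` and `reduce` nested templates in action (the ranges and functions are illustrative):

```d
import std.parallelism;
import std.range : iota;

void main()
{
    // amap: eagerly map a function over a random-access range in
    // parallel, returning a newly allocated array.
    auto squares = taskPool.amap!"a * a"(iota(5));
    assert(squares == [0, 1, 4, 9, 16]);

    // reduce: parallel reduction; the explicit seed (0) is used as the
    // starting value for each work unit.
    auto total = taskPool.reduce!"a + b"(0, iota(101));
    assert(total == 5050);
}
```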
private structParallelForeach(R)
Fields
R range
size_t workUnitSize
Methods
int opApply(scope NoIndexDg dg)
int opApply(scope IndexDg dg)
private structRoundRobinBuffer(C1, C2)
Fields
T[][] bufs
size_t index
C1 nextDel
C2 emptyDel
bool _empty
bool primed
Methods
void prime()
T[] front() @property
void popFront()
bool empty() @property const @safe pure nothrow
Constructors
this(C1 nextDel, C2 emptyDel, size_t initialBufSize, size_t nBuffers)

Functions 21

T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)() if (is(Unqual!T : T) && is(typeof(initializer()) : T) && is(typeof(outOfBandValue) : T)) @property pure
private size_t cacheLineSizeImpl() @nogc nothrow @trusted
private void atomicSetUbyte(T)(ref T stuff, T newVal) if (__traits(isIntegral, T) && is(T : ubyte))
private ubyte atomicReadUbyte(T)(ref T val) if (__traits(isIntegral, T) && is(T : ubyte))
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) if (__traits(isIntegral, T) && is(T : ubyte))
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
auto task(alias fun, Args...)(Args args) Creates a `Task` on the GC heap that calls an alias. This may be executed via `Task.executeInNewThread` or by submitting to a TaskPool. A globally accessible instance of `TaskPool` is provided by...
auto task(F, Args...)(F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) Creates a `Task` on the GC heap that calls a function pointer, delegate, or class/struct with overloaded opCall.
@trusted auto task(F, Args...)(F fun, Args args) if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F) Version of `task` usable from `@safe` code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions:
auto scopedTask(alias fun, Args...)(Args args) These functions allow the creation of `Task` objects on the stack rather than the GC heap. The lifetime of a `Task` created by `scopedTask` cannot exceed the lifetime of the scope it was created in.
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args) if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F) Ditto
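A sketch of the `@safe` overload of `task` (the `triple` helper is illustrative):

```d
import std.parallelism;

@safe int triple(int x) { return x * 3; }

void main() @safe
{
    // The @safe overload accepts a function pointer whose parameters
    // and return type have no unshared aliasing.
    auto t = task(&triple, 14);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}
```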
uint totalCPUsImpl() @nogc nothrow @trusted
TaskPool taskPool() @property @trusted Returns a lazily initialized global instantiation of `TaskPool`. This function can safely be called concurrently from multiple non-worker threads. The worker threads in this pool are daemon thread...
uint defaultPoolThreads() @property @trusted These properties get and set the number of worker threads in the `TaskPool` instance returned by `taskPool`. The default value is `totalCPUs` - 1. Calling the setter after the first call to `taskP...
void defaultPoolThreads(uint newVal) @property @trusted Ditto
ParallelForeach!R parallel(R)(R range) Convenience functions that forward to `taskPool.parallel`. The purpose of these is to make parallel foreach less verbose and more readable.
ParallelForeach!R parallel(R)(R range, size_t workUnitSize) Ditto
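A sketch of the convenience overload, which reads like an ordinary foreach:

```d
import std.parallelism;

void main()
{
    int[] a = [1, 2, 3, 4, 5];

    // The free function parallel forwards to taskPool.parallel.
    foreach (ref x; parallel(a))
        x += 10;

    assert(a == [11, 12, 13, 14, 15]);
}
```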
private void submitAndExecute(TaskPool pool, scope void delegate() doIt)
void foreachErr()
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)

Variables 3

private uint _defaultPoolThreads
private enum parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max;  // Effectively -1: chunkIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
}
enum parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex;
    size_t nPopped = 0;

    static if (is(typeof(range.buf1))
        && is(typeof(range.bufPos))
        && is(typeof(range.doBufSwap())))
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (is(typeof(range.source)) && isRoundRobin!(typeof(range.source)))
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
                "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns: The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }
        else
        {
            alias Temp = ElementType!R[];
            Temp temp;

            // Returns: The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
}

Templates 12

MapType(R, functions...)
ReduceType(alias fun, R, E)
noUnsharedAliasing(T)
isSafeTask(F)
isSafeReturn(T)
randAssignable(R)
AliasReturn(alias fun, T...)
reduceAdjoin(functions...)
reduceFinish(functions...)
isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
isRoundRobin(T)
randLen(R)