1 /**
2  * ae.utils.parallelism
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.parallelism;
15 
16 import std.algorithm.mutation;
17 import std.algorithm.searching;
18 import std.algorithm.sorting;
19 import std.parallelism;
20 import std.range : chunks, iota;
21 
22 // https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8
23 
/// Sort `r` in place, spreading the work across CPU cores.
///
/// The range is recursively split into (at most) two chunks which are
/// sorted as separate parallel work units. Splitting stops once the
/// number of pieces at the current level reaches `totalCPUs`, or after
/// 8 levels. Every level finishes with a stable sort over its whole
/// sub-range, whose chunks have already been sorted by then.
///
/// Params:
///   less = ordering predicate, forwarded to `std.algorithm.sorting.sort`
///   r    = range to sort; it must support `length` and `chunks`
/// Returns: the result of the final stable `sort` call over `r`.
auto parallelSort(alias less = "a < b", R)(R r)
{
	auto sortLevel(size_t depth = 0)(R part)
	{
		// The compile-time bound keeps the template recursion finite.
		static if (depth < 8)
		{
			immutable piecesAtThisLevel = 1L << depth;
			if (piecesAtThisLevel < totalCPUs)
			{
				// The `+ 1` keeps the chunk size non-zero and means the
				// range is cut into no more than two chunks.
				immutable chunkLength = part.length / 2 + 1;

				// Work unit size 1: each chunk is its own unit of work.
				foreach (half; part.chunks(chunkLength).parallel(1))
					sortLevel!(depth + 1)(half);
			}
		}

		return sort!(less, SwapStrategy.stable)(part);
	}

	return sortLevel(r);
}
38 
unittest
{
	// A small unsorted array must come back in ascending order.
	auto sorted = parallelSort([3, 1, 2]);
	assert(sorted.release == [1, 2, 3]);
}
43 
44 
/// Eagerly map `pred` over `input` in parallel, returning a new array.
///
/// Similar to TaskPool.amap, except that the predicate is passed as a
/// functor (a runtime argument) rather than as an alias template
/// argument, and as such does not have the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
///
/// Params:
///   input        = source range; must support `length` and indexing
///   pred         = callable applied to every element of `input`
///   workUnitSize = number of elements per parallel work unit; 0 (the
///                  default) asks `taskPool` for its default size
/// Returns: a newly allocated array where element `i` is `pred(input[i])`.
auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
{
	alias Result = typeof(pred(input[0]));

	// A zero work unit size means "let the task pool choose".
	const unit = workUnitSize != 0
		? workUnitSize
		: taskPool.defaultWorkUnitSize(input.length);

	auto output = new Result[input.length];

	// Each iteration writes only to its own slot of `output`.
	foreach (index; parallel(iota(input.length), unit))
		output[index] = pred(input[index]);

	return output;
}
59 
unittest
{
	// Incrementing every element must preserve order and length.
	auto incremented = parallelEagerMap([1, 2, 3], (int n) => n + 1);
	assert(incremented == [2, 3, 4]);
}
64 
65 
/// Compare two arrays for equality, in parallel.
///
/// The arrays are divided into `totalCPUs` contiguous slices; every
/// slice pair is compared as its own parallel work unit and the
/// per-slice results are then combined. All slices are always compared,
/// even when an early one already differs.
///
/// Params:
///   a = first array
///   b = second array
/// Returns: `true` if both arrays have the same length and all
///          corresponding slices compare equal, `false` otherwise.
bool parallelEqual(T)(T[] a, T[] b)
{
	// Arrays of different lengths can never be equal.
	if (a.length != b.length)
		return false;

	// Scratch buffer holding one result per slice, allocated on first
	// use and kept for later calls.
	static bool[] chunkEqualBuf;
	if (chunkEqualBuf is null)
		chunkEqualBuf = new bool[totalCPUs];

	// Take a local copy of the slice so that the loop body, which may
	// run on other threads, does not refer to the static variable itself.
	auto results = chunkEqualBuf;

	immutable cpuCount = totalCPUs;
	foreach (part; iota(cpuCount).parallel(1))
	{
		// Slice `part` covers [lo, hi); the slices tile the whole array.
		immutable lo = a.length * part / cpuCount;
		immutable hi = a.length * (part + 1) / cpuCount;
		results[part] = a[lo .. hi] == b[lo .. hi];
	}

	return results.all!(sliceMatched => sliceMatched);
}
84 
unittest
{
	import std.array : array;

	// Identical contents compare equal...
	auto original = iota(1024).array;
	auto copy = original.dup;
	assert(parallelEqual(original, copy));

	// ...and a single differing element is detected.
	copy[500] = 0;
	assert(!parallelEqual(original, copy));
}