/**
 * ae.utils.parallelism
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.parallelism;

import std.algorithm.comparison : min;
import std.algorithm.mutation;
import std.algorithm.searching;
import std.algorithm.sorting;
import std.parallelism;
import std.range : chunks, iota;
import std.range.primitives;

// https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8

/// Sort `r` using all CPU cores.
auto parallelSort(alias less = "a < b", R)(R r)
{
	auto impl(size_t depth = 0)(R order)
	{
		// Recursively split the range in two and sort the halves in
		// parallel, while there are still idle cores to occupy
		// (up to a fixed maximum depth).
		static if (depth < 8)
			if ((1L << depth) < totalCPUs)
				foreach (chunk; order.chunks(order.length / 2 + 1).parallel(1))
					impl!(depth + 1)(chunk);

		// At the leaves, this performs the actual sorting; at inner
		// levels, the stable sort exploits the two pre-sorted runs,
		// effectively merging the halves.
		return order.sort!(less, SwapStrategy.stable, R);
	}
	return impl(r);
}

unittest
{
	assert([3, 1, 2].parallelSort.release == [1, 2, 3]);
}
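
// Illustrative example: `less` accepts any predicate usable with
// std.algorithm.sorting.sort, e.g. a string lambda for descending order.
unittest
{
	assert([3, 1, 2].parallelSort!"a > b".release == [3, 2, 1]);
}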


/// Parallel map. Like `TaskPool.amap`, but takes the predicate as a
/// run-time functor rather than an alias argument, and as such does
/// not suffer from the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
{
	if (workUnitSize == 0)
		workUnitSize = taskPool.defaultWorkUnitSize(input.length);
	alias RT = typeof(pred(input[0]));
	auto result = new RT[input.length];
	foreach (i; input.length.iota.parallel(workUnitSize))
		result[i] = pred(input[i]);
	return result;
}

unittest
{
	assert([1, 2, 3].parallelEagerMap((int n) => n + 1) == [2, 3, 4]);
}
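
// Illustrative sketch: because `pred` is a run-time value, it can carry
// state such as captured variables without creating a new template
// instantiation context. The explicit work unit size here is arbitrary.
unittest
{
	immutable offset = 10;
	assert([1, 2, 3].parallelEagerMap((int n) => n + offset, 2) == [11, 12, 13]);
}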


/// Compare two arrays for equality, in parallel.
bool parallelEqual(T)(T[] a, T[] b)
{
	if (a.length != b.length)
		return false;

	// Reuse one result buffer per thread
	// (static variables are thread-local in D).
	static bool[] chunkEqualBuf;
	if (!chunkEqualBuf)
		chunkEqualBuf = new bool[totalCPUs];
	auto chunkEqual = chunkEqualBuf;
	foreach (threadIndex; totalCPUs.iota.parallel(1))
	{
		// Each worker compares one contiguous slice of the inputs.
		auto start = a.length * (threadIndex    ) / totalCPUs;
		auto end   = a.length * (threadIndex + 1) / totalCPUs;
		chunkEqual[threadIndex] = a[start .. end] == b[start .. end];
	}
	return chunkEqual.all!(a => a)();
}

unittest
{
	import std.array : array;
	auto a = 1024.iota.array;
	auto b = a.dup;
	assert(parallelEqual(a, b));
	b[500] = 0;
	assert(!parallelEqual(a, b));
}
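
// Illustrative example: works with any element type supporting equality,
// e.g. strings; arrays of differing lengths compare unequal immediately.
unittest
{
	assert(parallelEqual(["a", "b"], ["a", "b"]));
	assert(!parallelEqual(["a"], ["a", "b"]));
}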


/// Split a range into chunks, processing each chunk in parallel.
/// Returns a dynamic array containing the result of calling `fun` on each chunk.
/// `fun` is called at most once per CPU core.
T[] parallelChunks(R, T)(R range, scope T delegate(R) fun)
if (isRandomAccessRange!R)
{
	auto total = range.length;
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(range[
			(chunkIndex + 0) * total / numChunks ..
			(chunkIndex + 1) * total / numChunks
		]);
	return result;
}

/// ditto
T[] parallelChunks(N, T)(N total, scope T delegate(N start, N end) fun)
if (is(N : ulong))
{
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(
			cast(N)((chunkIndex + 0) * total / numChunks),
			cast(N)((chunkIndex + 1) * total / numChunks),
		);
	return result;
}

/// ditto
auto parallelChunks(alias fun, R)(R range)
if (isRandomAccessRange!R)
{
	alias T = typeof(fun(range[0..0]));
	auto total = range.length;
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(range[
			(chunkIndex + 0) * total / numChunks ..
			(chunkIndex + 1) * total / numChunks
		]);
	return result;
}

/// ditto
auto parallelChunks(alias fun, N)(N total)
if (is(N : ulong))
{
	alias T = typeof(fun(N.init, N.init));
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(
			cast(N)((chunkIndex + 0) * total / numChunks),
			cast(N)((chunkIndex + 1) * total / numChunks),
		);
	return result;
}

unittest
{
	import std.algorithm.iteration : sum;
	assert([1, 2, 3].parallelChunks((int[] arr) => arr.sum).sum == 6);
	assert(4.parallelChunks((int low, int high) => iota(low, high).sum).sum == 6);
	assert([1, 2, 3].parallelChunks!(arr => arr.sum).sum == 6);
	assert(4.parallelChunks!((low, high) => iota(low, high).sum).sum == 6);
}
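
// Illustrative sketch of a typical use: a parallel reduction, where each
// core sums one contiguous slice and the partial sums are then combined.
// The input size here is arbitrary, chosen so the sum fits in an int.
unittest
{
	import std.algorithm.iteration : sum;
	import std.array : array;
	auto data = 10_000.iota.array;
	auto total = data.parallelChunks!(chunk => chunk.sum).sum;
	assert(total == 10_000 * 9_999 / 2);
}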