1 /**
2  * ae.utils.parallelism
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.parallelism;
15 
16 import ae.utils.array : amap;
17 
18 import std.algorithm.comparison : min;
19 import std.algorithm.iteration;
20 import std.algorithm.mutation;
21 import std.algorithm.searching;
22 import std.algorithm.sorting;
23 import std.array;
24 import std.parallelism;
25 import std.range : chunks, iota;
26 import std.range.primitives;
27 
28 // https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8
29 
/// Sort `r` using all CPU cores.
/// Like `std.algorithm.sorting.sort`, this sorts in place and returns a `SortedRange`.
31 auto parallelSort(alias less = "a < b", R)(R r)
32 {
	auto impl(size_t depth = 0)(R order)
	{
		// Recursively pre-sort each half of `order` in parallel, stopping
		// once there is enough parallelism for all cores (or at a fixed
		// maximum recursion depth).
		static if (depth < 8)
			if ((1L << depth) < totalCPUs)
				foreach (chunk; order.chunks(order.length / 2 + 1).parallel(1))
					impl!(depth + 1)(chunk);

		// The final stable sort (Timsort) detects the pre-sorted runs,
		// so this pass effectively just merges the sorted halves.
		return order.sort!(less, SwapStrategy.stable, R);
	}
42 	return impl(r);
43 }
44 
45 debug(ae_unittest) unittest
46 {
47 	assert([3, 1, 2].parallelSort.release == [1, 2, 3]);
48 }
49 
50 
/// Parallel map. Like `TaskPool.amap`, but takes the predicate as a run-time
/// functor (or delegate) rather than an alias argument, and as such does not
/// have the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
55 auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
56 {
57 	if (workUnitSize == 0)
58 		workUnitSize = taskPool.defaultWorkUnitSize(input.length);
59 	alias RT = typeof(pred(input[0]));
60 	auto result = new RT[input.length];
61 	foreach (i; input.length.iota.parallel(workUnitSize))
62 		result[i] = pred(input[i]);
63 	return result;
64 }
65 
66 debug(ae_unittest) unittest
67 {
68 	assert([1, 2, 3].parallelEagerMap((int n) => n + 1) == [2, 3, 4]);
69 }
70 
71 
72 /// Compare two arrays for equality, in parallel.
73 bool parallelEqual(T)(T[] a, T[] b)
74 {
75 	if (a.length != b.length)
76 		return false;
77 
	// Reuse a buffer for the per-chunk results across calls; as a
	// function-local static, it is thread-local, so no synchronization
	// is needed to initialize it.
	static bool[] chunkEqualBuf;
	if (!chunkEqualBuf)
		chunkEqualBuf = new bool[totalCPUs];
	auto chunkEqual = chunkEqualBuf;
	// Each parallel iteration compares one contiguous slice;
	// start/end split the arrays evenly across all cores.
	foreach (threadIndex; totalCPUs.iota.parallel(1))
	{
		auto start = a.length * (threadIndex    ) / totalCPUs;
		auto end   = a.length * (threadIndex + 1) / totalCPUs;
		chunkEqual[threadIndex] = a[start .. end] == b[start .. end];
	}
87 	}
88 	return chunkEqual.all!(a => a)();
89 }
90 
91 debug(ae_unittest) unittest
92 {
93 	import std.array : array;
94 	auto a = 1024.iota.array;
95 	auto b = a.dup;
96 	assert(parallelEqual(a, b));
97 	b[500] = 0;
98 	assert(!parallelEqual(a, b));
99 }
100 
101 // ************************************************************************
102 
/// Compute the boundaries for splitting `length` work items into at most
/// `totalCPUs` contiguous chunks: a lazy range of `numChunks + 1` offsets,
/// where chunk `i` covers `[offsets[i], offsets[i + 1])`.
private auto parallelChunkOffsets(size_t length)
104 {
105 	size_t numChunks = min(length, totalCPUs);
106 	return (numChunks + 1).iota.map!(chunkIndex => chunkIndex * length / numChunks);
107 }
108 
109 /// Split a range into chunks, processing each chunk in parallel.
110 /// Returns a dynamic array containing the result of calling `fun` on each chunk.
111 /// `fun` is called at most once per CPU core.
112 T[] parallelChunks(R, T)(R range, scope T delegate(R) fun)
113 if (isRandomAccessRange!R)
114 {
115 	auto offsets = parallelChunkOffsets(range.length);
116 	size_t numChunks = offsets.length - 1;
117 	auto result = new T[numChunks];
118 	foreach (chunkIndex; numChunks.iota.parallel(1))
119 		result[chunkIndex] = fun(range[offsets[chunkIndex] .. offsets[chunkIndex + 1]]);
120 	return result;
121 }
122 
123 /// ditto
124 T[] parallelChunks(N, T)(N total, scope T delegate(N start, N end) fun)
125 if (is(N : ulong))
126 {
127 	auto offsets = parallelChunkOffsets(total);
128 	size_t numChunks = offsets.length - 1;
129 	auto result = new T[numChunks];
130 	foreach (chunkIndex; numChunks.iota.parallel(1))
131 		result[chunkIndex] = fun(cast(N)offsets[chunkIndex], cast(N)offsets[chunkIndex + 1]);
132 	return result;
133 }
134 
135 /// ditto
136 auto parallelChunks(alias fun, R)(R range)
137 if (isRandomAccessRange!R)
138 {
139 	alias T = typeof(fun(range[0..0]));
140 	auto offsets = parallelChunkOffsets(range.length);
141 	size_t numChunks = offsets.length - 1;
142 	auto result = new T[numChunks];
143 	foreach (chunkIndex; numChunks.iota.parallel(1))
144 		result[chunkIndex] = fun(range[offsets[chunkIndex] .. offsets[chunkIndex + 1]]);
145 	return result;
146 }
147 
148 /// ditto
149 auto parallelChunks(alias fun, N)(N total)
150 if (is(N : ulong))
151 {
152 	alias T = typeof(fun(N.init, N.init));
153 	auto offsets = parallelChunkOffsets(total);
154 	size_t numChunks = offsets.length - 1;
155 	auto result = new T[numChunks];
156 	foreach (chunkIndex; numChunks.iota.parallel(1))
157 		result[chunkIndex] = fun(cast(N)offsets[chunkIndex], cast(N)offsets[chunkIndex + 1]);
158 	return result;
159 }
160 
161 debug(ae_unittest) unittest
162 {
163 	import std.algorithm.iteration : sum;
164 	assert([1, 2, 3].parallelChunks((int[] arr) => arr.sum).sum == 6);
165 	assert(4.parallelChunks((int low, int high) => iota(low, high).sum).sum == 6);
166 	assert([1, 2, 3].parallelChunks!(arr => arr.sum).sum == 6);
167 	assert(4.parallelChunks!((low, high) => iota(low, high).sum).sum == 6);
168 }
169 
170 // ************************************************************************
171 
/// Filters `input` in parallel.
/// This version calls `fun` only once per `input` element, caching the
/// results (at the expense of additional memory usage).
175 auto parallelCachedFilter(alias fun, R)(R input)
176 if (isInputRange!R && is(typeof(fun(input.front))))
177 {
178 	import ae.utils.functor.primitives : functor;
179 
	auto inputOffsets = parallelChunkOffsets(input.length);

	// Evaluate the predicate once per element, in parallel, caching the results.
	bool[][] wantedChunks = input.parallelChunks!(chunk => chunk.map!fun.array);
	auto numChunks = wantedChunks.length;
	// Count accepted elements per chunk, then compute each chunk's starting
	// position in the output with a prefix sum.
	auto outputCounts = wantedChunks.parallelEagerMap(functor!((bool[] chunk) => chunk.reduce!((a, b) => size_t(a) + size_t(b))));
	auto outputOffsets = 0 ~ outputCounts.cumulativeFold!((a, b) => a + b).array;
	auto outputTotal = outputOffsets[$-1];
	auto output = new typeof(input.front)[outputTotal];
	// Copy each chunk's accepted elements into its slot of the output, in parallel.
188 	foreach (chunkIndex; numChunks.iota.parallel)
189 	{
190 		auto chunkOutputIndex = outputOffsets[chunkIndex];
191 		auto chunkInputOffset = inputOffsets[chunkIndex];
192 		foreach (chunkInputIndex, wanted; wantedChunks[chunkIndex])
193 			if (wanted)
194 				output[chunkOutputIndex++] = input[chunkInputOffset + chunkInputIndex];
195 	}
196 	return output;
197 }
198 
199 debug(ae_unittest) unittest
200 {
201 	assert([1, 2, 3].parallelCachedFilter!(x => x % 2 == 0) == [2]);
202 }
203 
204 // ************************************************************************
205 
/// Parallel reduce. Reduces each chunk of `range` in parallel, then reduces
/// the per-chunk results on the calling thread. For the result to match a
/// sequential `reduce`, `fun` should be associative.
template parallelReduce(alias fun)
207 {
208 	auto parallelReduce(R)(R range)
209 	{
210 		import std.algorithm.iteration : reduce;
211 		return range.parallelChunks!(chunk => chunk.reduce!fun).reduce!fun;
212 	}
213 }
214 
215 // alias parallelSum = parallelReduce!((a, b) => a + b);
216 
217 debug(ae_unittest) unittest
218 {
219 	import std.algorithm.iteration : sum;
220 	assert([1, 2, 3].parallelReduce!((a, b) => a + b) == 6);
221 }
222 
/// Sum `range` in parallel, by summing each chunk with
/// `std.algorithm.iteration.sum` and then summing the partial sums.
auto parallelSum(R)(R range)
224 {
225 	import std.algorithm.iteration : sum;
226 	return range.parallelChunks!sum.sum;
227 }
228 
229 debug(ae_unittest) unittest
230 {
231 	import std.algorithm.iteration : sum;
232 	assert([1, 2, 3].parallelSum == 6);
233 }