/**
 * ae.utils.parallelism
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.parallelism;

import ae.utils.array : amap;

import std.algorithm.comparison : min;
import std.algorithm.iteration;
import std.algorithm.mutation;
import std.algorithm.searching;
import std.algorithm.sorting;
import std.array;
import std.parallelism;
import std.range : chunks, iota;
import std.range.primitives;

// https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8

/// Sort `r` using all CPU cores.
auto parallelSort(alias less = "a < b", R)(R r)
{
    auto impl(size_t depth = 0)(R order)
    {
        static if (depth < 8)
            if ((1L << depth) < totalCPUs)
                foreach (chunk; order.chunks(order.length / 2 + 1).parallel(1))
                    impl!(depth + 1)(chunk);

        return order.sort!(less, SwapStrategy.stable, R);
    }
    return impl(r);
}

debug(ae_unittest) unittest
{
    assert([3, 1, 2].parallelSort.release == [1, 2, 3]);
}


/// Parallel map. Like TaskPool.amap, but uses functors for
/// predicates instead of alias arguments, and as such does not have
/// the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
{
    if (workUnitSize == 0)
        workUnitSize = taskPool.defaultWorkUnitSize(input.length);
    alias RT = typeof(pred(input[0]));
    auto result = new RT[input.length];
    foreach (i; input.length.iota.parallel(workUnitSize))
        result[i] = pred(input[i]);
    return result;
}

debug(ae_unittest) unittest
{
    assert([1, 2, 3].parallelEagerMap((int n) => n + 1) == [2, 3, 4]);
}


/// Compare two arrays for equality, in parallel.
bool parallelEqual(T)(T[] a, T[] b)
{
    if (a.length != b.length)
        return false;

    static bool[] chunkEqualBuf;
    if (!chunkEqualBuf)
        chunkEqualBuf = new bool[totalCPUs];
    auto chunkEqual = chunkEqualBuf;
    foreach (threadIndex; totalCPUs.iota.parallel(1))
    {
        auto start = a.length * (threadIndex    ) / totalCPUs;
        auto end   = a.length * (threadIndex + 1) / totalCPUs;
        chunkEqual[threadIndex] = a[start .. end] == b[start .. end];
    }
    return chunkEqual.all!(a => a)();
}

debug(ae_unittest) unittest
{
    import std.array : array;
    auto a = 1024.iota.array;
    auto b = a.dup;
    assert(parallelEqual(a, b));
    b[500] = 0;
    assert(!parallelEqual(a, b));
}

// ************************************************************************

private auto parallelChunkOffsets(size_t length)
{
    size_t numChunks = min(length, totalCPUs);
    return (numChunks + 1).iota.map!(chunkIndex => chunkIndex * length / numChunks);
}
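
// Usage sketch: on a hypothetical 4-core machine, `parallelChunkOffsets(10)`
// would yield the fence posts [0, 2, 5, 7, 10], i.e. numChunks + 1 monotone
// offsets covering [0, length) exactly. The concrete values depend on
// `totalCPUs`; the test below only checks the invariants.
debug(ae_unittest) unittest
{
    auto offsets = parallelChunkOffsets(10).array;
    assert(offsets.length == min(10, totalCPUs) + 1);
    assert(offsets[0] == 0 && offsets[$ - 1] == 10);
    assert(offsets.isSorted);
}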

/// Split a range into chunks, processing each chunk in parallel.
/// Returns a dynamic array containing the result of calling `fun` on each chunk.
/// `fun` is called at most once per CPU core.
T[] parallelChunks(R, T)(R range, scope T delegate(R) fun)
if (isRandomAccessRange!R)
{
    auto offsets = parallelChunkOffsets(range.length);
    size_t numChunks = offsets.length - 1;
    auto result = new T[numChunks];
    foreach (chunkIndex; numChunks.iota.parallel(1))
        result[chunkIndex] = fun(range[offsets[chunkIndex] .. offsets[chunkIndex + 1]]);
    return result;
}

/// ditto
T[] parallelChunks(N, T)(N total, scope T delegate(N start, N end) fun)
if (is(N : ulong))
{
    auto offsets = parallelChunkOffsets(total);
    size_t numChunks = offsets.length - 1;
    auto result = new T[numChunks];
    foreach (chunkIndex; numChunks.iota.parallel(1))
        result[chunkIndex] = fun(cast(N)offsets[chunkIndex], cast(N)offsets[chunkIndex + 1]);
    return result;
}

/// ditto
auto parallelChunks(alias fun, R)(R range)
if (isRandomAccessRange!R)
{
    alias T = typeof(fun(range[0 .. 0]));
    auto offsets = parallelChunkOffsets(range.length);
    size_t numChunks = offsets.length - 1;
    auto result = new T[numChunks];
    foreach (chunkIndex; numChunks.iota.parallel(1))
        result[chunkIndex] = fun(range[offsets[chunkIndex] .. offsets[chunkIndex + 1]]);
    return result;
}

/// ditto
auto parallelChunks(alias fun, N)(N total)
if (is(N : ulong))
{
    alias T = typeof(fun(N.init, N.init));
    auto offsets = parallelChunkOffsets(total);
    size_t numChunks = offsets.length - 1;
    auto result = new T[numChunks];
    foreach (chunkIndex; numChunks.iota.parallel(1))
        result[chunkIndex] = fun(cast(N)offsets[chunkIndex], cast(N)offsets[chunkIndex + 1]);
    return result;
}

debug(ae_unittest) unittest
{
    import std.algorithm.iteration : sum;
    assert([1, 2, 3].parallelChunks((int[] arr) => arr.sum).sum == 6);
    assert(4.parallelChunks((int low, int high) => iota(low, high).sum).sum == 6);
    assert([1, 2, 3].parallelChunks!(arr => arr.sum).sum == 6);
    assert(4.parallelChunks!((low, high) => iota(low, high).sum).sum == 6);
}
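
// Usage sketch: a common pattern with `parallelChunks` is to compute a small
// per-chunk summary in parallel and then combine the summaries serially,
// e.g. counting matching elements:
debug(ae_unittest) unittest
{
    import std.algorithm.iteration : sum;
    import std.algorithm.searching : count;
    auto evens = 1000.iota.array
        .parallelChunks!(chunk => chunk.count!(n => n % 2 == 0))
        .sum;
    assert(evens == 500);
}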

// ************************************************************************

/// Filters `input` in parallel.
/// This version calls `fun` only once per `input` element
/// (at the expense of additional used memory).
auto parallelCachedFilter(alias fun, R)(R input)
if (isInputRange!R && is(typeof(fun(input.front))))
{
    import ae.utils.functor.primitives : functor;

    auto inputOffsets = parallelChunkOffsets(input.length);

    bool[][] wantedChunks = input.parallelChunks!(chunk => chunk.map!fun.array);
    auto numChunks = wantedChunks.length;
    auto outputCounts = wantedChunks.parallelEagerMap(functor!((bool[] chunk) => chunk.reduce!((a, b) => size_t(a) + size_t(b))));
    auto outputOffsets = 0 ~ outputCounts.cumulativeFold!((a, b) => a + b).array;
    auto outputTotal = outputOffsets[$ - 1];
    auto output = new typeof(input.front)[outputTotal];
    foreach (chunkIndex; numChunks.iota.parallel)
    {
        auto chunkOutputIndex = outputOffsets[chunkIndex];
        auto chunkInputOffset = inputOffsets[chunkIndex];
        foreach (chunkInputIndex, wanted; wantedChunks[chunkIndex])
            if (wanted)
                output[chunkOutputIndex++] = input[chunkInputOffset + chunkInputIndex];
    }
    return output;
}

debug(ae_unittest) unittest
{
    assert([1, 2, 3].parallelCachedFilter!(x => x % 2 == 0) == [2]);
}

// ************************************************************************

template parallelReduce(alias fun)
{
    auto parallelReduce(R)(R range)
    {
        import std.algorithm.iteration : reduce;
        return range.parallelChunks!(chunk => chunk.reduce!fun).reduce!fun;
    }
}

// alias parallelSum = parallelReduce!((a, b) => a + b);

debug(ae_unittest) unittest
{
    import std.algorithm.iteration : sum;
    assert([1, 2, 3].parallelReduce!((a, b) => a + b) == 6);
}

auto parallelSum(R)(R range)
{
    import std.algorithm.iteration : sum;
    return range.parallelChunks!sum.sum;
}

debug(ae_unittest) unittest
{
    import std.algorithm.iteration : sum;
    assert([1, 2, 3].parallelSum == 6);
}
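
// Note: `parallelReduce` and `parallelSum` combine per-chunk partial results
// in a second, serial pass, so the result matches a plain serial reduce only
// when `fun` is associative (e.g. +, *, min, max).
debug(ae_unittest) unittest
{
    import std.algorithm.comparison : max;
    assert([3, 1, 4, 1, 5, 9, 2, 6].parallelReduce!max == 9);
}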