/**
 * ae.utils.parallelism
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.parallelism;

import std.algorithm.comparison : min;
import std.algorithm.mutation;
import std.algorithm.searching;
import std.algorithm.sorting;
import std.parallelism;
import std.range : chunks, iota;
import std.range.primitives;

// https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8

/// Sort `r` using all CPU cores.
auto parallelSort(alias less = "a < b", R)(R r)
{
	auto impl(size_t depth = 0)(R order)
	{
		static if (depth < 8)
			if ((1L << depth) < totalCPUs)
				foreach (chunk; order.chunks(order.length / 2 + 1).parallel(1))
					impl!(depth + 1)(chunk);

		return order.sort!(less, SwapStrategy.stable, R);
	}
	return impl(r);
}

unittest
{
	assert([3, 1, 2].parallelSort.release == [1, 2, 3]);
}


/// Parallel map. Like TaskPool.amap, but uses functors for
/// predicates instead of alias arguments, and as such does not have
/// the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
{
	if (workUnitSize == 0)
		workUnitSize = taskPool.defaultWorkUnitSize(input.length);
	alias RT = typeof(pred(input[0]));
	auto result = new RT[input.length];
	foreach (i; input.length.iota.parallel(workUnitSize))
		result[i] = pred(input[i]);
	return result;
}

unittest
{
	assert([1, 2, 3].parallelEagerMap((int n) => n + 1) == [2, 3, 4]);
}


/// Compare two arrays for equality, in parallel.
bool parallelEqual(T)(T[] a, T[] b)
{
	if (a.length != b.length)
		return false;

	static bool[] chunkEqualBuf;
	if (!chunkEqualBuf)
		chunkEqualBuf = new bool[totalCPUs];
	auto chunkEqual = chunkEqualBuf;
	foreach (threadIndex; totalCPUs.iota.parallel(1))
	{
		auto start = a.length * (threadIndex    ) / totalCPUs;
		auto end   = a.length * (threadIndex + 1) / totalCPUs;
		chunkEqual[threadIndex] = a[start .. end] == b[start .. end];
	}
	return chunkEqual.all!(a => a)();
}

unittest
{
	import std.array : array;
	auto a = 1024.iota.array;
	auto b = a.dup;
	assert(parallelEqual(a, b));
	b[500] = 0;
	assert(!parallelEqual(a, b));
}


/// Split a range into chunks, processing each chunk in parallel.
/// Returns a dynamic array containing the result of calling `fun` on each chunk.
/// `fun` is called at most once per CPU core.
T[] parallelChunks(R, T)(R range, scope T delegate(R) fun)
if (isRandomAccessRange!R)
{
	auto total = range.length;
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(range[
			(chunkIndex + 0) * total / numChunks ..
			(chunkIndex + 1) * total / numChunks
		]);
	return result;
}

/// ditto
T[] parallelChunks(N, T)(N total, scope T delegate(N start, N end) fun)
if (is(N : ulong))
{
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(
			cast(N)((chunkIndex + 0) * total / numChunks),
			cast(N)((chunkIndex + 1) * total / numChunks),
		);
	return result;
}

/// ditto
auto parallelChunks(alias fun, R)(R range)
if (isRandomAccessRange!R)
{
	alias T = typeof(fun(range[0..0]));
	auto total = range.length;
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(range[
			(chunkIndex + 0) * total / numChunks ..
			(chunkIndex + 1) * total / numChunks
		]);
	return result;
}

/// ditto
auto parallelChunks(alias fun, N)(N total)
if (is(N : ulong))
{
	alias T = typeof(fun(N.init, N.init));
	size_t numChunks = min(total, totalCPUs);
	auto result = new T[numChunks];
	foreach (chunkIndex; numChunks.iota.parallel(1))
		result[chunkIndex] = fun(
			cast(N)((chunkIndex + 0) * total / numChunks),
			cast(N)((chunkIndex + 1) * total / numChunks),
		);
	return result;
}

unittest
{
	import std.algorithm.iteration : sum;
	assert([1, 2, 3].parallelChunks((int[] arr) => arr.sum).sum == 6);
	assert(4.parallelChunks((int low, int high) => iota(low, high).sum).sum == 6);
	assert([1, 2, 3].parallelChunks!(arr => arr.sum).sum == 6);
	assert(4.parallelChunks!((low, high) => iota(low, high).sum).sum == 6);
}
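
// The unittest below is an illustrative sketch added for documentation purposes,
// not part of the original module: it shows how parallelEagerMap and parallelChunks
// might be combined into a parallel map-reduce (sum of squares). It uses only
// symbols defined above or in Phobos; the input values are arbitrary.
unittest
{
	import std.algorithm.iteration : sum;
	import std.array : array;

	auto input = 1000.iota.array;                             // [0, 1, ..., 999]
	auto squares = input.parallelEagerMap((int n) => n * n);  // map step, in parallel
	// Reduce each per-core chunk, then combine the partial sums.
	auto total = squares.parallelChunks!(chunk => chunk.sum).sum;
	assert(total == 332_833_500);
}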