/**
 * ae.utils.parallelism
 *
 * License:
 *   This Source Code Form is subject to the terms of
 *   the Mozilla Public License, v. 2.0. If a copy of
 *   the MPL was not distributed with this file, You
 *   can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Authors:
 *   Vladimir Panteleev <ae@cy.md>
 */

module ae.utils.parallelism;

import std.algorithm.mutation;
import std.algorithm.searching;
import std.algorithm.sorting;
import std.parallelism;
import std.range : chunks, iota;

// https://gist.github.com/63e139a16b9b278fb5d449ace611e7b8

/// Sort `r` using all CPU cores.
///
/// At each level the range is cut into chunks of `length / 2 + 1`
/// elements, the chunks are sorted by a parallel `foreach`, and then the
/// whole range of that level is sorted again with a stable sort.
/// Splitting continues while `2 ^^ level` is below `totalCPUs`, and is
/// capped at 8 levels at compile time.
///
/// Params:
///   less = ordering predicate, forwarded to `sort`
///   r    = range to sort in place; its chunks are passed back in as `R`
/// Returns: whatever `sort` returns for the full range.
auto parallelSort(alias less = "a < b", R)(R r)
{
    auto sortLevel(size_t level = 0)(R slice)
    {
        // The compile-time bound keeps the number of instantiations finite.
        static if (level < 8)
        {
            if ((1L << level) < totalCPUs)
            {
                // The "+ 1" keeps the chunk size non-zero for short ranges.
                auto pieces = slice.chunks(slice.length / 2 + 1);
                foreach (piece; pieces.parallel(1))
                    sortLevel!(level + 1)(piece);
            }
        }

        // Final pass over this level's range, after any chunk sorting above.
        return slice.sort!(less, SwapStrategy.stable, R);
    }

    return sortLevel(r);
}

unittest
{
    assert([3, 1, 2].parallelSort.release == [1, 2, 3]);
}


/// Parallel map. Like TaskPool.amap, but uses functors for
/// predicates instead of alias arguments, and as such does not have
/// the multiple-context problem.
/// https://forum.dlang.org/post/qnigarkuxxnqwdernhzv@forum.dlang.org
///
/// Params:
///   input        = indexable range with a `length`
///   pred         = callable applied to every element of `input`
///   workUnitSize = elements per work unit; `0` selects
///                  `taskPool.defaultWorkUnitSize(input.length)`
/// Returns: a newly allocated array holding `pred(input[i])` at index `i`.
auto parallelEagerMap(R, Pred)(R input, Pred pred, size_t workUnitSize = 0)
{
    alias Mapped = typeof(pred(input[0]));

    // A zero work unit size means "let the task pool decide".
    size_t unit = workUnitSize;
    if (unit == 0)
        unit = taskPool.defaultWorkUnitSize(input.length);

    auto output = new Mapped[input.length];
    // Each iteration writes only its own slot of the output array.
    foreach (idx; iota(input.length).parallel(unit))
        output[idx] = pred(input[idx]);
    return output;
}

unittest
{
    assert([1, 2, 3].parallelEagerMap((int n) => n + 1) == [2, 3, 4]);
}


/// Compare two arrays for equality, in parallel.
///
/// The arrays are divided into `totalCPUs` contiguous ranges, and each
/// range is compared by one iteration of a parallel `foreach`.
/// Every range is always compared; there is no early exit on mismatch.
///
/// Params:
///   a = first array to compare
///   b = second array to compare
/// Returns: `true` if `a` and `b` have the same length and equal
///          elements, `false` otherwise.
bool parallelEqual(T)(T[] a, T[] b)
{
    if (a.length != b.length)
        return false;

    // Result buffer, allocated on first use and reused by later calls.
    static bool[] chunkEqualBuf;
    if (!chunkEqualBuf)
        chunkEqualBuf = new bool[totalCPUs];
    // Copy the slice to a local: the static is thread-local, so the loop
    // body must reach the caller's buffer through a captured variable.
    auto chunkEqual = chunkEqualBuf;

    // Do the boundary arithmetic in 64 bits so that `length * index`
    // cannot wrap around when size_t is 32 bits wide.
    immutable ulong total = a.length;
    foreach (threadIndex; totalCPUs.iota.parallel(1))
    {
        auto start = cast(size_t)(total * (threadIndex    ) / totalCPUs);
        auto end   = cast(size_t)(total * (threadIndex + 1) / totalCPUs);
        chunkEqual[threadIndex] = a[start .. end] == b[start .. end];
    }
    return chunkEqual.all!(same => same)();
}

unittest
{
    import std.array : array;
    auto a = 1024.iota.array;
    auto b = a.dup;
    assert(parallelEqual(a, b));
    b[500] = 0;
    assert(!parallelEqual(a, b));

    // Different lengths are never equal.
    assert(!parallelEqual(a, a[0 .. $ - 1]));

    // Empty arrays, and arrays shorter than the CPU count.
    int[] none;
    assert(parallelEqual(none, none));
    assert(parallelEqual([1], [1]));
    assert(!parallelEqual([1], [2]));
}