Add parallelTransformReduce and parallelForEachError
authorReid Kleckner <rnk@google.com>
Mon, 2 Nov 2020 18:15:34 +0000 (10:15 -0800)
committerReid Kleckner <rnk@google.com>
Tue, 3 Nov 2020 00:50:14 +0000 (16:50 -0800)
parallelTransformReduce is modelled on the C++17 pstl API of
std::transform_reduce, except our wrappers do not use execution policy
parameters.

parallelForEachError allows loops that contain potentially failing
operations to propagate errors out of the loop. This was one of the
major challenges I encountered while parallelizing PDB type merging in
LLD. Parallelizing a loop with parallelForEachError is not behavior
preserving: the loop will no longer stop on the first error, it will
continue working and report all errors it encounters in a list.

I plan to use this to propagate errors out of LLD's
coff::TpiSource::remapTpiWithGHashes, which currently stores errors an
error in the TpiSource object.

Differential Revision: https://reviews.llvm.org/D90639

llvm/include/llvm/Support/Parallel.h
llvm/unittests/Support/ParallelTest.cpp

index 2c0edfb..d2f0067 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "llvm/ADT/STLExtras.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/Error.h"
 #include "llvm/Support/MathExtras.h"
 #include "llvm/Support/Threading.h"
 
@@ -120,13 +121,17 @@ void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End,
                       llvm::Log2_64(std::distance(Start, End)) + 1);
 }
 
+// TaskGroup has a relatively high overhead, so we want to reduce
+// the number of spawn() calls. We'll create up to 1024 tasks here.
+// (Note that 1024 is an arbitrary number. This code probably needs
+// improving to take the number of available cores into account.)
+enum { MaxTasksPerGroup = 1024 };
+
 template <class IterTy, class FuncTy>
 void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
-  // TaskGroup has a relatively high overhead, so we want to reduce
-  // the number of spawn() calls. We'll create up to 1024 tasks here.
-  // (Note that 1024 is an arbitrary number. This code probably needs
-  // improving to take the number of available cores into account.)
-  ptrdiff_t TaskSize = std::distance(Begin, End) / 1024;
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  ptrdiff_t TaskSize = std::distance(Begin, End) / MaxTasksPerGroup;
   if (TaskSize == 0)
     TaskSize = 1;
 
@@ -140,7 +145,9 @@ void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
 
 template <class IndexTy, class FuncTy>
 void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
-  ptrdiff_t TaskSize = (End - Begin) / 1024;
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  ptrdiff_t TaskSize = (End - Begin) / MaxTasksPerGroup;
   if (TaskSize == 0)
     TaskSize = 1;
 
@@ -156,6 +163,50 @@ void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
     Fn(J);
 }
 
+template <class IterTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
+                                   ReduceFuncTy Reduce,
+                                   TransformFuncTy Transform) {
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  size_t NumInputs = std::distance(Begin, End);
+  if (NumInputs == 0)
+    return std::move(Init);
+  size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
+  std::vector<ResultTy> Results(NumTasks, Init);
+  {
+    // Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
+    // remaining after dividing them equally amongst tasks are distributed as
+    // one extra input over the first tasks.
+    TaskGroup TG;
+    size_t TaskSize = NumInputs / NumTasks;
+    size_t RemainingInputs = NumInputs % NumTasks;
+    IterTy TBegin = Begin;
+    for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
+      IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
+      TG.spawn([=, &Transform, &Reduce, &Results] {
+        // Reduce the result of transformation eagerly within each task.
+        ResultTy R = Init;
+        for (IterTy It = TBegin; It != TEnd; ++It)
+          R = Reduce(R, Transform(*It));
+        Results[TaskId] = R;
+      });
+      TBegin = TEnd;
+    }
+    assert(TBegin == End);
+  }
+
+  // Do a final reduction. There are at most 1024 tasks, so this only adds
+  // constant single-threaded overhead for large inputs. Hopefully most
+  // reductions are cheaper than the transformation.
+  ResultTy FinalResult = std::move(Results.front());
+  for (ResultTy &PartialResult :
+       makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
+    FinalResult = Reduce(FinalResult, std::move(PartialResult));
+  return std::move(FinalResult);
+}
+
 #endif
 
 } // namespace detail
@@ -198,6 +249,22 @@ void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) {
     Fn(I);
 }
 
+template <class IterTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
+                                 ReduceFuncTy Reduce,
+                                 TransformFuncTy Transform) {
+#if LLVM_ENABLE_THREADS
+  if (parallel::strategy.ThreadsRequested != 1) {
+    return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce,
+                                                       Transform);
+  }
+#endif
+  for (IterTy I = Begin; I != End; ++I)
+    Init = Reduce(std::move(Init), Transform(*I));
+  return std::move(Init);
+}
+
 // Range wrappers.
 template <class RangeTy,
           class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
@@ -210,6 +277,31 @@ void parallelForEach(RangeTy &&R, FuncTy Fn) {
   parallelForEach(std::begin(R), std::end(R), Fn);
 }
 
+template <class RangeTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
+                                 ReduceFuncTy Reduce,
+                                 TransformFuncTy Transform) {
+  return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce,
+                                 Transform);
+}
+
+// Parallel for-each, but with error handling.
+template <class RangeTy, class FuncTy>
+Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
+  // The transform_reduce algorithm requires that the initial value be copyable.
+  // Error objects are uncopyable. We only need to copy initial success values,
+  // so work around this mismatch via the C API. The C API represents success
+  // values with a null pointer. The joinErrors discards null values and joins
+  // multiple errors into an ErrorList.
+  return unwrap(parallelTransformReduce(
+      std::begin(R), std::end(R), wrap(Error::success()),
+      [](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
+        return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
+      },
+      [&Fn](auto &&V) { return wrap(Fn(V)); }));
+}
+
 } // namespace llvm
 
 #endif // LLVM_SUPPORT_PARALLEL_H
index 2283c51..f4bd10b 100644 (file)
@@ -49,4 +49,47 @@ TEST(Parallel, parallel_for) {
   ASSERT_EQ(range[2049], 1u);
 }
 
+TEST(Parallel, TransformReduce) {
+  // Sum an empty list, check that it works.
+  auto identity = [](uint32_t v) { return v; };
+  uint32_t sum = parallelTransformReduce(ArrayRef<uint32_t>(), 0U,
+                                         std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 0U);
+
+  // Sum the lengths of these strings in parallel.
+  const char *strs[] = {"a", "ab", "abc", "abcd", "abcde", "abcdef"};
+  size_t lenSum =
+      parallelTransformReduce(strs, static_cast<size_t>(0), std::plus<size_t>(),
+                              [](const char *s) { return strlen(s); });
+  EXPECT_EQ(lenSum, static_cast<size_t>(21));
+
+  // Check that we handle non-divisible task sizes as above.
+  uint32_t range[2050];
+  std::fill(std::begin(range), std::end(range), 1);
+  sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 2050U);
+
+  std::fill(std::begin(range), std::end(range), 2);
+  sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 4100U);
+
+  // Avoid one large task.
+  uint32_t range2[3060];
+  std::fill(std::begin(range2), std::end(range2), 1);
+  sum = parallelTransformReduce(range2, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 3060U);
+}
+
+TEST(Parallel, ForEachError) {
+  int nums[] = {1, 2, 3, 4, 5, 6};
+  Error e = parallelForEachError(nums, [](int v) -> Error {
+    if ((v & 1) == 0)
+      return createStringError(std::errc::invalid_argument, "asdf");
+    return Error::success();
+  });
+  EXPECT_TRUE(e.isA<ErrorList>());
+  std::string errText = toString(std::move(e));
+  EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
+}
+
 #endif