Joseph Wood
2018-09-26 16:17:40 UTC
As the subject states, this question is regarding populating a matrix
in parallel. I am currently reading "C++ Concurrency in Action:
Practical Multithreading" as I'd like to take some algorithms I have
to the next level. I have looked at the RcppParallel package, but the
features offered there do not seem to apply to this situation. I will
explain my reasoning further down. First, we set up our scenario.
1. We have an empty matrix with the number of rows equal to numRows
2. We are able to generate the ith row of the matrix at will
3. Our underlying subroutine populates the matrix from any
particular starting point one by one.
This scenario easily extends to a parallel setup. We have each entry
in our matrix being visited exactly one time by only one thread. The
idea is that if we have m threads, we can split up the work so that
each thread is responsible for populating roughly (numRows / m) number
of rows of our matrix.
Here is a simplified example that represents my real situation (In my
project I don't have the cpp11 plugin as I take care of this in
Makevars file with CXX_STD = CXX11):
#include <Rcpp.h>
#include <thread>
// [[Rcpp::plugins(cpp11)]]
int myFactorial(int n) {
int res = 1;
for (int i = 1; i <= n; ++i)
res *= i;
return res;
}
std::vector<int> nthPerm(int n, int index) {
int temp = myFactorial(n);
std::vector<int> indexVec(n), res(n);
std::iota(indexVec.begin(), indexVec.end(), 1);
for (int k = 0, r = n; k < n; ++k, --r) {
temp /= r;
int j = (int) index / temp;
res[k] = indexVec[j];
index -= (temp * j);
indexVec.erase(indexVec.begin() + j);
}
return res;
}
// Simplified version for demonstration only. The real subroutines
// that carry out this task are more complicated
void PopulateMatrix(Rcpp::IntegerMatrix permuteMatrix,
std::vector<int> z, int count, int nRows, int n) {
for (; count < nRows; ++count) {
for (int j = 0; j < n; ++j)
permuteMatrix(count, j) = z[j];
std::next_permutation(z.begin(), z.end());
}
}
// [[Rcpp::export]]
SEXP ParallelPerms(int n, int userThrds = 0) {
int nThreads = std::thread::hardware_concurrency() - 1;
std::vector<std::thread> myThreads;
nThreads = (userThrds > 0) ? std::min(userThrds, nThreads) : nThreads;
int step = 0, numRows = myFactorial(n);
int stepSize = numRows / nThreads;
int nextStep = stepSize;
std::vector<int> z(n);
std::iota(z.begin(), z.end(), 1);
Rcpp::IntegerMatrix myMat = Rcpp::no_init_matrix(numRows, n);
for (std::size_t j = 0; j < (nThreads - 1); ++j) {
myThreads.emplace_back(PopulateMatrix, std::ref(myMat), z,
step, nextStep, n);
step += stepSize;
nextStep += stepSize;
z = nthPerm(n, step);
}
// Guarantee that we get all the rows. N.B. We are passing numRows
// instead of nextStep... they aren't guaranteed to be the same
myThreads.emplace_back(PopulateMatrix, std::ref(myMat), z, step,
numRows, n);
for (auto& thr: myThreads)
thr.join();
return myMat;
}
I have read that Rcpp objects are not thread safe as they make
unpredictable calls to the garbage collector
(https://github.com/RcppCore/RcppParallel/issues/17), however Romain
Francois states:
"As soon as you don't use references for Rcpp types, you are not safe.
If you use references, it all depends on what you do with them."
I have a couple of questions regarding this.
My initial thought was I thought Rcpp objects were passed by reference
by default. Secondly, if this isn't the case, is it as simple as
adding an ampersand to all of my Rcpp objects in the function
parameters?
The project that I'm implementing this in (RcppAlgos) builds fine on
win-builder as well as all of the various rhub checks with no errors
(even check_with_sanitizers() and check_with_valgrind()). When I
submitted v 2.1.0 to CRAN, there were sporadic build errors on some of
the linux platforms. By sporadic, I mean sometimes it passes, and
other times it would fail with the error : segfault from C stack
overflow. The current version (v 2.2.0) still has the argument for
parallel computing, but it doesn't do anything. It is only there for
backwards compatibility.
When I initially submitted, it should be noted that I did not have my
matrices wrapped with std::ref in the call to create new threads, so
I'm not sure what effect this will have on those builds if I were to
submit to CRAN now. I will say that after I saw the errors on the
CRAN checks, I immediately sought to replicate them. I was successful
in extreme situations. For example, if I called the parallel
implementation thousands of times I could generate the error. I would
put these extreme tests in my tests folder and check them in a unit
test environment so as not to crash my r session.
I then sought out a more robust solution to my situation and found
that thread function arguments are by default pass by value, and if
you have a particular variable that is expected to be passed by
reference, then you must add std::ref (See
https://en.cppreference.com/w/cpp/thread/thread/thread). I have done
this and have noticed that I can't generate the issues with the
extreme tests above. HOWEVER, If I call it say 50000 times 10 times in
a row, I can sometimes generate an issue (not necessarily the segfault
above.. most of the time it is a stack imbalance warning... the
warning you get when the number of UNPROTECTS doesn't match the number
of PROTECTS in the R C API).
I then revisited the RcppParallel package to see if there were any
solutions there. I know that RcppParallel implements fully thread safe
objects like RMatrix<T>, however I can't see a way to set up a Worker
to populate my matrix. I guess the issue I see is that if you look at
my set-up above, we first get the starter vector with a call to
nthPerm from the parent function, then I pass this to PopulateMatrix
which proceeds to populate rows of my matrix for a given number of
rows. With the examples I've seen, there is no dependency on an
external function to get a specific entry point. I thought about
bypassing the RcppParallel::Worker altogether and simply use the
RMatrix<T> object, however I don't think this is how that object is to
be utilized. For example, I have not seen any RcppParallel examples
that preallocate an RMatrix<T> object.
My question is, is there a way to make my current set up thread-safe?
If not, is it possible at all to simply populate an object in parallel
safely?
Alternatively, if my question seems a bit naive, a nudge in the right
direction would be greatly appreciated. I don't mind going back to
square one, if need be.
Thanks,
Joseph Wood
in parallel. I am currently reading "C++ Concurrency in Action:
Practical Multithreading" as I'd like to take some algorithms I have
to the next level. I have looked at the RcppParallel package, but the
features offered there do not seem to apply to this situation. I will
explain my reasoning further down. First, we set up our scenario.
1. We have an empty matrix with the number of rows equal to numRows
2. We are able to generate the ith row of the matrix at will
3. Our underlying subroutine populates the matrix from any
particular starting point one by one.
This scenario easily extends to a parallel setup. We have each entry
in our matrix being visited exactly one time by only one thread. The
idea is that if we have m threads, we can split up the work so that
each thread is responsible for populating roughly (numRows / m) number
of rows of our matrix.
Here is a simplified example that represents my real situation (In my
project I don't have the cpp11 plugin as I take care of this in
Makevars file with CXX_STD = CXX11):
#include <Rcpp.h>
#include <thread>
// [[Rcpp::plugins(cpp11)]]
int myFactorial(int n) {
int res = 1;
for (int i = 1; i <= n; ++i)
res *= i;
return res;
}
std::vector<int> nthPerm(int n, int index) {
int temp = myFactorial(n);
std::vector<int> indexVec(n), res(n);
std::iota(indexVec.begin(), indexVec.end(), 1);
for (int k = 0, r = n; k < n; ++k, --r) {
temp /= r;
int j = (int) index / temp;
res[k] = indexVec[j];
index -= (temp * j);
indexVec.erase(indexVec.begin() + j);
}
return res;
}
// Simplified version for demonstration only. The real subroutines
// that carry out this task are more complicated
void PopulateMatrix(Rcpp::IntegerMatrix permuteMatrix,
std::vector<int> z, int count, int nRows, int n) {
for (; count < nRows; ++count) {
for (int j = 0; j < n; ++j)
permuteMatrix(count, j) = z[j];
std::next_permutation(z.begin(), z.end());
}
}
// [[Rcpp::export]]
SEXP ParallelPerms(int n, int userThrds = 0) {
int nThreads = std::thread::hardware_concurrency() - 1;
std::vector<std::thread> myThreads;
nThreads = (userThrds > 0) ? std::min(userThrds, nThreads) : nThreads;
int step = 0, numRows = myFactorial(n);
int stepSize = numRows / nThreads;
int nextStep = stepSize;
std::vector<int> z(n);
std::iota(z.begin(), z.end(), 1);
Rcpp::IntegerMatrix myMat = Rcpp::no_init_matrix(numRows, n);
for (std::size_t j = 0; j < (nThreads - 1); ++j) {
myThreads.emplace_back(PopulateMatrix, std::ref(myMat), z,
step, nextStep, n);
step += stepSize;
nextStep += stepSize;
z = nthPerm(n, step);
}
// Guarantee that we get all the rows. N.B. We are passing numRows
// instead of nextStep... they aren't guaranteed to be the same
myThreads.emplace_back(PopulateMatrix, std::ref(myMat), z, step,
numRows, n);
for (auto& thr: myThreads)
thr.join();
return myMat;
}
I have read that Rcpp objects are not thread safe as they make
unpredictable calls to the garbage collector
(https://github.com/RcppCore/RcppParallel/issues/17), however Romain
Francois states:
"As soon as you don't use references for Rcpp types, you are not safe.
If you use references, it all depends on what you do with them."
I have a couple of questions regarding this.
My initial thought was I thought Rcpp objects were passed by reference
by default. Secondly, if this isn't the case, is it as simple as
adding an ampersand to all of my Rcpp objects in the function
parameters?
The project that I'm implementing this in (RcppAlgos) builds fine on
win-builder as well as all of the various rhub checks with no errors
(even check_with_sanitizers() and check_with_valgrind()). When I
submitted v 2.1.0 to CRAN, there were sporadic build errors on some of
the linux platforms. By sporadic, I mean sometimes it passes, and
other times it would fail with the error : segfault from C stack
overflow. The current version (v 2.2.0) still has the argument for
parallel computing, but it doesn't do anything. It is only there for
backwards compatibility.
When I initially submitted, it should be noted that I did not have my
matrices wrapped with std::ref in the call to create new threads, so
I'm not sure what effect this will have on those builds if I were to
submit to CRAN now. I will say that after I saw the errors on the
CRAN checks, I immediately sought to replicate them. I was successful
in extreme situations. For example, if I called the parallel
implementation thousands of times I could generate the error. I would
put these extreme tests in my tests folder and check them in a unit
test environment so as not to crash my r session.
I then sought out a more robust solution to my situation and found
that thread function arguments are by default pass by value, and if
you have a particular variable that is expected to be passed by
reference, then you must add std::ref (See
https://en.cppreference.com/w/cpp/thread/thread/thread). I have done
this and have noticed that I can't generate the issues with the
extreme tests above. HOWEVER, If I call it say 50000 times 10 times in
a row, I can sometimes generate an issue (not necessarily the segfault
above.. most of the time it is a stack imbalance warning... the
warning you get when the number of UNPROTECTS doesn't match the number
of PROTECTS in the R C API).
I then revisited the RcppParallel package to see if there were any
solutions there. I know that RcppParallel implements fully thread safe
objects like RMatrix<T>, however I can't see a way to set up a Worker
to populate my matrix. I guess the issue I see is that if you look at
my set-up above, we first get the starter vector with a call to
nthPerm from the parent function, then I pass this to PopulateMatrix
which proceeds to populate rows of my matrix for a given number of
rows. With the examples I've seen, there is no dependency on an
external function to get a specific entry point. I thought about
bypassing the RcppParallel::Worker altogether and simply use the
RMatrix<T> object, however I don't think this is how that object is to
be utilized. For example, I have not seen any RcppParallel examples
that preallocate an RMatrix<T> object.
My question is, is there a way to make my current set up thread-safe?
If not, is it possible at all to simply populate an object in parallel
safely?
Alternatively, if my question seems a bit naive, a nudge in the right
direction would be greatly appreciated. I don't mind going back to
square one, if need be.
Thanks,
Joseph Wood