Hi, I'm trying to understand the rationale for your proposed amount of splitting and, more precisely, why that one is THE one.
If I put labels on your example numbers in one of your previous posts:

  nbrOfElements <- 97
  nbrOfWorkers <- 5

With these, there are two extremes in how you can split up the processing into chunks such that all workers are utilized:

(A) Each worker, called multiple times, processes one element each time:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfElements
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1

(B) Each worker, called once, processes multiple elements:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20

I understand that neither of these two extremes may be the best when it comes to orchestration overhead and load balancing. Instead, the best might be somewhere in between, e.g.

(C) Each worker, called multiple times, processes multiple elements each time:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfElements / nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5

However, there are multiple alternatives between the two extremes, e.g.

> nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers

So, is there a reason why you argue for scale = 1.0 being the optimal choice?

FYI, in future.apply::future_lapply(X, FUN, ...) there is a 'future.scheduling' scale factor (*) argument, where the default future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf to (A). Using future.scheduling = 4 achieves the amount of load balancing you propose in (C).

(*) Different definition from the above 'scale'.

(Disclaimer: I'm the author)

/Henrik

On Mon, Feb 19, 2018 at 10:21 AM, Christian Krause <christian.kra...@idiv.de> wrote:
> Dear R-Devel List,
>
> I have installed R 3.4.3 with the patch applied on our cluster and ran a *real-world* job of one of our users to confirm that the patch works to my satisfaction. Here are the results.
>
> The original was a series of jobs, all essentially doing the same stuff using bootstrapped data, so for the original there is more data and I show the arithmetic mean with standard deviation. The confirmation with the patched R was only a single instance of that series of jobs.
>
> ## Job Efficiency
>
> The job efficiency is defined as follows (this is what the `qacct-efficiency` tool below computes):
>
> ```
> efficiency = cputime / cores / wallclocktime * 100%
> ```
>
> In simpler words: how well did the job utilize its CPU cores. It shows the percentage of time the job was actually doing stuff, as opposed to the difference:
>
> ```
> wasted = 100% - efficiency
> ```
>
> ... which, essentially, tells us how much of the resources were wasted, i.e. CPU cores just idling, without being used by anyone. We care a lot about that because, for our scientific computing cluster, wasted resources are like burning money.
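For illustration, a minimal sketch of that calculation in R. The actual `qacct-efficiency` script is not shown in this thread, so the helper below is only an assumption of its shape, and the CPU time in the example is back-computed from the ~97% figure reported further down:

```r
## Assumed shape of the efficiency calculation (the real qacct-efficiency
## tool is not part of this thread): percentage of the reserved core time
## that was actually spent computing.  Times are in seconds.
efficiency <- function(cputime, cores, wallclocktime) {
  100 * cputime / (cores * wallclocktime)
}

## e.g. the patched job below: 28 cores, ~120352 s wallclock; a total CPU
## time of ~3.28e6 s yields the reported ~97% efficiency.
efficiency(cputime = 3.28e6, cores = 28, wallclocktime = 120352)
```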
>
> ### original
>
> This takes the entire series from our job accounting database, filters for the successful jobs, calculates the efficiency and then shows the average and standard deviation of the efficiency:
>
> ```
> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
> n=945 ∅ 61.7276 ± 7.78719
> ```
>
> This takes the entire series from our job accounting database, filters for the successful jobs, calculates the efficiency and does a histogram-like binning before calculating mean and standard deviation (to get a more detailed impression of the distribution when the standard deviation of the previous command is comparatively high):
>
> ```
> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w 10 | sort -gk1 | column -t
> 10 - 20  ->  n=3    ∅ 19.21666666666667   ± 0.9112811494447459
> 20 - 30  ->  n=6    ∅ 26.418333333333333  ± 2.665996374091058
> 30 - 40  ->  n=12   ∅ 35.11583333333334   ± 2.8575783082671196
> 40 - 50  ->  n=14   ∅ 45.35285714285715   ± 2.98623361591005
> 50 - 60  ->  n=344  ∅ 57.114593023255814  ± 2.1922005551774415
> 60 - 70  ->  n=453  ∅ 64.29536423841049   ± 2.8334788433963856
> 70 - 80  ->  n=108  ∅ 72.95592592592598   ± 2.5219474143639276
> 80 - 90  ->  n=5    ∅ 81.526              ± 1.2802265424525452
> ```
>
> I attached an example graph from our monitoring system of a single instance to my previous mail. There you can see that the load balancing does not actually work, i.e. it behaves the same as `parLapply`. This is reflected in the job efficiency.
>
> ### patch applied
>
> This is the single instance I used to confirm that the patch works:
>
> ```
> $ qacct -j 4562202 | qacct-efficiency
> 97.36
> ```
>
> The graph from our monitoring system is attached. As you can see, the load balancing works to a satisfying degree and the efficiency is well above 90%, which is what I had hoped for :-)
>
> ## Additional Notes
>
> The list used in this job's `parLapplyLB` is 5812 elements long. With the `splitList` chunking from the patch, you get 208 lists of about 28 elements (208 chunks of size 28). The job ran on 28 CPU cores and had a wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the function we apply to our list takes about 580 seconds per list element, i.e. about 10 minutes. I suppose, for that runtime, we would get even better load balancing if we reduced the chunk size even further, maybe even down to 1, thus getting our efficiency even closer to 100%.
>
> Of course, for really short-running functions, a larger chunk size may be more efficient because of the overhead. In our case, the overhead is negligible and that is why the small chunk size works really well. In contrast, for smallish lists with short-running functions, you might not even need load balancing and `parLapply` suffices. It only becomes an issue when the runtime of the function is high and/or varying.
>
> In our case, the entire runtime of the whole series of jobs was:
>
> ```
> $ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print sum, "seconds" }'
> 4.72439e+09 seconds
> ```
>
> That's about 150 years on a single core or 7.5 years on a 20-core server! Our user was constantly using about 500 cores, so this took about 110 days. If you compare this to my 97% efficiency example, the jobs could have been finished in 75 days instead ;-)
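For the record, the unit conversions behind those figures, simply re-deriving the numbers quoted above:

```r
total <- 4.72439e9                  # summed wallclock time of the series, in seconds
total / (3600 * 24 * 365)           # ~150 years on a single core
total / 20  / (3600 * 24 * 365)     # ~7.5 years on a 20-core server
total / 500 / (3600 * 24)           # ~110 days with ~500 cores in constant use

## Back-of-envelope: at ~97% instead of the measured ~62% efficiency, the same
## CPU work needs roughly 62/97 of that wallclock time -- the same ballpark as
## the ~75 days mentioned above.
(total / 500 / (3600 * 24)) * (61.7 / 97.4)
```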
>
> ## Upcoming Patch
>
> If this patch gets applied to the R code base (and I hope it will :-)), my colleague and I will submit another patch that adds the chunk size as an optional parameter to all of the load-balancing functions. With that parameter, users of these functions *can* decide for themselves which chunk size they prefer for their code. As mentioned before, the most efficient chunk size depends on the runtime of the applied function, which is the only thing R does not know and which users really should be allowed to specify explicitly. The default of this new optional parameter would be the one we used here, which would make that upcoming patch fully source-compatible.
>
> Best Regards
>
> On 02/12/2018 08:08 PM, Christian Krause wrote:
>> Dear R-Devel List,
>>
>> **TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also the attached RRD output) not been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].
>>
>> [1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
>> [2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792
>>
>> ## The Call Chain
>>
>> First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`:
>>
>> 1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB**
>> 2. **splitList:** clusterApply.R:157
>> 3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
>> 4. **dynamicClusterApply:** clusterApply.R:39
>>
>> ## splitList
>>
>> We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5 workers. First, let's take a look at **splitList**:
>>
>> ```r
>>> sapply(parallel:::splitList(1:100, 5), length)
>> [1] 20 20 20 20 20
>>
>>> sapply(parallel:::splitList(1:97, 5), length)
>> [1] 20 19 19 19 20
>>
>>> sapply(parallel:::splitList(1:97, 20), length)
>> [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
>> ```
>>
>> As we can see in the examples, the work is distributed as equally as possible.
>>
>> ## dynamicClusterApply
>>
>> **dynamicClusterApply** works this way (simplified):
>>
>> 1. it first gives a chunk to each worker
>> 2. once a worker comes back with the result, it is given the next chunk
>>
>> **This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing.
>>
>> ## parLapplyLB
>>
>> This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability):
>>
>> ```r
>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>> {
>>     cl <- defaultCluster(cl)
>>
>>     chunks <- splitList(X, length(cl))
>>
>>     do.call(c,
>>             clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>             quote = TRUE)
>> }
>> ```
>>
>> For our examples, the chunks have these sizes:
>>
>> ```r
>>> sapply(parallel:::splitList(1:100, 5), length)
>> [1] 20 20 20 20 20
>> ```
>>
>> There we have it: 5 chunks for 5 workers. With this work distribution, there can't possibly be any load balancing, because each worker is given a single chunk and then it stops working because there are no more chunks.
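To see the consequence of that, here is a rough, purely illustrative simulation (not from the original post) of dynamicClusterApply's greedy dispatch, using made-up, highly variable per-element runtimes and 5 workers:

```r
## Illustration only: mimic dynamicClusterApply's dispatch -- each worker that
## finishes immediately receives the next remaining chunk -- and compare the
## resulting makespan for "5 chunks of 20" versus "20 chunks of 5".
set.seed(42)
runtimes <- rexp(100)                      # made-up per-element runtimes

makespan <- function(chunks, workers = 5) {
  busy <- numeric(workers)                 # time at which each worker becomes free
  for (chunk in chunks) {
    i <- which.min(busy)                   # next chunk goes to the first free worker
    busy[i] <- busy[i] + sum(runtimes[chunk])
  }
  max(busy)
}

makespan(parallel:::splitList(1:100, 5))   # one chunk per worker: no balancing possible
makespan(parallel:::splitList(1:100, 20))  # more chunks than workers: early finishers get more work
```

With more chunks than workers, the makespan typically ends up much closer to the ideal lower bound of sum(runtimes) / 5.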
>> Instead, **parLapplyLB** should look like this (patch is attached):
>>
>> ```r
>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>> {
>>     cl <- defaultCluster(cl)
>>
>>     chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
>>
>>     chunks <- splitList(X, chunkSize)
>>
>>     do.call(c,
>>             clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>             quote = TRUE)
>> }
>> ```
>>
>> Examples with a cluster of 5 workers:
>>
>> ```r
>> # length(cl) < length(X)
>>> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
>> [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
>>
>> # length(cl) >= length(X)
>>> sapply(parallel:::splitList(1:4, 4), length)
>> [1] 1 1 1 1
>> # one worker idles here, but we can't do better than that
>> ```
>>
>> With this patch, the number of chunks is larger than the number of workers whenever possible, and then load balancing should work.
>>
>> Best Regards
>
> --
> Christian Krause
>
> Scientific Computing Administration and Support
>
> Email: christian.kra...@idiv.de
> Office: BioCity Leipzig 5e, Room 3.201.3
> Phone: +49 341 97 33144
>
> German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
> Deutscher Platz 5e
> 04103 Leipzig
> Germany
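Coming back to the 'scale' question at the top of this message: the patch above effectively fixes the number of chunks at ceiling(length(X) / length(cl)). A hypothetical, purely illustrative helper (not part of the patch, and not future.apply's API) that exposes that choice as a tunable factor might look like this:

```r
## Hypothetical helper, for illustration only: choose the number of chunks from
## a scale factor.  scale = 1 roughly reproduces the patch's choice (the patch
## additionally never uses fewer chunks than workers); larger values give
## finer-grained load balancing at the cost of more dispatch overhead, and
## scale = Inf degenerates to one element per chunk, i.e. extreme (A).
nbrOfChunks <- function(nbrOfElements, nbrOfWorkers, scale = 1.0) {
  chunks <- ceiling(scale * nbrOfElements / nbrOfWorkers)
  max(1, min(nbrOfElements, chunks))
}

sapply(parallel:::splitList(1:97, nbrOfChunks(97, 5)), length)               # ~ case (C)
sapply(parallel:::splitList(1:97, nbrOfChunks(97, 5, scale = Inf)), length)  # ~ case (A)
```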
______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel