Hi, I'm trying to understand the rationale for your proposed amount of
splitting and, more precisely, why that one is THE one.

If I put labels on the example numbers from one of your previous posts:

 nbrOfElements <- 97
 nbrOfWorkers <- 5

With these, there are two extremes in how you can split up the
processing into chunks such that all workers are utilized:

(A) Each worker, called multiple times, processes one element each time:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfElements
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1


(B) Each worker, called once, processes multiple elements:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20

I understand that neither of these two extremes may be the best when
it comes to orchestration overhead and load balancing. Instead, the
best might be somewhere in-between, e.g.

(C) Each worker, called multiple times, processing multiple elements:

> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfElements / nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5

However, there are multiple alternatives between the two extremes, e.g.

> nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers

So, is there a reason why you argue that scale = 1.0 is optimal?
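
To make the question concrete, here is a minimal sketch of how the chunk
sizes change with 'scale' for the numbers above. The helper chunkSizes() and
the particular scale values are hypothetical and only for illustration; the
sketch also assumes that the number of chunks is rounded up and capped at
the number of elements.

```r
nbrOfElements <- 97
nbrOfWorkers <- 5

# Chunk sizes for a given 'scale' (hypothetical helper).
# Assumption: the number of chunks is rounded up to an integer and never
# exceeds the number of elements.
chunkSizes <- function(scale) {
  nbrOfChunks <- min(nbrOfElements,
                     ceiling(scale * nbrOfElements / nbrOfWorkers))
  sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
}

# Summarize the number of chunks and the range of chunk sizes per scale
for (scale in c(0.25, 1, 2, 5)) {
  sizes <- chunkSizes(scale)
  cat(sprintf("scale = %4.2f: %2d chunks, sizes %d-%d\n",
              scale, length(sizes), min(sizes), max(sizes)))
}
```

With these numbers, scale = 0.25 reproduces (B), scale = 1 reproduces (C),
and scale = 5 reproduces (A), so the three cases are points on the same
continuum.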

FYI, in future.apply::future_lapply(X, FUN, ...) there is a
'future.scheduling' scale-factor(*) argument, where the default
future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf
to (A). Using future.scheduling = 4, i.e. four chunks per worker,
achieves the amount of load balancing you propose in (C).

(*) Defined differently from 'scale' above. (Disclaimer: I'm the author.)
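
As a minimal sketch of that argument, assuming the future.apply package is
installed: the two-worker multisession plan, the input X, and the toy
function slow_sqrt() below are hypothetical and only there to make the
example self-contained.

```r
library(future.apply)  # also attaches 'future', which provides plan()

# Two background R sessions as workers (illustrative choice)
plan(multisession, workers = 2)

X <- 1:20
slow_sqrt <- function(x) { Sys.sleep(0.01); sqrt(x) }

# As in (B): one chunk per worker (the default)
yB <- future_lapply(X, slow_sqrt, future.scheduling = 1)

# In the spirit of (C): on average four chunks per worker
yC <- future_lapply(X, slow_sqrt, future.scheduling = 4)

# As in (A): one chunk per element
yA <- future_lapply(X, slow_sqrt, future.scheduling = Inf)

# The chunking only affects how work is scheduled, not the results
stopifnot(identical(yB, yC), identical(yB, yA))

# Return to sequential processing and shut down the workers
plan(sequential)
```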

/Henrik

On Mon, Feb 19, 2018 at 10:21 AM, Christian Krause
<christian.kra...@idiv.de> wrote:
> Dear R-Devel List,
>
> I have installed R 3.4.3 with the patch applied on our cluster and ran a 
> *real-world* job of one of our users to confirm that the patch works to my 
> satisfaction. Here are the results.
>
> The original was a series of jobs, all essentially doing the same stuff using 
> bootstrapped data, so for the original there is more data and I show the 
> arithmetic mean with standard deviation. The confirmation with the patched R 
> was only a single instance of that series of jobs.
>
> ## Job Efficiency
>
> The job efficiency is defined as (this is what the `qacct-efficiency` tool 
> below does):
>
> ```
> efficiency = cputime / cores / wallclocktime * 100%
> ```
>
> In simpler words: how well did the job utilize its CPU cores. It shows the 
> percentage of time the job was actually doing stuff, as opposed to the 
> difference:
>
> ```
> wasted = 100% - efficiency
> ```
>
> ... which, essentially, tells us how much of the resources were wasted, i.e. 
> CPU cores just idling, without being used by anyone. We care a lot about that 
> because, for our scientific computing cluster, wasted resources are like 
> burning money.
>
> ### original
>
> This takes the entire series from our job accounting database, filters for 
> the successful jobs, calculates the efficiency, and then shows the average 
> and standard deviation of the efficiency:
>
> ```
> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
> n=945 ∅ 61.7276 ± 7.78719
> ```
>
> This takes the entire series from our job accounting database, filters for 
> the successful jobs, calculates the efficiency, and bins the values 
> histogram-style before calculating the mean and standard deviation per bin 
> (to get a more detailed impression of the distribution when the standard 
> deviation from the previous command is comparatively high):
>
> ```
> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w 10 | 
> sort -gk1 | column -t
> 10  -  20  ->  n=3    ∅  19.21666666666667   ±  0.9112811494447459
> 20  -  30  ->  n=6    ∅  26.418333333333333  ±  2.665996374091058
> 30  -  40  ->  n=12   ∅  35.11583333333334   ±  2.8575783082671196
> 40  -  50  ->  n=14   ∅  45.35285714285715   ±  2.98623361591005
> 50  -  60  ->  n=344  ∅  57.114593023255814  ±  2.1922005551774415
> 60  -  70  ->  n=453  ∅  64.29536423841049   ±  2.8334788433963856
> 70  -  80  ->  n=108  ∅  72.95592592592598   ±  2.5219474143639276
> 80  -  90  ->  n=5    ∅  81.526              ±  1.2802265424525452
> ```
>
> I attached an example graph from our monitoring system of a single instance 
> to my previous mail. There you can see that the load balancing does not 
> actually work, i.e. it behaves the same as `parLapply`. This is reflected in 
> the job efficiency.
>
> ### patch applied
>
> This is the single instance I used to confirm that the patch works:
>
> ```
> $ qacct -j 4562202 | qacct-efficiency
> 97.36
> ```
>
> The graph from our monitoring system is attached. As you can see, the load 
> balancing works to a satisfying degree and the efficiency is well above 90% 
> which was what I had hoped for :-)
>
> ## Additional Notes
>
> The list passed to `parLapplyLB` in this job is 5812 elements long. With the 
> `splitList` chunking from the patch, you get 208 lists of about 28 
> elements (208 chunks of size 28). The job ran on 28 CPU cores and had a 
> wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the function we 
> apply to our list takes about 580 seconds per list element, i.e. about 10 
> minutes. I suppose that, for that runtime, we would get even better load 
> balancing if we reduced the chunk size even further, maybe even down to 1, 
> thus getting our efficiency even closer to 100%.
>
> Of course, for really short-running functions, a larger chunk size may be 
> more efficient because of the overhead. In our case, the overhead is 
> negligible and that is why the small chunk size works really well. In 
> contrast, for smallish lists with short-running functions, you might not even 
> need load balancing and `parLapply` suffices. It only becomes an issue when 
> the runtime of the function is high and/or varying.
>
> In our case, the total runtime of the entire series of jobs was:
>
> ```
> $ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print sum, 
> "seconds" }'
> 4.72439e+09 seconds
> ```
>
> That's about 150 years on a single core or 7.5 years on a 20-core server! Our 
> user was constantly using about 500 cores, so this took about 110 days. If 
> you compare this to my 97% efficiency example, the jobs could have been 
> finished in 75 days instead ;-)
>
> ## Upcoming Patch
>
> If this patch gets applied to the R code base (and I hope it will :-)) my 
> colleague and I will submit another patch that adds the chunk size as an 
> optional parameter to all of the load balancing functions. With that 
> parameter, users of these functions *can* decide for themselves which chunk 
> size they prefer for their code. As mentioned before, the most efficient 
> chunk size depends on the runtime of the function being applied, which is the 
> only thing R does not know and which users really should be allowed to 
> specify explicitly. The default of this new optional parameter would be the 
> one we used here, which would make that upcoming patch fully 
> source-compatible.
>
> Best Regards
>
> On 02/12/2018 08:08 PM, Christian Krause wrote:
>> Dear R-Devel List,
>>
>> **TL;DR:** The function **parLapplyLB** of the parallel package has 
>> [reportedly][1] (see also attached RRD output) not
>> been doing its job, i.e. not actually balancing the load. My colleague Dirk 
>> Sarpe and I found the cause of the problem
>> and we also have a patch to fix it (attached). A similar fix has also been 
>> provided [here][2].
>>
>> [1]: 
>> https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
>> [2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792
>>
>>
>> ## The Call Chain
>>
>> First, we traced the relevant R function calls through the code, beginning 
>> with `parLapplyLB`:
>>
>> 1.  **parLapplyLB:** clusterApply.R:177, calls **splitList**, then 
>> **clusterApplyLB**
>> 2.  **splitList:** clusterApply.R:157
>> 3.  **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
>> 4.  **dynamicClusterApply:** clusterApply.R:39
>>
>>
>> ## splitList
>>
>> We used both our whiteboard and an R session to manually *run* a few 
>> examples. We were using lists of 100 elements and 5
>> workers. First, let's take a look at **splitList**:
>>
>> ```r
>>> sapply(parallel:::splitList(1:100, 5), length)
>> [1] 20 20 20 20 20
>>
>>> sapply(parallel:::splitList(1:97, 5), length)
>> [1] 20 19 19 19 20
>>
>>> sapply(parallel:::splitList(1:97, 20), length)
>>  [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
>> ```
>>
>> As we can see in the examples, the work is distributed as equally as 
>> possible.
>>
>>
>> ## dynamicClusterApply
>>
>> **dynamicClusterApply** works this way (simplified):
>>
>> 1.  it first gives a chunk to each worker
>> 2.  once a worker comes back with the result, it is given the next chunk
>>
>> **This is the important part:** As long as there are **more** chunks than 
>> workers, there will be load balancing. If
>> there are only as many chunks as workers, or fewer, each worker will get 
>> **at most one chunk** and there is **no** load balancing.
>>
>>
>> ## parLapplyLB
>>
>> This is how **parLapplyLB** splits the input list (with a bit of 
>> refactoring, for readability):
>>
>> ```r
>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>> {
>>     cl <- defaultCluster(cl)
>>
>>     chunks <- splitList(X, length(cl))
>>
>>     do.call(c,
>>             clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>             quote = TRUE)
>> }
>> ```
>>
>> For our examples, the chunks have these sizes:
>>
>> ```r
>>> sapply(parallel:::splitList(1:100, 5), length)
>> [1] 20 20 20 20 20
>> ```
>>
>> There we have it: 5 chunks. 5 workers. With this work distribution, there 
>> can't possibly be any load balancing, because
>> each worker is given a single chunk and then it stops working because there 
>> are no more chunks.
>>
>> Instead, **parLapplyLB** should look like this (patch is attached):
>>
>> ```r
>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>> {
>>     cl <- defaultCluster(cl)
>>
>>     # despite its name, chunkSize is the number of chunks given to splitList
>>     chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
>>
>>     chunks <- splitList(X, chunkSize)
>>
>>     do.call(c,
>>             clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>             quote = TRUE)
>> }
>> ```
>>
>> Examples with a cluster of 5 workers:
>>
>> ```r
>> # length(cl) < length(X)
>>> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
>>  [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
>>
>> # length(cl) >= length(X)
>>> sapply(parallel:::splitList(1:4, 4), length)
>> [1] 1 1 1 1
>> # one worker idles here, but we can't do better than that
>> ```
>>
>> With this patch, the number of chunks is larger than the number of workers, 
>> if possible at all, and then load balancing
>> should work.
>>
>> Best Regards
>>
>>
>>
>> ______________________________________________
>> R-devel@r-project.org mailing list
>> https://stat.ethz.ch/mailman/listinfo/r-devel
>>
>
> --
> Christian Krause
>
> Scientific Computing Administration and Support
>
> Email: christian.kra...@idiv.de
>
> Office: BioCity Leipzig 5e, Room 3.201.3
>
> Phone: +49 341 97 33144
>
> German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
>
> Deutscher Platz 5e
>
> 04103 Leipzig
>
> Germany
>