Hi Andreas, Le 17/12/2017 à 22:46, Andreas Tille a écrit :
> Thanks a lot for this hint. The problem is that libcolt-free-java is > dead as well and I'm absolutely not competent to port the code from > libconcurrent-java to java.util.concurrent. Its most probably very easy > but I'm not a Java programmer. Any help would be really welcome for > libcolt. I had a look at Colt; it only uses the FJTaskRunnerGroup feature of libconcurrent-java, which is an early implementation of what eventually became the Java executors. I prepared a patch that should be equivalent, but some real-world testing would be nice. Alternatively, I've found the ParallelColt project on GitHub [1] that seems to be more recent than libcolt-free-java. It already uses the JDK concurrent APIs; maybe it could be used as a replacement? Emmanuel Bourg [1] https://github.com/rwl/ParallelColt
diff --git a/src/cern/colt/matrix/linalg/Smp.java b/src/cern/colt/matrix/linalg/Smp.java index 42a4285..f34e985 100644 --- a/src/cern/colt/matrix/linalg/Smp.java +++ b/src/cern/colt/matrix/linalg/Smp.java @@ -8,13 +8,19 @@ It is provided "as is" without expressed or implied warranty. */ package cern.colt.matrix.linalg; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + import cern.colt.matrix.DoubleMatrix2D; -import EDU.oswego.cs.dl.util.concurrent.FJTask; -import EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup; /* */ class Smp { - protected FJTaskRunnerGroup taskGroup; // a very efficient and light weight thread pool + protected ExecutorService taskGroup; // a very efficient and light weight thread pool protected int maxThreads; /** @@ -24,7 +30,7 @@ protected Smp(int maxThreads) { maxThreads = Math.max(1,maxThreads); this.maxThreads = maxThreads; if (maxThreads>1) { - this.taskGroup = new FJTaskRunnerGroup(maxThreads); + this.taskGroup = Executors.newFixedThreadPool(maxThreads); } else { // avoid parallel overhead this.taskGroup = null; @@ -34,31 +40,29 @@ protected Smp(int maxThreads) { * Clean up deamon threads, if necessary. 
*/ public void finalize() { - if (this.taskGroup!=null) this.taskGroup.interruptAll(); + if (this.taskGroup!=null) this.taskGroup.shutdownNow(); } protected void run(final DoubleMatrix2D[] blocksA, final DoubleMatrix2D[] blocksB, final double[] results, final Matrix2DMatrix2DFunction function) { - final FJTask[] subTasks = new FJTask[blocksA.length]; + final Callable<Void>[] subTasks = new Callable[blocksA.length]; for (int i=0; i<blocksA.length; i++) { final int k = i; - subTasks[i] = new FJTask() { - public void run() { + subTasks[i] = new Callable<Void>() { + public Void call() { double result = function.apply(blocksA[k],blocksB != null ? blocksB[k] : null); if (results!=null) results[k] = result; //System.out.print("."); + return null; } }; } // run tasks and wait for completion try { - this.taskGroup.invoke( - new FJTask() { - public void run() { - coInvoke(subTasks); - } - } - ); - } catch (InterruptedException exc) {} + List<Future<Void>> futures = this.taskGroup.invokeAll(java.util.Arrays.asList(subTasks)); + for (Future<Void> future : futures) { + future.get(); + } + } catch (InterruptedException | ExecutionException exc) {} } protected DoubleMatrix2D[] splitBlockedNN(DoubleMatrix2D A, int threshold, long flops) { /* @@ -190,6 +194,5 @@ protected DoubleMatrix2D[] splitStridedNN(DoubleMatrix2D A, int threshold, long * Prints various snapshot statistics to System.out; Simply delegates to {@link EDU.oswego.cs.dl.util.concurrent.FJTaskRunnerGroup#stats}. */ public void stats() { - if (this.taskGroup!=null) this.taskGroup.stats(); } } diff --git a/src/cern/colt/matrix/linalg/SmpBlas.java b/src/cern/colt/matrix/linalg/SmpBlas.java index 969efd7..0df0984 100644 --- a/src/cern/colt/matrix/linalg/SmpBlas.java +++ b/src/cern/colt/matrix/linalg/SmpBlas.java @@ -8,9 +8,13 @@ It is provided "as is" without expressed or implied warranty. 
*/ package cern.colt.matrix.linalg; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import cern.colt.matrix.DoubleMatrix1D; import cern.colt.matrix.DoubleMatrix2D; -import EDU.oswego.cs.dl.util.concurrent.FJTask; /** Parallel implementation of the Basic Linear Algebra System for symmetric multi processing boxes. Currently only a few algorithms are parallelised; the others are fully functional, but run in sequential mode. @@ -198,7 +202,7 @@ public void dgemm(final boolean transposeA, final boolean transposeB, final doub // set up concurrent tasks int span = width/noOfTasks; - final FJTask[] subTasks = new FJTask[noOfTasks]; + final Callable<Void>[] subTasks = new Callable[noOfTasks]; for (int i=0; i<noOfTasks; i++) { final int offset = i*span; if (i==noOfTasks-1) span = width - span*i; // last span may be a bit larger @@ -217,24 +221,22 @@ public void dgemm(final boolean transposeA, final boolean transposeB, final doub CC = C.viewPart(offset,0,span,p); } - subTasks[i] = new FJTask() { - public void run() { + subTasks[i] = new Callable<Void>() { + public Void call() { seqBlas.dgemm(transposeA,transposeB,alpha,AA,BB,beta,CC); //System.out.println("Hello "+offset); + return null; } }; } // run tasks and wait for completion try { - this.smp.taskGroup.invoke( - new FJTask() { - public void run() { - coInvoke(subTasks); - } - } - ); - } catch (InterruptedException exc) {} + List<Future<Void>> futures = this.smp.taskGroup.invokeAll(java.util.Arrays.asList(subTasks)); + for (Future<Void> future : futures) { + future.get(); + } + } catch (InterruptedException | ExecutionException exc) {} } public void dgemv(final boolean transposeA, final double alpha, DoubleMatrix2D A, final DoubleMatrix1D x, final double beta, DoubleMatrix1D y) { /* @@ -271,7 +273,7 @@ public void dgemv(final boolean transposeA, final double alpha, DoubleMatrix2D A // set up concurrent tasks int span = 
width/noOfTasks; - final FJTask[] subTasks = new FJTask[noOfTasks]; + final Callable<Void>[] subTasks = new Callable[noOfTasks]; for (int i=0; i<noOfTasks; i++) { final int offset = i*span; if (i==noOfTasks-1) span = width - span*i; // last span may be a bit larger @@ -280,24 +282,22 @@ public void dgemv(final boolean transposeA, final double alpha, DoubleMatrix2D A final DoubleMatrix2D AA = A.viewPart(offset,0,span,n); final DoubleMatrix1D yy = y.viewPart(offset,span); - subTasks[i] = new FJTask() { - public void run() { + subTasks[i] = new Callable<Void>() { + public Void call() { seqBlas.dgemv(transposeA,alpha,AA,x,beta,yy); //System.out.println("Hello "+offset); + return null; } }; } // run tasks and wait for completion try { - this.smp.taskGroup.invoke( - new FJTask() { - public void run() { - coInvoke(subTasks); - } - } - ); - } catch (InterruptedException exc) {} + List<Future<Void>> futures = this.smp.taskGroup.invokeAll(java.util.Arrays.asList(subTasks)); + for (Future<Void> future : futures) { + future.get(); + } + } catch (InterruptedException | ExecutionException exc) {} } public void dger(double alpha, DoubleMatrix1D x, DoubleMatrix1D y, DoubleMatrix2D A) { seqBlas.dger(alpha,x,y,A);