http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarGridPimp.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarGridPimp.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarGridPimp.scala new file mode 100644 index 0000000..e75d37c --- /dev/null +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarGridPimp.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.pimps + +import org.apache.ignite.scheduler.SchedulerFuture +import org.apache.ignite.{IgniteCluster, Ignite} +import org.apache.ignite._ +import org.gridgain.grid._ +import org.jetbrains.annotations.Nullable + +/** + * Companion object. + */ +object ScalarGridPimp { + /** + * Creates new Scalar grid pimp with given Java-side implementation. + * + * @param impl Java-side implementation. 
+ */ + def apply(impl: Ignite): ScalarGridPimp = { + if (impl == null) + throw new NullPointerException("impl") + + val pimp = new ScalarGridPimp + + pimp.impl = impl.cluster() + + pimp + } +} + +/** + * ==Overview== + * Defines Scalar "pimp" for `Grid` on Java side. + * + * Essentially this class extends Java `GridProjection` interface with Scala specific + * API adapters using primarily implicit conversions defined in `ScalarConversions` object. What + * it means is that you can use functions defined in this class on object + * of Java `GridProjection` type. Scala will automatically (implicitly) convert it into + * Scalar's pimp and replace the original call with a call on that pimp. + * + * Note that Scalar provides an extensive library of implicit conversions between Java and + * Scala GridGain counterparts in the `ScalarConversions` object. + * + * ==Suffix '$' In Names== + * Symbol `$` is used in names when they conflict with the names in the base Java class + * that Scala pimp is shadowing or with Java package name that your Scala code is importing. + * Instead of giving two different names to the same function we've decided to simply mark + * Scala's side method with `$` suffix. + */ +class ScalarGridPimp extends ScalarProjectionPimp[IgniteCluster] with ScalarTaskThreadContext[IgniteCluster] { + /** + * Schedules closure for execution using local cron-based scheduling. + * + * @param s Closure to schedule to run as a background cron-based job. + * @param ptrn Scheduling pattern in UNIX cron format with optional prefix `{n1, n2}` + * where `n1` is delay of scheduling in seconds and `n2` is the number of executions. Both + * parameters are optional. The pattern itself must not be `null` (asserted). + */ + def scheduleLocalCall[R](@Nullable s: Call[R], ptrn: String): SchedulerFuture[R] = { + assert(ptrn != null) + + value.ignite().scheduler().scheduleLocal(toCallable(s), ptrn) + } + + /** + * Schedules closure for execution using local cron-based scheduling. + * + * @param s Closure to schedule to run as a background cron-based job. 
+ * @param ptrn Scheduling pattern in UNIX cron format with optional prefix `{n1, n2}` + * where `n1` is delay of scheduling in seconds and `n2` is the number of executions. Both + * parameters are optional. The pattern itself must not be `null` (asserted). + */ + def scheduleLocalRun(@Nullable s: Run, ptrn: String): SchedulerFuture[_] = { + assert(ptrn != null) + + value.ignite().scheduler().scheduleLocal(toRunnable(s), ptrn) + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarProjectionPimp.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarProjectionPimp.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarProjectionPimp.scala new file mode 100644 index 0000000..21db933 --- /dev/null +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarProjectionPimp.scala @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.pimps + +import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterGroup, ClusterNode} +import org.apache.ignite.lang.{IgniteFutureCancelledException, IgniteFuture, IgnitePredicate} +import org.apache.ignite._ +import org.gridgain.grid._ +import org.jetbrains.annotations._ + +/** + * Companion object. + */ +object ScalarProjectionPimp { + /** + * Creates new Scalar projection pimp with given Java-side implementation. + * + * @param impl Java-side implementation. 
+ */ + def apply(impl: ClusterGroup): ScalarProjectionPimp[ClusterGroup] = { + if (impl == null) + throw new NullPointerException("impl") + + val pimp = new ScalarProjectionPimp[ClusterGroup] + + pimp.impl = impl + + pimp + } +} + +/** + * ==Overview== + * Defines Scalar "pimp" for `GridProjection` on Java side. + * + * Essentially this class extends Java `GridProjection` interface with Scala specific + * API adapters using primarily implicit conversions defined in `ScalarConversions` object. What + * it means is that you can use functions defined in this class on object + * of Java `GridProjection` type. Scala will automatically (implicitly) convert it into + * Scalar's pimp and replace the original call with a call on that pimp. + * + * Note that Scalar provides an extensive library of implicit conversions between Java and + * Scala GridGain counterparts in the `ScalarConversions` object. + * + * ==Suffix '$' In Names== + * Symbol `$` is used in names when they conflict with the names in the base Java class + * that Scala pimp is shadowing or with Java package name that your Scala code is importing. + * Instead of giving two different names to the same function we've decided to simply mark + * Scala's side method with `$` suffix. + */ +class ScalarProjectionPimp[A <: ClusterGroup] extends PimpedType[A] with Iterable[ClusterNode] + with ScalarTaskThreadContext[A] { + /** Java-side object this pimp wraps; lazily initialized from `impl` on first access. */ + lazy val value: A = impl + + /** Backing Java-side implementation; assigned after construction (e.g. by the companion `apply`). */ + protected var impl: A = _ + + /** Type alias for '() => Unit'. */ + protected type Run = () => Unit + + /** Type alias for '() => R'. */ + protected type Call[R] = () => R + + /** Type alias for '(E1) => R'. */ + protected type Call1[E1, R] = (E1) => R + + /** Type alias for '(E1, E2) => R'. */ + protected type Call2[E1, E2, R] = (E1, E2) => R + + /** Type alias for '(E1, E2, E3) => R'. */ + protected type Call3[E1, E2, E3, R] = (E1, E2, E3) => R + + /** Type alias for '() => Boolean'. */ + protected type Pred = () => Boolean + + /** Type alias for '(E1) => Boolean'. 
*/ + protected type Pred1[E1] = (E1) => Boolean + + /** Type alias for '(E1, E2) => Boolean'. */ + protected type Pred2[E1, E2] = (E1, E2) => Boolean + + /** Type alias for '(E1, E2, E3) => Boolean'. */ + protected type Pred3[E1, E2, E3] = (E1, E2, E3) => Boolean + + /** Type alias for node filter predicate. */ + protected type NF = IgnitePredicate[ClusterNode] + + /** + * Gets iterator for this projection's nodes (no node filter is applied). + */ + def iterator: Iterator[ClusterNode] = nodes$(null).iterator + + /** + * Utility function to work around the issue that `GridProjection` does not permit `null` predicates. + * + * @param p Optional predicate. + * @return If `p` is not `null` returns the projection for this predicate, otherwise returns the pimped projection. + */ + private def forPredicate(@Nullable p: NF): ClusterGroup = + if (p != null) value.forPredicate(p) else value + + /** + * Gets sequence of all nodes in this projection for given predicate. + * + * @param p Optional node filter predicate. If `null` provided - all nodes will be returned. + * @see `org.gridgain.grid.GridProjection.nodes(...)` + */ + def nodes$(@Nullable p: NF): Seq[ClusterNode] = + toScalaSeq(forPredicate(p).nodes()) + + /** + * Gets sequence of all remote nodes in this projection for given predicate. + * + * @param p Optional node filter predicate. If `null` provided - all remote nodes will be returned. + * @see `org.gridgain.grid.GridProjection.remoteNodes(...)` + */ + def remoteNodes$(@Nullable p: NF = null): Seq[ClusterNode] = + toScalaSeq(forPredicate(p).forRemotes().nodes()) + + /** + * <b>Alias</b> for method `send$(...)`. + * + * @param obj Optional object to send. If `null` - this method is no-op. + * @param p Optional node filter predicate. If none provided or `null` - + * all nodes in the projection will be used. + * @see `org.gridgain.grid.GridProjection.send(...)` + */ + def !<(@Nullable obj: AnyRef, @Nullable p: NF) { + value.ignite().message(forPredicate(p)).send(null, obj) + } + + /** + * <b>Alias</b> for method `send$(...)`. 
+ * + * @param seq Optional sequence of objects to send. If empty or `null` - this + * method is no-op. NOTE(review): `seq` is handed to `send` unconverted (a Scala `Seq`) - confirm it is not delivered as one message. + * @param p Optional node filter predicate. If none provided or `null` - + * all nodes in the projection will be used. + * @see `org.gridgain.grid.GridProjection.send(...)` + */ + def !<(@Nullable seq: Seq[AnyRef], @Nullable p: NF) { + value.ignite().message(forPredicate(p)).send(null, seq) + } + + /** + * Sends given object to the nodes in this projection (using a `null` topic). + * + * @param obj Optional object to send. If `null` - this method is no-op. + * @param p Optional node filter predicate. If none provided or `null` - + * all nodes in the projection will be used. + * @see `org.gridgain.grid.GridProjection.send(...)` + */ + def send$(@Nullable obj: AnyRef, @Nullable p: NF) { + value.ignite().message(forPredicate(p)).send(null, obj) + } + + /** + * Sends given sequence to the nodes in this projection (using a `null` topic). + * + * @param seq Optional sequence of objects to send. If empty or `null` - this + * method is no-op. NOTE(review): `seq` is handed to `send` unconverted (a Scala `Seq`) - confirm it is not delivered as one message. + * @param p Optional node filter predicate. If `null` provided - all nodes in the projection will be used. + * @see `org.gridgain.grid.GridProjection.send(...)` + */ + def send$(@Nullable seq: Seq[AnyRef], @Nullable p: NF) { + value.ignite().message(forPredicate(p)).send(null, seq) + } + + /** + * Synchronous closures call on this projection with return value. + * This call will block until all results are received and ready. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). + */ + def call$[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): Seq[R] = + toScalaSeq(callAsync$(s, p).get) + + /** + * Synchronous closures call on this projection with return value. 
+ * This call will block until all results are received and ready. If this projection + * is empty (a `ClusterGroupEmptyException` is caught) then `dflt` closure will be executed and its result returned. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. + * @param dflt Closure to execute if projection is empty. Must not be `null` (asserted). + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). + */ + def callSafe[R](@Nullable s: Seq[Call[R]], dflt: () => Seq[R], @Nullable p: NF): Seq[R] = { + assert(dflt != null) + + try + call$(s, p) + catch { + case _: ClusterGroupEmptyException => dflt() + } + } + + /** + * <b>Alias</b> for the same function `call$`. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def #<[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): Seq[R] = + call$(s, p) + + /** + * Synchronous closure call on this projection with return value. + * This call will block until all results are received and ready. + * + * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def call$[R](@Nullable s: Call[R], @Nullable p: NF): Seq[R] = + call$(Seq(s), p) + + /** + * Synchronous closure call on this projection with return value. 
+ * This call will block until all results are received and ready. If this projection + * is empty (a `ClusterGroupEmptyException` is caught) then `dflt` closure will be executed and its result returned. + * + * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. + * @param dflt Closure to execute if projection is empty. Must not be `null` (asserted). + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def callSafe[R](@Nullable s: Call[R], dflt: () => Seq[R], @Nullable p: NF): Seq[R] = { + assert(dflt != null) + + try + call$(Seq(s), p) + catch { + case _: ClusterGroupEmptyException => dflt() + } + } + + /** + * <b>Alias</b> for the same function `call$`. + * + * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Sequence of result values from all nodes where given closures were executed + * or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def #<[R](@Nullable s: Call[R], @Nullable p: NF): Seq[R] = + call$(s, p) + + /** + * Synchronous closures call on this projection without return value. + * This call will block until all executions are complete. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def run$(@Nullable s: Seq[Run], @Nullable p: NF) { + runAsync$(s, p).get + } + + /** + * Synchronous broadcast closure call on this projection without return value. + * + * @param r Closure to run on all nodes in projection. + * @param p Optional node filter predicate. 
If `null` provided - all nodes in projection will be used. + */ + def bcastRun(@Nullable r: Run, @Nullable p: NF) { + value.ignite().compute(forPredicate(p)).broadcast(toRunnable(r)) + } + + /** + * Synchronous closures call on this projection without return value. + * This call will block until all executions are complete. If this projection + * is empty (a `ClusterGroupEmptyException` is caught) then `dflt` closure will be executed. + * + * @param s Optional sequence of closures to call. If empty or `null` - this + * method is no-op. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @param dflt Closure to execute if projection is empty. If `null`, nothing is executed in that case. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def runSafe(@Nullable s: Seq[Run], @Nullable dflt: Run, @Nullable p: NF) { + try { + run$(s, p) + } + catch { + case _: ClusterGroupEmptyException => if (dflt != null) dflt() else () + } + } + + /** + * <b>Alias</b> for the same function `run$`. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def *<(@Nullable s: Seq[Run], @Nullable p: NF) { + run$(s, p) + } + + /** + * Synchronous closure call on this projection without return value. + * This call will block until all executions are complete. + * + * @param s Optional closure to call. If empty or `null` - this method is no-op. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def run$(@Nullable s: Run, @Nullable p: NF) { + run$(Seq(s), p) + } + + /** + * Synchronous closure call on this projection without return value. + * This call will block until all executions are complete. If this projection + * is empty (a `ClusterGroupEmptyException` is caught) then `dflt` closure will be executed. 
+ * + * @param s Optional closure to call. If empty or `null` - this method is no-op. + * @param dflt Closure to execute if projection is empty. If `null`, nothing is executed in that case. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def runSafe(@Nullable s: Run, @Nullable dflt: Run, @Nullable p: NF) { + try { + run$(s, p) + } + catch { + case _: ClusterGroupEmptyException => if (dflt != null) dflt() else () + } + } + + /** + * <b>Alias</b> for the same function `run$`. + * + * @param s Optional closure to call. If empty or `null` - this method is no-op. + * @param p Optional node filter predicate. If none provided or `null` - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def *<(@Nullable s: Run, @Nullable p: NF) { + run$(s, p) + } + + /** + * Asynchronous closures call on this projection with return value. This call will + * return immediately with the future that can be used to wait asynchronously for the results. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method + * is no-op and finished future over `null` is returned. NOTE(review): no `null` check is visible here; this depends on `toJavaCollection` - confirm. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future of Java collection containing result values from all nodes where given + * closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def callAsync$[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): + IgniteFuture[java.util.Collection[R]] = { + val comp = value.ignite().compute(forPredicate(p)).enableAsync() + + comp.call[R](toJavaCollection(s, (f: Call[R]) => toCallable(f))) + + comp.future() + } + + /** + * <b>Alias</b> for the same function `callAsync$`. + * + * @param s Optional sequence of closures to call. If empty or `null` - this method + * is no-op and finished future over `null` is returned. 
+ * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future of Java collection containing result values from all nodes where given + * closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def #?[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { + callAsync$(s, p) + } + + /** + * Asynchronous closure call on this projection with return value. This call will + * return immediately with the future that can be used to wait asynchronously for the results. The closure is wrapped into a one-element sequence and passed to the sequence overload. + * + * @param s Optional closure to call. If `null` - this method is no-op and finished + * future over `null` is returned. NOTE(review): a `null` closure is still wrapped as `Seq(null)` here - confirm the no-op claim. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future of Java collection containing result values from all nodes where given + * closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def callAsync$[R](@Nullable s: Call[R], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { + callAsync$(Seq(s), p) + } + + /** + * <b>Alias</b> for the same function `callAsync$`. + * + * @param s Optional closure to call. If `null` - this method is no-op and finished + * future over `null` is returned. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future of Java collection containing result values from all nodes where given + * closures were executed or `null` (see above). + * @see `org.gridgain.grid.GridProjection.call(...)` + */ + def #?[R](@Nullable s: Call[R], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { + callAsync$(s, p) + } + + /** + * Asynchronous closures call on this projection without return value. This call will + * return immediately with the future that can be used to wait asynchronously for the results. 
+ * + * @param s Optional sequence of absolute closures to call. If empty or `null` - this method + * is no-op and finished future over `null` will be returned. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def runAsync$(@Nullable s: Seq[Run], @Nullable p: NF): IgniteFuture[_] = { + val comp = value.ignite().compute(forPredicate(p)).enableAsync() + + comp.run(toJavaCollection(s, (f: Run) => toRunnable(f))) + + comp.future() + } + + /** + * <b>Alias</b> for the same function `runAsync$`. + * + * @param s Optional sequence of absolute closures to call. If empty or `null` - this method + * is no-op and finished future over `null` will be returned. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def *?(@Nullable s: Seq[Run], @Nullable p: NF): IgniteFuture[_] = { + runAsync$(s, p) + } + + /** + * Asynchronous closure call on this projection without return value. This call will + * return immediately with the future that can be used to wait asynchronously for the results. The closure is wrapped into a one-element sequence and passed to the sequence overload. + * + * @param s Optional absolute closure to call. If `null` - this method + * is no-op and finished future over `null` will be returned. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def runAsync$(@Nullable s: Run, @Nullable p: NF): IgniteFuture[_] = { + runAsync$(Seq(s), p) + } + + /** + * <b>Alias</b> for the same function `runAsync$`. + * + * @param s Optional absolute closure to call. If `null` - this method + * is no-op and finished future over `null` will be returned. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. 
+ * @see `org.gridgain.grid.GridProjection.run(...)` + */ + def *?(@Nullable s: Run, @Nullable p: NF): IgniteFuture[_] = { + runAsync$(s, p) + } + + /** + * Asynchronous closures execution on this projection with reduction. This call will + * return immediately with the future that can be used to wait asynchronously for the results. + * + * @param s Sequence of closures to call. Must not be `null` - this is checked + * by an assertion before anything is submitted. + * @param r Reduction function. Must not be `null` - this is checked + * by an assertion before anything is submitted. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future over the reduced result. + * @see `org.gridgain.grid.GridProjection.reduce(...)` + */ + def reduceAsync$[R1, R2](s: Seq[Call[R1]], r: Seq[R1] => R2, @Nullable p: NF): IgniteFuture[R2] = { + assert(s != null && r != null) + + val comp = value.ignite().compute(forPredicate(p)).enableAsync() + + comp.call(toJavaCollection(s, (f: Call[R1]) => toCallable(f)), r) + + comp.future() + } + + /** + * <b>Alias</b> for the same function `reduceAsync$`. + * + * @param s Sequence of closures to call. Must not be `null` - `reduceAsync$` + * checks this with an assertion. + * @param r Reduction function. Must not be `null` - `reduceAsync$` + * checks this with an assertion. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Future over the reduced result. + * @see `org.gridgain.grid.GridProjection.reduce(...)` + */ + def @?[R1, R2](s: Seq[Call[R1]], r: Seq[R1] => R2, @Nullable p: NF): IgniteFuture[R2] = { + reduceAsync$(s, r, p) + } + + /** + * Synchronous closures execution on this projection with reduction. + * This call will block until all results are reduced. 
+ * + * @param s Sequence of closures to call. Despite the `@Nullable` annotation it is passed + * to `reduceAsync$`, which asserts that it is not `null`. + * @param r Reduction function. Despite the `@Nullable` annotation it is passed + * to `reduceAsync$`, which asserts that it is not `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Reduced result. + * @see `org.gridgain.grid.GridProjection.reduce(...)` + */ + def reduce$[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, @Nullable p: NF): R2 = + reduceAsync$(s, r, p).get + + /** + * Synchronous closures execution on this projection with reduction. + * This call will block until all results are reduced. If this projection + * is empty (a `ClusterGroupEmptyException` is caught) then `dflt` closure will be executed and its result returned. + * + * @param s Sequence of closures to call. Despite the `@Nullable` annotation it is passed + * to `reduceAsync$`, which asserts that it is not `null`. + * @param r Reduction function. Despite the `@Nullable` annotation it is passed + * to `reduceAsync$`, which asserts that it is not `null`. + * @param dflt Closure to execute if projection is empty. Must not be `null` (asserted). + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Reduced result, or the result of `dflt` if the projection is empty. + * @see `org.gridgain.grid.GridProjection.reduce(...)` + */ + def reduceSafe[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, + dflt: () => R2, @Nullable p: NF): R2 = { + assert(dflt != null) + + try + reduceAsync$(s, r, p).get + catch { + case _: ClusterGroupEmptyException => dflt() + } + } + + /** + * <b>Alias</b> for the same function `reduce$`. + * + * @param s Sequence of closures to call. Passed to `reduceAsync$`, which asserts that it is not `null`. + * @param r Reduction function. Passed to `reduceAsync$`, which asserts that it is not `null`. + * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. + * @return Reduced result. 
+ * @see `org.gridgain.grid.GridProjection.reduce(...)` + */ + def @<[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, @Nullable p: NF): R2 = + reduceAsync$(s, r, p).get + + /** + * Executes given closure on the nodes where data for provided affinity key is located. This + * is known as affinity co-location between compute grid (a closure) and in-memory data grid + * (value with affinity key). Note that implementation of multiple executions of the same closure will + * be wrapped as a single task that splits into multiple `job`s that will be mapped to nodes + * with provided affinity keys. + * + * This method will block until its execution is complete or an exception is thrown. + * All default SPI implementations configured for this grid instance will be + * used (i.e. failover, load balancing, collision resolution, etc.). + * Note that if you need greater control over any aspect of Java code execution on the grid + * you should implement `GridComputeTask` which will provide you with full control over the execution. + * + * Notice that `Runnable` and `Callable` implementations must support serialization as required + * by the configured marshaller. For example, JDK marshaller will require that implementations would + * be serializable. Other marshallers, e.g. JBoss marshaller, may not have this limitation. Please consult + * with specific marshaller implementation for the details. Note that all closures and predicates in + * `org.gridgain.grid.lang` package are serializable and can be freely used in the distributed + * context with all marshallers currently shipped with GridGain. + * + * @param cacheName Name of the cache to use for affinity co-location. + * @param affKey Affinity key. + * @param r Closure to be affinity co-located on the node with given affinity key and executed. + * If `null` - this method is no-op. + * @param p Optional filtering predicate. If `null` provided - all nodes in this projection will be used for topology. 
+ * @throws IgniteCheckedException Thrown in case of any error. + * @throws ClusterGroupEmptyException Thrown in case when this projection is empty. + * Note that in case of dynamic projection this method will take a snapshot of all the + * nodes at the time of this call, apply all filtering predicates, if any, and if the + * resulting collection of nodes is empty - the exception will be thrown. + * @throws IgniteInterruptedException Subclass of `IgniteCheckedException` thrown if the wait was interrupted. + * @throws IgniteFutureCancelledException Subclass of `IgniteCheckedException` thrown if computation was cancelled. + */ + def affinityRun$(cacheName: String, @Nullable affKey: Any, @Nullable r: Run, @Nullable p: NF) { + affinityRunAsync$(cacheName, affKey, r, p).get + } + + /** + * Executes given closure on the nodes where data for provided affinity key is located. This + * is known as affinity co-location between compute grid (a closure) and in-memory data grid + * (value with affinity key). Note that implementation of multiple executions of the same closure will + * be wrapped as a single task that splits into multiple `job`s that will be mapped to nodes + * with provided affinity keys. + * + * Unlike its sibling method `affinityRun$` (which waits on the future returned here) this method does + * not block and returns immediately with a future. All default SPI implementations + * configured for this grid instance will be used (i.e. failover, load balancing, collision resolution, etc.). + * Note that if you need greater control over any aspect of Java code execution on the grid + * you should implement `GridComputeTask` which will provide you with full control over the execution. + * + * Note that class `GridAbsClosure` implements `Runnable` and class `GridOutClosure` + * implements `Callable` interface. Note also that class `GridFunc` and typedefs provide rich + * APIs and functionality for closures and predicates based processing in GridGain. 
While Java interfaces + * `Runnable` and `Callable` allow for lowest common denominator for APIs - it is advisable + * to use richer Functional Programming support provided by GridGain available in `org.gridgain.grid.lang` + * package. + * + * Notice that `Runnable` and `Callable` implementations must support serialization as required + * by the configured marshaller. For example, JDK marshaller will require that implementations would + * be serializable. Other marshallers, e.g. JBoss marshaller, may not have this limitation. Please consult + * with specific marshaller implementation for the details. Note that all closures and predicates in + * `org.gridgain.grid.lang` package are serializable and can be freely used in the distributed + * context with all marshallers currently shipped with GridGain. + * + * @param cacheName Name of the cache to use for affinity co-location. + * @param affKey Affinity key. + * @param r Closure to affinity co-located on the node with given affinity key and execute. + * If `null` - this method is no-op. + * @param p Optional filtering predicate. If `null` provided - all nodes in this projection will be used for topology. + * @throws IgniteCheckedException Thrown in case of any error. + * @throws ClusterGroupEmptyException Thrown in case when this projection is empty. + * Note that in case of dynamic projection this method will take a snapshot of all the + * nodes at the time of this call, apply all filtering predicates, if any, and if the + * resulting collection of nodes is empty - the exception will be thrown. + * @return Non-cancellable future of this execution. + * @throws IgniteInterruptedException Subclass of `IgniteCheckedException` thrown if the wait was interrupted. + * @throws IgniteFutureCancelledException Subclass of `IgniteCheckedException` thrown if computation was cancelled. 
+ */ + def affinityRunAsync$(cacheName: String, @Nullable affKey: Any, @Nullable r: Run, + @Nullable p: NF): IgniteFuture[_] = { + val comp = value.ignite().compute(forPredicate(p)).enableAsync() + + /* `comp` was obtained via `enableAsync()`: presumably the call below only starts the job and its future is then fetched from `comp` - keep this statement order (TODO confirm against the compute API). */ + comp.affinityRun(cacheName, affKey, toRunnable(r)) + + comp.future() + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarTaskThreadContext.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarTaskThreadContext.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarTaskThreadContext.scala new file mode 100644 index 0000000..47d2078 --- /dev/null +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/pimps/ScalarTaskThreadContext.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.pimps + +import org.apache.ignite.cluster.ClusterGroup +import org.apache.ignite._ +import org.gridgain.grid._ +import org.gridgain.scalar._ +import org.jetbrains.annotations._ + +/** + * This trait provides a mixin with properly typed versions of `GridProjection#with...()` methods.
+ * + * Method on `GridProjection` always returns an instance of type `GridProjection` even when + * called on a sub-class. This trait's methods return the instance of the same type + * it was called on. + */ +trait ScalarTaskThreadContext[T <: ClusterGroup] extends ScalarConversions { this: PimpedType[T] => + /** + * Properly typed version of `GridCompute#withName(...)` method. + * + * @param taskName Name of the task. + * @return Result of `compute(value).withName(taskName)` cast to `T`. + * NOTE(review): the cast is unchecked at this point - confirm that the object returned + * by `withName` really is a `T`, otherwise the cast fails at runtime. + */ + def withName$(@Nullable taskName: String): T = + value.ignite().compute(value).withName(taskName).asInstanceOf[T] + + /** + * Properly typed version of `GridCompute#withNoFailover()` method. + * + * @return Result of `compute(value).withNoFailover()` cast to `T`. + * NOTE(review): same unchecked cast as in `withName$` - confirm the returned object is a `T`. + */ + def withNoFailover$(): T = + value.ignite().compute(value).withNoFailover().asInstanceOf[T] +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala new file mode 100644 index 0000000..83f04ee --- /dev/null +++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar + +import java.net.URL +import org.apache.ignite.cache.GridCache +import org.apache.ignite.cache.query.{GridCacheQuerySqlField, GridCacheQueryTextField} +import org.apache.ignite.internal.GridProductImpl +import org.apache.ignite.{IgniteState, IgniteDataLoader, Ignition, Ignite} +import org.apache.ignite.cluster.ClusterNode +import org.apache.ignite.configuration.IgniteConfiguration +import org.jetbrains.annotations.Nullable +import java.util.UUID +import annotation.meta.field + +/** + * {{{ + * ________ ______ ______ _______ + * __ ___/_____________ ____ /______ _________ __/__ \ __ __ \ + * _____ \ _ ___/_ __ `/__ / _ __ `/__ ___/ ____/ / _ / / / + * ____/ / / /__ / /_/ / _ / / /_/ / _ / _ __/___/ /_/ / + * /____/ \___/ \__,_/ /_/ \__,_/ /_/ /____/_(_)____/ + * + * }}} + * + * ==Overview== + * `scalar` is the main object that encapsulates Scalar DSL. It includes global functions + * on "scalar" keyword, helper converters as well as necessary implicit conversions. `scalar` also + * mimics many methods in `GridGain` class from Java side. + * + * The idea behind Scalar DSL - '''zero additional logic and only conversions''' implemented + * using Scala "Pimp" pattern. Note that most of the Scalar DSL development happened on Java + * side of GridGain 3.0 product line - Java APIs had to be adjusted quite significantly to + * support natural adaptation of functional APIs. That basically means that all functional + * logic must be available on Java side and Scalar only provides conversions from Scala + * language constructs to Java constructs. Note that currently GridGain supports Scala 2.8 + * and up only. + * + * This design approach ensures that Java side does not starve and usage paradigm + * is mostly the same between Java and Scala - yet with full power of Scala behind. 
+ * In other words, Scalar only adds Scala specifics, but not greatly altering semantics + * of how GridGain APIs work. Most of the time the code in Scalar can be written in + * Java in almost the same number of lines. + * + * ==Suffix '$' In Names== + * Symbol `$` is used in names when they conflict with the names in the base Java class + * that Scala pimp is shadowing or with Java package name that your Scala code is importing. + * Instead of giving two different names to the same function we've decided to simply mark + * Scala's side method with `$` suffix. + * + * ==Importing== + * Scalar needs to be imported in a proper way so that necessary objects and implicit + * conversions got available in the scope: + * <pre name="code" class="scala"> + * import org.gridgain.scalar._ + * import scalar._ + * </pre> + * This way you import object `scalar` as well as all methods declared or inherited in that + * object as well. + * + * ==Examples== + * Here are few short examples of how Scalar can be used to program routine distributed + * task. All examples below use default GridGain configuration and default grid. All these + * examples take an implicit advantage of auto-discovery and failover, load balancing and + * collision resolution, zero deployment and many other underlying technologies in the + * GridGain - while remaining absolutely distilled to the core domain logic. + * + * This code snippet prints out full topology: + * <pre name="code" class="scala"> + * scalar { + * grid$ foreach (n => println("Node: " + n.id8)) + * } + * </pre> + * The obligatory example - cloud enabled `Hello World!`. 
It splits the phrase + * into multiple words and prints each word on a separate grid node: + * <pre name="code" class="scala"> + * scalar { + * grid$ *< (SPREAD, (for (w <- "Hello World!".split(" ")) yield () => println(w))) + * } + * </pre> + * This example broadcasts message to all nodes: + * <pre name="code" class="scala"> + * scalar { + * grid$ *< (BROADCAST, () => println("Broadcasting!!!")) + * } + * </pre> + * This example "greets" remote nodes only (note usage of Java-side closure): + * <pre name="code" class="scala"> + * scalar { + * val me = grid$.localNode.id + * grid$.remoteProjection() *< (BROADCAST, F.println("Greetings from: " + me)) + * } + * </pre> + * + * Next example creates a function that calculates lengths of the string + * using MapReduce type of processing by splitting the input string into + * multiple substrings, calculating each substring length on the remote + * node and aggregating results for the final length of the original string: + * <pre name="code" class="scala"> + * def count(msg: String) = + * grid$ @< (SPREAD, for (w <- msg.split(" ")) yield () => w.length, (s: Seq[Int]) => s.sum) + * </pre> + * This example shows a simple example of how Scalar can be used to work with in-memory data grid: + * <pre name="code" class="scala"> + * scalar { + * val t = cache$[Symbol, Double]("partitioned") + * t += ('symbol -> 2.0) + * t -= ('symbol) + * } + * </pre> + */ +object scalar extends ScalarConversions { + /** Product copyright blurb (taken from `GridProductImpl.COPYRIGHT`); printed by `logo()`. */ + private val COPYRIGHT = GridProductImpl.COPYRIGHT + + /** Type alias for `GridCacheQuerySqlField` carrying the `@field` meta-annotation, so the annotation targets the underlying field. */ + type ScalarCacheQuerySqlField = GridCacheQuerySqlField @field + + /** Type alias for `GridCacheQueryTextField` carrying the `@field` meta-annotation, so the annotation targets the underlying field. */ + type ScalarCacheQueryTextField = GridCacheQueryTextField @field + + /** + * Prints Scalar ASCII-logo.
+ */ + def logo() { + val NL = System getProperty "line.separator" + + val s = + " ________ ______ " + NL + + " __ ___/_____________ ____ /______ _________ " + NL + + " _____ \\ _ ___/_ __ `/__ / _ __ `/__ ___/ " + NL + + " ____/ / / /__ / /_/ / _ / / /_/ / _ / " + NL + + " /____/ \\___/ \\__,_/ /_/ \\__,_/ /_/ " + NL + NL + + " GRIDGAIN SCALAR" + + " " + COPYRIGHT + NL + + println(s) + } + + /** + * Runs `body` against the given grid and always stops that grid afterwards. + * Note that grid instance will be stopped with cancel flag set to `true`. + * + * Both arguments are asserted to be non-null with a single boolean condition: the second + * argument of `assert` is the failure message, not another condition, so passing + * `body != null` there would leave `body` unchecked. + * + * @param g Grid instance. + * @param body Closure with grid instance as body's parameter. + * @return Whatever `body` returns. + */ + private def init[T](g: Ignite, body: Ignite => T): T = { + assert(g != null && body != null) + + try { + body(g) + } + finally { + Ignition.stop(g.name, true) + } + } + + /** + * Evaluates the by-name `body` and always stops the given grid afterwards. + * Note that grid instance will be stopped with cancel flag set to `true`. + * + * @param g Grid instance. + * @param body Passed by name body. + * @return Whatever `body` evaluates to. + */ + private def init0[T](g: Ignite, body: => T): T = { + assert(g != null) + + try { + body + } + finally { + Ignition.stop(g.name, true) + } + } + + /** + * Executes given closure within automatically managed default grid instance. + * If default grid is not started, it is started for the call and stopped afterwards (see `init`). + * If default grid is already started the passed in closure will simply + * execute against it and the grid is left running. + * + * @param body Closure to execute within automatically managed default grid instance. + */ + def apply(body: Ignite => Unit) { + if (!isStarted) init(Ignition.start, body) else body(grid$) + } + + /** + * Executes given closure within automatically managed default grid instance. + * If default grid is not started, it is started for the call and stopped afterwards (see `init`). + * If default grid is already started the passed in closure will simply + * execute against it and the grid is left running. + * + * @param body Closure to execute within automatically managed default grid instance. + * @return Result of the closure. + */ + def apply[T](body: Ignite => T): T = + if (!isStarted) init(Ignition.start, body) else body(grid$) + + /** + * Executes given closure within automatically managed default grid instance. + * If default grid is already started the passed in closure will simply + * execute.
+ * + * @param body Closure to execute within automatically managed default grid instance. + */ + def apply[T](body: => T): T = + if (isStarted) body else init0(Ignition.start(), body) + + /** + * Runs the by-name body inside the default grid. When the default grid is not + * running yet it is started first and stopped again once the body completes; + * when it is already running the body is simply evaluated. + * + * @param body Code to evaluate within the automatically managed default grid instance. + */ + def apply(body: => Unit): Unit = + if (isStarted) body else init0(Ignition.start(), body) + + /** + * Starts a grid from the given Spring configuration, evaluates the body and stops the grid. + * + * @param springCfgPath Spring XML configuration file path or URL. + * @param body Code to evaluate within the automatically managed grid instance. + */ + def apply(springCfgPath: String)(body: => Unit): Unit = { + val started = Ignition.start(springCfgPath) + + init0(started, body) + } + + /** + * Starts a grid from the given configuration object, evaluates the body and stops the grid. + * + * @param cfg Grid configuration instance. + * @param body Code to evaluate within the automatically managed grid instance. + */ + def apply(cfg: IgniteConfiguration)(body: => Unit): Unit = { + val started = Ignition.start(cfg) + + init0(started, body) + } + + /** + * Starts a grid from the Spring configuration at the given URL, evaluates the body and stops the grid. + * + * @param springCfgUrl Spring XML configuration file URL. + * @param body Code to evaluate within the automatically managed grid instance. + */ + def apply(springCfgUrl: URL)(body: => Unit): Unit = { + val started = Ignition.start(springCfgUrl) + + init0(started, body) + } + + /** + * Looks up the default cache (the one registered under the `null` name) of the default grid. + * + * Always spell out the type parameters when calling this function - otherwise + * Scala infers `GridCache[Nothing, Nothing]`, which cannot be used. + */ + @inline def cache$[K, V]: Option[GridCache[K, V]] = { + val dfltGrid = Ignition.ignite() + + Option(dfltGrid.cache[K, V](null)) + } + + /** + * Gets named cache from default grid. + * + * @param cacheName Name of the cache to get.
+ */ + @inline def cache$[K, V](@Nullable cacheName: String): Option[GridCache[K, V]] = { + val dfltGrid = Ignition.ignite() + + Option(dfltGrid.cache[K, V](cacheName)) + } + + /** + * Looks up a cache by name in the grid with the given name. + * + * @param gridName Name of the grid. + * @param cacheName Name of the cache to get. + */ + @inline def cache$[K, V](@Nullable gridName: String, @Nullable cacheName: String): Option[GridCache[K, V]] = + grid$(gridName).flatMap(g => Option(g.cache[K, V](cacheName))) + + /** + * Creates a data loader for the given cache on the default grid and sets its per-node buffer size. + * + * @param cacheName Cache name (`null` for default cache). + * @param bufSize Per node buffer size. + * @return New instance of data loader. + */ + @inline def dataLoader$[K, V]( + @Nullable cacheName: String, + bufSize: Int): IgniteDataLoader[K, V] = { + val ldr = grid$.dataLoader[K, V](cacheName) + + ldr.perNodeBufferSize(bufSize) + + ldr + } + + /** + * Returns the default grid instance. + */ + @inline def grid$: Ignite = Ignition.ignite() + + /** + * Returns the first 8 characters of the node ID, upper-cased. + */ + def nid8$(node: ClusterNode): String = { + val fullId = node.id().toString + + fullId.take(8).toUpperCase + } + + /** + * Looks up a grid by name; `IllegalStateException` from the lookup is mapped to `None`. + * + * @param name Grid name. + */ + @inline def grid$(@Nullable name: String): Option[Ignite] = + try Option(Ignition.ignite(name)) + catch { + case _: IllegalStateException => None + } + + /** + * Looks up a grid by local node ID; `IllegalStateException` from the lookup is mapped to `None`. + * + * @param locNodeId Local node ID for which to get grid instance option. + */ + @inline def grid$(locNodeId: UUID): Option[Ignite] = { + assert(locNodeId != null) + + try Option(Ignition.ignite(locNodeId)) + catch { + case _: IllegalStateException => None + } + } + + /** + * Checks whether the grid with the given name is in `STARTED` state. + * + * @param name Grid name. + */ + def isStarted(@Nullable name: String): Boolean = + IgniteState.STARTED == Ignition.state(name) + + /** + * Tests if specified grid is stopped. + * + * @param name Grid name.
+ */ + def isStopped(@Nullable name: String): Boolean = + IgniteState.STOPPED == Ignition.state(name) + + /** + * Checks whether the default grid is in `STARTED` state. + */ + def isStarted: Boolean = + IgniteState.STARTED == Ignition.state() + + /** + * Checks whether the default grid is in `STOPPED` state. + */ + def isStopped: Boolean = + IgniteState.STOPPED == Ignition.state() + + /** + * Stops the named grid with the specified cancel flag. + * Does nothing when that grid is not started. + * + * @param name Name of the grid to stop. + * @param cancel Whether or not to cancel all currently running jobs. + */ + def stop(@Nullable name: String, cancel: Boolean) = { + val running = isStarted(name) + + if (running) Ignition.stop(name, cancel) + } + + /** + * Stops the default grid with the given cancel flag. + * Does nothing when the default grid is not started. + * + * @param cancel Whether or not to cancel all currently running jobs. + */ + def stop(cancel: Boolean) = { + val running = isStarted + + if (running) Ignition.stop(cancel) + } + + /** + * Stops the default grid with cancel flag set to `true`. + * Does nothing when the default grid is not started. + */ + def stop() = { + val running = isStarted + + if (running) Ignition.stop(true) + } + + /** + * Passes the daemon flag on to the grid factory. Note that this method should be called + * before grid instance starts. + * + * @param f Daemon flag to set. + */ + def daemon(f: Boolean): Unit = + Ignition.setDaemon(f) + + /** + * Reads the daemon flag currently set in the grid factory. + */ + def isDaemon = + Ignition.isDaemon() + + /** + * Starts the default grid, or returns the running one if it is already started. + * + * @return Started grid. + */ + def start(): Ignite = + if (isStarted) grid$ else Ignition.start() + + /** + * Starts a grid from the given Spring configuration. + * + * @param springCfgPath Spring XML configuration file path or URL. + * @return Started grid. If Spring configuration contains multiple grid instances, + * then the 1st found instance is returned. + */ + def start(@Nullable springCfgPath: String): Ignite = + Ignition.start(springCfgPath) + + /** + * Starts grid with given parameter(s).
+ * + * @param cfg Grid configuration. This cannot be `null`. + * @return Started grid. + */ + def start(cfg: IgniteConfiguration): Ignite = + Ignition.start(cfg) + + /** + * Starts a grid from the Spring XML configuration found at the given URL. + * + * @param springCfgUrl Spring XML configuration file URL. + * @return Started grid. + */ + def start(springCfgUrl: URL): Ignite = + Ignition.start(springCfgUrl) +}