http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarGridPimp.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarGridPimp.scala b/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarGridPimp.scala deleted file mode 100644 index e75d37c..0000000 --- a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarGridPimp.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.scalar.pimps - -import org.apache.ignite.scheduler.SchedulerFuture -import org.apache.ignite.{IgniteCluster, Ignite} -import org.apache.ignite._ -import org.gridgain.grid._ -import org.jetbrains.annotations.Nullable - -/** - * Companion object. - */ -object ScalarGridPimp { - /** - * Creates new Scalar grid pimp with given Java-side implementation. - * - * @param impl Java-side implementation. - */ - def apply(impl: Ignite) = { - if (impl == null) - throw new NullPointerException("impl") - - val pimp = new ScalarGridPimp - - pimp.impl = impl.cluster() - - pimp - } -} - -/** - * ==Overview== - * Defines Scalar "pimp" for `Grid` on Java side. - * - * Essentially this class extends Java `GridProjection` interface with Scala specific - * API adapters using primarily implicit conversions defined in `ScalarConversions` object. What - * it means is that you can use functions defined in this class on object - * of Java `GridProjection` type. Scala will automatically (implicitly) convert it into - * Scalar's pimp and replace the original call with a call on that pimp. - * - * Note that Scalar provide extensive library of implicit conversion between Java and - * Scala GridGain counterparts in `ScalarConversions` object - * - * ==Suffix '$' In Names== - * Symbol `$` is used in names when they conflict with the names in the base Java class - * that Scala pimp is shadowing or with Java package name that your Scala code is importing. - * Instead of giving two different names to the same function we've decided to simply mark - * Scala's side method with `$` suffix. - */ -class ScalarGridPimp extends ScalarProjectionPimp[IgniteCluster] with ScalarTaskThreadContext[IgniteCluster] { - /** - * Schedules closure for execution using local cron-based scheduling. - * - * @param s Closure to schedule to run as a background cron-based job. - * @param ptrn Scheduling pattern in UNIX cron format with optional prefix `{n1, n2}` - * where `n1` is delay of scheduling in seconds and `n2` is the number of execution. Both - * parameters are optional. - */ - def scheduleLocalCall[R](@Nullable s: Call[R], ptrn: String): SchedulerFuture[R] = { - assert(ptrn != null) - - value.ignite().scheduler().scheduleLocal(toCallable(s), ptrn) - } - - /** - * Schedules closure for execution using local cron-based scheduling. - * - * @param s Closure to schedule to run as a background cron-based job. - * @param ptrn Scheduling pattern in UNIX cron format with optional prefix `{n1, n2}` - * where `n1` is delay of scheduling in seconds and `n2` is the number of execution. Both - * parameters are optional. - */ - def scheduleLocalRun(@Nullable s: Run, ptrn: String): SchedulerFuture[_] = { - assert(ptrn != null) - - value.ignite().scheduler().scheduleLocal(toRunnable(s), ptrn) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarProjectionPimp.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarProjectionPimp.scala b/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarProjectionPimp.scala deleted file mode 100644 index 21db933..0000000 --- a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarProjectionPimp.scala +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.scalar.pimps - -import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterGroup, ClusterNode} -import org.apache.ignite.lang.{IgniteFutureCancelledException, IgniteFuture, IgnitePredicate} -import org.apache.ignite._ -import org.gridgain.grid._ -import org.jetbrains.annotations._ - -/** - * Companion object. - */ -object ScalarProjectionPimp { - /** - * Creates new Scalar projection pimp with given Java-side implementation. - * - * @param impl Java-side implementation. - */ - def apply(impl: ClusterGroup) = { - if (impl == null) - throw new NullPointerException("impl") - - val pimp = new ScalarProjectionPimp[ClusterGroup] - - pimp.impl = impl - - pimp - } -} - -/** - * ==Overview== - * Defines Scalar "pimp" for `GridProjection` on Java side. - * - * Essentially this class extends Java `GridProjection` interface with Scala specific - * API adapters using primarily implicit conversions defined in `ScalarConversions` object. What - * it means is that you can use functions defined in this class on object - * of Java `GridProjection` type. Scala will automatically (implicitly) convert it into - * Scalar's pimp and replace the original call with a call on that pimp. - * - * Note that Scalar provide extensive library of implicit conversion between Java and - * Scala GridGain counterparts in `ScalarConversions` object - * - * ==Suffix '$' In Names== - * Symbol `$` is used in names when they conflict with the names in the base Java class - * that Scala pimp is shadowing or with Java package name that your Scala code is importing. - * Instead of giving two different names to the same function we've decided to simply mark - * Scala's side method with `$` suffix. - */ -class ScalarProjectionPimp[A <: ClusterGroup] extends PimpedType[A] with Iterable[ClusterNode] - with ScalarTaskThreadContext[A] { - /** */ - lazy val value: A = impl - - /** */ - protected var impl: A = _ - - /** Type alias for '() => Unit'. */ - protected type Run = () => Unit - - /** Type alias for '() => R'. */ - protected type Call[R] = () => R - - /** Type alias for '(E1) => R'. */ - protected type Call1[E1, R] = (E1) => R - - /** Type alias for '(E1, E2) => R'. */ - protected type Call2[E1, E2, R] = (E1, E2) => R - - /** Type alias for '(E1, E2, E3) => R'. */ - protected type Call3[E1, E2, E3, R] = (E1, E2, E3) => R - - /** Type alias for '() => Boolean'. */ - protected type Pred = () => Boolean - - /** Type alias for '(E1) => Boolean'. */ - protected type Pred1[E1] = (E1) => Boolean - - /** Type alias for '(E1, E2) => Boolean'. */ - protected type Pred2[E1, E2] = (E1, E2) => Boolean - - /** Type alias for '(E1, E2, E3) => Boolean'. */ - protected type Pred3[E1, E2, E3] = (E1, E2, E3) => Boolean - - /** Type alias for node filter predicate. */ - protected type NF = IgnitePredicate[ClusterNode] - - /** - * Gets iterator for this projection's nodes. - */ - def iterator = nodes$(null).iterator - - /** - * Utility function to workaround issue that `GridProjection` does not permit `null` predicates. - * - * @param p Optional predicate. - * @return If `p` not `null` return projection for this predicate otherwise return pimped projection. - */ - private def forPredicate(@Nullable p: NF): ClusterGroup = - if (p != null) value.forPredicate(p) else value - - /** - * Gets sequence of all nodes in this projection for given predicate. - * - * @param p Optional node filter predicates. It `null` provided - all nodes will be returned. - * @see `org.gridgain.grid.GridProjection.nodes(...)` - */ - def nodes$(@Nullable p: NF): Seq[ClusterNode] = - toScalaSeq(forPredicate(p).nodes()) - - /** - * Gets sequence of all remote nodes in this projection for given predicate. - * - * @param p Optional node filter predicate. It `null` provided - all remote nodes will be returned. - * @see `org.gridgain.grid.GridProjection.remoteNodes(...)` - */ - def remoteNodes$(@Nullable p: NF = null): Seq[ClusterNode] = - toScalaSeq(forPredicate(p).forRemotes().nodes()) - - /** - * <b>Alias</b> for method `send$(...)`. - * - * @param obj Optional object to send. If `null` - this method is no-op. - * @param p Optional node filter predicates. If none provided or `null` - - * all nodes in the projection will be used. - * @see `org.gridgain.grid.GridProjection.send(...)` - */ - def !<(@Nullable obj: AnyRef, @Nullable p: NF) { - value.ignite().message(forPredicate(p)).send(null, obj) - } - - /** - * <b>Alias</b> for method `send$(...)`. - * - * @param seq Optional sequence of objects to send. If empty or `null` - this - * method is no-op. - * @param p Optional node filter predicate. If none provided or `null` - - * all nodes in the projection will be used. - * @see `org.gridgain.grid.GridProjection.send(...)` - */ - def !<(@Nullable seq: Seq[AnyRef], @Nullable p: NF) { - value.ignite().message(forPredicate(p)).send(null, seq) - } - - /** - * Sends given object to the nodes in this projection. - * - * @param obj Optional object to send. If `null` - this method is no-op. - * @param p Optional node filter predicate. If none provided or `null` - - * all nodes in the projection will be used. - * @see `org.gridgain.grid.GridProjection.send(...)` - */ - def send$(@Nullable obj: AnyRef, @Nullable p: NF) { - value.ignite().message(forPredicate(p)).send(null, obj) - } - - /** - * Sends given object to the nodes in this projection. - * - * @param seq Optional sequence of objects to send. If empty or `null` - this - * method is no-op. - * @param p Optional node filter predicate. If `null` provided - all nodes in the projection will be used. - * @see `org.gridgain.grid.GridProjection.send(...)` - */ - def send$(@Nullable seq: Seq[AnyRef], @Nullable p: NF) { - value.ignite().message(forPredicate(p)).send(null, seq) - } - - /** - * Synchronous closures call on this projection with return value. - * This call will block until all results are received and ready. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). - */ - def call$[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): Seq[R] = - toScalaSeq(callAsync$(s, p).get) - - /** - * Synchronous closures call on this projection with return value. - * This call will block until all results are received and ready. If this projection - * is empty than `dflt` closure will be executed and its result returned. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. - * @param dflt Closure to execute if projection is empty. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). - */ - def callSafe[R](@Nullable s: Seq[Call[R]], dflt: () => Seq[R], @Nullable p: NF): Seq[R] = { - assert(dflt != null) - - try - call$(s, p) - catch { - case _: ClusterGroupEmptyException => dflt() - } - } - - /** - * <b>Alias</b> for the same function `call$`. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and returns `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def #<[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): Seq[R] = - call$(s, p) - - /** - * Synchronous closure call on this projection with return value. - * This call will block until all results are received and ready. - * - * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def call$[R](@Nullable s: Call[R], @Nullable p: NF): Seq[R] = - call$(Seq(s), p) - - /** - * Synchronous closure call on this projection with return value. - * This call will block until all results are received and ready. If this projection - * is empty than `dflt` closure will be executed and its result returned. - * - * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. - * @param dflt Closure to execute if projection is empty. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def callSafe[R](@Nullable s: Call[R], dflt: () => Seq[R], @Nullable p: NF): Seq[R] = { - assert(dflt != null) - - try - call$(Seq(s), p) - catch { - case _: ClusterGroupEmptyException => dflt() - } - } - - /** - * <b>Alias</b> for the same function `call$`. - * - * @param s Optional closure to call. If `null` - this method is no-op and returns `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Sequence of result values from all nodes where given closures were executed - * or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def #<[R](@Nullable s: Call[R], @Nullable p: NF): Seq[R] = - call$(s, p) - - /** - * Synchronous closures call on this projection without return value. - * This call will block until all executions are complete. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op. - * @param p Optional node filter predicate. If `null` provided- all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def run$(@Nullable s: Seq[Run], @Nullable p: NF) { - runAsync$(s, p).get - } - - /** - * Synchronous broadcast closure call on this projection without return value. - * - * @param r Closure to run all nodes in projection. - * @param p Optional node filter predicate. If `null` provided- all nodes in projection will be used. - */ - def bcastRun(@Nullable r: Run, @Nullable p: NF) { - value.ignite().compute(forPredicate(p)).broadcast(toRunnable(r)) - } - - /** - * Synchronous closures call on this projection without return value. - * This call will block until all executions are complete. If this projection - * is empty than `dflt` closure will be executed. - * - * @param s Optional sequence of closures to call. If empty or `null` - this - * method is no-op. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @param dflt Closure to execute if projection is empty. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def runSafe(@Nullable s: Seq[Run], @Nullable dflt: Run, @Nullable p: NF) { - try { - run$(s, p) - } - catch { - case _: ClusterGroupEmptyException => if (dflt != null) dflt() else () - } - } - - /** - * <b>Alias</b> alias for the same function `run$`. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def *<(@Nullable s: Seq[Run], @Nullable p: NF) { - run$(s, p) - } - - /** - * Synchronous closure call on this projection without return value. - * This call will block until all executions are complete. - * - * @param s Optional closure to call. If empty or `null` - this method is no-op. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def run$(@Nullable s: Run, @Nullable p: NF) { - run$(Seq(s), p) - } - - /** - * Synchronous closure call on this projection without return value. - * This call will block until all executions are complete. If this projection - * is empty than `dflt` closure will be executed. - * - * @param s Optional closure to call. If empty or `null` - this method is no-op. - * @param dflt Closure to execute if projection is empty. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def runSafe(@Nullable s: Run, @Nullable dflt: Run, @Nullable p: NF) { - try { - run$(s, p) - } - catch { - case _: ClusterGroupEmptyException => if (dflt != null) dflt() else () - } - } - - /** - * <b>Alias</b> for the same function `run$`. - * - * @param s Optional closure to call. If empty or `null` - this method is no-op. - * @param p Optional node filter predicate. If none provided or `null` - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def *<(@Nullable s: Run, @Nullable p: NF) { - run$(s, p) - } - - /** - * Asynchronous closures call on this projection with return value. This call will - * return immediately with the future that can be used to wait asynchronously for the results. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and finished future over `null` is returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future of Java collection containing result values from all nodes where given - * closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def callAsync$[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): - IgniteFuture[java.util.Collection[R]] = { - val comp = value.ignite().compute(forPredicate(p)).enableAsync() - - comp.call[R](toJavaCollection(s, (f: Call[R]) => toCallable(f))) - - comp.future() - } - - /** - * <b>Alias</b> for the same function `callAsync$`. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and finished future over `null` is returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future of Java collection containing result values from all nodes where given - * closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def #?[R](@Nullable s: Seq[Call[R]], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { - callAsync$(s, p) - } - - /** - * Asynchronous closure call on this projection with return value. This call will - * return immediately with the future that can be used to wait asynchronously for the results. - * - * @param s Optional closure to call. If `null` - this method is no-op and finished - * future over `null` is returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future of Java collection containing result values from all nodes where given - * closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def callAsync$[R](@Nullable s: Call[R], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { - callAsync$(Seq(s), p) - } - - /** - * <b>Alias</b> for the same function `callAsync$`. - * - * @param s Optional closure to call. If `null` - this method is no-op and finished - * future over `null` is returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future of Java collection containing result values from all nodes where given - * closures were executed or `null` (see above). - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def #?[R](@Nullable s: Call[R], @Nullable p: NF): IgniteFuture[java.util.Collection[R]] = { - callAsync$(s, p) - } - - /** - * Asynchronous closures call on this projection without return value. This call will - * return immediately with the future that can be used to wait asynchronously for the results. - * - * @param s Optional sequence of absolute closures to call. If empty or `null` - this method - * is no-op and finished future over `null` will be returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def runAsync$(@Nullable s: Seq[Run], @Nullable p: NF): IgniteFuture[_] = { - val comp = value.ignite().compute(forPredicate(p)).enableAsync() - - comp.run(toJavaCollection(s, (f: Run) => toRunnable(f))) - - comp.future() - } - - /** - * <b>Alias</b> for the same function `runAsync$`. - * - * @param s Optional sequence of absolute closures to call. If empty or `null` - this method - * is no-op and finished future over `null` will be returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.call(...)` - */ - def *?(@Nullable s: Seq[Run], @Nullable p: NF): IgniteFuture[_] = { - runAsync$(s, p) - } - - /** - * Asynchronous closure call on this projection without return value. This call will - * return immediately with the future that can be used to wait asynchronously for the results. - * - * @param s Optional absolute closure to call. If `null` - this method - * is no-op and finished future over `null` will be returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def runAsync$(@Nullable s: Run, @Nullable p: NF): IgniteFuture[_] = { - runAsync$(Seq(s), p) - } - - /** - * <b>Alias</b> for the same function `runAsync$`. - * - * @param s Optional absolute closure to call. If `null` - this method - * is no-op and finished future over `null` will be returned. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @see `org.gridgain.grid.GridProjection.run(...)` - */ - def *?(@Nullable s: Run, @Nullable p: NF): IgniteFuture[_] = { - runAsync$(s, p) - } - - /** - * Asynchronous closures execution on this projection with reduction. This call will - * return immediately with the future that can be used to wait asynchronously for the results. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and will return finished future over `null`. - * @param r Optional reduction function. If `null` - this method - * is no-op and will return finished future over `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future over the reduced result or `null` (see above). - * @see `org.gridgain.grid.GridProjection.reduce(...)` - */ - def reduceAsync$[R1, R2](s: Seq[Call[R1]], r: Seq[R1] => R2, @Nullable p: NF): IgniteFuture[R2] = { - assert(s != null && r != null) - - val comp = value.ignite().compute(forPredicate(p)).enableAsync() - - comp.call(toJavaCollection(s, (f: Call[R1]) => toCallable(f)), r) - - comp.future() - } - - /** - * <b>Alias</b> for the same function `reduceAsync$`. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and will return finished future over `null`. - * @param r Optional reduction function. If `null` - this method - * is no-op and will return finished future over `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Future over the reduced result or `null` (see above). - * @see `org.gridgain.grid.GridProjection.reduce(...)` - */ - def @?[R1, R2](s: Seq[Call[R1]], r: Seq[R1] => R2, @Nullable p: NF): IgniteFuture[R2] = { - reduceAsync$(s, r, p) - } - - /** - * Synchronous closures execution on this projection with reduction. - * This call will block until all results are reduced. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and will return `null`. - * @param r Optional reduction function. If `null` - this method - * is no-op and will return `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Reduced result or `null` (see above). - * @see `org.gridgain.grid.GridProjection.reduce(...)` - */ - def reduce$[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, @Nullable p: NF): R2 = - reduceAsync$(s, r, p).get - - /** - * Synchronous closures execution on this projection with reduction. - * This call will block until all results are reduced. If this projection - * is empty than `dflt` closure will be executed and its result returned. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method - * is no-op and will return `null`. - * @param r Optional reduction function. If `null` - this method - * is no-op and will return `null`. - * @param dflt Closure to execute if projection is empty. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Reduced result or `null` (see above). - * @see `org.gridgain.grid.GridProjection.reduce(...)` - */ - def reduceSafe[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, - dflt: () => R2, @Nullable p: NF): R2 = { - assert(dflt != null) - - try - reduceAsync$(s, r, p).get - catch { - case _: ClusterGroupEmptyException => dflt() - } - } - - /** - * <b>Alias</b> for the same function `reduce$`. - * - * @param s Optional sequence of closures to call. If empty or `null` - this method is no-op and will return `null`. - * @param r Optional reduction function. If `null` - this method is no-op and will return `null`. - * @param p Optional node filter predicate. If `null` provided - all nodes in projection will be used. - * @return Reduced result or `null` (see above). - * @see `org.gridgain.grid.GridProjection.reduce(...)` - */ - def @<[R1, R2](@Nullable s: Seq[Call[R1]], @Nullable r: Seq[R1] => R2, @Nullable p: NF): R2 = - reduceAsync$(s, r, p).get - - /** - * Executes given closure on the nodes where data for provided affinity key is located. This - * is known as affinity co-location between compute grid (a closure) and in-memory data grid - * (value with affinity key). Note that implementation of multiple executions of the same closure will - * be wrapped as a single task that splits into multiple `job`s that will be mapped to nodes - * with provided affinity keys. - * - * This method will block until its execution is complete or an exception is thrown. - * All default SPI implementations configured for this grid instance will be - * used (i.e. failover, load balancing, collision resolution, etc.). - * Note that if you need greater control on any aspects of Java code execution on the grid - * you should implement `GridComputeTask` which will provide you with full control over the execution. - * - * Notice that `Runnable` and `Callable` implementations must support serialization as required - * by the configured marshaller. For example, JDK marshaller will require that implementations would - * be serializable. Other marshallers, e.g. JBoss marshaller, may not have this limitation. Please consult - * with specific marshaller implementation for the details. Note that all closures and predicates in - * `org.gridgain.grid.lang` package are serializable and can be freely used in the distributed - * context with all marshallers currently shipped with GridGain. - * - * @param cacheName Name of the cache to use for affinity co-location. - * @param affKey Affinity key. - * @param r Closure to affinity co-located on the node with given affinity key and execute. - * If `null` - this method is no-op. - * @param p Optional filtering predicate. If `null` provided - all nodes in this projection will be used for topology. - * @throws IgniteCheckedException Thrown in case of any error. - * @throws ClusterGroupEmptyException Thrown in case when this projection is empty. - * Note that in case of dynamic projection this method will take a snapshot of all the - * nodes at the time of this call, apply all filtering predicates, if any, and if the - * resulting collection of nodes is empty - the exception will be thrown. - * @throws IgniteInterruptedException Subclass of `IgniteCheckedException` thrown if the wait was interrupted. - * @throws IgniteFutureCancelledException Subclass of `IgniteCheckedException` thrown if computation was cancelled. - */ - def affinityRun$(cacheName: String, @Nullable affKey: Any, @Nullable r: Run, @Nullable p: NF) { - affinityRunAsync$(cacheName, affKey, r, p).get - } - - /** - * Executes given closure on the nodes where data for provided affinity key is located. This - * is known as affinity co-location between compute grid (a closure) and in-memory data grid - * (value with affinity key). Note that implementation of multiple executions of the same closure will - * be wrapped as a single task that splits into multiple `job`s that will be mapped to nodes - * with provided affinity keys. - * - * Unlike its sibling method `affinityRun(String, Collection, Runnable, GridPredicate[])` this method does - * not block and returns immediately with future. All default SPI implementations - * configured for this grid instance will be used (i.e. failover, load balancing, collision resolution, etc.). - * Note that if you need greater control on any aspects of Java code execution on the grid - * you should implement `GridComputeTask` which will provide you with full control over the execution. - * - * Note that class `GridAbsClosure` implements `Runnable` and class `GridOutClosure` - * implements `Callable` interface. Note also that class `GridFunc` and typedefs provide rich - * APIs and functionality for closures and predicates based processing in GridGain. While Java interfaces - * `Runnable` and `Callable` allow for lowest common denominator for APIs - it is advisable - * to use richer Functional Programming support provided by GridGain available in `org.gridgain.grid.lang` - * package. - * - * Notice that `Runnable` and `Callable` implementations must support serialization as required - * by the configured marshaller. For example, JDK marshaller will require that implementations would - * be serializable. Other marshallers, e.g. JBoss marshaller, may not have this limitation. Please consult - * with specific marshaller implementation for the details. Note that all closures and predicates in - * `org.gridgain.grid.lang` package are serializable and can be freely used in the distributed - * context with all marshallers currently shipped with GridGain. - * - * @param cacheName Name of the cache to use for affinity co-location. - * @param affKey Affinity key. - * @param r Closure to affinity co-located on the node with given affinity key and execute. - * If `null` - this method is no-op. - * @param p Optional filtering predicate. If `null` provided - all nodes in this projection will be used for topology. - * @throws IgniteCheckedException Thrown in case of any error. - * @throws ClusterGroupEmptyException Thrown in case when this projection is empty. - * Note that in case of dynamic projection this method will take a snapshot of all the - * nodes at the time of this call, apply all filtering predicates, if any, and if the - * resulting collection of nodes is empty - the exception will be thrown. - * @return Non-cancellable future of this execution. - * @throws IgniteInterruptedException Subclass of `IgniteCheckedException` thrown if the wait was interrupted. - * @throws IgniteFutureCancelledException Subclass of `IgniteCheckedException` thrown if computation was cancelled. - */ - def affinityRunAsync$(cacheName: String, @Nullable affKey: Any, @Nullable r: Run, - @Nullable p: NF): IgniteFuture[_] = { - val comp = value.ignite().compute(forPredicate(p)).enableAsync() - - comp.affinityRun(cacheName, affKey, toRunnable(r)) - - comp.future() - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarTaskThreadContext.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarTaskThreadContext.scala b/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarTaskThreadContext.scala deleted file mode 100644 index 47d2078..0000000 --- a/modules/scalar/src/main/scala/org/gridgain/scalar/pimps/ScalarTaskThreadContext.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.scalar.pimps - -import org.apache.ignite.cluster.ClusterGroup -import org.apache.ignite._ -import org.gridgain.grid._ -import org.gridgain.scalar._ -import org.jetbrains.annotations._ - -/** - * This trait provide mixin for properly typed version of `GridProjection#with...()` methods. - * - * Method on `GridProjection` always returns an instance of type `GridProjection` even when - * called on a sub-class. This trait's methods return the instance of the same type - * it was called on. - */ -trait ScalarTaskThreadContext[T <: ClusterGroup] extends ScalarConversions { this: PimpedType[T] => - /** - * Properly typed version of `GridCompute#withName(...)` method. - * - * @param taskName Name of the task. - */ - def withName$(@Nullable taskName: String): T = - value.ignite().compute(value).withName(taskName).asInstanceOf[T] - - /** - * Properly typed version of `GridCompute#withNoFailover()` method. - */ - def withNoFailover$(): T = - value.ignite().compute(value).withNoFailover().asInstanceOf[T] -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/main/scala/org/gridgain/scalar/scalar.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/main/scala/org/gridgain/scalar/scalar.scala b/modules/scalar/src/main/scala/org/gridgain/scalar/scalar.scala deleted file mode 100644 index 83f04ee..0000000 --- a/modules/scalar/src/main/scala/org/gridgain/scalar/scalar.scala +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.scalar - -import java.net.URL -import org.apache.ignite.cache.GridCache -import org.apache.ignite.cache.query.{GridCacheQuerySqlField, GridCacheQueryTextField} -import org.apache.ignite.internal.GridProductImpl -import org.apache.ignite.{IgniteState, IgniteDataLoader, Ignition, Ignite} -import org.apache.ignite.cluster.ClusterNode -import org.apache.ignite.configuration.IgniteConfiguration -import org.jetbrains.annotations.Nullable -import java.util.UUID -import annotation.meta.field - -/** - * {{{ - * ________ ______ ______ _______ - * __ ___/_____________ ____ /______ _________ __/__ \ __ __ \ - * _____ \ _ ___/_ __ `/__ / _ __ `/__ ___/ ____/ / _ / / / - * ____/ / / /__ / /_/ / _ / / /_/ / _ / _ __/___/ /_/ / - * /____/ \___/ \__,_/ /_/ \__,_/ /_/ /____/_(_)____/ - * - * }}} - * - * ==Overview== - * `scalar` is the main object that encapsulates Scalar DSL. It includes global functions - * on "scalar" keyword, helper converters as well as necessary implicit conversions. `scalar` also - * mimics many methods in `GridGain` class from Java side. - * - * The idea behind Scalar DSL - '''zero additional logic and only conversions''' implemented - * using Scala "Pimp" pattern. Note that most of the Scalar DSL development happened on Java - * side of GridGain 3.0 product line - Java APIs had to be adjusted quite significantly to - * support natural adaptation of functional APIs. That basically means that all functional - * logic must be available on Java side and Scalar only provides conversions from Scala - * language constructs to Java constructs. Note that currently GridGain supports Scala 2.8 - * and up only. - * - * This design approach ensures that Java side does not starve and usage paradigm - * is mostly the same between Java and Scala - yet with full power of Scala behind. - * In other words, Scalar only adds Scala specifics, but not greatly altering semantics - * of how GridGain APIs work. Most of the time the code in Scalar can be written in - * Java in almost the same number of lines. - * - * ==Suffix '$' In Names== - * Symbol `$` is used in names when they conflict with the names in the base Java class - * that Scala pimp is shadowing or with Java package name that your Scala code is importing. - * Instead of giving two different names to the same function we've decided to simply mark - * Scala's side method with `$` suffix. - * - * ==Importing== - * Scalar needs to be imported in a proper way so that necessary objects and implicit - * conversions got available in the scope: - * <pre name="code" class="scala"> - * import org.gridgain.scalar._ - * import scalar._ - * </pre> - * This way you import object `scalar` as well as all methods declared or inherited in that - * object as well. - * - * ==Examples== - * Here are few short examples of how Scalar can be used to program routine distributed - * task. All examples below use default GridGain configuration and default grid. All these - * examples take an implicit advantage of auto-discovery and failover, load balancing and - * collision resolution, zero deployment and many other underlying technologies in the - * GridGain - while remaining absolutely distilled to the core domain logic. - * - * This code snippet prints out full topology: - * <pre name="code" class="scala"> - * scalar { - * grid$ foreach (n => println("Node: " + n.id8)) - * } - * </pre> - * The obligatory example - cloud enabled `Hello World!`. It splits the phrase - * into multiple words and prints each word on a separate grid node: - * <pre name="code" class="scala"> - * scalar { - * grid$ *< (SPREAD, (for (w <- "Hello World!".split(" ")) yield () => println(w))) - * } - * </pre> - * This example broadcasts message to all nodes: - * <pre name="code" class="scala"> - * scalar { - * grid$ *< (BROADCAST, () => println("Broadcasting!!!")) - * } - * </pre> - * This example "greets" remote nodes only (note usage of Java-side closure): - * <pre name="code" class="scala"> - * scalar { - * val me = grid$.localNode.id - * grid$.remoteProjection() *< (BROADCAST, F.println("Greetings from: " + me)) - * } - * </pre> - * - * Next example creates a function that calculates lengths of the string - * using MapReduce type of processing by splitting the input string into - * multiple substrings, calculating each substring length on the remote - * node and aggregating results for the final length of the original string: - * <pre name="code" class="scala"> - * def count(msg: String) = - * grid$ @< (SPREAD, for (w <- msg.split(" ")) yield () => w.length, (s: Seq[Int]) => s.sum) - * </pre> - * This example shows a simple example of how Scalar can be used to work with in-memory data grid: - * <pre name="code" class="scala"> - * scalar { - * val t = cache$[Symbol, Double]("partitioned") - * t += ('symbol -> 2.0) - * t -= ('symbol) - * } - * </pre> - */ -object scalar extends ScalarConversions { - /** Visor copyright blurb. */ - private val COPYRIGHT = GridProductImpl.COPYRIGHT - - /** Type alias for `GridCacheQuerySqlField`. */ - type ScalarCacheQuerySqlField = GridCacheQuerySqlField @field - - /** Type alias for `GridCacheQueryTextField`. */ - type ScalarCacheQueryTextField = GridCacheQueryTextField @field - - /** - * Prints Scalar ASCII-logo. - */ - def logo() { - val NL = System getProperty "line.separator" - - val s = - " ________ ______ " + NL + - " __ ___/_____________ ____ /______ _________ " + NL + - " _____ \\ _ ___/_ __ `/__ / _ __ `/__ ___/ " + NL + - " ____/ / / /__ / /_/ / _ / / /_/ / _ / " + NL + - " /____/ \\___/ \\__,_/ /_/ \\__,_/ /_/ " + NL + NL + - " GRIDGAIN SCALAR" + - " " + COPYRIGHT + NL - - println(s) - } - - /** - * Note that grid instance will be stopped with cancel flat set to `true`. - * - * @param g Grid instance. - * @param body Closure with grid instance as body's parameter. - */ - private def init[T](g: Ignite, body: Ignite => T): T = { - assert(g != null, body != null) - - try { - body(g) - } - finally { - Ignition.stop(g.name, true) - } - } - - /** - * Note that grid instance will be stopped with cancel flat set to `true`. - * - * @param g Grid instance. - * @param body Passed by name body. - */ - private def init0[T](g: Ignite, body: => T): T = { - assert(g != null) - - try { - body - } - finally { - Ignition.stop(g.name, true) - } - } - - /** - * Executes given closure within automatically managed default grid instance. - * If default grid is already started the passed in closure will simply - * execute. - * - * @param body Closure to execute within automatically managed default grid instance. - */ - def apply(body: Ignite => Unit) { - if (!isStarted) init(Ignition.start, body) else body(grid$) - } - - /** - * Executes given closure within automatically managed default grid instance. - * If default grid is already started the passed in closure will simply - * execute. - * - * @param body Closure to execute within automatically managed default grid instance. - */ - def apply[T](body: Ignite => T): T = - if (!isStarted) init(Ignition.start, body) else body(grid$) - - /** - * Executes given closure within automatically managed default grid instance. - * If default grid is already started the passed in closure will simply - * execute. - * - * @param body Closure to execute within automatically managed default grid instance. - */ - def apply[T](body: => T): T = - if (!isStarted) init0(Ignition.start, body) else body - - /** - * Executes given closure within automatically managed default grid instance. - * If default grid is already started the passed in closure will simply - * execute. - * - * @param body Closure to execute within automatically managed grid instance. - */ - def apply(body: => Unit) { - if (!isStarted) init0(Ignition.start, body) else body - } - - /** - * Executes given closure within automatically managed grid instance. - * - * @param springCfgPath Spring XML configuration file path or URL. - * @param body Closure to execute within automatically managed grid instance. - */ - def apply(springCfgPath: String)(body: => Unit) { - init0(Ignition.start(springCfgPath), body) - } - - /** - * Executes given closure within automatically managed grid instance. - * - * @param cfg Grid configuration instance. - * @param body Closure to execute within automatically managed grid instance. - */ - def apply(cfg: IgniteConfiguration)(body: => Unit) { - init0(Ignition.start(cfg), body) - } - - /** - * Executes given closure within automatically managed grid instance. - * - * @param springCfgUrl Spring XML configuration file URL. - * @param body Closure to execute within automatically managed grid instance. - */ - def apply(springCfgUrl: URL)(body: => Unit) { - init0(Ignition.start(springCfgUrl), body) - } - - /** - * Gets default cache. - * - * Note that you always need to provide types when calling - * this function - otherwise Scala will create `GridCache[Nothing, Nothing]` - * typed instance that cannot be used. - */ - @inline def cache$[K, V]: Option[GridCache[K, V]] = - Option(Ignition.ignite.cache[K, V](null)) - - /** - * Gets named cache from default grid. - * - * @param cacheName Name of the cache to get. - */ - @inline def cache$[K, V](@Nullable cacheName: String): Option[GridCache[K, V]] = - Option(Ignition.ignite.cache(cacheName)) - - /** - * Gets named cache from specified grid. - * - * @param gridName Name of the grid. - * @param cacheName Name of the cache to get. - */ - @inline def cache$[K, V](@Nullable gridName: String, @Nullable cacheName: String): Option[GridCache[K, V]] = - grid$(gridName) match { - case Some(g) => Option(g.cache(cacheName)) - case None => None - } - - /** - * Gets a new instance of data loader associated with given cache name. - * - * @param cacheName Cache name (`null` for default cache). - * @param bufSize Per node buffer size. - * @return New instance of data loader. - */ - @inline def dataLoader$[K, V]( - @Nullable cacheName: String, - bufSize: Int): IgniteDataLoader[K, V] = { - val dl = grid$.dataLoader[K, V](cacheName) - - dl.perNodeBufferSize(bufSize) - - dl - } - - /** - * Gets default grid instance. - */ - @inline def grid$: Ignite = Ignition.ignite - - /** - * Gets node ID as ID8 string. - */ - def nid8$(node: ClusterNode) = node.id().toString.take(8).toUpperCase - - /** - * Gets named grid. - * - * @param name Grid name. - */ - @inline def grid$(@Nullable name: String): Option[Ignite] = - try { - Option(Ignition.ignite(name)) - } - catch { - case _: IllegalStateException => None - } - - /** - * Gets grid for given node ID. - * - * @param locNodeId Local node ID for which to get grid instance option. - */ - @inline def grid$(locNodeId: UUID): Option[Ignite] = { - assert(locNodeId != null) - - try { - Option(Ignition.ignite(locNodeId)) - } - catch { - case _: IllegalStateException => None - } - } - - /** - * Tests if specified grid is started. - * - * @param name Gird name. - */ - def isStarted(@Nullable name: String) = - Ignition.state(name) == IgniteState.STARTED - - /** - * Tests if specified grid is stopped. - * - * @param name Gird name. - */ - def isStopped(@Nullable name: String) = - Ignition.state(name) == IgniteState.STOPPED - - /** - * Tests if default grid is started. - */ - def isStarted = - Ignition.state == IgniteState.STARTED - - /** - * Tests if default grid is stopped. - */ - def isStopped = - Ignition.state == IgniteState.STOPPED - - /** - * Stops given grid and specified cancel flag. - * If specified grid is already stopped - it's no-op. - * - * @param name Grid name to cancel. - * @param cancel Whether or not to cancel all currently running jobs. - */ - def stop(@Nullable name: String, cancel: Boolean) = - if (isStarted(name)) - Ignition.stop(name, cancel) - - /** - * Stops default grid with given cancel flag. - * If default grid is already stopped - it's no-op. - * - * @param cancel Whether or not to cancel all currently running jobs. - */ - def stop(cancel: Boolean) = - if (isStarted) Ignition.stop(cancel) - - /** - * Stops default grid with cancel flag set to `true`. - * If default grid is already stopped - it's no-op. - */ - def stop() = - if (isStarted) Ignition.stop(true) - - /** - * Sets daemon flag to grid factory. Note that this method should be called - * before grid instance starts. - * - * @param f Daemon flag to set. - */ - def daemon(f: Boolean) { - Ignition.setDaemon(f) - } - - /** - * Gets daemon flag set in the grid factory. - */ - def isDaemon = - Ignition.isDaemon - - /** - * Starts default grid. It's no-op if default grid is already started. - * - * @return Started grid. - */ - def start(): Ignite = { - if (!isStarted) Ignition.start else grid$ - } - - /** - * Starts grid with given parameter(s). - * - * @param springCfgPath Spring XML configuration file path or URL. - * @return Started grid. If Spring configuration contains multiple grid instances, - * then the 1st found instance is returned. - */ - def start(@Nullable springCfgPath: String): Ignite = { - Ignition.start(springCfgPath) - } - - /** - * Starts grid with given parameter(s). - * - * @param cfg Grid configuration. This cannot be `null`. - * @return Started grid. - */ - def start(cfg: IgniteConfiguration): Ignite = { - Ignition.start(cfg) - } - - /** - * Starts grid with given parameter(s). - * - * @param springCfgUrl Spring XML configuration file URL. - * @return Started grid. - */ - def start(springCfgUrl: URL): Ignite = { - Ignition.start(springCfgUrl) - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarAffinityRoutingSpec.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarAffinityRoutingSpec.scala b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarAffinityRoutingSpec.scala new file mode 100644 index 0000000..989e72d --- /dev/null +++ b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarAffinityRoutingSpec.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.tests + +import org.scalatest.matchers._ +import org.scalatest._ +import junit.JUnitRunner +import org.gridgain.scalar.scalar +import scalar._ +import collection.JavaConversions._ +import java.util.concurrent.atomic.AtomicInteger +import org.junit.runner.RunWith + +/** + * Tests for `affinityRun..` and `affinityCall..` methods. + */ +@RunWith(classOf[JUnitRunner]) +class ScalarAffinityRoutingSpec extends FlatSpec with ShouldMatchers with BeforeAndAfterAll { + /** Cache name. */ + private val CACHE_NAME = "partitioned_tx" + + "affinityRun$ method" should "run correctly" in scalar("examples/config/example-cache.xml") { + val c = cache$[Int, Int](CACHE_NAME).get + + c += (0 -> 0) + c += (1 -> 1) + c += (2 -> 2) + + val cnt = c.dataStructures().atomicLong("affinityRun", 0, true) + + grid$.affinityRun$(CACHE_NAME, 0, () => { cnt.incrementAndGet() }, null) + grid$.affinityRun$(CACHE_NAME, 1, () => { cnt.incrementAndGet() }, null) + grid$.affinityRun$(CACHE_NAME, 2, () => { cnt.incrementAndGet() }, null) + + assert(cnt.get === 3) + } + + "affinityRunAsync$ method" should "run correctly" in scalar("examples/config/example-cache.xml") { + val c = cache$[Int, Int](CACHE_NAME).get + + c += (0 -> 0) + c += (1 -> 1) + c += (2 -> 2) + + val cnt = c.dataStructures().atomicLong("affinityRunAsync", 0, true) + + grid$.affinityRunAsync$(CACHE_NAME, 0, () => { cnt.incrementAndGet() }, null).get + grid$.affinityRunAsync$(CACHE_NAME, 1, () => { cnt.incrementAndGet() }, null).get + grid$.affinityRunAsync$(CACHE_NAME, 2, () => { cnt.incrementAndGet() }, null).get + + assert(cnt.get === 3) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheProjectionSpec.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheProjectionSpec.scala b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheProjectionSpec.scala new file mode 100644 index 0000000..0714f16 --- /dev/null +++ b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheProjectionSpec.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.tests + +import org.gridgain.scalar._ +import scalar._ +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +/** + * Test for using grid.cache(..).projection(...) from scala code. + */ +@RunWith(classOf[JUnitRunner]) +class ScalarCacheProjectionSpec extends FlatSpec { + behavior of "Cache projection" + + it should "work properly via grid.cache(...).viewByType(...)" in scalar("examples/config/example-cache.xml") { + val cache = grid$.cache("local").viewByType(classOf[String], classOf[Int]) + + assert(cache.putx("1", 1)) + assert(cache.get("1") == 1) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheQueriesSpec.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheQueriesSpec.scala b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheQueriesSpec.scala new file mode 100644 index 0000000..08d3b7d --- /dev/null +++ b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheQueriesSpec.scala @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.tests + +import org.apache.ignite.cache.GridCache +import org.apache.ignite.cluster.ClusterNode +import org.gridgain.scalar._ +import scalar._ +import org.scalatest.matchers._ +import org.scalatest._ +import junit.JUnitRunner +import org.gridgain.grid.cache._ +import org.apache.ignite._ +import org.gridgain.grid._ +import org.junit.runner.RunWith + +/** + * Tests for Scalar cache queries API. + */ +@RunWith(classOf[JUnitRunner]) +class ScalarCacheQueriesSpec extends FlatSpec with ShouldMatchers with BeforeAndAfterAll { + /** Entries count. */ + private val ENTRY_CNT = 10 + + /** Words. */ + private val WORDS = List("", "one", "two", "three", "four", "five", + "six", "seven", "eight", "nine", "ten") + + /** Node. */ + private var n: ClusterNode = null + + /** Cache. */ + private var c: GridCache[Int, ObjectValue] = null + + /** + * Start node and put data to cache. + */ + override def beforeAll() { + n = start("modules/scalar/src/test/resources/spring-cache.xml").cluster().localNode + + c = cache$[Int, ObjectValue].get + + (1 to ENTRY_CNT).foreach(i => c.putx(i, ObjectValue(i, "str " + WORDS(i)))) + + assert(c.size == ENTRY_CNT) + + c.foreach(e => println(e.getKey + " -> " + e.getValue)) + } + + /** + * Stop node. + */ + override def afterAll() { + stop() + } + + behavior of "Scalar cache queries API" + + it should "correctly execute SCAN queries" in { + var res = c.scan(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 == t._2.intVal)) + + res = c.scan((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 == t._2.intVal)) + + res = c.scan(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 == t._2.intVal)) + + res = c.scan((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 == t._2.intVal)) + } + + it should "correctly execute SQL queries" in { + var res = c.sql(classOf[ObjectValue], "intVal > 5") + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql(classOf[ObjectValue], "intVal > ?", 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql("intVal > 5") + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql("intVal > ?", 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql(classOf[ObjectValue], "intVal > 5") + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql(classOf[ObjectValue], "intVal > ?", 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql("intVal > 5") + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + + res = c.sql("intVal > ?", 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 == t._2.intVal)) + } + + it should "correctly execute TEXT queries" in { + var res = c.text(classOf[ObjectValue], "str") + + assert(res.size == ENTRY_CNT) + + res = c.text(classOf[ObjectValue], "five") + + assert(res.size == 1) + assert(res.head._1 == 5) + + res = c.text("str") + + assert(res.size == ENTRY_CNT) + + res = c.text("five") + + assert(res.size == 1) + assert(res.head._1 == 5) + + res = c.text(classOf[ObjectValue], "str") + + assert(res.size == ENTRY_CNT) + + res = c.text(classOf[ObjectValue], "five") + + assert(res.size == 1) + assert(res.head._1 == 5) + + res = c.text("str") + + assert(res.size == ENTRY_CNT) + + res = c.text("five") + + assert(res.size == 1) + assert(res.head._1 == 5) + } + + it should "correctly execute SCAN transform queries" in { + var res = c.scanTransform(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 + 1 == t._2)) + + res = c.scanTransform((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 + 1 == t._2)) + + res = c.scanTransform(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 + 1 == t._2)) + + res = c.scanTransform((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 2) + + res.foreach(t => assert(t._1 > 5 && t._1 < 8 && t._1 + 1 == t._2)) + } + + it should "correctly execute SQL transform queries" in { + var res = c.sqlTransform(classOf[ObjectValue], "intVal > 5", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform(classOf[ObjectValue], "intVal > ?", (v: ObjectValue) => v.intVal + 1, 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform("intVal > 5", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform("intVal > ?", (v: ObjectValue) => v.intVal + 1, 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform(classOf[ObjectValue], "intVal > 5", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform(classOf[ObjectValue], "intVal > ?", (v: ObjectValue) => v.intVal + 1, 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform("intVal > 5", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + + res = c.sqlTransform("intVal > ?", (v: ObjectValue) => v.intVal + 1, 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t._1 > 5 && t._1 + 1 == t._2)) + } + + it should "correctly execute TEXT transform queries" in { + var res = c.textTransform(classOf[ObjectValue], "str", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT) + + res.foreach(t => assert(t._1 + 1 == t._2)) + + res = c.textTransform(classOf[ObjectValue], "five", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 1) + assert(res.head._1 == 5 && res.head._2 == 6) + + res = c.textTransform("str", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT) + + res.foreach(t => assert(t._1 + 1 == t._2)) + + res = c.textTransform("five", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 1) + assert(res.head._1 == 5 && res.head._2 == 6) + + res = c.textTransform(classOf[ObjectValue], "str", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT) + + res.foreach(t => assert(t._1 + 1 == t._2)) + + res = c.textTransform(classOf[ObjectValue], "five", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 1) + assert(res.head._1 == 5 && res.head._2 == 6) + + res = c.textTransform("str", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == ENTRY_CNT) + + res.foreach(t => assert(t._1 + 1 == t._2)) + + res = c.textTransform("five", (v: ObjectValue) => v.intVal + 1) + + assert(res.size == 1) + assert(res.head._1 == 5 && res.head._2 == 6) + } + + it should "correctly execute SCAN reduce queries with two reducers" in { + var res = c.scanReduce(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 13) + + res = c.scanReduce((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 13) + + res = c.scanReduce(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 13) + + res = c.scanReduce((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 13) + } + + it should "correctly execute SQL reduce queries with two reducers" in { + var res = c.sqlReduce(classOf[ObjectValue], "intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 40) + + res = c.sqlReduce(classOf[ObjectValue], "intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum, 3) + + assert(res == 49) + + res = c.sqlReduce("intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 40) + + res = c.sqlReduce("intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum, 3) + + assert(res == 49) + + res = c.sqlReduce(classOf[ObjectValue], "intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 40) + + res = c.sqlReduce(classOf[ObjectValue], "intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum, 3) + + assert(res == 49) + + res = c.sqlReduce("intVal > 5", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum) + + assert(res == 40) + + res = c.sqlReduce("intVal > ?", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum, 3) + + assert(res == 49) + } + + it should "correctly execute TEXT reduce queries with two reducers" in { + var res = c.textReduce(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce(classOf[ObjectValue], "three five seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 15) + + res = c.textReduce("str", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce("three five seven", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum) + + assert(res == 15) + + res = c.textReduce(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce(classOf[ObjectValue], "three five seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 15) + + res = c.textReduce("str", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce("three five seven", (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, + (i: Iterable[Int]) => i.sum) + + assert(res == 15) + + res = c.textReduce(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce(classOf[ObjectValue], "seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 7) + + res = c.textReduce("str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce("seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 7) + + res = c.textReduce(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce(classOf[ObjectValue], "seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 7) + + res = c.textReduce("str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 55) + + res = c.textReduce("seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, (i: Iterable[Int]) => i.sum) + + assert(res == 7) + } + + it should "correctly execute SCAN reduce queries with one reducer" in { + var res = c.scanReduceRemote(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 13) + + res = c.scanReduceRemote((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 13) + + res = c.scanReduceRemote(classOf[ObjectValue], (k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 13) + + res = c.scanReduceRemote((k: Int, v: ObjectValue) => k > 5 && v.intVal < 8, + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 13) + } + + it should "correctly execute SQL reduce queries with one reducer" in { + var res = c.sqlReduceRemote(classOf[ObjectValue], "intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 40) + + res = c.sqlReduceRemote(classOf[ObjectValue], "intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, 3) + + assert(res.sum == 49) + + res = c.sqlReduceRemote("intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 40) + + res = c.sqlReduceRemote("intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, 3) + + assert(res.sum == 49) + + res = c.sqlReduceRemote(classOf[ObjectValue], "intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 40) + + res = c.sqlReduceRemote(classOf[ObjectValue], "intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, 3) + + assert(res.sum == 49) + + res = c.sqlReduceRemote("intVal > 5", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 40) + + res = c.sqlReduceRemote("intVal > ?", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum, 3) + + assert(res.sum == 49) + } + + it should "correctly execute TEXT reduce queries with one reducer" in { + var res = c.textReduceRemote(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 55) + + res = c.textReduceRemote(classOf[ObjectValue], "seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 7) + + res = c.textReduceRemote("str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 55) + + res = c.textReduceRemote("seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 7) + + res = c.textReduceRemote(classOf[ObjectValue], "str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 55) + + res = c.textReduceRemote(classOf[ObjectValue], "seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 7) + + res = c.textReduceRemote("str", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 55) + + res = c.textReduceRemote("seven", + (i: Iterable[(Int, ObjectValue)]) => i.map(_._2.intVal).sum) + + assert(res.sum == 7) + } + + it should "correctly execute fields queries" in { + var res = c.sqlFields(null, "select intVal from ObjectValue where intVal > 5") + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t.size == 1 && t.head.asInstanceOf[Int] > 5)) + + res = c.sqlFields(null, "select intVal from ObjectValue where intVal > ?", 5) + + assert(res.size == ENTRY_CNT - 5) + + res.foreach(t => assert(t.size == 1 && t.head.asInstanceOf[Int] > 5)) + } + + it should "correctly execute queries with multiple arguments" in { + val res = c.sql("from ObjectValue where intVal in (?, ?, ?)", 1, 2, 3) + + assert(res.size == 3) + } +} + +/** + * Object for queries. + */ +private case class ObjectValue( + /** Integer value. */ + @ScalarCacheQuerySqlField + intVal: Int, + + /** String value. */ + @ScalarCacheQueryTextField + strVal: String +) { + override def toString: String = { + "ObjectValue [" + intVal + ", " + strVal + "]" + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a6f9c9e6/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheSpec.scala ---------------------------------------------------------------------- diff --git a/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheSpec.scala b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheSpec.scala new file mode 100644 index 0000000..03432e6 --- /dev/null +++ b/modules/scalar/src/test/scala/org/apache/ignite/scalar/tests/ScalarCacheSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.scalar.tests + +import org.apache.ignite.events.{IgniteEventType, IgniteEvent} +import org.apache.ignite.lang.IgnitePredicate +import org.gridgain.scalar._ +import scalar._ +import org.scalatest.matchers._ +import org.scalatest._ +import junit.JUnitRunner +import IgniteEventType._ +import collection.JavaConversions._ +import org.junit.runner.RunWith + +/** + * Scalar cache test. + */ +@RunWith(classOf[JUnitRunner]) +class ScalarCacheSpec extends FlatSpec with ShouldMatchers { + behavior of "Scalar cache" + + it should "work properly via Java APIs" in { + scalar("examples/config/example-cache.xml") { + registerListener() + + val c = cache$("partitioned").get.viewByType(classOf[Int], classOf[Int]) + + c.putx(1, 1) + c.putx(2, 2) + + c.values foreach println + + println("Size is: " + c.size) + } + } + + /** + * This method will register listener for cache events on all nodes, + * so we can actually see what happens underneath locally and remotely. + */ + def registerListener() { + val g = grid$ + + g *< (() => { + val lsnr = new IgnitePredicate[IgniteEvent]() { + override def apply(e: IgniteEvent): Boolean = { + println(e.shortDisplay) + + true + } + } + + if (g.cluster().nodeLocalMap[String, AnyRef].putIfAbsent("lsnr", lsnr) == null) { + g.events.localListen(lsnr, + EVT_CACHE_OBJECT_PUT, + EVT_CACHE_OBJECT_READ, + EVT_CACHE_OBJECT_REMOVED) + + println("Listener is registered.") + } + }, null) + } +}