IGNITE-389 - IPC checked and API improvements.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b51f99e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b51f99e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b51f99e Branch: refs/heads/ignite-883-1 Commit: 6b51f99e72eb11af25403f8ec50087c03b1f1fb7 Parents: 1d8643c Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Jun 4 19:19:36 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Jun 4 19:19:36 2015 -0700 ---------------------------------------------------------------------- .../ignite/internal/util/IgniteUtils.java | 4 +- .../shmem/IpcSharedMemoryClientEndpoint.java | 2 +- .../ipc/shmem/IpcSharedMemoryNativeLoader.java | 150 +++++++++++++++++-- .../shmem/IpcSharedMemoryServerEndpoint.java | 2 +- .../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +- .../LoadWithCorruptedLibFileTestRunner.java | 2 +- .../IpcSharedMemoryBenchmarkReader.java | 2 +- .../IpcSharedMemoryBenchmarkWriter.java | 2 +- .../hadoop/HadoopAbstractSelfTest.java | 1 + .../org/apache/ignite/spark/IgniteContext.scala | 19 ++- .../org/apache/ignite/spark/IgniteRDD.scala | 8 +- 13 files changed, 171 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 0932212..9016b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9025,11 +9025,11 @@ public abstract class IgniteUtils { hasShmem = false; else { try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); hasShmem = true; } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException ignore) { hasShmem = false; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java index 27a234f..c935c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java @@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint { boolean clear = true; try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log); sock.connect(new InetSocketAddress("127.0.0.1", port), timeout); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java index dc00ca6..8c345f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.ipc.shmem; import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; @@ -25,6 +26,8 @@ import java.net.*; import java.nio.channels.*; import java.security.*; import java.util.*; +import java.util.jar.*; +import java.util.zip.*; import static org.apache.ignite.internal.IgniteVersionUtils.*; @@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader { /** Library name base. */ private static final String LIB_NAME_BASE = "igniteshmem"; + /** Library jar name base. */ + private static final String JAR_NAME_BASE = "shmem"; + /** Library name. */ static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR; @@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader { } /** + * @param log Logger, if available. If null, warnings will be printed out to console. * @throws IgniteCheckedException If failed. */ - public static void load() throws IgniteCheckedException { + public static void load(IgniteLogger log) throws IgniteCheckedException { if (loaded) return; @@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader { if (loaded) return; - doLoad(); + doLoad(log); loaded = true; } @@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader { /** * @throws IgniteCheckedException If failed. */ - private static void doLoad() throws IgniteCheckedException { + private static void doLoad(IgniteLogger log) throws IgniteCheckedException { assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class); Collection<Throwable> errs = new ArrayList<>(); @@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader { // Obtain lock on file to prevent concurrent extracts. try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws"); - FileLock ignored = randomAccessFile.getChannel().lock()) { + FileLock ignored = randomAccessFile.getChannel().lock()) { if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath())) return; @@ -134,6 +141,30 @@ public class IpcSharedMemoryNativeLoader { if (extractAndLoad(errs, tmpDir, resourcePath())) return; + try { + U.quietAndWarn(log, "Failed to load 'igniteshmem' library from classpath. Will try to load it from IGNITE_HOME."); + + String igniteHome = X.resolveIgniteHome(); + + File shmemJar = findShmemJar(errs, igniteHome); + + if (shmemJar != null) { + try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) { + if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath())) + return; + + if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath())) + return; + + if (extractAndLoad(errs, jar, tmpDir, resourcePath())) + return; + } + } + } + catch (IgniteCheckedException ignore) { + + } + // Failed to find the library. assert !errs.isEmpty(); @@ -145,6 +176,32 @@ public class IpcSharedMemoryNativeLoader { } /** + * Tries to find shmem jar in IGNITE_HOME/libs folder. + * + * @param errs Collection of errors to add readable exception to. + * @param igniteHome Resolver IGNITE_HOME variable. + * @return File, if found. + */ + private static File findShmemJar(Collection<Throwable> errs, String igniteHome) { + File libs = new File(igniteHome, "libs"); + + if (!libs.exists() || libs.isFile()) { + errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome)); + + return null; + } + + for (File lib : libs.listFiles()) { + if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE)) + return lib; + } + + errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome)); + + return null; + } + + /** * Gets temporary directory unique for each OS user. * The directory guaranteed to exist, though may not be empty. */ @@ -220,6 +277,24 @@ public class IpcSharedMemoryNativeLoader { /** * @param errs Errors collection. + * @param rsrcPath Path. + * @return {@code True} if library was found and loaded. + */ + private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar, File tmpDir, String rsrcPath) { + ZipEntry rsrc = jar.getEntry(rsrcPath); + + if (rsrc != null) + return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME))); + else { + errs.add(new IllegalStateException("Failed to find resource within specified jar file " + + "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']')); + + return false; + } + } + + /** + * @param errs Errors collection. * @param src Source. * @param target Target. * @return {@code True} if resource was found and loaded. @@ -230,7 +305,7 @@ public class IpcSharedMemoryNativeLoader { InputStream is = null; try { - if (!target.exists() || !haveEqualMD5(target, src)) { + if (!target.exists() || !haveEqualMD5(target, src.openStream())) { is = src.openStream(); if (is != null) { @@ -265,20 +340,69 @@ public class IpcSharedMemoryNativeLoader { } /** - * @param target Target. + * @param errs Errors collection. * @param src Source. + * @param target Target. + * @return {@code True} if resource was found and loaded. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile jar, File target) { + FileOutputStream os = null; + InputStream is = null; + + try { + if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) { + is = jar.getInputStream(src); + + if (is != null) { + os = new FileOutputStream(target); + + int read; + + byte[] buf = new byte[4096]; + + while ((read = is.read(buf)) != -1) + os.write(buf, 0, read); + } + } + + // chmod 775. + if (!U.isWindows()) + Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor(); + + System.load(target.getPath()); + + return true; + } + catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) { + errs.add(e); + } + finally { + U.closeQuiet(os); + U.closeQuiet(is); + } + + return false; + } + + /** + * @param target Target. + * @param srcIS Source input stream. * @return {@code True} if target md5-sum equal to source md5-sum. * @throws NoSuchAlgorithmException If md5 algorithm was not found. * @throws IOException If an I/O exception occurs. */ - private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException { - try (InputStream targetIS = new FileInputStream(target); - InputStream srcIS = src.openStream()) { - - String targetMD5 = U.calculateMD5(targetIS); - String srcMD5 = U.calculateMD5(srcIS); + private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException { + try { + try (InputStream targetIS = new FileInputStream(target)) { + String targetMD5 = U.calculateMD5(targetIS); + String srcMD5 = U.calculateMD5(srcIS); - return targetMD5.equals(srcMD5); + return targetMD5.equals(srcMD5); + } + } + finally { + srcIS.close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index 5185856..102c5b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log); pid = IpcSharedMemoryUtils.pid(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java index 2ddf6f3..c6f590e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java index 7dc0870..4afb64b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java @@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java index 4c5413c..176429e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java @@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java index 8ff827b..8fee239 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java @@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner { createCorruptedLibFile(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java index 28495af..89eeda1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java @@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP * @throws IgniteCheckedException If failed. */ public static void main(String[] args) throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java index 2ade145..e8a8402 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java @@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP * @throws IgniteCheckedException If failed. */ public static void main(String[] args) throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index 517a587..a3c9bde 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 5cdbad0..2cfebd6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -34,8 +34,23 @@ import org.apache.spark.sql.SQLContext */ class IgniteContext[K, V]( @scala.transient val sparkContext: SparkContext, - cfgF: () â IgniteConfiguration + cfgF: () â IgniteConfiguration, + client: Boolean = true ) extends Serializable { + @scala.transient private val driver = true + + if (!client) { + val workers = sparkContext.getExecutorStorageStatus.length - 1 + + if (workers <= 0) + throw new IllegalStateException("No Spark executors found to start Ignite nodes.") + + println("Will start Ignite nodes on " + workers + " workers") + + // Start ignite server node on each worker in server mode. + sparkContext.parallelize(1 to workers, workers).foreach(it â ignite()) + } + def this( sc: SparkContext, springUrl: String @@ -62,7 +77,7 @@ class IgniteContext[K, V]( catch { case e: Exception â try { - igniteCfg.setClientMode(true) + igniteCfg.setClientMode(client || driver) Ignition.start(igniteCfg) } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 0b8e845..0d1a3be 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -114,7 +114,7 @@ class IgniteRDD[K, V] ( ic.sqlContext.createDataFrame(rowRdd, schema) } - def saveValues(rdd: RDD[V]) = { + def saveValues(rdd: RDD[V], overwrite: Boolean = false) = { rdd.foreachPartition(it â { val ig = ic.ignite() @@ -127,6 +127,8 @@ class IgniteRDD[K, V] ( val streamer = ig.dataStreamer[Object, V](cacheName) try { + streamer.allowOverwrite(overwrite) + it.foreach(value â { val key = affinityKeyFunc(value, node.orNull) @@ -139,7 +141,7 @@ class IgniteRDD[K, V] ( }) } - def savePairs(rdd: RDD[(K, V)]) = { + def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = { rdd.foreachPartition(it â { val ig = ic.ignite() @@ -149,6 +151,8 @@ class IgniteRDD[K, V] ( val streamer = ig.dataStreamer[K, V](cacheName) try { + streamer.allowOverwrite(overwrite) + it.foreach(tup â { streamer.addData(tup._1, tup._2) })