http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index d9a3c59..f215efb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -43,6 +44,7 @@ import org.jsr166.*; import java.io.*; import java.lang.reflect.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA /** Secondary file system configuration path. */ private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** Secondary endpoint configuration. */ protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; @@ -85,7 +90,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA private static final int THREAD_CNT = 8; /** IP finder. 
*/ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + private final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** Barrier for multithreaded tests. */ private static CyclicBarrier barrier; @@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500"; } + /** + * Gets the user the Fs client operates on behalf of. + * @return The user the Fs client operates on behalf of. + */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true); @@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem); - fs = FileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + assertNotNull(clientUgi); + + // Create the Fs on behalf of the specific user: + clientUgi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = FileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } @@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setDefaultMode(mode); if (mode != PRIMARY) - cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH)); + cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER)); cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); @@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA os.close(); +
assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, true, 1024); - - for (long i = 0; i < cnt; i++) - out.writeLong(i); + try (FSDataOutputStream out = fs.create(file, true, 1024)) { - out.close(); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - FSDataInputStream in = fs.open(file, 1024); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); + try (FSDataInputStream in = fs.open(file, 1024)) { - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. */ @@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA String path = fs.getFileStatus(file).getPath().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file")); + assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file")); } /** @throws Exception If failed. */ @@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA public void testGetWorkingDirectoryIfDefault() throws Exception { String path = fs.getWorkingDirectory().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous"))); + assertTrue(path.endsWith("/user/" + getClientFsUser())); } /** @throws Exception If failed. 
*/ @@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA @SuppressWarnings("OctalInteger") public void testMkdirs() throws Exception { Path fsHome = new Path(PRIMARY_URI); - Path dir = new Path(fsHome, "/tmp/staging"); - Path nestedDir = new Path(dir, "nested"); + final Path dir = new Path(fsHome, "/tmp/staging"); + final Path nestedDir = new Path(dir, "nested"); - FsPermission dirPerm = FsPermission.createImmutable((short)0700); - FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); + final FsPermission dirPerm = FsPermission.createImmutable((short)0700); + final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); assertTrue(fs.mkdirs(dir, dirPerm)); assertTrue(fs.mkdirs(nestedDir, nestedDirPerm)); assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index b92b213..fcfd587 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest try { switchHandlerErrorFlag(true); - HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG); + HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null); client.handshake(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java index e103c5f..2c17ba9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null); + cache.clear(); // avoid influence of previous tests in the same process. 
+ String name = "igfs:" + getTestGridName(0) + "@"; Configuration cfg = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index af1a1e1..e8a0a6f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** Initial REST port. */ private int restPort = REST_PORT; + /** Secondary file system REST endpoint configuration. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + /** Initial classpath. */ private static String initCp; @@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS configuration. 
*/ - public FileSystemConfiguration igfsConfiguration() { + public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setName(igfsName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java index d10ee5c..c66cdf3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop; import com.google.common.base.*; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.resource.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.junits.common.*; import org.jsr166.*; @@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); + String cfgPath = "config/hadoop/default-config.xml"; + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = 
IgnitionEx.loadConfiguration(cfgPath); + + IgniteConfiguration cfg = tup.get1(); + + cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes. + + igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java index 8cf31a2..5f90bd4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*; * Test file systems for the working directory multi-threading support. */ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + /** the number of threads */ private static final int THREAD_COUNT = 3; /** {@inheritDoc} */ @@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { try { int curThreadNum = threadNum.getAndIncrement(); - FileSystem fs = FileSystem.get(uri, cfg); - - HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); - if ("file".equals(uri.getScheme())) FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); @@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { } /** - * Test IGFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testIgfs() throws Exception { - testFileSystem(URI.create(igfsScheme())); - } - - /** - * Test HDFS multi-thread working directory. - * - * @throws Exception If fails. 
- */ - public void testHdfs() throws Exception { - testFileSystem(URI.create("hdfs://localhost/")); - } - - /** * Test LocalFS multi-thread working directory. * * @throws Exception If fails. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 8a3a0ac..66c14b5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; 
+import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Test of whole cycle of map-reduce processing via Job tracker. */ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary Fs. */ + protected IgfsSecondaryFileSystem secondaryFs; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; } /** + * Gets owner of a IgfsEx path. + * @param p The path. + * @return The owner. + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsEx.PROP_USER_NAME); + } + + /** + * Gets owner of a secondary Fs path. + * @param secFs The sec Fs. + * @param p The path. + * @return The owner. 
+ */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsEx.PROP_USER_NAME); + } + }); + } + + /** + * Checks owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** * Tests whole job execution with all phases in all combination of new and old versions of API. * @throws Exception If fails. */ @@ -59,9 +132,14 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); + final int red = 10_000; + final int blue = 20_000; + final int green = 15_000; + final int yellow = 7_000; - for (int i = 0; i < 8; i++) { + generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow ); + + for (int i = 0; i < 3; i++) { igfs.delete(new IgfsPath(PATH_OUTPUT), true); boolean useNewMapper = (i & 1) == 0; @@ -71,7 +149,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { JobConf jobConf = new JobConf(); jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser("yyy"); + jobConf.setUser(USER); jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); //To split into about 40 items for v2 @@ -105,13 +183,19 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { checkJobStatistics(jobId); + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? 
"part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + useNewReducer, - "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") + "blue\t" + blue + "\n" + + "green\t" + green + "\n" + + "red\t" + red + "\n" + + "yellow\t" + yellow + "\n", + readAndSortFile(outFile) ); } } @@ -182,7 +266,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { } } - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); assert GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -212,4 +296,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; } } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. 
+ */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** + * @return IGFS configuration. 
+ */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index 8dc9830..eee5c8b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ - @Override public FileSystemConfiguration igfsConfiguration() { + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = super.igfsConfiguration(); cfg.setFragmentizerEnabled(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java index aaf0f92..6930020 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.io.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.net.*; @@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Hadoop job. * @throws IOException If fails. */ - public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception; /** * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API @@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), igfs.info(inFile).length() - fileBlock1.length()); - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); @@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context with mock output. * @throws IgniteCheckedException If fails. */ - private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType, int taskNum, String... 
words) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); @@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testReduceTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); @@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testCombinerTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob("/", "/"); + HadoopJob gridJob = getHadoopJob("/", "/"); HadoopTestTaskContext ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); @@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context of combine task with mock output. * @throws IgniteCheckedException If fails. 
*/ - private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); @@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index b41a260..48e83cc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws IOException If fails. 
*/ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); @@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index b677c63..e73fae3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws Exception if fails. 
*/ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); @@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ebc89f4..f3b9307 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopDefaultJobInfo info = createJobInfo(cfg); + + HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + + HadoopJob job = info.createJob(id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index b4ed5e1..9395c5e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; /** * Abstract class for maps test. @@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { assert false; } + /** {@inheritDoc} */ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { assert false; } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java index 5b1b6a8..f28bfd9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java @@ -32,7 +32,9 @@ import java.util.concurrent.*; */ public class HadoopHashMapSelfTest extends HadoopAbstractMapTest { - public void _testAllocation() throws Exception { + public void testAllocation() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-826"); + final GridUnsafeMemory mem = new GridUnsafeMemory(0); long size = 3L * 1024 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java index 8a046e0..89bf830 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { int sigma = max((int)ceil(precission * exp), 5); - X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + + X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission + " sigma: " + sigma); - assertTrue(abs(exp - levelsCnts[level]) <= sigma); + assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails. 
} } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java index 0b7ac15..b712636 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -47,6 +47,8 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-404"); + startGrids(gridCount()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java index 45fb3db..625b265 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java @@ -33,6 +33,11 @@ import java.util.concurrent.*; * Tests Hadoop external communication component. */ public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-404"); + } + /** * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 179f7f0..4be5d72 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -26,6 +26,8 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -106,9 +108,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingTest.class.getName()))); - // TODO: IGNITE-404: Uncomment when fixed. 
- //suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalTaskExecutionSelfTest.class.getName()))); - //suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalTaskExecutionSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingExternalTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopGroupingTest.class.getName()))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java new file mode 100644 index 0000000..cfad322 --- /dev/null +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.hibernate; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.hibernate.*; +import org.hibernate.cfg.*; + +import javax.cache.integration.*; +import java.io.*; +import java.net.*; + +/** + * Hibernate-based cache store session listener. + * <p> + * This listener creates a new Hibernate session for each store + * session. If there is an ongoing cache transaction, a corresponding + * Hibernate transaction is created as well. + * <p> + * The Hibernate session is saved as a store session + * {@link CacheStoreSession#attachment() attachment}. + * The listener guarantees that the session will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will share a DB transaction. + * <p> + * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)} + * method can be implemented if {@link CacheHibernateStoreSessionListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get Hibernate session from the current store session. + * Session hibSes = ses.attachment(); + * + * // Persist the value. + * hibSes.persist(entry.getValue()); + * } + * } + * </pre> + * Hibernate session will be automatically created by the listener + * at the start of the session and closed when it ends. 
+ * <p> + * {@link CacheHibernateStoreSessionListener} requires that either + * {@link #setSessionFactory(SessionFactory) session factory} + * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file} + * is provided. If none of them is set, an exception is thrown. If both are provided, + * session factory will be used. + */ +public class CacheHibernateStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { + /** Hibernate session factory. */ + private SessionFactory sesFactory; + + /** Hibernate configuration file path. */ + private String hibernateCfgPath; + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Whether to close session on stop. */ + private boolean closeSesOnStop; + + /** + * Sets Hibernate session factory. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. + * + * @param sesFactory Session factory. + */ + public void setSessionFactory(SessionFactory sesFactory) { + this.sesFactory = sesFactory; + } + + /** + * Gets Hibernate session factory. + * + * @return Session factory. + */ + public SessionFactory getSessionFactory() { + return sesFactory; + } + + /** + * Sets hibernate configuration path. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. + * + * @param hibernateCfgPath Hibernate configuration path. + */ + public void setHibernateConfigurationPath(String hibernateCfgPath) { + this.hibernateCfgPath = hibernateCfgPath; + } + + /** + * Gets hibernate configuration path. + * + * @return Hibernate configuration path. 
+ */ + public String getHibernateConfigurationPath() { + return hibernateCfgPath; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public void start() throws IgniteException { + if (sesFactory == null && F.isEmpty(hibernateCfgPath)) + throw new IgniteException("Either session factory or Hibernate configuration file is required by " + + getClass().getSimpleName() + '.'); + + if (!F.isEmpty(hibernateCfgPath)) { + if (sesFactory == null) { + try { + URL url = new URL(hibernateCfgPath); + + sesFactory = new Configuration().configure(url).buildSessionFactory(); + } + catch (MalformedURLException ignored) { + // No-op. + } + + if (sesFactory == null) { + File cfgFile = new File(hibernateCfgPath); + + if (cfgFile.exists()) + sesFactory = new Configuration().configure(cfgFile).buildSessionFactory(); + } + + if (sesFactory == null) + sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory(); + + if (sesFactory == null) + throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath); + + closeSesOnStop = true; + } + else + U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() + + " will be ignored (session factory is already set)."); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed()) + sesFactory.close(); + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.attachment() == null) { + try { + Session hibSes = sesFactory.openSession(); + + ses.attach(hibSes); + + if (ses.isWithinTransaction()) + hibSes.beginTransaction(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Session hibSes = 
ses.attach(null); + + if (hibSes != null) { + try { + Transaction tx = hibSes.getTransaction(); + + if (commit) { + hibSes.flush(); + + if (tx.isActive()) + tx.commit(); + } + else if (tx.isActive()) + tx.rollback(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); + } + finally { + hibSes.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheSelfTest.java index 63cb3ac..bb5884a 100644 --- a/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheSelfTest.java +++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheSelfTest.java @@ -51,6 +51,11 @@ import static org.hibernate.cfg.Environment.*; * Tests Hibernate L2 cache. 
*/ public class HibernateL2CacheSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-591"); + } + /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheTransactionalSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheTransactionalSelfTest.java index efc6f3e..c429d9a 100644 --- a/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheTransactionalSelfTest.java +++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheTransactionalSelfTest.java @@ -41,6 +41,11 @@ import java.util.*; * to used the same TransactionManager). 
*/ public class HibernateL2CacheTransactionalSelfTest extends HibernateL2CacheSelfTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-591"); + } + /** */ private static Jotm jotm; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java new file mode 100644 index 0000000..c30e216 --- /dev/null +++ b/modules/hibernate/src/test/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListenerSelfTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.hibernate; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.hibernate.*; +import org.hibernate.cfg.Configuration; + +import javax.cache.Cache; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import javax.persistence.*; +import java.io.*; +import java.util.*; + +/** + * Tests for {@link CacheHibernateStoreSessionListener}. + */ +public class CacheHibernateStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() { + return new Factory<CacheStore<Integer, Integer>>() { + @Override public CacheStore<Integer, Integer> create() { + return new Store(); + } + }; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() { + return new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheHibernateStoreSessionListener lsnr = new CacheHibernateStoreSessionListener(); + + SessionFactory sesFactory = new Configuration(). + setProperty("hibernate.connection.url", URL). + addAnnotatedClass(Table1.class). + addAnnotatedClass(Table2.class). + buildSessionFactory(); + + lsnr.setSessionFactory(sesFactory); + + return lsnr; + } + }; + } + + /** + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> { + /** */ + private static String SES_CONN_KEY = "ses_conn"; + + /** */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... 
args) { + loadCacheCnt.incrementAndGet(); + + checkSession(); + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + loadCnt.incrementAndGet(); + + checkSession(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) + throws CacheWriterException { + writeCnt.incrementAndGet(); + + checkSession(); + + if (write.get()) { + Session hibSes = ses.attachment(); + + switch (ses.cacheName()) { + case "cache1": + hibSes.save(new Table1(entry.getKey(), entry.getValue())); + + break; + + case "cache2": + if (fail.get()) + throw new CacheWriterException("Expected failure."); + + hibSes.save(new Table2(entry.getKey(), entry.getValue())); + + break; + + default: + throw new CacheWriterException("Wring cache: " + ses.cacheName()); + } + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + deleteCnt.incrementAndGet(); + + checkSession(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + assertNull(ses.attachment()); + } + + /** + */ + private void checkSession() { + Session hibSes = ses.attachment(); + + assertNotNull(hibSes); + + assertTrue(hibSes.isOpen()); + + Transaction tx = hibSes.getTransaction(); + + assertNotNull(tx); + + if (ses.isWithinTransaction()) + assertTrue(tx.isActive()); + else + assertFalse(tx.isActive()); + + verifySameInstance(hibSes); + } + + /** + * @param hibSes Session. 
+ */ + private void verifySameInstance(Session hibSes) { + Map<String, Session> props = ses.properties(); + + Session sesConn = props.get(SES_CONN_KEY); + + if (sesConn == null) + props.put(SES_CONN_KEY, hibSes); + else { + assertSame(hibSes, sesConn); + + reuseCnt.incrementAndGet(); + } + } + } + + /** + */ + @Entity + @Table(name = "Table1") + private static class Table1 implements Serializable { + /** */ + @Id @GeneratedValue + @Column(name = "id") + private Integer id; + + /** */ + @Column(name = "key") + private int key; + + /** */ + @Column(name = "value") + private int value; + + /** + * @param key Key. + * @param value Value. + */ + private Table1(int key, int value) { + this.key = key; + this.value = value; + } + } + + /** + */ + @Entity + @Table(name = "Table2") + private static class Table2 implements Serializable { + /** */ + @Id @GeneratedValue + @Column(name = "id") + private Integer id; + + /** */ + @Column(name = "key") + private int key; + + /** */ + @Column(name = "value") + private int value; + + /** + * @param key Key. + * @param value Value. + */ + private Table2(int key, int value) { + this.key = key; + this.value = value; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java b/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java index da741f8..3ac5ec4 100644 --- a/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java +++ b/modules/hibernate/src/test/java/org/apache/ignite/testsuites/IgniteHibernateTestSuite.java @@ -33,14 +33,16 @@ public class IgniteHibernateTestSuite extends TestSuite { TestSuite suite = new TestSuite("Hibernate Integration Test Suite"); // Hibernate L2 cache. 
-// suite.addTestSuite(HibernateL2CacheSelfTest.class); // TODO GG-9141 -// suite.addTestSuite(HibernateL2CacheTransactionalSelfTest.class); + suite.addTestSuite(HibernateL2CacheSelfTest.class); + suite.addTestSuite(HibernateL2CacheTransactionalSelfTest.class); suite.addTestSuite(HibernateL2CacheConfigurationSelfTest.class); suite.addTestSuite(CacheHibernateBlobStoreSelfTest.class); suite.addTestSuite(CacheHibernateBlobStoreNodeRestartTest.class); + suite.addTestSuite(CacheHibernateStoreSessionListenerSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index bf335fa..59e1120 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -113,7 +113,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void _testTwoStep() throws Exception { + public void testTwoStep() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-827"); + String cache = "partitioned"; GridQueryProcessor qryProc = ((IgniteKernal) ignite).context().query(); @@ -247,7 +249,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { // return 10 * 60 * 1000; // } - public void _testLoop() throws Exception { + public void testLoop() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-827"); + final IgniteCache<Object,Object> c = ignite.cache("partitioned"); X.println("___ GET READY"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexEntryEvictTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexEntryEvictTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexEntryEvictTest.java new file mode 100644 index 0000000..8afd746 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexEntryEvictTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheOffheapIndexEntryEvictTest extends GridCommonAbstractTest { + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setNetworkTimeout(2000); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setOffHeapMaxMemory(0); + 
cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setMemoryMode(OFFHEAP_TIERED); + cacheCfg.setEvictionPolicy(null); + cacheCfg.setSqlOnheapRowCacheSize(10); + cacheCfg.setIndexedTypes(Integer.class, TestValue.class); + cacheCfg.setNearConfiguration(null); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testQueryWhenLocked() throws Exception { + IgniteCache<Integer, TestValue> cache = grid(0).cache(null); + + List<Lock> locks = new ArrayList<>(); + + final int ENTRIES = 1000; + + try { + for (int i = 0; i < ENTRIES; i++) { + cache.put(i, new TestValue(i)); + + Lock lock = cache.lock(i); + + lock.lock(); // Lock entry so that it should not be evicted. + + locks.add(lock); + + for (int j = 0; j < 3; j++) + assertNotNull(cache.get(i)); + } + + checkQuery(cache, "_key >= 100", ENTRIES - 100); + } + finally { + for (Lock lock : locks) + lock.unlock(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdates() throws Exception { + final int ENTRIES = 500; + + IgniteCache<Integer, TestValue> cache = grid(0).cache(null); + + for (int i = 0; i < ENTRIES; i++) { + for (int j = 0; j < 3; j++) { + cache.getAndPut(i, new TestValue(i)); + + assertNotNull(cache.get(i)); + + assertNotNull(cache.localPeek(i)); + } + + checkQuery(cache, "_key >= 0", i + 1); + } + + for (int i = 0; i < ENTRIES; i++) { + if (i % 2 == 0) + cache.getAndRemove(i); + else + cache.remove(i); + + checkQuery(cache, "_key >= 0", ENTRIES - (i + 1)); + } + } + + /** + * @param cache Cache. + * @param sql Query. + * @param expCnt Number of expected entries. 
+ */ + private void checkQuery(IgniteCache<Integer, TestValue> cache, String sql, int expCnt) { + SqlQuery<Integer, TestValue> qry = new SqlQuery<>(TestValue.class, sql); + + List<Cache.Entry<Integer, TestValue>> res = cache.query(qry).getAll(); + + assertEquals(expCnt, res.size()); + + for (Cache.Entry<Integer, TestValue> e : res) { + assertNotNull(e.getKey()); + + assertEquals((int)e.getKey(), e.getValue().val); + } + } + + /** + * + */ + static class TestValue implements Externalizable { + /** */ + private int val; + + /** + * + */ + public TestValue() { + // No-op. + } + + /** + * @param val Value. + */ + public TestValue(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(val); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + val = in.readInt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java index 4e40040..4e613ae 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java @@ -18,12 +18,19 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.spi.discovery.tcp.*; 
import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.swapspace.file.*; import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMemoryMode.*; @@ -67,8 +74,7 @@ public class GridCacheOffheapIndexGetSelfTest extends GridCommonAbstractTest { cacheCfg.setAtomicityMode(TRANSACTIONAL); cacheCfg.setMemoryMode(OFFHEAP_TIERED); cacheCfg.setEvictionPolicy(null); - cacheCfg.setOffHeapMaxMemory(OFFHEAP_MEM); - cacheCfg.setIndexedTypes(Long.class, Long.class); + cacheCfg.setIndexedTypes(Long.class, Long.class, String.class, TestEntity.class); cfg.setCacheConfiguration(cacheCfg); @@ -98,8 +104,6 @@ public class GridCacheOffheapIndexGetSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testGet() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-873"); - IgniteCache<Long, Long> cache = grid(0).cache(null); for (long i = 0; i < 100; i++) @@ -107,5 +111,73 @@ public class GridCacheOffheapIndexGetSelfTest extends GridCommonAbstractTest { for (long i = 0; i < 100; i++) assertEquals((Long)i, cache.get(i)); + + SqlQuery<Long, Long> qry = new SqlQuery<>(Long.class, "_val >= 90"); + + List<Cache.Entry<Long, Long>> res = cache.query(qry).getAll(); + + assertEquals(10, res.size()); + + for (Cache.Entry<Long, Long> e : res) { + assertNotNull(e.getKey()); + assertNotNull(e.getValue()); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutGet() throws Exception { + IgniteCache<Object, Object> cache = grid(0).cache(null); + + Map map = new HashMap(); + + try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ, 100000, 1000)) { + + for (int i = 4; i < 400; i++) { + map.put("key" + i, new TestEntity("value")); + map.put(i, "value"); + } + + cache.putAll(map); + + tx.commit(); + } + + for (int i = 0; i < 100; i++) { + cache.get("key" + i); + cache.get(i); + } + } + + /** + * Test entry class. + */ + private static class TestEntity implements Serializable { + /** Value. */ + @QuerySqlField(index = true) + private String val; + + /** + * @param value Value. + */ + public TestEntity(String value) { + this.val = value; + } + + /** + * @return Value. + */ + public String getValue() { + return val; + } + + /** + * @param val Value + */ + public void setValue(String val) { + this.val = val; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java index 81c2820..24011b4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheQueryMetricsSelfTest.java @@ -62,26 +62,35 @@ public class GridCacheQueryMetricsSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(disco); - CacheConfiguration cacheCfg = defaultCacheConfiguration(); + CacheConfiguration<String, Integer> cacheCfg1 = defaultCacheConfiguration(); - 
cacheCfg.setCacheMode(CACHE_MODE); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setIndexedTypes(String.class, Integer.class); + cacheCfg1.setName("A"); + cacheCfg1.setCacheMode(CACHE_MODE); + cacheCfg1.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg1.setIndexedTypes(String.class, Integer.class); - cfg.setCacheConfiguration(cacheCfg); + CacheConfiguration<String, Integer> cacheCfg2 = defaultCacheConfiguration(); + + cacheCfg2.setName("B"); + cacheCfg2.setCacheMode(CACHE_MODE); + cacheCfg2.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg2.setIndexedTypes(String.class, Integer.class); + + cfg.setCacheConfiguration(cacheCfg1, cacheCfg2); return cfg; } /** - * JUnit. + * Test metrics for SQL queries. * * @throws Exception In case of error. */ - public void testAccumulativeMetrics() throws Exception { - IgniteCache<String, Integer> cache = grid(0).cache(null); + public void testSqlFieldsQueryMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - SqlQuery<String, Integer> qry = new SqlQuery(Integer.class, "_val >= 0"); + // Execute query. + SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer"); cache.query(qry).getAll(); @@ -114,20 +123,22 @@ public class GridCacheQueryMetricsSelfTest extends GridCommonAbstractTest { } /** - * JUnit. + * Test metrics for Scan queries. * * @throws Exception In case of error. */ - public void testSingleQueryMetrics() throws Exception { - IgniteCache<String, Integer> cache = grid(0).cache(null); + public void testScanQueryMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - SqlQuery<String, Integer> qry = new SqlQuery(Integer.class, "_val >= 0"); + // Execute query. + ScanQuery<String, Integer> qry = new ScanQuery<>(); - // Execute. 
cache.query(qry).getAll(); QueryMetrics m = cache.queryMetrics(); + assert m != null; + info("Metrics: " + m); assertEquals(1, m.executions()); @@ -136,11 +147,54 @@ public class GridCacheQueryMetricsSelfTest extends GridCommonAbstractTest { assertTrue(m.maximumTime() >= 0); assertTrue(m.minimumTime() >= 0); - // Execute. + // Execute again with the same parameters. cache.query(qry).getAll(); m = cache.queryMetrics(); + assert m != null; + + info("Metrics: " + m); + + assertEquals(2, m.executions()); + assertEquals(0, m.fails()); + assertTrue(m.averageTime() >= 0); + assertTrue(m.maximumTime() >= 0); + assertTrue(m.minimumTime() >= 0); + } + + /** + * Test metrics for SQL cross cache queries. + * + * @throws Exception In case of error. + */ + public void testSqlCrossCacheQueryMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); + + // Execute query. + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer"); + + cache.query(qry).getAll(); + + QueryMetrics m = cache.queryMetrics(); + + assert m != null; + + info("Metrics: " + m); + + assertEquals(1, m.executions()); + assertEquals(0, m.fails()); + assertTrue(m.averageTime() >= 0); + assertTrue(m.maximumTime() >= 0); + assertTrue(m.minimumTime() >= 0); + + // Execute again with the same parameters. 
+ cache.query(qry).getAll(); + + m = cache.queryMetrics(); + + assert m != null; + info("Metrics: " + m); assertEquals(2, m.executions()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java index 2c0962b..5b623da 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridIndexingWithNoopSwapSelfTest.java @@ -64,7 +64,11 @@ public class GridIndexingWithNoopSwapSelfTest extends GridCommonAbstractTest { cc.setRebalanceMode(SYNC); cc.setSwapEnabled(true); cc.setNearConfiguration(new NearCacheConfiguration()); - cc.setEvictionPolicy(new FifoEvictionPolicy(1000)); + + FifoEvictionPolicy plc = new FifoEvictionPolicy(); + plc.setMaxSize(1000); + + cc.setEvictionPolicy(plc); cc.setBackups(1); cc.setAtomicityMode(TRANSACTIONAL); cc.setIndexedTypes( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 4bdd149..f5254e6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -99,11 +99,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration c = super.getConfiguration(gridName); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); + c.setDiscoverySpi(new TcpDiscoverySpi().setForceServerMode(true).setIpFinder(ipFinder)); // Otherwise noop swap space will be chosen on Windows. c.setSwapSpaceSpi(new FileSwapSpaceSpi());