http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b08492a5/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java
index 62c2c95,0000000..d797346
mode 100644,000000..100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridGgfsHadoopDualAbstractSelfTest.java
@@@ -1,304 -1,0 +1,304 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.ignitefs;
 +
 +import org.apache.hadoop.conf.*;
 +import org.apache.hadoop.fs.*;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.internal.fs.hadoop.*;
 +import org.apache.ignite.internal.processors.fs.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.testframework.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.net.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.cache.CacheAtomicityMode.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +import static org.apache.ignite.ignitefs.IgniteFsMode.*;
 +import static org.apache.ignite.ignitefs.hadoop.GridGgfsHadoopParameters.*;
 +import static 
org.apache.ignite.internal.processors.fs.GridGgfsAbstractSelfTest.*;
 +
 +/**
 + * Tests for GGFS working in mode when remote file system exists: DUAL_SYNC, 
DUAL_ASYNC.
 + */
 +public abstract class GridGgfsHadoopDualAbstractSelfTest extends 
GridGgfsCommonAbstractTest {
 +    /** GGFS block size. */
 +    protected static final int GGFS_BLOCK_SIZE = 512 * 1024;
 +
 +    /** Amount of blocks to prefetch. */
 +    protected static final int PREFETCH_BLOCKS = 1;
 +
 +    /** Amount of sequential block reads before prefetch is triggered. */
 +    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
 +
 +    /** Secondary file system URI. */
 +    protected static final String SECONDARY_URI = 
"ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/";
 +
 +    /** Secondary file system configuration path. */
 +    protected static final String SECONDARY_CFG = 
"modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
 +
 +    /** Primary file system URI. */
 +    protected static final String PRIMARY_URI = "ggfs://ggfs:grid@/";
 +
 +    /** Primary file system configuration path. */
 +    protected static final String PRIMARY_CFG = 
"modules/core/src/test/config/hadoop/core-site-loopback.xml";
 +
 +    /** Primary file system REST endpoint configuration map. */
 +    protected static final Map<String, String> PRIMARY_REST_CFG = new 
HashMap<String, String>() {{
 +        put("type", "tcp");
 +        put("port", "10500");
 +    }};
 +
 +    /** Secondary file system REST endpoint configuration map. */
 +    protected static final Map<String, String> SECONDARY_REST_CFG = new 
HashMap<String, String>() {{
 +        put("type", "tcp");
 +        put("port", "11500");
 +    }};
 +
 +    /** Directory. */
 +    protected static final IgniteFsPath DIR = new IgniteFsPath("/dir");
 +
 +    /** Sub-directory. */
 +    protected static final IgniteFsPath SUBDIR = new IgniteFsPath(DIR, 
"subdir");
 +
 +    /** File. */
 +    protected static final IgniteFsPath FILE = new IgniteFsPath(SUBDIR, 
"file");
 +
 +    /** Default data chunk (128 bytes). */
 +    protected static byte[] chunk;
 +
 +    /** Primary GGFS. */
 +    protected static GridGgfsImpl ggfs;
 +
 +    /** Secondary GGFS. */
 +    protected static GridGgfsImpl ggfsSecondary;
 +
 +    /** GGFS mode. */
 +    protected final IgniteFsMode mode;
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param mode GGFS mode.
 +     */
 +    protected GridGgfsHadoopDualAbstractSelfTest(IgniteFsMode mode) {
 +        this.mode = mode;
 +        assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
 +    }
 +
 +    /**
 +     * Start grid with GGFS.
 +     *
 +     * @param gridName Grid name.
 +     * @param ggfsName GGFS name
 +     * @param mode GGFS mode.
 +     * @param secondaryFs Secondary file system (optional).
 +     * @param restCfg Rest configuration string (optional).
 +     * @return Started grid instance.
 +     * @throws Exception If failed.
 +     */
 +    protected Ignite startGridWithGgfs(String gridName, String ggfsName, 
IgniteFsMode mode,
 +        @Nullable IgniteFsFileSystem secondaryFs, @Nullable Map<String, 
String> restCfg) throws Exception {
 +        IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration();
 +
 +        ggfsCfg.setDataCacheName("dataCache");
 +        ggfsCfg.setMetaCacheName("metaCache");
 +        ggfsCfg.setName(ggfsName);
 +        ggfsCfg.setBlockSize(GGFS_BLOCK_SIZE);
 +        ggfsCfg.setDefaultMode(mode);
 +        ggfsCfg.setIpcEndpointConfiguration(restCfg);
 +        ggfsCfg.setSecondaryFileSystem(secondaryFs);
 +        ggfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
 +        ggfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
 +
 +        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
 +
 +        dataCacheCfg.setName("dataCache");
 +        dataCacheCfg.setCacheMode(PARTITIONED);
 +        
dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        
dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        dataCacheCfg.setAffinityMapper(new 
IgniteFsGroupDataBlocksKeyMapper(2));
 +        dataCacheCfg.setBackups(0);
 +        dataCacheCfg.setQueryIndexEnabled(false);
 +        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
 +        dataCacheCfg.setOffHeapMaxMemory(0);
 +
 +        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
 +
 +        metaCacheCfg.setName("metaCache");
 +        metaCacheCfg.setCacheMode(REPLICATED);
 +        
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        metaCacheCfg.setQueryIndexEnabled(false);
 +        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
 +
 +        IgniteConfiguration cfg = new IgniteConfiguration();
 +
 +        cfg.setGridName(gridName);
 +
 +        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 +
 +        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
 +
 +        cfg.setDiscoverySpi(discoSpi);
 +        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
 +        cfg.setGgfsConfiguration(ggfsCfg);
 +
 +        cfg.setLocalHost("127.0.0.1");
 +        cfg.setRestEnabled(false);
 +
 +        return G.start(cfg);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTestsStarted() throws Exception {
 +        chunk = new byte[128];
 +
 +        for (int i = 0; i < chunk.length; i++)
 +            chunk[i] = (byte)i;
 +
 +        Ignite igniteSecondary = startGridWithGgfs("grid-secondary", 
"ggfs-secondary", PRIMARY, null, SECONDARY_REST_CFG);
 +
 +        IgniteFsFileSystem hadoopFs = new 
GridGgfsHadoopFileSystemWrapper(SECONDARY_URI, SECONDARY_CFG);
 +
 +        Ignite ignite = startGridWithGgfs("grid", "ggfs", mode, hadoopFs, 
PRIMARY_REST_CFG);
 +
 +        ggfsSecondary = (GridGgfsImpl) 
igniteSecondary.fileSystem("ggfs-secondary");
 +        ggfs = (GridGgfsImpl) ignite.fileSystem("ggfs");
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        clear(ggfs);
 +        clear(ggfsSecondary);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTestsStopped() throws Exception {
 +        G.stopAll(true);
 +    }
 +
 +    /**
 +     * Convenient method to group paths.
 +     *
 +     * @param paths Paths to group.
 +     * @return Paths as array.
 +     */
 +    protected IgniteFsPath[] paths(IgniteFsPath... paths) {
 +        return paths;
 +    }
 +
 +    /**
 +     * Check how prefetch override works.
 +     *
 +     * @throws Exception IF failed.
 +     */
 +    public void testOpenPrefetchOverride() throws Exception {
 +        create(ggfsSecondary, paths(DIR, SUBDIR), paths(FILE));
 +
 +        // Write enough data to the secondary file system.
 +        final int blockSize = GGFS_BLOCK_SIZE;
 +
 +        IgniteFsOutputStream out = ggfsSecondary.append(FILE, false);
 +
 +        int totalWritten = 0;
 +
 +        while (totalWritten < blockSize * 2 + chunk.length) {
 +            out.write(chunk);
 +
 +            totalWritten += chunk.length;
 +        }
 +
 +        out.close();
 +
 +        awaitFileClose(ggfsSecondary, FILE);
 +
 +        // Instantiate file system with overridden "seq reads before 
prefetch" property.
 +        Configuration cfg = new Configuration();
 +
-         cfg.addResource(U.resolveGridGainUrl(PRIMARY_CFG));
++        cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG));
 +
 +        int seqReads = SEQ_READS_BEFORE_PREFETCH + 1;
 +
 +        cfg.setInt(String.format(PARAM_GGFS_SEQ_READS_BEFORE_PREFETCH, 
"ggfs:grid@"), seqReads);
 +
 +        FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg);
 +
 +        // Read the first two blocks.
 +        Path fsHome = new Path(PRIMARY_URI);
 +        Path dir = new Path(fsHome, DIR.name());
 +        Path subdir = new Path(dir, SUBDIR.name());
 +        Path file = new Path(subdir, FILE.name());
 +
 +        FSDataInputStream fsIn = fs.open(file);
 +
 +        final byte[] readBuf = new byte[blockSize * 2];
 +
 +        fsIn.readFully(0, readBuf, 0, readBuf.length);
 +
 +        // Wait for a while for prefetch to finish (if any).
 +        GridGgfsMetaManager meta = ggfs.context().meta();
 +
 +        GridGgfsFileInfo info = meta.info(meta.fileId(FILE));
 +
 +        GridGgfsBlockKey key = new GridGgfsBlockKey(info.id(), 
info.affinityKey(), info.evictExclude(), 2);
 +
 +        GridCache<GridGgfsBlockKey, byte[]> dataCache = 
ggfs.context().kernalContext().cache().cache(
 +            ggfs.configuration().getDataCacheName());
 +
 +        for (int i = 0; i < 10; i++) {
 +            if (dataCache.containsKey(key))
 +                break;
 +            else
 +                U.sleep(100);
 +        }
 +
 +        fsIn.close();
 +
 +        // Remove the file from the secondary file system.
 +        ggfsSecondary.delete(FILE, false);
 +
 +        // Try reading the third block. Should fail.
 +        GridTestUtils.assertThrows(log, new Callable<Object>() {
 +            @Override public Object call() throws Exception {
 +                IgniteFsInputStream in0 = ggfs.open(FILE);
 +
 +                in0.seek(blockSize * 2);
 +
 +                try {
 +                    in0.read(readBuf);
 +                }
 +                finally {
 +                    U.closeQuiet(in0);
 +                }
 +
 +                return null;
 +            }
 +        }, IOException.class,
 +            "Failed to read data due to secondary file system exception: 
/dir/subdir/file");
 +    }
 +}

Reply via email to