http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java index 400bbb1..8051a3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNodeStartup.java @@ -41,7 +41,7 @@ public class IpcSharedMemoryNodeStartup { public static void main(String[] args) throws Exception{ IgniteConfiguration cfg = new IgniteConfiguration(); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); @@ -60,7 +60,7 @@ public class IpcSharedMemoryNodeStartup { igfsCfg.setMetaCacheName("partitioned"); igfsCfg.setName("igfs"); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); CacheConfiguration cacheCfg = new CacheConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index 58478d3..220d5d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -176,12 +176,12 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ - @Override public IgniteFs fileSystem(String name) { + @Override public IgniteFileSystem fileSystem(String name) { return null; } /** {@inheritDoc} */ - @Override public Collection<IgniteFs> fileSystems() { + @Override public Collection<IgniteFileSystem> fileSystems() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java index 8f06117..cea510a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java @@ -48,7 +48,7 @@ public class IgniteIgfsTestSuite extends TestSuite { if (U.isWindows()) suite.addTest(new TestSuite(IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.class)); - suite.addTest(new TestSuite(GridCacheIgfsPerBlockLruEvictionPolicySelfTest.class)); + suite.addTest(new TestSuite(IgfsCachePerBlockLruEvictionPolicySelfTest.class)); suite.addTest(new TestSuite(IgfsStreamsSelfTest.class)); 
suite.addTest(new TestSuite(IgfsModesSelfTest.class)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java deleted file mode 100644 index bd31951..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.client.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.protocol.*; -import org.apache.hadoop.mapreduce.security.token.delegation.*; -import org.apache.hadoop.mapreduce.v2.*; -import org.apache.hadoop.mapreduce.v2.jobhistory.*; -import org.apache.hadoop.security.*; -import org.apache.hadoop.security.authorize.*; -import org.apache.hadoop.security.token.*; -import org.apache.ignite.*; -import org.apache.ignite.client.hadoop.counter.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.proto.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Hadoop client protocol. - */ -public class GridHadoopClientProtocol implements ClientProtocol { - /** Ignite framework name property. */ - public static final String FRAMEWORK_NAME = "ignite"; - - /** Protocol version. */ - private static final long PROTO_VER = 1L; - - /** Default Ignite system directory. */ - private static final String SYS_DIR = ".ignite/system"; - - /** Configuration. */ - private final Configuration conf; - - /** Ignite client. */ - private volatile GridClient cli; - - /** Last received version. */ - private long lastVer = -1; - - /** Last received status. */ - private GridHadoopJobStatus lastStatus; - - /** - * Constructor. - * - * @param conf Configuration. - * @param cli Ignite client. 
- */ - GridHadoopClientProtocol(Configuration conf, GridClient cli) { - assert cli != null; - - this.conf = conf; - this.cli = cli; - } - - /** {@inheritDoc} */ - @Override public JobID getNewJobID() throws IOException, InterruptedException { - try { - conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - - GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null); - - conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - - return new JobID(jobID.globalId().toString(), jobID.localId()); - } - catch (GridClientException e) { - throw new IOException("Failed to get new job ID.", e); - } - } - - /** {@inheritDoc} */ - @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, - InterruptedException { - try { - conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); - - GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); - - if (status == null) - throw new IOException("Failed to submit job (null status obtained): " + jobId); - - return processStatus(status); - } - catch (GridClientException | IgniteCheckedException e) { - throw new IOException("Failed to submit job.", e); - } - } - - /** {@inheritDoc} */ - @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { - return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); - } - - /** {@inheritDoc} */ - @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { - return Cluster.JobTrackerStatus.RUNNING; - } - - /** {@inheritDoc} */ - @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { - return 0; - } - - /** {@inheritDoc} */ - @Override public AccessControlList getQueueAdmins(String queueName) throws 
IOException { - return new AccessControlList("*"); - } - - /** {@inheritDoc} */ - @Override public void killJob(JobID jobId) throws IOException, InterruptedException { - try { - cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); - } - catch (GridClientException e) { - throw new IOException("Failed to kill job: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, - InterruptedException { - return false; - } - - /** {@inheritDoc} */ - @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { - try { - Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); - - GridHadoopProtocolTaskArguments args = delay >= 0 ? 
- new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); - - GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args); - - if (status == null) - throw new IOException("Job tracker doesn't have any information about the job: " + jobId); - - return processStatus(status); - } - catch (GridClientException e) { - throw new IOException("Failed to get job status: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { - try { - final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); - - if (counters == null) - throw new IOException("Job tracker doesn't have any information about the job: " + jobId); - - return new GridHadoopClientCounters(counters); - } - catch (GridClientException e) { - throw new IOException("Failed to get job counters: " + jobId, e); - } - } - - /** {@inheritDoc} */ - @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { - return new TaskReport[0]; - } - - /** {@inheritDoc} */ - @Override public String getFilesystemName() throws IOException, InterruptedException { - return FileSystem.get(conf).getUri().toString(); - } - - /** {@inheritDoc} */ - @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { - return new JobStatus[0]; - } - - /** {@inheritDoc} */ - @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) - throws IOException, InterruptedException { - return new TaskCompletionEvent[0]; - } - - /** {@inheritDoc} */ - @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { - 
return new String[0]; - } - - /** {@inheritDoc} */ - @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { - return new TaskTrackerInfo[0]; - } - - /** {@inheritDoc} */ - @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { - return new TaskTrackerInfo[0]; - } - - /** {@inheritDoc} */ - @Override public String getSystemDir() throws IOException, InterruptedException { - Path sysDir = new Path(SYS_DIR); - - return sysDir.toString(); - } - - /** {@inheritDoc} */ - @Override public String getStagingAreaDir() throws IOException, InterruptedException { - String usr = UserGroupInformation.getCurrentUser().getShortUserName(); - - return GridHadoopUtils.stagingAreaDir(conf, usr).toString(); - } - - /** {@inheritDoc} */ - @Override public String getJobHistoryDir() throws IOException, InterruptedException { - return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { - return new QueueAclsInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { - return new QueueInfo[0]; - } - - /** {@inheritDoc} */ - @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, - InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public long 
renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, - InterruptedException { - return 0; - } - - /** {@inheritDoc} */ - @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, - InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, - InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return PROTO_VER; - } - - /** {@inheritDoc} */ - @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) - throws IOException { - return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); - } - - /** - * Process received status update. - * - * @param status Ignite status. - * @return Hadoop status. - */ - private JobStatus processStatus(GridHadoopJobStatus status) { - // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because - // GridHadoopClientProtocolProvider creates new instance of this class for every new job and Job class - // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will - // change in future and either protocol will serve statuses for several jobs or status update will not be - // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). 
- // (vozerov) - if (lastVer < status.version()) { - lastVer = status.version(); - - lastStatus = status; - } - else - assert lastStatus != null; - - return GridHadoopUtils.status(lastStatus, conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java deleted file mode 100644 index a9a1c9d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.client.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.protocol.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.client.*; -import org.apache.ignite.internal.client.marshaller.optimized.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.client.GridClientProtocol.*; -import static org.apache.ignite.client.hadoop.GridHadoopClientProtocol.*; - - -/** - * Grid Hadoop client protocol provider. - */ -public class GridHadoopClientProtocolProvider extends ClientProtocolProvider { - /** Clients. */ - private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); - - /** {@inheritDoc} */ - @Override public ClientProtocol create(Configuration conf) throws IOException { - if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { - String addr = conf.get(MRConfig.MASTER_ADDRESS); - - if (F.isEmpty(addr)) - throw new IOException("Failed to create client protocol because server address is not specified (is " + - MRConfig.MASTER_ADDRESS + " property set?)."); - - if (F.eq(addr, "local")) - throw new IOException("Local execution mode is not supported, please point " + - MRConfig.MASTER_ADDRESS + " to real Ignite node."); - - return createProtocol(addr, conf); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { - if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) - return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf); - - return null; - } - - /** {@inheritDoc} */ - @Override public void close(ClientProtocol cliProto) throws IOException { - // 
No-op. - } - - /** - * Internal protocol creation routine. - * - * @param addr Address. - * @param conf Configuration. - * @return Client protocol. - * @throws IOException If failed. - */ - private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { - return new GridHadoopClientProtocol(conf, client(addr)); - } - - /** - * Create client. - * - * @param addr Endpoint address. - * @return Client. - * @throws IOException If failed. - */ - private static GridClient client(String addr) throws IOException { - try { - IgniteInternalFuture<GridClient> fut = cliMap.get(addr); - - if (fut == null) { - GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); - - IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0); - - if (oldFut != null) - return oldFut.get(); - else { - GridClientConfiguration cliCfg = new GridClientConfiguration(); - - cliCfg.setProtocol(TCP); - cliCfg.setServers(Collections.singletonList(addr)); - cliCfg.setMarshaller(new GridClientOptimizedMarshaller()); - cliCfg.setDaemon(true); - - try { - GridClient cli = GridClientFactory.start(cliCfg); - - fut0.onDone(cli); - - return cli; - } - catch (GridClientException e) { - fut0.onDone(e); - - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); - } - } - } - else - return fut.get(); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java deleted file mode 100644 index 
37cd28d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounterGroup.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop.counter; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.counters.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop Client API Counters adapter. - */ -class GridHadoopClientCounterGroup implements CounterGroup { - /** Counters. */ - private final GridHadoopClientCounters cntrs; - - /** Group name. */ - private final String name; - - /** - * Creates new instance. - * - * @param cntrs Client counters instance. - * @param name Group name. - */ - GridHadoopClientCounterGroup(GridHadoopClientCounters cntrs, String name) { - this.cntrs = cntrs; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return name; - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return name; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void addCounter(Counter counter) { - addCounter(counter.getName(), counter.getDisplayName(), 0); - } - - /** {@inheritDoc} */ - @Override public Counter addCounter(String name, String displayName, long value) { - final Counter counter = cntrs.findCounter(this.name, name); - - counter.setValue(value); - - return counter; - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, String displayName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, boolean create) { - return cntrs.findCounter(name, counterName, create); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public int size() { - return cntrs.groupSize(name); - } - - /** {@inheritDoc} */ - @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { - for (final Counter counter : rightGroup) - cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); - } - - /** {@inheritDoc} */ - @Override public CounterGroupBase<Counter> getUnderlyingGroup() { - return this; - } - - /** {@inheritDoc} */ - @Override public Iterator<Counter> iterator() { - return cntrs.iterateGroup(name); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java deleted file mode 100644 index 9f4ec02..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop.counter; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.counters.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop Client API Counters adapter. - */ -public class GridHadoopClientCounters extends Counters { - /** */ - private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>(); - - /** - * Creates new instance based on given counters. - * - * @param cntrs Counters to adapt. 
- */ - public GridHadoopClientCounters(GridHadoopCounters cntrs) { - for (GridHadoopCounter cntr : cntrs.all()) - if (cntr instanceof GridHadoopLongCounter) - this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr); - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup addGroup(CounterGroup grp) { - return addGroup(grp.getName(), grp.getDisplayName()); - } - - /** {@inheritDoc} */ - @Override public CounterGroup addGroup(String name, String displayName) { - return new GridHadoopClientCounterGroup(this, name); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String grpName, String cntrName) { - return findCounter(grpName, cntrName, true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(Enum<?> key) { - return findCounter(key.getDeclaringClass().getName(), key.name(), true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { - return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); - } - - /** {@inheritDoc} */ - @Override public synchronized Iterable<String> getGroupNames() { - Collection<String> res = new HashSet<>(); - - for (GridHadoopCounter counter : cntrs.values()) - res.add(counter.group()); - - return res; - } - - /** {@inheritDoc} */ - @Override public Iterator<CounterGroup> iterator() { - final Iterator<String> iter = getGroupNames().iterator(); - - return new Iterator<CounterGroup>() { - @Override public boolean hasNext() { - return iter.hasNext(); - } - - @Override public CounterGroup next() { - if (!hasNext()) - throw new NoSuchElementException(); - - return new GridHadoopClientCounterGroup(GridHadoopClientCounters.this, iter.next()); - } - - @Override public void remove() { - throw new UnsupportedOperationException("not implemented"); - } - }; - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup getGroup(String grpName) { - return new 
GridHadoopClientCounterGroup(this, grpName); - } - - /** {@inheritDoc} */ - @Override public synchronized int countCounters() { - return cntrs.size(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { - for (CounterGroup group : other) { - for (Counter counter : group) { - findCounter(group.getName(), counter.getName()).increment(counter.getValue()); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object genericRight) { - if (!(genericRight instanceof GridHadoopClientCounters)) - return false; - - return cntrs.equals(((GridHadoopClientCounters) genericRight).cntrs); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrs.hashCode(); - } - - /** {@inheritDoc} */ - @Override public void setWriteAllCounters(boolean snd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean getWriteAllCounters() { - return true; - } - - /** {@inheritDoc} */ - @Override public Limits limits() { - return null; - } - - /** - * Returns size of a group. - * - * @param grpName Name of the group. - * @return amount of counters in the given group. - */ - public int groupSize(String grpName) { - int res = 0; - - for (GridHadoopCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - res++; - } - - return res; - } - - /** - * Returns counters iterator for specified group. - * - * @param grpName Name of the group to iterate. - * @return Counters iterator. 
- */ - public Iterator<Counter> iterateGroup(String grpName) { - Collection<Counter> grpCounters = new ArrayList<>(); - - for (GridHadoopLongCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - grpCounters.add(new GridHadoopV2Counter(counter)); - } - - return grpCounters.iterator(); - } - - /** - * Find a counter in the group. - * - * @param grpName The name of the counter group. - * @param cntrName The name of the counter. - * @param create Create the counter if not found if true. - * @return The counter that was found or added or {@code null} if create is false. - */ - public Counter findCounter(String grpName, String cntrName, boolean create) { - T2<String, String> key = new T2<>(grpName, cntrName); - - GridHadoopLongCounter internalCntr = cntrs.get(key); - - if (internalCntr == null & create) { - internalCntr = new GridHadoopLongCounter(grpName,cntrName); - - cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName)); - } - - return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html deleted file mode 100644 index f182598..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Ignite Hadoop client protocol. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java new file mode 100644 index 0000000..66e9761 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +/** + * Statistic writer implementation that writes info into any Hadoop file system. + * <p> + * Events of the job performance counter are written, one per line, to the file + * {@link #PERFORMANCE_COUNTER_FILE_NAME} in a per-job sub-directory of the configured base directory. + */ +public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter { + /** Name of the file inside the job statistics directory that receives the performance counter events. */ + public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; + + /** User name substituted when the job info reports an empty user. */ + private static final String DEFAULT_USER_NAME = "anonymous"; + + /** Job property that overrides the base directory for counter files; the value may contain the user macro. */ + public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; + + /** Macro inside the base directory path that is replaced with the user name. */ + private static final String USER_MACRO = "${USER}"; + + /** Base directory used when {@link #COUNTER_WRITER_DIR_PROPERTY} is not set. */ + private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; + + /** + * {@inheritDoc} + * <p> + * Copies the job properties into a fresh Hadoop {@link Configuration} ({@code jobInfo} is cast to + * {@code HadoopDefaultJobInfo} to read them), resolves the job directory as the base directory (user macro + * replaced) plus the job ID, creates it and writes every performance event as its first component, + * a colon and its second component on a separate line. + * + * @throws IgniteCheckedException If the file system raises an {@link IOException} or a write to the + * counter file fails (detected through {@link PrintStream#checkError()}). + */ + @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) + throws IgniteCheckedException { + + Configuration hadoopCfg = new Configuration(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) + hadoopCfg.set(e.getKey(), e.getValue()); + + String user = jobInfo.user(); + + if (F.isEmpty(user)) + user = DEFAULT_USER_NAME; + + String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); + + if (dir == null) + dir = DEFAULT_COUNTER_WRITER_DIR; + + Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + try { + FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); + + 
fs.mkdirs(jobStatPath); + + try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { + for (T2<String, Long> evt : perfCntr.evts()) { + out.print(evt.get1()); + out.print(':'); + out.println(evt.get2().toString()); + } + + // PrintStream never throws IOException: checkError() flushes and reports any failed write. + if (out.checkError()) + throw new IOException("Failed to write performance counters [path=" + jobStatPath + ']'); + } + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java new file mode 100644 index 0000000..98f2e46 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.ipc.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.secondary.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; + +/** + * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * <p> + * Most operations convert the IGFS path into a Hadoop path on the wrapped file system, delegate to it and + * rethrow an {@link IOException} raised by the delegate as the {@link IgfsException} built by + * {@code handleSecondaryFsError}. + */ +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable { + + /** Hadoop file system all operations are delegated to. */ + private final FileSystem fileSys; + + /** Properties of file system: the secondary file system URI and the Hadoop configuration path recorded by the constructor. */ + private final Map<String, String> props = new HashMap<>(); + + /** + * Simple constructor that is to be used by default. + * Delegates to the two-argument constructor with a {@code null} configuration path. + * + * @param uri URI of file system. + * @throws IgniteCheckedException In case of error. + */ + public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { + this(uri, null); + } + + /** + * Constructor. + * <p> + * Creates the Hadoop file system through {@code SecondaryFileSystemProvider} and records the provider URI + * (with a trailing slash appended when missing) and {@code cfgPath} in the properties map. + * + * @param uri URI of file system. + * @param cfgPath Additional path to Hadoop configuration; stored in the properties map as is, even when {@code null}. + * @throws IgniteCheckedException In case of error (wraps the underlying {@link IOException}). 
+ */ + public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException { + try { + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath); + + fileSys = secProvider.createFileSystem(); + + uri = secProvider.uri().toString(); + + if (!uri.endsWith("/")) + uri += "/"; + + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + props.put(SECONDARY_FS_URI, uri); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Convert IGFS path into Hadoop path that carries the scheme and authority of the wrapped file system URI. + * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + URI uri = fileSys.getUri(); + + return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); + } + + /** + * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. + * + * @param e Exception to check. + * @param detailMsg Detailed error message. + * @return {@link IgfsInvalidHdfsVersionException} when the heuristic matches ({@code X.hasCause} reports a + * {@link RemoteException} cause, or the exception message contains {@code "Failed on local"}); + * otherwise the result of {@link #cast(String, IOException)}. + */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) + private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { + boolean wrongVer = X.hasCause(e, RemoteException.class) || + (e.getMessage() != null && e.getMessage().contains("Failed on local")); + + return !wrongVer ? cast(detailMsg, e) : + new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + + "version.", e); } + + /** + * Cast IO exception to IGFS exception. + * <p> + * {@code FileNotFoundException}, {@code ParentNotDirectoryException}, {@code PathIsNotEmptyDirectoryException} + * and {@code PathExistsException} are mapped to their specific IGFS exception types; any other exception + * becomes a plain {@link IgfsException}. + * + * @param msg Error message; not passed on for {@code FileNotFoundException} and + * {@code PathIsNotEmptyDirectoryException}. + * @param e IO exception. + * @return IGFS exception. 
+ */ + public static IgfsException cast(String msg, IOException e) { + if (e instanceof FileNotFoundException) + return new IgfsFileNotFoundException(e); + else if (e instanceof ParentNotDirectoryException) + return new IgfsParentNotDirectoryException(msg, e); + else if (e instanceof PathIsNotEmptyDirectoryException) + return new IgfsDirectoryNotEmptyException(e); + else if (e instanceof PathExistsException) + return new IgfsPathAlreadyExistsException(msg, e); + else + return new IgfsException(msg, e); + } + + /** + * Convert Hadoop FileStatus properties to map. + * <p> + * The map holds the permission formatted as zero-padded octal (the default permission when the status + * has none), the owner and the group. + * + * @param status File status. + * @return IGFS attributes. + */ + private static Map<String, String> properties(FileStatus status) { + FsPermission perm = status.getPermission(); + + if (perm == null) + perm = FsPermission.getDefault(); + + return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(), + PROP_GROUP_NAME, status.getGroup()); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + try { + return fileSys.exists(convert(path)); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * The owner is changed only when a user or group name is supplied, the permission only when one is supplied. + * Always returns {@code null}. + */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + + try { + if (props0.userName() != null || props0.groupName() != null) + fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); + + if (props0.permission() != null) + fileSys.setPermission(convert(path), props0.permission()); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); + } + + //Result is not used in case of secondary FS. + return null; + } + + /** + * {@inheritDoc} + * <p> + * Throws {@link IgfsException} when the underlying rename returns {@code false}. + */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // Delegate to the secondary file system. 
+ try { + if (!fileSys.rename(convert(src), convert(dest))) + throw new IgfsException("Failed to rename (secondary file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + try { + return fileSys.delete(convert(path), recursive); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * Throws {@link IgniteException} when the underlying call returns {@code false}. + */ + @Override public void mkdirs(IgfsPath path) { + try { + if (!fileSys.mkdirs(convert(path))) + throw new IgniteException("Failed to make directories [path=" + path + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * Only the permission derived from {@code props} is passed to the underlying call. + * Throws {@link IgniteException} when that call returns {@code false}. + */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + try { + if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * Returned paths are built from {@code path} and the name of each listed entry. + * Throws {@link IgfsFileNotFoundException} when the listing is {@code null} or the path is not found. + */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsPath> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) + res.add(new IgfsPath(path, status.getPath().getName())); + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + catch 
(IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** + * {@inheritDoc} + * <p> + * Each entry carries the properties produced from its {@link FileStatus}; entries that are not directories + * also carry the block size (narrowed to {@code int}) and the length. + * Throws {@link IgfsFileNotFoundException} when the listing is {@code null} or the path is not found. + */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsFile> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) { + IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) : + new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false, + properties(status)); + + res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1)); + } + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + try { + return fileSys.create(convert(path), overwrite); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * A {@code null} properties map is treated as empty, {@code replication} is narrowed to {@code short} + * and {@code null} is passed as the last argument of the Hadoop call. + */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + HadoopIgfsProperties props0 = + new HadoopIgfsProperties(props != null ? 
props : Collections.<String, String>emptyMap()); + + try { + return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, + null); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + + ", blockSize=" + blockSize + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * The {@code create} flag and {@code props} are not used by this implementation: the call is passed + * straight to the Hadoop append with the converted path and {@code bufSize}. + */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + try { + return fileSys.append(convert(path), bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * Returns {@code null} when the Hadoop status is {@code null} or the path is not found; otherwise an + * {@link IgfsFile} backed by the Hadoop {@link FileStatus} (its block size is narrowed to {@code int}). + */ + @Override public IgfsFile info(final IgfsPath path) { + try { + final FileStatus status = fileSys.getFileStatus(convert(path)); + + if (status == null) + return null; + + final Map<String, String> props = properties(status); + + return new IgfsFile() { + @Override public IgfsPath path() { + return path; + } + + @Override public boolean isFile() { + return status.isFile(); + } + + @Override public boolean isDirectory() { + return status.isDirectory(); + } + + @Override public int blockSize() { + return (int)status.getBlockSize(); + } + + @Override public long groupBlockSize() { + return status.getBlockSize(); + } + + @Override public long accessTime() { + return status.getAccessTime(); + } + + @Override public long modificationTime() { + return status.getModificationTime(); + } + + @Override public String property(String name) throws IllegalArgumentException { + String val = props.get(name); + + if (val == null) + throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); + + return val; + } + + @Nullable @Override public String property(String name, @Nullable String dfltVal) { + String val = props.get(name); + + return val 
== null ? dfltVal : val; + } + + @Override public long length() { + return status.getLen(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return props; + } + }; + + } + catch (FileNotFoundException ignore) { + return null; + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); + } + } + + /** + * {@inheritDoc} + * <p> + * Reports the space consumed according to the content summary of the path built from the file system URI. + */ + @Override public long usedSpaceSize() { + try { + return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed(); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get used space size of file system."); + } + } + + /** + * {@inheritDoc} + * <p> + * Returns the internal properties map itself, not a copy. + */ + @Nullable @Override public Map<String, String> properties() { + return props; + } + + /** + * {@inheritDoc} + * <p> + * Closes the wrapped Hadoop file system; an {@link IOException} is rethrown wrapped in + * {@link IgniteCheckedException}. + */ + @Override public void close() throws IgniteCheckedException { + try { + fileSys.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html new file mode 100644 index 0000000..1d78952 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Ignite Hadoop Accelerator file system API. +</body> +</html>