http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java deleted file mode 100644 index 384bc23..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -import java.util.*; - -/** - * Kill job task. 
- */ -public class GridHadoopProtocolKillJobTask extends GridHadoopProtocolTaskAdapter<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - - assert nodeId != null; - assert id != null; - - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); - - return hadoop.kill(jobId); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java deleted file mode 100644 index f76f3b6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Task to get the next job ID. 
- */ -public class GridHadoopProtocolNextTaskIdTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobId> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) { - return hadoop.nextJobId(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java deleted file mode 100644 index c734acd..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; - -/** - * Submit job task. - */ -public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - GridHadoopDefaultJobInfo info = args.get(2); - - assert nodeId != null; - assert id != null; - assert info != null; - - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); - - hadoop.submit(jobId, info); - - GridHadoopJobStatus res = hadoop.status(jobId); - - if (res == null) // Submission failed. 
- res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); - - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java deleted file mode 100644 index 086545c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Hadoop protocol task adapter. - */ -public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<GridHadoopProtocolTaskArguments, R> { - /** {@inheritDoc} */ - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable GridHadoopProtocolTaskArguments arg) { - return Collections.singletonMap(new Job(arg), subgrid.get(0)); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.REDUCE; - } - - /** {@inheritDoc} */ - @Nullable @Override public R reduce(List<ComputeJobResult> results) { - if (!F.isEmpty(results)) { - ComputeJobResult res = results.get(0); - - return res.getData(); - } - else - return null; - } - - /** - * Job wrapper. - */ - private class Job implements ComputeJob { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - @SuppressWarnings("UnusedDeclaration") - @JobContextResource - private ComputeJobContext jobCtx; - - /** Argument. */ - private final GridHadoopProtocolTaskArguments args; - - /** - * Constructor. - * - * @param args Job argument. - */ - private Job(GridHadoopProtocolTaskArguments args) { - this.args = args; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Nullable @Override public Object execute() { - try { - return run(jobCtx, ((IgniteEx)ignite).hadoop(), args); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - } - - /** - * Run the task. - * - * @param jobCtx Job context. - * @param hadoop Hadoop facade. - * @param args Arguments. - * @return Job result. - * @throws IgniteCheckedException If failed. - */ - public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, GridHadoopProtocolTaskArguments args) - throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java deleted file mode 100644 index ae91a52..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task arguments. - */ -public class GridHadoopProtocolTaskArguments implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Arguments. */ - private Object[] args; - - /** - * {@link Externalizable} support. - */ - public GridHadoopProtocolTaskArguments() { - // No-op. - } - - /** - * Constructor. - * - * @param args Arguments. - */ - public GridHadoopProtocolTaskArguments(Object... args) { - this.args = args; - } - - /** - * @param idx Argument index. - * @return Argument. - */ - @SuppressWarnings("unchecked") - @Nullable public <T> T get(int idx) { - return (args != null && args.length > idx) ? (T)args[idx] : null; - } - - /** - * @return Size. - */ - public int size() { - return args != null ? 
args.length : 0; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeArray(out, args); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - args = U.readArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopProtocolTaskArguments.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java new file mode 100644 index 0000000..b454760 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.protocol.*; +import org.apache.hadoop.mapreduce.security.token.delegation.*; +import org.apache.hadoop.mapreduce.v2.*; +import org.apache.hadoop.mapreduce.v2.jobhistory.*; +import org.apache.hadoop.security.*; +import org.apache.hadoop.security.authorize.*; +import org.apache.hadoop.security.token.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Hadoop client protocol. + */ +public class HadoopClientProtocol implements ClientProtocol { + /** Ignite framework name property. */ + public static final String FRAMEWORK_NAME = "ignite"; + + /** Protocol version. */ + private static final long PROTO_VER = 1L; + + /** Default Ignite system directory. */ + private static final String SYS_DIR = ".ignite/system"; + + /** Configuration. */ + private final Configuration conf; + + /** Ignite client. */ + private volatile GridClient cli; + + /** Last received version. */ + private long lastVer = -1; + + /** Last received status. */ + private HadoopJobStatus lastStatus; + + /** + * Constructor. + * + * @param conf Configuration. + * @param cli Ignite client. 
+ */ + public HadoopClientProtocol(Configuration conf, GridClient cli) { + assert cli != null; + + this.conf = conf; + this.cli = cli; + } + + /** {@inheritDoc} */ + @Override public JobID getNewJobID() throws IOException, InterruptedException { + try { + conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); + + conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + return new JobID(jobID.globalId().toString(), jobID.localId()); + } + catch (GridClientException e) { + throw new IOException("Failed to get new job ID.", e); + } + } + + /** {@inheritDoc} */ + @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, + InterruptedException { + try { + conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); + + HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + + if (status == null) + throw new IOException("Failed to submit job (null status obtained): " + jobId); + + return processStatus(status); + } + catch (GridClientException | IgniteCheckedException e) { + throw new IOException("Failed to submit job.", e); + } + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { + return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); + } + + /** {@inheritDoc} */ + @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { + return Cluster.JobTrackerStatus.RUNNING; + } + + /** {@inheritDoc} */ + @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public AccessControlList getQueueAdmins(String queueName) throws IOException { + 
return new AccessControlList("*"); + } + + /** {@inheritDoc} */ + @Override public void killJob(JobID jobId) throws IOException, InterruptedException { + try { + cli.compute().execute(HadoopProtocolKillJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + } + catch (GridClientException e) { + throw new IOException("Failed to kill job: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, + InterruptedException { + return false; + } + + /** {@inheritDoc} */ + @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { + try { + Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); + + HadoopProtocolTaskArguments args = delay >= 0 ? 
+ new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + + HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); + + if (status == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return processStatus(status); + } + catch (GridClientException e) { + throw new IOException("Failed to get job status: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { + try { + final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + + if (counters == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return new HadoopMapReduceCounters(counters); + } + catch (GridClientException e) { + throw new IOException("Failed to get job counters: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { + return new TaskReport[0]; + } + + /** {@inheritDoc} */ + @Override public String getFilesystemName() throws IOException, InterruptedException { + return FileSystem.get(conf).getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return new JobStatus[0]; + } + + /** {@inheritDoc} */ + @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) + throws IOException, InterruptedException { + return new TaskCompletionEvent[0]; + } + + /** {@inheritDoc} */ + @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { + return new String[0]; + } + + /** 
{@inheritDoc} */ + @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public String getSystemDir() throws IOException, InterruptedException { + Path sysDir = new Path(SYS_DIR); + + return sysDir.toString(); + } + + /** {@inheritDoc} */ + @Override public String getStagingAreaDir() throws IOException, InterruptedException { + String usr = UserGroupInformation.getCurrentUser().getShortUserName(); + + return HadoopUtils.stagingAreaDir(conf, usr).toString(); + } + + /** {@inheritDoc} */ + @Override public String getJobHistoryDir() throws IOException, InterruptedException { + return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { + return new QueueAclsInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws 
IOException, + InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, + InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return PROTO_VER; + } + + /** {@inheritDoc} */ + @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); + } + + /** + * Process received status update. + * + * @param status Ignite status. + * @return Hadoop status. + */ + private JobStatus processStatus(HadoopJobStatus status) { + // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because + // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class + // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will + // change in future and either protocol will serve statuses for several jobs or status update will not be + // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). 
+ // (vozerov) + if (lastVer < status.version()) { + lastVer = status.version(); + + lastStatus = status; + } + else + assert lastStatus != null; + + return HadoopUtils.status(lastStatus, conf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java new file mode 100644 index 0000000..ebdda9f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; + +import java.util.*; + +/** + * Task to get job counters. + */ +public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + + assert nodeId != null; + assert id != null; + + return hadoop.counters(new HadoopJobId(nodeId, id)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java new file mode 100644 index 0000000..1734562 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Job status task. + */ +public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> { + /** */ + private static final long serialVersionUID = 0L; + + /** Default poll delay */ + private static final long DFLT_POLL_DELAY = 100L; + + /** Attribute for held status. 
*/ + private static final String ATTR_HELD = "held"; + + /** {@inheritDoc} */ + @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + Long pollDelay = args.get(2); + + assert nodeId != null; + assert id != null; + + HadoopJobId jobId = new HadoopJobId(nodeId, id); + + if (pollDelay == null) + pollDelay = DFLT_POLL_DELAY; + + if (pollDelay > 0) { + IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); + + if (fut != null) { + if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) + return hadoop.status(jobId); + else { + fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut0) { + jobCtx.callcc(); + } + }); + + jobCtx.setAttribute(ATTR_HELD, true); + + return jobCtx.holdcc(pollDelay); + } + } + else + return null; + } + else + return hadoop.status(jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java new file mode 100644 index 0000000..d173612 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +import java.util.*; + +/** + * Kill job task: builds a {@link HadoopJobId} from the task arguments and delegates to {@code hadoop.kill(jobId)}. + */ +public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} Arguments: node ID string (0), job number (1). Returns the value of {@code hadoop.kill(jobId)}. */ + @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + + assert nodeId != null; + assert id != null; + + HadoopJobId jobId = new HadoopJobId(nodeId, id); + + return hadoop.kill(jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java new file mode 100644 index 0000000..2782530 --- /dev/null +++
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * Task to get the next job ID. 
+ */ +public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} Reads neither the job context nor the arguments; returns {@code hadoop.nextJobId()}. */ + @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop, + HadoopProtocolTaskArguments args) { + return hadoop.nextJobId(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java new file mode 100644 index 0000000..f65d9bb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*; + +/** + * Submit job task: submits the job described by the task arguments and returns its status. + */ +public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} Arguments: node ID string (0), job number (1), job info (2). When {@code hadoop.status(jobId)} is {@code null} after submission, a new {@code PHASE_CANCELLING} status built from the job info is returned instead, so the result is never {@code null}. */ + @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + HadoopDefaultJobInfo info = args.get(2); + + assert nodeId != null; + assert id != null; + assert info != null; + + HadoopJobId jobId = new HadoopJobId(nodeId, id); + + hadoop.submit(jobId, info); + + HadoopJobStatus res = hadoop.status(jobId); + + if (res == null) // Submission failed. + res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java new file mode 100644 index 0000000..f763ccc --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Hadoop protocol task adapter: runs {@link #run} as a single job and returns that job's result. + */ +public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> { + /** {@inheritDoc} Maps one {@link Job} wrapping {@code arg} to the first node of {@code subgrid}. */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable HadoopProtocolTaskArguments arg) { + return Collections.singletonMap(new Job(arg), subgrid.get(0)); + } + + /** {@inheritDoc} Always answers {@code REDUCE}. */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} Returns the data of the first job result, or {@code null} when {@code F.isEmpty(results)} holds. */ + @Nullable @Override public R reduce(List<ComputeJobResult> results) { + if (!F.isEmpty(results)) { + ComputeJobResult res = results.get(0); + + return res.getData(); + } + else + return null; + } + + /** + * Job wrapper that forwards execution to {@link #run} of the enclosing task.
+ */ + private class Job implements ComputeJob { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance resource. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Job context resource handed to {@link #run}. */ + @SuppressWarnings("UnusedDeclaration") + @JobContextResource + private ComputeJobContext jobCtx; + + /** Argument. */ + private final HadoopProtocolTaskArguments args; + + /** + * Constructor. + * + * @param args Job argument. + */ + private Job(HadoopProtocolTaskArguments args) { + this.args = args; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + try { + return run(jobCtx, ((IgniteEx)ignite).hadoop(), args); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + } + + /** + * Run the task. + * + * @param jobCtx Job context. + * @param hadoop Hadoop facade. + * @param args Arguments. + * @return Job result. + * @throws IgniteCheckedException If failed. + */ + public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) + throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java new file mode 100644 index 0000000..5c470ba --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task arguments: a positional array of objects passed to protocol tasks. + */ +public class HadoopProtocolTaskArguments implements Externalizable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Arguments; stays {@code null} after the no-arg constructor until {@code readExternal} assigns it. */ + private Object[] args; + + /** + * {@link Externalizable} support. + */ + public HadoopProtocolTaskArguments() { + // No-op. + } + + /** + * Constructor. + * + * @param args Arguments. + */ + public HadoopProtocolTaskArguments(Object... args) { + this.args = args; + } + + /** + * @param idx Argument index. + * @return Argument at {@code idx} cast to the requested type, or {@code null} when no arguments are set or {@code idx} is not below the array length; a negative index is not guarded. + */ + @SuppressWarnings("unchecked") + @Nullable public <T> T get(int idx) { + return (args != null && args.length > idx) ? (T)args[idx] : null; + } + + /** + * @return Number of arguments, {@code 0} when none are set. + */ + public int size() { + return args != null ?
args.length : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeArray(out, args); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + args = U.readArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProtocolTaskArguments.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java deleted file mode 100644 index 396124e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Shuffle. - */ -public class GridHadoopShuffle extends GridHadoopComponent { - /** */ - private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); - - /** */ - protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** {@inheritDoc} */ - @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, - new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - return onMessageReceived(nodeId, (GridHadoopMessage)msg); - } - }); - } - - /** - * Stops shuffle. - * - * @param cancel If should cancel all ongoing activities. - */ - @Override public void stop(boolean cancel) { - for (GridHadoopShuffleJob job : jobs.values()) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job.", e); - } - } - - jobs.clear(); - } - - /** - * Creates new shuffle job. - * - * @param jobId Job ID. - * @return Created shuffle job. - * @throws IgniteCheckedException If job creation failed. 
- */ - private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); - - GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log, - ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); - - UUID[] rdcAddrs = new UUID[plan.reducers()]; - - for (int i = 0; i < rdcAddrs.length; i++) { - UUID nodeId = plan.nodeForReducer(i); - - assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']'; - - rdcAddrs[i] = nodeId; - } - - boolean init = job.initializeReduceAddresses(rdcAddrs); - - assert init; - - return job; - } - - /** - * @param nodeId Node ID to send message to. - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { - ClusterNode node = ctx.kernalContext().discovery().node(nodeId); - - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); - } - - /** - * @param jobId Task info. - * @return Shuffle job. - */ - private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopShuffleJob<UUID> res = jobs.get(jobId); - - if (res == null) { - res = newJob(jobId); - - GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); - - if (old != null) { - res.close(); - - res = old; - } - else if (res.reducersInitialized()) - startSending(res); - } - - return res; - } - - /** - * Starts message sending thread. - * - * @param shuffleJob Job to start sending for. 
- */ - private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) { - shuffleJob.startSending(ctx.kernalContext().gridName(), - new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() { - @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException { - send0(dest, msg); - } - } - ); - } - - /** - * Message received callback. - * - * @param src Sender node ID. - * @param msg Received message. - * @return {@code True}. - */ - public boolean onMessageReceived(UUID src, GridHadoopMessage msg) { - if (msg instanceof GridHadoopShuffleMessage) { - GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg; - - try { - job(m.jobId()).onShuffleMessage(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - - try { - // Reply with ack. - send0(src, new GridHadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); - } - } - else if (msg instanceof GridHadoopShuffleAck) { - GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg; - - try { - job(m.jobId()).onShuffleAck(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - } - else - throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + - ", msg=" + msg + ']'); - - return true; - } - - /** - * @param taskCtx Task info. - * @return Output. - */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).output(taskCtx); - } - - /** - * @param taskCtx Task info. - * @return Input. - */ - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).input(taskCtx); - } - - /** - * @param jobId Job id. 
- */ - public void jobFinished(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.remove(jobId); - - if (job != null) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job: " + jobId, e); - } - } - } - - /** - * Flushes all the outputs for the given job to remote nodes. - * - * @param jobId Job ID. - * @return Future. - */ - public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.get(jobId); - - if (job == null) - return new GridFinishedFutureEx<>(); - - try { - return job.flush(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(e); - } - } - - /** - * @return Memory. - */ - public GridUnsafeMemory memory() { - return mem; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java deleted file mode 100644 index a8a52a9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Acknowledgement message. - */ -public class GridHadoopShuffleAck implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** - * - */ - public GridHadoopShuffleAck() { - // No-op. - } - - /** - * @param msgId Message ID. - */ - public GridHadoopShuffleAck(long msgId, GridHadoopJobId jobId) { - assert jobId != null; - - this.msgId = msgId; - this.jobId = jobId; - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. 
- */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopShuffleAck.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java deleted file mode 100644 index 545c1b8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java +++ /dev/null @@ -1,593 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Shuffle job. - */ -public class GridHadoopShuffleJob<T> implements AutoCloseable { - /** */ - private static final int MSG_BUF_SIZE = 128 * 1024; - - /** */ - private final GridHadoopJob job; - - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final boolean needPartitioner; - - /** Collection of task contexts for each reduce task. */ - private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>(); - - /** Reducers addresses. */ - private T[] reduceAddrs; - - /** Local reducers address. 
*/ - private final T locReduceAddr; - - /** */ - private final GridHadoopShuffleMessage[] msgs; - - /** */ - private final AtomicReferenceArray<GridHadoopMultimap> maps; - - /** */ - private volatile IgniteInClosure2X<T, GridHadoopShuffleMessage> io; - - /** */ - protected ConcurrentMap<Long, IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs = - new ConcurrentHashMap<>(); - - /** */ - private volatile GridWorker snd; - - /** Latch for remote addresses waiting. */ - private final CountDownLatch ioInitLatch = new CountDownLatch(1); - - /** Finished flag. Set on flush or close. */ - private volatile boolean flushed; - - /** */ - private final IgniteLogger log; - - /** - * @param locReduceAddr Local reducer address. - * @param log Logger. - * @param job Job. - * @param mem Memory. - * @param totalReducerCnt Amount of reducers in the Job. - * @param locReducers Reducers will work on current node. - * @throws IgniteCheckedException If error. - */ - public GridHadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, - int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { - this.locReduceAddr = locReduceAddr; - this.job = job; - this.mem = mem; - this.log = log.getLogger(GridHadoopShuffleJob.class); - - if (!F.isEmpty(locReducers)) { - for (int rdc : locReducers) { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null); - - reducersCtx.put(rdc, job.getTaskContext(taskInfo)); - } - } - - needPartitioner = totalReducerCnt > 1; - - maps = new AtomicReferenceArray<>(totalReducerCnt); - msgs = new GridHadoopShuffleMessage[totalReducerCnt]; - } - - /** - * @param reduceAddrs Addresses of reducers. - * @return {@code True} if addresses were initialized by this call. 
- */ - public boolean initializeReduceAddresses(T[] reduceAddrs) { - if (this.reduceAddrs == null) { - this.reduceAddrs = reduceAddrs; - - return true; - } - - return false; - } - - /** - * @return {@code True} if reducers addresses were initialized. - */ - public boolean reducersInitialized() { - return reduceAddrs != null; - } - - /** - * @param gridName Grid name. - * @param io IO Closure for sending messages. - */ - @SuppressWarnings("BusyWait") - public void startSending(String gridName, IgniteInClosure2X<T, GridHadoopShuffleMessage> io) { - assert snd == null; - assert io != null; - - this.io = io; - - if (!flushed) { - snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) { - @Override protected void body() throws InterruptedException { - try { - while (!isCancelled()) { - Thread.sleep(5); - - collectUpdatesAndSend(false); - } - } - catch (IgniteCheckedException e) { - throw new IllegalStateException(e); - } - } - }; - - new IgniteThread(snd).start(); - } - - ioInitLatch.countDown(); - } - - /** - * @param maps Maps. - * @param idx Index. - * @return Map. - */ - private GridHadoopMultimap getOrCreateMap(AtomicReferenceArray<GridHadoopMultimap> maps, int idx) { - GridHadoopMultimap map = maps.get(idx); - - if (map == null) { // Create new map. - map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ? - new GridHadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)): - new GridHadoopSkipList(job.info(), mem); - - if (!maps.compareAndSet(idx, null, map)) { - map.close(); - - return maps.get(idx); - } - } - - return map; - } - - /** - * @param msg Message. - * @throws IgniteCheckedException Exception. 
- */ - public void onShuffleMessage(GridHadoopShuffleMessage msg) throws IgniteCheckedException { - assert msg.buffer() != null; - assert msg.offset() > 0; - - GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(taskCtx.counters(), null); - - perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); - - GridHadoopMultimap map = getOrCreateMap(maps, msg.reducer()); - - // Add data from message to the map. - try (GridHadoopMultimap.Adder adder = map.startAdding(taskCtx)) { - final GridUnsafeDataInput dataInput = new GridUnsafeDataInput(); - final UnsafeValue val = new UnsafeValue(msg.buffer()); - - msg.visit(new GridHadoopShuffleMessage.Visitor() { - /** */ - private GridHadoopMultimap.Key key; - - @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException { - dataInput.bytes(buf, off, off + len); - - key = adder.addKey(dataInput, key); - } - - @Override public void onValue(byte[] buf, int off, int len) { - val.off = off; - val.size = len; - - key.add(val); - } - }); - } - } - - /** - * @param ack Shuffle ack. - */ - @SuppressWarnings("ConstantConditions") - public void onShuffleAck(GridHadoopShuffleAck ack) { - IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup = sentMsgs.get(ack.id()); - - if (tup != null) - tup.get2().onDone(); - else - log.warning("Received shuffle ack for not registered shuffle id: " + ack); - } - - /** - * Unsafe value. - */ - private static class UnsafeValue implements GridHadoopMultimap.Value { - /** */ - private final byte[] buf; - - /** */ - private int off; - - /** */ - private int size; - - /** - * @param buf Buffer. 
- */ - private UnsafeValue(byte[] buf) { - assert buf != null; - - this.buf = buf; - } - - /** */ - @Override public int size() { - return size; - } - - /** */ - @Override public void copyTo(long ptr) { - UNSAFE.copyMemory(buf, BYTE_ARR_OFF + off, null, ptr, size); - } - } - - /** - * Sends map updates to remote reducers. - */ - private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { - for (int i = 0; i < maps.length(); i++) { - GridHadoopMultimap map = maps.get(i); - - if (map == null || locReduceAddr.equals(reduceAddrs[i])) - continue; // Skip empty map and local node. - - if (msgs[i] == null) - msgs[i] = new GridHadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); - - final int idx = i; - - map.visit(false, new GridHadoopMultimap.Visitor() { - /** */ - private long keyPtr; - - /** */ - private int keySize; - - /** */ - private boolean keyAdded; - - /** {@inheritDoc} */ - @Override public void onKey(long keyPtr, int keySize) { - this.keyPtr = keyPtr; - this.keySize = keySize; - - keyAdded = false; - } - - private boolean tryAdd(long valPtr, int valSize) { - GridHadoopShuffleMessage msg = msgs[idx]; - - if (!keyAdded) { // Add key and value. - int size = keySize + valSize; - - if (!msg.available(size, false)) - return false; - - msg.addKey(keyPtr, keySize); - msg.addValue(valPtr, valSize); - - keyAdded = true; - - return true; - } - - if (!msg.available(valSize, true)) - return false; - - msg.addValue(valPtr, valSize); - - return true; - } - - /** {@inheritDoc} */ - @Override public void onValue(long valPtr, int valSize) { - if (tryAdd(valPtr, valSize)) - return; - - send(idx, keySize + valSize); - - keyAdded = false; - - if (!tryAdd(valPtr, valSize)) - throw new IllegalStateException(); - } - }); - - if (flush && msgs[i].offset() != 0) - send(i, 0); - } - } - - /** - * @param idx Index of message. - * @param newBufMinSize Min new buffer size. 
- */ - private void send(final int idx, int newBufMinSize) { - final GridFutureAdapterEx<?> fut = new GridFutureAdapterEx<>(); - - GridHadoopShuffleMessage msg = msgs[idx]; - - final long msgId = msg.id(); - - IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> old = sentMsgs.putIfAbsent(msgId, - new IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>(msg, fut)); - - assert old == null; - - try { - io.apply(reduceAddrs[idx], msg); - } - catch (GridClosureException e) { - fut.onDone(U.unwrap(e)); - } - - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - f.get(); - - // Clean up the future from map only if there was no exception. - // Otherwise flush() should fail. - sentMsgs.remove(msgId); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message.", e); - } - } - }); - - msgs[idx] = newBufMinSize == 0 ? null : new GridHadoopShuffleMessage(job.id(), idx, - Math.max(MSG_BUF_SIZE, newBufMinSize)); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - if (snd != null) { - snd.cancel(); - - try { - snd.join(); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - - close(maps); - } - - /** - * @param maps Maps. - */ - private void close(AtomicReferenceArray<GridHadoopMultimap> maps) { - for (int i = 0; i < maps.length(); i++) { - GridHadoopMultimap map = maps.get(i); - - if (map != null) - map.close(); - } - } - - /** - * @return Future. 
- */ - @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> flush() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Flushing job " + job.id() + " on address " + locReduceAddr); - - flushed = true; - - if (maps.length() == 0) - return new GridFinishedFutureEx<>(); - - U.await(ioInitLatch); - - GridWorker snd0 = snd; - - if (snd0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling sender thread."); - - snd0.cancel(); - - try { - snd0.join(); - - if (log.isDebugEnabled()) - log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id()); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - - collectUpdatesAndSend(true); // With flush. - - if (log.isDebugEnabled()) - log.debug("Finished sending collected updates to remote reducers: " + job.id()); - - GridCompoundFuture fut = new GridCompoundFuture<>(); - - for (IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup : sentMsgs.values()) - fut.add(tup.get2()); - - fut.markInitialized(); - - if (log.isDebugEnabled()) - log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); - - return fut; - } - - /** - * @param taskCtx Task context. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - switch (taskCtx.taskInfo().type()) { - case MAP: - assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined."; - - case COMBINE: - return new PartitionedOutput(taskCtx); - - default: - throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); - } - } - - /** - * @param taskCtx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. 
- */ - @SuppressWarnings("unchecked") - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - switch (taskCtx.taskInfo().type()) { - case REDUCE: - int reducer = taskCtx.taskInfo().taskNumber(); - - GridHadoopMultimap m = maps.get(reducer); - - if (m != null) - return m.input(taskCtx); - - return new GridHadoopTaskInput() { // Empty input. - @Override public boolean next() { - return false; - } - - @Override public Object key() { - throw new IllegalStateException(); - } - - @Override public Iterator<?> values() { - throw new IllegalStateException(); - } - - @Override public void close() { - // No-op. - } - }; - - default: - throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); - } - } - - /** - * Partitioned output. - */ - private class PartitionedOutput implements GridHadoopTaskOutput { - /** */ - private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()]; - - /** */ - private GridHadoopPartitioner partitioner; - - /** */ - private final GridHadoopTaskContext taskCtx; - - /** - * Constructor. - * @param taskCtx Task context. 
- */ - private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - this.taskCtx = taskCtx; - - if (needPartitioner) - partitioner = taskCtx.partitioner(); - } - - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) throws IgniteCheckedException { - int part = 0; - - if (partitioner != null) { - part = partitioner.partition(key, val, adders.length); - - if (part < 0 || part >= adders.length) - throw new IgniteCheckedException("Invalid partition: " + part); - } - - GridHadoopTaskOutput out = adders[part]; - - if (out == null) - adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); - - out.write(key, val); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - for (GridHadoopTaskOutput adder : adders) { - if (adder != null) - adder.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java deleted file mode 100644 index 24ebc0c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Shuffle message. - */ -public class GridHadoopShuffleMessage implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private static final AtomicLong ids = new AtomicLong(); - - /** */ - private static final byte MARKER_KEY = (byte)17; - - /** */ - private static final byte MARKER_VALUE = (byte)31; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** */ - @GridToStringInclude - private int reducer; - - /** */ - private byte[] buf; - - /** */ - @GridToStringInclude - private int off; - - /** - * - */ - public GridHadoopShuffleMessage() { - // No-op. - } - - /** - * @param size Size. - */ - public GridHadoopShuffleMessage(GridHadoopJobId jobId, int reducer, int size) { - assert jobId != null; - - buf = new byte[size]; - - this.jobId = jobId; - this.reducer = reducer; - - msgId = ids.incrementAndGet(); - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. 
- */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducer. - */ - public int reducer() { - return reducer; - } - - /** - * @return Buffer. - */ - public byte[] buffer() { - return buf; - } - - /** - * @return Offset. - */ - public int offset() { - return off; - } - - /** - * @param size Size. - * @param valOnly Only value wll be added. - * @return {@code true} If this message can fit additional data of this size - */ - public boolean available(int size, boolean valOnly) { - size += valOnly ? 5 : 10; - - if (off + size > buf.length) { - if (off == 0) { // Resize if requested size is too big. - buf = new byte[size]; - - return true; - } - - return false; - } - - return true; - } - - /** - * @param keyPtr Key pointer. - * @param keySize Key size. - */ - public void addKey(long keyPtr, int keySize) { - add(MARKER_KEY, keyPtr, keySize); - } - - /** - * @param valPtr Value pointer. - * @param valSize Value size. - */ - public void addValue(long valPtr, int valSize) { - add(MARKER_VALUE, valPtr, valSize); - } - - /** - * @param marker Marker. - * @param ptr Pointer. - * @param size Size. - */ - private void add(byte marker, long ptr, int size) { - buf[off++] = marker; - - UNSAFE.putInt(buf, BYTE_ARR_OFF + off, size); - - off += 4; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF + off, size); - - off += size; - } - - /** - * @param v Visitor. 
- */ - public void visit(Visitor v) throws IgniteCheckedException { - for (int i = 0; i < off;) { - byte marker = buf[i++]; - - int size = UNSAFE.getInt(buf, BYTE_ARR_OFF + i); - - i += 4; - - if (marker == MARKER_VALUE) - v.onValue(buf, i, size); - else if (marker == MARKER_KEY) - v.onKey(buf, i, size); - else - throw new IllegalStateException(); - - i += size; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - out.writeInt(reducer); - out.writeInt(off); - U.writeByteArray(out, buf); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - reducer = in.readInt(); - off = in.readInt(); - buf = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopShuffleMessage.class, this); - } - - /** - * Visitor. - */ - public static interface Visitor { - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - */ - public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; - - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - */ - public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; - } -}