http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java new file mode 100644 index 0000000..e95b8cb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; + +/** + * Hadoop process base. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public class HadoopChildProcessRunner { + /** Node process descriptor. */ + private HadoopProcessDescriptor nodeDesc; + + /** Message processing executor service. */ + private ExecutorService msgExecSvc; + + /** Task executor service. */ + private HadoopExecutorService execSvc; + + /** */ + protected GridUnsafeMemory mem = new GridUnsafeMemory(0); + + /** External communication. */ + private HadoopExternalCommunication comm; + + /** Logger. */ + private IgniteLogger log; + + /** Init guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Start time. */ + private long startTime; + + /** Init future. */ + private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>(); + + /** Job instance. */ + private HadoopJob job; + + /** Number of uncompleted tasks. 
*/ + private final AtomicInteger pendingTasks = new AtomicInteger(); + + /** Shuffle job. */ + private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; + + /** Concurrent mappers. */ + private int concMappers; + + /** Concurrent reducers. */ + private int concReducers; + + /** + * Starts child process runner. + */ + public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, + ExecutorService msgExecSvc, IgniteLogger parentLog) + throws IgniteCheckedException { + this.comm = comm; + this.nodeDesc = nodeDesc; + this.msgExecSvc = msgExecSvc; + + comm.setListener(new MessageListener()); + log = parentLog.getLogger(HadoopChildProcessRunner.class); + + startTime = U.currentTimeMillis(); + + // At this point node knows that this process has started. + comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); + } + + /** + * Initializes process for task execution. + * + * @param req Initialization request. + */ + private void prepareProcess(HadoopPrepareForJobRequest req) { + if (initGuard.compareAndSet(false, true)) { + try { + if (log.isDebugEnabled()) + log.debug("Initializing external hadoop task: " + req); + + assert job == null; + + job = req.jobInfo().createJob(req.jobId(), log); + + job.initialize(true, nodeDesc.processId()); + + shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, + req.totalReducerCount(), req.localReducers()); + + initializeExecutors(req); + + if (log.isDebugEnabled()) + log.debug("External process initialized [initWaitTime=" + + (U.currentTimeMillis() - startTime) + ']'); + + initFut.onDone(null, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize process: " + req, e); + + initFut.onDone(e); + } + } + else + log.warning("Duplicate initialize process request received (will ignore): " + req); + } + + /** + * @param req Task execution request. + */ + private void runTasks(final HadoopTaskExecutionRequest req) { + if (!initFut.isDone() && log.isDebugEnabled()) + log.debug("Will wait for process initialization future completion: " + req); + + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + // Make sure init was successful. + f.get(); + + boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); + + assert set; + + HadoopTaskInfo info = F.first(req.tasks()); + + assert info != null; + + int size = info.type() == MAP ? 
concMappers : concReducers; + +// execSvc.setCorePoolSize(size); +// execSvc.setMaximumPoolSize(size); + + if (log.isDebugEnabled()) + log.debug("Set executor service size for task type [type=" + info.type() + + ", size=" + size + ']'); + + for (HadoopTaskInfo taskInfo : req.tasks()) { + if (log.isDebugEnabled()) + log.debug("Submitted task for external execution: " + taskInfo); + + execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { + @Override protected void onTaskFinished(HadoopTaskStatus status) { + onTaskFinished0(this, status); + } + + @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx) + throws IgniteCheckedException { + return shuffleJob.input(ctx); + } + + @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx) + throws IgniteCheckedException { + return shuffleJob.output(ctx); + } + }); + } + } + catch (IgniteCheckedException e) { + for (HadoopTaskInfo info : req.tasks()) + notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + }); + } + + /** + * Creates executor services. + * + * @param req Init child process request. + */ + private void initializeExecutors(HadoopPrepareForJobRequest req) { + int cpus = Runtime.getRuntime().availableProcessors(); +// +// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); +// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); + + execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); + } + + /** + * Updates external process map so that shuffle can proceed with sending messages to reducers. + * + * @param req Update request. + */ + private void updateTasks(final HadoopJobInfoUpdateRequest req) { + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> gridFut) { + assert initGuard.get(); + + assert req.jobId().equals(job.id()); + + if (req.reducersAddresses() != null) { + if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { + shuffleJob.startSending("external", + new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { + @Override public void applyx(HadoopProcessDescriptor dest, + HadoopShuffleMessage msg) throws IgniteCheckedException { + comm.sendMessage(dest, msg); + } + }); + } + } + } + }); + } + + /** + * Stops all executors and running tasks. + */ + private void shutdown() { + if (execSvc != null) + execSvc.shutdown(5000); + + if (msgExecSvc != null) + msgExecSvc.shutdownNow(); + + try { + job.dispose(true); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to dispose job.", e); + } + } + + /** + * Notifies node about task finish. + * + * @param run Finished task runnable. + * @param status Task status. + */ + private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) { + HadoopTaskInfo info = run.taskInfo(); + + int pendingTasks0 = pendingTasks.decrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Hadoop task execution finished [info=" + info + + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + + ", pendingTasks=" + pendingTasks0 + + ", err=" + status.failCause() + ']'); + + assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported."; + + boolean flush = pendingTasks0 == 0 && info.type() == MAP; + + notifyTaskFinished(info, status, flush); + } + + /** + * @param taskInfo Finished task info. + * @param status Task status. 
+ */ + private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status, + boolean flush) { + + final HadoopTaskState state = status.state(); + final Throwable err = status.failCause(); + + if (!flush) { + try { + if (log.isDebugEnabled()) + log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + + ", err=" + err + ']'); + + comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); + } + catch (IgniteCheckedException e) { + log.error("Failed to send message to parent node (will terminate child process).", e); + + shutdown(); + + terminate(); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + + taskInfo + ", state=" + state + ", err=" + err + ']'); + + final long start = U.currentTimeMillis(); + + try { + shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + long end = U.currentTimeMillis(); + + if (log.isDebugEnabled()) + log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + + ", flushTime=" + (end - start) + ']'); + + try { + // Check for errors on shuffle. + f.get(); + + notifyTaskFinished(taskInfo, status, false); + } + catch (IgniteCheckedException e) { + log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + + ", state=" + state + ", err=" + err + ']', e); + + notifyTaskFinished(taskInfo, + new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + }); + } + catch (IgniteCheckedException e) { + log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + + ", state=" + state + ", err=" + err + ']', e); + + notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + } + + /** + * Checks if message was received from parent node and prints warning if not. + * + * @param desc Sender process ID. + * @param msg Received message. + * @return {@code True} if received from parent node. + */ + private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) { + if (!nodeDesc.processId().equals(desc.processId())) { + log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + + ", msg=" + msg + ']'); + + return false; + } + + return true; + } + + /** + * Stops execution of this process. + */ + private void terminate() { + System.exit(1); + } + + /** + * Message listener. 
+ */ + private class MessageListener implements HadoopMessageListener { + /** {@inheritDoc} */ + @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) { + if (msg instanceof HadoopTaskExecutionRequest) { + if (validateNodeMessage(desc, msg)) + runTasks((HadoopTaskExecutionRequest)msg); + } + else if (msg instanceof HadoopJobInfoUpdateRequest) { + if (validateNodeMessage(desc, msg)) + updateTasks((HadoopJobInfoUpdateRequest)msg); + } + else if (msg instanceof HadoopPrepareForJobRequest) { + if (validateNodeMessage(desc, msg)) + prepareProcess((HadoopPrepareForJobRequest)msg); + } + else if (msg instanceof HadoopShuffleMessage) { + if (log.isTraceEnabled()) + log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); + + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + HadoopShuffleMessage m = (HadoopShuffleMessage)msg; + + shuffleJob.onShuffleMessage(m); + + comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e); + } + } + }); + } + else if (msg instanceof HadoopShuffleAck) { + if (log.isTraceEnabled()) + log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']'); + + shuffleJob.onShuffleAck((HadoopShuffleAck)msg); + } + else + log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']'); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + if (log.isDebugEnabled()) + log.debug("Lost connection with remote process: " + desc); + + if (desc == null) + U.warn(log, "Handshake failed."); + else if (desc.processId().equals(nodeDesc.processId())) { + log.warning("Child process lost connection with parent node (will terminate child process)."); + + shutdown(); + + terminate(); + } + } + } +}
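For readers tracing the task accounting in HadoopChildProcessRunner above: runTasks() arms the pendingTasks counter once per request, each finished task decrements it, and only the last finishing MAP task triggers a shuffle flush before the final notification goes back to the parent. Below is a minimal, self-contained sketch of that counting pattern; the class and method names are illustrative only and are not part of this commit.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/** Illustrative model of the pendingTasks accounting used by HadoopChildProcessRunner (not part of the commit). */
public class PendingTasksSketch {
    /** Number of uncompleted tasks, armed once per batch as in runTasks(). */
    private final AtomicInteger pendingTasks = new AtomicInteger();

    private final ExecutorService execSvc =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /** Submits a batch; the counter must be zero, i.e. no previous batch may still be running. */
    public void runTasks(int taskCnt) {
        boolean set = pendingTasks.compareAndSet(0, taskCnt);

        assert set : "Batch submitted while a previous batch is still running.";

        for (int i = 0; i < taskCnt; i++) {
            final int taskIdx = i;

            execSvc.submit(() -> onTaskFinished(taskIdx));
        }
    }

    /** Mirrors onTaskFinished0(): only the last task of the batch triggers the flush (simplified: the real code also requires a MAP task). */
    private void onTaskFinished(int taskIdx) {
        int left = pendingTasks.decrementAndGet();

        if (left == 0)
            System.out.println("Task " + taskIdx + " was last; flush shuffle output before final ack.");
        else
            System.out.println("Task " + taskIdx + " finished, " + left + " still pending.");
    }

    public static void main(String[] args) throws InterruptedException {
        PendingTasksSketch s = new PendingTasksSketch();

        s.runTasks(4);

        s.execSvc.shutdown();
        s.execSvc.awaitTermination(10, TimeUnit.SECONDS);
    }
}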
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java new file mode 100644 index 0000000..3a94d43 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.logger.log4j.*; +import org.apache.ignite.marshaller.optimized.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Hadoop external process base class. + */ +public class HadoopExternalProcessStarter { + /** Path to Log4j configuration file. */ + public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml"; + + /** Arguments. */ + private Args args; + + /** System out. */ + private OutputStream out; + + /** System err. */ + private OutputStream err; + + /** + * @param args Parsed arguments. + */ + public HadoopExternalProcessStarter(Args args) { + this.args = args; + } + + /** + * @param cmdArgs Process arguments. 
+     */
+    public static void main(String[] cmdArgs) {
+        try {
+            Args args = arguments(cmdArgs);
+
+            new HadoopExternalProcessStarter(args).run();
+        }
+        catch (Exception e) {
+            System.err.println("Failed");
+
+            System.err.println(e.getMessage());
+
+            e.printStackTrace(System.err);
+        }
+    }
+
+    /**
+     * Runs the child process starter.
+     *
+     * @throws Exception If failed.
+     */
+    public void run() throws Exception {
+        U.setWorkDirectory(args.workDir, U.getIgniteHome());
+
+        File outputDir = outputDirectory();
+
+        initializeStreams(outputDir);
+
+        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
+            Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
+
+        IgniteLogger log = logger(outputDir);
+
+        HadoopExternalCommunication comm = new HadoopExternalCommunication(
+            args.nodeId,
+            args.childProcId,
+            new OptimizedMarshaller(),
+            log,
+            msgExecSvc,
+            "external"
+        );
+
+        comm.start();
+
+        HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId);
+        nodeDesc.address(args.addr);
+        nodeDesc.tcpPort(args.tcpPort);
+        nodeDesc.sharedMemoryPort(args.shmemPort);
+
+        HadoopChildProcessRunner runner = new HadoopChildProcessRunner();
+
+        runner.start(comm, nodeDesc, msgExecSvc, log);
+
+        System.err.println("Started");
+        System.err.flush();
+
+        System.setOut(new PrintStream(out));
+        System.setErr(new PrintStream(err));
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @throws Exception If failed.
+     */
+    private void initializeStreams(File outputDir) throws Exception {
+        out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
+        err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
+    }
+
+    /**
+     * @return Path to output directory.
+     * @throws IOException If failed.
+     */
+    private File outputDirectory() throws IOException {
+        File f = new File(args.out);
+
+        if (!f.exists()) {
+            if (!f.mkdirs())
+                throw new IOException("Failed to create output directory: " + args.out);
+        }
+        else {
+            if (f.isFile())
+                throw new IOException("Output directory is a file: " + args.out);
+        }
+
+        return f;
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @return Logger.
+     */
+    private IgniteLogger logger(final File outputDir) {
+        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
+
+        Log4JLogger logger;
+
+        try {
+            logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
+        }
+        catch (IgniteCheckedException e) {
+            System.err.println("Failed to create URL-based logger. Will use default one.");
+
+            e.printStackTrace();
+
+            logger = new Log4JLogger(true);
+        }
+
+        logger.updateFilePath(new IgniteClosure<String, String>() {
+            @Override public String apply(String s) {
+                return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
+            }
+        });
+
+        return logger;
+    }
+
+    /**
+     * @param processArgs Process arguments.
+     * @return Parsed arguments.
+     */
+    private static Args arguments(String[] processArgs) throws Exception {
+        Args args = new Args();
+
+        for (int i = 0; i < processArgs.length; i++) {
+            String arg = processArgs[i];
+
+            switch (arg) {
+                case "-cpid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-cpid' parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.childProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-ppid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-ppid' parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.parentProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-nid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node ID for '-nid' parameter");
+
+                    String nodeIdStr = processArgs[++i];
+
+                    args.nodeId = UUID.fromString(nodeIdStr);
+
+                    break;
+                }
+
+                case "-addr": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node address for '-addr' parameter");
+
+                    args.addr = processArgs[++i];
+
+                    break;
+                }
+
+                case "-tport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing tcp port for '-tport' parameter");
+
+                    args.tcpPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-sport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing shared memory port for '-sport' parameter");
+
+                    args.shmemPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-out": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing output folder name for '-out' parameter");
+
+                    args.out = processArgs[++i];
+
+                    break;
+                }
+
+                case "-wd": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing work folder name for '-wd' parameter");
+
+                    args.workDir = processArgs[++i];
+
+                    break;
+                }
+            }
+        }
+
+        return args;
+    }
+
+    /**
+     * Execution arguments.
+     */
+    private static class Args {
+        /** Child process ID. */
+        private UUID childProcId;
+
+        /** Parent process ID. */
+        private UUID parentProcId;
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /** Node address. */
+        private String addr;
+
+        /** TCP port. */
+        private int tcpPort;
+
+        /** Shmem port. */
+        private int shmemPort = -1;
+
+        /** Output folder. */
+        private String out;
+
+        /** Work directory. */
+        private String workDir;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
deleted file mode 100644
index 5dee79b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.concurrent.atomic.*; - -/** - * Implements basic lifecycle for communication clients. - */ -public abstract class GridHadoopAbstractCommunicationClient implements GridHadoopCommunicationClient { - /** Time when this client was last used. */ - private volatile long lastUsed = U.currentTimeMillis(); - - /** Reservations. */ - private final AtomicInteger reserves = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public boolean close() { - return reserves.compareAndSet(0, -1); - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - reserves.set(-1); - } - - /** {@inheritDoc} */ - @Override public boolean closed() { - return reserves.get() == -1; - } - - /** {@inheritDoc} */ - @Override public boolean reserve() { - while (true) { - int r = reserves.get(); - - if (r == -1) - return false; - - if (reserves.compareAndSet(r, r + 1)) - return true; - } - } - - /** {@inheritDoc} */ - @Override public void release() { - while (true) { - int r = reserves.get(); - - if (r == -1) - return; - - if (reserves.compareAndSet(r, r - 1)) - return; - } - } - - /** {@inheritDoc} */ - @Override public boolean reserved() { - return reserves.get() > 0; - } - - /** {@inheritDoc} */ - @Override public long getIdleTime() { - return U.currentTimeMillis() - lastUsed; - } - - /** - * Updates used time. - */ - protected void markUsed() { - lastUsed = U.currentTimeMillis(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopAbstractCommunicationClient.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java deleted file mode 100644 index b375b55..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; - -/** - * - */ -public interface GridHadoopCommunicationClient { - /** - * @return {@code True} if client has been closed by this call, - * {@code false} if failed to close client (due to concurrent reservation or concurrent close). - */ - public boolean close(); - - /** - * Forces client close. - */ - public void forceClose(); - - /** - * @return {@code True} if client is closed; - */ - public boolean closed(); - - /** - * @return {@code True} if client was reserved, {@code false} otherwise. - */ - public boolean reserve(); - - /** - * Releases this client by decreasing reservations. - */ - public void release(); - - /** - * @return {@code True} if client was reserved. - */ - public boolean reserved(); - - /** - * Gets idle time of this client. - * - * @return Idle time of this client. - */ - public long getIdleTime(); - - /** - * @param desc Process descriptor. - * @param msg Message to send. - * @throws IgniteCheckedException If failed. - */ - public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java deleted file mode 100644 index e3457a9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java +++ /dev/null @@ -1,1431 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hadoop external communication class.
- */
-public class GridHadoopExternalCommunication {
-    /** IPC error message. */
-    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
-        "(switching to TCP, may be slower).";
-
-    /** Default port which node sets listener to (value is <tt>27100</tt>). */
-    public static final int DFLT_PORT = 27100;
-
-    /** Default connection timeout (value is <tt>1000</tt>ms). */
-    public static final long DFLT_CONN_TIMEOUT = 1000;
-
-    /** Default maximum connection timeout (value is <tt>600,000</tt>ms). */
-    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
-    /** Default reconnect attempts count (value is <tt>10</tt>). */
-    public static final int DFLT_RECONNECT_CNT = 10;
-
-    /** Default message queue limit per connection (for incoming and outgoing messages). */
-    public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
-
-    /** Default count of selectors for TCP server (value is <tt>1</tt>). */
-    public static final int DFLT_SELECTORS_CNT = 1;
-
-    /** Node ID meta for session. */
-    private static final int PROCESS_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Handshake timeout meta for session. */
-    private static final int HANDSHAKE_FINISH_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Message tracker meta for session. */
-    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /**
-     * Default local port range (value is <tt>100</tt>).
-     * See {@link #setLocalPortRange(int)} for details.
-     */
-    public static final int DFLT_PORT_RANGE = 100;
-
-    /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
-    public static final boolean DFLT_TCP_NODELAY = true;
-
-    /** Server listener.
*/ - private final GridNioServerListener<GridHadoopMessage> srvLsnr = - new GridNioServerListenerAdapter<GridHadoopMessage>() { - @Override public void onConnected(GridNioSession ses) { - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); - - assert desc != null : "Received connected notification without finished handshake: " + ses; - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - if (log.isDebugEnabled()) - log.debug("Closed connection for session: " + ses); - - if (e != null) - U.error(log, "Session disconnected due to exception: " + ses, e); - - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); - - if (desc != null) { - GridHadoopCommunicationClient rmv = clients.remove(desc.processId()); - - if (rmv != null) - rmv.forceClose(); - } - - GridHadoopMessageListener lsnr0 = lsnr; - - if (lsnr0 != null) - // Notify listener about connection close. - lsnr0.onConnectionLost(desc); - } - - /** {@inheritDoc} */ - @Override public void onMessage(GridNioSession ses, GridHadoopMessage msg) { - notifyListener(ses.<GridHadoopProcessDescriptor>meta(PROCESS_META), msg); - - if (msgQueueLimit > 0) { - GridNioMessageTracker tracker = ses.meta(TRACKER_META); - - assert tracker != null : "Missing tracker for limited message queue: " + ses; - - tracker.run(); - } - } - }; - - /** Logger. */ - private IgniteLogger log; - - /** Local process descriptor. */ - private GridHadoopProcessDescriptor locProcDesc; - - /** Marshaller. */ - private Marshaller marsh; - - /** Message notification executor service. */ - private ExecutorService execSvc; - - /** Grid name. */ - private String gridName; - - /** Complex variable that represents this node IP address. */ - private volatile InetAddress locHost; - - /** Local port which node uses. */ - private int locPort = DFLT_PORT; - - /** Local port range. */ - private int locPortRange = DFLT_PORT_RANGE; - - /** Local port which node uses to accept shared memory connections. */ - private int shmemPort = -1; - - /** Allocate direct buffer or heap buffer. */ - private boolean directBuf = true; - - /** Connect timeout. */ - private long connTimeout = DFLT_CONN_TIMEOUT; - - /** Maximum connect timeout. */ - private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT; - - /** Reconnect attempts count. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private int reconCnt = DFLT_RECONNECT_CNT; - - /** Socket send buffer. */ - private int sockSndBuf; - - /** Socket receive buffer. */ - private int sockRcvBuf; - - /** Message queue limit. */ - private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; - - /** NIO server. */ - private GridNioServer<GridHadoopMessage> nioSrvr; - - /** Shared memory server. */ - private IpcSharedMemoryServerEndpoint shmemSrv; - - /** {@code TCP_NODELAY} option value for created sockets. */ - private boolean tcpNoDelay = DFLT_TCP_NODELAY; - - /** Shared memory accept worker. */ - private ShmemAcceptWorker shmemAcceptWorker; - - /** Shared memory workers. */ - private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); - - /** Clients. */ - private final ConcurrentMap<UUID, GridHadoopCommunicationClient> clients = GridConcurrentFactory.newMap(); - - /** Message listener. */ - private volatile GridHadoopMessageListener lsnr; - - /** Bound port. */ - private int boundTcpPort = -1; - - /** Bound port for shared memory server. */ - private int boundTcpShmemPort = -1; - - /** Count of selectors to use in TCP server. 
*/
-    private int selectorsCnt = DFLT_SELECTORS_CNT;
-
-    /** Local node ID message. */
-    private ProcessHandshakeMessage locIdMsg;
-
-    /** Locks. */
-    private final GridKeyLock locks = new GridKeyLock();
-
-    /**
-     * @param parentNodeId Parent node ID.
-     * @param procId Process ID.
-     * @param marsh Marshaller to use.
-     * @param log Logger.
-     * @param execSvc Executor service for message notification.
-     * @param gridName Grid name.
-     */
-    public GridHadoopExternalCommunication(
-        UUID parentNodeId,
-        UUID procId,
-        Marshaller marsh,
-        IgniteLogger log,
-        ExecutorService execSvc,
-        String gridName
-    ) {
-        locProcDesc = new GridHadoopProcessDescriptor(parentNodeId, procId);
-
-        this.marsh = marsh;
-        this.log = log.getLogger(GridHadoopExternalCommunication.class);
-        this.execSvc = execSvc;
-        this.gridName = gridName;
-    }
-
-    /**
-     * Sets local port for socket binding.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT}.
-     *
-     * @param locPort Port number.
-     */
-    public void setLocalPort(int locPort) {
-        this.locPort = locPort;
-    }
-
-    /**
-     * Gets local port for socket binding.
-     *
-     * @return Local port.
-     */
-    public int getLocalPort() {
-        return locPort;
-    }
-
-    /**
-     * Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>).
-     * If the provided local port (see {@link #setLocalPort(int)}) is occupied,
-     * implementation will try to increment the port number for as long as it is less than
-     * initial value plus this range.
-     * <p>
-     * If port range value is <tt>0</tt>, then implementation will try to bind only to the port provided by
-     * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed.
-     * <p>
-     * Local port range is very useful during development when more than one grid node needs to run
-     * on the same physical machine.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT_RANGE}.
-     *
-     * @param locPortRange New local port range.
-     */
-    public void setLocalPortRange(int locPortRange) {
-        this.locPortRange = locPortRange;
-    }
-
-    /**
-     * @return Local port range.
-     */
-    public int getLocalPortRange() {
-        return locPortRange;
-    }
-
-    /**
-     * Sets local port to accept shared memory connections.
-     * <p>
-     * If set to {@code -1} shared memory communication will be disabled.
-     * <p>
-     * If not provided, shared memory is disabled.
-     *
-     * @param shmemPort Port number.
-     */
-    public void setSharedMemoryPort(int shmemPort) {
-        this.shmemPort = shmemPort;
-    }
-
-    /**
-     * Gets shared memory port to accept incoming connections.
-     *
-     * @return Shared memory port.
-     */
-    public int getSharedMemoryPort() {
-        return shmemPort;
-    }
-
-    /**
-     * Sets connect timeout used when establishing connection
-     * with remote nodes.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
-     *
-     * @param connTimeout Connect timeout.
-     */
-    public void setConnectTimeout(long connTimeout) {
-        this.connTimeout = connTimeout;
-    }
-
-    /**
-     * @return Connection timeout.
-     */
-    public long getConnectTimeout() {
-        return connTimeout;
-    }
-
-    /**
-     * Sets maximum connect timeout. If handshake is not established within connect timeout,
-     * then SPI tries to repeat handshake procedure with increased connect timeout.
-     * Connect timeout can grow till maximum timeout value;
-     * if maximum timeout value is reached then the handshake is considered as failed.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
-     *
-     * @param maxConnTimeout Maximum connect timeout.
-     */
-    public void setMaxConnectTimeout(long maxConnTimeout) {
-        this.maxConnTimeout = maxConnTimeout;
-    }
-
-    /**
-     * Gets maximum connection timeout.
-     *
-     * @return Maximum connection timeout.
-     */
-    public long getMaxConnectTimeout() {
-        return maxConnTimeout;
-    }
-
-    /**
-     * Sets maximum number of reconnect attempts used when establishing connection
-     * with remote nodes.
-     * <p>
-     * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
-     *
-     * @param reconCnt Maximum number of reconnection attempts.
-     */
-    public void setReconnectCount(int reconCnt) {
-        this.reconCnt = reconCnt;
-    }
-
-    /**
-     * @return Reconnect count.
-     */
-    public int getReconnectCount() {
-        return reconCnt;
-    }
-
-    /**
-     * Sets flag to allocate direct or heap buffer in SPI.
-     * If value is {@code true}, then SPI will use {@link ByteBuffer#allocateDirect(int)} call.
-     * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
-     * <p>
-     * If not provided, default value is {@code true}.
-     *
-     * @param directBuf Flag indicates to allocate direct or heap buffer in SPI.
-     */
-    public void setDirectBuffer(boolean directBuf) {
-        this.directBuf = directBuf;
-    }
-
-    /**
-     * @return Direct buffer flag.
-     */
-    public boolean isDirectBuffer() {
-        return directBuf;
-    }
-
-    /**
-     * Sets the count of selectors to be used in TCP server.
-     * <p>
-     * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
-     *
-     * @param selectorsCnt Selectors count.
-     */
-    public void setSelectorsCount(int selectorsCnt) {
-        this.selectorsCnt = selectorsCnt;
-    }
-
-    /**
-     * @return Number of selectors to use.
-     */
-    public int getSelectorsCount() {
-        return selectorsCnt;
-    }
-
-    /**
-     * Sets value for {@code TCP_NODELAY} socket option. Each
-     * socket will be opened using provided value.
-     * <p>
-     * Setting this option to {@code true} disables Nagle's algorithm
-     * for the socket, decreasing latency and delivery time for small messages.
-     * <p>
-     * For systems that work under heavy network load it is advisable to
-     * set this value to {@code false}.
-     * <p>
-     * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
-     *
-     * @param tcpNoDelay {@code True} to disable TCP delay.
-     */
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    /**
-     * @return {@code TCP_NO_DELAY} flag.
-     */
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    /**
-     * Sets receive buffer size for sockets created or accepted by this SPI.
-     * <p>
-     * If not provided, default is {@code 0} which leaves buffer unchanged after
-     * socket creation (OS defaults).
-     *
-     * @param sockRcvBuf Socket receive buffer size.
-     */
-    public void setSocketReceiveBuffer(int sockRcvBuf) {
-        this.sockRcvBuf = sockRcvBuf;
-    }
-
-    /**
-     * @return Socket receive buffer size.
-     */
-    public int getSocketReceiveBuffer() {
-        return sockRcvBuf;
-    }
-
-    /**
-     * Sets send buffer size for sockets created or accepted by this SPI.
-     * <p>
-     * If not provided, default is {@code 0} which leaves the buffer unchanged
-     * after socket creation (OS defaults).
-     *
-     * @param sockSndBuf Socket send buffer size.
-     */
-    public void setSocketSendBuffer(int sockSndBuf) {
-        this.sockSndBuf = sockSndBuf;
-    }
-
-    /**
-     * @return Socket send buffer size.
-     */
-    public int getSocketSendBuffer() {
-        return sockSndBuf;
-    }
-
-    /**
-     * Sets message queue limit for incoming and outgoing messages.
-     * <p>
-     * When set to a positive number, the send queue is limited to the configured value.
-     * {@code 0} disables the size limitations.
-     * <p>
-     * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
-     *
-     * @param msgQueueLimit Send queue size limit.
-     */
-    public void setMessageQueueLimit(int msgQueueLimit) {
-        this.msgQueueLimit = msgQueueLimit;
-    }
-
-    /**
-     * @return Message queue size limit.
-     */
-    public int getMessageQueueLimit() {
-        return msgQueueLimit;
-    }
-
-    /**
-     * Sets Hadoop communication message listener.
-     *
-     * @param lsnr Message listener.
-     */
-    public void setListener(GridHadoopMessageListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /**
-     * @return Outbound message queue size.
-     */
-    public int getOutboundMessagesQueueSize() {
-        return nioSrvr.outboundMessagesQueueSize();
-    }
-
-    /**
-     * Starts communication.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void start() throws IgniteCheckedException {
-        try {
-            locHost = U.getLocalHost();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to initialize local address.", e);
-        }
-
-        try {
-            shmemSrv = resetShmemServer();
-        }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to start shared memory communication server.", e);
-        }
-
-        try {
-            // This method potentially resets local port to the value
-            // local node was bound to.
-            nioSrvr = resetNioServer();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteCheckedException("Failed to initialize TCP server: " + locHost, e);
-        }
-
-        locProcDesc.address(locHost.getHostAddress());
-        locProcDesc.sharedMemoryPort(boundTcpShmemPort);
-        locProcDesc.tcpPort(boundTcpPort);
-
-        locIdMsg = new ProcessHandshakeMessage(locProcDesc);
-
-        if (shmemSrv != null) {
-            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
-
-            new IgniteThread(shmemAcceptWorker).start();
-        }
-
-        nioSrvr.start();
-    }
-
-    /**
-     * Gets local process descriptor.
-     *
-     * @return Local process descriptor.
-     */
-    public GridHadoopProcessDescriptor localProcessDescriptor() {
-        return locProcDesc;
-    }
-
-    /**
-     * Gets filters used by communication.
-     *
-     * @return Filters array.
-     */
-    private GridNioFilter[] filters() {
-        return new GridNioFilter[] {
-            new GridNioAsyncNotifyFilter(gridName, execSvc, log),
-            new HandshakeAndBackpressureFilter(),
-            new GridHadoopMarshallerFilter(marsh),
-            new GridNioCodecFilter(new GridBufferedParser(directBuf, ByteOrder.nativeOrder()), log, false)
-        };
-    }
-
-    /**
-     * Recreates TCP server socket instance.
-     *
-     * @return Server instance.
-     * @throws IgniteCheckedException Thrown if it's not possible to create server.
-     */
-    private GridNioServer<GridHadoopMessage> resetNioServer() throws IgniteCheckedException {
-        if (boundTcpPort >= 0)
-            throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
-
-        IgniteCheckedException lastEx = null;
-
-        // If configured TCP port is busy, find first available in range.
- for (int port = locPort; port < locPort + locPortRange; port++) { - try { - GridNioServer<GridHadoopMessage> srvr = - GridNioServer.<GridHadoopMessage>builder() - .address(locHost) - .port(port) - .listener(srvLsnr) - .logger(log.getLogger(GridNioServer.class)) - .selectorCount(selectorsCnt) - .gridName(gridName) - .tcpNoDelay(tcpNoDelay) - .directBuffer(directBuf) - .byteOrder(ByteOrder.nativeOrder()) - .socketSendBufferSize(sockSndBuf) - .socketReceiveBufferSize(sockRcvBuf) - .sendQueueLimit(msgQueueLimit) - .directMode(false) - .filters(filters()) - .build(); - - boundTcpPort = port; - - // Ack Port the TCP server was bound to. - if (log.isInfoEnabled()) - log.info("Successfully bound to TCP port [port=" + boundTcpPort + - ", locHost=" + locHost + ']'); - - return srvr; - } - catch (IgniteCheckedException e) { - lastEx = e; - - if (log.isDebugEnabled()) - log.debug("Failed to bind to local port (will try next port within range) [port=" + port + - ", locHost=" + locHost + ']'); - } - } - - // If free port wasn't found. - throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort + - ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); - } - - /** - * Creates new shared memory communication server. - * @return Server. - * @throws IgniteCheckedException If failed. - */ - @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { - if (boundTcpShmemPort >= 0) - throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); - - if (shmemPort == -1 || U.isWindows()) - return null; - - IgniteCheckedException lastEx = null; - - // If configured TCP port is busy, find first available in range. - for (int port = shmemPort; port < shmemPort + locPortRange; port++) { - try { - IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint( - log.getLogger(IpcSharedMemoryServerEndpoint.class), - locProcDesc.processId(), gridName); - - srv.setPort(port); - - srv.omitOutOfResourcesWarning(true); - - srv.start(); - - boundTcpShmemPort = port; - - // Ack Port the TCP server was bound to. - if (log.isInfoEnabled()) - log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort + - ", locHost=" + locHost + ']'); - - return srv; - } - catch (IgniteCheckedException e) { - lastEx = e; - - if (log.isDebugEnabled()) - log.debug("Failed to bind to local port (will try next port within range) [port=" + port + - ", locHost=" + locHost + ']'); - } - } - - // If free port wasn't found. - throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + - locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); - } - - /** - * Stops the server. - * - * @throws IgniteCheckedException - */ - public void stop() throws IgniteCheckedException { - // Stop TCP server. - if (nioSrvr != null) - nioSrvr.stop(); - - U.cancel(shmemAcceptWorker); - U.join(shmemAcceptWorker, log); - - U.cancel(shmemWorkers); - U.join(shmemWorkers, log); - - shmemWorkers.clear(); - - // Force closing on stop (safety). - for (GridHadoopCommunicationClient client : clients.values()) - client.forceClose(); - - // Clear resources. - nioSrvr = null; - - boundTcpPort = -1; - } - - /** - * Sends message to Hadoop process. 
- * - * @param desc - * @param msg - * @throws IgniteCheckedException - */ - public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws - IgniteCheckedException { - assert desc != null; - assert msg != null; - - if (log.isTraceEnabled()) - log.trace("Sending message to Hadoop process [desc=" + desc + ", msg=" + msg + ']'); - - GridHadoopCommunicationClient client = null; - - boolean closeOnRelease = true; - - try { - client = reserveClient(desc); - - client.sendMessage(desc, msg); - - closeOnRelease = false; - } - finally { - if (client != null) { - if (closeOnRelease) { - client.forceClose(); - - clients.remove(desc.processId(), client); - } - else - client.release(); - } - } - } - - /** - * Returns existing or just created client to node. - * - * @param desc Node to which client should be open. - * @return The existing or just created client. - * @throws IgniteCheckedException Thrown if any exception occurs. - */ - private GridHadoopCommunicationClient reserveClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException { - assert desc != null; - - UUID procId = desc.processId(); - - while (true) { - GridHadoopCommunicationClient client = clients.get(procId); - - if (client == null) { - if (log.isDebugEnabled()) - log.debug("Did not find client for remote process [locProcDesc=" + locProcDesc + ", desc=" + - desc + ']'); - - // Do not allow concurrent connects. - Object sync = locks.lock(procId); - - try { - client = clients.get(procId); - - if (client == null) { - GridHadoopCommunicationClient old = clients.put(procId, client = createNioClient(desc)); - - assert old == null; - } - } - finally { - locks.unlock(procId, sync); - } - - assert client != null; - } - - if (client.reserve()) - return client; - else - // Client has just been closed by idle worker. Help it and try again. - clients.remove(procId, client); - } - } - - /** - * @param desc Process descriptor. - * @return Client. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected GridHadoopCommunicationClient createNioClient(GridHadoopProcessDescriptor desc) - throws IgniteCheckedException { - assert desc != null; - - int shmemPort = desc.sharedMemoryPort(); - - // If remote node has shared memory server enabled and has the same set of MACs - // then we are likely to run on the same host and shared memory communication could be tried. - if (shmemPort != -1 && locProcDesc.parentNodeId().equals(desc.parentNodeId())) { - try { - return createShmemClient(desc, shmemPort); - } - catch (IgniteCheckedException e) { - if (e.hasCause(IpcOutOfSystemResourcesException.class)) - // Has cause or is itself the IpcOutOfSystemResourcesException. - LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); - else if (log.isDebugEnabled()) - log.debug("Failed to establish shared memory connection with local hadoop process: " + - desc); - } - } - - return createTcpClient(desc); - } - - /** - * @param desc Process descriptor. - * @param port Port. - * @return Client. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected GridHadoopCommunicationClient createShmemClient(GridHadoopProcessDescriptor desc, int port) - throws IgniteCheckedException { - int attempt = 1; - - int connectAttempts = 1; - - long connTimeout0 = connTimeout; - - while (true) { - IpcEndpoint clientEndpoint; - - try { - clientEndpoint = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); - } - catch (IgniteCheckedException e) { - // Reconnect for the second time, if connection is not established. 
-                if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                throw e;
-            }
-
-            GridHadoopCommunicationClient client = null;
-
-            try {
-                ShmemWorker worker = new ShmemWorker(clientEndpoint, false);
-
-                shmemWorkers.add(worker);
-
-                GridNioSession ses = worker.session();
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                // We are in lock, it is safe to get session and attach
-                ses.addMeta(HANDSHAKE_FINISH_META, fin);
-
-                client = new GridHadoopTcpNioCommunicationClient(ses);
-
-                new IgniteThread(worker).start();
-
-                fin.await(connTimeout0);
-            }
-            catch (GridHadoopHandshakeTimeoutException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", err=" + e.getMessage() + ", client=" + client + ']');
-
-                if (client != null)
-                    client.forceClose();
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timed out (will stop attempts to perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", client=" + client + ']');
-
-                    throw e;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    continue;
-                }
-            }
-            catch (RuntimeException | Error e) {
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
-
-                if (client != null)
-                    client.forceClose();
-
-                throw e;
-            }
-
-            return client;
-        }
-    }
-
-    /**
-     * Establishes a TCP connection to a remote Hadoop process and returns the client.
-     *
-     * @param desc Process descriptor.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected GridHadoopCommunicationClient createTcpClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException {
-        String addr = desc.address();
-
-        int port = desc.tcpPort();
-
-        if (log.isDebugEnabled())
-            log.debug("Trying to connect to remote process [locProcDesc=" + locProcDesc + ", desc=" + desc + ']');
-
-        boolean conn = false;
-        GridHadoopTcpNioCommunicationClient client = null;
-        IgniteCheckedException errs = null;
-
-        int connectAttempts = 1;
-
-        long connTimeout0 = connTimeout;
-
-        int attempt = 1;
-
-        while (!conn) { // Reconnection on handshake timeout.
-            try {
-                SocketChannel ch = SocketChannel.open();
-
-                ch.configureBlocking(true);
-
-                ch.socket().setTcpNoDelay(tcpNoDelay);
-                ch.socket().setKeepAlive(true);
-
-                if (sockRcvBuf > 0)
-                    ch.socket().setReceiveBufferSize(sockRcvBuf);
-
-                if (sockSndBuf > 0)
-                    ch.socket().setSendBufferSize(sockSndBuf);
-
-                ch.socket().connect(new InetSocketAddress(addr, port), (int)connTimeout);
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
-
-                client = new GridHadoopTcpNioCommunicationClient(ses);
-
-                if (log.isDebugEnabled())
-                    log.debug("Waiting for handshake finish for client: " + client);
-
-                fin.await(connTimeout0);
-
-                conn = true;
-            }
-            catch (GridHadoopHandshakeTimeoutException e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", desc=" + desc + ", port=" + port + ", err=" + e + ']');
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timed out (will stop attempts to perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", addr=" + addr + ']');
-
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Failed to connect to remote Hadoop process " +
-                            "(is process still running?) [desc=" + desc + ", addrs=" + addr + ']');
-
-                    errs.addSuppressed(e);
-
-                    break;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    // Continue loop.
-                }
-            }
-            catch (Exception e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Client creation failed [addr=" + addr + ", port=" + port +
-                        ", err=" + e + ']');
-
-                if (X.hasCause(e, SocketTimeoutException.class))
-                    LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
-                        "configuration property) [addr=" + addr + ", port=" + port + ']');
-
-                if (errs == null)
-                    errs = new IgniteCheckedException("Failed to connect to remote Hadoop process (is process still running?) " +
-                        "[desc=" + desc + ", addrs=" + addr + ']');
-
-                errs.addSuppressed(e);
-
-                // Reconnect for the second time, if connection is not established.
-                if (connectAttempts < 2 &&
-                    (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                break;
-            }
-        }
-
-        if (client == null) {
-            assert errs != null;
-
-            if (X.hasCause(errs, ConnectException.class))
-                LT.warn(log, null, "Failed to connect to a remote Hadoop process (is process still running?). " +
-                    "Make sure operating system firewall is disabled on local and remote host " +
-                    "[addrs=" + addr + ", port=" + port + ']');
-
-            throw errs;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Created client: " + client);
-
-        return client;
-    }
-
-    /**
-     * @param desc Sender process descriptor.
-     * @param msg Communication message.
-     */
-    protected void notifyListener(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
-        GridHadoopMessageListener lsnr = this.lsnr;
-
-        if (lsnr != null)
-            // Notify listener of a new message.
- - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopExternalCommunication.class, this); - } - - /** - * This worker takes responsibility for shutting the server down when stopping; - * no other thread shall stop the passed server. - */ - private class ShmemAcceptWorker extends GridWorker { - /** */ - private final IpcSharedMemoryServerEndpoint srv; - - /** - * @param srv Server. - */ - ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { - super(gridName, "shmem-communication-acceptor", log); - - this.srv = srv; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - try { - while (!Thread.interrupted()) { - ShmemWorker e = new ShmemWorker(srv.accept(), true); - - shmemWorkers.add(e); - - new IgniteThread(e).start(); - } - } - catch (IgniteCheckedException e) { - if (!isCancelled()) - U.error(log, "Shmem server failed.", e); - } - finally { - srv.close(); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - srv.close(); - } - } - - /** - * Shared memory worker. - */ - private class ShmemWorker extends GridWorker { - /** */ - private final IpcEndpoint endpoint; - - /** Adapter. */ - private GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter; - - /** - * @param endpoint Endpoint. - */ - private ShmemWorker(IpcEndpoint endpoint, boolean accepted) { - super(gridName, "shmem-worker", log); - - this.endpoint = endpoint; - - adapter = new GridHadoopIpcToNioAdapter<>( - GridHadoopExternalCommunication.this.log, - endpoint, - accepted, - srvLsnr, - filters()); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - try { - adapter.serve(); - } - finally { - shmemWorkers.remove(this); - - endpoint.close(); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - endpoint.close(); - } - - /** {@inheritDoc} */ - @Override protected void cleanup() { - super.cleanup(); - - endpoint.close(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ShmemWorker.class, this); - } - - /** - * @return NIO session for this worker. - */ - public GridNioSession session() { - return adapter.session(); - } - } - - /** - * Handshake completion latch. - */ - private static class HandshakeFinish { - /** Await latch. */ - private CountDownLatch latch = new CountDownLatch(1); - - /** - * Finishes handshake. - */ - public void finish() { - latch.countDown(); - } - - /** - * @param time Time to wait. - * @throws GridHadoopHandshakeTimeoutException If failed to wait. - */ - public void await(long time) throws GridHadoopHandshakeTimeoutException { - try { - if (!latch.await(time, TimeUnit.MILLISECONDS)) - throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish [timeout=" + - time + ']'); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish (thread was " + - "interrupted) [timeout=" + time + ']', e); - } - } - }
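HandshakeFinish above is a thin wrapper over a single-count CountDownLatch: the connecting thread blocks with a bounded wait while the NIO thread that receives the handshake reply counts the latch down. A self-contained illustration of the same pattern (the sleeping thread simulates the remote reply):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HandshakeFinishExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // Simulates the NIO thread that receives the remote handshake message.
        new Thread(() -> {
            try {
                Thread.sleep(100);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }

            latch.countDown(); // Equivalent of HandshakeFinish.finish().
        }).start();

        // Equivalent of HandshakeFinish.await(time): a timeout is an error.
        if (!latch.await(1_000, TimeUnit.MILLISECONDS))
            throw new IllegalStateException("Failed to wait for handshake to finish");

        System.out.println("Handshake finished.");
    }
}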
- - /** - * Handshake and backpressure filter. - */ - private class HandshakeAndBackpressureFilter extends GridNioFilterAdapter { - /** - * Creates the filter and assigns the filter name. - */ - protected HandshakeAndBackpressureFilter() { - super("HadoopHandshakeFilter"); - } - - /** {@inheritDoc} */ - @Override public void onSessionOpened(final GridNioSession ses) throws IgniteCheckedException { - if (ses.accepted()) { - if (log.isDebugEnabled()) - log.debug("Accepted connection, initiating handshake: " + ses); - - // Server initiates handshake. - ses.send(locIdMsg).listenAsync(new CI1<GridNioFuture<?>>() { - @Override public void apply(GridNioFuture<?> fut) { - try { - // Make sure there were no errors. - fut.get(); - } - catch (IgniteCheckedException | IOException e) { - log.warning("Failed to send handshake message, will close session: " + ses, e); - - ses.close(); - } - } - }); - } - } - - /** {@inheritDoc} */ - @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException { - proceedSessionClosed(ses); - } - - /** {@inheritDoc} */ - @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException { - proceedExceptionCaught(ses, ex); - } - - /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage)) - log.warning("Writing message before handshake has finished [ses=" + ses + ", msg=" + msg + ']'); - - return proceedSessionWrite(ses, msg); - } - - /** {@inheritDoc} */ - @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); - - UUID rmtProcId = desc == null ? null : desc.processId(); - - if (rmtProcId == null) { - if (!(msg instanceof ProcessHandshakeMessage)) { - log.warning("Invalid handshake message received, will close connection [ses=" + ses + - ", msg=" + msg + ']'); - - ses.close(); - - return; - } - - ProcessHandshakeMessage nId = (ProcessHandshakeMessage)msg; - - if (log.isDebugEnabled()) - log.debug("Received handshake message [ses=" + ses + ", msg=" + msg + ']'); - - ses.addMeta(PROCESS_META, nId.processDescriptor()); - - if (!ses.accepted()) - // Send handshake reply. - ses.send(locIdMsg); - else { - rmtProcId = nId.processDescriptor().processId(); - - if (log.isDebugEnabled()) - log.debug("Finished handshake with remote client: " + ses); - - Object sync = locks.tryLock(rmtProcId); - - if (sync != null) { - try { - if (clients.get(rmtProcId) == null) { - if (log.isDebugEnabled()) - log.debug("Will reuse session for descriptor: " + rmtProcId); - - // Handshake finished flag is true. - clients.put(rmtProcId, new GridHadoopTcpNioCommunicationClient(ses)); - } - else { - if (log.isDebugEnabled()) - log.debug("Will not reuse client as another already exists [locProcDesc=" + - locProcDesc + ", desc=" + desc + ']'); - } - } - finally { - locks.unlock(rmtProcId, sync); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Concurrent connection is being established, will not reuse client session [" + - "locProcDesc=" + locProcDesc + ", desc=" + desc + ']'); - } - } - - if (log.isDebugEnabled()) - log.debug("Handshake is finished for session [ses=" + ses + ", locProcDesc=" + locProcDesc + ']'); - - HandshakeFinish to = ses.meta(HANDSHAKE_FINISH_META); - - if (to != null) - to.finish(); - - // Notify session opened (both parties). - proceedSessionOpened(ses); - } - else { - if (msgQueueLimit > 0) { - GridNioMessageTracker tracker = ses.meta(TRACKER_META); - - if (tracker == null) { - GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker = - new GridNioMessageTracker(ses, msgQueueLimit)); - - assert old == null; - } - - tracker.onMessageReceived(); - } - - proceedMessageReceived(ses, msg); - } - }
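On the accepting side, onMessageReceived() above registers the freshly handshaken session as the client for the remote process only if no client exists yet, guarding the check-then-put with a per-process lock. A ConcurrentHashMap.putIfAbsent() sketch captures the same decision; the map and the Object client type are simplified stand-ins, not the structures used in the patch.

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SessionReuseExample {
    private final ConcurrentMap<UUID, Object> clients = new ConcurrentHashMap<>();

    /**
     * @return {@code true} if the accepted session was registered for reuse,
     *      {@code false} if another client already exists for the process.
     */
    boolean tryRegister(UUID rmtProcId, Object client) {
        // Atomic check-then-put, equivalent to the lock-guarded branch above.
        return clients.putIfAbsent(rmtProcId, client) == null;
    }
}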
- - /** {@inheritDoc} */ - @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException { - return proceedSessionClose(ses); - } - - /** {@inheritDoc} */ - @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException { - proceedSessionIdleTimeout(ses); - } - - /** {@inheritDoc} */ - @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { - proceedSessionWriteTimeout(ses); - } - } - - /** - * Process handshake message. - */ - @SuppressWarnings("PublicInnerClass") - public static class ProcessHandshakeMessage implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Process descriptor. */ - private GridHadoopProcessDescriptor procDesc; - - /** - * Default constructor required for externalization. - */ - public ProcessHandshakeMessage() { - // No-op. - } - - /** - * @param procDesc Process descriptor. - */ - private ProcessHandshakeMessage(GridHadoopProcessDescriptor procDesc) { - this.procDesc = procDesc; - } - - /** - * @return Process descriptor. - */ - public GridHadoopProcessDescriptor processDescriptor() { - return procDesc; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(procDesc); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - procDesc = (GridHadoopProcessDescriptor)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ProcessHandshakeMessage.class, this); - } - } -}
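ProcessHandshakeMessage above relies on plain Externalizable semantics: writeExternal() emits the descriptor, readExternal() restores it, and deserialization requires a public no-arg constructor. A minimal round-trip sketch, with a String standing in for GridHadoopProcessDescriptor:

import java.io.*;

public class HandshakeMessageExample implements Externalizable {
    private static final long serialVersionUID = 0L;

    /** Stand-in for the process descriptor payload. */
    private String procDesc;

    /** Required by {@link Externalizable}. */
    public HandshakeMessageExample() {
        // No-op.
    }

    HandshakeMessageExample(String procDesc) {
        this.procDesc = procDesc;
    }

    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeObject(procDesc);
    }

    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        procDesc = (String)in.readObject();
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(new HandshakeMessageExample("proc-1"));
        }

        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            HandshakeMessageExample msg = (HandshakeMessageExample)in.readObject();

            System.out.println(msg.procDesc); // Prints: proc-1
        }
    }
}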
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java deleted file mode 100644 index e001dc9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** Internal exception class for proper timeout handling. */ -class GridHadoopHandshakeTimeoutException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Message. - */ - GridHadoopHandshakeTimeoutException(String msg) { - super(msg); - } - - /** - * @param msg Message. - * @param cause Cause. - */ - GridHadoopHandshakeTimeoutException(String msg, @Nullable Throwable cause) { - super(msg, cause); - } -}