http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java deleted file mode 100644 index 4610204..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java +++ /dev/null @@ -1,1375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.router.*; -import org.apache.ignite.thread.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.GridTopic.*; - -/** - * - */ -public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Number of message send retries. */ - private static final int SEND_RETRY_COUNT = 3; - - /** Retry delay. */ - private static final int SEND_RETRY_DELAY = 1000; - - /** How many cancelled future IDs to keep in history. */ - private static final int CANCELLED_FUTS_HISTORY_SIZE = 4096; - - /** Log. */ - private IgniteLogger log; - - /** Context. */ - private GridKernalContext ctx; - - /** */ - private StreamerContext streamerCtx; - - /** Read-write lock. */ - private GridSpinReadWriteLock lock; - - /** Stopping flag. */ - private boolean stopping; - - /** Streamer configuration. */ - private StreamerConfiguration c; - - /** Name. */ - private String name; - - /** Stages. */ - @GridToStringInclude - private Map<String, StreamerStageWrapper> stages; - - /** Windows. */ - @GridToStringInclude - private Map<String, StreamerWindow> winMap; - - /** Default streamer window. */ - private StreamerWindow dfltWin; - - /** */ - private String firstStage; - - /** Router. */ - private StreamerEventRouter router; - - /** At least once. */ - private boolean atLeastOnce; - - /** Streamer metrics. */ - private volatile StreamerMetricsHolder streamerMetrics; - - /** Topic. */ - private Object topic; - - /** Stage execution futures. */ - private ConcurrentMap<IgniteUuid, GridStreamerStageExecutionFuture> stageFuts; - - /** Batch execution futures. */ - private ConcurrentMap<IgniteUuid, BatchExecutionFuture> batchFuts; - - /** Streamer executor service. */ - private ExecutorService execSvc; - - /** Failure listeners. */ - private Collection<StreamerFailureListener> failureLsnrs = new ConcurrentLinkedQueue<>(); - - /** Cancelled */ - private Collection<IgniteUuid> cancelledFutIds = - new GridBoundedConcurrentLinkedHashSet<>(CANCELLED_FUTS_HISTORY_SIZE); - - /** Load control semaphore. */ - private Semaphore sem; - - /** Deploy class. */ - private Class<?> depCls; - - /** Executor service capacity. */ - private int execSvcCap; - - /** Window lock. */ - private final GridSpinReadWriteLock winLock = new GridSpinReadWriteLock(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgniteStreamerImpl() { - // No-op. - } - - /** - * @param ctx Kernal context. - * @param c Configuration. - */ - public IgniteStreamerImpl(GridKernalContext ctx, StreamerConfiguration c) { - this.ctx = ctx; - - log = ctx.log(IgniteStreamerImpl.class); - - atLeastOnce = c.isAtLeastOnce(); - name = c.getName(); - router = c.getRouter(); - this.c = c; - - if (atLeastOnce) { - if (c.getMaximumConcurrentSessions() > 0) - sem = new Semaphore(c.getMaximumConcurrentSessions()); - } - - topic = name == null ? TOPIC_STREAM : TOPIC_STREAM.topic(name); - lock = new GridSpinReadWriteLock(); - stageFuts = new ConcurrentHashMap8<>(); - batchFuts = new ConcurrentHashMap8<>(); - - streamerCtx = new GridStreamerContextImpl(ctx, c, this); - } - - /** - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public void start() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Starting streamer: " + name); - - if (F.isEmpty(c.getStages())) - throw new IgniteCheckedException("Streamer should have at least one stage configured " + - "(fix configuration and restart): " + name); - - if (F.isEmpty(c.getWindows())) - throw new IgniteCheckedException("Streamer should have at least one window configured " + - "(fix configuration and restart): " + name); - - prepareResources(); - - U.startLifecycleAware(lifecycleAwares()); - - stages = U.newLinkedHashMap(c.getStages().size()); - - int stageIdx = 0; - - StreamerStageWrapper prev = null; - - for (StreamerStage s : c.getStages()) { - String sName = s.name(); - - if (F.isEmpty(sName)) - throw new IgniteCheckedException("Streamer stage should have non-empty name [streamerName=" + name + - ", stage=" + s + ']'); - - if (stages.containsKey(sName)) - throw new IgniteCheckedException("Streamer stages have duplicate names (all names should be unique) " + - "[streamerName=" + name + ", stage=" + s + ", stageName=" + sName + ']'); - - if (firstStage == null) - firstStage = sName; - - StreamerStageWrapper wrapper = new StreamerStageWrapper(s, stageIdx); - - stages.put(sName, wrapper); - - if (prev != null) - prev.nextStageName(s.name()); - - prev = wrapper; - - stageIdx++; - } - - winMap = new LinkedHashMap<>(); - - for (StreamerWindow w : c.getWindows()) { - String wName = w.name(); - - if (F.isEmpty(wName)) - throw new IgniteCheckedException("Streamer window should have non-empty name [streamerName=" + name + - ", window=" + w + ']'); - - if (winMap.containsKey(wName)) - throw new IgniteCheckedException("Streamer windows have duplicate names (all names should be unique). " + - "If you use two or more windows of the same type you need to assign their names explicitly " + - "[streamer=" + name + ", windowName=" + wName + ']'); - - winMap.put(wName, w); - - if (dfltWin == null) - dfltWin = w; - } - - execSvc = new IgniteThreadPoolExecutor( - ctx.gridName(), - c.getThreadPoolSize(), - c.getThreadPoolSize(), - 0, - new LinkedBlockingQueue<Runnable>()); - - execSvcCap = c.getThreadPoolSize(); - - resetMetrics(); - - if (router == null) - router = new StreamerLocalEventRouter(); - - ctx.io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - if (log.isDebugEnabled()) - log.debug("Received message [nodeId=" + nodeId + ", msg=" + msg + ']'); - - processStreamerMessage(nodeId, msg); - } - }); - - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - for (GridStreamerStageExecutionFuture fut : stageFuts.values()) - fut.onNodeLeft(discoEvt.eventNode().id()); - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); - } - - /** - * Injects resources into streamer components. - * - * @throws IgniteCheckedException If failed. - */ - private void prepareResources() throws IgniteCheckedException { - for (StreamerStage s : c.getStages()) - ctx.resource().injectGeneric(s); - - if (router == null) - router = new StreamerLocalEventRouter(); - - ctx.resource().injectGeneric(router); - - for (StreamerWindow w : c.getWindows()) - ctx.resource().injectGeneric(w); - } - - /** - * On kernal stop callback. - * - * @param cancel Cancel. - */ - public void onKernalStop(boolean cancel) { - // No further requests will be processed neither locally nor remotely. - lock.writeLock(); - - try { - stopping = true; - } - finally { - lock.writeUnlock(); - } - - if (cancel) { - for (BatchExecutionFuture execFut : batchFuts.values()) { - try { - execFut.cancel(); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to cancel batch execution future on node stop (will ignore) " + - "[execFut=" + execFut + ", err=" + e + ']'); - } - } - } - else { - // We need to wait for all locally scheduled stage futures to complete. - for (GridStreamerStageExecutionFuture fut : stageFuts.values()) { - try { - if (fut.rootExecution()) { - if (log.isDebugEnabled()) - log.debug("Waiting root execution future on kernal stop: " + fut); - - fut.get(); - } - } - catch (IgniteCheckedException ignore) { - // For failed futures callback will be executed, no need to care about this exception here. - } - } - - for (BatchExecutionFuture execFut : batchFuts.values()) { - try { - execFut.get(); - } - catch (IgniteCheckedException e) { - if (!e.hasCause(IgniteInterruptedCheckedException.class)) - U.warn(log, "Failed to wait for batch execution future completion (will ignore) " + - "[execFut=" + execFut + ", e=" + e + ']'); - } - } - } - - for (StreamerStageWrapper stage : stages.values()) { - try { - ctx.resource().cleanupGeneric(stage.unwrap()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cleanup stage [stage=" + stage + ", streamer=" + this + ']', e); - } - } - } - - /** - * @param cancel Whether currently running tasks should be cancelled. - */ - public void stop(boolean cancel) { - ctx.io().removeMessageListener(topic); - - // There is no point to wait for tasks execution since it was already handled by flags. - execSvc.shutdownNow(); - - U.stopLifecycleAware(log, lifecycleAwares()); - } - - /** - * @return Streamer components which can implement {@link org.apache.ignite.lifecycle.LifecycleAware} interface. - */ - private Iterable<Object> lifecycleAwares() { - Collection<Object> objs = new ArrayList<>(); - - objs.addAll(configuration().getStages()); - objs.addAll(configuration().getWindows()); - objs.add(router); - - return objs; - } - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public StreamerConfiguration configuration() { - return c; - } - - /** {@inheritDoc} */ - @Override public void addEvent(Object evt, Object... evts) { - A.notNull(evt, "evt"); - - if (!F.isEmpty(evts)) - addEvents(F.concat(false, evt, Arrays.asList(evts))); - else - addEvents(Collections.singleton(evt)); - } - - /** {@inheritDoc} */ - @Override public void addEventToStage(String stageName, Object evt, Object... evts) { - A.notNull(stageName, "stageName"); - A.notNull(evt, "evt"); - - if (!F.isEmpty(evts)) - addEventsToStage(stageName, F.concat(false, evt, Arrays.asList(evts))); - else - addEventsToStage(stageName, Collections.singleton(evt)); - } - - /** {@inheritDoc} */ - @Override public void addEvents(Collection<?> evts) { - A.ensure(!F.isEmpty(evts), "evts cannot be null or empty"); - - addEventsToStage(firstStage, evts); - } - - /** {@inheritDoc} */ - @Override public void addEventsToStage(String stageName, Collection<?> evts) { - A.notNull(stageName, "stageName"); - A.ensure(!F.isEmpty(evts), "evts cannot be empty or null"); - - ctx.gateway().readLock(); - - try { - addEvents0(null, 0, U.currentTimeMillis(), null, Collections.singleton(ctx.localNodeId()), stageName, evts); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - ctx.gateway().readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public StreamerContext context() { - return streamerCtx; - } - - /** {@inheritDoc} */ - @Override public void addStreamerFailureListener(StreamerFailureListener lsnr) { - failureLsnrs.add(lsnr); - } - - /** {@inheritDoc} */ - @Override public void removeStreamerFailureListener(StreamerFailureListener lsnr) { - failureLsnrs.remove(lsnr); - } - - /** {@inheritDoc} */ - @Override public StreamerMetrics metrics() { - StreamerMetrics ret = new StreamerMetricsAdapter(streamerMetrics); - - streamerMetrics.sampleCurrentStages(); - - return ret; - } - - /** {@inheritDoc} */ - @Override public void reset() { - winLock.writeLock(); - - try { - for (StreamerWindow win : winMap.values()) - win.reset(); - - streamerCtx.localSpace().clear(); - } - finally { - winLock.writeUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - StreamerStageMetricsHolder[] stageHolders = new StreamerStageMetricsHolder[c.getStages().size()]; - - int idx = 0; - - for (StreamerStage stage : c.getStages()) { - stageHolders[idx] = new StreamerStageMetricsHolder(stage.name()); - - idx++; - } - - StreamerWindowMetricsHolder[] windowHolders = new StreamerWindowMetricsHolder[c.getWindows().size()]; - - idx = 0; - - for (StreamerWindow w : c.getWindows()) { - windowHolders[idx] = new StreamerWindowMetricsHolder(w); - - idx++; - } - - streamerMetrics = new StreamerMetricsHolder(stageHolders, windowHolders, execSvcCap); - } - - /** {@inheritDoc} */ - @Override public void deployClass(Class<?> depCls) { - this.depCls = depCls; - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window() { - return (StreamerWindow<E>)dfltWin; - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window(String windowName) { - return (StreamerWindow<E>)winMap.get(windowName); - } - - /** - * @return {@code atLeastOnce} configuration property. - */ - boolean atLeastOnce() { - return atLeastOnce; - } - - /** - * @return Stage future map size. - */ - int stageFutureMapSize() { - return stageFuts.size(); - } - - /** - * @return Batch future map size. - */ - int batchFutureMapSize() { - return batchFuts.size(); - } - - /** - * @param execId Execution ID, {@code null} if root execution. - * @param failoverAttempt Attempt count. - * @param execStartTs Execution start timestamp, ignored if root execution. - * @param parentFutId Parent future ID. - * @param execNodeIds Execution node IDs. - * @param stageName Stage name. - * @param evts Events. - * @return Future. - * @throws IgniteInterruptedCheckedException If failed. - */ - private GridStreamerStageExecutionFuture addEvents0( - @Nullable IgniteUuid execId, - int failoverAttempt, - long execStartTs, - @Nullable IgniteUuid parentFutId, - @Nullable Collection<UUID> execNodeIds, - String stageName, - Collection<?> evts - ) throws IgniteInterruptedCheckedException { - assert !F.isEmpty(evts); - assert !F.isEmpty(stageName); - - GridStreamerStageExecutionFuture fut = new GridStreamerStageExecutionFuture( - this, - execId, - failoverAttempt, - execStartTs, - parentFutId, - execNodeIds, - stageName, - evts); - - if (atLeastOnce && fut.rootExecution()) { - StreamerMetricsHolder metrics0 = streamerMetrics; - - metrics0.onSessionStarted(); - - fut.metrics(metrics0); - } - - // Acquire semaphore on first future submit. - if (atLeastOnce && fut.rootExecution() && fut.failoverAttemptCount() == 0) { - try { - if (sem != null) - sem.acquire(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } - - fut.map(); - - if (!atLeastOnce && fut.isFailed()) - notifyFailure(fut.stageName(), fut.events(), fut.error()); - - // Mind the gap. - for (UUID nodeId : fut.executionNodeIds()) { - if (!ctx.discovery().alive(nodeId)) - fut.onNodeLeft(nodeId); - } - - return fut; - } - - /** {@inheritDoc} */ - @Override public GridKernalContext kernalContext() { - return ctx; - } - - /** {@inheritDoc} */ - @Override public void onFutureMapped(GridStreamerStageExecutionFuture fut) { - if (atLeastOnce) { - GridStreamerStageExecutionFuture old = stageFuts.putIfAbsent(fut.id(), fut); - - assert old == null : "Streamer execution future should be mapped only once: " + old; - } - } - - /** {@inheritDoc} */ - @Override public void onFutureCompleted(GridStreamerStageExecutionFuture fut) { - if (atLeastOnce) { - if (fut.rootExecution() && !fut.isFailed() && sem != null) - sem.release(); - - GridStreamerStageExecutionFuture old = stageFuts.remove(fut.id()); - - assert fut == old : "Invalid future in map [fut=" + fut + ", old=" + old + ']'; - - if (fut.isFailed() || fut.isCancelled()) - cancelChildStages(fut); - - if (fut.rootExecution() && fut.isFailed()) - failover(fut); - } - } - - /** {@inheritDoc} */ - @Override public StreamerEventRouter eventRouter() { - return router; - } - - /** {@inheritDoc} */ - @Override public void scheduleExecutions(GridStreamerStageExecutionFuture fut, - Map<UUID, GridStreamerExecutionBatch> execs) throws IgniteCheckedException { - for (Map.Entry<UUID, GridStreamerExecutionBatch> entry : execs.entrySet()) { - UUID nodeId = entry.getKey(); - GridStreamerExecutionBatch batch = entry.getValue(); - - if (ctx.localNodeId().equals(nodeId)) - scheduleLocal(batch); - else { - if (log.isDebugEnabled()) - log.debug("Sending batch execution request to remote node [nodeId=" + nodeId + - ", futId=" + batch.futureId() + ", stageName=" + batch.stageName() + ']'); - - sendWithRetries(nodeId, createExecutionRequest(batch)); - - if (!ctx.discovery().alive(nodeId)) - fut.onNodeLeft(nodeId); - } - } - } - - /** {@inheritDoc} */ - @Override public void onUndeploy(ClassLoader undeployedLdr) { - if (log.isDebugEnabled()) - log.debug("Processing undeployment event undeployedLdr=" + undeployedLdr + ']'); - - unwindUndeploys(undeployedLdr, true); - } - - /** {@inheritDoc} */ - @Override public void onQueryCompleted(long time, int nodes) { - streamerMetrics.onQueryCompleted(time, nodes); - } - - /** - * Schedules batch execution locally. - * - * @param batch Batch to execute. - * @throws IgniteCheckedException If schedule was attempted on stopping grid. - */ - private void scheduleLocal(final GridStreamerExecutionBatch batch) throws IgniteCheckedException { - final IgniteUuid futId = batch.futureId(); - - lock.readLock(); - - try { - if (stopping) - throw new IgniteCheckedException("Failed to schedule local batch execution (grid is stopping): " + batch); - - if (log.isDebugEnabled()) - log.debug("Scheduling local batch execution [futId=" + futId + ", stageName=" + batch.stageName() + ']'); - - StreamerStageWrapper wrapper = stages.get(batch.stageName()); - - if (wrapper == null) { - completeParentStage(ctx.localNodeId(), batch.futureId(), - new IgniteCheckedException("Failed to process streamer batch (stage was not found): " + - batch.stageName() + ']')); - - return; - } - - // Capture metrics holders for batch execution. - StreamerMetricsHolder streamerMetrics0 = streamerMetrics; - - BatchWorker worker = new BatchWorker(batch, wrapper, streamerMetrics0); - - BatchExecutionFuture batchFut = worker.completionFuture(); - - BatchExecutionFuture old = batchFuts.putIfAbsent(futId, batchFut); - - assert old == null : "Duplicate batch execution future [old=" + old + ", batchFut=" + batchFut + ']'; - - if (cancelled(futId)) { - // Batch was cancelled before execution started, remove future and return. - batchFuts.remove(futId, batchFut); - - return; - } - - streamerMetrics0.onStageScheduled(); - - execSvc.submit(worker); - - batchFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { - BatchExecutionFuture fut = (BatchExecutionFuture)t; - - if (log.isDebugEnabled()) - log.debug("Completed batch execution future: " + fut); - - batchFuts.remove(futId, fut); - - if (!fut.isCancelled() && atLeastOnce) - completeParentStage(ctx.localNodeId(), batch.futureId(), fut.error()); - } - }); - } - finally { - lock.readUnlock(); - } - } - - /** - * Completes parent execution future. - * - * @param completeNodeId Node ID completed batch. - * @param futId Future ID to complete. - * @param err Error, if any. - */ - private void completeParentStage(UUID completeNodeId, IgniteUuid futId, @Nullable Throwable err) { - lock.readLock(); - - try { - if (stopping && !atLeastOnce) { - if (log.isDebugEnabled()) - log.debug("Failed to notify parent stage completion (node is stopping) [futId=" + futId + - ", err=" + err + ']'); - - return; - } - - UUID dstNodeId = futId.globalId(); - - if (ctx.localNodeId().equals(dstNodeId)) { - GridStreamerStageExecutionFuture stageFut = stageFuts.get(futId); - - if (log.isDebugEnabled()) - log.debug("Notifying local execution future [completeNodeId=" + completeNodeId + - ", stageFut=" + stageFut + ", err=" + err + ']'); - - if (stageFut != null) - stageFut.onExecutionCompleted(completeNodeId, err); - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending completion response to remote node [nodeId=" + dstNodeId + - ", futId=" + futId + ", err=" + err + ']'); - - byte[] errBytes = err != null ? ctx.config().getMarshaller().marshal(err) : null; - - sendWithRetries(dstNodeId, new GridStreamerResponse(futId, errBytes)); - } - catch (IgniteCheckedException e) { - if (!e.hasCause(ClusterTopologyCheckedException.class)) - log.error("Failed to complete parent stage [futId=" + futId + ", err=" + e + ']'); - } - } - } - finally { - lock.readUnlock(); - } - } - - /** - * Cancels child batches of execution future. - * - * @param fut Future to cancel child batches. - */ - private void cancelChildStages(GridStreamerStageExecutionFuture fut) { - for (UUID nodeId : fut.childExecutions().keySet()) - cancelChildStage(nodeId, fut.id()); - } - - /** - * @param nodeId Node ID to cancel future on. - * @param cancelledFutId Future ID to cancel. - */ - private void cancelChildStage(UUID nodeId, IgniteUuid cancelledFutId) { - assert atLeastOnce; - - if (nodeId.equals(ctx.localNodeId())) { - cancelledFutIds.add(cancelledFutId); - - Iterator<Map.Entry<IgniteUuid, BatchExecutionFuture>> it = batchFuts.entrySet().iterator(); - - while (it.hasNext()) { - Map.Entry<IgniteUuid, BatchExecutionFuture> entry = it.next(); - - if (entry.getKey().equals(cancelledFutId)) { - BatchExecutionFuture batchFut = entry.getValue(); - - try { - batchFut.cancel(); - } - catch (IgniteCheckedException e) { - log.warning("Failed to cancel batch execution future [cancelledFutId=" + cancelledFutId + - ", batchFut=" + batchFut + ']', e); - } - - it.remove(); - } - } - } - else { - try { - sendWithRetries(nodeId, new GridStreamerCancelRequest(cancelledFutId)); - } - catch (IgniteCheckedException e) { - if (!e.hasCause(ClusterTopologyCheckedException.class)) - log.error("Failed to send streamer cancel request to remote node [nodeId=" + nodeId + - ", cancelledFutId=" + cancelledFutId + ']', e); - } - } - } - - /** - * Attempts to failover pipeline execution. - * - * @param fut Future. - */ - private void failover(GridStreamerStageExecutionFuture fut) { - assert fut.rootExecution(); - assert fut.error() != null; - - if (fut.failoverAttemptCount() >= c.getMaximumFailoverAttempts() || stopping) { - // Release semaphore when no failover will be attempted anymore. - if (sem != null) - sem.release(); - - notifyFailure(fut.stageName(), fut.events(), fut.error()); - } - else { - try { - addEvents0(null, fut.failoverAttemptCount() + 1, 0, null, Collections.singleton(ctx.localNodeId()), - fut.stageName(), fut.events()); - } - catch (IgniteInterruptedCheckedException e) { - e.printStackTrace(); - - assert false : "Failover submit should never attempt to acquire semaphore: " + fut + ']'; - } - } - } - - /** - * Notifies failure listeners. - * - * @param stageName Stage name. - * @param evts Events. - * @param err Error cause. - */ - private void notifyFailure(String stageName, Collection<Object> evts, Throwable err) { - for (StreamerFailureListener lsnr : failureLsnrs) - lsnr.onFailure(stageName, evts, err); - } - - /** - * Checks if cancel request was received for this future ID. - * - * @param futId Future ID. - * @return {@code True} if future was cancelled, {@code false} otherwise. - */ - public boolean cancelled(IgniteUuid futId) { - return cancelledFutIds.contains(futId); - } - - /** - * Processes streamer message. - * - * @param sndNodeId Sender node ID. - * @param msg Message. - */ - private void processStreamerMessage(UUID sndNodeId, Object msg) { - if (msg instanceof GridStreamerExecutionRequest) { - GridStreamerExecutionRequest req = (GridStreamerExecutionRequest)msg; - - GridStreamerExecutionBatch batch; - - try { - batch = executionBatch(sndNodeId, req); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal execution batch (was class undeployed?) " + - "[sndNodeId=" + sndNodeId + ", msg=" + msg + ']', e); - - return; - } - - try { - scheduleLocal(batch); - } - catch (IgniteCheckedException e) { - // Notify parent in case of error. - completeParentStage(ctx.localNodeId(), batch.futureId(), e); - } - } - else if (msg instanceof GridStreamerCancelRequest) - // This will be a local call. - cancelChildStage(ctx.localNodeId(), ((GridStreamerCancelRequest)msg).cancelledFutureId()); - else if (msg instanceof GridStreamerResponse) { - GridStreamerResponse res = (GridStreamerResponse)msg; - - assert res.futureId().globalId().equals(ctx.localNodeId()) : - "Wrong message received [res=" + res + ", sndNodeId=" + sndNodeId + - ", locNodeId=" + ctx.localNodeId() + ']'; - - Throwable err = null; - - if (res.errorBytes() != null) { - try { - err = ctx.config().getMarshaller().unmarshal(res.errorBytes(), null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal response.", e); - } - } - - // This will complete stage locally. - completeParentStage(sndNodeId, res.futureId(), err); - } - } - - /** - * Clear entries of removed classloader classes. - * - * @param undeployedClsLdr Classloader. - * @param info Whether to print info to log. - */ - private void unwindUndeploys(ClassLoader undeployedClsLdr, boolean info) { - assert undeployedClsLdr != null; - - int undeployWindowCnt = 0; - - Iterator<Object> it = streamerCtx.window().iterator(); - - while (it.hasNext()) { - Object evt = it.next(); - - if (undeployedClsLdr.equals(evt.getClass().getClassLoader())) { - it.remove(); - - undeployWindowCnt++; - } - } - - int undeploySpaceCnt = 0; - - Iterator<Map.Entry<Object, Object>> entryIt = streamerCtx.localSpace().entrySet().iterator(); - - while (entryIt.hasNext()) { - Map.Entry<Object, Object> entry = entryIt.next(); - - if (undeployedClsLdr.equals(entry.getKey().getClass().getClassLoader()) || - undeployedClsLdr.equals(entry.getValue().getClass().getClassLoader())) { - entryIt.remove(); - - undeploySpaceCnt++; - } - } - - if (info) { - if (log.isInfoEnabled() && (undeployWindowCnt > 0 || undeploySpaceCnt > 0)) - log.info("Undeployed all streamer events (if any) for obsolete class loader " + - "[undeployedClsLdr=" + undeployedClsLdr + ", undeployWindowCnt=" + undeployWindowCnt + - ", undeploySpaceCnt=" + undeploySpaceCnt + ']'); - } - } - - /** - * Creates execution request from batch. Will include deployment information if P2P class loading - * is enabled. - * - * @param batch Execution batch. - * @return Execution request. - * @throws IgniteCheckedException If failed. - */ - private Message createExecutionRequest(GridStreamerExecutionBatch batch) - throws IgniteCheckedException { - boolean depEnabled = ctx.deploy().enabled(); - - byte[] batchBytes = ctx.config().getMarshaller().marshal(batch); - - if (!depEnabled) - return new GridStreamerExecutionRequest(true, batchBytes, null, null, null, null, null); - else { - GridPeerDeployAware pda = new StreamerPda(batch.events()); - - GridDeployment dep = ctx.deploy().deploy(pda.deployClass(), pda.classLoader()); - - if (dep == null) - throw new IgniteCheckedException("Failed to get deployment for batch request [batch=" + batch + - ", pda=" + pda + ']'); - - return new GridStreamerExecutionRequest( - false, - batchBytes, - dep.deployMode(), - dep.sampleClassName(), - dep.userVersion(), - dep.participants(), - dep.classLoaderId() - ); - } - } - - /** - * Gets execution batch from execution request. Will explicitly unmarshal batch if P2P class loading - * is enabled. - * - * @param nodeId Node ID. - * @param req Execution request. - * @return Execution batch. - * @throws IgniteCheckedException If unmarshalling failed. - */ - private GridStreamerExecutionBatch executionBatch(UUID nodeId, GridStreamerExecutionRequest req) - throws IgniteCheckedException { - GridDeployment dep = null; - - if (!req.forceLocalDeployment()) { - dep = ctx.deploy().getGlobalDeployment( - req.deploymentMode(), - req.sampleClassName(), - req.sampleClassName(), - req.userVersion(), - nodeId, - req.classLoaderId(), - req.loaderParticipants(), - null); - - if (dep == null) - throw new IgniteCheckedException("Failed to obtain global deployment based on deployment metadata " + - "[nodeId=" + nodeId + ", req=" + req + ']'); - } - - GridStreamerExecutionBatch batch = ctx.config().getMarshaller().unmarshal(req.batchBytes(), - dep != null ? dep.classLoader() : null); - - // Set deployment to check for undeployment after stage execution. - batch.deployment(dep); - - return batch; - } - - /** - * Tries to send message to remote node. - * - * @param dstNodeId Destination node ID. - * @param msg Message to send. - * @throws IgniteCheckedException If failed. - */ - private void sendWithRetries(UUID dstNodeId, Message msg) throws IgniteCheckedException { - for (int i = 0; i < SEND_RETRY_COUNT; i++) { - try { - ctx.io().send(dstNodeId, topic, msg, GridIoPolicy.SYSTEM_POOL); - - return; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send message to remote node (will retry) [dstNodeId=" + dstNodeId + - ", msg=" + msg + ", err=" + e + ']'); - - if (!ctx.discovery().alive(dstNodeId)) - throw new ClusterTopologyCheckedException("Failed to send message (destination node left grid): " + - dstNodeId); - - if (i == SEND_RETRY_COUNT - 1) - throw e; - - U.sleep(SEND_RETRY_DELAY); - } - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx); - U.writeString(out, name); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ctx = (GridKernalContext)in.readObject(); - name = U.readString(in); - } - - /** - * Reconstructs object on unmarshalling. - * - * @return Reconstructed object. - */ - protected Object readResolve() { - return ctx.stream().streamer(name); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteStreamerImpl.class, this); - } - - /** - * Data streamer peer-deploy aware. - */ - private class StreamerPda implements GridPeerDeployAware { - /** */ - private static final long serialVersionUID = 0L; - - /** Deploy class. */ - private Class<?> cls; - - /** Class loader. */ - private ClassLoader ldr; - - /** Collection of objects to detect deploy class and class loader. */ - private Collection<Object> objs; - - /** - * Constructs data streamer peer-deploy aware. - * - * @param objs Collection of objects to detect deploy class and class loader. - */ - private StreamerPda(Collection<Object> objs) { - this.objs = objs; - } - - /** {@inheritDoc} */ - @Override public Class<?> deployClass() { - if (cls == null) { - Class<?> cls0 = null; - - if (depCls != null) - cls0 = depCls; - else { - for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) - cls0 = U.detectClass(it.next()); - - if (cls0 == null || U.isJdk(cls0)) - cls0 = IgniteStreamerImpl.class; - } - - assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; - - cls = cls0; - } - - return cls; - } - - /** {@inheritDoc} */ - @Override public ClassLoader classLoader() { - if (ldr == null) { - ClassLoader ldr0 = deployClass().getClassLoader(); - - // Safety. - if (ldr0 == null) - ldr0 = U.gridClassLoader(); - - assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; - - ldr = ldr0; - } - - return ldr; - } - } - - /** - * Stage batch worker. - */ - private class BatchWorker extends GridWorker { - /** Batch */ - private GridStreamerExecutionBatch batch; - - /** */ - private StreamerStageWrapper stageWrapper; - - /** Streamer metrics holder. */ - private StreamerMetricsHolder streamerHolder; - - /** Schedule timestamp. */ - private long schedTs; - - /** Stage completion future. */ - private BatchExecutionFuture fut = new BatchExecutionFuture(); - - /** - * Creates worker. - * - * @param batch Batch. - * @param stageWrapper Wrapper. - * @param streamerHolder Stream holder. - */ - private BatchWorker( - GridStreamerExecutionBatch batch, - StreamerStageWrapper stageWrapper, - StreamerMetricsHolder streamerHolder - ) { - super(ctx.gridName(), "streamer-batch-worker-" + batch.stageName(), log); - - assert stageWrapper != null; - - this.batch = batch; - this.stageWrapper = stageWrapper; - this.streamerHolder = streamerHolder; - - schedTs = U.currentTimeMillis(); - - fut.setWorker(this); - } - - /** - * @return Completion future. - */ - public BatchExecutionFuture completionFuture() { - return fut; - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - long start = U.currentTimeMillis(); - - streamerHolder.onStageExecutionStarted(stageWrapper.index(), start - schedTs); - - long end = 0; - - try { - if (log.isDebugEnabled()) - log.debug("Running streamer stage [stage=" + stageWrapper.name() + - ", futId=" + batch.futureId() + ']'); - - StreamerContext ctxDelegate = new GridStreamerContextDelegate(context(), - stageWrapper.nextStageName()); - - winLock.readLock(); - - Map<String, Collection<?>> res; - - try { - res = stageWrapper.run(ctxDelegate, batch.events()); - } - finally { - winLock.readUnlock(); - } - - // Close window for undeploy event. - GridDeployment dep = batch.deployment(); - - if (dep != null && dep.obsolete()) - unwindUndeploys(dep.classLoader(), false); - - if (res != null) { - for (Map.Entry<String, Collection<?>> entry : res.entrySet()) { - if (entry.getKey() == null) - throw new IgniteCheckedException("Failed to pass events to next stage " + - "(stage name cannot be null)."); - - GridStreamerStageExecutionFuture part = addEvents0( - batch.executionId(), - 0, - batch.executionStartTimeStamp(), - batch.futureId(), - batch.executionNodeIds(), - entry.getKey(), - entry.getValue()); - - if (atLeastOnce) - fut.add(part); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Finished pipeline execution [stage=" + stageWrapper.name() + - ", futId=" + batch.futureId() + ']'); - - end = U.currentTimeMillis(); - - streamerHolder.onPipelineCompleted(end - batch.executionStartTimeStamp(), - batch.executionNodeIds().size()); - } - } - catch (IgniteCheckedException e) { - if (!atLeastOnce) { - notifyFailure(batch.stageName(), batch.events(), e); - - streamerHolder.onStageFailure(stageWrapper.index()); - } - - fut.onDone(e); - } - finally { - if (end == 0) - end = U.currentTimeMillis(); - - streamerHolder.onStageExecutionFinished(stageWrapper.index(), end - start); - } - } - finally { - fut.markInitialized(); - } - } - } - - /** - * Batch execution future. - */ - private static class BatchExecutionFuture extends GridCompoundFuture<Object, Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private BatchWorker w; - - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - assert w != null; - - if (!super.cancel()) - return false; - - w.cancel(); - - return true; - } - - /** - * @param w Worker. - */ - public void setWorker(BatchWorker w) { - assert w != null; - - this.w = w; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMBeanAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMBeanAdapter.java deleted file mode 100644 index 652e0b8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMBeanAdapter.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -/** - * Streamer MBean implementation. - */ -public class StreamerMBeanAdapter implements StreamerMBean { - /** Streamer. */ - private IgniteStreamerImpl streamer; - - /** - * @param streamer Streamer. - */ - public StreamerMBeanAdapter(IgniteStreamerImpl streamer) { - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Nullable @Override public String getName() { - return streamer.name(); - } - - /** {@inheritDoc} */ - @Override public boolean isAtLeastOnce() { - return streamer.atLeastOnce(); - } - - /** {@inheritDoc} */ - @Override public int getStageFutureMapSize() { - return streamer.stageFutureMapSize(); - } - - /** {@inheritDoc} */ - @Override public int getBatchFutureMapSize() { - return streamer.batchFutureMapSize(); - } - - /** {@inheritDoc} */ - @Override public int getStageActiveExecutionCount() { - return streamer.metrics().stageActiveExecutionCount(); - } - - /** {@inheritDoc} */ - @Override public int getStageWaitingExecutionCount() { - return streamer.metrics().stageWaitingExecutionCount(); - } - - /** {@inheritDoc} */ - @Override public long getStageTotalExecutionCount() { - return streamer.metrics().stageTotalExecutionCount(); - } - - /** {@inheritDoc} */ - @Override public long getPipelineMaximumExecutionTime() { - return streamer.metrics().pipelineMaximumExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public long getPipelineMinimumExecutionTime() { - return streamer.metrics().pipelineMinimumExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public long getPipelineAverageExecutionTime() { - return streamer.metrics().pipelineAverageExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public int getPipelineMaximumExecutionNodes() { - return streamer.metrics().pipelineMaximumExecutionNodes(); - } - - /** {@inheritDoc} */ - @Override public int getPipelineMinimumExecutionNodes() { - return streamer.metrics().pipelineMinimumExecutionNodes(); - } - - /** {@inheritDoc} */ - @Override public int getPipelineAverageExecutionNodes() { - return streamer.metrics().pipelineAverageExecutionNodes(); - } - - /** {@inheritDoc} */ - @Override public int getCurrentActiveSessions() { - return streamer.metrics().currentActiveSessions(); - } - - /** {@inheritDoc} */ - @Override public int getMaximumActiveSessions() { - return streamer.metrics().maximumActiveSessions(); - } - - /** {@inheritDoc} */ - @Override public int getFailuresCount() { - return streamer.metrics().failuresCount(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsAdapter.java deleted file mode 100644 index d1e37ba..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsAdapter.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; - -import java.util.*; - -/** - * Streamer metrics adapter. - */ -public class StreamerMetricsAdapter implements StreamerMetrics { - /** */ - private int stageActiveExecCnt; - - /** */ - private int stageWaitingExecCnt; - - /** */ - private long stageTotalExecCnt; - - /** */ - private long pipelineMaxExecTime; - - /** */ - private long pipelineMinExecTime; - - /** */ - private long pipelineAvgExecTime; - - /** */ - private int pipelineMaxExecNodes; - - /** */ - private int pipelineMinExecNodes; - - /** */ - private int pipelineAvgExecNodes; - - /** */ - private long qryMaxExecTime; - - /** */ - private long qryMinExecTime; - - /** */ - private long qryAvgExecTime; - - /** */ - private int qryMaxExecNodes; - - /** */ - private int qryMinExecNodes; - - /** */ - private int qryAvgExecNodes; - - /** */ - private int curActiveSes; - - /** */ - private int maxActiveSes; - - /** */ - private int failuresCnt; - - /** */ - private int execSvcCap; - - /** */ - @GridToStringInclude - private Map<String, StreamerStageMetrics> stageMetrics; - - /** */ - @GridToStringInclude - private Map<String, StreamerWindowMetrics> windowMetrics; - - /** - * Empty constructor. - */ - public StreamerMetricsAdapter() { - // No-op. - } - - /** - * @param metrics Metrics. - */ - public StreamerMetricsAdapter(StreamerMetrics metrics) { - // Preserve alphabetic order for maintenance. - curActiveSes = metrics.currentActiveSessions(); - execSvcCap = metrics.executorServiceCapacity(); - failuresCnt = metrics.failuresCount(); - maxActiveSes = metrics.maximumActiveSessions(); - pipelineAvgExecNodes = metrics.pipelineAverageExecutionNodes(); - pipelineAvgExecTime = metrics.pipelineAverageExecutionTime(); - pipelineMaxExecNodes = metrics.pipelineMaximumExecutionNodes(); - pipelineMaxExecTime = metrics.pipelineMaximumExecutionTime(); - pipelineMinExecNodes = metrics.pipelineMinimumExecutionNodes(); - pipelineMinExecTime = metrics.pipelineMinimumExecutionTime(); - qryAvgExecNodes = metrics.queryAverageExecutionNodes(); - qryAvgExecTime = metrics.queryAverageExecutionTime(); - qryMaxExecNodes = metrics.queryMaximumExecutionNodes(); - qryMaxExecTime = metrics.queryMaximumExecutionTime(); - qryMinExecNodes = metrics.queryMinimumExecutionNodes(); - qryMinExecTime = metrics.queryMinimumExecutionTime(); - stageActiveExecCnt = metrics.stageActiveExecutionCount(); - stageTotalExecCnt = metrics.stageTotalExecutionCount(); - stageWaitingExecCnt = metrics.stageWaitingExecutionCount(); - - // Stage metrics. - Map<String, StreamerStageMetrics> map = U.newLinkedHashMap(metrics.stageMetrics().size()); - - for (StreamerStageMetrics m : metrics.stageMetrics()) - map.put(m.name(), new StreamerStageMetricsAdapter(m)); - - stageMetrics = Collections.unmodifiableMap(map); - - Map<String, StreamerWindowMetrics> map0 = U.newLinkedHashMap(metrics.windowMetrics().size()); - - for (StreamerWindowMetrics m : metrics.windowMetrics()) - map0.put(m.name(), new StreamerWindowMetricsAdapter(m)); - - windowMetrics = Collections.unmodifiableMap(map0); - } - - /** {@inheritDoc} */ - @Override public int stageActiveExecutionCount() { - return stageActiveExecCnt; - } - - /** {@inheritDoc} */ - @Override public int stageWaitingExecutionCount() { - return stageWaitingExecCnt; - } - - /** {@inheritDoc} */ - @Override public long stageTotalExecutionCount() { - return stageTotalExecCnt; - } - - /** {@inheritDoc} */ - @Override public long pipelineMaximumExecutionTime() { - return pipelineMaxExecTime; - } - - /** {@inheritDoc} */ - @Override public long pipelineMinimumExecutionTime() { - return pipelineMinExecTime; - } - - /** {@inheritDoc} */ - @Override public long pipelineAverageExecutionTime() { - return pipelineAvgExecTime; - } - - /** {@inheritDoc} */ - @Override public int pipelineMaximumExecutionNodes() { - return pipelineMaxExecNodes; - } - - /** {@inheritDoc} */ - @Override public int pipelineMinimumExecutionNodes() { - return pipelineMinExecNodes; - } - - /** {@inheritDoc} */ - @Override public int pipelineAverageExecutionNodes() { - return pipelineAvgExecNodes; - } - - /** {@inheritDoc} */ - @Override public long queryMaximumExecutionTime() { - return qryMaxExecTime; - } - - /** {@inheritDoc} */ - @Override public long queryMinimumExecutionTime() { - return qryMinExecTime; - } - - /** {@inheritDoc} */ - @Override public long queryAverageExecutionTime() { - return qryAvgExecTime; - } - - /** {@inheritDoc} */ - @Override public int queryMaximumExecutionNodes() { - return qryMaxExecNodes; - } - - /** {@inheritDoc} */ - @Override public int queryMinimumExecutionNodes() { - return qryMinExecNodes; - } - - /** {@inheritDoc} */ - @Override public int queryAverageExecutionNodes() { - return qryAvgExecNodes; - } - - /** {@inheritDoc} */ - @Override public int currentActiveSessions() { - return curActiveSes; - } - - /** {@inheritDoc} */ - @Override public int maximumActiveSessions() { - return maxActiveSes; - } - - /** {@inheritDoc} */ - @Override public int failuresCount() { - return failuresCnt; - } - - /** {@inheritDoc} */ - @Override public int executorServiceCapacity() { - return execSvcCap; - } - - /** {@inheritDoc} */ - @Override public StreamerStageMetrics stageMetrics(String stageName) { - StreamerStageMetrics metrics = stageMetrics.get(stageName); - - if (metrics == null) - throw new IllegalArgumentException("Streamer stage is not configured: " + stageName); - - return metrics; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerStageMetrics> stageMetrics() { - return stageMetrics.values(); - } - - /** {@inheritDoc} */ - @Override public StreamerWindowMetrics windowMetrics(String winName) { - StreamerWindowMetrics metrics = windowMetrics.get(winName); - - if (metrics == null) - throw new IllegalArgumentException("Streamer window is not configured: " + winName); - - return metrics; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerWindowMetrics> windowMetrics() { - return windowMetrics.values(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerMetricsAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsHolder.java deleted file mode 100644 index c130510..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerMetricsHolder.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; -import org.jdk8.backport.*; - -import java.util.*; - -/** - * Metrics holder. - * - * Note that for current active stages we use maximum active stages over last second. - */ -public class StreamerMetricsHolder implements StreamerMetrics { - /** Max active stages over last minute. */ - private GridAtomicInteger stageActiveMaxLastSec = new GridAtomicInteger(); - - /** Last stage max update ts. */ - private volatile long lastStageSampleTs; - - /** Number of running stages. */ - private LongAdder stageActiveCnt = new LongAdder(); - - /** Number of waiting stages. */ - private LongAdder stageWaitingCnt = new LongAdder(); - - /** Total number of stages executed. */ - private LongAdder stageTotalCnt = new LongAdder(); - - /** Max exec time. */ - private GridAtomicLong pipelineMaxExecTime = new GridAtomicLong(); - - /** Min exec time. */ - private GridAtomicLong pipelineMinExecTime = new GridAtomicLong(Long.MAX_VALUE); - - /** Pipeline average exec time sampler. */ - private LongAdder pipelineSumExecTime = new LongAdder(); - - /** Max exec nodes. */ - private GridAtomicInteger pipelineMaxExecNodes = new GridAtomicInteger(); - - /** Min exec nodes. */ - private GridAtomicInteger pipelineMinExecNodes = new GridAtomicInteger(Integer.MAX_VALUE); - - /** Avg exec nodes. */ - private LongAdder pipelineSumExecNodes = new LongAdder(); - - /** Total number of pipelines finished on this node. */ - private LongAdder pipelineTotalCnt = new LongAdder(); - - /** Query max exec time. */ - private GridAtomicLong qryMaxExecTime = new GridAtomicLong(); - - /** Query min exec time. */ - private GridAtomicLong qryMinExecTime = new GridAtomicLong(Long.MAX_VALUE); - - /** Query average exec time sampler. */ - private LongAdder qrySumExecTime = new LongAdder(); - - /** Query max exec nodes. */ - private GridAtomicInteger qryMaxExecNodes = new GridAtomicInteger(); - - /** Query min exec nodes. */ - private GridAtomicInteger qryMinExecNodes = new GridAtomicInteger(Integer.MAX_VALUE); - - /** Query avg exec nodes. */ - private LongAdder qrySumExecNodes = new LongAdder(); - - /** Total number of queries finished on this node. */ - private LongAdder qryTotalCnt = new LongAdder(); - - /** Current active sessions. */ - private LongAdder curActiveSessions = new LongAdder(); - - /** Max active sessions. */ - private GridAtomicInteger maxActiveSessions = new GridAtomicInteger(); - - /** Failures count. */ - private LongAdder failuresCnt = new LongAdder(); - - /** Stages metrics. */ - private final StreamerStageMetricsHolder[] stageMetrics; - - /** Stage metrics map. */ - private final Map<String, StreamerStageMetrics> stageMetricsMap; - - /** Window metrics map. */ - private final Map<String, StreamerWindowMetrics> windowMetricsMap; - - /** Executor service capacity. */ - private final int execSvcCap; - - /** - * @param stageMetrics Array of stage metrics holders. - * @param windowMetrics Array of window metrics holders. - * @param execSvcCap Executor service capacity. - */ - public StreamerMetricsHolder( - StreamerStageMetricsHolder[] stageMetrics, - StreamerWindowMetricsHolder[] windowMetrics, - int execSvcCap - ) { - this.execSvcCap = execSvcCap; - this.stageMetrics = stageMetrics; - - Map<String, StreamerStageMetrics> map = new LinkedHashMap<>(); - - for (StreamerStageMetricsHolder holder : stageMetrics) - map.put(holder.name(), holder); - - stageMetricsMap = Collections.unmodifiableMap(map); - - Map<String, StreamerWindowMetrics> map0 = new LinkedHashMap<>(); - - for (StreamerWindowMetricsHolder holder : windowMetrics) - map0.put(holder.name(), holder); - - windowMetricsMap = Collections.unmodifiableMap(map0); - } - - /** {@inheritDoc} */ - @Override public int stageActiveExecutionCount() { - return stageActiveMaxLastSec.get(); - } - - /** {@inheritDoc} */ - @Override public int stageWaitingExecutionCount() { - return stageWaitingCnt.intValue(); - } - - /** {@inheritDoc} */ - @Override public long stageTotalExecutionCount() { - return stageTotalCnt.longValue(); - } - - /** {@inheritDoc} */ - @Override public long pipelineMaximumExecutionTime() { - return pipelineMaxExecTime.get(); - } - - /** {@inheritDoc} */ - @Override public long pipelineMinimumExecutionTime() { - long min = pipelineMinExecTime.get(); - - return min == Long.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public long pipelineAverageExecutionTime() { - long totalTime = pipelineSumExecTime.sum(); - - long execs = pipelineTotalCnt.sum(); - - return execs == 0 ? 0 : totalTime / execs; - } - - /** {@inheritDoc} */ - @Override public int pipelineMaximumExecutionNodes() { - return pipelineMaxExecNodes.get(); - } - - /** {@inheritDoc} */ - @Override public int pipelineMinimumExecutionNodes() { - int min = pipelineMinExecNodes.get(); - - return min == Integer.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public int pipelineAverageExecutionNodes() { - long totalNodes = pipelineSumExecNodes.sum(); - - long execs = pipelineTotalCnt.sum(); - - return execs == 0 ? 0 : (int)(totalNodes / execs); - } - - /** {@inheritDoc} */ - @Override public long queryMaximumExecutionTime() { - return qryMaxExecTime.get(); - } - - /** {@inheritDoc} */ - @Override public long queryMinimumExecutionTime() { - long min = qryMinExecTime.get(); - - return min == Long.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public long queryAverageExecutionTime() { - long totalTime = qrySumExecTime.sum(); - - long execs = qryTotalCnt.sum(); - - return execs == 0 ? 0 : totalTime / execs; - } - - /** {@inheritDoc} */ - @Override public int queryMaximumExecutionNodes() { - return qryMaxExecNodes.get(); - } - - /** {@inheritDoc} */ - @Override public int queryMinimumExecutionNodes() { - int min = qryMinExecNodes.get(); - - return min == Integer.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public int queryAverageExecutionNodes() { - long totalNodes = qrySumExecNodes.sum(); - - long execs = qryTotalCnt.sum(); - - return execs == 0 ? 0 : (int)(totalNodes / execs); - } - - /** {@inheritDoc} */ - @Override public int currentActiveSessions() { - return curActiveSessions.intValue(); - } - - /** {@inheritDoc} */ - @Override public int maximumActiveSessions() { - return maxActiveSessions.get(); - } - - /** {@inheritDoc} */ - @Override public int failuresCount() { - return failuresCnt.intValue(); - } - - /** {@inheritDoc} */ - @Override public int executorServiceCapacity() { - return execSvcCap; - } - - /** {@inheritDoc} */ - @Override public StreamerStageMetrics stageMetrics(String stageName) { - StreamerStageMetrics metrics = stageMetricsMap.get(stageName); - - if (metrics == null) - throw new IllegalArgumentException("Streamer stage is not configured: " + stageName); - - return metrics; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerStageMetrics> stageMetrics() { - return stageMetricsMap.values(); - } - - /** {@inheritDoc} */ - @Override public StreamerWindowMetrics windowMetrics(String winName) { - StreamerWindowMetrics metrics = windowMetricsMap.get(winName); - - if (metrics == null) - throw new IllegalArgumentException("Streamer window is not configured: " + winName); - - return metrics; - } - - /** {@inheritDoc} */ - @Override public Collection<StreamerWindowMetrics> windowMetrics() { - return windowMetricsMap.values(); - } - - /** - * Stage scheduled callback. - */ - public void onStageScheduled() { - stageWaitingCnt.increment(); - } - - /** - * Stage execution started callback. - * - * @param idx Stage index. - * @param waitTime Stage wait time. - */ - public void onStageExecutionStarted(int idx, long waitTime) { - if (waitTime < 0) - waitTime = 0; - - stageActiveCnt.increment(); - stageWaitingCnt.decrement(); - - stageMetrics[idx].onExecutionStarted(waitTime); - - sampleCurrentStages(); - } - - /** - * Stage execution finished callback. - * - * @param idx Stage index. - * @param execTime Stage execution time. - */ - public void onStageExecutionFinished(int idx, long execTime) { - if (execTime < 0) - execTime = 0; - - stageActiveCnt.decrement(); - - stageTotalCnt.increment(); - - stageMetrics[idx].onExecutionFinished(execTime); - - sampleCurrentStages(); - } - - /** - * Pipeline completed callback. - * - * @param execTime Pipeline execution time. - * @param execNodes Pipeline execution nodes. - */ - public void onPipelineCompleted(long execTime, int execNodes) { - if (execTime < 0) - execTime = 0; - - pipelineMaxExecTime.setIfGreater(execTime); - pipelineMinExecTime.setIfLess(execTime); - pipelineSumExecTime.add(execTime); - - pipelineMaxExecNodes.setIfGreater(execNodes); - pipelineMinExecNodes.setIfLess(execNodes); - pipelineSumExecNodes.add(execNodes); - - pipelineTotalCnt.increment(); - } - - /** - * Query completed callback. - * - * @param execTime Query execution time. - * @param execNodes Query execution nodes. - */ - public void onQueryCompleted(long execTime, int execNodes) { - if (execTime < 0) - execTime = 0; - - qryMaxExecTime.setIfGreater(execTime); - qryMinExecTime.setIfLess(execTime); - qrySumExecTime.add(execTime); - - qryMaxExecNodes.setIfGreater(execNodes); - qryMinExecNodes.setIfLess(execNodes); - qrySumExecNodes.add(execNodes); - - qryTotalCnt.increment(); - } - - /** - * Session started callback. - */ - public void onSessionStarted() { - curActiveSessions.increment(); - - maxActiveSessions.setIfGreater(curActiveSessions.intValue()); - } - - /** - * Session finished callback. - */ - public void onSessionFinished() { - curActiveSessions.decrement(); - } - - /** - * Session failed callback. - */ - public void onSessionFailed() { - curActiveSessions.decrement(); - - failuresCnt.increment(); - } - - /** - * Stage failure callback. - * - * @param idx Stage index. - */ - public void onStageFailure(int idx) { - stageMetrics[idx].onFailure(); - } - - /** - * Samples current sessions. - */ - public void sampleCurrentStages() { - long now = U.currentTimeMillis(); - - int cur = (int)stageActiveCnt.sum(); - - if (now - lastStageSampleTs > 1000) { - stageActiveMaxLastSec.set(cur); - - lastStageSampleTs = now; - } - else - stageActiveMaxLastSec.setIfGreater(cur); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMBeanAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMBeanAdapter.java deleted file mode 100644 index 8c2251b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMBeanAdapter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.streamer.*; - -/** - * Streamer stage MBean adapter. - */ -@SuppressWarnings("ConstantConditions") -public class StreamerStageMBeanAdapter implements StreamerStageMBean { - /** Stage name. */ - private String stageName; - - /** Stage class name. */ - private String stageClsName; - - /** */ - private IgniteStreamerImpl streamer; - - /** - * @param stageName Stage name. - * @param stageClsName Stage class name. - * @param streamer Streamer implementation. - */ - public StreamerStageMBeanAdapter(String stageName, String stageClsName, IgniteStreamerImpl streamer) { - this.stageName = stageName; - this.stageClsName = stageClsName; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return stageName; - } - - /** {@inheritDoc} */ - @Override public String getStageClassName() { - return stageClsName; - } - - /** {@inheritDoc} */ - @Override public long getMinimumExecutionTime() { - return streamer.metrics().stageMetrics(stageName).minimumExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public long getMaximumExecutionTime() { - return streamer.metrics().stageMetrics(stageName).maximumExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public long getAverageExecutionTime() { - return streamer.metrics().stageMetrics(stageName).averageExecutionTime(); - } - - /** {@inheritDoc} */ - @Override public long getMinimumWaitingTime() { - return streamer.metrics().stageMetrics(stageName).minimumWaitingTime(); - } - - /** {@inheritDoc} */ - @Override public long getMaximumWaitingTime() { - return streamer.metrics().stageMetrics(stageName).maximumWaitingTime(); - } - - /** {@inheritDoc} */ - @Override public long getAverageWaitingTime() { - return streamer.metrics().stageMetrics(stageName).averageWaitingTime(); - } - - /** {@inheritDoc} */ - @Override public long getTotalExecutionCount() { - return streamer.metrics().stageMetrics(stageName).totalExecutionCount(); - } - - /** {@inheritDoc} */ - @Override public int getFailuresCount() { - return streamer.metrics().stageMetrics(stageName).failuresCount(); - } - - /** {@inheritDoc} */ - @Override public boolean isExecuting() { - return streamer.metrics().stageMetrics(stageName).executing(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsAdapter.java deleted file mode 100644 index c5f7a1b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsAdapter.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; - -/** - * Streamer stage metrics adapter. - */ -public class StreamerStageMetricsAdapter implements StreamerStageMetrics { - /** */ - private String name; - - /** */ - private long minExecTime; - - /** */ - private long maxExecTime; - - /** */ - private long avgExecTime; - - /** */ - private long minWaitTime; - - /** */ - private long maxWaitTime; - - /** */ - private long avgWaitTime; - - /** */ - private long totalExecCnt; - - /** */ - private int failuresCnt; - - /** */ - private boolean executing; - - /** - * Empty constructor. - */ - public StreamerStageMetricsAdapter() { - // No-op. - } - - /** - * @param metrics Metrics. - */ - public StreamerStageMetricsAdapter(StreamerStageMetrics metrics) { - // Preserve alphabetic order for maintenance. - avgExecTime = metrics.averageExecutionTime(); - avgWaitTime = metrics.averageWaitingTime(); - executing = metrics.executing(); - failuresCnt = metrics.failuresCount(); - maxExecTime = metrics.maximumExecutionTime(); - maxWaitTime = metrics.maximumWaitingTime(); - minExecTime = metrics.minimumExecutionTime(); - minWaitTime = metrics.minimumWaitingTime(); - name = metrics.name(); - totalExecCnt = metrics.totalExecutionCount(); - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public long minimumExecutionTime() { - return minExecTime; - } - - /** {@inheritDoc} */ - @Override public long maximumExecutionTime() { - return maxExecTime; - } - - /** {@inheritDoc} */ - @Override public long averageExecutionTime() { - return avgExecTime; - } - - /** {@inheritDoc} */ - @Override public long totalExecutionCount() { - return totalExecCnt; - } - - /** {@inheritDoc} */ - @Override public long minimumWaitingTime() { - return minWaitTime; - } - - /** {@inheritDoc} */ - @Override public long maximumWaitingTime() { - return maxWaitTime; - } - - /** {@inheritDoc} */ - @Override public long averageWaitingTime() { - return avgWaitTime; - } - - /** {@inheritDoc} */ - @Override public int failuresCount() { - return failuresCnt; - } - - /** {@inheritDoc} */ - @Override public boolean executing() { - return executing; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerStageMetricsAdapter.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsHolder.java deleted file mode 100644 index ed95628..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageMetricsHolder.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.*; -import org.apache.ignite.streamer.*; -import org.jdk8.backport.*; - -/** - * Streamer stage metrics holder. - */ -public class StreamerStageMetricsHolder implements StreamerStageMetrics { - /** Stage name. */ - private String name; - - /** Minimum execution time. */ - private GridAtomicLong minExecTime = new GridAtomicLong(Long.MAX_VALUE); - - /** Maximum execution time. */ - private GridAtomicLong maxExecTime = new GridAtomicLong(); - - /** Stage execution time sum. */ - private LongAdder sumExecTime = new LongAdder(); - - /** Stage minimum waiting time. */ - private GridAtomicLong minWaitTime = new GridAtomicLong(Long.MAX_VALUE); - - /** Stage maximum waiting time. */ - private GridAtomicLong maxWaitTime = new GridAtomicLong(); - - /** Stage average waiting time sum. */ - private LongAdder sumWaitTime = new LongAdder(); - - /** Total number of times this stage was executed. */ - private LongAdder totalExecCnt = new LongAdder(); - - /** Failures count. */ - private LongAdder failuresCnt = new LongAdder(); - - /** Number of threads executing this stage. */ - private LongAdder curActive = new LongAdder(); - - /** - * @param name Stage name. - */ - public StreamerStageMetricsHolder(String name) { - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public long minimumExecutionTime() { - long min = minExecTime.get(); - - return min == Long.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public long maximumExecutionTime() { - return maxExecTime.get(); - } - - /** {@inheritDoc} */ - @Override public long averageExecutionTime() { - long execTime = sumExecTime.sum(); - - long execs = totalExecCnt.sum(); - - return execs == 0 ? 0 : execTime / execs; - } - - /** {@inheritDoc} */ - @Override public long minimumWaitingTime() { - long min = minWaitTime.get(); - - return min == Long.MAX_VALUE ? 0 : min; - } - - /** {@inheritDoc} */ - @Override public long maximumWaitingTime() { - return maxWaitTime.get(); - } - - /** {@inheritDoc} */ - @Override public long averageWaitingTime() { - long waitTime = sumWaitTime.sum(); - - long execs = totalExecCnt.sum(); - - return execs == 0 ? 0 : waitTime / execs; - } - - /** {@inheritDoc} */ - @Override public long totalExecutionCount() { - return totalExecCnt.longValue(); - } - - /** {@inheritDoc} */ - @Override public int failuresCount() { - return failuresCnt.intValue(); - } - - /** {@inheritDoc} */ - @Override public boolean executing() { - return curActive.intValue() > 0; - } - - /** - * Execution started callback. - * - * @param waitTime Wait time. - */ - public void onExecutionStarted(long waitTime) { - if (waitTime < 0) - waitTime = 0; - - curActive.increment(); - - maxWaitTime.setIfGreater(waitTime); - minWaitTime.setIfLess(waitTime); - sumWaitTime.add(waitTime); - } - - /** - * Execution finished callback. - * - * @param execTime Stage execution time. - */ - public void onExecutionFinished(long execTime) { - if (execTime < 0) - execTime = 0; - - curActive.decrement(); - - maxExecTime.setIfGreater(execTime); - minExecTime.setIfLess(execTime); - sumExecTime.add(execTime); - - totalExecCnt.increment(); - } - - /** - * Failure callback. - */ - public void onFailure() { - failuresCnt.increment(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageWrapper.java deleted file mode 100644 index 3efb1b0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerStageWrapper.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Stage wrapper that handles metrics calculation and time measurement. - */ -public class StreamerStageWrapper implements StreamerStage<Object> { - /** Stage delegate. */ - private StreamerStage<Object> delegate; - - /** Stage index. */ - private int idx; - - /** Next stage name. Set after creation. */ - private String nextStageName; - - /** - * @param delegate Delegate stage. - * @param idx Index. - */ - public StreamerStageWrapper(StreamerStage<Object> delegate, int idx) { - this.delegate = delegate; - this.idx = idx; - } - - /** - * @return Stage index. - */ - public int index() { - return idx; - } - - /** - * @return Next stage name in pipeline or {@code null} if this is the last stage. - */ - @Nullable public String nextStageName() { - return nextStageName; - } - - /** - * @param nextStageName Next stage name in pipeline. - */ - public void nextStageName(String nextStageName) { - this.nextStageName = nextStageName; - } - - /** {@inheritDoc} */ - @Override public String name() { - return delegate.name(); - } - - /** {@inheritDoc} */ - @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts) { - return delegate.run(ctx, evts); - } - - /** - * @return Delegate. - */ - public StreamerStage unwrap() { - return delegate; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerStageWrapper.class, this); - } -}