http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4187cb06/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/IgniteScheduleProcessor.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/IgniteScheduleProcessor.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/IgniteScheduleProcessor.java
new file mode 100644
index 0000000..c3a62f2
--- /dev/null
+++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/IgniteScheduleProcessor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.schedule;
+
+import it.sauronsoftware.cron4j.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.scheduler.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Schedules cron-based execution of grid tasks and closures.
+ */
+public class IgniteScheduleProcessor extends IgniteScheduleProcessorAdapter {
+    /** Cron scheduler. */
+    private Scheduler sched;
+
+    /** Schedule futures. */
+    private Set<SchedulerFuture<?>> schedFuts = new GridConcurrentHashSet<>();
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteScheduleProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchedulerFuture<?> schedule(final Runnable c, String ptrn) {
+        assert c != null;
+        assert ptrn != null;
+
+        ScheduleFutureImpl<Object> fut = new ScheduleFutureImpl<>(sched, ctx, ptrn);
+
+        fut.schedule(new IgniteCallable<Object>() {
+            @Nullable @Override public Object call() {
+                c.run();
+
+                return null;
+            }
+        });
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> SchedulerFuture<R> schedule(Callable<R> c, String pattern) {
+        assert c != null;
+        assert pattern != null;
+
+        ScheduleFutureImpl<R> fut = new ScheduleFutureImpl<>(sched, ctx, pattern);
+
+        fut.schedule(c);
+
+        return fut;
+    }
+
+    /**
+     *
+     * @return Future objects of currently scheduled active(not finished) tasks.
+     */
+    public Collection<SchedulerFuture<?>> getScheduledFutures() {
+        return Collections.unmodifiableList(new ArrayList<>(schedFuts));
+    }
+
+    /**
+     * Removes future object from the collection of scheduled futures.
+     *
+     * @param fut Future object.
+     */
+    void onDescheduled(SchedulerFuture<?> fut) {
+        assert fut != null;
+
+        schedFuts.remove(fut);
+    }
+
+    /**
+     * Adds future object to the collection of scheduled futures.
+     *
+     * @param fut Future object.
+     */
+    void onScheduled(SchedulerFuture<?> fut) {
+        assert fut != null;
+
+        schedFuts.add(fut);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        sched = new Scheduler();
+
+        sched.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        if (sched.isStarted())
+            sched.stop();
+
+        sched = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Schedule processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>> schedFutsSize: " + schedFuts.size());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4187cb06/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
index e730dc3..076ec1a 100644
--- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
+++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
@@ -309,7 +309,7 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable {
 
         this.task = task;
 
-        ((GridScheduleProcessor)ctx.schedule()).onScheduled(this);
+        ((IgniteScheduleProcessor)ctx.schedule()).onScheduled(this);
 
         if (delay > 0) {
             // Schedule after delay.
@@ -351,7 +351,7 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable {
         if (descheduled.compareAndSet(false, true)) {
             sched.deschedule(id);
 
-            ((GridScheduleProcessor)ctx.schedule()).onDescheduled(this);
+            ((IgniteScheduleProcessor)ctx.schedule()).onDescheduled(this);
         }
     }
 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4187cb06/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/GridSshProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/GridSshProcessorImpl.java b/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/GridSshProcessorImpl.java
deleted file mode 100644
index c712bc7..0000000
--- a/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/GridSshProcessorImpl.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nodestart;
-
-/**
- * Implementation of {@link GridSshProcessor}.
- */
-public class GridSshProcessorImpl implements GridSshProcessor {
-    /** {@inheritDoc} */
-    @Override public GridNodeCallable nodeStartCallable(GridRemoteStartSpecification spec, int timeout) {
-        return new GridNodeCallableImpl(spec, timeout);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4187cb06/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteSshProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteSshProcessorImpl.java b/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteSshProcessorImpl.java
new file mode 100644
index 0000000..4d867c0
--- /dev/null
+++ b/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteSshProcessorImpl.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nodestart;
+
+/**
+ * Implementation of {@link IgniteSshProcessor}.
+ */
+public class IgniteSshProcessorImpl implements IgniteSshProcessor {
+    /** {@inheritDoc} */
+    @Override public GridNodeCallable nodeStartCallable(GridRemoteStartSpecification spec, int timeout) {
+        return new GridNodeCallableImpl(spec, timeout);
+    }
+}