This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-13636
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fefaf8e616daf3c98ee95181e6553f9740ce23ba
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jun 12 14:15:24 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different 
reactive engines
---
 .../camel/impl/engine/DefaultReactiveExecutor.java | 133 ++++++++++++++++++---
 .../org/apache/camel/support/ReactiveHelper.java   |   1 +
 2 files changed, 119 insertions(+), 15 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 0448037..3ce7f0a 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -1,13 +1,13 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,55 +16,158 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.LinkedList;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
-import org.apache.camel.support.ReactiveHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ReactiveExecutor}.
  */
 public class DefaultReactiveExecutor implements ReactiveExecutor {
 
-    // TODO: ReactiveHelper code should be moved here and not static
-    // ppl should use the SPI interface
+    // TODO: StaticServiceSupport so we can init/start/stop
+    // TODO: Add mbean info so we can get details
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultReactiveExecutor.class);
+
+    private final ThreadLocal<Worker> workers = 
ThreadLocal.withInitial(Worker::new);
 
     @Override
     public void scheduleMain(Runnable runnable) {
-        ReactiveHelper.scheduleMain(runnable);
+        workers.get().schedule(runnable, true, true, false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable) {
-        ReactiveHelper.scheduleSync(runnable);
+        workers.get().schedule(runnable, true, true, true);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
-        ReactiveHelper.scheduleMain(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, true, 
false);
     }
 
     @Override
     public void schedule(Runnable runnable) {
-        ReactiveHelper.schedule(runnable);
+        workers.get().schedule(runnable, true, false, false);;
     }
 
     @Override
     public void schedule(Runnable runnable, String description) {
-        ReactiveHelper.schedule(runnable, description);
+        workers.get().schedule(describe(runnable, description), true, false, 
false);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
-        ReactiveHelper.scheduleSync(runnable, description);
+        workers.get().schedule(describe(runnable, description), false, true, 
true);
     }
 
     @Override
     public boolean executeFromQueue() {
-        return ReactiveHelper.executeFromQueue();
+        return workers.get().executeFromQueue();
     }
 
     @Override
     public void callback(AsyncCallback callback) {
-        ReactiveHelper.callback(callback);
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
     }
+
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
+    private static class Worker {
+
+        private volatile LinkedList<Runnable> queue = new LinkedList<>();
+        private volatile LinkedList<LinkedList<Runnable>> back;
+        private volatile boolean running;
+
+        public void schedule(Runnable runnable, boolean first, boolean main, 
boolean sync) {
+            if (main) {
+                if (!queue.isEmpty()) {
+                    if (back == null) {
+                        back = new LinkedList<>();
+                    }
+                    back.push(queue);
+                    queue = new LinkedList<>();
+                }
+            }
+            if (first) {
+                queue.addFirst(runnable);
+            } else {
+                queue.addLast(runnable);
+            }
+            if (!running || sync) {
+                running = true;
+//                Thread thread = Thread.currentThread();
+//                String name = thread.getName();
+                try {
+                    for (;;) {
+                        final Runnable polled = queue.poll();
+                        if (polled == null) {
+                            if (back != null && !back.isEmpty()) {
+                                queue = back.poll();
+                                continue;
+                            } else {
+                                break;
+                            }
+                        }
+                        try {
+//                            thread.setName(name + " - " + polled.toString());
+                            polled.run();
+                        } catch (Throwable t) {
+                            LOG.warn("Error executing reactive work due to " + 
t.getMessage() + ". This exception is ignored.", t);
+                        }
+                    }
+                } finally {
+//                    thread.setName(name);
+                    running = false;
+                }
+            } else {
+                LOG.debug("Queuing reactive work: {}", runnable);
+            }
+        }
+
+        public boolean executeFromQueue() {
+            final Runnable polled = queue != null ? queue.poll() : null;
+            if (polled == null) {
+                return false;
+            }
+            Thread thread = Thread.currentThread();
+            String name = thread.getName();
+            try {
+                thread.setName(name + " - " + polled.toString());
+                polled.run();
+            } catch (Throwable t) {
+                // should not happen
+                LOG.warn("Error executing reactive work due to " + 
t.getMessage() + ". This exception is ignored.", t);
+            } finally {
+                thread.setName(name);
+            }
+            return true;
+        }
+
+    }
+
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java 
b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
index 06db5e6..9541371 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A basic reactive engine that uses a worker pool to process tasks.
  */
+@Deprecated
 public final class ReactiveHelper {
 
     private static final ThreadLocal<Worker> WORKERS = 
ThreadLocal.withInitial(Worker::new);

Reply via email to