This is an automated email from the ASF dual-hosted git repository. dmvolod pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new 14dcc47 CAMEL-12213: Update camel-thrift to libthrift 0.12.0 14dcc47 is described below commit 14dcc47a718033a0cb52307eaf257c6d03ae494f Author: Dmitry Volodin <dmvo...@gmail.com> AuthorDate: Wed Jan 23 22:36:12 2019 +0300 CAMEL-12213: Update camel-thrift to libthrift 0.12.0 --- .../camel/component/thrift/ThriftConsumer.java | 16 +- .../camel/component/thrift/ThriftEndpoint.java | 1 + .../camel/component/thrift/ThriftProducer.java | 8 +- .../thrift/server/ThriftThreadPoolServer.java | 221 +-------------------- .../thrift/ThriftConsumerSecurityTest.java | 8 +- .../thrift/ThriftProducerSecurityTest.java | 8 +- parent/pom.xml | 2 +- 7 files changed, 36 insertions(+), 228 deletions(-) diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java index 395b7a2..ed08011 100644 --- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java +++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.thrift; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -40,8 +41,6 @@ import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TZlibTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Represents Thrift server consumer implementation @@ -96,7 +95,7 @@ public class ThriftConsumer extends DefaultConsumer { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected void initializeServer() throws TTransportException { + protected void initializeServer() throws TTransportException, IOException { Class serverImplementationClass; Object serverImplementationInstance; Object serverProcessor; @@ -117,7 +116,9 @@ public class ThriftConsumer extends DefaultConsumer { } if (configuration.getNegotiationType() == ThriftNegotiationType.SSL && endpoint.isSynchronous()) { + ClassResolver classResolver = endpoint.getCamelContext().getClassResolver(); SSLContextParameters sslParameters = configuration.getSslParameters(); + if (sslParameters == null) { throw new IllegalArgumentException("SSL parameters must be initialized if negotiation type is set to " + configuration.getNegotiationType()); } @@ -132,10 +133,13 @@ public class ThriftConsumer extends DefaultConsumer { : sslParameters.getCipherSuites().getCipherSuite().stream().toArray(String[]::new)); if (ObjectHelper.isNotEmpty(sslParameters.getKeyManagers().getKeyStore().getProvider()) && ObjectHelper.isNotEmpty(sslParameters.getKeyManagers().getKeyStore().getType())) { - sslParams.setKeyStore(sslParameters.getKeyManagers().getKeyStore().getResource(), sslParameters.getKeyManagers().getKeyStore().getPassword(), - sslParameters.getKeyManagers().getKeyStore().getProvider(), sslParameters.getKeyManagers().getKeyStore().getType()); + sslParams.setKeyStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getKeyManagers().getKeyStore().getResource()), + sslParameters.getKeyManagers().getKeyStore().getPassword(), + sslParameters.getKeyManagers().getKeyStore().getProvider(), + sslParameters.getKeyManagers().getKeyStore().getType()); } else { - sslParams.setKeyStore(sslParameters.getKeyManagers().getKeyStore().getResource(), sslParameters.getKeyManagers().getKeyStore().getPassword()); + sslParams.setKeyStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getKeyManagers().getKeyStore().getResource()), + sslParameters.getKeyManagers().getKeyStore().getPassword()); } try { diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java index ffd79c3..53035a4 100644 --- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java +++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java @@ -23,6 +23,7 @@ import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.camel.support.DefaultEndpoint; /** * The Thrift component allows to call and expose remote procedures (RPC) with diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java index 5e27227..481c4f5 100644 --- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java +++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java @@ -157,7 +157,7 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor { } } - protected void initializeSslTransport() throws TTransportException { + protected void initializeSslTransport() throws TTransportException, IOException { if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) { SSLContextParameters sslParameters = configuration.getSslParameters(); if (sslParameters == null) { @@ -176,8 +176,10 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor { : sslParameters.getCipherSuites().getCipherSuite().stream().toArray(String[]::new)); if (ObjectHelper.isNotEmpty(sslParameters.getTrustManagers().getProvider()) && ObjectHelper.isNotEmpty(sslParameters.getTrustManagers().getKeyStore().getType())) { - sslParams.setTrustStore(sslParameters.getTrustManagers().getKeyStore().getResource(), sslParameters.getTrustManagers().getKeyStore().getPassword(), - sslParameters.getTrustManagers().getProvider(), sslParameters.getTrustManagers().getKeyStore().getType()); + sslParams.setTrustStore(ResourceHelper.resolveResourceAsInputStream(classResolver, sslParameters.getTrustManagers().getKeyStore().getResource()), + sslParameters.getTrustManagers().getKeyStore().getPassword(), + sslParameters.getTrustManagers().getProvider(), + sslParameters.getTrustManagers().getKeyStore().getType()); } else { sslParams.setTrustStore(sslParameters.getTrustManagers().getKeyStore().getResource(), sslParameters.getTrustManagers().getKeyStore().getPassword()); } diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java index 2c99c98..65ca03a 100644 --- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java +++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java @@ -16,72 +16,26 @@ */ package org.apache.camel.component.thrift.server; -import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; -import org.apache.thrift.TException; -import org.apache.thrift.TProcessor; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.server.ServerContext; -import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.transport.TSaslTransportException; +import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + /* * Thrift ThreadPoolServer implementation with executors controlled by the Camel Executor Service Manager */ -public class ThriftThreadPoolServer extends TServer { - private static final Logger LOGGER = LoggerFactory.getLogger(ThriftThreadPoolServer.class.getName()); +public class ThriftThreadPoolServer extends TThreadPoolServer { - public static class Args extends AbstractServerArgs<Args> { - private ExecutorService executorService; + public static class Args extends TThreadPoolServer.Args { private ExecutorService startThreadPool; private CamelContext context; - - private int requestTimeout = 20; - private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; - private int beBackoffSlotLength = 100; - private TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS; - + public Args(TServerTransport transport) { super(transport); } - public Args requestTimeout(int n) { - requestTimeout = n; - return this; - } - - public Args requestTimeoutUnit(TimeUnit tu) { - requestTimeoutUnit = tu; - return this; - } - - // Binary exponential backoff slot length - public Args beBackoffSlotLength(int n) { - beBackoffSlotLength = n; - return this; - } - - // Binary exponential backoff slot time unit - public Args beBackoffSlotLengthUnit(TimeUnit tu) { - beBackoffSlotLengthUnit = tu; - return this; - } - - public Args executorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - public Args startThreadPool(ExecutorService startThreadPool) { this.startThreadPool = startThreadPool; return this; @@ -94,188 +48,35 @@ public class ThriftThreadPoolServer extends TServer { } // Executor service for handling client connections - private final ExecutorService invoker; private final CamelContext context; private final ExecutorService startExecutor; - private final TimeUnit requestTimeoutUnit; - - private final long requestTimeout; - - private final long beBackoffSlotInMillis; - - private Random random = new Random(System.currentTimeMillis()); public ThriftThreadPoolServer(Args args) { super(args); - requestTimeoutUnit = args.requestTimeoutUnit; - requestTimeout = args.requestTimeout; - beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength); - context = args.context; - invoker = args.executorService; startExecutor = args.startThreadPool; } + @Override public void serve() { - try { - serverTransport_.listen(); - } catch (TTransportException ttx) { - LOGGER.error("Error occurred during listening.", ttx); + if (!preServe()) { return; } - // Run the preServe event - if (eventHandler_ != null) { - eventHandler_.preServe(); - } - startExecutor.execute(() -> { - stopped_ = false; - setServing(true); - + execute(); waitForShutdown(); - context.getExecutorServiceManager().shutdownGraceful(invoker); + context.getExecutorServiceManager().shutdownGraceful(getExecutorService()); setServing(false); }); } - private void waitForShutdown() { - while (!stopped_) { - try { - TTransport client = serverTransport_.accept(); - WorkerProcess wp = new WorkerProcess(client); - - int retryCount = 0; - long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout); - while (true) { - try { - invoker.execute(wp); - break; - } catch (Throwable t) { - if (t instanceof RejectedExecutionException) { - retryCount++; - try { - if (remainTimeInMillis > 0) { - // do a truncated 20 binary exponential - // backoff sleep - long sleepTimeInMillis = ((long)(random.nextDouble() * (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis; - sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis); - TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis); - remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis; - } else { - client.close(); - wp = null; - LOGGER.warn("Task has been rejected by ExecutorService " + retryCount + " times till timedout, reason: " + t); - break; - } - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting to place client on executor queue."); - Thread.currentThread().interrupt(); - break; - } - } else if (t instanceof Error) { - LOGGER.error("ExecutorService threw error: " + t, t); - throw (Error)t; - } else { - // for other possible runtime errors from - // ExecutorService, should also not kill serve - LOGGER.warn("ExecutorService threw error: " + t, t); - break; - } - } - } - } catch (TTransportException ttx) { - if (!stopped_) { - LOGGER.warn("Transport error occurred during acceptance of message.", ttx); - } - } - } - } - + @Override public void stop() { - stopped_ = true; - serverTransport_.interrupt(); + super.stop(); context.getExecutorServiceManager().shutdownGraceful(startExecutor); } - - private final class WorkerProcess implements Runnable { - - /** - * Client that this services. - */ - private TTransport client; - - /** - * Default constructor. - * - * @param client Transport to process - */ - private WorkerProcess(TTransport client) { - this.client = client; - } - - /** - * Loops on processing a client forever - */ - public void run() { - TProcessor processor = null; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - - TServerEventHandler eventHandler = null; - ServerContext connectionContext = null; - - try { - processor = processorFactory_.getProcessor(client); - inputTransport = inputTransportFactory_.getTransport(client); - outputTransport = outputTransportFactory_.getTransport(client); - inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - - eventHandler = getEventHandler(); - if (eventHandler != null) { - connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); - } - // we check stopped_ first to make sure we're not supposed to be - // shutting - // down. this is necessary for graceful shutdown. - while (true) { - - if (eventHandler != null) { - eventHandler.processContext(connectionContext, inputTransport, outputTransport); - } - - if (stopped_ || !processor.process(inputProtocol, outputProtocol)) { - break; - } - } - } catch (TSaslTransportException ttx) { - // Something thats not SASL was in the stream, continue silently - } catch (TTransportException ttx) { - // Assume the client died and continue silently - } catch (TException tx) { - LOGGER.error("Thrift error occurred during processing of message.", tx); - } catch (Exception x) { - LOGGER.error("Error occurred during processing of message.", x); - } finally { - if (eventHandler != null) { - eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); - } - if (inputTransport != null) { - inputTransport.close(); - } - if (outputTransport != null) { - outputTransport.close(); - } - if (client.isOpen()) { - client.close(); - } - } - } - } } diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java index dd316f7..a85486d 100644 --- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java +++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java @@ -45,8 +45,8 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport { private static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable(); private static final int THRIFT_TEST_NUM1 = 12; private static final int THRIFT_TEST_NUM2 = 13; - private static final String TRUST_STORE_PATH = "src/test/resources/certs/truststore.jks"; - private static final String KEY_STORE_PATH = "src/test/resources/certs/keystore.jks"; + private static final String TRUST_STORE_RESOURCE = "file:src/test/resources/certs/truststore.jks"; + private static final String KEY_STORE_RESOURCE = "file:src/test/resources/certs/keystore.jks"; private static final String SECURITY_STORE_PASSWORD = "camelinaction"; private static final int THRIFT_CLIENT_TIMEOUT = 2000; @@ -62,7 +62,7 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport { TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters(); - sslParams.setTrustStore(TRUST_STORE_PATH, SECURITY_STORE_PASSWORD); + sslParams.setTrustStore(TRUST_STORE_RESOURCE, SECURITY_STORE_PASSWORD); transport = TSSLTransportFactory.getClientSocket("localhost", THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT, sslParams); protocol = new TBinaryProtocol(transport); @@ -86,7 +86,7 @@ public class ThriftConsumerSecurityTest extends CamelTestSupport { SSLContextParameters sslParameters = new SSLContextParameters(); KeyStoreParameters keyStoreParams = new KeyStoreParameters(); - keyStoreParams.setResource(KEY_STORE_PATH); + keyStoreParams.setResource(KEY_STORE_RESOURCE); keyStoreParams.setPassword(SECURITY_STORE_PASSWORD); KeyManagersParameters keyManagerParams = new KeyManagersParameters(); diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java index 9038065..bf503e7 100644 --- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java +++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerSecurityTest.java @@ -59,8 +59,8 @@ public class ThriftProducerSecurityTest extends CamelTestSupport { private static final int THRIFT_TEST_NUM1 = 12; private static final int THRIFT_TEST_NUM2 = 13; - private static final String TRUST_STORE_PATH = "src/test/resources/certs/truststore.jks"; - private static final String KEY_STORE_PATH = "src/test/resources/certs/keystore.jks"; + private static final String TRUST_STORE_SOURCE = "file:src/test/resources/certs/truststore.jks"; + private static final String KEY_STORE_SOURCE = "file:src/test/resources/certs/keystore.jks"; private static final String SECURITY_STORE_PASSWORD = "camelinaction"; private static final int THRIFT_CLIENT_TIMEOUT = 2000; @@ -71,7 +71,7 @@ public class ThriftProducerSecurityTest extends CamelTestSupport { TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters(); - sslParams.setKeyStore(KEY_STORE_PATH, SECURITY_STORE_PASSWORD); + sslParams.setKeyStore(KEY_STORE_SOURCE, SECURITY_STORE_PASSWORD); serverTransport = TSSLTransportFactory.getServerSocket(THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT, InetAddress.getByName("localhost"), sslParams); TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport); args.processor(processor); @@ -101,7 +101,7 @@ public class ThriftProducerSecurityTest extends CamelTestSupport { SSLContextParameters sslParameters = new SSLContextParameters(); KeyStoreParameters keyStoreParams = new KeyStoreParameters(); - keyStoreParams.setResource(TRUST_STORE_PATH); + keyStoreParams.setResource(TRUST_STORE_SOURCE); keyStoreParams.setPassword(SECURITY_STORE_PASSWORD); TrustManagersParameters trustManagerParams = new TrustManagersParameters(); diff --git a/parent/pom.xml b/parent/pom.xml index 268c925..e35280d 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -393,7 +393,7 @@ <jgroups-raft-jgroups-version>4.0.15.Final</jgroups-raft-jgroups-version> <jgroups-raft-leveldbjni-version>1.8</jgroups-raft-leveldbjni-version> <jgroups-raft-mapdb-version>1.0.8</jgroups-raft-mapdb-version> - <libthrift-version>0.11.0</libthrift-version> + <libthrift-version>0.12.0</libthrift-version> <jibx-version>1.2.6</jibx-version> <jing-bundle-version>20030619_5</jing-bundle-version> <jing-version>20030619</jing-version>