This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 7da79c3 [#CAMEL-15813] Add ClientInterceptors to the channel (#4566) 7da79c3 is described below commit 7da79c371ed2880a82959eb446ec469517c9adb5 Author: Tim Sexton <tim.sex...@ibotta.com> AuthorDate: Wed Nov 4 22:44:50 2020 -0700 [#CAMEL-15813] Add ClientInterceptors to the channel (#4566) --- .../org/apache/camel/catalog/components/grpc.json | 2 + .../apache/camel/catalog/docs/grpc-component.adoc | 3 +- .../component/grpc/GrpcEndpointConfigurer.java | 5 + .../component/grpc/GrpcEndpointUriFactory.java | 3 +- .../org/apache/camel/component/grpc/grpc.json | 1 + .../camel-grpc/src/main/docs/grpc-component.adoc | 3 +- .../apache/camel/component/grpc/GrpcComponent.java | 13 ++- .../camel/component/grpc/GrpcConfiguration.java | 32 ++++++ .../apache/camel/component/grpc/GrpcProducer.java | 1 + .../component/grpc/GrpcMockClientInterceptor.java | 28 ++++++ .../grpc/GrpcProducerClientInterceptorTest.java | 110 +++++++++++++++++++++ .../endpoint/dsl/GrpcEndpointBuilderFactory.java | 90 +++++++++++++++++ .../modules/ROOT/pages/grpc-component.adoc | 3 +- 13 files changed, 289 insertions(+), 5 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json index b789938..6e4e5d0 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json @@ -30,6 +30,7 @@ "host": { "kind": "path", "displayName": "Host", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The gRPC server host name. This is localhost or 0.0.0.0 when being a consumer or remote server host name when using producer." }, "port": { "kind": "path", "displayName": "Port", "group": "common", "label": "", "required": true, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The gRPC local or remote server port" }, "service": { "kind": "path", "displayName": "Service", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Fully qualified service name from the protocol buffer descriptor file (package dot service definition name)" }, + "autoDiscoverClientInterceptors": { "kind": "parameter", "displayName": "Auto Discover Client Interceptors", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for [...] "flowControlWindow": { "kind": "parameter", "displayName": "Flow Control Window", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1048576, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The HTTP\/2 flow control window size (MiB)" }, "maxMessageSize": { "kind": "parameter", "displayName": "Max Message Size", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 4194304, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The maximum message size allowed to be received\/sent (MiB)" }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...] @@ -40,6 +41,7 @@ "routeControlledStreamObserver": { "kind": "parameter", "displayName": "Route Controlled Stream Observer", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Lets the route to take control over stream observer. If this value is set to true, then [...] "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "clientInterceptors": { "kind": "parameter", "displayName": "Client Interceptors", "group": "producer", "label": "producer", "required": false, "type": "array", "javaType": "java.util.List<io.grpc.ClientInterceptor>", "multiValue": true, "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Setting the client interceptors on the netty channel in order to intercept outgoin [...] "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the [...] "method": { "kind": "parameter", "displayName": "Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "gRPC method name" }, "producerStrategy": { "kind": "parameter", "displayName": "Producer Strategy", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.camel.component.grpc.GrpcProducerStrategy", "enum": [ "SIMPLE", "STREAMING" ], "deprecated": false, "secret": false, "defaultValue": "SIMPLE", "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The mode used to communicate with [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc index fbe662a..a539719 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc @@ -83,12 +83,13 @@ with the following path and query parameters: |=== -=== Query Parameters (28 parameters): +=== Query Parameters (29 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type +| *autoDiscoverClientInterceptors* (common) | Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for a ClientInterceptor instance in the registry automatically otherwise it will skip that checking. | true | boolean | *flowControlWindow* (common) | The HTTP/2 flow control window size (MiB) | 1048576 | int | *maxMessageSize* (common) | The maximum message size allowed to be received/sent (MiB) | 4194304 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean diff --git a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java index b11d35a..a2f14ae 100644 --- a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java +++ b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java @@ -22,6 +22,7 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements map.put("host", java.lang.String.class); map.put("port", int.class); map.put("service", java.lang.String.class); + map.put("autoDiscoverClientInterceptors", boolean.class); map.put("flowControlWindow", int.class); map.put("maxMessageSize", int.class); map.put("bridgeErrorHandler", boolean.class); @@ -60,6 +61,8 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements switch (ignoreCase ? name.toLowerCase() : name) { case "authenticationtype": case "authenticationType": target.getConfiguration().setAuthenticationType(property(camelContext, org.apache.camel.component.grpc.GrpcAuthType.class, value)); return true; + case "autodiscoverclientinterceptors": + case "autoDiscoverClientInterceptors": target.getConfiguration().setAutoDiscoverClientInterceptors(property(camelContext, boolean.class, value)); return true; case "basicpropertybinding": case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true; case "bridgeerrorhandler": @@ -134,6 +137,8 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements switch (ignoreCase ? name.toLowerCase() : name) { case "authenticationtype": case "authenticationType": return target.getConfiguration().getAuthenticationType(); + case "autodiscoverclientinterceptors": + case "autoDiscoverClientInterceptors": return target.getConfiguration().isAutoDiscoverClientInterceptors(); case "basicpropertybinding": case "basicPropertyBinding": return target.isBasicPropertyBinding(); case "bridgeerrorhandler": diff --git a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java index 174d9af..e8a7ad0 100644 --- a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java +++ b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java @@ -20,7 +20,7 @@ public class GrpcEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(31); + Set<String> props = new HashSet<>(32); props.add("basicPropertyBinding"); props.add("serviceAccountResource"); props.add("synchronous"); @@ -37,6 +37,7 @@ public class GrpcEndpointUriFactory extends org.apache.camel.support.component.E props.add("jwtSecret"); props.add("keyResource"); props.add("method"); + props.add("autoDiscoverClientInterceptors"); props.add("exchangePattern"); props.add("streamRepliesTo"); props.add("userAgent"); diff --git a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json index b789938..7ee2fe0 100644 --- a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json +++ b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json @@ -30,6 +30,7 @@ "host": { "kind": "path", "displayName": "Host", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The gRPC server host name. This is localhost or 0.0.0.0 when being a consumer or remote server host name when using producer." }, "port": { "kind": "path", "displayName": "Port", "group": "common", "label": "", "required": true, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The gRPC local or remote server port" }, "service": { "kind": "path", "displayName": "Service", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Fully qualified service name from the protocol buffer descriptor file (package dot service definition name)" }, + "autoDiscoverClientInterceptors": { "kind": "parameter", "displayName": "Auto Discover Client Interceptors", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for [...] "flowControlWindow": { "kind": "parameter", "displayName": "Flow Control Window", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1048576, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The HTTP\/2 flow control window size (MiB)" }, "maxMessageSize": { "kind": "parameter", "displayName": "Max Message Size", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 4194304, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The maximum message size allowed to be received\/sent (MiB)" }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...] diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc index fbe662a..a539719 100644 --- a/components/camel-grpc/src/main/docs/grpc-component.adoc +++ b/components/camel-grpc/src/main/docs/grpc-component.adoc @@ -83,12 +83,13 @@ with the following path and query parameters: |=== -=== Query Parameters (28 parameters): +=== Query Parameters (29 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type +| *autoDiscoverClientInterceptors* (common) | Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for a ClientInterceptor instance in the registry automatically otherwise it will skip that checking. | true | boolean | *flowControlWindow* (common) | The HTTP/2 flow control window size (MiB) | 1048576 | int | *maxMessageSize* (common) | The maximum message size allowed to be received/sent (MiB) | 4194304 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java index 496ecc4..ab5c52c 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java @@ -17,8 +17,11 @@ package org.apache.camel.component.grpc; import java.net.URI; +import java.util.ArrayList; import java.util.Map; +import java.util.Set; +import io.grpc.ClientInterceptor; import org.apache.camel.Endpoint; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; @@ -36,16 +39,24 @@ public class GrpcComponent extends DefaultComponent { Endpoint endpoint = new GrpcEndpoint(uri, this, config); setProperties(endpoint, parameters); + if (config.isAutoDiscoverClientInterceptors()) { + checkAndSetRegistryClientInterceptors(config); + } return endpoint; } /** * Parses the configuration - * + * * @return the parsed and valid configuration to use */ protected GrpcConfiguration parseConfiguration(GrpcConfiguration configuration, String remaining) throws Exception { configuration.parseURI(new URI(remaining)); return configuration; } + + private void checkAndSetRegistryClientInterceptors(GrpcConfiguration configuration) { + Set<ClientInterceptor> clientInterceptors = getCamelContext().getRegistry().findByType(ClientInterceptor.class); + configuration.setClientInterceptors(new ArrayList<>(clientInterceptors)); + } } diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java index a8c4e9a..37bee88 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java @@ -17,7 +17,10 @@ package org.apache.camel.component.grpc; import java.net.URI; +import java.util.Collections; +import java.util.List; +import io.grpc.ClientInterceptor; import io.grpc.internal.GrpcUtil; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; @@ -108,6 +111,11 @@ public class GrpcConfiguration { @UriParam(label = "consumer", defaultValue = "false") private boolean routeControlledStreamObserver; + private List<ClientInterceptor> clientInterceptors = Collections.emptyList(); + + @UriParam(label = "common", defaultValue = "true") + private boolean autoDiscoverClientInterceptors = true; + /** * Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) */ @@ -392,6 +400,30 @@ public class GrpcConfiguration { this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection; } + public List<ClientInterceptor> getClientInterceptors() { + return clientInterceptors; + } + + /** + * Setting the client interceptors on the netty channel in order to intercept outgoing calls before they are + * dispatched by the channel. + */ + public void setClientInterceptors(List<ClientInterceptor> clientInterceptors) { + this.clientInterceptors = clientInterceptors; + } + + public boolean isAutoDiscoverClientInterceptors() { + return autoDiscoverClientInterceptors; + } + + /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for a ClientInterceptor + * instance in the registry automatically otherwise it will skip that checking. + */ + public void setAutoDiscoverClientInterceptors(boolean autoDiscoverClientInterceptors) { + this.autoDiscoverClientInterceptors = autoDiscoverClientInterceptors; + } + public void parseURI(URI uri) { setHost(uri.getHost()); diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java index a91f35c..f870425 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java @@ -186,6 +186,7 @@ public class GrpcProducer extends DefaultAsyncProducer { .flowControlWindow(configuration.getFlowControlWindow()) .userAgent(configuration.getUserAgent()) .maxInboundMessageSize(configuration.getMaxMessageSize()) + .intercept(configuration.getClientInterceptors()) .build(); } } diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcMockClientInterceptor.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcMockClientInterceptor.java new file mode 100644 index 0000000..0526242 --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcMockClientInterceptor.java @@ -0,0 +1,28 @@ +package org.apache.camel.component.grpc; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcMockClientInterceptor implements ClientInterceptor { + + private static final Logger LOG = LoggerFactory.getLogger(GrpcMockClientInterceptor.class); + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( + MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) { + return new SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) { + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + LOG.info("Grpc Client Interceptor intercepted client call, forwarding now."); + super.start(responseListener, headers); + } + }; + } +} diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java new file mode 100644 index 0000000..ed9a650 --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java @@ -0,0 +1,110 @@ +package org.apache.camel.component.grpc; + +import java.io.IOException; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class GrpcProducerClientInterceptorTest extends CamelTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(GrpcProducerClientInterceptorTest.class); + + private static final int GRPC_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_TEST_PING_ID = 1; + private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; + + private static Server grpcServer; + private static final GrpcMockClientInterceptor mockClientInterceptor = mock(GrpcMockClientInterceptor.class); + private static final GrpcMockClientInterceptor mockClientInterceptor2 = mock(GrpcMockClientInterceptor.class); + + @BeforeAll + public static void startGrpcServer() throws Exception { + grpcServer = ServerBuilder.forPort(GRPC_TEST_PORT).addService(new GrpcProducerClientInterceptorTest.PingPongImpl()) + .build().start(); + LOG.info("gRPC server started on port {}", GRPC_TEST_PORT); + } + + @AfterAll + public static void stopGrpcServer() throws IOException { + if (grpcServer != null) { + grpcServer.shutdown(); + LOG.info("gRPC server stoped"); + } + } + + @Test + public void testClientInterceptors() throws Exception { + when(mockClientInterceptor.interceptCall(any(), any(), any())).thenCallRealMethod(); + when(mockClientInterceptor2.interceptCall(any(), any(), any())).thenCallRealMethod(); + LOG.info("gRPC PingSyncSync method test start"); + // Testing simple sync method invoke with host and port parameters + PingRequest pingRequest + = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + template.requestBody("direct:grpc-interceptor", pingRequest); + verify(mockClientInterceptor, times(1)).interceptCall(any(), any(), any()); + verify(mockClientInterceptor2, times(1)).interceptCall(any(), any(), any()); + } + + @Test + public void testNoAutoDiscover() throws Exception { + GrpcComponent component = context.getComponent("grpc", GrpcComponent.class); + GrpcEndpoint endpoint = (GrpcEndpoint) component.createEndpoint("grpc://localhost:" + GRPC_TEST_PORT + + "/org.apache.camel.component.grpc" + + ".PingPong?method=pingSyncSync&autoDiscoverClientInterceptors=false"); + + assertFalse(endpoint.getConfiguration().isAutoDiscoverClientInterceptors()); + assertEquals(endpoint.getConfiguration().getClientInterceptors().size(), 0); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + // This needs to be here because if it's any earlier the context won't be set up and if it's any later, we'll + // put it in the registry after the component is already set up. + context.getRegistry().bind("grpcMockClientInterceptor", mockClientInterceptor); + context.getRegistry().bind("grpcMockClientInterceptor2", mockClientInterceptor2); + return new RouteBuilder() { + @Override + public void configure() { + from("direct:grpc-interceptor") + .to("grpc://localhost:" + GRPC_TEST_PORT + + "/org.apache.camel.component.grpc" + + ".PingPong?method=pingSyncSync"); + } + }; + } + + /** + * Test gRPC PingPong server implementation + */ + static class PingPongImpl extends PingPongGrpc.PingPongImplBase { + @Override + public void pingSyncSync(PingRequest request, StreamObserver<PongResponse> responseObserver) { + LOG.info("gRPC server received data from PingPong service PingId={} PingName={}", request.getPingId(), + request.getPingName()); + PongResponse response = PongResponse.newBuilder().setPongName(request.getPingName() + GRPC_TEST_PONG_VALUE) + .setPongId(request.getPingId()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + } + +} diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java index ea44876..7ea76b4 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java @@ -42,6 +42,36 @@ public interface GrpcEndpointBuilderFactory { return (AdvancedGrpcEndpointConsumerBuilder) this; } /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointConsumerBuilder autoDiscoverClientInterceptors( + boolean autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointConsumerBuilder autoDiscoverClientInterceptors( + String autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** * The HTTP/2 flow control window size (MiB). * * The option is a: <code>int</code> type. @@ -601,6 +631,36 @@ public interface GrpcEndpointBuilderFactory { return (AdvancedGrpcEndpointProducerBuilder) this; } /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointProducerBuilder autoDiscoverClientInterceptors( + boolean autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointProducerBuilder autoDiscoverClientInterceptors( + String autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** * The HTTP/2 flow control window size (MiB). * * The option is a: <code>int</code> type. @@ -1024,6 +1084,36 @@ public interface GrpcEndpointBuilderFactory { return (AdvancedGrpcEndpointBuilder) this; } /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointBuilder autoDiscoverClientInterceptors( + boolean autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** + * Setting the autoDiscoverClientInterceptors mechanism, if true, the + * component will look for a ClientInterceptor instance in the registry + * automatically otherwise it will skip that checking. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: true + * Group: common + */ + default GrpcEndpointBuilder autoDiscoverClientInterceptors( + String autoDiscoverClientInterceptors) { + doSetProperty("autoDiscoverClientInterceptors", autoDiscoverClientInterceptors); + return this; + } + /** * The HTTP/2 flow control window size (MiB). * * The option is a: <code>int</code> type. diff --git a/docs/components/modules/ROOT/pages/grpc-component.adoc b/docs/components/modules/ROOT/pages/grpc-component.adoc index 4bca6a6..f795036 100644 --- a/docs/components/modules/ROOT/pages/grpc-component.adoc +++ b/docs/components/modules/ROOT/pages/grpc-component.adoc @@ -85,12 +85,13 @@ with the following path and query parameters: |=== -=== Query Parameters (28 parameters): +=== Query Parameters (29 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type +| *autoDiscoverClientInterceptors* (common) | Setting the autoDiscoverClientInterceptors mechanism, if true, the component will look for a ClientInterceptor instance in the registry automatically otherwise it will skip that checking. | true | boolean | *flowControlWindow* (common) | The HTTP/2 flow control window size (MiB) | 1048576 | int | *maxMessageSize* (common) | The maximum message size allowed to be received/sent (MiB) | 4194304 | int | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean