This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 52e245eb588 feat(components): google-pubsub inherit headers 52e245eb588 is described below commit 52e245eb588f28076bc8c06f89e9960cb81b4d63 Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Tue Sep 9 12:41:33 2025 +0200 feat(components): google-pubsub inherit headers * Add an header strategy to allow all headers which do not belong to the protocol to passthrough Camel exchange. * Deprecated the CamelGooglePubsubAttributes header, which was the way used so far to pass properties and does not apply to third party applications externally to Camel. Closes #CAMEL-22403 --- .../camel/catalog/components/google-pubsub.json | 9 +- .../pubsub/GooglePubsubEndpointConfigurer.java | 6 + .../pubsub/GooglePubsubEndpointUriFactory.java | 3 +- .../component/google/pubsub/google-pubsub.json | 9 +- .../google/pubsub/GooglePubsubConstants.java | 2 + .../google/pubsub/GooglePubsubConsumer.java | 16 ++- .../google/pubsub/GooglePubsubEndpoint.java | 20 +++- .../pubsub/GooglePubsubHeaderFilterStrategy.java | 48 ++++++++ .../google/pubsub/GooglePubsubProducer.java | 18 +++ .../pubsub/consumer/CamelMessageReceiver.java | 16 +++ .../SingleExchangeRoundAllHeadersIT.java | 122 +++++++++++++++++++++ .../dsl/GooglePubsubEndpointBuilderFactory.java | 103 +++++++++++++++++ 12 files changed, 358 insertions(+), 14 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json index 37878236d36..07618c59ff5 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json @@ -39,7 +39,7 @@ "CamelGooglePubsubMessageId": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the message, assigned by the server when the message is published.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#MESSAGE_ID" }, "CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID used to acknowledge the received message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" }, "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.google.protobuf.Timestamp", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time at which the message was published", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" }, - "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, + "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }, "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] }, @@ -59,8 +59,9 @@ "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, "retry": { "index": 13, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, "serializer": { "index": 14, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, - "loggerId": { "index": 15, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, - "authenticate": { "index": 16, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, - "serviceAccountKey": { "index": 17, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] + "includeAllGoogleProperties": { "index": 15, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." }, + "loggerId": { "index": 16, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, + "authenticate": { "index": 17, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, + "serviceAccountKey": { "index": 18, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] } } diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java index c6b2217cbe7..c9b1c82003b 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java @@ -34,6 +34,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true; case "exchangepattern": case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true; + case "includeallgoogleproperties": + case "includeAllGoogleProperties": target.setIncludeAllGoogleProperties(property(camelContext, boolean.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "loggerid": @@ -75,6 +77,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class; case "exchangepattern": case "exchangePattern": return org.apache.camel.ExchangePattern.class; + case "includeallgoogleproperties": + case "includeAllGoogleProperties": return boolean.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; case "loggerid": @@ -112,6 +116,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "exceptionHandler": return target.getExceptionHandler(); case "exchangepattern": case "exchangePattern": return target.getExchangePattern(); + case "includeallgoogleproperties": + case "includeAllGoogleProperties": return target.isIncludeAllGoogleProperties(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "loggerid": diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java index 996447b78ee..2621f51a5b9 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java @@ -23,7 +23,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(18); + Set<String> props = new HashSet<>(19); props.add("ackMode"); props.add("authenticate"); props.add("bridgeErrorHandler"); @@ -31,6 +31,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com props.add("destinationName"); props.add("exceptionHandler"); props.add("exchangePattern"); + props.add("includeAllGoogleProperties"); props.add("lazyStartProducer"); props.add("loggerId"); props.add("maxAckExtensionPeriod"); diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json index 37878236d36..07618c59ff5 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -39,7 +39,7 @@ "CamelGooglePubsubMessageId": { "index": 0, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID of the message, assigned by the server when the message is published.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#MESSAGE_ID" }, "CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID used to acknowledge the received message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" }, "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.google.protobuf.Timestamp", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time at which the message was published", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" }, - "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, + "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }, "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] }, @@ -59,8 +59,9 @@ "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, "retry": { "index": 13, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, "serializer": { "index": 14, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, - "loggerId": { "index": 15, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, - "authenticate": { "index": 16, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, - "serviceAccountKey": { "index": 17, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] + "includeAllGoogleProperties": { "index": 15, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." }, + "loggerId": { "index": 16, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, + "authenticate": { "index": 17, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, + "serviceAccountKey": { "index": 18, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] } } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java index efe6974356e..d5eb0937a23 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -27,6 +27,7 @@ public final class GooglePubsubConstants { @Metadata(label = "consumer", description = "The time at which the message was published", javaType = "com.google.protobuf.Timestamp") public static final String PUBLISH_TIME = "CamelGooglePubsubPublishTime"; + @Deprecated(since = "4.15") @Metadata(description = "The attributes of the message.", javaType = "Map<String, String>") public static final String ATTRIBUTES = "CamelGooglePubsubAttributes"; @Metadata(label = "producer", @@ -38,6 +39,7 @@ public final class GooglePubsubConstants { "message when ackMode=NONE.", javaType = "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge") public static final String GOOGLE_PUBSUB_ACKNOWLEDGE = "CamelGooglePubsubAcknowledge"; + @Deprecated(since = "4.15") public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = "goog"; public enum AckMode { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index bd40d3e9bea..7c2a4afdbcf 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -44,6 +44,7 @@ import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync; import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver; import org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class GooglePubsubConsumer extends DefaultConsumer { private ExecutorService executor; private final List<Subscriber> subscribers; private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses; + private final HeaderFilterStrategy headerFilterStrategy; GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -71,6 +73,7 @@ public class GooglePubsubConsumer extends DefaultConsumer { } localLog = LoggerFactory.getLogger(loggerId); + headerFilterStrategy = new GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties()); } @Override @@ -197,9 +200,11 @@ public class GooglePubsubConsumer extends DefaultConsumer { Exchange exchange = createExchange(true); exchange.getIn().setBody(pubsubMessage.getData().toByteArray()); + // Standard headers exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId()); exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, pubsubMessage.getMessageId()); exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime()); + // Deprecated: replaced by headerFilterStrategy exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); //existing subscriber can not be propagated, because it will be closed at the end of this block @@ -207,13 +212,22 @@ public class GooglePubsubConsumer extends DefaultConsumer { // (see https://issues.apache.org/jira/browse/CAMEL-18447) GooglePubsubAcknowledge acknowledge = new AcknowledgeSync( () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName); - if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge)); } else { exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, acknowledge); } + // Inherit the rest of headers + for (String pubSubHeader : pubsubMessage.getAttributesMap().keySet()) { + String value = pubsubMessage.getAttributesMap().get(pubSubHeader); + if (headerFilterStrategy != null + && headerFilterStrategy.applyFilterToExternalHeaders(pubSubHeader, value, exchange)) { + continue; + } + exchange.getIn().setHeader(pubSubHeader, value); + } + try { processor.process(exchange); } catch (Exception e) { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java index 62a5d3ae0b3..48f15df8a83 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -94,6 +94,10 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer @UriParam(label = "producer,advanced", description = "A custom RetrySettings to control how the publisher handles retry-able failures") private RetrySettings retry; + @UriParam(label = "advanced", + description = "Whether to include all Google headers when mapping from Pubsub to Camel Message." + + " Setting this to true will include properties such as x-goog etc.") + private boolean includeAllGoogleProperties; public GooglePubsubEndpoint(String uri, Component component) { super(uri, component); @@ -203,12 +207,12 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer this.maxMessagesPerPoll = maxMessagesPerPoll; } - public boolean isSynchronousPull() { - return synchronousPull; + public boolean isIncludeAllGoogleProperties() { + return includeAllGoogleProperties; } - public void setSynchronousPull(Boolean synchronousPull) { - this.synchronousPull = synchronousPull; + public void setIncludeAllGoogleProperties(Boolean includeAllGoogleProperties) { + this.includeAllGoogleProperties = includeAllGoogleProperties; } public GooglePubsubConstants.AckMode getAckMode() { @@ -251,6 +255,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer this.retry = retry; } + public boolean isSynchronousPull() { + return synchronousPull; + } + + public void setSynchronousPull(Boolean synchronousPull) { + this.synchronousPull = synchronousPull; + } + public String getPubsubEndpoint() { return this.pubsubEndpoint; } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java new file mode 100644 index 00000000000..7232a5c2c6d --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub; + +import org.apache.camel.support.DefaultHeaderFilterStrategy; + +public class GooglePubsubHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + + public GooglePubsubHeaderFilterStrategy() { + this(false); + } + + public GooglePubsubHeaderFilterStrategy(boolean includeAllGoogleProperties) { + setOutFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH); + setInFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH); + getOutFilter().add("authorization"); + if (!includeAllGoogleProperties) { + ignoreGoogProperties(); + } + } + + protected void ignoreGoogProperties() { + String[] filterStartWith = new String[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length + 2]; + System.arraycopy(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH, 0, + filterStartWith, 0, DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length); + filterStartWith[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length] = "x-goog"; + filterStartWith[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length + 1] = "X-GOOG"; + setOutFilterStartsWith(filterStartWith); + setInFilterStartsWith(filterStartWith); + getOutFilter().add("google-cloud-resource-prefix"); + getOutFilter().add("grpc-timeout"); + } + +} diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java index 22b50890773..9c2bc1f7b66 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java @@ -25,6 +25,7 @@ import com.google.common.base.Strings; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import org.apache.camel.Exchange; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import static org.apache.camel.component.google.pubsub.GooglePubsubConstants.RES public class GooglePubsubProducer extends DefaultProducer { public Logger logger; + private final HeaderFilterStrategy headerFilterStrategy; public GooglePubsubProducer(GooglePubsubEndpoint endpoint) { super(endpoint); @@ -50,6 +52,7 @@ public class GooglePubsubProducer extends DefaultProducer { } logger = LoggerFactory.getLogger(loggerId); + headerFilterStrategy = new GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties()); } /** @@ -101,6 +104,9 @@ public class GooglePubsubProducer extends DefaultProducer { } PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString); + + // Deprecated: start + // Replaced by headerFilterStrategy Map<String, String> attributes = exchange.getIn().getHeader(ATTRIBUTES, Map.class); if (attributes != null) { for (Map.Entry<String, String> attribute : attributes.entrySet()) { @@ -109,11 +115,23 @@ public class GooglePubsubProducer extends DefaultProducer { } } } + // Deprecated: end + String orderingKey = exchange.getIn().getHeader(ORDERING_KEY, String.class); if (orderingKey != null) { messageBuilder.setOrderingKey(orderingKey); } + // Inherit the rest of headers + for (String camelHeader : exchange.getIn().getHeaders().keySet()) { + String value = exchange.getIn().getHeader(camelHeader, String.class); + if (headerFilterStrategy != null + && headerFilterStrategy.applyFilterToExternalHeaders(camelHeader, value, exchange)) { + continue; + } + messageBuilder.putAttributes(camelHeader, value); + } + PubsubMessage message = messageBuilder.build(); ApiFuture<String> messageIdFuture = publisher.publish(message); diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index 8a123df7236..4169c2630f3 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -25,6 +25,8 @@ import org.apache.camel.Processor; import org.apache.camel.component.google.pubsub.GooglePubsubConstants; import org.apache.camel.component.google.pubsub.GooglePubsubConsumer; import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint; +import org.apache.camel.component.google.pubsub.GooglePubsubHeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ public class CamelMessageReceiver implements MessageReceiver { private final GooglePubsubConsumer consumer; private final GooglePubsubEndpoint endpoint; private final Processor processor; + private final HeaderFilterStrategy headerFilterStrategy; public CamelMessageReceiver(GooglePubsubConsumer consumer, GooglePubsubEndpoint endpoint, Processor processor) { this.consumer = consumer; @@ -44,6 +47,7 @@ public class CamelMessageReceiver implements MessageReceiver { loggerId = this.getClass().getName(); } localLog = LoggerFactory.getLogger(loggerId); + headerFilterStrategy = new GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties()); } @Override @@ -55,8 +59,10 @@ public class CamelMessageReceiver implements MessageReceiver { Exchange exchange = consumer.createExchange(true); exchange.getIn().setBody(pubsubMessage.getData().toByteArray()); + // Standard headers exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, pubsubMessage.getMessageId()); exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime()); + // Deprecated: replaced by headerFilterStrategy exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); GooglePubsubAcknowledge acknowledge = new AcknowledgeAsync(ackReplyConsumer); @@ -66,6 +72,16 @@ public class CamelMessageReceiver implements MessageReceiver { exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, acknowledge); } + // Inherit the rest of headers + for (String pubSubHeader : pubsubMessage.getAttributesMap().keySet()) { + String value = pubsubMessage.getAttributesMap().get(pubSubHeader); + if (headerFilterStrategy != null + && headerFilterStrategy.applyFilterToExternalHeaders(pubSubHeader, value, exchange)) { + continue; + } + exchange.getIn().setHeader(pubSubHeader, value); + } + try { processor.process(exchange); } catch (Exception e) { diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java new file mode 100644 index 00000000000..e401bdc3efe --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import java.util.List; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.GooglePubsubConstants; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class SingleExchangeRoundAllHeadersIT extends PubsubTestSupport { + + private static final String TOPIC_NAME = "singleSend"; + private static final String SUBSCRIPTION_NAME = "singleReceive"; + + @EndpointInject("direct:from") + private Endpoint directIn; + + @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME) + private Endpoint pubsubTopic; + + @EndpointInject("mock:sendResult") + private MockEndpoint sendResult; + + @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true") + private Endpoint pubsubSubscription; + + @EndpointInject("mock:receiveResult") + private MockEndpoint receiveResult; + + @Produce("direct:from") + private ProducerTemplate producer; + + @Override + public void createTopicSubscription() { + createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(directIn).routeId("Single_Send").to(pubsubTopic).to(sendResult); + + from(pubsubSubscription).routeId("Single_Receive").to("direct:one"); + + from("direct:one").to(receiveResult); + } + }; + } + + @Test + public void testIncludeHeaders() throws Exception { + + Exchange exchange = new DefaultExchange(context); + + String attributeKey = "ATTRIBUTE-TEST-KEY"; + String attributeValue = "ATTRIBUTE-TEST-VALUE"; + String hiddenAttributeKey = "x-goog-attr"; + String hiddenAttributeValue = "ATTRIBUTE-HIDDEN-VALUE"; + + exchange.getIn().setBody("Single : " + exchange.getExchangeId()); + exchange.getIn().setHeader(attributeKey, attributeValue); + exchange.getIn().setHeader(hiddenAttributeKey, hiddenAttributeValue); + + receiveResult.expectedMessageCount(1); + receiveResult.expectedBodiesReceivedInAnyOrder(exchange.getIn().getBody()); + + producer.send(exchange); + + List<Exchange> sentExchanges = sendResult.getExchanges(); + assertEquals(1, sentExchanges.size(), "Sent exchanges"); + + Exchange sentExchange = sentExchanges.get(0); + + assertEquals(exchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), + sentExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), "Sent ID"); + + receiveResult.assertIsSatisfied(5000); + + List<Exchange> receivedExchanges = receiveResult.getExchanges(); + + assertNotNull(receivedExchanges, "Received exchanges"); + + Exchange receivedExchange = receivedExchanges.get(0); + + assertNotNull(receivedExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), "PUBSUB Message ID Property"); + assertNotNull(receivedExchange.getIn().getHeader(GooglePubsubConstants.PUBLISH_TIME), "PUBSUB Published Time"); + + assertEquals(attributeValue, receivedExchange.getIn().getHeader(attributeKey)); + assertNull(receivedExchange.getIn().getHeader(hiddenAttributeKey)); + + assertEquals(sentExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), + receivedExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID)); + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java index f72cd982651..0cad250eaca 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java @@ -380,6 +380,40 @@ public interface GooglePubsubEndpointBuilderFactory { doSetProperty("synchronousPull", synchronousPull); return this; } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointConsumerBuilder includeAllGoogleProperties(boolean includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointConsumerBuilder includeAllGoogleProperties(String includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } /** * To use a custom logger name. * @@ -622,6 +656,40 @@ public interface GooglePubsubEndpointBuilderFactory { doSetProperty("serializer", serializer); return this; } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointProducerBuilder includeAllGoogleProperties(boolean includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointProducerBuilder includeAllGoogleProperties(String includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } /** * To use a custom logger name. * @@ -712,6 +780,40 @@ public interface GooglePubsubEndpointBuilderFactory { return (GooglePubsubEndpointBuilder) this; } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointBuilder includeAllGoogleProperties(boolean includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } + /** + * Whether to include all Google headers when mapping from Pubsub to + * Camel Message. Setting this to true will include properties such as + * x-goog etc. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: advanced + * + * @param includeAllGoogleProperties the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointBuilder includeAllGoogleProperties(String includeAllGoogleProperties) { + doSetProperty("includeAllGoogleProperties", includeAllGoogleProperties); + return this; + } /** * To use a custom logger name. * @@ -852,6 +954,7 @@ public interface GooglePubsubEndpointBuilderFactory { * * @return the name of the header {@code GooglePubsubAttributes}. */ + @Deprecated public String googlePubsubAttributes() { return "CamelGooglePubsubAttributes"; }