This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 12451a0 CAMEL-15861: Add the capability to provide a custom serializer to GooglePubSubProducer (#4619) 12451a0 is described below commit 12451a0c35a1595249768891f44524e10f66d408 Author: James Netherton <jamesnether...@users.noreply.github.com> AuthorDate: Tue Nov 17 20:21:54 2020 +0000 CAMEL-15861: Add the capability to provide a custom serializer to GooglePubSubProducer (#4619) --- .../camel/catalog/components/google-pubsub.json | 1 + .../catalog/docs/google-pubsub-component.adoc | 3 +- .../pubsub/GooglePubsubEndpointConfigurer.java | 8 ++ .../pubsub/GooglePubsubEndpointUriFactory.java | 11 +-- .../component/google/pubsub/google-pubsub.json | 1 + .../src/main/docs/google-pubsub-component.adoc | 3 +- .../google/pubsub/GooglePubsubEndpoint.java | 19 +++++ .../google/pubsub/GooglePubsubProducer.java | 12 +-- .../serializer/DefaultGooglePubsubSerializer.java | 35 +++++++++ .../pubsub/serializer/GooglePubsubSerializer.java | 34 ++++++++ .../pubsub/integration/CustomSerializerTest.java | 90 ++++++++++++++++++++++ .../dsl/GooglePubsubEndpointBuilderFactory.java | 28 +++++++ .../ROOT/pages/google-pubsub-component.adoc | 3 +- 13 files changed, 229 insertions(+), 19 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json index 8bdf67a..38a8c46 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json @@ -42,6 +42,7 @@ "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...] + "serializer": { "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." } } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-pubsub-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-pubsub-component.adoc index 1fc2426..016f413 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-pubsub-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-pubsub-component.adoc @@ -81,7 +81,7 @@ with the following path and query parameters: |=== -=== Query Parameters (10 parameters): +=== Query Parameters (11 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -96,6 +96,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *serializer* (producer) | *Autowired* A custom GooglePubsubSerializer to use for serializing message payloads in the producer | | GooglePubsubSerializer | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |=== // endpoint options: END diff --git a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java index aa0d3aa..ab8f6da 100644 --- a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java +++ b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java @@ -37,6 +37,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": target.setLoggerId(property(camelContext, java.lang.String.class, value)); return true; case "maxmessagesperpoll": case "maxMessagesPerPoll": target.setMaxMessagesPerPoll(property(camelContext, java.lang.Integer.class, value)); return true; + case "serializer": target.setSerializer(property(camelContext, org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer.class, value)); return true; case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true; case "synchronouspull": case "synchronousPull": target.setSynchronousPull(property(camelContext, boolean.class, value)); return true; @@ -45,6 +46,11 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im } @Override + public String[] getAutowiredNames() { + return new String[]{"serializer"}; + } + + @Override public Class<?> getOptionType(String name, boolean ignoreCase) { switch (ignoreCase ? name.toLowerCase() : name) { case "ackmode": @@ -63,6 +69,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": return java.lang.String.class; case "maxmessagesperpoll": case "maxMessagesPerPoll": return java.lang.Integer.class; + case "serializer": return org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer.class; case "synchronous": return boolean.class; case "synchronouspull": case "synchronousPull": return boolean.class; @@ -90,6 +97,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": return target.getLoggerId(); case "maxmessagesperpoll": case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll(); + case "serializer": return target.getSerializer(); case "synchronous": return target.isSynchronous(); case "synchronouspull": case "synchronousPull": return target.isSynchronousPull(); diff --git a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java index bec66cb..07bf77a 100644 --- a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java +++ b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java @@ -20,18 +20,19 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(12); + Set<String> props = new HashSet<>(13); + props.add("synchronous"); + props.add("exchangePattern"); + props.add("serializer"); + props.add("synchronousPull"); + props.add("concurrentConsumers"); props.add("lazyStartProducer"); props.add("bridgeErrorHandler"); - props.add("synchronous"); props.add("destinationName"); props.add("ackMode"); - props.add("exchangePattern"); props.add("maxMessagesPerPoll"); - props.add("synchronousPull"); props.add("loggerId"); props.add("projectId"); - props.add("concurrentConsumers"); props.add("exceptionHandler"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); diff --git a/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json index 8bdf67a..38a8c46 100644 --- a/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -42,6 +42,7 @@ "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...] + "serializer": { "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." } } } diff --git a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc index 1fc2426..016f413 100644 --- a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc +++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -81,7 +81,7 @@ with the following path and query parameters: |=== -=== Query Parameters (10 parameters): +=== Query Parameters (11 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -96,6 +96,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *serializer* (producer) | *Autowired* A custom GooglePubsubSerializer to use for serializing message payloads in the producer | | GooglePubsubSerializer | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |=== // endpoint options: END diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java index b70f818..5951673 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -24,6 +24,8 @@ import org.apache.camel.Consumer; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.google.pubsub.serializer.DefaultGooglePubsubSerializer; +import org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -70,6 +72,12 @@ public class GooglePubsubEndpoint extends DefaultEndpoint { description = "AUTO = exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to ack/nack explicitly") private GooglePubsubConstants.AckMode ackMode = GooglePubsubConstants.AckMode.AUTO; + @UriParam(name = "serializer", + description = "A custom GooglePubsubSerializer to use for serializing message payloads in the producer", + label = "producer,advanced") + @Metadata(autowired = true) + private GooglePubsubSerializer serializer; + public GooglePubsubEndpoint(String uri, Component component, String remaining) { super(uri, component); @@ -102,6 +110,9 @@ public class GooglePubsubEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { afterPropertiesSet(); + if (ObjectHelper.isEmpty(serializer)) { + serializer = new DefaultGooglePubsubSerializer(); + } return new GooglePubsubProducer(this); } @@ -179,4 +190,12 @@ public class GooglePubsubEndpoint extends DefaultEndpoint { public void setAckMode(GooglePubsubConstants.AckMode ackMode) { this.ackMode = ackMode; } + + public GooglePubsubSerializer getSerializer() { + return serializer; + } + + public void setSerializer(GooglePubsubSerializer serializer) { + this.serializer = serializer; + } } diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java index 0ea84d8..995368d 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java @@ -16,9 +16,6 @@ */ package org.apache.camel.component.google.pubsub; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.util.List; import java.util.Map; @@ -95,7 +92,7 @@ public class GooglePubsubProducer extends DefaultProducer { } else if (body instanceof byte[]) { byteString = ByteString.copyFrom((byte[]) body); } else { - byteString = ByteString.copyFrom(serialize(body)); + byteString = ByteString.copyFrom(endpoint.getSerializer().serialize(body)); } PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString); @@ -112,11 +109,4 @@ public class GooglePubsubProducer extends DefaultProducer { ApiFuture<String> messageIdFuture = publisher.publish(message); exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, messageIdFuture.get()); } - - public static byte[] serialize(Object obj) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream os = new ObjectOutputStream(out); - os.writeObject(obj); - return out.toByteArray(); - } } diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/DefaultGooglePubsubSerializer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/DefaultGooglePubsubSerializer.java new file mode 100644 index 0000000..4b8937d --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/DefaultGooglePubsubSerializer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.serializer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Default GooglePubsubMessageSerializer that uses ObjectOutputStream to serialize objects. + */ +public class DefaultGooglePubsubSerializer implements GooglePubsubSerializer { + + @Override + public byte[] serialize(Object payload) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream os = new ObjectOutputStream(out); + os.writeObject(payload); + return out.toByteArray(); + } +} diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/GooglePubsubSerializer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/GooglePubsubSerializer.java new file mode 100644 index 0000000..284b071e --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/serializer/GooglePubsubSerializer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.serializer; + +import java.io.IOException; + +/** + * An abstraction to customize how the GooglePubsubProducer serializes non String & byte[] message body payloads. + */ +public interface GooglePubsubSerializer { + + /** + * Serializes an object payload to a byte array. + * + * @param payload The payload to serialize + * @return The serialized payload as a byte array + * @throws IOException If the serialization process encountered errors + */ + byte[] serialize(Object payload) throws IOException; +} diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/CustomSerializerTest.java b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/CustomSerializerTest.java new file mode 100644 index 0000000..7772ea3 --- /dev/null +++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/CustomSerializerTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +public class CustomSerializerTest extends PubsubTestSupport { + + private static final String TOPIC_NAME = "typesSend"; + private static final String SUBSCRIPTION_NAME = "TypesReceive"; + + @EndpointInject("direct:from") + private Endpoint directIn; + + @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME) + private Endpoint pubsubTopic; + + @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true") + private Endpoint pubsubSubscription; + + @EndpointInject("mock:receiveResult") + private MockEndpoint receiveResult; + + @Produce("direct:from") + private ProducerTemplate producer; + + @BindToRegistry + private GooglePubsubSerializer serializer = new CustomSerializer(); + + @Override + public void createTopicSubscription() { + createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from(directIn).to(pubsubTopic); + + from(pubsubSubscription).to(receiveResult); + } + }; + } + + @Test + public void customSerializer() throws Exception { + receiveResult.expectedBodiesReceived("12345 custom serialized".getBytes(StandardCharsets.UTF_8)); + + producer.sendBody(12345); + + receiveResult.assertIsSatisfied(); + } + + private static final class CustomSerializer implements GooglePubsubSerializer { + + @Override + public byte[] serialize(Object payload) throws IOException { + // Append 'custom serialized' to the payload + String serialized = payload + " custom serialized"; + return serialized.getBytes(StandardCharsets.UTF_8); + } + } +} diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java index 489e2cb..484a4b4 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java @@ -482,6 +482,34 @@ public interface GooglePubsubEndpointBuilderFactory { return (GooglePubsubEndpointProducerBuilder) this; } /** + * A custom GooglePubsubSerializer to use for serializing message + * payloads in the producer. + * + * The option is a: + * <code>org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer</code> type. + * + * Group: producer (advanced) + */ + default AdvancedGooglePubsubEndpointProducerBuilder serializer( + Object serializer) { + doSetProperty("serializer", serializer); + return this; + } + /** + * A custom GooglePubsubSerializer to use for serializing message + * payloads in the producer. + * + * The option will be converted to a + * <code>org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer</code> type. + * + * Group: producer (advanced) + */ + default AdvancedGooglePubsubEndpointProducerBuilder serializer( + String serializer) { + doSetProperty("serializer", serializer); + return this; + } + /** * Sets whether synchronous processing should be strictly used, or Camel * is allowed to use asynchronous processing (if supported). * diff --git a/docs/components/modules/ROOT/pages/google-pubsub-component.adoc b/docs/components/modules/ROOT/pages/google-pubsub-component.adoc index 0a7fd0a..c4bc2e2 100644 --- a/docs/components/modules/ROOT/pages/google-pubsub-component.adoc +++ b/docs/components/modules/ROOT/pages/google-pubsub-component.adoc @@ -83,7 +83,7 @@ with the following path and query parameters: |=== -=== Query Parameters (10 parameters): +=== Query Parameters (11 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -98,6 +98,7 @@ with the following path and query parameters: | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...] +| *serializer* (producer) | *Autowired* A custom GooglePubsubSerializer to use for serializing message payloads in the producer | | GooglePubsubSerializer | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |=== // endpoint options: END