This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-21249 in repository https://gitbox.apache.org/repos/asf/camel.git
commit aac369d2b793aed25a3edde36a7a3882a89ee03e Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Oct 10 15:49:23 2024 +0200 CAMEL-21249 - Camel-Kamelets: Move Kamelets utils in Camel Kamelets component Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- .../apache/camel/catalog/transformers.properties | 1 + .../aws2-ddb-application-x-struct.json | 14 + components/camel-kamelet/pom.xml | 48 ++++ .../org/apache/camel/transformer.properties | 7 + .../transformer/aws2-ddb-application-x-struct | 2 + .../transformer/aws2-ddb-application-x-struct.json | 14 + .../component/kamelet/utils/djl/ImageNetUtil.java | 57 ++++ .../component/kamelet/utils/format/MimeType.java | 55 ++++ .../format/schema/DelegatingSchemaResolver.java | 121 +++++++++ .../kamelet/utils/mongodb/SslAwareMongoClient.java | 291 +++++++++++++++++++++ .../gson/JavaTimeInstantTypeAdapter.java | 41 +++ .../kafka/KafkaHeaderDeserializer.java | 98 +++++++ .../kamelet/utils/transform/DropField.java | 60 +++++ .../kamelet/utils/transform/ExtractField.java | 112 ++++++++ .../kamelet/utils/transform/HoistField.java | 38 +++ .../kamelet/utils/transform/InsertField.java | 85 ++++++ .../kamelet/utils/transform/MaskField.java | 125 +++++++++ .../utils/transform/MessageTimestampRouter.java | 90 +++++++ .../kamelet/utils/transform/RegexRouter.java | 50 ++++ .../kamelet/utils/transform/ReplaceField.java | 90 +++++++ .../kamelet/utils/transform/TimestampRouter.java | 69 +++++ .../ddb/Ddb2JsonStructDataTypeTransformer.java | 46 ++++ .../utils/transform/kafka/BatchManualCommit.java | 42 +++ .../utils/transform/kafka/ManualCommit.java | 33 +++ .../kamelet/utils/transform/kafka/ValueToKey.java | 57 ++++ .../utils/kafka/KafkaHeaderDeserializerTest.java | 78 ++++++ .../kamelet/utils/transform/ExtractFieldTest.java | 133 ++++++++++ .../kamelet/utils/transform/HoistFieldTest.java | 58 ++++ .../kamelet/utils/transform/InsertFieldTest.java | 73 ++++++ .../kamelet/utils/transform/MaskFieldTest.java | 86 ++++++ .../kamelet/utils/transform/RegexRouterTest.java | 51 ++++ .../kamelet/utils/transform/ReplaceFieldTest.java | 115 ++++++++ 32 files changed, 2240 insertions(+) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties index 4efe6eae08f..b740fa67e2c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties @@ -8,6 +8,7 @@ avro-x-java-object avro-x-struct aws-cloudtrail-application-cloudevents aws2-ddb-application-json +aws2-ddb-application-x-struct aws2-ddbstream-application-cloudevents aws2-kinesis-application-cloudevents aws2-s3-application-cloudevents diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json new file mode 100644 index 00000000000..d826347d326 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json @@ -0,0 +1,14 @@ +{ + "transformer": { + "kind": "transformer", + "name": "aws2-ddb:application-x-struct", + "title": "Aws2 Ddb (Application X Struct)", + "description": "Transforms DynamoDB record into a Json node", + "deprecated": false, + "javaType": "org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer", + "groupId": "org.apache.camel", + "artifactId": "camel-kamelet", + "version": "4.9.0-SNAPSHOT" + } +} + diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml index e7635a793ba..7473eabff03 100644 --- a/components/camel-kamelet/pom.xml +++ b/components/camel-kamelet/pom.xml @@ -43,6 +43,54 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-core-model</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-djl</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>net.sf.extjwnl</groupId> + <artifactId>extjwnl</artifactId> + <version>2.0.5</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>net.sf.extjwnl</groupId> + <artifactId>extjwnl-data-wn31</artifactId> + <version>1.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-kafka</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson-avro</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson-protobuf</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-mongodb</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-gson</artifactId> + <scope>provided</scope> + </dependency> + <!-- TESTS --> <dependency> diff --git a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties new file mode 100644 index 00000000000..3d2567b0cb8 --- /dev/null +++ b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +transformers=aws2-ddb:application-x-struct +groupId=org.apache.camel +artifactId=camel-kamelet +version=4.9.0-SNAPSHOT +projectName=Camel :: Kamelet +projectDescription=To call Kamelets diff --git a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct new file mode 100644 index 00000000000..4db224fcbbe --- /dev/null +++ b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer diff --git a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json new file mode 100644 index 00000000000..d826347d326 --- /dev/null +++ b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json @@ -0,0 +1,14 @@ +{ + "transformer": { + "kind": "transformer", + "name": "aws2-ddb:application-x-struct", + "title": "Aws2 Ddb (Application X Struct)", + "description": "Transforms DynamoDB record into a Json node", + "deprecated": false, + "javaType": "org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer", + "groupId": "org.apache.camel", + "artifactId": "camel-kamelet", + "version": "4.9.0-SNAPSHOT" + } +} + diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java new file mode 100644 index 00000000000..7f4416deef9 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.djl; + +import java.util.List; + +import ai.djl.modality.Classifications; +import net.sf.extjwnl.data.IndexWord; +import net.sf.extjwnl.data.POS; +import net.sf.extjwnl.data.PointerUtils; +import net.sf.extjwnl.data.list.PointerTargetNodeList; +import net.sf.extjwnl.dictionary.Dictionary; +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; + +/** + * A utility bean class for handling ImageNet (https://image-net.org/) classifications. + */ +public class ImageNetUtil { + + public ImageNetUtil() { + } + + public void extractClassName(Exchange exchange) { + Classifications body = exchange.getMessage().getBody(Classifications.class); + String className = body.best().getClassName().split(",")[0].split(" ", 2)[1]; + exchange.getMessage().setBody(className); + } + + public void addHypernym(Exchange exchange) throws Exception { + String className = exchange.getMessage().getBody(String.class); + Dictionary dic = Dictionary.getDefaultResourceInstance(); + IndexWord word = dic.getIndexWord(POS.NOUN, className); + if (word == null) { + throw new RuntimeCamelException("Word not found: " + className); + } + PointerTargetNodeList hypernyms = PointerUtils.getDirectHypernyms(word.getSenses().get(0)); + String hypernym = hypernyms.stream() + .map(h -> h.getSynset().getWords().get(0).getLemma()) + .findFirst().orElse(className); + exchange.getMessage().setBody(List.of(className, hypernym)); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java new file mode 100644 index 00000000000..aa16d5300c0 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.format; + +import java.util.Objects; + +public enum MimeType { + JSON("application/json"), + PROTOBUF("application/protobuf"), + PROTOBUF_BINARY("protobuf/binary"), + PROTOBUF_STRUCT("protobuf/x-struct"), + AVRO("application/avro"), + AVRO_BINARY("avro/binary"), + AVRO_STRUCT("avro/x-struct"), + BINARY("application/octet-stream"), + TEXT("text/plain"), + JAVA_OBJECT("application/x-java-object"), + STRUCT("application/x-struct"); + + private static final MimeType[] VALUES = values(); + private final String type; + + MimeType(String type) { + this.type = type; + } + + public String type() { + return type; + } + + public static MimeType of(String type) { + for (MimeType mt : VALUES) { + if (Objects.equals(type, mt.type)) { + return mt; + } + } + + throw new IllegalArgumentException("Unsupported type: " + type); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java new file mode 100644 index 00000000000..aa317448fbb --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.format.schema; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.jackson.avro.transform.AvroSchemaResolver; +import org.apache.camel.component.jackson.protobuf.transform.ProtobufSchemaResolver; +import org.apache.camel.component.jackson.transform.JsonSchemaResolver; +import org.apache.camel.component.kamelet.utils.format.MimeType; +import org.apache.camel.util.ObjectHelper; + +/** + * Schema resolver processor delegates to either Avro or Json schema resolver based on the given mimetype property. When + * mimetype is of type application/x-java-object uses additional target mimetype (usually the produces mimetype) to + * determine the schema resolver (Avro or Json). Delegates to schema resolver and sets proper content class and schema + * properties on the delegate. + */ +public class DelegatingSchemaResolver implements Processor { + private String mimeType; + private String targetMimeType; + + private String schema; + private String contentClass; + + @Override + public void process(Exchange exchange) throws Exception { + if (ObjectHelper.isEmpty(mimeType)) { + return; + } + + MimeType mimeType = MimeType.of(this.mimeType); + Processor resolver; + if (mimeType.equals(MimeType.JAVA_OBJECT)) { + if (ObjectHelper.isEmpty(targetMimeType)) { + return; + } + resolver = fromMimeType(MimeType.of(targetMimeType)); + } else { + resolver = fromMimeType(mimeType); + } + + if (resolver != null) { + resolver.process(exchange); + } + } + + private Processor fromMimeType(MimeType mimeType) { + switch (mimeType) { + case PROTOBUF: + case PROTOBUF_BINARY: + case PROTOBUF_STRUCT: + ProtobufSchemaResolver protobufSchemaResolver = new ProtobufSchemaResolver(); + protobufSchemaResolver.setSchema(this.schema); + protobufSchemaResolver.setContentClass(this.contentClass); + return protobufSchemaResolver; + case AVRO: + case AVRO_BINARY: + case AVRO_STRUCT: + AvroSchemaResolver avroSchemaResolver = new AvroSchemaResolver(); + avroSchemaResolver.setSchema(this.schema); + avroSchemaResolver.setContentClass(this.contentClass); + return avroSchemaResolver; + case JSON: + case STRUCT: + JsonSchemaResolver jsonSchemaResolver = new JsonSchemaResolver(); + jsonSchemaResolver.setSchema(this.schema); + jsonSchemaResolver.setContentClass(this.contentClass); + return jsonSchemaResolver; + default: + return null; + } + } + + public String getMimeType() { + return mimeType; + } + + public void setMimeType(String mimeType) { + this.mimeType = mimeType; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getContentClass() { + return contentClass; + } + + public void setContentClass(String contentClass) { + this.contentClass = contentClass; + } + + public String getTargetMimeType() { + return targetMimeType; + } + + public void setTargetMimeType(String targetMimeType) { + this.targetMimeType = targetMimeType; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java new file mode 100644 index 00000000000..d7b3819fb1d --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.mongodb; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import com.mongodb.*; +import com.mongodb.client.*; +import com.mongodb.connection.ClusterDescription; +import org.apache.camel.util.function.Suppliers; +import org.bson.Document; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SslAwareMongoClient implements MongoClient { + private static final Logger LOG = LoggerFactory.getLogger(SslAwareMongoClient.class); + private static final TrustManager[] trustAllCerts = new TrustManager[] { + new X509TrustManager() { + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) + throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) + throws CertificateException { + } + } + }; + private final Supplier<MongoClient> wrappedMongoClient = Suppliers.memorize(new Supplier<MongoClient>() { + @Override + public MongoClient get() { + String credentials = username == null ? "" : username; + + if (!credentials.isEmpty()) { + credentials += password == null ? "@" : ":" + password + "@"; + } + + MongoClientSettings settings = MongoClientSettings.builder() + .applyToSslSettings(builder -> { + builder.enabled(ssl); + if (!sslValidationEnabled) { + builder.invalidHostNameAllowed(true); + SSLContext sc = null; + try { + sc = SSLContext.getInstance("TLSv1.2"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("Error instantiating trust all SSL context.", e); + } + try { + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + } catch (KeyManagementException e) { + throw new RuntimeException("Error instantiating trust all SSL context.", e); + } + builder.context(sc); + } + }) + .applyConnectionString(new ConnectionString(String.format("mongodb://%s%s", credentials, hosts))) + .build(); + LOG.info("Connection created using provided credentials"); + return MongoClients.create(settings); + } + }); + private String hosts = null; + private String username = null; + private String password = null; + private boolean ssl = true; + + private boolean sslValidationEnabled = true; + + public MongoClient getWrappedMongoClient() { + return wrappedMongoClient.get(); + } + + @Override + public MongoDatabase getDatabase(String s) { + return getWrappedMongoClient().getDatabase(s); + } + + @Override + public ClientSession startSession() { + return getWrappedMongoClient().startSession(); + } + + @Override + public ClientSession startSession(ClientSessionOptions clientSessionOptions) { + return getWrappedMongoClient().startSession(clientSessionOptions); + } + + @Override + public void close() { + getWrappedMongoClient().close(); + } + + @Override + public MongoIterable<String> listDatabaseNames() { + return getWrappedMongoClient().listDatabaseNames(); + } + + @Override + public MongoIterable<String> listDatabaseNames(ClientSession clientSession) { + return getWrappedMongoClient().listDatabaseNames(clientSession); + } + + @Override + public ListDatabasesIterable<Document> listDatabases() { + return getWrappedMongoClient().listDatabases(); + } + + @Override + public ListDatabasesIterable<Document> listDatabases(ClientSession clientSession) { + return getWrappedMongoClient().listDatabases(clientSession); + } + + @Override + public <TResult> ListDatabasesIterable<TResult> listDatabases(Class<TResult> aClass) { + return getWrappedMongoClient().listDatabases(aClass); + } + + @Override + public <TResult> ListDatabasesIterable<TResult> listDatabases(ClientSession clientSession, Class<TResult> aClass) { + return getWrappedMongoClient().listDatabases(clientSession, aClass); + } + + @Override + public ChangeStreamIterable<Document> watch() { + return getWrappedMongoClient().watch(); + } + + @Override + public <TResult> ChangeStreamIterable<TResult> watch(Class<TResult> aClass) { + return getWrappedMongoClient().watch(aClass); + } + + @Override + public ChangeStreamIterable<Document> watch(List<? extends Bson> list) { + return getWrappedMongoClient().watch(list); + } + + @Override + public <TResult> ChangeStreamIterable<TResult> watch(List<? extends Bson> list, Class<TResult> aClass) { + return getWrappedMongoClient().watch(list, aClass); + } + + @Override + public ChangeStreamIterable<Document> watch(ClientSession clientSession) { + return getWrappedMongoClient().watch(clientSession); + } + + @Override + public <TResult> ChangeStreamIterable<TResult> watch(ClientSession clientSession, Class<TResult> aClass) { + return getWrappedMongoClient().watch(clientSession, aClass); + } + + @Override + public ChangeStreamIterable<Document> watch(ClientSession clientSession, List<? extends Bson> list) { + return getWrappedMongoClient().watch(clientSession, list); + } + + @Override + public <TResult> ChangeStreamIterable<TResult> watch( + ClientSession clientSession, List<? extends Bson> list, + Class<TResult> aClass) { + return getWrappedMongoClient().watch(clientSession, list, aClass); + } + + @Override + public ClusterDescription getClusterDescription() { + return getWrappedMongoClient().getClusterDescription(); + } + + @Override + public CodecRegistry getCodecRegistry() { + return getWrappedMongoClient().getCodecRegistry(); + } + + @Override + public ReadPreference getReadPreference() { + return getWrappedMongoClient().getReadPreference(); + } + + @Override + public WriteConcern getWriteConcern() { + return getWrappedMongoClient().getWriteConcern(); + } + + @Override + public ReadConcern getReadConcern() { + return getWrappedMongoClient().getReadConcern(); + } + + @Override + public Long getTimeout(TimeUnit timeUnit) { + return getWrappedMongoClient().getTimeout(timeUnit); + } + + @Override + public MongoCluster withCodecRegistry(CodecRegistry codecRegistry) { + return getWrappedMongoClient().withCodecRegistry(codecRegistry); + } + + @Override + public MongoCluster withReadPreference(ReadPreference readPreference) { + return getWrappedMongoClient().withReadPreference(readPreference); + } + + @Override + public MongoCluster withWriteConcern(WriteConcern writeConcern) { + return getWrappedMongoClient().withWriteConcern(writeConcern); + } + + @Override + public MongoCluster withReadConcern(ReadConcern readConcern) { + return getWrappedMongoClient().withReadConcern(readConcern); + } + + @Override + public MongoCluster withTimeout(long l, TimeUnit timeUnit) { + return getWrappedMongoClient().withTimeout(l, timeUnit); + } + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public boolean isSsl() { + return ssl; + } + + public void setSsl(boolean ssl) { + this.ssl = ssl; + } + + public boolean isSslValidationEnabled() { + return sslValidationEnabled; + } + + public void setSslValidationEnabled(boolean sslValidationEnabled) { + this.sslValidationEnabled = sslValidationEnabled; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java new file mode 100644 index 00000000000..6e062fc4c27 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.serialization.gson; + +import java.lang.reflect.Type; +import java.time.Instant; + +import com.google.gson.*; + +public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>, JsonDeserializer<Instant> { + + @Override + public JsonElement serialize( + final Instant time, final Type typeOfSrc, + final JsonSerializationContext context) { + return new JsonPrimitive(time.getEpochSecond() * 1000); + } + + @Override + public Instant deserialize( + final JsonElement json, final Type typeOfT, + final JsonDeserializationContext context) + throws JsonParseException { + return Instant.ofEpochMilli(json.getAsLong()); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java new file mode 100644 index 00000000000..6cb8f2f1b02 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.serialization.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.TypeConverter; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.support.SimpleTypeConverter; + +/** + * Header deserializer used in Kafka source Kamelet. Automatically converts all message headers to String. Uses given + * type converter implementation set on the Camel context to convert values. If no type converter is set the + * deserializer uses its own fallback conversion implementation. + */ +public class KafkaHeaderDeserializer implements Processor { + + public boolean enabled = false; + + private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert); + + @Override + public void process(Exchange exchange) throws Exception { + if (!enabled) { + return; + } + + Map<String, Object> headers = exchange.getMessage().getHeaders(); + + TypeConverter typeConverter = exchange.getContext().getTypeConverter(); + if (typeConverter == null) { + typeConverter = defaultTypeConverter; + } + + for (Map.Entry<String, Object> header : headers.entrySet()) { + if (shouldDeserialize(header)) { + header.setValue(typeConverter.convertTo(String.class, header.getValue())); + } + } + } + + /** + * Fallback conversion strategy supporting null values, String and byte[]. Converts headers to respective String + * representation or null. + * + * @param type target type, always String in this case. + * @param exchange the exchange containing all headers to convert. + * @param value the current value to convert. + * @return String representation of given value or null if value itself is null. + */ + private static Object convert(Class<?> type, Exchange exchange, Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + if (value instanceof byte[]) { + return new String((byte[]) value, StandardCharsets.UTF_8); + } + + return value.toString(); + } + + /** + * Exclude special Kafka headers from auto deserialization. + * + * @param entry + * @return + */ + private boolean shouldDeserialize(Map.Entry<String, Object> entry) { + return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT); + } + + public void setEnabled(String enabled) { + this.enabled = Boolean.parseBoolean(enabled); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java new file mode 100644 index 00000000000..7dec28eb05b --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Processor; + +public class DropField implements Processor { + + String field; + + /** + * Default constructor. + */ + public DropField() { + } + + /** + * Constructor using fields. + * + * @param field the field name to drop. + */ + public DropField(String field, String value) { + this.field = field; + } + + public void process(Exchange ex) throws InvalidPayloadException { + JsonNode body = ex.getMessage().getBody(JsonNode.class); + if (body == null) { + throw new InvalidPayloadException(ex, JsonNode.class); + } + + if (body.getNodeType().equals(JsonNodeType.OBJECT)) { + ((ObjectNode) body).remove(field); + ex.getMessage().setBody(body); + } + } + + public void setField(String field) { + this.field = field; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java new file mode 100644 index 00000000000..eab12202931 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.util.Map; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Processor; + +public class ExtractField implements Processor { + + String field; + String headerOutputName; + boolean headerOutput; + boolean strictHeaderCheck; + boolean trimField; + + static final String EXTRACTED_FIELD_HEADER = "CamelKameletsExtractFieldName"; + + /** + * Default constructor + */ + public ExtractField() { + } + + /** + * Constructor using field member. + * + * @param field the field name to extract. + */ + public ExtractField(String field) { + this.field = field; + } + + @Override + public void process(Exchange ex) throws InvalidPayloadException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class); + + if (jsonNodeBody == null) { + throw new InvalidPayloadException(ex, JsonNode.class); + + } + + Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>() { + }); + if (!headerOutput || (strictHeaderCheck && checkHeaderExistence(ex))) { + ex.getMessage().setBody(body.get(field)); + } else { + extractToHeader(ex, body); + } + if (trimField) { + ex.setProperty("trimField", "true"); + } else { + ex.setProperty("trimField", "false"); + } + } + + private void extractToHeader(Exchange ex, Map<Object, Object> body) { + if (headerOutputName == null || headerOutputName.isEmpty() || "none".equalsIgnoreCase(headerOutputName)) { + ex.getMessage().setHeader(EXTRACTED_FIELD_HEADER, body.get(field)); + } else { + ex.getMessage().setHeader(headerOutputName, body.get(field)); + } + } + + private boolean checkHeaderExistence(Exchange exchange) { + if (headerOutputName == null || headerOutputName.isEmpty() || "none".equalsIgnoreCase(headerOutputName)) { + return exchange.getMessage().getHeaders().containsKey(EXTRACTED_FIELD_HEADER); + } else { + return exchange.getMessage().getHeaders().containsKey(headerOutputName); + } + } + + public void setField(String field) { + this.field = field; + } + + public void setHeaderOutput(boolean headerOutput) { + this.headerOutput = headerOutput; + } + + public void setHeaderOutputName(String headerOutputName) { + this.headerOutputName = headerOutputName; + } + + public void setStrictHeaderCheck(boolean strictHeaderCheck) { + this.strictHeaderCheck = strictHeaderCheck; + } + + public void setTrimField(boolean trimField) { + this.trimField = trimField; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java new file mode 100644 index 00000000000..43fee2e79f0 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; + +public class HoistField { + + public JsonNode process(@ExchangeProperty("field") String field, Exchange ex) throws InvalidPayloadException { + ObjectMapper mapper = new ObjectMapper(); + Object body = ex.getMessage().getBody(); + Map<Object, Object> updatedBody = new HashMap<>(); + updatedBody.put(field, body); + return mapper.valueToTree(updatedBody); + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java new file mode 100644 index 00000000000..0ccf0a2b6fe --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Processor; +import org.apache.camel.support.LanguageSupport; + +public class InsertField implements Processor { + + String field; + String value; + + /** + * Default constructor. + */ + public InsertField() { + } + + /** + * Constructor using fields. + * + * @param field the field name to insert. + * @param value the value of the new field. + */ + public InsertField(String field, String value) { + this.field = field; + this.value = value; + } + + public void process(Exchange ex) throws InvalidPayloadException { + JsonNode body = ex.getMessage().getBody(JsonNode.class); + + if (body == null) { + throw new InvalidPayloadException(ex, JsonNode.class); + } + + String resolvedValue; + if (LanguageSupport.hasSimpleFunction(value)) { + resolvedValue = ex.getContext().resolveLanguage("simple").createExpression(value).evaluate(ex, String.class); + } else { + resolvedValue = value; + } + + switch (body.getNodeType()) { + case ARRAY: + ((ArrayNode) body).add(resolvedValue); + break; + case OBJECT: + ((ObjectNode) body).put(field, resolvedValue); + break; + default: + ((ObjectNode) body).put(field, resolvedValue); + break; + } + + ex.getMessage().setBody(body); + } + + public void setField(String field) { + this.field = field; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java new file mode 100644 index 00000000000..ed83fbea232 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.util.ObjectHelper; + +public class MaskField { + + private static final Map<Class<?>, Function<String, ?>> MAPPING_FUNC = new HashMap<>(); + private static final Map<Class<?>, Object> BASIC_MAPPING = new HashMap<>(); + + static { + BASIC_MAPPING.put(Boolean.class, Boolean.FALSE); + BASIC_MAPPING.put(Byte.class, (byte) 0); + BASIC_MAPPING.put(Short.class, (short) 0); + BASIC_MAPPING.put(Integer.class, 0); + BASIC_MAPPING.put(Long.class, 0L); + BASIC_MAPPING.put(Float.class, 0f); + BASIC_MAPPING.put(Double.class, 0d); + BASIC_MAPPING.put(BigInteger.class, BigInteger.ZERO); + BASIC_MAPPING.put(BigDecimal.class, BigDecimal.ZERO); + BASIC_MAPPING.put(Date.class, new Date(0)); + BASIC_MAPPING.put(String.class, ""); + + MAPPING_FUNC.put(Byte.class, Byte::parseByte); + MAPPING_FUNC.put(Short.class, Short::parseShort); + MAPPING_FUNC.put(Integer.class, Integer::parseInt); + MAPPING_FUNC.put(Long.class, Long::parseLong); + MAPPING_FUNC.put(Float.class, Float::parseFloat); + MAPPING_FUNC.put(Double.class, Double::parseDouble); + MAPPING_FUNC.put(String.class, Function.identity()); + MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new); + MAPPING_FUNC.put(BigInteger.class, BigInteger::new); + } + + public JsonNode process( + @ExchangeProperty("fields") String fields, @ExchangeProperty("replacement") String replacement, Exchange ex) + throws InvalidPayloadException { + ObjectMapper mapper = new ObjectMapper(); + List<String> splittedFields = new ArrayList<>(); + JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class); + Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>() { + }); + if (ObjectHelper.isNotEmpty(fields)) { + splittedFields = Arrays.stream(fields.split(",")).collect(Collectors.toList()); + } + + Map<Object, Object> updatedBody = new HashMap<>(); + for (Map.Entry<Object, Object> entry : body.entrySet()) { + final String fieldName = (String) entry.getKey(); + final Object origFieldValue = entry.getValue(); + updatedBody.put(fieldName, + filterNames(fieldName, splittedFields) ? masked(origFieldValue, replacement) : origFieldValue); + } + if (!updatedBody.isEmpty()) { + return mapper.valueToTree(updatedBody); + } else { + return mapper.valueToTree(body); + } + } + + boolean filterNames(String fieldName, List<String> splittedFields) { + return splittedFields.contains(fieldName); + } + + private Object masked(Object value, String replacement) { + if (value == null) { + return null; + } + return replacement == null ? maskWithNullValue(value) : maskWithCustomReplacement(value, replacement); + } + + private static Object maskWithCustomReplacement(Object value, String replacement) { + Function<String, ?> replacementMapper = MAPPING_FUNC.get(value.getClass()); + if (replacementMapper == null) { + throw new IllegalArgumentException( + "Unable to mask value of type " + value.getClass() + " with custom replacement."); + } + try { + return replacementMapper.apply(replacement); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException( + "Unable to convert " + replacement + " (" + replacement.getClass() + ") to number", ex); + } + } + + private static Object maskWithNullValue(Object value) { + Object maskedValue = BASIC_MAPPING.get(value.getClass()); + if (maskedValue == null) { + if (value instanceof List) + maskedValue = Collections.emptyList(); + else if (value instanceof Map) + maskedValue = Collections.emptyMap(); + else + throw new IllegalArgumentException("Unable to mask value of type: " + value.getClass()); + } + return maskedValue; + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java new file mode 100644 index 00000000000..3b1ef2eb3dc --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + +public class MessageTimestampRouter { + + public void process( + @ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, + @ExchangeProperty("timestampKeys") String timestampKeys, + @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) + throws ParseException { + final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL); + + final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL); + + final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat); + fmt.setTimeZone(TimeZone.getTimeZone("UTC")); + + ObjectMapper mapper = new ObjectMapper(); + List<String> splittedKeys = new ArrayList<>(); + JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class); + Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>() { + }); + if (ObjectHelper.isNotEmpty(timestampKeys)) { + splittedKeys = Arrays.stream(timestampKeys.split(",")).collect(Collectors.toList()); + } + + Object rawTimestamp = null; + String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + for (String key : splittedKeys) { + if (ObjectHelper.isNotEmpty(key)) { + rawTimestamp = body.get(key); + break; + } + } + Long timestamp = null; + if (ObjectHelper.isNotEmpty(timestampKeyFormat) && ObjectHelper.isNotEmpty(rawTimestamp) + && !timestampKeyFormat.equalsIgnoreCase("timestamp")) { + final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat); + timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC")); + timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime(); + } else if (ObjectHelper.isNotEmpty(rawTimestamp)) { + timestamp = Long.parseLong(rawTimestamp.toString()); + } + if (ObjectHelper.isNotEmpty(timestamp)) { + final String formattedTimestamp = fmt.format(new Date(timestamp)); + String replace1; + String updatedTopic; + + if (ObjectHelper.isNotEmpty(topicName)) { + replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName)); + updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); + } else { + replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement("")); + updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); + } + ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic); + } + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java new file mode 100644 index 00000000000..517f1d0b787 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + +public class RegexRouter { + + public void process( + @ExchangeProperty("regex") String regex, @ExchangeProperty("replacement") String replacement, Exchange ex) { + Pattern regexPattern = Pattern.compile(regex); + String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + if (ObjectHelper.isNotEmpty(topicName)) { + final Matcher matcher = regexPattern.matcher(topicName); + if (matcher.matches()) { + final String topicUpdated = matcher.replaceFirst(replacement); + ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, topicUpdated); + } + } + String ceType = ex.getMessage().getHeader("ce-type", String.class); + if (ObjectHelper.isNotEmpty(ceType)) { + final Matcher matcher = regexPattern.matcher(ceType); + if (matcher.matches()) { + final String ceTypeUpdated = matcher.replaceFirst(replacement); + ex.getMessage().setHeader("ce-type", ceTypeUpdated); + } + } + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java new file mode 100644 index 00000000000..79048041f63 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.util.*; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.util.ObjectHelper; + +public class ReplaceField { + + public JsonNode process( + @ExchangeProperty("enabled") String enabled, @ExchangeProperty("disabled") String disabled, + @ExchangeProperty("renames") String renames, Exchange ex) + throws InvalidPayloadException { + ObjectMapper mapper = new ObjectMapper(); + List<String> enabledFields = new ArrayList<>(); + List<String> disabledFields = new ArrayList<>(); + List<String> renameFields = new ArrayList<>(); + JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class); + Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>() { + }); + if (ObjectHelper.isNotEmpty(enabled) && !enabled.equalsIgnoreCase("all")) { + enabledFields = Arrays.stream(enabled.split(",")).collect(Collectors.toList()); + } + if (ObjectHelper.isNotEmpty(disabled) && !disabled.equalsIgnoreCase("none")) { + disabledFields = Arrays.stream(disabled.split(",")).collect(Collectors.toList()); + } + if (ObjectHelper.isNotEmpty(disabled)) { + renameFields = Arrays.stream(renames.split(",")).collect(Collectors.toList()); + } + Map<Object, Object> updatedBody = new HashMap<>(); + + if (ObjectHelper.isNotEmpty(renameFields)) { + Map<String, String> renamingMap = parseNames(renameFields); + + for (Map.Entry<Object, Object> entry : body.entrySet()) { + final String fieldName = (String) entry.getKey(); + if (filterNames(fieldName, enabledFields, disabledFields)) { + final Object fieldValue = entry.getValue(); + updatedBody.put(renameOptional(fieldName, renamingMap), fieldValue); + } + } + } + if (!updatedBody.isEmpty()) { + return mapper.valueToTree(updatedBody); + } else { + return mapper.valueToTree(body); + } + } + + boolean filterNames(String fieldName, List<String> enabledFields, List<String> disabledFields) { + return !disabledFields.contains(fieldName) && (enabledFields.isEmpty() || enabledFields.contains(fieldName)); + } + + static Map<String, String> parseNames(List<String> mappings) { + final Map<String, String> m = new HashMap<>(); + for (String mapping : mappings) { + final String[] parts = mapping.split(":"); + m.put(parts[0], parts[1]); + } + return m; + } + + String renameOptional(String fieldName, Map<String, String> renames) { + final String mapping = renames.get(fieldName); + return mapping == null ? fieldName : mapping; + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java new file mode 100644 index 00000000000..9c660497f0e --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.Date; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + +public class TimestampRouter { + + public void process( + @ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, + @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) { + final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL); + + final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL); + + final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat); + fmt.setTimeZone(TimeZone.getTimeZone("UTC")); + + Long timestamp = null; + String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName); + if (rawTimestamp instanceof Long) { + timestamp = (Long) rawTimestamp; + } else if (rawTimestamp instanceof Instant) { + timestamp = ((Instant) rawTimestamp).toEpochMilli(); + } else if (ObjectHelper.isNotEmpty(rawTimestamp)) { + timestamp = Long.parseLong(rawTimestamp.toString()); + } + if (ObjectHelper.isNotEmpty(timestamp)) { + final String formattedTimestamp = fmt.format(new Date(timestamp)); + String replace1; + String updatedTopic; + + if (ObjectHelper.isNotEmpty(topicName)) { + replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName)); + updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); + } else { + replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement("")); + updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); + } + ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic); + } + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java new file mode 100644 index 00000000000..629f4ebadb9 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.transform.aws2.ddb; + +import java.time.Instant; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.camel.Message; +import org.apache.camel.component.kamelet.utils.serialization.gson.JavaTimeInstantTypeAdapter; +import org.apache.camel.spi.DataType; +import org.apache.camel.spi.DataTypeTransformer; +import org.apache.camel.spi.Transformer; + +@DataTypeTransformer(name = "aws2-ddb:application-x-struct", + description = "Transforms DynamoDB record into a Json node") +public class Ddb2JsonStructDataTypeTransformer extends Transformer { + + private final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new JavaTimeInstantTypeAdapter()) + .create(); + + @Override + public void transform(Message message, DataType fromType, DataType toType) { + if (message.getBody() instanceof String) { + return; + } + + message.setBody(gson.toJson(message.getBody())); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java new file mode 100644 index 00000000000..8b97d3b9ff1 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform.kafka; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; + +public class BatchManualCommit implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + List<?> exchanges = exchange.getMessage().getBody(List.class); + if (exchanges.size() > 0) { + final Object tmp = exchanges.get(exchanges.size() - 1); + if (tmp instanceof Exchange element) { + KafkaManualCommit manual + = element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (manual != null) { + manual.commit(); + } + } + } + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java new file mode 100644 index 00000000000..db92916f2b9 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform.kafka; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; + +public class ManualCommit implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (manual != null) { + manual.commit(); + } + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java new file mode 100644 index 00000000000..8333ab25cf6 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform.kafka; + +import java.util.*; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.util.ObjectHelper; + +public class ValueToKey { + + public void process(@ExchangeProperty("fields") String fields, Exchange ex) throws InvalidPayloadException { + List<String> splittedFields = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class); + Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new TypeReference<Map<Object, Object>>() { + }); + if (ObjectHelper.isNotEmpty(fields)) { + splittedFields = Arrays.stream(fields.split(",")).collect(Collectors.toList()); + } + Map<Object, Object> key = new HashMap<>(); + for (Map.Entry<Object, Object> entry : body.entrySet()) { + final String fieldName = (String) entry.getKey(); + if (filterNames(fieldName, splittedFields)) { + final Object fieldValue = entry.getValue(); + key.put(entry.getKey(), fieldValue); + } + } + + ex.getMessage().setHeader(KafkaConstants.KEY, key); + } + + boolean filterNames(String fieldName, List<String> splittedFields) { + return splittedFields.contains(fieldName); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java new file mode 100644 index 00000000000..9332887679b --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kamelet.utils.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kamelet.utils.serialization.kafka.KafkaHeaderDeserializer; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class KafkaHeaderDeserializerTest { + + private DefaultCamelContext camelContext; + + private final KafkaHeaderDeserializer processor = new KafkaHeaderDeserializer(); + + @BeforeEach + void setup() { + this.camelContext = new DefaultCamelContext(); + } + + @Test + void shouldDeserializeHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + exchange.getMessage().setHeader("fooNull", null); + exchange.getMessage().setHeader("number", 1L); + + processor.enabled = true; + processor.process(exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); + Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes")); + Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull")); + Assertions.assertNull(exchange.getMessage().getHeader("fooNull")); + Assertions.assertEquals("1", exchange.getMessage().getHeader("number")); + } + + @Test + void shouldNotDeserializeHeadersWhenDisabled() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + + processor.enabled = false; + processor.process(exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); + Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") instanceof byte[]); + Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)), + Arrays.toString((byte[]) exchange.getMessage().getHeader("fooBytes"))); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java new file mode 100644 index 00000000000..94608d554f1 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ExtractFieldTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private ExtractField processor; + + private final String baseJson = "{" + "\n" + + " \"name\" : \"Rajesh Koothrappali\"" + "\n" + + "}"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new ExtractField(); + } + + @Test + void shouldExtractFieldFromJsonNode() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setField("name"); + processor.process(exchange); + + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getBody(String.class)); + } + + @Test + void shouldExtractFieldToHeader() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setField("name"); + processor.setHeaderOutput(true); + processor.setHeaderOutputName("name"); + processor.process(exchange); + + Assertions.assertEquals(baseJson, exchange.getMessage().getBody(String.class)); + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getHeader("name")); + } + + @Test + void shouldExtractFieldToHeaderWithStrictHeaderCheck() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setField("name"); + processor.setHeaderOutput(true); + processor.setHeaderOutputName("name"); + processor.setStrictHeaderCheck(true); + processor.process(exchange); + + Assertions.assertEquals(baseJson, exchange.getMessage().getBody(String.class)); + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getHeader("name")); + + exchange.getMessage().setHeader("name", "somethingElse"); + + processor.process(exchange); + + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getBody(String.class)); + Assertions.assertEquals("somethingElse", exchange.getMessage().getHeader("name")); + } + + @Test + void shouldExtractFieldToDefaultHeader() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setField("name"); + processor.setHeaderOutput(true); + processor.process(exchange); + + Assertions.assertEquals(baseJson, exchange.getMessage().getBody(String.class)); + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER)); + + exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setHeaderOutputName("none"); + processor.process(exchange); + + Assertions.assertEquals(baseJson, exchange.getMessage().getBody(String.class)); + Assertions.assertEquals("Rajesh Koothrappali", exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER)); + } + + @Test + void shouldExtractFieldWithT() throws Exception { + final String baseJson = "{\"id\":\"1\",\"message\":\"Camel\\\\tRocks\"}"; + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor.setField("message"); + processor.setTrimField(true); + processor.process(exchange); + + Assertions.assertEquals("Camel\\tRocks", exchange.getMessage().getBody()); + } + +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java new file mode 100644 index 00000000000..226328d3367 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class HoistFieldTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private HoistField processor; + + private final String baseJson = "{" + "\n" + + " \"name\" : \"Rajesh Koothrappali\"" + "\n" + + "}"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new HoistField(); + } + + @Test + void shouldHoistField() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode s = processor.process("element", exchange); + Assertions.assertEquals("{" + "\"element\"" + ":" + "{" + + "\"name\":\"Rajesh Koothrappali\"" + + "}" + "}", + s.toString()); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java new file mode 100644 index 00000000000..c1da17929cc --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class InsertFieldTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private InsertField processor; + + private final String baseJson = "{" + + "\"name\":\"Rajesh Koothrappali\"" + + "}"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new InsertField(); + } + + @Test + void shouldAddFieldToPlainJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + processor = new InsertField("age", "29"); + processor.process(exchange); + + Assertions.assertEquals(exchange.getMessage().getBody(String.class), "{" + "\n" + + " \"name\" : \"Rajesh Koothrappali\"," + "\n" + + " \"age\" : \"29\"" + "\n" + + "}"); + } + + @Test + void shouldAddFieldToArrayJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + String arrayJson = "[\"batman\",\"spiderman\",\"wonderwoman\"]"; + exchange.getMessage().setBody(mapper.readTree(arrayJson)); + + processor.setValue("green lantern"); + processor.process(exchange); + + Assertions.assertEquals(exchange.getMessage().getBody(String.class), + "[ \"batman\", \"spiderman\", \"wonderwoman\", \"green lantern\" ]"); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java new file mode 100644 index 00000000000..18972e635ea --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class MaskFieldTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private MaskField processor; + + private final String baseJson = "{" + "\n" + + " \"name\" : \"Rajesh Koothrappali\"" + "\n" + + "}"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new MaskField(); + } + + @Test + void shouldMaskField() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode s = processor.process("name", "xxxx", exchange); + Assertions.assertEquals("\"xxxx\"", s.get("name").toString()); + } + + @Test + void shouldMaskFieldWithNull() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode s = processor.process("name", null, exchange); + Assertions.assertEquals("\"\"", s.get("name").toString()); + } + + @Test + void shouldMaskFieldList() throws Exception { + Map<String, List<String>> names = new HashMap<>(); + Exchange exchange = new DefaultExchange(camelContext); + List<String> els = new ArrayList<>(); + els.add("Sheldon"); + els.add("Rajesh"); + els.add("Leonard"); + names.put("names", els); + + exchange.getMessage().setBody(mapper.writeValueAsString(names)); + + JsonNode s = processor.process("names", null, exchange); + Assertions.assertEquals("[]", s.get("names").toString()); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java new file mode 100644 index 00000000000..c1837a86718 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class RegexRouterTest { + + private DefaultCamelContext camelContext; + + private RegexRouter processor; + + private final String topic = "hello"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new RegexRouter(); + } + + @Test + void shouldReplaceFieldToPlainJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic); + + processor.process(".*ll.*", "newTopic", exchange); + + Assertions.assertEquals("newTopic", exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC)); + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java new file mode 100644 index 00000000000..2e6e4ebfcc4 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet.utils.transform; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ReplaceFieldTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private ReplaceField processor; + + private final String baseJson = "{" + "\n" + + " \"name\" : \"Rajesh Koothrappali\"," + "\n" + + " \"age\" : \"29\"" + "\n" + + "}"; + + @BeforeEach + void setup() { + camelContext = new DefaultCamelContext(); + processor = new ReplaceField(); + } + + @Test + void shouldReplaceFieldToPlainJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode node = processor.process("all", "none", "name:firstName,age:years", exchange); + + Assertions.assertEquals(node.toString(), "{" + + "\"firstName\":\"Rajesh Koothrappali\"," + + "\"years\":\"29\"" + + "}"); + } + + @Test + void shouldReplaceFieldWithSpecificRename() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode node = processor.process("name,age", "none", "name:firstName", exchange); + + Assertions.assertEquals(node.toString(), "{" + + "\"firstName\":\"Rajesh Koothrappali\"," + + "\"age\":\"29\"" + + "}"); + } + + @Test + void shouldReplaceFieldWithSpecificRenameAndDisableFields() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode node = processor.process("name", "none", "name:firstName", exchange); + + Assertions.assertEquals(node.toString(), "{" + + "\"firstName\":\"Rajesh Koothrappali\"" + + "}"); + } + + @Test + void shouldReplaceFieldWithSpecificDisableFields() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode node = processor.process("all", "name,age", "name:firstName", exchange); + + Assertions.assertEquals(node.toString(), "{" + + "\"name\":\"Rajesh Koothrappali\"," + + "\"age\":\"29\"" + + "}"); + } + + @Test + void shouldReplaceFieldWithDisableAllFields() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(baseJson)); + + JsonNode node = processor.process("none", "all", "name:firstName", exchange); + + Assertions.assertEquals(node.toString(), "{" + + "\"name\":\"Rajesh Koothrappali\"," + + "\"age\":\"29\"" + + "}"); + } +}