This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit f71ca6e86b023a8c9ed417a2942898e28c0d2a24
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Thu Nov 17 12:06:45 2022 +0100

    Refine Kamelet data type solution with review comments
    
    - Cache converter in DataTypeProcessor so lookup is only done once
    - Add lazy loading of component converters via resource path lookup (DataTypeConverterResolver)
    - Only load standard converters via annotation package scan
---
 kamelets/aws-ddb-sink.kamelet.yaml                 |  2 +
 kamelets/aws-s3-source.kamelet.yaml                |  2 +
 .../utils/format/AnnotationDataTypeLoader.java     | 41 ++++++++---
 .../kamelets/utils/format/DataTypeProcessor.java   | 27 +++++--
 .../format/DefaultDataTypeConverterResolver.java   | 83 ++++++++++++++++++++++
 .../utils/format/DefaultDataTypeRegistry.java      | 64 ++++++++---------
 .../format/spi/DataTypeConverterResolver.java      | 49 +++++++++++++
 .../apache/camel/{DataType => DataTypeConverter}   |  4 +-
 .../{DataType => datatype/converter/aws2-ddb-json} |  4 +-
 .../converter/aws2-s3-binary}                      |  4 +-
 .../{DataType => datatype/converter/aws2-s3-json}  |  4 +-
 ...a => DefaultDataTypeConverterResolverTest.java} | 42 +++++++----
 .../utils/format/DefaultDataTypeRegistryTest.java  |  7 +-
 .../camel/datatype/converter/camel-jsonObject}     |  4 +-
 .../org/apache/camel/datatype/converter/foo-json}  |  4 +-
 .../resources/kamelets/aws-ddb-sink.kamelet.yaml   |  2 +
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  2 +
 17 files changed, 260 insertions(+), 85 deletions(-)

diff --git a/kamelets/aws-ddb-sink.kamelet.yaml 
b/kamelets/aws-ddb-sink.kamelet.yaml
index ba200347..a4e7a114 100644
--- a/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -123,6 +123,8 @@ spec:
           value: 'aws2-ddb'
         - key: format
           value: '{{inputFormat}}'
+        - key: registry
+          value: '{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
diff --git a/kamelets/aws-s3-source.kamelet.yaml 
b/kamelets/aws-s3-source.kamelet.yaml
index e09cf4aa..a63af7dc 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -129,6 +129,8 @@ spec:
             value: 'aws2-s3'
           - key: format
             value: '{{outputFormat}}'
+          - key: registry
+            value: '{{dataTypeRegistry}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
index 96ca50eb..9b37c377 100644
--- 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
@@ -26,12 +26,15 @@ import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.TypeConverterLoaderException;
+import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry;
 import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
-import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -41,25 +44,31 @@ import org.slf4j.LoggerFactory;
 /**
  * Data type loader scans packages for {@link DataTypeConverter} classes 
annotated with {@link DataType} annotation.
  */
-public class AnnotationDataTypeLoader implements DataTypeLoader {
+public class AnnotationDataTypeLoader implements DataTypeLoader, 
CamelContextAware {
 
-    public static final String META_INF_SERVICES = 
"META-INF/services/org/apache/camel/DataType";
+    public static final String META_INF_SERVICES = 
"META-INF/services/org/apache/camel/DataTypeConverter";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AnnotationDataTypeLoader.class);
 
-    protected final PackageScanClassResolver resolver;
-    protected final Injector injector;
+    private CamelContext camelContext;
+
+    protected PackageScanClassResolver resolver;
 
     protected Set<Class<?>> visitedClasses = new HashSet<>();
     protected Set<String> visitedURIs = new HashSet<>();
 
-    public AnnotationDataTypeLoader(Injector injector, 
PackageScanClassResolver resolver) {
-        this.injector = injector;
-        this.resolver = resolver;
-    }
-
     @Override
     public void load(DataTypeRegistry registry) {
+        ObjectHelper.notNull(camelContext, "camelContext");
+
+        if (resolver == null) {
+            if (camelContext instanceof ExtendedCamelContext) {
+                resolver = 
camelContext.adapt(ExtendedCamelContext.class).getPackageScanClassResolver();
+            } else {
+                resolver = new DefaultPackageScanClassResolver();
+            }
+        }
+
         Set<String> packages = new HashSet<>();
 
         LOG.trace("Searching for {} services", META_INF_SERVICES);
@@ -111,7 +120,7 @@ public class AnnotationDataTypeLoader implements 
DataTypeLoader {
         try {
             if (DataTypeConverter.class.isAssignableFrom(type) && 
type.isAnnotationPresent(DataType.class)) {
                 DataType dt = type.getAnnotation(DataType.class);
-                DataTypeConverter converter = (DataTypeConverter) 
injector.newInstance(type);
+                DataTypeConverter converter = (DataTypeConverter) 
camelContext.getInjector().newInstance(type);
                 registry.addDataTypeConverter(dt.scheme(), converter);
             }
         } catch (NoClassDefFoundError e) {
@@ -149,4 +158,14 @@ public class AnnotationDataTypeLoader implements 
DataTypeLoader {
             }
         }
     }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
 }
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
index 859269fe..81c58330 100644
--- 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
@@ -17,11 +17,13 @@
 
 package org.apache.camel.kamelets.utils.format;
 
-import org.apache.camel.BeanInject;
+import java.util.Optional;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 
 /**
  * Processor applies data type conversion based on given format name. Searches 
for matching data type converter
@@ -31,20 +33,31 @@ public class DataTypeProcessor implements Processor, 
CamelContextAware {
 
     private CamelContext camelContext;
 
-    @BeanInject
-    private DefaultDataTypeRegistry dataTypeRegistry;
+    private DefaultDataTypeRegistry registry;
 
     private String scheme;
     private String format;
 
+    private DataTypeConverter converter;
+
     @Override
     public void process(Exchange exchange) throws Exception {
         if (format == null || format.isEmpty()) {
             return;
         }
 
-        dataTypeRegistry.lookup(scheme, format)
-                        .ifPresent(converter -> converter.convert(exchange));
+        doConverterLookup().ifPresent(converter -> 
converter.convert(exchange));
+    }
+
+    private Optional<DataTypeConverter> doConverterLookup() {
+        if (converter != null) {
+            return Optional.of(converter);
+        }
+
+        Optional<DataTypeConverter> maybeConverter = registry.lookup(scheme, 
format);
+        maybeConverter.ifPresent(dataTypeConverter -> this.converter = 
dataTypeConverter);
+
+        return maybeConverter;
     }
 
     public void setFormat(String format) {
@@ -55,6 +68,10 @@ public class DataTypeProcessor implements Processor, 
CamelContextAware {
         this.scheme = scheme;
     }
 
+    public void setRegistry(DefaultDataTypeRegistry dataTypeRegistry) {
+        this.registry = dataTypeRegistry;
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return camelContext;
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolver.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolver.java
new file mode 100644
index 00000000..85444a28
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.util.Optional;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver;
+import org.apache.camel.spi.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The default implementation of {@link DataTypeConverterResolver} which tries to find data type converters by
+ * searching for a file named <code>scheme-name</code> in the
+ * <b>META-INF/services/org/apache/camel/datatype/converter/</b> directory on the classpath.
+ */
+public class DefaultDataTypeConverterResolver implements 
DataTypeConverterResolver {
+
+    public static final String RESOURCE_PATH = 
"META-INF/services/org/apache/camel/datatype/converter/";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDataTypeConverterResolver.class);
+
+    private FactoryFinder factoryFinder;
+
+    @Override
+    public Optional<DataTypeConverter> resolve(String scheme, String name, 
CamelContext context) {
+        String converterName = String.format("%s-%s", scheme, name);
+        Class<?> type = findConverter(converterName, context);
+        if (type == null) {
+            // not found
+            return Optional.empty();
+        }
+
+        if (getLog().isDebugEnabled()) {
+            getLog().debug("Found data type converter: {} via type: {} via: 
{}{}", converterName,
+                    type.getName(), factoryFinder.getResourcePath(), 
converterName);
+        }
+
+        // create the converter instance
+        if (DataTypeConverter.class.isAssignableFrom(type)) {
+            try {
+                return Optional.of((DataTypeConverter) 
context.getInjector().newInstance(type));
+            } catch (NoClassDefFoundError e) {
+                LOG.debug("Ignoring converter type: {} as a dependent class 
could not be found: {}",
+                        type.getCanonicalName(), e, e);
+            }
+        } else {
+            throw new IllegalArgumentException("Type is not a 
DataTypeConverter implementation. Found: " + type.getName());
+        }
+
+        return Optional.empty();
+    }
+
+    private Class<?> findConverter(String name, CamelContext context) {
+        if (factoryFinder == null) {
+            factoryFinder = 
context.adapt(ExtendedCamelContext.class).getFactoryFinder(RESOURCE_PATH);
+        }
+        return factoryFinder.findClass(name).orElse(null);
+    }
+
+    protected Logger getLog() {
+        return LOG;
+    }
+
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
index e7c6e3e8..7105fb4c 100644
--- 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
@@ -25,14 +25,11 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.engine.DefaultInjector;
-import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry;
-import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.support.service.ServiceSupport;
 
 /**
@@ -46,10 +43,10 @@ public class DefaultDataTypeRegistry extends ServiceSupport 
implements DataTypeR
 
     private CamelContext camelContext;
 
-    private PackageScanClassResolver resolver;
-
     protected final List<DataTypeLoader> dataTypeLoaders = new ArrayList<>();
 
+    private DataTypeConverterResolver dataTypeConverterResolver;
+
     private final Map<String, List<DataTypeConverter>> dataTypeConverters = 
new HashMap<>();
 
     @Override
@@ -71,30 +68,19 @@ public class DefaultDataTypeRegistry extends ServiceSupport 
implements DataTypeR
             return Optional.empty();
         }
 
-        Optional<DataTypeConverter> componentDataTypeConverter = 
getComponentDataTypeConverters(scheme).stream()
-                .filter(dtc -> name.equals(dtc.getName()))
-                .findFirst();
-
-        if (componentDataTypeConverter.isPresent()) {
-            return componentDataTypeConverter;
+        Optional<DataTypeConverter> dataTypeConverter = 
getDataTypeConverter(scheme, name);
+        if (!dataTypeConverter.isPresent()) {
+            dataTypeConverter = getDataTypeConverter("camel", name);
         }
 
-        return getDefaultDataTypeConverter(name);
+        return dataTypeConverter;
     }
 
     @Override
     protected void doInit() throws Exception {
         super.doInit();
 
-        if (resolver == null) {
-            if (camelContext != null) {
-                resolver = 
camelContext.adapt(ExtendedCamelContext.class).getPackageScanClassResolver();
-            } else {
-                resolver = new DefaultPackageScanClassResolver();
-            }
-        }
-
-        dataTypeLoaders.add(new AnnotationDataTypeLoader(new 
DefaultInjector(camelContext), resolver));
+        dataTypeLoaders.add(new AnnotationDataTypeLoader());
 
         addDataTypeConverter(new DefaultDataTypeConverter("string", 
String.class));
         addDataTypeConverter(new DefaultDataTypeConverter("binary", 
byte[].class));
@@ -113,20 +99,36 @@ public class DefaultDataTypeRegistry extends 
ServiceSupport implements DataTypeR
     }
 
     /**
-     * Retrieve default data output type from Camel context for given format 
name.
+     * Retrieve data type converter for given scheme and format name. First 
checks for matching bean in Camel registry then
+     * tries to get from local cache or perform lazy lookup.
+     * @param scheme
      * @param name
      * @return
      */
-    private Optional<DataTypeConverter> getDefaultDataTypeConverter(String 
name) {
-        Optional<DataTypeConverter> dataTypeConverter = 
getComponentDataTypeConverters("camel").stream()
+    private Optional<DataTypeConverter> getDataTypeConverter(String scheme, 
String name) {
+        if (dataTypeConverterResolver == null) {
+             dataTypeConverterResolver = 
Optional.ofNullable(camelContext.getRegistry().findSingleByType(DataTypeConverterResolver.class))
+                     .orElseGet(DefaultDataTypeConverterResolver::new);
+        }
+
+        // Looking for matching beans in Camel registry first
+        Optional<DataTypeConverter> dataTypeConverter = 
Optional.ofNullable(camelContext.getRegistry()
+                .lookupByNameAndType(String.format("%s-%s", scheme, name), 
DataTypeConverter.class));
+
+        if (!dataTypeConverter.isPresent()) {
+            // Try to retrieve converter from preloaded converters in local 
cache
+            dataTypeConverter = getComponentDataTypeConverters(scheme).stream()
                 .filter(dtc -> name.equals(dtc.getName()))
                 .findFirst();
+        }
 
-        if (dataTypeConverter.isPresent()) {
-            return dataTypeConverter;
+        if (!dataTypeConverter.isPresent()) {
+            // Try to lazy load converter via resource path lookup
+            dataTypeConverter = dataTypeConverterResolver.resolve(scheme, 
name, camelContext);
+            dataTypeConverter.ifPresent(converter -> 
getComponentDataTypeConverters(scheme).add(converter));
         }
 
-        return 
Optional.ofNullable(camelContext.getRegistry().lookupByNameAndType(name, 
DataTypeConverter.class));
+        return dataTypeConverter;
     }
 
     /**
@@ -135,11 +137,7 @@ public class DefaultDataTypeRegistry extends 
ServiceSupport implements DataTypeR
      * @return
      */
     private List<DataTypeConverter> getComponentDataTypeConverters(String 
scheme) {
-        if (!dataTypeConverters.containsKey(scheme)) {
-            dataTypeConverters.put(scheme, new ArrayList<>());
-        }
-
-        return dataTypeConverters.get(scheme);
+        return dataTypeConverters.computeIfAbsent(scheme, (s) -> new 
ArrayList<>());
     }
 
     @Override
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverterResolver.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverterResolver.java
new file mode 100644
index 00000000..17c48664
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverterResolver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.spi;
+
+import java.util.Optional;
+
+import org.apache.camel.CamelContext;
+
+/**
+ * Represents a resolver of data type converters by scheme and name, so that they can be lazy loaded using some discovery mechanism.
+ */
+@FunctionalInterface
+public interface DataTypeConverterResolver {
+
+    /**
+     * Attempts to resolve the converter for the given scheme and name.
+     *
+     * @param scheme
+     * @param name
+     * @param camelContext
+     * @return
+     */
+    Optional<DataTypeConverter> resolve(String scheme, String name, 
CamelContext camelContext);
+
+    /**
+     * Attempts to resolve default converter for the given name.
+     * @param name
+     * @param camelContext
+     * @return
+     */
+    default Optional<DataTypeConverter> resolve(String name, CamelContext 
camelContext) {
+        return resolve("camel", name, camelContext);
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
similarity index 81%
copy from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
copy to 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
index b51d3404..adf4eb63 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+org.apache.camel.kamelets.utils.format.converter.standard
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json
similarity index 81%
copy from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
copy to 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json
index b51d3404..f0194cc4 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.aws2.ddb.Ddb2JsonInputType
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-binary
similarity index 81%
copy from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
copy to 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-binary
index b51d3404..ba9c13f3 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-binary
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.aws2.s3.AWS2S3BinaryOutputType
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-json
similarity index 81%
copy from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
copy to 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-json
index b51d3404..7a7c544f 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-json
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.aws2.s3.AWS2S3JsonOutputType
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
similarity index 57%
copy from 
library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
copy to 
library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
index 2ee4113e..1972b047 100644
--- 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.kamelets.utils.format;
 
 import java.util.Optional;
 
-import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import 
org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
@@ -27,31 +27,47 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-class DefaultDataTypeRegistryTest {
+class DefaultDataTypeConverterResolverTest {
 
     private DefaultCamelContext camelContext;
 
-    private DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+    private final DefaultDataTypeConverterResolver resolver = new 
DefaultDataTypeConverterResolver();
 
     @BeforeEach
     void setup() {
         this.camelContext = new DefaultCamelContext();
-        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
     }
 
     @Test
-    public void shouldLookupDefaultDataTypeConverters() throws Exception {
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( 
"jsonObject");
+    public void shouldHandleUnresolvableDataTypeConverters() throws Exception {
+        Optional<DataTypeConverter> converter = resolver.resolve("unknown", 
camelContext);
+        Assertions.assertFalse(converter.isPresent());
+
+        converter = resolver.resolve("foo", "unknown", camelContext);
+        Assertions.assertFalse(converter.isPresent());
+    }
+
+    @Test
+    public void shouldResolveDataTypeConverters() throws Exception {
+        Optional<DataTypeConverter> converter = resolver.resolve("jsonObject", 
camelContext);
         Assertions.assertTrue(converter.isPresent());
         Assertions.assertEquals(JsonModelDataType.class, 
converter.get().getClass());
-        converter = dataTypeRegistry.lookup( "string");
-        Assertions.assertTrue(converter.isPresent());
-        Assertions.assertEquals(DefaultDataTypeConverter.class, 
converter.get().getClass());
-        Assertions.assertEquals(String.class, ((DefaultDataTypeConverter) 
converter.get()).getType());
-        converter = dataTypeRegistry.lookup( "binary");
+
+        converter = resolver.resolve("foo", "json", camelContext);
         Assertions.assertTrue(converter.isPresent());
-        Assertions.assertEquals(DefaultDataTypeConverter.class, 
converter.get().getClass());
-        Assertions.assertEquals(byte[].class, ((DefaultDataTypeConverter) 
converter.get()).getType());
+        Assertions.assertEquals(FooConverter.class, 
converter.get().getClass());
     }
 
+    public static class FooConverter implements DataTypeConverter {
+
+        @Override
+        public void convert(Exchange exchange) {
+            exchange.getMessage().setBody("Foo");
+        }
+
+        @Override
+        public String getName() {
+            return "foo";
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
index 2ee4113e..e077b369 100644
--- 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -29,14 +29,11 @@ import org.junit.jupiter.api.Test;
 
 class DefaultDataTypeRegistryTest {
 
-    private DefaultCamelContext camelContext;
-
-    private DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+    private final DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
 
     @BeforeEach
     void setup() {
-        this.camelContext = new DefaultCamelContext();
-        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, new 
DefaultCamelContext());
     }
 
     @Test
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
similarity index 81%
copy from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
copy to 
library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
index b51d3404..2f725f6a 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/foo-json
similarity index 81%
rename from 
library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
rename to 
library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/foo-json
index b51d3404..ca7eaa02 100644
--- 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
+++ 
b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/foo-json
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.DefaultDataTypeConverterResolverTest$FooConverter
\ No newline at end of file
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index ba200347..a4e7a114 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -123,6 +123,8 @@ spec:
           value: 'aws2-ddb'
         - key: format
           value: '{{inputFormat}}'
+        - key: registry
+          value: '{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index e09cf4aa..a63af7dc 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -129,6 +129,8 @@ spec:
             value: 'aws2-s3'
           - key: format
             value: '{{outputFormat}}'
+          - key: registry
+            value: '{{dataTypeRegistry}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:

Reply via email to