mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645299252



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
     @Override
     public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueSerializer);
+            if (defaultValueSerializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueDeserializer);
+            if (defaultValueDeserializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       nit: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG` instead of hard-coding `default.value.serde` in the message
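
   A minimal sketch of what I mean, using a stand-in class so it compiles without Kafka on the classpath. `StreamsConfigStandIn` and `SerdeErrorMessages` are hypothetical names; only the constant values mirror the real `StreamsConfig`.

```java
// Stand-in for org.apache.kafka.streams.StreamsConfig. Only the constant
// values mirror the real class; the class itself is hypothetical.
final class StreamsConfigStandIn {
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
}

// Hypothetical helper: builds the error text from the config constant
// instead of repeating the literal config name in every message.
final class SerdeErrorMessages {
    static String missingValueSerde() {
        return "Please specify a value serde or set one through the "
            + StreamsConfigStandIn.DEFAULT_VALUE_SERDE_CLASS_CONFIG + " config";
    }

    public static void main(final String[] args) {
        System.out.println(missingValueSerde());
    }
}
```

   The user-visible text stays the same, but a rename of the config would only need to touch one place.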

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");
+                }
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
             if (valueSerializer == null) {
+                if (context.valueSerde() == null || context.valueSerde().serializer() == null) {

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -767,10 +767,9 @@ public void shouldUseNewConfigsWhenPresent() {
     }
 
     @Test
-    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
-        assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
-        assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);

Review comment:
       Instead of removing those, should we replace them with `assertThrows`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {

Review comment:
       I don't think a `Serde` should ever wrap a `null` serializer. Can we omit the second check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -115,7 +118,10 @@ public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serialize
         @Override
         public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
+                if (defaultKeyDeserializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1384,8 +1383,12 @@ public Serde defaultKeySerde() {
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       I think it would be cleaner to check whether `DEFAULT_KEY_SERDE_CLASS_CONFIG` is set (i.e., `keySerdeConfigSetting == null`) before calling `getConfiguredInstance()`, and to just return `null` if it is not set, instead of handling an NPE.
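
   Roughly along these lines. This is only a sketch with stand-ins so it runs without Kafka: the `factory` parameter plays the role of `getConfiguredInstance()`, `IllegalStateException` plays the role of `StreamsException`, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the lookup done in StreamsConfig#defaultKeySerde().
final class DefaultSerdeLookupSketch {
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";

    // `factory` stands in for getConfiguredInstance(); IllegalStateException
    // stands in for StreamsException.
    static <S> S defaultKeySerde(final Map<String, Object> originals,
                                 final Function<Object, S> factory) {
        final Object keySerdeConfigSetting = originals.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            // Config not set: return null up front rather than catching an NPE later.
            return null;
        }
        try {
            return factory.apply(keySerdeConfigSetting);
        } catch (final Exception e) {
            throw new IllegalStateException(
                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> originals = new HashMap<>();
        final String unset = defaultKeySerde(originals, Object::toString);
        System.out.println(unset == null);
    }
}
```

   With the explicit check, the `catch` block only ever sees genuine instantiation or configuration failures.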

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -105,7 +108,10 @@ private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializ
         @Override
         public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultValueDeserializer);
+                if (defaultValueDeserializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -37,10 +38,16 @@
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
+        } else {
+            serializerToUse = specificSerializer;
+        }
+        if (serializerToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde through produced or materialized, or set one through the default." + serde + ".serde config for node " + name);

Review comment:
       as above. use `StreamsConfig#...`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -49,9 +56,15 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
-        if (serdeToUse == null) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
             serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+        } else {
+            serdeToUse = specificSerde;
+        }
+        if (serdeToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or set one through the default." + serde + ".serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -57,7 +57,10 @@ private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
         @Override
         public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultValueSerializer);
+                if (defaultValueSerializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Should this check be done before we call `deserializationExceptionHandler.handle`?
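
   To illustrate the ordering I have in mind, here is a sketch with stand-in types so it runs without Kafka. `ConfigExceptionStandIn`, `Response`, and the method name are hypothetical; the handler is modeled as a plain `Function`.

```java
import java.util.function.Function;

// Hypothetical model of the error path in RecordDeserializer.
final class DeserializationFlowSketch {
    // Stand-in for org.apache.kafka.common.config.ConfigException.
    static final class ConfigExceptionStandIn extends RuntimeException {
        ConfigExceptionStandIn(final String message) {
            super(message);
        }
    }

    // Stand-in for the handler's CONTINUE / FAIL response.
    enum Response { CONTINUE, FAIL }

    // Returns true if the record should be skipped.
    static boolean onDeserializationError(final RuntimeException deserializationException,
                                          final Function<RuntimeException, Response> handler) {
        if (deserializationException instanceof ConfigExceptionStandIn) {
            // A missing serde is a setup error, not a bad record: rethrow
            // before the user-supplied handler is consulted at all.
            throw deserializationException;
        }
        final Response response = handler.apply(deserializationException);
        if (response == Response.FAIL) {
            throw new IllegalStateException(
                "Deserialization exception handler is set to fail", deserializationException);
        }
        return true;
    }

    public static void main(final String[] args) {
        final boolean skipped =
            onDeserializationError(new RuntimeException("bad bytes"), e -> Response.CONTINUE);
        System.out.println(skipped);
    }
}
```

   With this ordering, a handler that returns `CONTINUE` cannot silently skip records because of a configuration problem.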

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -55,7 +55,10 @@ public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPs
         @Override
         public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
+                if (defaultKeySerializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       It seems this test should verify the error for misconfigured (but existing) serdes.
   
   Thus, it seems we should set byte-array serdes on the config and not modify the test in any other way?
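
   Something like the following for the config part. It is a sketch using plain `java.util.Properties` and string literals so it runs without Kafka; the helper name is hypothetical, and in the actual test the `StreamsConfig` constants and `Serdes.ByteArraySerde.class` would be used instead of the literals.

```java
import java.util.Properties;

// Hypothetical helper: adds byte-array default serdes to an existing test config.
final class ByteArraySerdeConfigSketch {
    // Binary name of the nested class Serdes.ByteArraySerde.
    static final String BYTE_ARRAY_SERDE =
        "org.apache.kafka.common.serialization.Serdes$ByteArraySerde";

    static Properties withByteArrayDefaults(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        // Literal keys mirror StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and
        // StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG.
        props.put("default.key.serde", BYTE_ARRAY_SERDE);
        props.put("default.value.serde", BYTE_ARRAY_SERDE);
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(withByteArrayDefaults(new Properties()));
    }
}
```

   The rest of the test would stay as it was, so it still exercises serdes that exist but do not match the DSL types.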

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1403,8 +1406,12 @@ public Serde defaultValueSerde() {
             serde.configure(originals(), false);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
     }
 
     KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
+        if (keyDeserializer == null) {

Review comment:
       In which case can `keyDeserializer` be `null` here?
   
   Would it not be better to ensure that it is never `null` to begin with, e.g., by doing the check in the `init()` method?
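
   As a sketch of the idea, with a `Function<byte[], K>` standing in for `Deserializer<K>` and `IllegalStateException` standing in for `ConfigException` so that it runs without Kafka. The class name and the simplified `init` signature are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, simplified model of a source node.
final class SourceNodeSketch<K> {
    private Function<byte[], K> keyDeserializer; // stand-in for Deserializer<K>

    SourceNodeSketch(final Function<byte[], K> keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    // Resolve and validate once, at initialization time.
    void init(final Function<byte[], K> contextKeyDeserializer) {
        if (keyDeserializer == null) {
            keyDeserializer = contextKeyDeserializer;
        }
        if (keyDeserializer == null) {
            // Stand-in for ConfigException.
            throw new IllegalStateException(
                "Please specify a key serde or set one through the default.key.serde config");
        }
    }

    // No null check needed on the per-record path: init() guarantees non-null.
    K deserializeKey(final byte[] data) {
        return keyDeserializer.apply(data);
    }

    public static void main(final String[] args) {
        final SourceNodeSketch<String> node = new SourceNodeSketch<>(null);
        node.init(bytes -> new String(bytes, StandardCharsets.UTF_8));
        System.out.println(node.deserializeKey("k".getBytes(StandardCharsets.UTF_8)));
    }
}
```

   This way the failure surfaces when the topology is initialized rather than when the first record arrives.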




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

