mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645299252
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
@Override
public void setIfUnset(final Serializer<Void> defaultKeySerializer, final
Serializer<T> defaultValueSerializer) {
if (inner == null) {
- inner = Objects.requireNonNull(defaultValueSerializer);
+ if (defaultValueSerializer == null) {
+ throw new ConfigException("Please specify a value serde or set
one through the default.value.serde config");
Review comment:
as above
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
@Override
public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer,
final Deserializer<T> defaultValueDeserializer) {
if (inner == null) {
- inner = Objects.requireNonNull(defaultValueDeserializer);
+ if (defaultValueDeserializer == null) {
+ throw new ConfigException("Please specify a value serde or set
one through the default.value.serde config");
Review comment:
nit: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
valueSerdeTopic = valueSerdeTopicSupplier.get();
// get default key serde if it wasn't supplied directly at
construction
if (foreignKeySerializer == null) {
+ if (context.keySerde() == null ||
context.keySerde().serializer() == null) {
+ throw new ConfigException("Please specify a key serde or
set one through the default.key.serde config");
+ }
foreignKeySerializer = (Serializer<KO>)
context.keySerde().serializer();
}
if (valueSerializer == null) {
+ if (context.valueSerde() == null ||
context.valueSerde().serializer() == null) {
Review comment:
as above
##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -767,10 +767,9 @@ public void shouldUseNewConfigsWhenPresent() {
}
@Test
- public void shouldUseCorrectDefaultsWhenNoneSpecified() {
- final StreamsConfig config = new StreamsConfig(getStreamsConfig());
- assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
- assertTrue(config.defaultValueSerde() instanceof
Serdes.ByteArraySerde);
Review comment:
Instead of removing those, should we replace them with `assertThrows`?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
valueSerdeTopic = valueSerdeTopicSupplier.get();
// get default key serde if it wasn't supplied directly at
construction
if (foreignKeySerializer == null) {
+ if (context.keySerde() == null ||
context.keySerde().serializer() == null) {
Review comment:
I don't think that a `Serde` should ever wrap `null`. Can we omit the
second check?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -115,7 +118,10 @@ public void setIfUnset(final Serializer<K>
defaultKeySerializer, final Serialize
@Override
public void setIfUnset(final Deserializer<K> defaultKeyDeserializer,
final Deserializer<Void> defaultValueDeserializer) {
if (primaryKeyDeserializer == null) {
- primaryKeyDeserializer =
Objects.requireNonNull(defaultKeyDeserializer);
+ if (defaultKeyDeserializer == null) {
+ throw new ConfigException("Please specify a key serde or
set one through the default.key.serde config");
Review comment:
as above
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1384,8 +1383,12 @@ public Serde defaultKeySerde() {
serde.configure(originals(), true);
return serde;
} catch (final Exception e) {
- throw new StreamsException(
- String.format("Failed to configure key serde %s",
keySerdeConfigSetting), e);
+ if (e instanceof NullPointerException) {
Review comment:
I think it would be cleaner to check whether `DEFAULT_KEY_SERDE_CLASS_CONFIG`
is unset (i.e., `keySerdeConfigSetting == null`) before calling
`getConfiguredInstance()`, and just return `null` in that case, instead of
handling an NPE.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -105,7 +108,10 @@ private SubscriptionResponseWrapperDeserializer(final
Deserializer<V> deserializ
@Override
public void setIfUnset(final Deserializer<Void>
defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
if (deserializer == null) {
- deserializer =
Objects.requireNonNull(defaultValueDeserializer);
+ if (defaultValueDeserializer == null) {
+ throw new ConfigException("Please specify a value serde or
set one through the default.value.serde config");
Review comment:
as above
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -37,10 +38,16 @@
return deserializerToUse;
}
@SuppressWarnings("unchecked")
- private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer, final boolean isKey) {
- Serializer<T> serializerToUse = specificSerializer;
- if (serializerToUse == null) {
+ private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer, final boolean isKey, final String name) {
+ final Serializer<T> serializerToUse;
+ if (specificSerializer == null) {
serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer :
contextValueSerializer);
+ } else {
+ serializerToUse = specificSerializer;
+ }
+ if (serializerToUse == null) {
+ final String serde = isKey ? "key" : "value";
+ throw new ConfigException("Please specify a " + serde + " serde
through produced or materialized, or set one through the default." + serde +
".serde config for node " + name);
Review comment:
as above. use `StreamsConfig#...`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -49,9 +56,15 @@
@SuppressWarnings({"rawtypes", "unchecked"})
private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean
isKey) {
- Serde<T> serdeToUse = specificSerde;
- if (serdeToUse == null) {
+ final Serde<T> serdeToUse;
+ if (specificSerde == null) {
serdeToUse = (Serde<T>) (isKey ? contextKeySerde :
contextValueSerde);
+ } else {
+ serdeToUse = specificSerde;
+ }
+ if (serdeToUse == null) {
+ final String serde = isKey ? "key" : "value";
+ throw new ConfigException("Please specify a " + serde + " serde or
set one through the default." + serde + ".serde config");
Review comment:
as above
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -57,7 +57,10 @@ private SubscriptionResponseWrapperSerializer(final
Serializer<V> serializer) {
@Override
public void setIfUnset(final Serializer<Void> defaultKeySerializer,
final Serializer<V> defaultValueSerializer) {
if (serializer == null) {
- serializer = Objects.requireNonNull(defaultValueSerializer);
+ if (defaultValueSerializer == null) {
+ throw new ConfigException("Please specify a value serde or
set one through the default.value.serde config");
Review comment:
as above
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
throw new StreamsException("Fatal user code error in
deserialization error callback", fatalUserException);
}
+ if (deserializationException instanceof ConfigException) {
Review comment:
Should this check be done before we call
`deserializationExceptionHandler.handle`?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -55,7 +55,10 @@ public SubscriptionWrapperSerde(final Supplier<String>
primaryKeySerializationPs
@Override
public void setIfUnset(final Serializer<K> defaultKeySerializer, final
Serializer<Void> defaultValueSerializer) {
if (primaryKeySerializer == null) {
- primaryKeySerializer =
Objects.requireNonNull(defaultKeySerializer);
+ if (defaultKeySerializer == null) {
+ throw new ConfigException("Please specify a key serde or
set one through the default.key.serde config");
Review comment:
as above
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String
builtInMetricsVersion) {
}
@Test
- public void testTopologyLevelClassCastException() {
- // Serdes configuration is missing (default will be used which don't
match the DSL below), which will trigger the new exception
+ public void testTopologyLevelConfigException() {
Review comment:
It seems this test should verify the error for misconfigured (but
existing) serdes.
Thus, it seems we should set byte-array serdes on the config but not modify
the test in any other way?
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1403,8 +1406,12 @@ public Serde defaultValueSerde() {
serde.configure(originals(), false);
return serde;
} catch (final Exception e) {
- throw new StreamsException(
- String.format("Failed to configure value serde %s",
valueSerdeConfigSetting), e);
+ if (e instanceof NullPointerException) {
Review comment:
as above.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
}
KIn deserializeKey(final String topic, final Headers headers, final byte[]
data) {
+ if (keyDeserializer == null) {
Review comment:
In which case can `keyDeserializer` be `null` here?
Wouldn't it be better to ensure that it's never `null` to begin with, e.g., by
doing the check in the `init()` method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]