mjsax commented on code in PR #20063:
URL: https://github.com/apache/kafka/pull/20063#discussion_r2241346902
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
Review Comment:
It does not sound right to overwrite input parameter `topicsNotReady` -- in
the original code, this was ok as it was the same variable, but now that we
copy a reference when calling `createValidatedTopicObjects` this change does
not reflect back to the caller.
If we really need to modify the content of `topicsNotReady`, we need to
apply corresponding set operations to modify the collection.
But I am not sure if the overall code structure is ideal anyway -- it might
be better to also change `validateTopics` and only return topic we want to
create? For this case, we would introduce a new variable here `final
topicToBeCreated = ...`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
+ newlyCreatedTopics.addAll(topicsNotReady);
+
+ final Set<NewTopic> validatedTopicObjects = new HashSet<>();
+
+ for (final String topicName : topicsNotReady) {
+ if (tempUnknownTopics.contains(topicName)) {
Review Comment:
If we change `validateTopics` (cf other comment above) to only return topic
we want to create, we won't need this check.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
Review Comment:
Rename `setupValidatedTopics` -> `createTopics` ?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
Review Comment:
`validatedTopicObjects` are the topics we want to create, right? Not sure
what "validated" mean? Why not rename to `topicsToBeCreated` ?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -113,7 +113,7 @@ public InternalTopicManager(final Time time,
}
}
- static class ValidationResult {
+ public static class ValidationResult {
Review Comment:
Why does this class need to become public?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
+ newlyCreatedTopics.addAll(topicsNotReady);
+
+ final Set<NewTopic> validatedTopicObjects = new HashSet<>();
+
+ for (final String topicName : topicsNotReady) {
+ if (tempUnknownTopics.contains(topicName)) {
+ // for the tempUnknownTopics, don't create topic for them
+ // we'll check again later if remaining retries > 0
+ continue;
+ }
+ final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
+ final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
- log.debug("Going to create topic {} with {} partitions and
config {}.",
- internalTopicConfig.name(),
- internalTopicConfig.numberOfPartitions(),
- topicConfig);
+ log.debug("Going to create topic {} with {} partitions and config
{}.",
+ internalTopicConfig.name(),
+ internalTopicConfig.numberOfPartitions(),
+ topicConfig);
- newTopics.add(
- new NewTopic(
+ validatedTopicObjects.add(
+ new NewTopic(
internalTopicConfig.name(),
internalTopicConfig.numberOfPartitions(),
Optional.of(replicationFactor))
.configs(topicConfig));
- }
+ }
+ return validatedTopicObjects;
+ }
- // it's possible that although some topics are not ready yet
because they
- // are temporarily not available, not that they do not exist;
in this case
- // the new topics to create may be empty and hence we can skip
here
- if (!newTopics.isEmpty()) {
- final CreateTopicsResult createTopicsResult =
adminClient.createTopics(newTopics);
+ private void setupValidatedTopics(final Set<NewTopic>
validatedTopicObjects,
Review Comment:
Not sure why we use "validated" in so many name? This method does create new
topic, so why no just call it `createTopics` ?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
+ newlyCreatedTopics.addAll(topicsNotReady);
+
+ final Set<NewTopic> validatedTopicObjects = new HashSet<>();
Review Comment:
These are the topic we want to create, right? So maybe better name it
`topicsToBeCreated` ?
And also rename the method name accordingly, maybe `computeTopicToBeCreated`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
Review Comment:
nit: fix indention -- also all parameters should be final -- cf my comment
below about `topicsNotReady`
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
+ newlyCreatedTopics.addAll(topicsNotReady);
+
+ final Set<NewTopic> validatedTopicObjects = new HashSet<>();
+
+ for (final String topicName : topicsNotReady) {
+ if (tempUnknownTopics.contains(topicName)) {
+ // for the tempUnknownTopics, don't create topic for them
+ // we'll check again later if remaining retries > 0
+ continue;
+ }
+ final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
+ final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
- log.debug("Going to create topic {} with {} partitions and
config {}.",
- internalTopicConfig.name(),
- internalTopicConfig.numberOfPartitions(),
- topicConfig);
+ log.debug("Going to create topic {} with {} partitions and config
{}.",
+ internalTopicConfig.name(),
+ internalTopicConfig.numberOfPartitions(),
+ topicConfig);
- newTopics.add(
- new NewTopic(
+ validatedTopicObjects.add(
+ new NewTopic(
internalTopicConfig.name(),
internalTopicConfig.numberOfPartitions(),
Optional.of(replicationFactor))
.configs(topicConfig));
- }
+ }
+ return validatedTopicObjects;
+ }
- // it's possible that although some topics are not ready yet
because they
- // are temporarily not available, not that they do not exist;
in this case
- // the new topics to create may be empty and hence we can skip
here
- if (!newTopics.isEmpty()) {
- final CreateTopicsResult createTopicsResult =
adminClient.createTopics(newTopics);
+ private void setupValidatedTopics(final Set<NewTopic>
validatedTopicObjects,
+ final Set<String>
topicsNotReady) {
+ final CreateTopicsResult createTopicsResult =
adminClient.createTopics(validatedTopicObjects);
- for (final Map.Entry<String, KafkaFuture<Void>>
createTopicResult : createTopicsResult.values().entrySet()) {
- final String topicName = createTopicResult.getKey();
- try {
- createTopicResult.getValue().get();
- topicsNotReady.remove(topicName);
- } catch (final InterruptedException fatalException) {
- // this should not happen; if it ever happens it
indicate a bug
- Thread.currentThread().interrupt();
- log.error(INTERRUPTED_ERROR_MESSAGE,
fatalException);
- throw new
IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
- } catch (final ExecutionException executionException) {
- final Throwable cause =
executionException.getCause();
- if (cause instanceof TopicExistsException) {
- // This topic didn't exist earlier or its
leader not known before; just retain it for next round of validation.
- log.info(
- "Could not create topic {}. Topic is
probably marked for deletion (number of partitions is unknown).\n"
- +
- "Will retry to create this
topic in {} ms (to let broker finish async delete operation first).\n"
- +
- "Error message was: {}",
topicName, retryBackOffMs,
- cause.toString());
- } else {
- log.error("Unexpected error during topic
creation for {}.\n" +
- "Error message was: {}", topicName,
cause.toString());
-
- if (cause instanceof
UnsupportedVersionException) {
- final String errorMessage =
cause.getMessage();
- if (errorMessage != null &&
- errorMessage.startsWith("Creating
topics with default partitions/replication factor are only supported in
CreateTopicRequest version 4+")) {
-
- throw new
StreamsException(String.format(
- "Could not create topic %s,
because brokers don't support configuration replication.factor=-1."
- + " You can change the
replication.factor config or upgrade your brokers to version 2.4 or newer to
avoid this error.",
- topicName)
- );
- }
- } else if (cause instanceof TimeoutException) {
- log.error("Creating topic {} timed out.\n"
+
- "Error message was: {}",
topicName, cause.toString());
- } else {
- throw new StreamsException(
- String.format("Could not create
topic %s.", topicName),
- cause
- );
- }
- }
+ for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult :
createTopicsResult.values().entrySet()) {
+ final String topicName = createTopicResult.getKey();
+ try {
+ createTopicResult.getValue().get();
+ topicsNotReady.remove(topicName);
+ } catch (final InterruptedException fatalException) {
+ // this should not happen; if it ever happens it indicate a bug
+ Thread.currentThread().interrupt();
+ log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+ throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE,
fatalException);
+ } catch (final ExecutionException executionException) {
+ final Throwable cause = executionException.getCause();
+ if (cause instanceof TopicExistsException) {
+ // This topic didn't exist earlier or its leader not known
before; just retain it for next round of validation.
+ log.info(
+ "Could not create topic {}. Topic is probably
marked for deletion (number of partitions is unknown).\n"
+ +
+ "Will retry to create this topic in {} ms
(to let broker finish async delete operation first).\n"
+ +
+ "Error message was: {}", topicName,
retryBackOffMs,
+ cause.toString());
+ } else {
+ log.error("Unexpected error during topic creation for
{}.\n" +
+ "Error message was: {}", topicName,
cause.toString());
+
+ if (cause instanceof UnsupportedVersionException) {
+ final String errorMessage = cause.getMessage();
+ if (errorMessage != null &&
+ errorMessage.startsWith("Creating topics with
default partitions/replication factor are only supported in CreateTopicRequest
version 4+")) {
+
+ throw new StreamsException(String.format(
+ "Could not create topic %s, because
brokers don't support configuration replication.factor=-1."
+ + " You can change the
replication.factor config or upgrade your brokers to version 2.4 or newer to
avoid this error.",
+ topicName)
+ );
}
+ } else if (cause instanceof TimeoutException) {
+ log.error("Creating topic {} timed out.\n" +
+ "Error message was: {}", topicName,
cause.toString());
+ } else {
+ throw new StreamsException(
+ String.format("Could not create topic %s.",
topicName),
+ cause
+ );
}
}
}
+ }
+ }
- if (!topicsNotReady.isEmpty()) {
- currentWallClockMs = time.milliseconds();
+ private void maybeThrowTimeoutExceptionDuringMakeReady(final long
deadlineMs) {
+ final long currentWallClockMs = time.milliseconds();
+ if (currentWallClockMs >= deadlineMs) {
+ final String timeoutError = String.format("Could not create topics
within %d milliseconds. " +
+ "This can happen if the Kafka cluster is temporarily not
available.", retryTimeoutMs);
+ log.error(timeoutError);
- if (currentWallClockMs >= deadlineMs) {
- final String timeoutError = String.format("Could not
create topics within %d milliseconds. " +
- "This can happen if the Kafka cluster is temporarily
not available.", retryTimeoutMs);
- log.error(timeoutError);
- throw new TimeoutException(timeoutError);
- }
- log.info(
Review Comment:
Why did we remove this log line? I mean, it does not belong into this method
any longer, but we should not drop it.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,120 +461,125 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
- final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
-
+ final Set<NewTopic> validatedTopicObjects =
createValidatedTopicObjects(topics, topicsNotReady, newlyCreatedTopics);
+ if (!validatedTopicObjects.isEmpty()) {
+ setupValidatedTopics(validatedTopicObjects, topicsNotReady);
+ }
if (!topicsNotReady.isEmpty()) {
- final Set<NewTopic> newTopics = new HashSet<>();
+ maybeThrowTimeoutExceptionDuringMakeReady(deadlineMs);
+ }
+ }
+ log.debug("Completed validating internal topics and created {}",
newlyCreatedTopics);
- for (final String topicName : topicsNotReady) {
- if (tempUnknownTopics.contains(topicName)) {
- // for the tempUnknownTopics, don't create topic for
them
- // we'll check again later if remaining retries > 0
- continue;
- }
- final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
- final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
+ return newlyCreatedTopics;
+ }
+
+ private Set<NewTopic> createValidatedTopicObjects(final Map<String,
InternalTopicConfig> topics,
+ Set<String>
topicsNotReady,
+ final Set<String>
newlyCreatedTopics) {
+ final Set<String> tempUnknownTopics = new HashSet<>();
+
+ topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
+ newlyCreatedTopics.addAll(topicsNotReady);
+
+ final Set<NewTopic> validatedTopicObjects = new HashSet<>();
+
+ for (final String topicName : topicsNotReady) {
+ if (tempUnknownTopics.contains(topicName)) {
+ // for the tempUnknownTopics, don't create topic for them
+ // we'll check again later if remaining retries > 0
+ continue;
+ }
+ final InternalTopicConfig internalTopicConfig =
Objects.requireNonNull(topics.get(topicName));
+ final Map<String, String> topicConfig =
internalTopicConfig.properties(defaultTopicConfigs,
windowChangeLogAdditionalRetention);
- log.debug("Going to create topic {} with {} partitions and
config {}.",
- internalTopicConfig.name(),
- internalTopicConfig.numberOfPartitions(),
- topicConfig);
+ log.debug("Going to create topic {} with {} partitions and config
{}.",
+ internalTopicConfig.name(),
+ internalTopicConfig.numberOfPartitions(),
+ topicConfig);
- newTopics.add(
- new NewTopic(
+ validatedTopicObjects.add(
+ new NewTopic(
internalTopicConfig.name(),
internalTopicConfig.numberOfPartitions(),
Optional.of(replicationFactor))
.configs(topicConfig));
- }
+ }
+ return validatedTopicObjects;
+ }
- // it's possible that although some topics are not ready yet
because they
- // are temporarily not available, not that they do not exist;
in this case
- // the new topics to create may be empty and hence we can skip
here
- if (!newTopics.isEmpty()) {
- final CreateTopicsResult createTopicsResult =
adminClient.createTopics(newTopics);
+ private void setupValidatedTopics(final Set<NewTopic>
validatedTopicObjects,
+ final Set<String>
topicsNotReady) {
+ final CreateTopicsResult createTopicsResult =
adminClient.createTopics(validatedTopicObjects);
- for (final Map.Entry<String, KafkaFuture<Void>>
createTopicResult : createTopicsResult.values().entrySet()) {
- final String topicName = createTopicResult.getKey();
- try {
- createTopicResult.getValue().get();
- topicsNotReady.remove(topicName);
- } catch (final InterruptedException fatalException) {
- // this should not happen; if it ever happens it
indicate a bug
- Thread.currentThread().interrupt();
- log.error(INTERRUPTED_ERROR_MESSAGE,
fatalException);
- throw new
IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
- } catch (final ExecutionException executionException) {
- final Throwable cause =
executionException.getCause();
- if (cause instanceof TopicExistsException) {
- // This topic didn't exist earlier or its
leader not known before; just retain it for next round of validation.
- log.info(
- "Could not create topic {}. Topic is
probably marked for deletion (number of partitions is unknown).\n"
- +
- "Will retry to create this
topic in {} ms (to let broker finish async delete operation first).\n"
- +
- "Error message was: {}",
topicName, retryBackOffMs,
- cause.toString());
- } else {
- log.error("Unexpected error during topic
creation for {}.\n" +
- "Error message was: {}", topicName,
cause.toString());
-
- if (cause instanceof
UnsupportedVersionException) {
- final String errorMessage =
cause.getMessage();
- if (errorMessage != null &&
- errorMessage.startsWith("Creating
topics with default partitions/replication factor are only supported in
CreateTopicRequest version 4+")) {
-
- throw new
StreamsException(String.format(
- "Could not create topic %s,
because brokers don't support configuration replication.factor=-1."
- + " You can change the
replication.factor config or upgrade your brokers to version 2.4 or newer to
avoid this error.",
- topicName)
- );
- }
- } else if (cause instanceof TimeoutException) {
- log.error("Creating topic {} timed out.\n"
+
- "Error message was: {}",
topicName, cause.toString());
- } else {
- throw new StreamsException(
- String.format("Could not create
topic %s.", topicName),
- cause
- );
- }
- }
+ for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult :
createTopicsResult.values().entrySet()) {
+ final String topicName = createTopicResult.getKey();
+ try {
+ createTopicResult.getValue().get();
+ topicsNotReady.remove(topicName);
+ } catch (final InterruptedException fatalException) {
+ // this should not happen; if it ever happens it indicate a bug
+ Thread.currentThread().interrupt();
+ log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+ throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE,
fatalException);
+ } catch (final ExecutionException executionException) {
+ final Throwable cause = executionException.getCause();
+ if (cause instanceof TopicExistsException) {
+ // This topic didn't exist earlier or its leader not known
before; just retain it for next round of validation.
+ log.info(
+ "Could not create topic {}. Topic is probably
marked for deletion (number of partitions is unknown).\n"
+ +
+ "Will retry to create this topic in {} ms
(to let broker finish async delete operation first).\n"
+ +
+ "Error message was: {}", topicName,
retryBackOffMs,
+ cause.toString());
+ } else {
+ log.error("Unexpected error during topic creation for
{}.\n" +
+ "Error message was: {}", topicName,
cause.toString());
+
+ if (cause instanceof UnsupportedVersionException) {
+ final String errorMessage = cause.getMessage();
+ if (errorMessage != null &&
+ errorMessage.startsWith("Creating topics with
default partitions/replication factor are only supported in CreateTopicRequest
version 4+")) {
+
+ throw new StreamsException(String.format(
+ "Could not create topic %s, because
brokers don't support configuration replication.factor=-1."
+ + " You can change the
replication.factor config or upgrade your brokers to version 2.4 or newer to
avoid this error.",
+ topicName)
+ );
}
+ } else if (cause instanceof TimeoutException) {
+ log.error("Creating topic {} timed out.\n" +
+ "Error message was: {}", topicName,
cause.toString());
+ } else {
+ throw new StreamsException(
+ String.format("Could not create topic %s.",
topicName),
+ cause
+ );
}
}
}
+ }
+ }
- if (!topicsNotReady.isEmpty()) {
- currentWallClockMs = time.milliseconds();
+ private void maybeThrowTimeoutExceptionDuringMakeReady(final long
deadlineMs) {
Review Comment:
`maybeThrowTimeoutExceptionDuringMakeReady` -> the suffix `duringMakeReady`
is a little weird I would drop it
Also: this method might actually block/sleep to implement a retry-backoff --
might be good if the name of the method reflects this?
Maybe `retryBackoffOrThrowTimeoutException` or similar?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]