This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 162b3a1bcf9 MINOR: Fix various typos and formatting issues across
multiple modules (#22177)
162b3a1bcf9 is described below
commit 162b3a1bcf9c11a90fdf9f85df760c77de478753
Author: Amir Sarabadani <[email protected]>
AuthorDate: Sat May 2 05:29:09 2026 +0200
MINOR: Fix various typos and formatting issues across multiple modules
(#22177)
This is done via a script I wrote to report typos based on statistical
analysis and after I checked them manually, I applied the correct ones.
No AI was used.
I know it's not a very useful PR but I hope it's fine. Thank you!
Reviewers: Murali Basani <[email protected]>, Pratyaksh Sharma
<[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../org/apache/kafka/clients/FetchSessionHandlerTest.java | 6 +++---
.../kafka/clients/consumer/internals/CompletedFetchTest.java | 6 +++---
.../src/test/java/kafka/server/share/SharePartitionTest.java | 6 +++---
.../kafka/coordinator/group/GroupMetadataManagerTest.java | 2 +-
.../group/modern/TargetAssignmentBuilderTest.java | 8 ++++----
.../java/org/apache/kafka/raft/internals/KafkaRaftLog.java | 8 ++++----
.../kafka/streams/integration/IQv2StoreIntegrationTest.java | 10 +++++-----
.../integration/KafkaStreamsCloseOptionsIntegrationTest.java | 2 +-
.../integration/SuppressionDurabilityIntegrationTest.java | 4 ++--
.../main/java/org/apache/kafka/streams/kstream/KStream.java | 4 ++--
.../kafka/streams/processor/internals/StreamThread.java | 2 +-
.../processor/internals/StreamsPartitionAssignor.java | 6 +++---
.../kafka/streams/processor/internals/assignment/Graph.java | 2 +-
.../state/internals/RocksDBTimeOrderedKeyValueBuffer.java | 12 ++++++------
.../state/internals/RocksDBTimeOrderedSessionStore.java | 4 ++--
.../kafka/streams/kstream/internals/KStreamImplTest.java | 4 ++--
.../processor/internals/ProcessorStateManagerTest.java | 2 +-
.../kafka/streams/processor/internals/StreamThreadTest.java | 6 +++---
.../kafka/streams/processor/internals/TaskManagerTest.java | 2 +-
.../test/java/org/apache/kafka/tools/TopicCommandTest.java | 2 +-
.../java/org/apache/kafka/trogdor/common/StringExpander.java | 2 +-
.../org/apache/kafka/trogdor/coordinator/NodeManager.java | 2 +-
.../java/org/apache/kafka/trogdor/rest/JsonRestServer.java | 2 +-
.../java/org/apache/kafka/trogdor/task/TaskController.java | 2 +-
24 files changed, 53 insertions(+), 53 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
index 7e5d7055f28..85a076d1897 100644
---
a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
@@ -124,10 +124,10 @@ public class FetchSessionHandlerTest {
if (!actualIter.hasNext()) {
fail("Element " + i + " not found.");
}
- Map.Entry<TopicPartition, FetchRequest.PartitionData> actuaLEntry
= actualIter.next();
- assertEquals(expectedEntry.getKey(), actuaLEntry.getKey(),
"Element " + i +
+ Map.Entry<TopicPartition, FetchRequest.PartitionData> actualEntry
= actualIter.next();
+ assertEquals(expectedEntry.getKey(), actualEntry.getKey(),
"Element " + i +
" had a different TopicPartition than expected.");
- assertEquals(expectedEntry.getValue(), actuaLEntry.getValue(),
"Element " + i +
+ assertEquals(expectedEntry.getValue(), actualEntry.getValue(),
"Element " + i +
" had different PartitionData than expected.");
i++;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index 6db5def3032..452d8697467 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -92,7 +92,7 @@ public class CompletedFetchTest {
@Test
public void testAbortedTransactionRecordsRemoved() {
int numRecords = 10;
- Records rawRecords = newTranscactionalRecords(ControlRecordType.ABORT,
numRecords);
+ Records rawRecords = newTransactionalRecords(ControlRecordType.ABORT,
numRecords);
FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
.setRecords(rawRecords)
@@ -114,7 +114,7 @@ public class CompletedFetchTest {
@Test
public void testCommittedTransactionRecordsIncluded() {
int numRecords = 10;
- Records rawRecords =
newTranscactionalRecords(ControlRecordType.COMMIT, numRecords);
+ Records rawRecords = newTransactionalRecords(ControlRecordType.COMMIT,
numRecords);
FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
.setRecords(rawRecords);
CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
@@ -258,7 +258,7 @@ public class CompletedFetchTest {
}
}
- private Records newTranscactionalRecords(ControlRecordType
controlRecordType, int numRecords) {
+ private Records newTransactionalRecords(ControlRecordType
controlRecordType, int numRecords) {
Time time = new MockTime();
ByteBuffer buffer = ByteBuffer.allocate(1024);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 2cbb103349a..f7b91bcc93e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5278,7 +5278,7 @@ public class SharePartitionTest {
// Ack subset of records by "member-2".
sharePartition.acknowledge("member-2",
List.of(new ShareAcknowledgementBatch(5, 5,
List.of(AcknowledgeType.ACCEPT.id))));
- // After the acknowledgements, the startOffset will be upadated to 6,
since offset 5 is Terminal. Hence
+ // After the acknowledgements, the startOffset will be updated to 6,
since offset 5 is Terminal. Hence
// deliveryCompleteCount will remain 9.
assertEquals(9, sharePartition.deliveryCompleteCount());
@@ -5465,7 +5465,7 @@ public class SharePartitionTest {
)));
// After acknowledgements, since offsets 10 -> 12 are at the start of
the caches state and are in Terminal state,
- // the start offset will be updated to 13. From the remaining offstes
in flight, only records (17 -> 19) are in Terminal state.
+ // the start offset will be updated to 13. From the remaining offsets
in flight, only records (17 -> 19) are in Terminal state.
assertEquals(3, sharePartition.deliveryCompleteCount());
// Send next batch from offset 13, only 2 records should be acquired.
@@ -9709,7 +9709,7 @@ public class SharePartitionTest {
TimerTask timerTask2 =
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask();
// Acknowledge 1 offset in first batch as Accept to create offset
tracking, accept complete
- // sencond batch. And mark offset 0 as release so cached state do not
move ahead.
+ // second batch. And mark offset 0 as release so cached state do not
move ahead.
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(0, 0,
List.of(AcknowledgeType.RELEASE.id)),
new ShareAcknowledgementBatch(1, 1,
List.of(AcknowledgeType.ACCEPT.id)),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 28d67d9044e..a9a39133521 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -5805,7 +5805,7 @@ public class GroupMetadataManagerTest {
assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
assertTrue(group.isInState(PREPARING_REBALANCE));
- // Advance clock by group initial rebalance delay to complete first
inital delayed join.
+ // Advance clock by group initial rebalance delay to complete first
initial delayed join.
// This will extend the initial rebalance as new members have joined.
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(50));
// Advance clock by group initial rebalance delay to complete second
inital delayed join.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 902864652dc..c575b621e4d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -217,8 +217,8 @@ public class TargetAssignmentBuilderTest {
}
public TargetAssignmentBuilder.TargetAssignmentResult build() {
- CoordinatorMetadataImage cooridnatorMetadataImage = new
KRaftCoordinatorMetadataImage(metadataImageBuilder.build());
- TopicIds.TopicResolver topicResolver = new
TopicIds.CachedTopicResolver(cooridnatorMetadataImage);
+ CoordinatorMetadataImage coordinatorMetadataImage = new
KRaftCoordinatorMetadataImage(metadataImageBuilder.build());
+ TopicIds.TopicResolver topicResolver = new
TopicIds.CachedTopicResolver(coordinatorMetadataImage);
// Prepare expected member specs.
Map<String, MemberSubscriptionAndAssignmentImpl>
memberSubscriptions = new HashMap<>();
@@ -256,7 +256,7 @@ public class TargetAssignmentBuilderTest {
});
// Prepare the expected subscription topic metadata.
- SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(cooridnatorMetadataImage);
+ SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(coordinatorMetadataImage);
SubscriptionType subscriptionType = HOMOGENEOUS;
// Prepare the member assignments per topic partition.
@@ -284,7 +284,7 @@ public class TargetAssignmentBuilderTest {
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
- .withMetadataImage(cooridnatorMetadataImage)
+ .withMetadataImage(coordinatorMetadataImage)
.withResolvedRegularExpressions(resolvedRegularExpressions);
// Add the updated members or delete the deleted members.
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
index c6c40a75417..4334d0a6d82 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
@@ -563,10 +563,10 @@ public class KafkaRaftLog implements RaftLog {
}
boolean didClean = false;
- List<OffsetAndEpoch> epoches = new ArrayList<>(snapshots.keySet());
- for (int i = 0; i < epoches.size() - 1; i++) {
- OffsetAndEpoch epoch = epoches.get(i);
- OffsetAndEpoch nextEpoch = epoches.get(i + 1);
+ List<OffsetAndEpoch> epochs = new ArrayList<>(snapshots.keySet());
+ for (int i = 0; i < epochs.size() - 1; i++) {
+ OffsetAndEpoch epoch = epochs.get(i);
+ OffsetAndEpoch nextEpoch = epochs.get(i + 1);
Optional<SnapshotDeletionReason> reason = predicate.apply(epoch);
if (reason.isPresent()) {
boolean deleted = deleteBeforeSnapshot(nextEpoch,
reason.get());
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index be94882885a..42bd76d565d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -1823,7 +1823,7 @@ public class IQv2StoreIntegrationTest {
final Integer key,
final Instant timeFrom,
final Instant timeTo,
- final Function<V, Integer> valueExtactor,
+ final Function<V, Integer> valueExtractor,
final Set<Integer> expectedValues) {
final WindowKeyQuery<Integer, V> query =
WindowKeyQuery.withKeyAndWindowStartRange(
@@ -1864,7 +1864,7 @@ public class IQv2StoreIntegrationTest {
try (final WindowStoreIterator<V> iterator =
queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+
actualValues.add(valueExtractor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(),
is(empty()));
@@ -1877,7 +1877,7 @@ public class IQv2StoreIntegrationTest {
public <V> void shouldHandleWindowRangeQuery(
final Instant timeFrom,
final Instant timeTo,
- final Function<V, Integer> valueExtactor,
+ final Function<V, Integer> valueExtractor,
final Set<Integer> expectedValues) {
final WindowRangeQuery<Integer, V> query =
WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
@@ -1914,7 +1914,7 @@ public class IQv2StoreIntegrationTest {
try (final KeyValueIterator<Windowed<Integer>, V> iterator =
queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+
actualValues.add(valueExtractor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(),
is(empty()));
@@ -2057,4 +2057,4 @@ public class IQv2StoreIntegrationTest {
StreamsTestUtils.maybeSetDslStoreFormatHeaders(config, withHeaders);
return config;
}
-}
\ No newline at end of file
+}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index 5f690a794f2..c3810422458 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -115,7 +115,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// In this test, we set the SESSION_TIMEOUT_MS_CONFIG high in order to
show that the call to
- // `close(CloseOptions)` can remove the application from the Consumder
Groups successfully.
+ // `close(CloseOptions)` can remove the application from the Consumer
Groups successfully.
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.MAX_VALUE);
streamsConfig.putAll(commonClientConfig);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index d6a1646dec9..21acb226fc3 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -158,7 +158,7 @@ public class SuppressionDurabilityIntegrationTest {
try {
// start by putting some stuff in the buffer
// note, we send all input records to partition 0
- // to make sure that supppress doesn't erroneously send records to
other partitions.
+ // to make sure that suppress doesn't erroneously send records to
other partitions.
produceSynchronouslyToPartitionZero(
input,
asList(
@@ -316,4 +316,4 @@ public class SuppressionDurabilityIntegrationTest {
));
IntegrationTestUtils.produceSynchronously(producerConfig, false,
topic, Optional.of(0), toProduce);
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index acbe1a6b7e2..5ccc84a1275 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -757,7 +757,7 @@ public interface KStream<K, V> {
* <p>You can retrieve all generated internal topic names via {@link
Topology#describe()}.
* To explicitly set key/value serdes, to customize the names of the
repartition and changelog topic, or to
* customize the used state store, use {@link #join(KStream, ValueJoiner,
JoinWindows, StreamJoined)}.
- * For more control over the repartitioning, use {@link
#repartition(Repartitioned)} on eiter input before {@code join()}.
+ * For more control over the repartitioning, use {@link
#repartition(Repartitioned)} on either input before {@code join()}.
*
* @param rightStream
* the {@code KStream} to be joined with this stream
@@ -1592,4 +1592,4 @@ public interface KStream<K, V> {
final Named named,
final String... stateStoreNames
);
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b5a07f7b4c2..f9a9ba0d5e2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1615,7 +1615,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
// TODO
// This may be null if the task we are currently processing was
part of a named topology that was just removed.
- // After named topologies are removed, we can update
`topologyMetadata.offsetResetStrateg()` so it
+ // After named topologies are removed, we can update
`topologyMetadata.offsetResetStrategy()` so it
// will not return null any longer, and we can remove this check
if (offsetResetStrategy != null) {
if (offsetResetStrategy.isPresent()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index c783b64d799..745fec081d8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1129,7 +1129,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final List<TopicPartition> activePartitionsList = new
ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
- final Set<TaskId> activeTasksRemovedPendingRevokation =
populateActiveTaskAndPartitionsLists(
+ final Set<TaskId> activeTasksRemovedPendingRevocation =
populateActiveTaskAndPartitionsLists(
activePartitionsList,
assignedActiveList,
consumer,
@@ -1142,7 +1142,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
buildStandbyTaskMap(
consumer,
standbyTaskAssignments.get(consumer),
- activeTasksRemovedPendingRevokation,
+ activeTasksRemovedPendingRevocation,
statefulTasks,
partitionsForTask,
clientMetadata.state
@@ -1158,7 +1158,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
AssignorError.NONE.code()
);
- if (!activeTasksRemovedPendingRevokation.isEmpty()) {
+ if (!activeTasksRemovedPendingRevocation.isEmpty()) {
// TODO: once KAFKA-10078 is resolved we can leave it to the
client to trigger this rebalance
log.info("Requesting followup rebalance be scheduled
immediately by {} due to tasks changing ownership.", consumer);
info.setNextRebalanceTime(0L);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
index 7f066bdccb7..7756e2a2fde 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
@@ -437,7 +437,7 @@ public class Graph<V extends Comparable<V>> {
}
private void cancelNegativeCycle(final V nodeInCycle, final Map<V, V>
parentNodes, final Map<V, Edge> parentEdges) {
- // Start from parentNode since nodeInCyle is used as exit condition in
below loops
+ // Start from parentNode since nodeInCycle is used as exit condition
in below loops
final V parentNode = parentNodes.get(nodeInCycle);
Edge parentEdge = parentEdges.get(nodeInCycle);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 956d72f5ffa..d226e1fc24f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -59,7 +59,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
private final boolean loggingEnabled;
private int partition;
private String changelogTopic;
- private InternalProcessorContext<?, ?> iternalContext;
+ private InternalProcessorContext<?, ?> internalContext;
private boolean minValid;
public static class Builder<K, V> implements
StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
@@ -188,7 +188,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
@Override
public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
store.init(stateStoreContext, root);
- iternalContext =
ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
+ internalContext =
ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
partition = stateStoreContext.taskId().partition();
if (loggingEnabled) {
changelogTopic =
ProcessorContextUtils.changelogFor(stateStoreContext, name(), Boolean.TRUE);
@@ -231,7 +231,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
final BufferValue bufferValue =
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
final K key = keySerde.deserializer().deserialize(topic,
- iternalContext.headers(),
+ internalContext.headers(),
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
if (bufferValue.context().timestamp() < minTimestamp &&
minValid) {
@@ -243,7 +243,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
minTimestamp = bufferValue.context().timestamp();
minValid = true;
- final V value =
valueSerde.deserializer().deserialize(topic, iternalContext.headers(),
bufferValue.newValue());
+ final V value =
valueSerde.deserializer().deserialize(topic, internalContext.headers(),
bufferValue.newValue());
callback.accept(new Eviction<>(key, value,
bufferValue.context()));
@@ -332,7 +332,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
buffer.putLong(bufferKey.time());
final byte[] array = buffer.array();
- ((RecordCollector.Supplier) iternalContext).recordCollector().send(
+ ((RecordCollector.Supplier) internalContext).recordCollector().send(
changelogTopic,
key,
array,
@@ -346,7 +346,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
implements TimeOrderedKeyVal
}
private void logTombstone(final Bytes key) {
- ((RecordCollector.Supplier) iternalContext).recordCollector().send(
+ ((RecordCollector.Supplier) internalContext).recordCollector().send(
changelogTopic,
key,
null,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index 9e2caca3699..ab375907f5d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -124,11 +124,11 @@ public class RocksDBTimeOrderedSessionStore
@Override
public byte[] fetchSession(final Bytes key,
final long sessionStartTime,
- final long sessiontEndTime) {
+ final long sessionEndTime) {
return wrapped().fetchSession(
key,
sessionStartTime,
- sessiontEndTime
+ sessionEndTime
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1794a98bd31..71bcef447f2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1341,10 +1341,10 @@ public class KStreamImplTest {
public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<String, String> globalKTable =
builder.globalTable("globalTopic");
- final KeyValueMapper<String, String, String> kvMappper = (k, v) -> k +
v;
+ final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k +
v;
final ValueJoiner<String, String, String> valueJoiner = (v1, v2) -> v1
+ v2;
builder.<String, String>stream("topic").selectKey((k, v) -> v)
- .join(globalKTable, kvMappper, valueJoiner)
+ .join(globalKTable, kvMapper, valueJoiner)
.groupByKey()
.count();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 2085a4cace2..63e7e7c2e23 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -635,7 +635,7 @@ public class ProcessorStateManagerTest {
assertThat(checkpointedOffsets, is(singletonMap(new
TopicPartition(persistentStoreTopicName, 1), -4L)));
try {
- // Reopen to verify null commited offset
+ // Reopen to verify null committed offset
stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
assertNull(stateMgr.storeMetadata(persistentStorePartition).offset());
} finally {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index bdd3340af1c..4a393060d62 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -946,13 +946,13 @@ public class StreamThreadTest {
runOnce(false);
assertThat(thread.currentNumIterations(), equalTo(2));
- // system time based punctutation without processing any record,
iteration stays as 2
+ // system time based punctuation without processing any record,
iteration stays as 2
mockTime.sleep(11L);
runOnce(false);
assertThat(thread.currentNumIterations(), equalTo(2));
- // system time based punctutation after processing a record, half
iteration to 1
+ // system time based punctuation after processing a record, half
iteration to 1
mockTime.sleep(11L);
addRecord(mockConsumer, ++offset, 5L);
@@ -966,7 +966,7 @@ public class StreamThreadTest {
assertThat(thread.currentNumIterations(), equalTo(3));
- // stream time based punctutation halves to 1
+ // stream time based punctuation halves to 1
addRecord(mockConsumer, ++offset, 11L);
runOnce(false);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 183dcc35b64..baee5286cae 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -254,7 +254,7 @@ public class TaskManagerTest {
}
@Test
- public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() {
+ public void shouldLockCommittableTasksOnCorruptionWithProcessingThreads() {
final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 06efc3c9186..26645a75f7d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -1220,7 +1220,7 @@ public class TopicCommandTest {
ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient,
brokerIds, Set.of(tp));
TestUtils.waitForCondition(
() ->
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
- CLUSTER_WAIT_MS, String.format("reassignmet not finished
after %s ms", CLUSTER_WAIT_MS)
+ CLUSTER_WAIT_MS, String.format("reassignment not finished
after %s ms", CLUSTER_WAIT_MS)
);
}
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
index 8763b4db50a..982105aee6c 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
@@ -24,7 +24,7 @@ import java.util.regex.Pattern;
/**
* Utilities for expanding strings that have range expressions in them.
*
- * For example, 'foo[1-3]' would be expaneded to foo1, foo2, foo3.
+ * For example, 'foo[1-3]' would be expanded to foo1, foo2, foo3.
* Strings that have no range expressions will not be expanded.
*/
public class StringExpander {
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 60429f46927..94292874ada 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -150,7 +150,7 @@ public final class NodeManager {
private final NodeHeartbeat heartbeat;
/**
- * A future which can be used to cancel the periodic hearbeat task.
+ * A future which can be used to cancel the periodic heartbeat task.
*/
private ScheduledFuture<?> heartbeatFuture;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index 407a81c1e4f..fe0678530ff 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -210,7 +210,7 @@ public class JsonRestServer {
is.close();
return new HttpResponse<>(result, null);
} else {
- // If the resposne code was not in the 200s, we assume that
this is an error
+ // If the response code was not in the 200s, we assume that
this is an error
// response.
InputStream es = connection.getErrorStream();
if (es == null) {
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
index dbd0b09c1e7..4aafb9492de 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
@@ -26,7 +26,7 @@ import java.util.Set;
*/
public interface TaskController {
/**
- * Get the agent nodes which this task is targetting.
+ * Get the agent nodes which this task is targeting.
*
* @param topology The topology to use.
*