guozhangwang commented on a change in pull request #11857:
URL: https://github.com/apache/kafka/pull/11857#discussion_r820993077
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -161,35 +161,47 @@ public void registerThread(final String threadName) {
public void unregisterThread(final String threadName) {
threadVersions.remove(threadName);
- maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
+ maybeNotifyTopologyVersionListeners();
}
public TaskExecutionMetadata taskExecutionMetadata() {
return taskExecutionMetadata;
}
- public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
+ public Set<String> updateThreadTopologyVersion(final String threadName) {
try {
- lock();
- final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
- TopologyVersionWaiters topologyVersionWaiters;
+ version.topologyLock.lock();
threadVersions.put(threadName, topologyVersion());
+ return namedTopologiesView();
+ } finally {
+ version.topologyLock.unlock();
+ }
+ }
+
+ public void maybeNotifyTopologyVersionListeners() {
+ try {
+ lock();
+ final long minThreadVersion = getMinimumThreadVersion();
+ final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator();
+ TopologyVersionListener topologyVersionListener;
while (iterator.hasNext()) {
- topologyVersionWaiters = iterator.next();
- final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion;
- if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) {
- if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
- topologyVersionWaiters.future.complete(null);
- iterator.remove();
- log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion);
- }
+ topologyVersionListener = iterator.next();
+ final long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion;
+ if (minThreadVersion >= topologyVersionWaitersVersion) {
Review comment:
Hmm... just to make sure, we are talking about
`version.activeTopologyUpdateListeners`, right? These listeners are for the
thread that calls `removeNamedTopology` / `addNamedTopology` / `start`, which
gets back the wrapped futures that these listeners are constructed on.
Anyway, my understanding is that once a thread is removed, the version
returned by `getMinimumThreadVersion` no longer takes that thread into
consideration, so even if the removed thread's version was low, it would not
block the future from being completed.
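
To make the point above concrete, here is a minimal sketch, not the actual `TopologyMetadata` code. `MinVersionSketch`, `Listener`, `updateThread` and `addListener` are hypothetical names, locking is left out, and the body of `getMinimumThreadVersion` (minimum over the currently registered threads, `Long.MAX_VALUE` when there are none) is my assumption about how it behaves.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MinVersionSketch {
    // Hypothetical stand-in for TopologyVersionListener: a target version plus the future to complete.
    static final class Listener {
        final long topologyVersion;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Listener(final long topologyVersion) {
            this.topologyVersion = topologyVersion;
        }
    }

    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Listener> listeners = new LinkedList<>();

    // Records the topology version a thread has caught up to (no notification here).
    public void updateThread(final String threadName, final long version) {
        threadVersions.put(threadName, version);
    }

    // Registers a listener that waits for all threads to reach the given version.
    public CompletableFuture<Void> addListener(final long version) {
        final Listener listener = new Listener(version);
        listeners.add(listener);
        return listener.future;
    }

    // Mirrors the diff: drop the thread first, then re-check the listeners.
    public void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
        maybeNotifyListeners();
    }

    // Assumption: only threads still present in the map contribute to the minimum.
    long getMinimumThreadVersion() {
        return threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MAX_VALUE);
    }

    // Completes and removes every listener whose version all remaining threads have reached.
    public void maybeNotifyListeners() {
        final long minThreadVersion = getMinimumThreadVersion();
        final Iterator<Listener> iterator = listeners.listIterator();
        while (iterator.hasNext()) {
            final Listener listener = iterator.next();
            if (minThreadVersion >= listener.topologyVersion) {
                listener.future.complete(null);
                iterator.remove();
            }
        }
    }
}
```

With threads `t1` at version 1 and `t2` at version 2, a listener waiting on version 2 stays pending until `t1` either catches up or is unregistered. Unregistering `t1` raises the minimum to 2, so the future completes.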
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -161,35 +161,47 @@ public void registerThread(final String threadName) {
public void unregisterThread(final String threadName) {
threadVersions.remove(threadName);
- maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
+ maybeNotifyTopologyVersionListeners();
}
public TaskExecutionMetadata taskExecutionMetadata() {
return taskExecutionMetadata;
}
- public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
+ public Set<String> updateThreadTopologyVersion(final String threadName) {
try {
- lock();
- final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
- TopologyVersionWaiters topologyVersionWaiters;
+ version.topologyLock.lock();
threadVersions.put(threadName, topologyVersion());
+ return namedTopologiesView();
+ } finally {
+ version.topologyLock.unlock();
+ }
+ }
+
+ public void maybeNotifyTopologyVersionListeners() {
+ try {
+ lock();
+ final long minThreadVersion = getMinimumThreadVersion();
+ final Iterator<TopologyVersionListener> iterator = version.activeTopologyUpdateListeners.listIterator();
+ TopologyVersionListener topologyVersionListener;
Review comment:
nit: this `TopologyVersionListener topologyVersionListener` could be
declared within the while loop.
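
For illustration, a rough sketch of the loop shape the nit is asking for, with the loop variable declared `final` inside the `while` body rather than before it. `LoopScopeSketch` and `drainSatisfied` are hypothetical names, and plain `Long` versions stand in for the listener objects.

```java
import java.util.Iterator;
import java.util.List;

public class LoopScopeSketch {
    // Removes every listener version that minThreadVersion has reached and
    // returns how many were removed. The list must support Iterator.remove().
    public static int drainSatisfied(final List<Long> listenerVersions, final long minThreadVersion) {
        int completed = 0;
        final Iterator<Long> iterator = listenerVersions.listIterator();
        while (iterator.hasNext()) {
            // Declared inside the loop: scoped to one iteration and can be final.
            final long listenerVersion = iterator.next();
            if (minThreadVersion >= listenerVersion) {
                iterator.remove();
                completed++;
            }
        }
        return completed;
    }
}
```

Behaviour is the same either way. The narrower scope just rules out accidental use of a stale value after the loop.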