This is an automated email from the ASF dual-hosted git repository. C0urante pushed a commit to branch metamorphomover-prototype in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 08d656e584d507971532f337f679375b7fa57545 Author: chrise <[email protected]> AuthorDate: Wed Apr 29 05:38:03 2026 +0000 [DRAFT] Add pluggable worker assignor, remove Uber module --- build-prototype.sh | 70 ---------------------- build.gradle | 23 ++----- .../distributed/ClusterConfigStateImpl.java | 16 +++++ .../runtime/distributed/DistributedConfig.java | 3 +- .../runtime/distributed/DistributedHerder.java | 3 +- .../connect/runtime/distributed/UberAssignor.java | 5 +- .../runtime/distributed/WorkerCoordinator.java | 3 +- .../runtime/distributed/WorkerGroupMember.java | 3 +- .../runtime/health/ConnectClusterStateImpl.java | 1 - .../runtime/isolation/PluginScanResult.java | 3 +- .../connect/runtime/isolation/PluginType.java | 2 +- publish-connect.sh | 51 ++++++++++++++++ 12 files changed, 87 insertions(+), 96 deletions(-) diff --git a/build-prototype.sh b/build-prototype.sh deleted file mode 100755 index 1ab302b0ea8..00000000000 --- a/build-prototype.sh +++ /dev/null @@ -1,70 +0,0 @@ -#! /usr/bin/env bash - -if [[ "$#" -ne 1 ]]; then - echo "Usage: $0 <tb_path>" - exit 1 -fi - -TB_PATH="$1" - -upload() { - echo "Uploading $1 to TerraBlob" - if ! [ -z ${FORCE+x} ]; then - tb-cli delete "$TB_PATH"/"$1" || true - fi - tb-cli put --timeout 90s --multipart "$1" "$TB_PATH"/"$1" -} - -set -e - -SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd) - -if tb-cli ls "$TB_PATH" > /dev/null 2> /dev/null; then - if [ -z ${FORCE+x} ] && [[ $(tb-cli ls "$TB_PATH" | wc -l) != 0 ]]; then - echo "Path $TB_PATH already exists and is non-empty; cannot use" - exit 1 - elif ! [ -z ${FORCE+x} ]; then - echo "Path $TB_PATH is non-empty but FORCE is set; proceeding" - else - echo "Path $TB_PATH already exists and is empty; can use safely" - fi -else - tb-cli mkdir -p "$TB_PATH" -fi - - -if [ -z ${NOBUILD+x} ]; then - - if [ -z ${NOCLEAN+x} ]; then - "$SCRIPT_DIR"/gradlew clean - else - echo "Skipping clean" - fi - - "$SCRIPT_DIR"/gradlew :connect:uber:{spotlessApply,build} releaseTarGz -x test - -else - echo "Skipping build" -fi - -if ! [ -z ${NOPUSH+x} ]; then - echo "Skipping upload" - exit 0 -fi - - -pushd "$SCRIPT_DIR"/core/build/distributions > /dev/null -upload kafka_2.13-4.3.0-SNAPSHOT.tgz -popd > /dev/null - -pushd "$SCRIPT_DIR"/connect/uber/build > /dev/null -TEMP_DIR="$(mktemp -d)" -mkdir "$TEMP_DIR"/connect-uber-plugins -cp libs/* dependant-libs/* "$TEMP_DIR"/connect-uber-plugins -pushd "$TEMP_DIR" > /dev/null -tar czf connect-uber-plugins.tgz connect-uber-plugins -upload connect-uber-plugins.tgz -popd > /dev/null -popd > /dev/null - -echo "Finished uploading artifacts to $TB_PATH" diff --git a/build.gradle b/build.gradle index 404e1a0ded6..abc18ef9e92 100644 --- a/build.gradle +++ b/build.gradle @@ -430,6 +430,7 @@ subprojects { if (shouldSign) { signing { + useGpgCmd() sign publishing.publications.mavenJava } } @@ -3644,10 +3645,7 @@ project(':connect:runtime') { implementation libs.classgraph implementation libs.mavenArtifact implementation libs.swaggerAnnotations - // TODO: We should not be hacking the :connect:runtime module with our own dependencies; instead, - // we should be adding them to the classpath - // This is also causing embedded integration tests to fail ☹️ - implementation ('com.uber.kafka:kafka-upki-provider:1.0.1-217732-a4c6b554904e-java-monorepo') + implementation 'com.uber.kafka:kafka-upki-provider-shaded:1.20.0-chrise-5' compileOnly libs.bndlib compileOnly libs.spotbugs @@ -3677,8 +3675,7 @@ project(':connect:runtime') { testImplementation libs.junitJupiter testImplementation libs.mockitoCore testImplementation libs.mockitoJunitJupiter - // TODO: Conflicts with Kafka UPKI provider -// testImplementation libs.httpclient + testImplementation libs.httpclient testImplementation testLog4j2Libs testCompileOnly libs.bndlib @@ -3990,14 +3987,6 @@ project(':connect:uber') { archivesName = "connect-uber" } - task genUReplicator3Config(type: JavaExec) { - classpath sourceSets.main.runtimeClasspath - classpath sourceSets.main.compileClasspath - mainClass = 'com.uber.data.kafka.connect.ureplicator3.UReplicator3ConnectorConfig' - if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } - standardOutput = new File(generatedDocsDir, "ureplicator3_config.html").newOutputStream() - } - dependencies { implementation 'com.google.guava:guava:23.0' implementation 'com.uber.m3:tally-core:0.11.1' @@ -4153,9 +4142,9 @@ def updateVersionTask = tasks.register('updateVersion') { def rawVersion if (project.hasProperty('newVersion')) { rawVersion = newVersion - if (!rawVersion.matches(/^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT)?$/)) { - throw new GradleException("Invalid version format: '${rawVersion}'. Expected format: X.Y.Z or X.Y.Z-SNAPSHOT") - } +// if (!rawVersion.matches(/^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT)?$/)) { +// throw new GradleException("Invalid version format: '${rawVersion}'. Expected format: X.Y.Z or X.Y.Z-SNAPSHOT") +// } // Update gradle.properties def gradlePropsFile = file("${project.rootDir}/gradle.properties") if (gradlePropsFile.exists()) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigStateImpl.java index def0f344d56..9321a113843 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigStateImpl.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.connect.runtime.distributed; import com.uber.data.kafka.connect.distributed.ClusterConfigState; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 391597f664c..1026581a167 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.distributed; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.producer.ProducerConfig; @@ -27,6 +26,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 3b85f561e16..381db371aac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.distributed; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigDef; @@ -80,6 +79,8 @@ import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.Stage; import org.apache.kafka.connect.util.TemporaryStage; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; + import org.slf4j.Logger; import java.util.ArrayList; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/UberAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/UberAssignor.java index 4a466575ad3..1166199afec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/UberAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/UberAssignor.java @@ -16,11 +16,12 @@ */ package org.apache.kafka.connect.runtime.distributed; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.connect.storage.ClusterConfigState; + import com.uber.data.kafka.connect.distributed.ClusterAssignor; import com.uber.data.kafka.connect.distributed.ConnectorTaskId; import com.uber.data.kafka.connect.distributed.ConnectorsAndTasks; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.connect.storage.ClusterConfigState; import org.slf4j.Logger; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 192f035c6b7..1f98f92c628 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.distributed; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CloseOptions; import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; @@ -31,6 +30,8 @@ import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; + import org.slf4j.Logger; import java.io.Closeable; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index c3dc2acdd3f..f3d0e664110 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.distributed; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; @@ -43,6 +42,8 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; + import org.slf4j.Logger; import java.net.InetSocketAddress; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index 3f2ce96766b..4c82e8f3a57 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -29,7 +29,6 @@ import org.apache.kafka.connect.health.TaskState; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; -import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index 3c6a193a4bf..5ade6a6ef2e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.isolation; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.rest.ConnectRestExtension; @@ -27,6 +26,8 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; + import java.util.List; import java.util.SortedSet; import java.util.TreeSet; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java index 05a4766df71..0b686ac68d4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.isolation; -import com.uber.data.kafka.connect.distributed.ClusterAssignor; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.rest.ConnectRestExtension; @@ -28,6 +27,7 @@ import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import com.fasterxml.jackson.annotation.JsonValue; +import com.uber.data.kafka.connect.distributed.ClusterAssignor; import java.util.Locale; diff --git a/publish-connect.sh b/publish-connect.sh new file mode 100755 index 00000000000..0bb8d7211de --- /dev/null +++ b/publish-connect.sh @@ -0,0 +1,51 @@ +#! /usr/bin/env bash + +if [[ "$#" -ne 1 ]]; then + echo "Usage: $0 <version>" + exit 1 +fi + +set -e + +VERSION=4.3.0-chrise-$1-uber + +./gradlew clean +./gradlew updateVersion -PnewVersion=$VERSION +./gradlew build -x test -x rat -x spotbugsMain -x spotbugsTest -x checkstyleMain -x checkstyleTest +./gradlew releaseTarGz +./gradlew publish -x test -x rat \ + -PmavenUrl=https://artifacts.uberinternal.com/artifactory/libs-release-local \ + -PmavenPassword="$(usso -ussh artifacts.uberinternal.com -print)" \ + -PmaxParallelForks=8 --max-workers=8 \ + -x :streams:publish \ + -x :streams:examples:publish \ + -x :streams:test-utils:publish \ + -x :streams:streams-scala:publish \ + -x :streams:integration-tests:publish \ + -x :streams:upgrade-system-tests-0110:publish \ + -x :streams:upgrade-system-tests-10:publish \ + -x :streams:upgrade-system-tests-11:publish \ + -x :streams:upgrade-system-tests-20:publish \ + -x :streams:upgrade-system-tests-21:publish \ + -x :streams:upgrade-system-tests-22:publish \ + -x :streams:upgrade-system-tests-23:publish \ + -x :streams:upgrade-system-tests-24:publish \ + -x :streams:upgrade-system-tests-25:publish \ + -x :streams:upgrade-system-tests-26:publish \ + -x :streams:upgrade-system-tests-27:publish \ + -x :streams:upgrade-system-tests-28:publish \ + -x :streams:upgrade-system-tests-30:publish \ + -x :streams:upgrade-system-tests-31:publish \ + -x :streams:upgrade-system-tests-32:publish \ + -x :streams:upgrade-system-tests-33:publish \ + -x :streams:upgrade-system-tests-34:publish \ + -x :streams:upgrade-system-tests-35:publish \ + -x :streams:upgrade-system-tests-36:publish \ + -x :streams:upgrade-system-tests-37:publish \ + -x :streams:upgrade-system-tests-38:publish \ + -x :streams:upgrade-system-tests-39:publish \ + -x :streams:upgrade-system-tests-40:publish \ + -x :streams:upgrade-system-tests-41:publish +curl -H "Authorization: Bearer $(usso -ussh artifacts -print)" \ + https://artifacts.uberinternal.com/artifactory/libs-release-local/org/apache/kafka/kafka_2.13/$VERSION/kafka_2.13-$VERSION.tgz \ + -T core/build/distributions/kafka_2.13-$VERSION.tgz
