[ 
https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855997#comment-17855997
 ] 

Ksolves commented on KAFKA-16943:
---------------------------------

In our current test cases 
(`testFailToCreateInternalTopicsWithMoreReplicasThanBrokers` and 
`testFailToStartWhenInternalTopicsAreNotCompacted`), we attempt to verify that 
the Connect worker fails to start. However, the tests neither wait 
synchronously for startup to finish nor assert precisely that it failed.

Example Test Case: [Changes in existing test case marked in 
{color:#00875a}*green*{color}]
{code:java}
@Test
public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws InterruptedException {
    workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "3");
    workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "2");
    workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
    int numWorkers = 0;
    int numBrokers = 1;
    connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
                                                  .workerProps(workerProps)
                                                  .numWorkers(numWorkers)
                                                  .numBrokers(numBrokers)
                                                  .brokerProps(brokerProps)
                                                  .build();

    // Start the brokers and Connect, but Connect should fail to create config and offset topic
    connect.start();
    log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);

    // Try to start a worker
    connect.addWorker();

    // Synchronously await and verify that the worker fails during startup
    boolean workerStarted = waitForWorkerStartupFailure(connect, 30000); // 30 seconds timeout
    assertFalse(workerStarted, "Worker should not have started successfully");

    log.info("Verifying the internal topics for Connect");
    connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());

    // Verify that no workers are running
    assertFalse(connect.anyWorkersRunning());
}

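// Returns false as soon as no workers are running; returns true if a worker is still running at the timeout
private boolean waitForWorkerStartupFailure(EmbeddedConnectCluster connect, long timeoutMillis) throws InterruptedException {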
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < timeoutMillis) {
        if (!connect.anyWorkersRunning()) {
            return false;
        }
        Thread.sleep(500); // wait for 500 milliseconds before checking again
    }
    return true;
} {code}
What changes do you suggest to improve this synchronous verification mechanism? 
I'll create the PR accordingly.
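
For discussion, the waiting logic above could also be written as a generic 
polling helper with an explicit deadline. This is only a rough sketch: 
`StartupAwait`, `awaitCondition`, and the simulated worker in `main` are 
hypothetical names for illustration and are not part of 
`EmbeddedConnectCluster` or the Kafka test utilities. One caveat it tries to 
make visible: the polled condition should probably be a positive signal that 
startup has failed, because "no workers running" may also be true before 
startup has even begun.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not an existing Kafka class.
public class StartupAwait {

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was observed, false on timeout.
     */
    public static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        // System.nanoTime() is monotonic, so wall-clock adjustments do not affect the deadline
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline
                Thread.sleep(Math.min(pollMillis, remainingMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated worker: a background thread records that "startup" failed after a short delay
        AtomicBoolean startupFailed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            startupFailed.set(true);
        });
        worker.start();

        boolean observed = awaitCondition(startupFailed::get, 5_000, 50);
        System.out.println("startup failure observed: " + observed);
        worker.join();
    }
}
```

In the integration test, the condition passed in would need to come from 
whatever failure signal the embedded cluster can expose, which is the open 
question in this ticket.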

> Synchronously verify Connect worker startup failure in 
> InternalTopicsIntegrationTest
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16943
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect
>            Reporter: Chris Egerton
>            Priority: Minor
>              Labels: newbie
>
> Created after PR discussion 
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker 
> cannot start under poor conditions (such as when its internal topics do not 
> yet exist and it is configured to create them with a higher replication 
> factor than the number of available brokers, or when its internal topics 
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing 
> coverage, especially for the test cases 
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
> {{testFailToStartWhenInternalTopicsAreNotCompacted}}. It'd be nice if we 
> could have some way of synchronously awaiting the completion or failure of 
> worker startup in our integration tests in order to guarantee that worker 
> startup fails under sufficiently adverse conditions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
