This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86fb07bc4588a732bb935552448aeb77e93436c8 Author: Matteo Merli <[email protected]> AuthorDate: Wed Mar 11 13:50:28 2026 -0700 [fix][test] Fix flaky PulsarDebeziumOracleSourceTest (#25314) --- .../debezium/DebeziumOracleDbSourceTester.java | 51 +++++++++++++++++----- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java index 41db0a7cc18..961d6192316 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java @@ -146,18 +146,15 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC // configure logminer runSqlCmd("shutdown immediate"); - // good first approximation but still not enough: waitForOracleStatus("ORACLE not available"); Thread.sleep(SLEEP_AFTER_COMMAND_MS); - runSqlCmd("startup mount"); - // good first approximation but still not enough: - waitForOracleStatus("MOUNTED"); + // startup mount may need retries if Oracle is still shutting down + retryCommand("startup mount", "MOUNTED"); Thread.sleep(SLEEP_AFTER_COMMAND_MS); runSqlCmd("alter database archivelog;"); runSqlCmd("alter database open;"); - // good first approximation but still not enough: waitForOracleStatus("OPEN"); Thread.sleep(SLEEP_AFTER_COMMAND_MS); @@ -175,15 +172,49 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC } private void waitForOracleStatus(String status) throws Exception { + log.info("Waiting for Oracle status '{}'", status); + String lastStdout = ""; + String lastStderr = ""; for (int i = 0; i < 1000; i++) { - ContainerExecResult response = runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;"); - if ((response.getStderr() != null && response.getStderr().contains(status)) - || (response.getStdout() != null && response.getStdout().contains(status))) { - return; + try { + ContainerExecResult response = + runSqlCmd("SELECT INSTANCE_NAME, STATUS, DATABASE_STATUS FROM V$INSTANCE;"); + lastStdout = response.getStdout() != null ? response.getStdout() : ""; + lastStderr = response.getStderr() != null ? response.getStderr() : ""; + if (lastStderr.contains(status) || lastStdout.contains(status)) { + log.info("Oracle reached status '{}' after {} attempts", status, i + 1); + return; + } + } catch (Exception e) { + lastStderr = e.getMessage(); + log.debug("Error polling Oracle status (attempt {}): {}", i + 1, e.getMessage()); + } + if (i % 30 == 29) { + log.info("Still waiting for Oracle status '{}' after {} attempts. " + + "Last stdout: {}, Last stderr: {}", status, i + 1, lastStdout, lastStderr); } Thread.sleep(1000); } - throw new IllegalStateException("Oracle did not initialize properly"); + throw new IllegalStateException( + String.format("Oracle did not reach status '%s'. Last stdout: %s, Last stderr: %s", + status, lastStdout, lastStderr)); + } + + private void retryCommand(String cmd, String expectedStatus) throws Exception { + for (int attempt = 1; attempt <= 3; attempt++) { + runSqlCmd(cmd); + try { + waitForOracleStatus(expectedStatus); + return; + } catch (IllegalStateException e) { + if (attempt == 3) { + throw e; + } + log.warn("Command '{}' did not lead to status '{}' (attempt {}), retrying...", + cmd, expectedStatus, attempt); + Thread.sleep(SLEEP_AFTER_COMMAND_MS); + } + } } private ContainerExecResult runSqlCmd(String cmd) throws Exception {
