This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1bc4cad90dd Upgrade debezium 1.9.3
1bc4cad90dd is described below
commit 1bc4cad90dd0c03004b18f504acbcbf3f5b8b654
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jun 15 06:28:54 2022 +0200
Upgrade debezium 1.9.3
---
camel-dependencies/pom.xml | 2 +-
.../DebeziumOracleComponentConfigurer.java | 6 +
.../debezium/DebeziumOracleEndpointConfigurer.java | 6 +
.../debezium/DebeziumOracleEndpointUriFactory.java | 3 +-
...acleConnectorEmbeddedDebeziumConfiguration.java | 1338 ++++++++++----------
.../camel/component/debezium/debezium-oracle.json | 2 +
parent/pom.xml | 2 +-
7 files changed, 695 insertions(+), 664 deletions(-)
diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 76e9acf3afc..2e87c1f8a2d 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -166,7 +166,7 @@
<cxf.xjc.jvmArgs></cxf.xjc.jvmArgs>
<datasonnet-mapper-version>2.1.4</datasonnet-mapper-version>
<debezium-mysql-connector-version>8.0.28</debezium-mysql-connector-version>
- <debezium-version>1.9.1.Final</debezium-version>
+ <debezium-version>1.9.3.Final</debezium-version>
<deltaspike-version>1.9.5</deltaspike-version>
<depends-maven-plugin-version>1.4.0</depends-maven-plugin-version>
<derby-version>10.14.2.0</derby-version>
diff --git
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleComponentConfigurer.java
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleComponentConfigurer.java
index 65382dc4979..0a2e6cf23f9 100644
---
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleComponentConfigurer.java
+++
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleComponentConfigurer.java
@@ -142,6 +142,8 @@ public class DebeziumOracleComponentConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin":
getOrCreateConfiguration(target).setLogMiningScnGapDetectionGapSizeMin(property(camelContext,
long.class, value)); return true;
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs":
getOrCreateConfiguration(target).setLogMiningScnGapDetectionTimeIntervalMaxMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs":
getOrCreateConfiguration(target).setLogMiningSessionMaxMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs":
getOrCreateConfiguration(target).setLogMiningSleepTimeDefaultMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "logminingsleeptimeincrementms":
@@ -355,6 +357,8 @@ public class DebeziumOracleComponentConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin": return long.class;
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs": return long.class;
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs": return long.class;
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs": return long.class;
case "logminingsleeptimeincrementms":
@@ -569,6 +573,8 @@ public class DebeziumOracleComponentConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin": return
getOrCreateConfiguration(target).getLogMiningScnGapDetectionGapSizeMin();
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs": return
getOrCreateConfiguration(target).getLogMiningScnGapDetectionTimeIntervalMaxMs();
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs": return
getOrCreateConfiguration(target).getLogMiningSessionMaxMs();
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs": return
getOrCreateConfiguration(target).getLogMiningSleepTimeDefaultMs();
case "logminingsleeptimeincrementms":
diff --git
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointConfigurer.java
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointConfigurer.java
index 2b0cc7d25b7..9036eda6314 100644
---
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointConfigurer.java
+++
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointConfigurer.java
@@ -136,6 +136,8 @@ public class DebeziumOracleEndpointConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin":
target.getConfiguration().setLogMiningScnGapDetectionGapSizeMin(property(camelContext,
long.class, value)); return true;
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs":
target.getConfiguration().setLogMiningScnGapDetectionTimeIntervalMaxMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs":
target.getConfiguration().setLogMiningSessionMaxMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs":
target.getConfiguration().setLogMiningSleepTimeDefaultMs(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "logminingsleeptimeincrementms":
@@ -350,6 +352,8 @@ public class DebeziumOracleEndpointConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin": return long.class;
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs": return long.class;
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs": return long.class;
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs": return long.class;
case "logminingsleeptimeincrementms":
@@ -565,6 +569,8 @@ public class DebeziumOracleEndpointConfigurer extends
PropertyConfigurerSupport
case "logMiningScnGapDetectionGapSizeMin": return
target.getConfiguration().getLogMiningScnGapDetectionGapSizeMin();
case "logminingscngapdetectiontimeintervalmaxms":
case "logMiningScnGapDetectionTimeIntervalMaxMs": return
target.getConfiguration().getLogMiningScnGapDetectionTimeIntervalMaxMs();
+ case "logminingsessionmaxms":
+ case "logMiningSessionMaxMs": return
target.getConfiguration().getLogMiningSessionMaxMs();
case "logminingsleeptimedefaultms":
case "logMiningSleepTimeDefaultMs": return
target.getConfiguration().getLogMiningSleepTimeDefaultMs();
case "logminingsleeptimeincrementms":
diff --git
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointUriFactory.java
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointUriFactory.java
index 45b78b28767..2e7140d61ff 100644
---
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointUriFactory.java
+++
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/DebeziumOracleEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class DebeziumOracleEndpointUriFactory extends
org.apache.camel.support.c
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(105);
+ Set<String> props = new HashSet<>(106);
props.add("additionalProperties");
props.add("binaryHandlingMode");
props.add("bridgeErrorHandler");
@@ -80,6 +80,7 @@ public class DebeziumOracleEndpointUriFactory extends
org.apache.camel.support.c
props.add("logMiningBufferType");
props.add("logMiningScnGapDetectionGapSizeMin");
props.add("logMiningScnGapDetectionTimeIntervalMaxMs");
+ props.add("logMiningSessionMaxMs");
props.add("logMiningSleepTimeDefaultMs");
props.add("logMiningSleepTimeIncrementMs");
props.add("logMiningSleepTimeMaxMs");
diff --git
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/configuration/OracleConnectorEmbeddedDebeziumConfiguration.java
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/configuration/OracleConnectorEmbeddedDebeziumConfiguration.java
index 45d56832e14..f6d3a7ca06f 100644
---
a/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/configuration/OracleConnectorEmbeddedDebeziumConfiguration.java
+++
b/components/camel-debezium/camel-debezium-oracle/src/generated/java/org/apache/camel/component/debezium/configuration/OracleConnectorEmbeddedDebeziumConfiguration.java
@@ -18,18 +18,88 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
private boolean logMiningBufferDropOnStop = false;
@UriParam(label = LABEL_NAME)
private String messageKeyColumns;
- @UriParam(label = LABEL_NAME, defaultValue = "0")
- private int queryFetchSize = 0;
@UriParam(label = LABEL_NAME)
private String logMiningArchiveDestinationName;
@UriParam(label = LABEL_NAME)
private String columnBlacklist;
+ @UriParam(label = LABEL_NAME, defaultValue = "true")
+ private boolean includeSchemaChanges = true;
+ @UriParam(label = LABEL_NAME)
+ private String signalDataCollection;
+ @UriParam(label = LABEL_NAME)
+ private String converters;
+ @UriParam(label = LABEL_NAME)
+ private int snapshotFetchSize;
+ @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
+ private long snapshotLockTimeoutMs = 10000;
+ @UriParam(label = LABEL_NAME, defaultValue = "3s", javaType =
"java.time.Duration")
+ private long databaseHistoryKafkaQueryTimeoutMs = 3000;
+ @UriParam(label = LABEL_NAME, defaultValue = "1000000")
+ private long logMiningScnGapDetectionGapSizeMin = 1000000;
+ @UriParam(label = LABEL_NAME)
+ private String databaseDbname;
+ @UriParam(label = LABEL_NAME, defaultValue = "1s", javaType =
"java.time.Duration")
+ private long logMiningSleepTimeDefaultMs = 1000;
+ @UriParam(label = LABEL_NAME)
+ private String snapshotSelectStatementOverrides;
+ @UriParam(label = LABEL_NAME, defaultValue = "v2")
+ private String sourceStructVersion = "v2";
+ @UriParam(label = LABEL_NAME)
+ private String columnWhitelist;
+ @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
+ private long logMiningArchiveLogOnlyScnPollIntervalMs = 10000;
+ @UriParam(label = LABEL_NAME)
+ private String tableExcludeList;
+ @UriParam(label = LABEL_NAME, defaultValue = "2048")
+ private int maxBatchSize = 2048;
+ @UriParam(label = LABEL_NAME)
+ private String logMiningBufferInfinispanCacheTransactions;
+ @UriParam(label = LABEL_NAME, defaultValue = "initial")
+ private String snapshotMode = "initial";
+ @UriParam(label = LABEL_NAME)
+ private String databaseHistoryKafkaTopic;
+ @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
+ private long retriableRestartConnectorWaitMs = 10000;
+ @UriParam(label = LABEL_NAME, defaultValue = "0ms", javaType =
"java.time.Duration")
+ private long snapshotDelayMs = 0;
+ @UriParam(label = LABEL_NAME, defaultValue = "redo_log_catalog")
+ private String logMiningStrategy = "redo_log_catalog";
+ @UriParam(label = LABEL_NAME)
+ private String tableWhitelist;
+ @UriParam(label = LABEL_NAME, defaultValue = "false")
+ private boolean tombstonesOnDelete = false;
+ @UriParam(label = LABEL_NAME, defaultValue = "precise")
+ private String decimalHandlingMode = "precise";
+ @UriParam(label = LABEL_NAME, defaultValue = "bytes")
+ private String binaryHandlingMode = "bytes";
+ @UriParam(label = LABEL_NAME)
+ private String databaseOutServerName;
+ @UriParam(label = LABEL_NAME)
+ private String snapshotIncludeCollectionList;
+ @UriParam(label = LABEL_NAME)
+ private String databaseHistoryFileFilename;
+ @UriParam(label = LABEL_NAME)
+ private String databasePdbName;
+ @UriParam(label = LABEL_NAME, defaultValue = "LogMiner")
+ private String databaseConnectionAdapter = "LogMiner";
+ @UriParam(label = LABEL_NAME, defaultValue = "memory")
+ private String logMiningBufferType = "memory";
+ @UriParam(label = LABEL_NAME, defaultValue = "fail")
+ private String eventProcessingFailureHandlingMode = "fail";
+ @UriParam(label = LABEL_NAME, defaultValue = "1")
+ private int snapshotMaxThreads = 1;
+ @UriParam(label = LABEL_NAME, defaultValue = "avro")
+ private String schemaNameAdjustmentMode = "avro";
+ @UriParam(label = LABEL_NAME, defaultValue = "20000")
+ private long logMiningBatchSizeDefault = 20000;
+ @UriParam(label = LABEL_NAME)
+ private String tableIncludeList;
+ @UriParam(label = LABEL_NAME, defaultValue = "0")
+ private int queryFetchSize = 0;
@UriParam(label = LABEL_NAME, defaultValue = "0ms", javaType =
"java.time.Duration")
private long logMiningSleepTimeMinMs = 0;
@UriParam(label = LABEL_NAME)
private String tableBlacklist;
- @UriParam(label = LABEL_NAME, defaultValue = "true")
- private boolean includeSchemaChanges = true;
@UriParam(label = LABEL_NAME, defaultValue =
"__debezium_unavailable_value")
private String unavailableValuePlaceholder =
"__debezium_unavailable_value";
@UriParam(label = LABEL_NAME)
@@ -38,55 +108,31 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
private long pollIntervalMs = 500;
@UriParam(label = LABEL_NAME, defaultValue = "100ms", javaType =
"java.time.Duration")
private int databaseHistoryKafkaRecoveryPollIntervalMs = 100;
- @UriParam(label = LABEL_NAME)
- private String signalDataCollection;
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean lobEnabled = false;
@UriParam(label = LABEL_NAME, defaultValue = "numeric")
private String intervalHandlingMode = "numeric";
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean databaseHistoryStoreOnlyCapturedTablesDdl = false;
- @UriParam(label = LABEL_NAME)
- private String converters;
@UriParam(label = LABEL_NAME, defaultValue = "__debezium-heartbeat")
private String heartbeatTopicsPrefix = "__debezium-heartbeat";
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean logMiningArchiveLogOnlyMode = false;
@UriParam(label = LABEL_NAME)
- private int snapshotFetchSize;
- @UriParam(label = LABEL_NAME)
private String logMiningBufferInfinispanCacheSchemaChanges;
- @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
- private long snapshotLockTimeoutMs = 10000;
- @UriParam(label = LABEL_NAME, defaultValue = "3s", javaType =
"java.time.Duration")
- private long databaseHistoryKafkaQueryTimeoutMs = 3000;
@UriParam(label = LABEL_NAME, defaultValue = "3s", javaType =
"java.time.Duration")
private long logMiningSleepTimeMaxMs = 3000;
- @UriParam(label = LABEL_NAME, defaultValue = "1000000")
- private long logMiningScnGapDetectionGapSizeMin = 1000000;
@UriParam(label = LABEL_NAME)
private String databaseUser;
@UriParam(label = LABEL_NAME)
- private String databaseDbname;
- @UriParam(label = LABEL_NAME)
private String datatypePropagateSourceType;
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean sanitizeFieldNames = false;
- @UriParam(label = LABEL_NAME, defaultValue = "1s", javaType =
"java.time.Duration")
- private long logMiningSleepTimeDefaultMs = 1000;
- @UriParam(label = LABEL_NAME)
- private String snapshotSelectStatementOverrides;
@UriParam(label = LABEL_NAME)
private String databaseHistoryKafkaBootstrapServers;
- @UriParam(label = LABEL_NAME, defaultValue = "v2")
- private String sourceStructVersion = "v2";
@UriParam(label = LABEL_NAME, defaultValue = "0ms", javaType =
"java.time.Duration")
private int heartbeatIntervalMs = 0;
@UriParam(label = LABEL_NAME)
- private String columnWhitelist;
- @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
- private long logMiningArchiveLogOnlyScnPollIntervalMs = 10000;
- @UriParam(label = LABEL_NAME)
private String columnIncludeList;
@UriParam(label = LABEL_NAME)
private String logMiningUsernameExcludeList;
@@ -95,74 +141,38 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
@UriParam(label = LABEL_NAME)
private String logMiningBufferInfinispanCacheProcessedTransactions;
@UriParam(label = LABEL_NAME)
- private String tableExcludeList;
- @UriParam(label = LABEL_NAME)
@Metadata(required = true)
private String databasePassword;
@UriParam(label = LABEL_NAME)
private String logMiningBufferInfinispanCacheEvents;
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean databaseHistoryStoreOnlyMonitoredTablesDdl = false;
- @UriParam(label = LABEL_NAME, defaultValue = "2048")
- private int maxBatchSize = 2048;
@UriParam(label = LABEL_NAME)
private String skippedOperations;
- @UriParam(label = LABEL_NAME)
- private String logMiningBufferInfinispanCacheTransactions;
@UriParam(label = LABEL_NAME, defaultValue = "20s", javaType =
"java.time.Duration")
private long logMiningScnGapDetectionTimeIntervalMaxMs = 20000;
- @UriParam(label = LABEL_NAME, defaultValue = "initial")
- private String snapshotMode = "initial";
@UriParam(label = LABEL_NAME, defaultValue =
"io.debezium.relational.history.KafkaDatabaseHistory")
private String databaseHistory =
"io.debezium.relational.history.KafkaDatabaseHistory";
@UriParam(label = LABEL_NAME, defaultValue = "8192")
private int maxQueueSize = 8192;
@UriParam(label = LABEL_NAME)
private String racNodes;
- @UriParam(label = LABEL_NAME)
- private String databaseHistoryKafkaTopic;
- @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType =
"java.time.Duration")
- private long retriableRestartConnectorWaitMs = 10000;
- @UriParam(label = LABEL_NAME, defaultValue = "0ms", javaType =
"java.time.Duration")
- private long snapshotDelayMs = 0;
- @UriParam(label = LABEL_NAME, defaultValue = "redo_log_catalog")
- private String logMiningStrategy = "redo_log_catalog";
@UriParam(label = LABEL_NAME, defaultValue = "100")
private int databaseHistoryKafkaRecoveryAttempts = 100;
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean provideTransactionMetadata = false;
- @UriParam(label = LABEL_NAME)
- private String tableWhitelist;
- @UriParam(label = LABEL_NAME, defaultValue = "false")
- private boolean tombstonesOnDelete = false;
- @UriParam(label = LABEL_NAME, defaultValue = "precise")
- private String decimalHandlingMode = "precise";
- @UriParam(label = LABEL_NAME, defaultValue = "bytes")
- private String binaryHandlingMode = "bytes";
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean includeSchemaComments = false;
@UriParam(label = LABEL_NAME, defaultValue = "false")
private boolean databaseHistorySkipUnparseableDdl = false;
- @UriParam(label = LABEL_NAME)
- private String databaseOutServerName;
@UriParam(label = LABEL_NAME, defaultValue = "0")
private long logMiningArchiveLogHours = 0;
@UriParam(label = LABEL_NAME, defaultValue = "0")
private long logMiningTransactionRetentionHours = 0;
- @UriParam(label = LABEL_NAME)
- private String snapshotIncludeCollectionList;
- @UriParam(label = LABEL_NAME)
- private String databaseHistoryFileFilename;
@UriParam(label = LABEL_NAME, defaultValue = "100000")
private long logMiningBatchSizeMax = 100000;
- @UriParam(label = LABEL_NAME)
- private String databasePdbName;
- @UriParam(label = LABEL_NAME, defaultValue = "LogMiner")
- private String databaseConnectionAdapter = "LogMiner";
@UriParam(label = LABEL_NAME, defaultValue = "0")
private long maxQueueSizeInBytes = 0;
- @UriParam(label = LABEL_NAME, defaultValue = "memory")
- private String logMiningBufferType = "memory";
@UriParam(label = LABEL_NAME, defaultValue =
"${database.server.name}.transaction")
private String transactionTopic = "${database.server.name}.transaction";
@UriParam(label = LABEL_NAME)
@@ -172,28 +182,20 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
@UriParam(label = LABEL_NAME)
@Metadata(required = true)
private String databaseServerName;
- @UriParam(label = LABEL_NAME, defaultValue = "fail")
- private String eventProcessingFailureHandlingMode = "fail";
- @UriParam(label = LABEL_NAME, defaultValue = "1")
- private int snapshotMaxThreads = 1;
@UriParam(label = LABEL_NAME, defaultValue = "1528")
private int databasePort = 1528;
@UriParam(label = LABEL_NAME, defaultValue = "200ms", javaType =
"java.time.Duration")
private long logMiningSleepTimeIncrementMs = 200;
@UriParam(label = LABEL_NAME)
private String columnExcludeList;
+ @UriParam(label = LABEL_NAME, defaultValue = "0ms", javaType =
"java.time.Duration")
+ private long logMiningSessionMaxMs = 0;
@UriParam(label = LABEL_NAME)
private String databaseHostname;
@UriParam(label = LABEL_NAME, defaultValue = "1000")
private long logMiningBatchSizeMin = 1000;
- @UriParam(label = LABEL_NAME, defaultValue = "avro")
- private String schemaNameAdjustmentMode = "avro";
- @UriParam(label = LABEL_NAME, defaultValue = "20000")
- private long logMiningBatchSizeDefault = 20000;
@UriParam(label = LABEL_NAME)
private String snapshotEnhancePredicateScn;
- @UriParam(label = LABEL_NAME)
- private String tableIncludeList;
/**
* Controls how the connector holds locks on tables while performing the
@@ -247,18 +249,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return messageKeyColumns;
}
- /**
- * The maximum number of records that should be loaded into memory while
- * streaming. A value of `0` uses the default JDBC fetch size.
- */
- public void setQueryFetchSize(int queryFetchSize) {
- this.queryFetchSize = queryFetchSize;
- }
-
- public int getQueryFetchSize() {
- return queryFetchSize;
- }
-
/**
* Sets the specific archive log destination as the source for reading
* archive logs.When not set, the connector will automatically select the
@@ -285,32 +275,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return columnBlacklist;
}
- /**
- * The minimum amount of time that the connector will sleep after reading
- * data from redo/archive logs and before starting reading data again.
Value
- * is in milliseconds.
- */
- public void setLogMiningSleepTimeMinMs(long logMiningSleepTimeMinMs) {
- this.logMiningSleepTimeMinMs = logMiningSleepTimeMinMs;
- }
-
- public long getLogMiningSleepTimeMinMs() {
- return logMiningSleepTimeMinMs;
- }
-
- /**
- * A comma-separated list of regular expressions that match the
- * fully-qualified names of tables to be excluded from monitoring
- * (deprecated, use "table.exclude.list" instead)
- */
- public void setTableBlacklist(String tableBlacklist) {
- this.tableBlacklist = tableBlacklist;
- }
-
- public String getTableBlacklist() {
- return tableBlacklist;
- }
-
/**
* Whether the connector should publish changes in the database schema to a
* Kafka topic with the same name as the database server ID. Each schema
@@ -328,72 +292,563 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
}
/**
- * Specify the constant that will be provided by Debezium to indicate that
- * the original value is unavailable and not provided by the database.
+ * The name of the data collection that is used to send signals/commands to
+ * Debezium. Signaling is disabled when not set.
*/
- public void setUnavailableValuePlaceholder(
- String unavailableValuePlaceholder) {
- this.unavailableValuePlaceholder = unavailableValuePlaceholder;
+ public void setSignalDataCollection(String signalDataCollection) {
+ this.signalDataCollection = signalDataCollection;
}
- public String getUnavailableValuePlaceholder() {
- return unavailableValuePlaceholder;
+ public String getSignalDataCollection() {
+ return signalDataCollection;
}
/**
- * The query executed with every heartbeat.
+ * Optional list of custom converters that would be used instead of default
+ * ones. The converters are defined using '<converter.prefix>.type' config
+ * option and configured using options '<converter.prefix>.<option>'
*/
- public void setHeartbeatActionQuery(String heartbeatActionQuery) {
- this.heartbeatActionQuery = heartbeatActionQuery;
+ public void setConverters(String converters) {
+ this.converters = converters;
}
- public String getHeartbeatActionQuery() {
- return heartbeatActionQuery;
+ public String getConverters() {
+ return converters;
}
/**
- * Time to wait for new change events to appear after receiving no events,
- * given in milliseconds. Defaults to 500 ms.
+ * The maximum number of records that should be loaded into memory while
+ * performing a snapshot
*/
- public void setPollIntervalMs(long pollIntervalMs) {
- this.pollIntervalMs = pollIntervalMs;
+ public void setSnapshotFetchSize(int snapshotFetchSize) {
+ this.snapshotFetchSize = snapshotFetchSize;
}
- public long getPollIntervalMs() {
- return pollIntervalMs;
+ public int getSnapshotFetchSize() {
+ return snapshotFetchSize;
}
/**
- * The number of milliseconds to wait while polling for persisted data
- * during recovery.
+ * The maximum number of millis to wait for table locks at the beginning of
+ * a snapshot. If locks cannot be acquired in this time frame, the snapshot
+ * will be aborted. Defaults to 10 seconds
*/
- public void setDatabaseHistoryKafkaRecoveryPollIntervalMs(
- int databaseHistoryKafkaRecoveryPollIntervalMs) {
- this.databaseHistoryKafkaRecoveryPollIntervalMs =
databaseHistoryKafkaRecoveryPollIntervalMs;
+ public void setSnapshotLockTimeoutMs(long snapshotLockTimeoutMs) {
+ this.snapshotLockTimeoutMs = snapshotLockTimeoutMs;
}
- public int getDatabaseHistoryKafkaRecoveryPollIntervalMs() {
- return databaseHistoryKafkaRecoveryPollIntervalMs;
+ public long getSnapshotLockTimeoutMs() {
+ return snapshotLockTimeoutMs;
}
/**
- * The name of the data collection that is used to send signals/commands to
- * Debezium. Signaling is disabled when not set.
+ * The number of milliseconds to wait while fetching cluster information
+ * using Kafka admin client.
*/
- public void setSignalDataCollection(String signalDataCollection) {
- this.signalDataCollection = signalDataCollection;
+ public void setDatabaseHistoryKafkaQueryTimeoutMs(
+ long databaseHistoryKafkaQueryTimeoutMs) {
+ this.databaseHistoryKafkaQueryTimeoutMs =
databaseHistoryKafkaQueryTimeoutMs;
}
- public String getSignalDataCollection() {
- return signalDataCollection;
+ public long getDatabaseHistoryKafkaQueryTimeoutMs() {
+ return databaseHistoryKafkaQueryTimeoutMs;
}
/**
- * When set to `false`, the default, LOB fields will not be captured nor
- * emitted. When set to `true`, the connector will capture LOB fields and
- * emit changes for those fields like any other column type.
- */
- public void setLobEnabled(boolean lobEnabled) {
+ * Used for SCN gap detection, if the difference between current SCN and
+ * previous end SCN is bigger than this value, and the time difference of
+ * current SCN and previous end SCN is smaller than
+ * log.mining.scn.gap.detection.time.interval.max.ms, consider it a SCN
gap.
+ */
+ public void setLogMiningScnGapDetectionGapSizeMin(
+ long logMiningScnGapDetectionGapSizeMin) {
+ this.logMiningScnGapDetectionGapSizeMin =
logMiningScnGapDetectionGapSizeMin;
+ }
+
+ public long getLogMiningScnGapDetectionGapSizeMin() {
+ return logMiningScnGapDetectionGapSizeMin;
+ }
+
+ /**
+ * The name of the database from which the connector should capture changes
+ */
+ public void setDatabaseDbname(String databaseDbname) {
+ this.databaseDbname = databaseDbname;
+ }
+
+ public String getDatabaseDbname() {
+ return databaseDbname;
+ }
+
+ /**
+ * The amount of time that the connector will sleep after reading data from
+ * redo/archive logs and before starting reading data again. Value is in
+ * milliseconds.
+ */
+ public void setLogMiningSleepTimeDefaultMs(long
logMiningSleepTimeDefaultMs) {
+ this.logMiningSleepTimeDefaultMs = logMiningSleepTimeDefaultMs;
+ }
+
+ public long getLogMiningSleepTimeDefaultMs() {
+ return logMiningSleepTimeDefaultMs;
+ }
+
+ /**
+ * This property contains a comma-separated list of fully-qualified tables
+ * (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
+ * thespecific connectors. Select statements for the individual tables are
+ * specified in further configuration properties, one for each table,
+ * identified by the id
+ * 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or
+ * 'snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]',
+ * respectively. The value of those properties is the select statement to
+ * use when retrieving data from the specific table during snapshotting. A
+ * possible use case for large append-only tables is setting a specific
+ * point where to start (resume) snapshotting, in case a previous
+ * snapshotting was interrupted.
+ */
+ public void setSnapshotSelectStatementOverrides(
+ String snapshotSelectStatementOverrides) {
+ this.snapshotSelectStatementOverrides =
snapshotSelectStatementOverrides;
+ }
+
+ public String getSnapshotSelectStatementOverrides() {
+ return snapshotSelectStatementOverrides;
+ }
+
+ /**
+ * A version of the format of the publicly visible source part in the
+ * message
+ */
+ public void setSourceStructVersion(String sourceStructVersion) {
+ this.sourceStructVersion = sourceStructVersion;
+ }
+
+ public String getSourceStructVersion() {
+ return sourceStructVersion;
+ }
+
+ /**
+ * Regular expressions matching columns to include in change events
+ * (deprecated, use "column.include.list" instead)
+ */
+ public void setColumnWhitelist(String columnWhitelist) {
+ this.columnWhitelist = columnWhitelist;
+ }
+
+ public String getColumnWhitelist() {
+ return columnWhitelist;
+ }
+
+ /**
+ * The interval in milliseconds to wait between polls checking to see if
the
+ * SCN is in the archive logs.
+ */
+ public void setLogMiningArchiveLogOnlyScnPollIntervalMs(
+ long logMiningArchiveLogOnlyScnPollIntervalMs) {
+ this.logMiningArchiveLogOnlyScnPollIntervalMs =
logMiningArchiveLogOnlyScnPollIntervalMs;
+ }
+
+ public long getLogMiningArchiveLogOnlyScnPollIntervalMs() {
+ return logMiningArchiveLogOnlyScnPollIntervalMs;
+ }
+
+ /**
+ * A comma-separated list of regular expressions that match the
+ * fully-qualified names of tables to be excluded from monitoring
+ */
+ public void setTableExcludeList(String tableExcludeList) {
+ this.tableExcludeList = tableExcludeList;
+ }
+
+ public String getTableExcludeList() {
+ return tableExcludeList;
+ }
+
+ /**
+ * Maximum size of each batch of source records. Defaults to 2048.
+ */
+ public void setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /**
+ * Specifies the XML configuration for the Infinispan 'transactions' cache
+ */
+ public void setLogMiningBufferInfinispanCacheTransactions(
+ String logMiningBufferInfinispanCacheTransactions) {
+ this.logMiningBufferInfinispanCacheTransactions =
logMiningBufferInfinispanCacheTransactions;
+ }
+
+ public String getLogMiningBufferInfinispanCacheTransactions() {
+ return logMiningBufferInfinispanCacheTransactions;
+ }
+
+ /**
+ * The criteria for running a snapshot upon startup of the connector.
+ * Options include: 'initial' (the default) to specify the connector should
+ * run a snapshot only when no offsets are available for the logical server
+ * name; 'schema_only' to specify the connector should run a snapshot of
the
+ * schema when no offsets are available for the logical server name.
+ */
+ public void setSnapshotMode(String snapshotMode) {
+ this.snapshotMode = snapshotMode;
+ }
+
+ public String getSnapshotMode() {
+ return snapshotMode;
+ }
+
+ /**
+ * The name of the topic for the database schema history
+ */
+ public void setDatabaseHistoryKafkaTopic(String databaseHistoryKafkaTopic)
{
+ this.databaseHistoryKafkaTopic = databaseHistoryKafkaTopic;
+ }
+
+ public String getDatabaseHistoryKafkaTopic() {
+ return databaseHistoryKafkaTopic;
+ }
+
+ /**
+ * Time to wait before restarting connector after retriable exception
+ * occurs. Defaults to 10000ms.
+ */
+ public void setRetriableRestartConnectorWaitMs(
+ long retriableRestartConnectorWaitMs) {
+ this.retriableRestartConnectorWaitMs = retriableRestartConnectorWaitMs;
+ }
+
+ public long getRetriableRestartConnectorWaitMs() {
+ return retriableRestartConnectorWaitMs;
+ }
+
+ /**
+ * A delay period before a snapshot will begin, given in milliseconds.
+ * Defaults to 0 ms.
+ */
+ public void setSnapshotDelayMs(long snapshotDelayMs) {
+ this.snapshotDelayMs = snapshotDelayMs;
+ }
+
+ public long getSnapshotDelayMs() {
+ return snapshotDelayMs;
+ }
+
+ /**
+ * There are strategies: Online catalog with faster mining but no captured
+ * DDL. Another - with data dictionary loaded into REDO LOG files
+ */
+ public void setLogMiningStrategy(String logMiningStrategy) {
+ this.logMiningStrategy = logMiningStrategy;
+ }
+
+ public String getLogMiningStrategy() {
+ return logMiningStrategy;
+ }
+
+ /**
+ * The tables for which changes are to be captured (deprecated, use
+ * "table.include.list" instead)
+ */
+ public void setTableWhitelist(String tableWhitelist) {
+ this.tableWhitelist = tableWhitelist;
+ }
+
+ public String getTableWhitelist() {
+ return tableWhitelist;
+ }
+
+ /**
+ * Whether delete operations should be represented by a delete event and a
+ * subsequent tombstone event (true) or only by a delete event (false).
+ * Emitting the tombstone event (the default behavior) allows Kafka to
+ * completely delete all events pertaining to the given key once the source
+ * record got deleted.
+ */
+ public void setTombstonesOnDelete(boolean tombstonesOnDelete) {
+ this.tombstonesOnDelete = tombstonesOnDelete;
+ }
+
+ public boolean isTombstonesOnDelete() {
+ return tombstonesOnDelete;
+ }
+
+ /**
+ * Specify how DECIMAL and NUMERIC columns should be represented in change
+ * events, including:'precise' (the default) uses java.math.BigDecimal to
+ * represent values, which are encoded in the change events using a binary
+ * representation and Kafka Connect's
+ * 'org.apache.kafka.connect.data.Decimal' type; 'string' uses string to
+ * represent values; 'double' represents values using Java's 'double',
which
+ * may not offer the precision but will be far easier to use in consumers.
+ */
+ public void setDecimalHandlingMode(String decimalHandlingMode) {
+ this.decimalHandlingMode = decimalHandlingMode;
+ }
+
+ public String getDecimalHandlingMode() {
+ return decimalHandlingMode;
+ }
+
+ /**
+ * Specify how binary (blob, binary, etc.) columns should be represented in
+ * change events, including:'bytes' represents binary data as byte array
+ * (default)'base64' represents binary data as base64-encoded string'hex'
+ * represents binary data as hex-encoded (base16) string
+ */
+ public void setBinaryHandlingMode(String binaryHandlingMode) {
+ this.binaryHandlingMode = binaryHandlingMode;
+ }
+
+ public String getBinaryHandlingMode() {
+ return binaryHandlingMode;
+ }
+
+ /**
+ * Name of the XStream Out server to connect to.
+ */
+ public void setDatabaseOutServerName(String databaseOutServerName) {
+ this.databaseOutServerName = databaseOutServerName;
+ }
+
+ public String getDatabaseOutServerName() {
+ return databaseOutServerName;
+ }
+
+ /**
+ * this setting must be set to specify a list of tables/collections whose
+ * snapshot must be taken on creating or restarting the connector.
+ */
+ public void setSnapshotIncludeCollectionList(
+ String snapshotIncludeCollectionList) {
+ this.snapshotIncludeCollectionList = snapshotIncludeCollectionList;
+ }
+
+ public String getSnapshotIncludeCollectionList() {
+ return snapshotIncludeCollectionList;
+ }
+
+ /**
+ * The path to the file that will be used to record the database history
+ */
+ public void setDatabaseHistoryFileFilename(
+ String databaseHistoryFileFilename) {
+ this.databaseHistoryFileFilename = databaseHistoryFileFilename;
+ }
+
+ public String getDatabaseHistoryFileFilename() {
+ return databaseHistoryFileFilename;
+ }
+
+ /**
+ * Name of the pluggable database when working with a multi-tenant set-up.
+ * The CDB name must be given via database.dbname in this case.
+ */
+ public void setDatabasePdbName(String databasePdbName) {
+ this.databasePdbName = databasePdbName;
+ }
+
+ public String getDatabasePdbName() {
+ return databasePdbName;
+ }
+
+ /**
+ * The adapter to use when capturing changes from the database. Options
+ * include: 'logminer': (the default) to capture changes using native
Oracle
+ * LogMiner; 'xstream' to capture changes using Oracle XStreams
+ */
+ public void setDatabaseConnectionAdapter(String databaseConnectionAdapter)
{
+ this.databaseConnectionAdapter = databaseConnectionAdapter;
+ }
+
+ public String getDatabaseConnectionAdapter() {
+ return databaseConnectionAdapter;
+ }
+
+ /**
+ * The buffer type controls how the connector manages buffering transaction
+ * data.
+ *
+ * memory - Uses the JVM process' heap to buffer all transaction data.
+ *
+ * infinispan_embedded - This option uses an embedded Infinispan cache to
+ * buffer transaction data and persist it to disk.
+ *
+ * infinispan_remote - This option uses a remote Infinispan cluster to
+ * buffer transaction data and persist it to disk.
+ */
+ public void setLogMiningBufferType(String logMiningBufferType) {
+ this.logMiningBufferType = logMiningBufferType;
+ }
+
+ public String getLogMiningBufferType() {
+ return logMiningBufferType;
+ }
+
+ /**
+ * Specify how failures during processing of events (i.e. when encountering
+ * a corrupted event) should be handled, including:'fail' (the default) an
+ * exception indicating the problematic event and its position is raised,
+ * causing the connector to be stopped; 'warn' the problematic event and
its
+ * position will be logged and the event will be skipped;'ignore' the
+ * problematic event will be skipped.
+ */
+ public void setEventProcessingFailureHandlingMode(
+ String eventProcessingFailureHandlingMode) {
+ this.eventProcessingFailureHandlingMode =
eventProcessingFailureHandlingMode;
+ }
+
+ public String getEventProcessingFailureHandlingMode() {
+ return eventProcessingFailureHandlingMode;
+ }
+
+ /**
+ * The maximum number of threads used to perform the snapshot. Defaults to
+ * 1.
+ */
+ public void setSnapshotMaxThreads(int snapshotMaxThreads) {
+ this.snapshotMaxThreads = snapshotMaxThreads;
+ }
+
+ public int getSnapshotMaxThreads() {
+ return snapshotMaxThreads;
+ }
+
+ /**
+ * Specify how schema names should be adjusted for compatibility with the
+ * message converter used by the connector, including:'avro' replaces the
+ * characters that cannot be used in the Avro type name with underscore
+ * (default)'none' does not apply any adjustment
+ */
+ public void setSchemaNameAdjustmentMode(String schemaNameAdjustmentMode) {
+ this.schemaNameAdjustmentMode = schemaNameAdjustmentMode;
+ }
+
+ public String getSchemaNameAdjustmentMode() {
+ return schemaNameAdjustmentMode;
+ }
+
+ /**
+ * The starting SCN interval size that the connector will use for reading
+ * data from redo/archive logs.
+ */
+ public void setLogMiningBatchSizeDefault(long logMiningBatchSizeDefault) {
+ this.logMiningBatchSizeDefault = logMiningBatchSizeDefault;
+ }
+
+ public long getLogMiningBatchSizeDefault() {
+ return logMiningBatchSizeDefault;
+ }
+
+ /**
+ * The tables for which changes are to be captured
+ */
+ public void setTableIncludeList(String tableIncludeList) {
+ this.tableIncludeList = tableIncludeList;
+ }
+
+ public String getTableIncludeList() {
+ return tableIncludeList;
+ }
+
+ /**
+ * The maximum number of records that should be loaded into memory while
+ * streaming. A value of `0` uses the default JDBC fetch size.
+ */
+ public void setQueryFetchSize(int queryFetchSize) {
+ this.queryFetchSize = queryFetchSize;
+ }
+
+ public int getQueryFetchSize() {
+ return queryFetchSize;
+ }
+
+ /**
+ * The minimum amount of time that the connector will sleep after reading
+ * data from redo/archive logs and before starting reading data again.
Value
+ * is in milliseconds.
+ */
+ public void setLogMiningSleepTimeMinMs(long logMiningSleepTimeMinMs) {
+ this.logMiningSleepTimeMinMs = logMiningSleepTimeMinMs;
+ }
+
+ public long getLogMiningSleepTimeMinMs() {
+ return logMiningSleepTimeMinMs;
+ }
+
+ /**
+ * A comma-separated list of regular expressions that match the
+ * fully-qualified names of tables to be excluded from monitoring
+ * (deprecated, use "table.exclude.list" instead)
+ */
+ public void setTableBlacklist(String tableBlacklist) {
+ this.tableBlacklist = tableBlacklist;
+ }
+
+ public String getTableBlacklist() {
+ return tableBlacklist;
+ }
+
+ /**
+ * Specify the constant that will be provided by Debezium to indicate that
+ * the original value is unavailable and not provided by the database.
+ */
+ public void setUnavailableValuePlaceholder(
+ String unavailableValuePlaceholder) {
+ this.unavailableValuePlaceholder = unavailableValuePlaceholder;
+ }
+
+ public String getUnavailableValuePlaceholder() {
+ return unavailableValuePlaceholder;
+ }
+
+ /**
+ * The query executed with every heartbeat.
+ */
+ public void setHeartbeatActionQuery(String heartbeatActionQuery) {
+ this.heartbeatActionQuery = heartbeatActionQuery;
+ }
+
+ public String getHeartbeatActionQuery() {
+ return heartbeatActionQuery;
+ }
+
+ /**
+ * Time to wait for new change events to appear after receiving no events,
+ * given in milliseconds. Defaults to 500 ms.
+ */
+ public void setPollIntervalMs(long pollIntervalMs) {
+ this.pollIntervalMs = pollIntervalMs;
+ }
+
+ public long getPollIntervalMs() {
+ return pollIntervalMs;
+ }
+
+ /**
+ * The number of milliseconds to wait while polling for persisted data
+ * during recovery.
+ */
+ public void setDatabaseHistoryKafkaRecoveryPollIntervalMs(
+ int databaseHistoryKafkaRecoveryPollIntervalMs) {
+ this.databaseHistoryKafkaRecoveryPollIntervalMs =
databaseHistoryKafkaRecoveryPollIntervalMs;
+ }
+
+ public int getDatabaseHistoryKafkaRecoveryPollIntervalMs() {
+ return databaseHistoryKafkaRecoveryPollIntervalMs;
+ }
+
+ /**
+ * When set to `false`, the default, LOB fields will not be captured nor
+ * emitted. When set to `true`, the connector will capture LOB fields and
+ * emit changes for those fields like any other column type.
+ */
+ public void setLobEnabled(boolean lobEnabled) {
this.lobEnabled = lobEnabled;
}
@@ -429,19 +884,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseHistoryStoreOnlyCapturedTablesDdl;
}
- /**
- * Optional list of custom converters that would be used instead of default
- * ones. The converters are defined using '<converter.prefix>.type' config
- * option and configured using options '<converter.prefix>.<option>'
- */
- public void setConverters(String converters) {
- this.converters = converters;
- }
-
- public String getConverters() {
- return converters;
- }
-
/**
* The prefix that is used to name heartbeat topics.Defaults to
* __debezium-heartbeat.
@@ -470,18 +912,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningArchiveLogOnlyMode;
}
- /**
- * The maximum number of records that should be loaded into memory while
- * performing a snapshot
- */
- public void setSnapshotFetchSize(int snapshotFetchSize) {
- this.snapshotFetchSize = snapshotFetchSize;
- }
-
- public int getSnapshotFetchSize() {
- return snapshotFetchSize;
- }
-
/**
* Specifies the XML configuration for the Infinispan 'schema-changes'
cache
*/
@@ -494,58 +924,17 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningBufferInfinispanCacheSchemaChanges;
}
- /**
- * The maximum number of millis to wait for table locks at the beginning of
- * a snapshot. If locks cannot be acquired in this time frame, the snapshot
- * will be aborted. Defaults to 10 seconds
- */
- public void setSnapshotLockTimeoutMs(long snapshotLockTimeoutMs) {
- this.snapshotLockTimeoutMs = snapshotLockTimeoutMs;
- }
-
- public long getSnapshotLockTimeoutMs() {
- return snapshotLockTimeoutMs;
- }
-
- /**
- * The number of milliseconds to wait while fetching cluster information
- * using Kafka admin client.
- */
- public void setDatabaseHistoryKafkaQueryTimeoutMs(
- long databaseHistoryKafkaQueryTimeoutMs) {
- this.databaseHistoryKafkaQueryTimeoutMs =
databaseHistoryKafkaQueryTimeoutMs;
- }
-
- public long getDatabaseHistoryKafkaQueryTimeoutMs() {
- return databaseHistoryKafkaQueryTimeoutMs;
- }
-
/**
* The maximum amount of time that the connector will sleep after reading
- * data from redo/archive logs and before starting reading data again.
Value
- * is in milliseconds.
- */
- public void setLogMiningSleepTimeMaxMs(long logMiningSleepTimeMaxMs) {
- this.logMiningSleepTimeMaxMs = logMiningSleepTimeMaxMs;
- }
-
- public long getLogMiningSleepTimeMaxMs() {
- return logMiningSleepTimeMaxMs;
- }
-
- /**
- * Used for SCN gap detection, if the difference between current SCN and
- * previous end SCN is bigger than this value, and the time difference of
- * current SCN and previous end SCN is smaller than
- * log.mining.scn.gap.detection.time.interval.max.ms, consider it a SCN
gap.
+ * data from redo/archive logs and before starting reading data again.
Value
+ * is in milliseconds.
*/
- public void setLogMiningScnGapDetectionGapSizeMin(
- long logMiningScnGapDetectionGapSizeMin) {
- this.logMiningScnGapDetectionGapSizeMin =
logMiningScnGapDetectionGapSizeMin;
+ public void setLogMiningSleepTimeMaxMs(long logMiningSleepTimeMaxMs) {
+ this.logMiningSleepTimeMaxMs = logMiningSleepTimeMaxMs;
}
- public long getLogMiningScnGapDetectionGapSizeMin() {
- return logMiningScnGapDetectionGapSizeMin;
+ public long getLogMiningSleepTimeMaxMs() {
+ return logMiningSleepTimeMaxMs;
}
/**
@@ -559,17 +948,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseUser;
}
- /**
- * The name of the database from which the connector should capture changes
- */
- public void setDatabaseDbname(String databaseDbname) {
- this.databaseDbname = databaseDbname;
- }
-
- public String getDatabaseDbname() {
- return databaseDbname;
- }
-
/**
* A comma-separated list of regular expressions matching the
* database-specific data type names that adds the data type's original
type
@@ -596,42 +974,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return sanitizeFieldNames;
}
- /**
- * The amount of time that the connector will sleep after reading data from
- * redo/archive logs and before starting reading data again. Value is in
- * milliseconds.
- */
- public void setLogMiningSleepTimeDefaultMs(long
logMiningSleepTimeDefaultMs) {
- this.logMiningSleepTimeDefaultMs = logMiningSleepTimeDefaultMs;
- }
-
- public long getLogMiningSleepTimeDefaultMs() {
- return logMiningSleepTimeDefaultMs;
- }
-
- /**
- * This property contains a comma-separated list of fully-qualified tables
- * (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
- * thespecific connectors. Select statements for the individual tables are
- * specified in further configuration properties, one for each table,
- * identified by the id
- * 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or
- * 'snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]',
- * respectively. The value of those properties is the select statement to
- * use when retrieving data from the specific table during snapshotting. A
- * possible use case for large append-only tables is setting a specific
- * point where to start (resume) snapshotting, in case a previous
- * snapshotting was interrupted.
- */
- public void setSnapshotSelectStatementOverrides(
- String snapshotSelectStatementOverrides) {
- this.snapshotSelectStatementOverrides =
snapshotSelectStatementOverrides;
- }
-
- public String getSnapshotSelectStatementOverrides() {
- return snapshotSelectStatementOverrides;
- }
-
/**
* A list of host/port pairs that the connector will use for establishing
* the initial connection to the Kafka cluster for retrieving database
@@ -647,18 +989,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseHistoryKafkaBootstrapServers;
}
- /**
- * A version of the format of the publicly visible source part in the
- * message
- */
- public void setSourceStructVersion(String sourceStructVersion) {
- this.sourceStructVersion = sourceStructVersion;
- }
-
- public String getSourceStructVersion() {
- return sourceStructVersion;
- }
-
/**
* Length of an interval in milli-seconds in in which the connector
* periodically sends heartbeat messages to a heartbeat topic. Use 0 to
@@ -672,31 +1002,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return heartbeatIntervalMs;
}
- /**
- * Regular expressions matching columns to include in change events
- * (deprecated, use "column.include.list" instead)
- */
- public void setColumnWhitelist(String columnWhitelist) {
- this.columnWhitelist = columnWhitelist;
- }
-
- public String getColumnWhitelist() {
- return columnWhitelist;
- }
-
- /**
- * The interval in milliseconds to wait between polls checking to see if
the
- * SCN is in the archive logs.
- */
- public void setLogMiningArchiveLogOnlyScnPollIntervalMs(
- long logMiningArchiveLogOnlyScnPollIntervalMs) {
- this.logMiningArchiveLogOnlyScnPollIntervalMs =
logMiningArchiveLogOnlyScnPollIntervalMs;
- }
-
- public long getLogMiningArchiveLogOnlyScnPollIntervalMs() {
- return logMiningArchiveLogOnlyScnPollIntervalMs;
- }
-
/**
* Regular expressions matching columns to include in change events
*/
@@ -747,18 +1052,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningBufferInfinispanCacheProcessedTransactions;
}
- /**
- * A comma-separated list of regular expressions that match the
- * fully-qualified names of tables to be excluded from monitoring
- */
- public void setTableExcludeList(String tableExcludeList) {
- this.tableExcludeList = tableExcludeList;
- }
-
- public String getTableExcludeList() {
- return tableExcludeList;
- }
-
/**
* Password of the database user to be used when connecting to the
database.
*/
@@ -798,17 +1091,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseHistoryStoreOnlyMonitoredTablesDdl;
}
- /**
- * Maximum size of each batch of source records. Defaults to 2048.
- */
- public void setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
/**
* The comma-separated list of operations to skip during streaming, defined
* as: 'c' for inserts/create; 'u' for updates; 'd' for deletes, 't' for
@@ -823,18 +1105,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return skippedOperations;
}
- /**
- * Specifies the XML configuration for the Infinispan 'transactions' cache
- */
- public void setLogMiningBufferInfinispanCacheTransactions(
- String logMiningBufferInfinispanCacheTransactions) {
- this.logMiningBufferInfinispanCacheTransactions =
logMiningBufferInfinispanCacheTransactions;
- }
-
- public String getLogMiningBufferInfinispanCacheTransactions() {
- return logMiningBufferInfinispanCacheTransactions;
- }
-
/**
* Used for SCN gap detection, if the difference between current SCN and
* previous end SCN is bigger than
@@ -851,21 +1121,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningScnGapDetectionTimeIntervalMaxMs;
}
- /**
- * The criteria for running a snapshot upon startup of the connector.
- * Options include: 'initial' (the default) to specify the connector should
- * run a snapshot only when no offsets are available for the logical server
- * name; 'schema_only' to specify the connector should run a snapshot of
the
- * schema when no offsets are available for the logical server name.
- */
- public void setSnapshotMode(String snapshotMode) {
- this.snapshotMode = snapshotMode;
- }
-
- public String getSnapshotMode() {
- return snapshotMode;
- }
-
/**
* The name of the DatabaseHistory class that should be used to store and
* recover database schema changes. The configuration properties for the
@@ -900,138 +1155,32 @@ public class
OracleConnectorEmbeddedDebeziumConfiguration
}
public String getRacNodes() {
- return racNodes;
- }
-
- /**
- * The name of the topic for the database schema history
- */
- public void setDatabaseHistoryKafkaTopic(String databaseHistoryKafkaTopic)
{
- this.databaseHistoryKafkaTopic = databaseHistoryKafkaTopic;
- }
-
- public String getDatabaseHistoryKafkaTopic() {
- return databaseHistoryKafkaTopic;
- }
-
- /**
- * Time to wait before restarting connector after retriable exception
- * occurs. Defaults to 10000ms.
- */
- public void setRetriableRestartConnectorWaitMs(
- long retriableRestartConnectorWaitMs) {
- this.retriableRestartConnectorWaitMs = retriableRestartConnectorWaitMs;
- }
-
- public long getRetriableRestartConnectorWaitMs() {
- return retriableRestartConnectorWaitMs;
- }
-
- /**
- * A delay period before a snapshot will begin, given in milliseconds.
- * Defaults to 0 ms.
- */
- public void setSnapshotDelayMs(long snapshotDelayMs) {
- this.snapshotDelayMs = snapshotDelayMs;
- }
-
- public long getSnapshotDelayMs() {
- return snapshotDelayMs;
- }
-
- /**
- * There are strategies: Online catalog with faster mining but no captured
- * DDL. Another - with data dictionary loaded into REDO LOG files
- */
- public void setLogMiningStrategy(String logMiningStrategy) {
- this.logMiningStrategy = logMiningStrategy;
- }
-
- public String getLogMiningStrategy() {
- return logMiningStrategy;
- }
-
- /**
- * The number of attempts in a row that no data are returned from Kafka
- * before recover completes. The maximum amount of time to wait after
- * receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).
- */
- public void setDatabaseHistoryKafkaRecoveryAttempts(
- int databaseHistoryKafkaRecoveryAttempts) {
- this.databaseHistoryKafkaRecoveryAttempts =
databaseHistoryKafkaRecoveryAttempts;
- }
-
- public int getDatabaseHistoryKafkaRecoveryAttempts() {
- return databaseHistoryKafkaRecoveryAttempts;
- }
-
- /**
- * Enables transaction metadata extraction together with event counting
- */
- public void setProvideTransactionMetadata(boolean
provideTransactionMetadata) {
- this.provideTransactionMetadata = provideTransactionMetadata;
- }
-
- public boolean isProvideTransactionMetadata() {
- return provideTransactionMetadata;
- }
-
- /**
- * The tables for which changes are to be captured (deprecated, use
- * "table.include.list" instead)
- */
- public void setTableWhitelist(String tableWhitelist) {
- this.tableWhitelist = tableWhitelist;
- }
-
- public String getTableWhitelist() {
- return tableWhitelist;
- }
-
- /**
- * Whether delete operations should be represented by a delete event and a
- * subsquenttombstone event (true) or only by a delete event (false).
- * Emitting the tombstone event (the default behavior) allows Kafka to
- * completely delete all events pertaining to the given key once the source
- * record got deleted.
- */
- public void setTombstonesOnDelete(boolean tombstonesOnDelete) {
- this.tombstonesOnDelete = tombstonesOnDelete;
- }
-
- public boolean isTombstonesOnDelete() {
- return tombstonesOnDelete;
- }
-
- /**
- * Specify how DECIMAL and NUMERIC columns should be represented in change
- * events, including:'precise' (the default) uses java.math.BigDecimal to
- * represent values, which are encoded in the change events using a binary
- * representation and Kafka Connect's
- * 'org.apache.kafka.connect.data.Decimal' type; 'string' uses string to
- * represent values; 'double' represents values using Java's 'double',
which
- * may not offer the precision but will be far easier to use in consumers.
+ return racNodes;
+ }
+
+ /**
+ * The number of attempts in a row that no data are returned from Kafka
+ * before recover completes. The maximum amount of time to wait after
+ * receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).
*/
- public void setDecimalHandlingMode(String decimalHandlingMode) {
- this.decimalHandlingMode = decimalHandlingMode;
+ public void setDatabaseHistoryKafkaRecoveryAttempts(
+ int databaseHistoryKafkaRecoveryAttempts) {
+ this.databaseHistoryKafkaRecoveryAttempts =
databaseHistoryKafkaRecoveryAttempts;
}
- public String getDecimalHandlingMode() {
- return decimalHandlingMode;
+ public int getDatabaseHistoryKafkaRecoveryAttempts() {
+ return databaseHistoryKafkaRecoveryAttempts;
}
/**
- * Specify how binary (blob, binary, etc.) columns should be represented in
- * change events, including:'bytes' represents binary data as byte array
- * (default)'base64' represents binary data as base64-encoded string'hex'
- * represents binary data as hex-encoded (base16) string
+ * Enables transaction metadata extraction together with event counting
*/
- public void setBinaryHandlingMode(String binaryHandlingMode) {
- this.binaryHandlingMode = binaryHandlingMode;
+ public void setProvideTransactionMetadata(boolean
provideTransactionMetadata) {
+ this.provideTransactionMetadata = provideTransactionMetadata;
}
- public String getBinaryHandlingMode() {
- return binaryHandlingMode;
+ public boolean isProvideTransactionMetadata() {
+ return provideTransactionMetadata;
}
/**
@@ -1065,17 +1214,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseHistorySkipUnparseableDdl;
}
- /**
- * Name of the XStream Out server to connect to.
- */
- public void setDatabaseOutServerName(String databaseOutServerName) {
- this.databaseOutServerName = databaseOutServerName;
- }
-
- public String getDatabaseOutServerName() {
- return databaseOutServerName;
- }
-
/**
* The number of hours in the past from SYSDATE to mine archive logs.
Using
* 0 mines all available archive logs
@@ -1101,31 +1239,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningTransactionRetentionHours;
}
- /**
- * this setting must be set to specify a list of tables/collections whose
- * snapshot must be taken on creating or restarting the connector.
- */
- public void setSnapshotIncludeCollectionList(
- String snapshotIncludeCollectionList) {
- this.snapshotIncludeCollectionList = snapshotIncludeCollectionList;
- }
-
- public String getSnapshotIncludeCollectionList() {
- return snapshotIncludeCollectionList;
- }
-
- /**
- * The path to the file that will be used to record the database history
- */
- public void setDatabaseHistoryFileFilename(
- String databaseHistoryFileFilename) {
- this.databaseHistoryFileFilename = databaseHistoryFileFilename;
- }
-
- public String getDatabaseHistoryFileFilename() {
- return databaseHistoryFileFilename;
- }
-
/**
* The maximum SCN interval size that this connector will use when reading
* from redo/archive logs.
@@ -1138,31 +1251,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningBatchSizeMax;
}
- /**
- * Name of the pluggable database when working with a multi-tenant set-up.
- * The CDB name must be given via database.dbname in this case.
- */
- public void setDatabasePdbName(String databasePdbName) {
- this.databasePdbName = databasePdbName;
- }
-
- public String getDatabasePdbName() {
- return databasePdbName;
- }
-
- /**
- * The adapter to use when capturing changes from the database. Options
- * include: 'logminer': (the default) to capture changes using native
Oracle
- * LogMiner; 'xstream' to capture changes using Oracle XStreams
- */
- public void setDatabaseConnectionAdapter(String databaseConnectionAdapter)
{
- this.databaseConnectionAdapter = databaseConnectionAdapter;
- }
-
- public String getDatabaseConnectionAdapter() {
- return databaseConnectionAdapter;
- }
-
/**
* Maximum size of the queue in bytes for change events read from the
* database log but not yet recorded or forwarded. Defaults to 0. Mean the
@@ -1176,26 +1264,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return maxQueueSizeInBytes;
}
- /**
- * The buffer type controls how the connector manages buffering transaction
- * data.
- *
- * memory - Uses the JVM process' heap to buffer all transaction data.
- *
- * infinispan_embedded - This option uses an embedded Infinispan cache to
- * buffer transaction data and persist it to disk.
- *
- * infinispan_remote - This option uses a remote Infinispan cluster to
- * buffer transaction data and persist it to disk.
- */
- public void setLogMiningBufferType(String logMiningBufferType) {
- this.logMiningBufferType = logMiningBufferType;
- }
-
- public String getLogMiningBufferType() {
- return logMiningBufferType;
- }
-
/**
* The name of the transaction metadata topic. The placeholder
* ${database.server.name} can be used for referring to the connector's
@@ -1253,35 +1321,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return databaseServerName;
}
- /**
- * Specify how failures during processing of events (i.e. when encountering
- * a corrupted event) should be handled, including:'fail' (the default) an
- * exception indicating the problematic event and its position is raised,
- * causing the connector to be stopped; 'warn' the problematic event and
its
- * position will be logged and the event will be skipped;'ignore' the
- * problematic event will be skipped.
- */
- public void setEventProcessingFailureHandlingMode(
- String eventProcessingFailureHandlingMode) {
- this.eventProcessingFailureHandlingMode =
eventProcessingFailureHandlingMode;
- }
-
- public String getEventProcessingFailureHandlingMode() {
- return eventProcessingFailureHandlingMode;
- }
-
- /**
- * The maximum number of threads used to perform the snapshot. Defaults to
- * 1.
- */
- public void setSnapshotMaxThreads(int snapshotMaxThreads) {
- this.snapshotMaxThreads = snapshotMaxThreads;
- }
-
- public int getSnapshotMaxThreads() {
- return snapshotMaxThreads;
- }
-
/**
* Port of the database server.
*/
@@ -1318,6 +1357,19 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return columnExcludeList;
}
+ /**
+ * The maximum number of milliseconds that a LogMiner session lives for
+ * before being restarted. Defaults to 0 (indefinite until a log switch
+ * occurs)
+ */
+ public void setLogMiningSessionMaxMs(long logMiningSessionMaxMs) {
+ this.logMiningSessionMaxMs = logMiningSessionMaxMs;
+ }
+
+ public long getLogMiningSessionMaxMs() {
+ return logMiningSessionMaxMs;
+ }
+
/**
* Resolvable hostname or IP address of the database server.
*/
@@ -1342,32 +1394,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return logMiningBatchSizeMin;
}
- /**
- * Specify how schema names should be adjusted for compatibility with the
- * message converter used by the connector, including:'avro' replaces the
- * characters that cannot be used in the Avro type name with underscore
- * (default)'none' does not apply any adjustment
- */
- public void setSchemaNameAdjustmentMode(String schemaNameAdjustmentMode) {
- this.schemaNameAdjustmentMode = schemaNameAdjustmentMode;
- }
-
- public String getSchemaNameAdjustmentMode() {
- return schemaNameAdjustmentMode;
- }
-
- /**
- * The starting SCN interval size that the connector will use for reading
- * data from redo/archive logs.
- */
- public void setLogMiningBatchSizeDefault(long logMiningBatchSizeDefault) {
- this.logMiningBatchSizeDefault = logMiningBatchSizeDefault;
- }
-
- public long getLogMiningBatchSizeDefault() {
- return logMiningBatchSizeDefault;
- }
-
/**
* A token to replace on snapshot predicate template
*/
@@ -1380,17 +1406,6 @@ public class OracleConnectorEmbeddedDebeziumConfiguration
return snapshotEnhancePredicateScn;
}
- /**
- * The tables for which changes are to be captured
- */
- public void setTableIncludeList(String tableIncludeList) {
- this.tableIncludeList = tableIncludeList;
- }
-
- public String getTableIncludeList() {
- return tableIncludeList;
- }
-
@Override
protected Configuration createConnectorConfiguration() {
final Configuration.Builder configBuilder = Configuration.create();
@@ -1398,93 +1413,94 @@ public class
OracleConnectorEmbeddedDebeziumConfiguration
addPropertyIfNotNull(configBuilder, "snapshot.locking.mode",
snapshotLockingMode);
addPropertyIfNotNull(configBuilder, "log.mining.buffer.drop.on.stop",
logMiningBufferDropOnStop);
addPropertyIfNotNull(configBuilder, "message.key.columns",
messageKeyColumns);
- addPropertyIfNotNull(configBuilder, "query.fetch.size",
queryFetchSize);
addPropertyIfNotNull(configBuilder,
"log.mining.archive.destination.name", logMiningArchiveDestinationName);
addPropertyIfNotNull(configBuilder, "column.blacklist",
columnBlacklist);
+ addPropertyIfNotNull(configBuilder, "include.schema.changes",
includeSchemaChanges);
+ addPropertyIfNotNull(configBuilder, "signal.data.collection",
signalDataCollection);
+ addPropertyIfNotNull(configBuilder, "converters", converters);
+ addPropertyIfNotNull(configBuilder, "snapshot.fetch.size",
snapshotFetchSize);
+ addPropertyIfNotNull(configBuilder, "snapshot.lock.timeout.ms",
snapshotLockTimeoutMs);
+ addPropertyIfNotNull(configBuilder,
"database.history.kafka.query.timeout.ms", databaseHistoryKafkaQueryTimeoutMs);
+ addPropertyIfNotNull(configBuilder,
"log.mining.scn.gap.detection.gap.size.min",
logMiningScnGapDetectionGapSizeMin);
+ addPropertyIfNotNull(configBuilder, "database.dbname", databaseDbname);
+ addPropertyIfNotNull(configBuilder,
"log.mining.sleep.time.default.ms", logMiningSleepTimeDefaultMs);
+ addPropertyIfNotNull(configBuilder,
"snapshot.select.statement.overrides", snapshotSelectStatementOverrides);
+ addPropertyIfNotNull(configBuilder, "source.struct.version",
sourceStructVersion);
+ addPropertyIfNotNull(configBuilder, "column.whitelist",
columnWhitelist);
+ addPropertyIfNotNull(configBuilder,
"log.mining.archive.log.only.scn.poll.interval.ms",
logMiningArchiveLogOnlyScnPollIntervalMs);
+ addPropertyIfNotNull(configBuilder, "table.exclude.list",
tableExcludeList);
+ addPropertyIfNotNull(configBuilder, "max.batch.size", maxBatchSize);
+ addPropertyIfNotNull(configBuilder,
"log.mining.buffer.infinispan.cache.transactions",
logMiningBufferInfinispanCacheTransactions);
+ addPropertyIfNotNull(configBuilder, "snapshot.mode", snapshotMode);
+ addPropertyIfNotNull(configBuilder, "database.history.kafka.topic",
databaseHistoryKafkaTopic);
+ addPropertyIfNotNull(configBuilder,
"retriable.restart.connector.wait.ms", retriableRestartConnectorWaitMs);
+ addPropertyIfNotNull(configBuilder, "snapshot.delay.ms",
snapshotDelayMs);
+ addPropertyIfNotNull(configBuilder, "log.mining.strategy",
logMiningStrategy);
+ addPropertyIfNotNull(configBuilder, "table.whitelist", tableWhitelist);
+ addPropertyIfNotNull(configBuilder, "tombstones.on.delete",
tombstonesOnDelete);
+ addPropertyIfNotNull(configBuilder, "decimal.handling.mode",
decimalHandlingMode);
+ addPropertyIfNotNull(configBuilder, "binary.handling.mode",
binaryHandlingMode);
+ addPropertyIfNotNull(configBuilder, "database.out.server.name",
databaseOutServerName);
+ addPropertyIfNotNull(configBuilder,
"snapshot.include.collection.list", snapshotIncludeCollectionList);
+ addPropertyIfNotNull(configBuilder, "database.history.file.filename",
databaseHistoryFileFilename);
+ addPropertyIfNotNull(configBuilder, "database.pdb.name",
databasePdbName);
+ addPropertyIfNotNull(configBuilder, "database.connection.adapter",
databaseConnectionAdapter);
+ addPropertyIfNotNull(configBuilder, "log.mining.buffer.type",
logMiningBufferType);
+ addPropertyIfNotNull(configBuilder,
"event.processing.failure.handling.mode", eventProcessingFailureHandlingMode);
+ addPropertyIfNotNull(configBuilder, "snapshot.max.threads",
snapshotMaxThreads);
+ addPropertyIfNotNull(configBuilder, "schema.name.adjustment.mode",
schemaNameAdjustmentMode);
+ addPropertyIfNotNull(configBuilder, "log.mining.batch.size.default",
logMiningBatchSizeDefault);
+ addPropertyIfNotNull(configBuilder, "table.include.list",
tableIncludeList);
+ addPropertyIfNotNull(configBuilder, "query.fetch.size",
queryFetchSize);
addPropertyIfNotNull(configBuilder, "log.mining.sleep.time.min.ms",
logMiningSleepTimeMinMs);
addPropertyIfNotNull(configBuilder, "table.blacklist", tableBlacklist);
- addPropertyIfNotNull(configBuilder, "include.schema.changes",
includeSchemaChanges);
addPropertyIfNotNull(configBuilder, "unavailable.value.placeholder",
unavailableValuePlaceholder);
addPropertyIfNotNull(configBuilder, "heartbeat.action.query",
heartbeatActionQuery);
addPropertyIfNotNull(configBuilder, "poll.interval.ms",
pollIntervalMs);
addPropertyIfNotNull(configBuilder,
"database.history.kafka.recovery.poll.interval.ms",
databaseHistoryKafkaRecoveryPollIntervalMs);
- addPropertyIfNotNull(configBuilder, "signal.data.collection",
signalDataCollection);
addPropertyIfNotNull(configBuilder, "lob.enabled", lobEnabled);
addPropertyIfNotNull(configBuilder, "interval.handling.mode",
intervalHandlingMode);
addPropertyIfNotNull(configBuilder,
"database.history.store.only.captured.tables.ddl",
databaseHistoryStoreOnlyCapturedTablesDdl);
- addPropertyIfNotNull(configBuilder, "converters", converters);
addPropertyIfNotNull(configBuilder, "heartbeat.topics.prefix",
heartbeatTopicsPrefix);
addPropertyIfNotNull(configBuilder,
"log.mining.archive.log.only.mode", logMiningArchiveLogOnlyMode);
- addPropertyIfNotNull(configBuilder, "snapshot.fetch.size",
snapshotFetchSize);
addPropertyIfNotNull(configBuilder,
"log.mining.buffer.infinispan.cache.schema_changes",
logMiningBufferInfinispanCacheSchemaChanges);
- addPropertyIfNotNull(configBuilder, "snapshot.lock.timeout.ms",
snapshotLockTimeoutMs);
- addPropertyIfNotNull(configBuilder,
"database.history.kafka.query.timeout.ms", databaseHistoryKafkaQueryTimeoutMs);
addPropertyIfNotNull(configBuilder, "log.mining.sleep.time.max.ms",
logMiningSleepTimeMaxMs);
- addPropertyIfNotNull(configBuilder,
"log.mining.scn.gap.detection.gap.size.min",
logMiningScnGapDetectionGapSizeMin);
addPropertyIfNotNull(configBuilder, "database.user", databaseUser);
- addPropertyIfNotNull(configBuilder, "database.dbname", databaseDbname);
addPropertyIfNotNull(configBuilder, "datatype.propagate.source.type",
datatypePropagateSourceType);
addPropertyIfNotNull(configBuilder, "sanitize.field.names",
sanitizeFieldNames);
- addPropertyIfNotNull(configBuilder,
"log.mining.sleep.time.default.ms", logMiningSleepTimeDefaultMs);
- addPropertyIfNotNull(configBuilder,
"snapshot.select.statement.overrides", snapshotSelectStatementOverrides);
addPropertyIfNotNull(configBuilder,
"database.history.kafka.bootstrap.servers",
databaseHistoryKafkaBootstrapServers);
- addPropertyIfNotNull(configBuilder, "source.struct.version",
sourceStructVersion);
addPropertyIfNotNull(configBuilder, "heartbeat.interval.ms",
heartbeatIntervalMs);
- addPropertyIfNotNull(configBuilder, "column.whitelist",
columnWhitelist);
- addPropertyIfNotNull(configBuilder,
"log.mining.archive.log.only.scn.poll.interval.ms",
logMiningArchiveLogOnlyScnPollIntervalMs);
addPropertyIfNotNull(configBuilder, "column.include.list",
columnIncludeList);
addPropertyIfNotNull(configBuilder,
"log.mining.username.exclude.list", logMiningUsernameExcludeList);
addPropertyIfNotNull(configBuilder, "column.propagate.source.type",
columnPropagateSourceType);
addPropertyIfNotNull(configBuilder,
"log.mining.buffer.infinispan.cache.processed_transactions",
logMiningBufferInfinispanCacheProcessedTransactions);
- addPropertyIfNotNull(configBuilder, "table.exclude.list",
tableExcludeList);
addPropertyIfNotNull(configBuilder, "database.password",
databasePassword);
addPropertyIfNotNull(configBuilder,
"log.mining.buffer.infinispan.cache.events",
logMiningBufferInfinispanCacheEvents);
addPropertyIfNotNull(configBuilder,
"database.history.store.only.monitored.tables.ddl",
databaseHistoryStoreOnlyMonitoredTablesDdl);
- addPropertyIfNotNull(configBuilder, "max.batch.size", maxBatchSize);
addPropertyIfNotNull(configBuilder, "skipped.operations",
skippedOperations);
- addPropertyIfNotNull(configBuilder,
"log.mining.buffer.infinispan.cache.transactions",
logMiningBufferInfinispanCacheTransactions);
addPropertyIfNotNull(configBuilder,
"log.mining.scn.gap.detection.time.interval.max.ms",
logMiningScnGapDetectionTimeIntervalMaxMs);
- addPropertyIfNotNull(configBuilder, "snapshot.mode", snapshotMode);
addPropertyIfNotNull(configBuilder, "database.history",
databaseHistory);
addPropertyIfNotNull(configBuilder, "max.queue.size", maxQueueSize);
addPropertyIfNotNull(configBuilder, "rac.nodes", racNodes);
- addPropertyIfNotNull(configBuilder, "database.history.kafka.topic",
databaseHistoryKafkaTopic);
- addPropertyIfNotNull(configBuilder,
"retriable.restart.connector.wait.ms", retriableRestartConnectorWaitMs);
- addPropertyIfNotNull(configBuilder, "snapshot.delay.ms",
snapshotDelayMs);
- addPropertyIfNotNull(configBuilder, "log.mining.strategy",
logMiningStrategy);
addPropertyIfNotNull(configBuilder,
"database.history.kafka.recovery.attempts",
databaseHistoryKafkaRecoveryAttempts);
addPropertyIfNotNull(configBuilder, "provide.transaction.metadata",
provideTransactionMetadata);
- addPropertyIfNotNull(configBuilder, "table.whitelist", tableWhitelist);
- addPropertyIfNotNull(configBuilder, "tombstones.on.delete",
tombstonesOnDelete);
- addPropertyIfNotNull(configBuilder, "decimal.handling.mode",
decimalHandlingMode);
- addPropertyIfNotNull(configBuilder, "binary.handling.mode",
binaryHandlingMode);
addPropertyIfNotNull(configBuilder, "include.schema.comments",
includeSchemaComments);
addPropertyIfNotNull(configBuilder,
"database.history.skip.unparseable.ddl", databaseHistorySkipUnparseableDdl);
- addPropertyIfNotNull(configBuilder, "database.out.server.name",
databaseOutServerName);
addPropertyIfNotNull(configBuilder, "log.mining.archive.log.hours",
logMiningArchiveLogHours);
addPropertyIfNotNull(configBuilder,
"log.mining.transaction.retention.hours", logMiningTransactionRetentionHours);
- addPropertyIfNotNull(configBuilder,
"snapshot.include.collection.list", snapshotIncludeCollectionList);
- addPropertyIfNotNull(configBuilder, "database.history.file.filename",
databaseHistoryFileFilename);
addPropertyIfNotNull(configBuilder, "log.mining.batch.size.max",
logMiningBatchSizeMax);
- addPropertyIfNotNull(configBuilder, "database.pdb.name",
databasePdbName);
- addPropertyIfNotNull(configBuilder, "database.connection.adapter",
databaseConnectionAdapter);
addPropertyIfNotNull(configBuilder, "max.queue.size.in.bytes",
maxQueueSizeInBytes);
- addPropertyIfNotNull(configBuilder, "log.mining.buffer.type",
logMiningBufferType);
addPropertyIfNotNull(configBuilder, "transaction.topic",
transactionTopic);
addPropertyIfNotNull(configBuilder, "database.url", databaseUrl);
addPropertyIfNotNull(configBuilder, "time.precision.mode",
timePrecisionMode);
addPropertyIfNotNull(configBuilder, "database.server.name",
databaseServerName);
- addPropertyIfNotNull(configBuilder,
"event.processing.failure.handling.mode", eventProcessingFailureHandlingMode);
- addPropertyIfNotNull(configBuilder, "snapshot.max.threads",
snapshotMaxThreads);
addPropertyIfNotNull(configBuilder, "database.port", databasePort);
addPropertyIfNotNull(configBuilder,
"log.mining.sleep.time.increment.ms", logMiningSleepTimeIncrementMs);
addPropertyIfNotNull(configBuilder, "column.exclude.list",
columnExcludeList);
+ addPropertyIfNotNull(configBuilder, "log.mining.session.max.ms",
logMiningSessionMaxMs);
addPropertyIfNotNull(configBuilder, "database.hostname",
databaseHostname);
addPropertyIfNotNull(configBuilder, "log.mining.batch.size.min",
logMiningBatchSizeMin);
- addPropertyIfNotNull(configBuilder, "schema.name.adjustment.mode",
schemaNameAdjustmentMode);
- addPropertyIfNotNull(configBuilder, "log.mining.batch.size.default",
logMiningBatchSizeDefault);
addPropertyIfNotNull(configBuilder, "snapshot.enhance.predicate.scn",
snapshotEnhancePredicateScn);
- addPropertyIfNotNull(configBuilder, "table.include.list",
tableIncludeList);
return configBuilder.build();
}
diff --git
a/components/camel-debezium/camel-debezium-oracle/src/generated/resources/org/apache/camel/component/debezium/debezium-oracle.json
b/components/camel-debezium/camel-debezium-oracle/src/generated/resources/org/apache/camel/component/debezium/debezium-oracle.json
index d6c1d14bf43..d3de9059613 100644
---
a/components/camel-debezium/camel-debezium-oracle/src/generated/resources/org/apache/camel/component/debezium/debezium-oracle.json
+++
b/components/camel-debezium/camel-debezium-oracle/src/generated/resources/org/apache/camel/component/debezium/debezium-oracle.json
@@ -88,6 +88,7 @@
"logMiningBufferType": { "kind": "property", "displayName": "Log Mining
Buffer Type", "group": "oracle", "label": "consumer,oracle", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "memory",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The buffer type
controls ho [...]
"logMiningScnGapDetectionGapSizeMin": { "kind": "property", "displayName":
"Log Mining Scn Gap Detection Gap Size Min", "group": "oracle", "label":
"consumer,oracle", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
1000000, "configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "Used [...]
"logMiningScnGapDetectionTimeIntervalMaxMs": { "kind": "property",
"displayName": "Log Mining Scn Gap Detection Time Interval Max Ms", "group":
"oracle", "label": "consumer,oracle", "required": false, "type": "duration",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "20s", "configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "descri [...]
+ "logMiningSessionMaxMs": { "kind": "property", "displayName": "Log Mining
Session Max Ms", "group": "oracle", "label": "consumer,oracle", "required":
false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "0ms",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum number of
milliseconds [...]
"logMiningSleepTimeDefaultMs": { "kind": "property", "displayName": "Log
Mining Sleep Time Default Ms", "group": "oracle", "label": "consumer,oracle",
"required": false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1s",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The amount of time that
[...]
"logMiningSleepTimeIncrementMs": { "kind": "property", "displayName": "Log
Mining Sleep Time Increment Ms", "group": "oracle", "label": "consumer,oracle",
"required": false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "200ms",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum amou [...]
"logMiningSleepTimeMaxMs": { "kind": "property", "displayName": "Log
Mining Sleep Time Max Ms", "group": "oracle", "label": "consumer,oracle",
"required": false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "3s",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time that [...]
@@ -204,6 +205,7 @@
"logMiningBufferType": { "kind": "parameter", "displayName": "Log Mining
Buffer Type", "group": "oracle", "label": "consumer,oracle", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "memory",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The buffer type
controls h [...]
"logMiningScnGapDetectionGapSizeMin": { "kind": "parameter",
"displayName": "Log Mining Scn Gap Detection Gap Size Min", "group": "oracle",
"label": "consumer,oracle", "required": false, "type": "integer", "javaType":
"long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 1000000, "configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "Used [...]
"logMiningScnGapDetectionTimeIntervalMaxMs": { "kind": "parameter",
"displayName": "Log Mining Scn Gap Detection Time Interval Max Ms", "group":
"oracle", "label": "consumer,oracle", "required": false, "type": "duration",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "20s", "configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "descr [...]
+ "logMiningSessionMaxMs": { "kind": "parameter", "displayName": "Log Mining
Session Max Ms", "group": "oracle", "label": "consumer,oracle", "required":
false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "0ms",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum number of
milliseconds [...]
"logMiningSleepTimeDefaultMs": { "kind": "parameter", "displayName": "Log
Mining Sleep Time Default Ms", "group": "oracle", "label": "consumer,oracle",
"required": false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1s",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The amount of time tha
[...]
"logMiningSleepTimeIncrementMs": { "kind": "parameter", "displayName":
"Log Mining Sleep Time Increment Ms", "group": "oracle", "label":
"consumer,oracle", "required": false, "type": "duration", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"200ms", "configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum amo [...]
"logMiningSleepTimeMaxMs": { "kind": "parameter", "displayName": "Log
Mining Sleep Time Max Ms", "group": "oracle", "label": "consumer,oracle",
"required": false, "type": "duration", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "3s",
"configurationClass":
"org.apache.camel.component.debezium.configuration.OracleConnectorEmbeddedDebeziumConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time tha [...]
diff --git a/parent/pom.xml b/parent/pom.xml
index 90bea608ee6..32d300dd057 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -161,7 +161,7 @@
<docker-java-version>3.2.13</docker-java-version>
<dozer-version>6.5.2</dozer-version>
<dropbox-version>3.2.0</dropbox-version>
- <debezium-version>1.9.1.Final</debezium-version>
+ <debezium-version>1.9.3.Final</debezium-version>
<debezium-mysql-connector-version>8.0.28</debezium-mysql-connector-version>
<eddsa-version>0.3.0</eddsa-version>
<egit-github-core-version>2.1.5</egit-github-core-version>