This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f91486031 feat(connectors): add MongoDB sink connector (#2815)
f91486031 is described below
commit f91486031c8825cf02eae5924760e3a80e0ec781
Author: amuldotexe <[email protected]>
AuthorDate: Tue Mar 10 01:25:42 2026 +0530
feat(connectors): add MongoDB sink connector (#2815)
Co-authored-by: amuldotexe <[email protected]>
Co-authored-by: Grzegorz Koszyk
<[email protected]>
Co-authored-by: Maciej Modzelewski <[email protected]>
Co-authored-by: Hubert Gruszecki <[email protected]>
---
Cargo.lock | 319 ++++++
Cargo.toml | 1 +
DEPENDENCIES.md | 23 +
core/connectors/runtime/src/sink.rs | 3 +-
core/connectors/sdk/src/lib.rs | 6 +
core/connectors/sdk/src/sink.rs | 3 +-
core/connectors/sinks/mongodb_sink/Cargo.toml | 42 +
core/connectors/sinks/mongodb_sink/README.md | 146 +++
core/connectors/sinks/mongodb_sink/config.toml | 46 +
core/connectors/sinks/mongodb_sink/src/lib.rs | 912 +++++++++++++++
core/integration/Cargo.toml | 1 +
core/integration/tests/connectors/fixtures/mod.rs | 5 +
.../tests/connectors/fixtures/mongodb/container.rs | 244 ++++
.../tests/connectors/fixtures/{ => mongodb}/mod.rs | 19 +-
.../tests/connectors/fixtures/mongodb/sink.rs | 390 +++++++
core/integration/tests/connectors/mod.rs | 1 +
.../tests/connectors/{fixtures => mongodb}/mod.rs | 18 +-
.../tests/connectors/mongodb/mongodb_sink.rs | 1162 ++++++++++++++++++++
.../integration/tests/connectors/mongodb/sink.toml | 20 +
19 files changed, 3330 insertions(+), 31 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 23b34e3a8..80f8b9554 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1507,6 +1507,29 @@ dependencies = [
"alloc-stdlib",
]
+[[package]]
+name = "bson"
+version = "2.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7969a9ba84b0ff843813e7249eed1678d9b6607ce5a3b8f0a47af3fcf7978e6e"
+dependencies = [
+ "ahash 0.8.12",
+ "base64 0.22.1",
+ "bitvec",
+ "getrandom 0.2.17",
+ "getrandom 0.3.4",
+ "hex",
+ "indexmap 2.13.0",
+ "js-sys",
+ "once_cell",
+ "rand 0.9.2",
+ "serde",
+ "serde_bytes",
+ "serde_json",
+ "time",
+ "uuid",
+]
+
[[package]]
name = "bstr"
version = "1.12.1"
@@ -2984,6 +3007,28 @@ dependencies = [
"syn 2.0.115",
]
+[[package]]
+name = "derive-syn-parse"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d65d7ce8132b7c0e54497a4d9a55a1c2a0912a0d786cf894472ba818fba45762"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "derive-where"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
[[package]]
name = "derive_builder"
version = "0.20.2"
@@ -3332,6 +3377,18 @@ dependencies = [
"cfg-if",
]
+[[package]]
+name = "enum-as-inner"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
[[package]]
name = "enum_dispatch"
version = "0.3.13"
@@ -4534,6 +4591,52 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+[[package]]
+name = "hickory-proto"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8a6fe56c0038198998a6f217ca4e7ef3a5e51f46163bd6dd60b5c71ca6c6502"
+dependencies = [
+ "async-trait",
+ "cfg-if",
+ "data-encoding",
+ "enum-as-inner",
+ "futures-channel",
+ "futures-io",
+ "futures-util",
+ "idna",
+ "ipnet",
+ "once_cell",
+ "rand 0.9.2",
+ "ring",
+ "thiserror 2.0.18",
+ "tinyvec",
+ "tokio",
+ "tracing",
+ "url",
+]
+
+[[package]]
+name = "hickory-resolver"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a"
+dependencies = [
+ "cfg-if",
+ "futures-util",
+ "hickory-proto",
+ "ipconfig",
+ "moka",
+ "once_cell",
+ "parking_lot",
+ "rand 0.9.2",
+ "resolv-conf",
+ "smallvec",
+ "thiserror 2.0.18",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "hkdf"
version = "0.12.4"
@@ -5321,6 +5424,20 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "iggy_connector_mongodb_sink"
+version = "0.3.0"
+dependencies = [
+ "async-trait",
+ "humantime",
+ "iggy_connector_sdk",
+ "mongodb",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "iggy_connector_postgres_sink"
version = "0.3.1-edge.1"
@@ -5640,6 +5757,7 @@ dependencies = [
"keyring",
"lazy_static",
"libc",
+ "mongodb",
"once_cell",
"predicates",
"rand 0.10.0",
@@ -5710,6 +5828,18 @@ dependencies = [
"rustix 1.1.3",
]
+[[package]]
+name = "ipconfig"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
+dependencies = [
+ "socket2 0.5.10",
+ "widestring",
+ "windows-sys 0.48.0",
+ "winreg",
+]
+
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -6304,6 +6434,54 @@ dependencies = [
"twox-hash",
]
+[[package]]
+name = "macro_magic"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc33f9f0351468d26fbc53d9ce00a096c8522ecb42f19b50f34f2c422f76d21d"
+dependencies = [
+ "macro_magic_core",
+ "macro_magic_macros",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_core"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1687dc887e42f352865a393acae7cf79d98fab6351cde1f58e9e057da89bf150"
+dependencies = [
+ "const-random",
+ "derive-syn-parse",
+ "macro_magic_core_macros",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_core_macros"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b02abfe41815b5bd98dbd4260173db2c116dda171dc0fe7838cb206333b83308"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_macros"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73ea28ee64b88876bf45277ed9a5817c1817df061a74f2b988971a12570e5869"
+dependencies = [
+ "macro_magic_core",
+ "quote",
+ "syn 2.0.115",
+]
+
[[package]]
name = "macro_rules_attribute"
version = "0.1.3"
@@ -6517,6 +6695,82 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "mongocrypt"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8da0cd419a51a5fb44819e290fbdb0665a54f21dead8923446a799c7f4d26ad9"
+dependencies = [
+ "bson",
+ "mongocrypt-sys",
+ "once_cell",
+ "serde",
+]
+
+[[package]]
+name = "mongocrypt-sys"
+version = "0.1.5+1.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "224484c5d09285a7b8cb0a0c117e847ebd14cb6e4470ecf68cdb89c503b0edb9"
+
+[[package]]
+name = "mongodb"
+version = "3.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "803dd859e8afa084c255a8effd8000ff86f7c8076a50cd6d8c99e8f3496f75c2"
+dependencies = [
+ "base64 0.22.1",
+ "bitflags 2.10.0",
+ "bson",
+ "derive-where",
+ "derive_more",
+ "futures-core",
+ "futures-io",
+ "futures-util",
+ "hex",
+ "hickory-proto",
+ "hickory-resolver",
+ "hmac",
+ "macro_magic",
+ "md-5",
+ "mongocrypt",
+ "mongodb-internal-macros",
+ "pbkdf2",
+ "percent-encoding",
+ "rand 0.9.2",
+ "rustc_version_runtime",
+ "rustls",
+ "rustversion",
+ "serde",
+ "serde_bytes",
+ "serde_with",
+ "sha1",
+ "sha2",
+ "socket2 0.6.2",
+ "stringprep",
+ "strsim",
+ "take_mut",
+ "thiserror 2.0.18",
+ "tokio",
+ "tokio-rustls",
+ "tokio-util",
+ "typed-builder 0.22.0",
+ "uuid",
+ "webpki-roots 1.0.6",
+]
+
+[[package]]
+name = "mongodb-internal-macros"
+version = "3.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a973ef3dd3dbc6f6e65bbdecfd9ec5e781b9e7493b0f369a7c62e35d8e5ae2c8"
+dependencies = [
+ "macro_magic",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
[[package]]
name = "moxcms"
version = "0.7.11"
@@ -6910,6 +7164,10 @@ name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+dependencies = [
+ "critical-section",
+ "portable-atomic",
+]
[[package]]
name = "once_cell_polyfill"
@@ -7328,6 +7586,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec"
+[[package]]
+name = "pbkdf2"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
+dependencies = [
+ "digest",
+]
+
[[package]]
name = "pear"
version = "0.2.9"
@@ -8439,6 +8706,12 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "resolv-conf"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
+
[[package]]
name = "resvg"
version = "0.45.1"
@@ -8724,6 +8997,16 @@ dependencies = [
"semver",
]
+[[package]]
+name = "rustc_version_runtime"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d"
+dependencies = [
+ "rustc_version",
+ "semver",
+]
+
[[package]]
name = "rusticata-macros"
version = "4.1.0"
@@ -10109,6 +10392,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
+[[package]]
+name = "take_mut"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
+
[[package]]
name = "tap"
version = "1.0.1"
@@ -10880,6 +11169,15 @@ dependencies = [
"typed-builder-macro 0.20.1",
]
+[[package]]
+name = "typed-builder"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "398a3a3c918c96de527dc11e6e846cd549d4508030b8a33e1da12789c856b81a"
+dependencies = [
+ "typed-builder-macro 0.22.0",
+]
+
[[package]]
name = "typed-builder"
version = "0.23.2"
@@ -10900,6 +11198,17 @@ dependencies = [
"syn 2.0.115",
]
+[[package]]
+name = "typed-builder-macro"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e48cea23f68d1f78eb7bc092881b6bb88d3d6b5b7e6234f6f9c911da1ffb221"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
[[package]]
name = "typed-builder-macro"
version = "0.23.2"
@@ -12109,6 +12418,16 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "winreg"
+version = "0.50.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
+dependencies = [
+ "cfg-if",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "winsafe"
version = "0.0.19"
diff --git a/Cargo.toml b/Cargo.toml
index e0140f2b2..d49abb763 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ members = [
"core/connectors/sdk",
"core/connectors/sinks/elasticsearch_sink",
"core/connectors/sinks/iceberg_sink",
+ "core/connectors/sinks/mongodb_sink",
"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 31c1302ff..557e24288 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -122,6 +122,7 @@ borsh: 1.6.0, "Apache-2.0 OR MIT",
borsh-derive: 1.6.0, "Apache-2.0",
brotli: 8.0.2, "BSD-3-Clause AND MIT",
brotli-decompressor: 5.0.0, "BSD-3-Clause OR MIT",
+bson: 2.15.0, "MIT",
bstr: 1.12.1, "Apache-2.0 OR MIT",
built: 0.8.0, "MIT",
bumpalo: 3.19.1, "Apache-2.0 OR MIT",
@@ -254,6 +255,8 @@ der: 0.7.10, "Apache-2.0 OR MIT",
der-parser: 10.0.0, "Apache-2.0 OR MIT",
deranged: 0.5.6, "Apache-2.0 OR MIT",
derive-new: 0.7.0, "MIT",
+derive-syn-parse: 0.2.0, "Apache-2.0 OR MIT",
+derive-where: 1.6.0, "Apache-2.0 OR MIT",
derive_builder: 0.20.2, "Apache-2.0 OR MIT",
derive_builder_core: 0.20.2, "Apache-2.0 OR MIT",
derive_builder_macro: 0.20.2, "Apache-2.0 OR MIT",
@@ -289,6 +292,7 @@ embedded-io: 0.4.0, "Apache-2.0 OR MIT",
embedded-io: 0.6.1, "Apache-2.0 OR MIT",
encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
+enum-as-inner: 0.6.1, "Apache-2.0 OR MIT",
enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
enumset: 1.1.10, "Apache-2.0 OR MIT",
enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
@@ -402,6 +406,8 @@ heapless: 0.7.17, "Apache-2.0 OR MIT",
heck: 0.5.0, "Apache-2.0 OR MIT",
hermit-abi: 0.5.2, "Apache-2.0 OR MIT",
hex: 0.4.3, "Apache-2.0 OR MIT",
+hickory-proto: 0.25.2, "Apache-2.0 OR MIT",
+hickory-resolver: 0.25.2, "Apache-2.0 OR MIT",
hkdf: 0.12.4, "Apache-2.0 OR MIT",
hmac: 0.12.1, "Apache-2.0 OR MIT",
home: 0.5.12, "Apache-2.0 OR MIT",
@@ -450,6 +456,7 @@ iggy_common: 0.9.1-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0",
+iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
iggy_connector_postgres_sink: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_postgres_source: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_quickwit_sink: 0.3.1-edge.1, "Apache-2.0",
@@ -478,6 +485,7 @@ interpolate_name: 0.2.4, "MIT",
inventory: 0.3.21, "Apache-2.0 OR MIT",
io-uring: 0.7.11, "Apache-2.0 OR MIT",
io_uring_buf_ring: 0.2.3, "MIT",
+ipconfig: 0.3.2, "Apache-2.0 OR MIT",
ipnet: 2.11.0, "Apache-2.0 OR MIT",
iri-string: 0.7.10, "Apache-2.0 OR MIT",
is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT",
@@ -542,6 +550,10 @@ loom: 0.7.2, "MIT",
loop9: 0.1.5, "MIT",
lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
lz4_flex: 0.12.0, "MIT",
+macro_magic: 0.5.1, "MIT",
+macro_magic_core: 0.5.1, "MIT",
+macro_magic_core_macros: 0.5.1, "MIT",
+macro_magic_macros: 0.5.1, "MIT",
macro_rules_attribute: 0.1.3, "MIT",
macro_rules_attribute-proc_macro: 0.1.3, "MIT",
matchers: 0.2.0, "MIT",
@@ -563,6 +575,10 @@ mio: 1.1.1, "MIT",
mockall: 0.14.0, "Apache-2.0 OR MIT",
mockall_derive: 0.14.0, "Apache-2.0 OR MIT",
moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0",
+mongocrypt: 0.3.2, "Apache-2.0",
+mongocrypt-sys: 0.1.5+1.15.1, "Apache-2.0",
+mongodb: 3.5.1, "Apache-2.0",
+mongodb-internal-macros: 3.5.1, "Apache-2.0",
moxcms: 0.7.11, "Apache-2.0 OR BSD-3-Clause",
murmur3: 0.5.2, "Apache-2.0 OR MIT",
never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib",
@@ -638,6 +654,7 @@ password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
pastey: 0.1.1, "Apache-2.0 OR MIT",
pastey: 0.2.1, "Apache-2.0 OR MIT",
+pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
pear: 0.2.9, "Apache-2.0 OR MIT",
pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
peg: 0.6.3, "MIT",
@@ -738,6 +755,7 @@ reqwest: 0.12.28, "Apache-2.0 OR MIT",
reqwest-middleware: 0.4.2, "Apache-2.0 OR MIT",
reqwest-retry: 0.8.0, "Apache-2.0 OR MIT",
reqwest-tracing: 0.5.8, "Apache-2.0 OR MIT",
+resolv-conf: 0.7.6, "Apache-2.0 OR MIT",
resvg: 0.45.1, "Apache-2.0 OR MIT",
retry-policies: 0.5.1, "Apache-2.0 OR MIT",
rfc6979: 0.4.0, "Apache-2.0 OR MIT",
@@ -762,6 +780,7 @@ rust-ini: 0.21.3, "MIT",
rust_decimal: 1.40.0, "MIT",
rustc-hash: 2.1.1, "Apache-2.0 OR MIT",
rustc_version: 0.4.1, "Apache-2.0 OR MIT",
+rustc_version_runtime: 0.3.0, "MIT",
rusticata-macros: 4.1.0, "Apache-2.0 OR MIT",
rustix: 0.38.44, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
rustix: 1.1.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
@@ -877,6 +896,7 @@ sys_traits_macros: 0.1.0, "MIT",
sysinfo: 0.37.2, "MIT",
sysinfo: 0.38.1, "MIT",
tagptr: 0.2.0, "Apache-2.0 OR MIT",
+take_mut: 0.2.2, "MIT",
tap: 1.0.1, "MIT",
tar: 0.4.44, "Apache-2.0 OR MIT",
tempfile: 3.25.0, "Apache-2.0 OR MIT",
@@ -943,8 +963,10 @@ ttf-parser: 0.25.1, "Apache-2.0 OR MIT",
tungstenite: 0.28.0, "Apache-2.0 OR MIT",
twox-hash: 2.1.2, "MIT",
typed-builder: 0.20.1, "Apache-2.0 OR MIT",
+typed-builder: 0.22.0, "Apache-2.0 OR MIT",
typed-builder: 0.23.2, "Apache-2.0 OR MIT",
typed-builder-macro: 0.20.1, "Apache-2.0 OR MIT",
+typed-builder-macro: 0.22.0, "Apache-2.0 OR MIT",
typed-builder-macro: 0.23.2, "Apache-2.0 OR MIT",
typed-path: 0.12.3, "Apache-2.0 OR MIT",
typenum: 1.19.0, "Apache-2.0 OR MIT",
@@ -1084,6 +1106,7 @@ windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT",
windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT",
winnow: 0.5.40, "MIT",
winnow: 0.7.14, "MIT",
+winreg: 0.50.0, "MIT",
winsafe: 0.0.19, "MIT",
wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR
MIT",
diff --git a/core/connectors/runtime/src/sink.rs
b/core/connectors/runtime/src/sink.rs
index 2074f845c..48961aca6 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -20,7 +20,7 @@
use crate::configs::connectors::SinkConfig;
use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
-use crate::metrics::{ConnectorType, Metrics};
+use crate::metrics::Metrics;
use crate::{
PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer,
SinkConnectorPlugin,
SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -324,7 +324,6 @@ pub(crate) async fn consume_messages(
error!(
"Failed to process {messages_count} messages for sink
connector with ID: {plugin_id}. {error}",
);
- metrics.increment_errors(plugin_key, ConnectorType::Sink);
return Err(error);
}
};
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 72bd781db..8ba37a083 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -48,6 +48,12 @@ pub mod transforms;
pub use log::LogCallback;
pub use transforms::Transform;
+#[doc(hidden)]
+pub mod connector_macro_support {
+ pub use dashmap::DashMap;
+ pub use once_cell::sync::Lazy;
+}
+
static RUNTIME: OnceCell<Runtime> = OnceCell::new();
pub fn get_runtime() -> &'static Runtime {
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index ad037b6f5..9d21f05ee 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -222,8 +222,7 @@ macro_rules! sink_connector {
assert_trait::<$type>();
};
- use dashmap::DashMap;
- use once_cell::sync::Lazy;
+ use $crate::connector_macro_support::{DashMap, Lazy};
use $crate::LogCallback;
use $crate::sink::SinkContainer;
diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml
b/core/connectors/sinks/mongodb_sink/Cargo.toml
new file mode 100644
index 000000000..11a18a40b
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/Cargo.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_mongodb_sink"
+version = "0.3.0"
+description = "Iggy MongoDB sink connector for storing stream messages into
a MongoDB database"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "mongodb", "sink"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib", "lib"]
+
+[dependencies]
+async-trait = { workspace = true }
+humantime = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+mongodb = { version = "3.0", features = ["rustls-tls"] }
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
diff --git a/core/connectors/sinks/mongodb_sink/README.md
b/core/connectors/sinks/mongodb_sink/README.md
new file mode 100644
index 000000000..550612cc3
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/README.md
@@ -0,0 +1,146 @@
+# MongoDB Sink Connector
+
+Consumes messages from Iggy streams and stores them in a MongoDB collection.
+
+## Try It
+
+Send a JSON message through Iggy and see it land in MongoDB.
+
+**Prerequisites**: Docker running, project built (`cargo build` from repo
root).
+
+```bash
+# Start MongoDB
+docker run -d --name mongo-test -p 27017:27017 mongo:7
+
+# Start iggy-server (terminal 2)
+IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy ./target/debug/iggy-server
+
+# Create stream and topic
+./target/debug/iggy -u iggy -p iggy stream create demo_stream
+./target/debug/iggy -u iggy -p iggy topic create demo_stream demo_topic 1
+
+# Setup connector config
+mkdir -p /tmp/mdb-sink-test/connectors
+cat > /tmp/mdb-sink-test/config.toml << 'TOML'
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+[state]
+path = "/tmp/mdb-sink-test/state"
+[connectors]
+config_type = "local"
+config_dir = "/tmp/mdb-sink-test/connectors"
+TOML
+cat > /tmp/mdb-sink-test/connectors/sink.toml << 'TOML'
+type = "sink"
+key = "mongodb"
+enabled = true
+version = 0
+name = "test"
+path = "target/debug/libiggy_connector_mongodb_sink"
+[[streams]]
+stream = "demo_stream"
+topics = ["demo_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "100ms"
+consumer_group = "test_cg"
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "test_db"
+collection = "messages"
+payload_format = "json"
+auto_create_collection = true
+TOML
+
+# Start connector (terminal 3)
+IGGY_CONNECTORS_CONFIG_PATH=/tmp/mdb-sink-test/config.toml
./target/debug/iggy-connectors
+
+# Send a message
+./target/debug/iggy -u iggy -p iggy message send demo_stream demo_topic
'{"hello":"mongodb"}'
+
+# Verify in MongoDB
+docker exec mongo-test mongosh --quiet --eval \
+ 'db.getSiblingDB("test_db").messages.find().pretty()'
+```
+
+Expected:
+
+```json
+{ "payload": { "hello": "mongodb" }, "iggy_offset": 0, "iggy_stream":
"demo_stream" }
+```
+
+Cleanup: `docker rm -f mongo-test && rm -rf /tmp/mdb-sink-test`
+
+## Quick Start
+
+```toml
+[[streams]]
+stream = "demo_stream"
+topics = ["demo_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "100ms"
+consumer_group = "mongodb_cg"
+
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "iggy_data"
+collection = "messages"
+payload_format = "json"
+```
+
+## Configuration
+
+| Option | Default | Description |
+| ------ | ------- | ----------- |
+| `connection_uri` | **required** | MongoDB URI |
+| `database` | **required** | Target database |
+| `collection` | **required** | Target collection |
+| `batch_size` | `100` | Documents per `insertMany` call |
+| `payload_format` | `binary` | `binary`, `json`, or `string` |
+| `include_metadata` | `true` | Add iggy offset, timestamp, stream, topic,
partition |
+| `include_checksum` | `true` | Add message checksum |
+| `include_origin_timestamp` | `true` | Add origin timestamp |
+| `auto_create_collection` | `false` | Create collection if missing |
+| `max_pool_size` | driver default | Connection pool size |
+| `verbose_logging` | `false` | Log at info instead of debug |
+| `max_retries` | `3` | Retry attempts for transient errors |
+| `retry_delay` | `1s` | Base delay (`retry_delay * attempt`) |
+
+## Testing
+
+Requires Docker. Testcontainers starts MongoDB 7 + iggy-server automatically.
+
+```bash
+cargo test --test mod -- mongodb_sink
+```
+
+This runs 4 E2E tests against a real MongoDB instance:
+
+- `json_messages_sink_to_mongodb` — JSON payloads stored as embedded BSON
documents
+- `binary_messages_sink_as_bson_binary` — binary payloads stored as BSON Binary
+- `large_batch_processed_correctly` — batch insertion with configurable batch
size
+- `auto_create_collection_on_open` — collection created automatically when
missing
+
+Unit tests (no Docker):
+
+```bash
+cargo test -p iggy_connector_mongodb_sink
+```
+
+## Delivery Semantics
+
+This connector provides **at-least-once** delivery semantics.
+
+### Behavior
+
+- Messages may be delivered more than once on retry or restart
+- Uses Iggy message ID as MongoDB `_id` for document identity
+- **Insert-only mode**: duplicate key error is a hard failure (not upsert)
+
+### Known Limitations
+
+- On network timeout during insert, retry may cause duplicate key error
+- Sink does not upsert on duplicate (future improvement)
diff --git a/core/connectors/sinks/mongodb_sink/config.toml
b/core/connectors/sinks/mongodb_sink/config.toml
new file mode 100644
index 000000000..3b47b4ac1
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/config.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "mongodb"
+enabled = true
+version = 0
+name = "MongoDB sink"
+path = "../../target/release/libiggy_connector_mongodb_sink"
+verbose = false
+
+[[streams]]
+stream = "user_events"
+topics = ["users", "orders"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "mongodb_sink"
+
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "iggy_messages"
+collection = "messages"
+batch_size = 100
+include_metadata = true
+include_checksum = true
+include_origin_timestamp = true
+payload_format = "binary"
+auto_create_collection = false
+verbose_logging = false
+max_retries = 3
+retry_delay = "1s"
diff --git a/core/connectors/sinks/mongodb_sink/src/lib.rs
b/core/connectors/sinks/mongodb_sink/src/lib.rs
new file mode 100644
index 000000000..34b01ca7b
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/src/lib.rs
@@ -0,0 +1,912 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+ ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata,
sink_connector,
+};
+use mongodb::{Client, Collection, bson, options::ClientOptions};
+use serde::{Deserialize, Serialize};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
// Generates the FFI entry points that let the connector runtime load this
// sink as a dynamic plugin.
sink_connector!(MongoDbSink);

// Fallbacks applied when `max_retries` / `retry_delay` are absent from config.
const DEFAULT_MAX_RETRIES: u32 = 3;
const DEFAULT_RETRY_DELAY: &str = "1s";
+
/// MongoDB sink connector: consumes Iggy messages and persists them as BSON
/// documents in a configured database/collection.
#[derive(Debug)]
pub struct MongoDbSink {
    pub id: u32,
    // Established in `open()`; `None` before connect and after `close()`.
    client: Option<Client>,
    config: MongoDbSinkConfig,
    // When true, per-batch insert summaries log at `info` instead of `debug`.
    verbose: bool,
    // Maximum documents per `insert_many` call (config value clamped to >= 1).
    batch_size: usize,
    include_metadata: bool,
    include_checksum: bool,
    include_origin_timestamp: bool,
    payload_format: PayloadFormat,
    // Transient-error retry policy resolved from config in `new()`.
    max_retries: u32,
    retry_delay: Duration,
    // Lifetime counters reported on `close()`; successful inserts only.
    messages_processed: AtomicU64,
    insertion_errors: AtomicU64,
}
+
/// Plugin configuration deserialized from the `[plugin_config]` TOML section.
/// All `Option` fields have defaults applied in `MongoDbSink::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MongoDbSinkConfig {
    pub connection_uri: String,
    pub database: String,
    pub collection: String,
    // Forwarded to the driver's ClientOptions when set.
    pub max_pool_size: Option<u32>,
    // When true, the collection is created during `open()` (default: false).
    pub auto_create_collection: Option<bool>,
    // Documents per insert_many call; default 100, clamped to at least 1.
    pub batch_size: Option<u32>,
    pub include_metadata: Option<bool>,
    pub include_checksum: Option<bool>,
    pub include_origin_timestamp: Option<bool>,
    // "binary" (default), "json", or "string"/"text".
    pub payload_format: Option<String>,
    pub verbose_logging: Option<bool>,
    // Retry policy for transient insert failures; delay is humantime syntax
    // (e.g. "500ms", "1s") and falls back to 1s if unparsable.
    pub max_retries: Option<u32>,
    pub retry_delay: Option<String>,
}
+
/// How a message payload is stored in the BSON document's `payload` field:
/// raw bytes, parsed JSON, or a UTF-8 string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PayloadFormat {
    #[default]
    Binary,
    Json,
    String,
}
+
+impl PayloadFormat {
+ fn from_config(s: Option<&str>) -> Self {
+ let (payload_format, unknown_value) = classify_payload_format_value(s);
+ if let Some(value) = unknown_value {
+ warn!("Unknown MongoDB sink payload format '{value}', defaulting
to binary");
+ }
+ payload_format
+ }
+}
+
/// Result of one batch insert attempt: how many documents made it in, and the
/// terminal error (if any) after duplicate handling and retries.
#[derive(Debug)]
struct BatchInsertOutcome {
    inserted_count: u64,
    error: Option<Error>,
}
+
+impl MongoDbSink {
+ pub fn new(id: u32, config: MongoDbSinkConfig) -> Self {
+ let verbose = config.verbose_logging.unwrap_or(false);
+ let payload_format =
PayloadFormat::from_config(config.payload_format.as_deref());
+ let batch_size = config.batch_size.unwrap_or(100).max(1) as usize;
+ let include_metadata = config.include_metadata.unwrap_or(true);
+ let include_checksum = config.include_checksum.unwrap_or(true);
+ let include_origin_timestamp =
config.include_origin_timestamp.unwrap_or(true);
+ let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+ let delay_str =
config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY);
+ let retry_delay = HumanDuration::from_str(delay_str)
+ .map(|duration| duration.into())
+ .unwrap_or_else(|_| Duration::from_secs(1));
+ MongoDbSink {
+ id,
+ client: None,
+ config,
+ verbose,
+ batch_size,
+ include_metadata,
+ include_checksum,
+ include_origin_timestamp,
+ payload_format,
+ max_retries,
+ retry_delay,
+ messages_processed: AtomicU64::new(0),
+ insertion_errors: AtomicU64::new(0),
+ }
+ }
+}
+
+#[async_trait]
+impl Sink for MongoDbSink {
+ async fn open(&mut self) -> Result<(), Error> {
+ info!(
+ "Opening MongoDB sink connector with ID: {}. Target: {}.{}",
+ self.id, self.config.database, self.config.collection
+ );
+ self.connect().await?;
+
+ // Optionally create the collection so it is visible before first
insert
+ if self.config.auto_create_collection.unwrap_or(false) {
+ self.ensure_collection_exists().await?;
+ }
+
+ Ok(())
+ }
+
+ async fn consume(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: MessagesMetadata,
+ messages: Vec<ConsumedMessage>,
+ ) -> Result<(), Error> {
+ self.process_messages(topic_metadata, &messages_metadata, &messages)
+ .await
+ }
+
+ async fn close(&mut self) -> Result<(), Error> {
+ info!("Closing MongoDB sink connector with ID: {}", self.id);
+
+ // MongoDB client doesn't require explicit close - it's reference
counted
+ // Just take the client to drop it
+ self.client.take();
+
+ let messages_processed =
self.messages_processed.load(Ordering::Relaxed);
+ let insertion_errors = self.insertion_errors.load(Ordering::Relaxed);
+ info!(
+ "MongoDB sink ID: {} processed {} messages with {} errors",
+ self.id, messages_processed, insertion_errors
+ );
+ Ok(())
+ }
+}
+
impl MongoDbSink {
    /// Build a MongoDB client using ClientOptions so max_pool_size can be
    /// applied, verify connectivity with a `ping`, then store the client.
    async fn connect(&mut self) -> Result<(), Error> {
        // Never log credentials that may be embedded in the URI.
        let redacted = redact_connection_uri(&self.config.connection_uri);

        info!("Connecting to MongoDB: {redacted}");

        let mut options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| Error::InitError(format!("Failed to parse connection URI: {e}")))?;

        if let Some(pool_size) = self.config.max_pool_size {
            options.max_pool_size = Some(pool_size);
        }

        let client = Client::with_options(options)
            .map_err(|e| Error::InitError(format!("Failed to create client: {e}")))?;

        // Ping the database to verify connectivity: the driver connects
        // lazily, so without this a bad URI would only surface on first insert.
        client
            .database(&self.config.database)
            .run_command(mongodb::bson::doc! {"ping": 1})
            .await
            .map_err(|e| Error::InitError(format!("Database connectivity test failed: {e}")))?;

        self.client = Some(client);
        info!("Connected to MongoDB database: {}", self.config.database);
        Ok(())
    }

    /// Create the target collection explicitly if it does not already exist.
    async fn ensure_collection_exists(&self) -> Result<(), Error> {
        let client = self.get_client()?;
        let db = client.database(&self.config.database);

        let existing = db
            .list_collection_names()
            .await
            .map_err(|e| Error::InitError(format!("Failed to list collections: {e}")))?;

        if !existing.contains(&self.config.collection) {
            db.create_collection(&self.config.collection)
                .await
                .map_err(|e| {
                    Error::InitError(format!(
                        "Failed to create collection '{}': {e}",
                        self.config.collection
                    ))
                })?;
            info!("Created MongoDB collection '{}'", self.config.collection);
        } else {
            debug!(
                "Collection '{}' already exists, skipping creation",
                self.config.collection
            );
        }

        Ok(())
    }

    /// Insert `messages` into the target collection in `batch_size` chunks.
    ///
    /// Later chunks are still attempted when an earlier chunk fails; the last
    /// error (if any) is returned so the caller does not treat a partially
    /// failed delivery as success.
    async fn process_messages(
        &self,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
        messages: &[ConsumedMessage],
    ) -> Result<(), Error> {
        let client = self.get_client()?;
        let db = client.database(&self.config.database);
        let collection = db.collection(&self.config.collection);

        let mut successful_inserts = 0u64;
        let mut last_error: Option<Error> = None;

        for batch in messages.chunks(self.batch_size) {
            let outcome = self
                .insert_batch(batch, topic_metadata, messages_metadata, &collection)
                .await;

            successful_inserts += outcome.inserted_count;
            if let Some(batch_error) = outcome.error {
                // Counts failed batches, not individual messages.
                self.insertion_errors.fetch_add(1, Ordering::Relaxed);
                error!(
                    "Failed to insert batch of {} messages: {batch_error}",
                    batch.len()
                );
                last_error = Some(batch_error);
            }
        }

        // Only documents the server acknowledged count as processed.
        self.messages_processed
            .fetch_add(successful_inserts, Ordering::Relaxed);

        let coll = &self.config.collection;
        if self.verbose {
            info!(
                "MongoDB sink ID: {} inserted {successful_inserts} messages to collection '{coll}'",
                self.id
            );
        } else {
            debug!(
                "MongoDB sink ID: {} inserted {successful_inserts} messages to collection '{coll}'",
                self.id
            );
        }

        if let Some(e) = last_error {
            Err(e)
        } else {
            Ok(())
        }
    }

    /// Convert one chunk of messages into BSON documents and insert them.
    ///
    /// Each document gets a deterministic composite `_id`
    /// (stream:topic:partition:message_id), optional metadata/checksum/origin
    /// timestamp fields, and the payload encoded per `payload_format`.
    /// Any conversion failure aborts the whole chunk before insertion.
    async fn insert_batch(
        &self,
        messages: &[ConsumedMessage],
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
        collection: &Collection<mongodb::bson::Document>,
    ) -> BatchInsertOutcome {
        if messages.is_empty() {
            return BatchInsertOutcome {
                inserted_count: 0,
                error: None,
            };
        }

        let mut docs = Vec::with_capacity(messages.len());

        for message in messages {
            let mut doc = mongodb::bson::Document::new();

            // Deterministic _id makes redelivered messages collide as
            // duplicate keys instead of being written twice.
            let document_id =
                build_composite_document_id(topic_metadata, messages_metadata, message.id);
            doc.insert("_id", document_id);

            if self.include_metadata {
                // Offsets above i64::MAX fall back to a string-typed field.
                let (offset_key, offset_value) = build_offset_metadata_value(message.offset);
                doc.insert(offset_key, offset_value);
                doc.insert(
                    "iggy_timestamp",
                    build_bson_datetime_value(message.timestamp),
                );
                doc.insert("iggy_stream", &topic_metadata.stream);
                doc.insert("iggy_topic", &topic_metadata.topic);
                doc.insert(
                    "iggy_partition_id",
                    build_partition_metadata_value(messages_metadata.partition_id),
                );
            }

            if self.include_checksum {
                doc.insert(
                    "iggy_checksum",
                    build_checksum_metadata_value(message.checksum),
                );
            }

            if self.include_origin_timestamp {
                doc.insert(
                    "iggy_origin_timestamp",
                    build_bson_datetime_value(message.origin_timestamp),
                );
            }

            // Handle payload based on format
            let payload_bytes = match message.payload.clone().try_into_vec() {
                Ok(payload_bytes) => payload_bytes,
                Err(error) => {
                    return BatchInsertOutcome {
                        inserted_count: 0,
                        error: Some(Error::CannotStoreData(format!(
                            "Failed to convert payload to bytes: {error}"
                        ))),
                    };
                }
            };

            match self.payload_format {
                PayloadFormat::Binary => {
                    doc.insert(
                        "payload",
                        bson::Binary {
                            subtype: bson::spec::BinarySubtype::Generic,
                            bytes: payload_bytes,
                        },
                    );
                }
                PayloadFormat::Json => {
                    let json_value: serde_json::Value = match serde_json::from_slice(&payload_bytes)
                    {
                        Ok(json_value) => json_value,
                        Err(error) => {
                            error!("Failed to parse payload as JSON: {error}");
                            return BatchInsertOutcome {
                                inserted_count: 0,
                                error: Some(Error::CannotStoreData(format!(
                                    "Failed to parse payload as JSON: {error}"
                                ))),
                            };
                        }
                    };
                    let bson_value = match bson::to_bson(&json_value) {
                        Ok(bson_value) => bson_value,
                        Err(error) => {
                            error!("Failed to convert JSON to BSON: {error}");
                            return BatchInsertOutcome {
                                inserted_count: 0,
                                error: Some(Error::CannotStoreData(format!(
                                    "Failed to convert JSON to BSON: {error}"
                                ))),
                            };
                        }
                    };
                    doc.insert("payload", bson_value);
                }
                PayloadFormat::String => {
                    let text_value = match String::from_utf8(payload_bytes) {
                        Ok(text_value) => text_value,
                        Err(error) => {
                            error!("Failed to parse payload as UTF-8 text: {error}");
                            return BatchInsertOutcome {
                                inserted_count: 0,
                                error: Some(Error::CannotStoreData(format!(
                                    "Failed to parse payload as UTF-8 text: {error}"
                                ))),
                            };
                        }
                    };
                    doc.insert("payload", text_value);
                }
            }

            docs.push(doc);
        }

        self.insert_batch_with_retry(collection, &docs).await
    }

    /// Insert `docs` with an unordered `insert_many`, retrying transient
    /// failures up to `max_retries` times with a linearly growing delay.
    ///
    /// A failure consisting purely of duplicate-key (11000) write errors is
    /// treated as success for the non-duplicate documents, which makes
    /// redelivery after a crash idempotent.
    async fn insert_batch_with_retry(
        &self,
        collection: &Collection<mongodb::bson::Document>,
        docs: &[mongodb::bson::Document],
    ) -> BatchInsertOutcome {
        let mut attempts = 0u32;

        loop {
            // ordered(false) lets MongoDB keep inserting past individual
            // failures, so duplicates do not block the rest of the batch.
            let result = collection.insert_many(docs.to_vec()).ordered(false).await;

            match result {
                Ok(result) => {
                    return BatchInsertOutcome {
                        inserted_count: result.inserted_ids.len() as u64,
                        error: None,
                    };
                }
                Err(error) => {
                    if let Some(duplicate_count) = count_duplicate_write_errors(&error) {
                        let inserted_count = docs.len().saturating_sub(duplicate_count) as u64;
                        if duplicate_count > 0 {
                            warn!(
                                "MongoDB sink ID: {} ignored {duplicate_count} duplicate writes in batch",
                                self.id
                            );
                        }
                        return BatchInsertOutcome {
                            inserted_count,
                            error: None,
                        };
                    }

                    attempts += 1;
                    if !is_transient_error(&error) || attempts >= self.max_retries {
                        // Some documents may have been written before the
                        // failure; report the best estimate so counters stay
                        // close to reality.
                        let inserted_count = estimate_inserted_count_value(&error, docs.len());
                        error!("Batch insert failed after {attempts} attempts: {error}");
                        return BatchInsertOutcome {
                            inserted_count,
                            error: Some(Error::CannotStoreData(format!(
                                "Batch insert failed after {attempts} attempts: {error}"
                            ))),
                        };
                    }
                    warn!(
                        "Transient database error (attempt {attempts}/{}): {error}. Retrying...",
                        self.max_retries
                    );
                    // Linear backoff: the delay grows with each attempt.
                    tokio::time::sleep(self.retry_delay * attempts).await;
                }
            }
        }
    }

    /// Borrow the connected client, or fail if `open()` has not succeeded.
    fn get_client(&self) -> Result<&Client, Error> {
        self.client
            .as_ref()
            .ok_or_else(|| Error::InitError("Database not connected".to_string()))
    }
}
+
+fn build_composite_document_id(
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ message_id: u128,
+) -> String {
+ format!(
+ "{}:{}:{}:{message_id}",
+ topic_metadata.stream, topic_metadata.topic,
messages_metadata.partition_id
+ )
+}
+
+fn build_partition_metadata_value(partition_id: u32) -> bson::Bson {
+ if let Ok(value) = i32::try_from(partition_id) {
+ bson::Bson::Int32(value)
+ } else {
+ bson::Bson::Int64(i64::from(partition_id))
+ }
+}
+
+fn build_offset_metadata_value(offset: u64) -> (&'static str, bson::Bson) {
+ if let Ok(offset) = i64::try_from(offset) {
+ ("iggy_offset", bson::Bson::Int64(offset))
+ } else {
+ ("iggy_offset_str", bson::Bson::String(offset.to_string()))
+ }
+}
+
+fn build_bson_datetime_value(timestamp_micros: u64) -> bson::DateTime {
+ let timestamp_ms = timestamp_micros / 1000;
+ let timestamp_ms = i64::try_from(timestamp_ms).unwrap_or(i64::MAX);
+ bson::DateTime::from_millis(timestamp_ms)
+}
+
+fn build_checksum_metadata_value(checksum: u64) -> bson::Bson {
+ if let Ok(checksum) = i64::try_from(checksum) {
+ bson::Bson::Int64(checksum)
+ } else {
+ bson::Bson::String(checksum.to_string())
+ }
+}
+
+fn classify_payload_format_value(input: Option<&str>) -> (PayloadFormat,
Option<&str>) {
+ match input {
+ Some(value) if value.eq_ignore_ascii_case("json") =>
(PayloadFormat::Json, None),
+ Some(value)
+ if value.eq_ignore_ascii_case("string") ||
value.eq_ignore_ascii_case("text") =>
+ {
+ (PayloadFormat::String, None)
+ }
+ Some(value) if value.eq_ignore_ascii_case("binary") =>
(PayloadFormat::Binary, None),
+ Some(value) => (PayloadFormat::Binary, Some(value)),
+ None => (PayloadFormat::Binary, None),
+ }
+}
+
/// Returns `Some(n)` when `error` is an `InsertMany` failure whose write
/// errors are ALL duplicate-key violations (code 11000); `n` is the number of
/// duplicates. Any other shape — write-concern error, mixed codes, or no
/// write errors at all — yields `None` so the caller treats the batch as a
/// real failure.
fn count_duplicate_write_errors(error: &mongodb::error::Error) -> Option<usize> {
    use mongodb::error::ErrorKind;

    let ErrorKind::InsertMany(insert_many_error) = error.kind.as_ref() else {
        return None;
    };
    // A write-concern error means durability was not acknowledged; do not
    // classify the batch as "just duplicates".
    if insert_many_error.write_concern_error.is_some() {
        return None;
    }

    let write_errors = insert_many_error.write_errors.as_ref()?;
    // 11000 is MongoDB's duplicate-key error code.
    if write_errors.is_empty() || write_errors.iter().any(|error| error.code != 11000) {
        return None;
    }

    Some(write_errors.len())
}
+
+fn estimate_inserted_count_value(error: &mongodb::error::Error, total_docs:
usize) -> u64 {
+ use mongodb::error::ErrorKind;
+
+ if let ErrorKind::InsertMany(insert_many_error) = error.kind.as_ref()
+ && let Some(write_errors) = &insert_many_error.write_errors
+ {
+ return total_docs.saturating_sub(write_errors.len()) as u64;
+ }
+
+ 0
+}
+
/// Classify a MongoDB driver error as retryable (transient) or permanent.
///
/// Transient: driver-labelled retryable writes, I/O failures, cleared
/// connection pools, server-selection failures. Permanent: authentication and
/// BSON (de)serialization failures, plus errors carrying codes 11000
/// (duplicate key), 13 (unauthorized), or 121 (document validation).
/// Other kinds fall back to a substring heuristic on the rendered message.
fn is_transient_error(e: &mongodb::error::Error) -> bool {
    use mongodb::error::ErrorKind;

    if e.contains_label(mongodb::error::RETRYABLE_WRITE_ERROR) {
        return true;
    }

    match e.kind.as_ref() {
        ErrorKind::Io(_) => true,
        ErrorKind::ConnectionPoolCleared { .. } => true,
        ErrorKind::ServerSelection { .. } => true,
        ErrorKind::Authentication { .. } => false,
        ErrorKind::BsonDeserialization(_) => false,
        ErrorKind::BsonSerialization(_) => false,
        ErrorKind::InsertMany(insert_many_error) => {
            // Retry only if no write error carries a permanent code.
            let has_non_retryable_write_error = insert_many_error
                .write_errors
                .as_ref()
                .is_some_and(|wes| wes.iter().any(|we| matches!(we.code, 11000 | 13 | 121)));
            !has_non_retryable_write_error
        }
        ErrorKind::Command(cmd_err) => !matches!(cmd_err.code, 11000 | 13 | 121),
        _ => {
            // The driver's error kinds are non-exhaustive; last resort is
            // keyword matching on the lowercased message text.
            let msg = e.to_string().to_lowercase();
            msg.contains("timeout")
                || msg.contains("network")
                || msg.contains("pool")
                || msg.contains("server selection")
        }
    }
}
+
/// Redact a connection URI for logging: keep the scheme (if any) plus the
/// first three characters after it, replace the rest with `***`.
fn redact_connection_uri(uri: &str) -> String {
    match uri.find("://") {
        Some(scheme_end) => {
            // "://" is ASCII, so scheme_end + 3 is a valid char boundary.
            let (scheme, rest) = uri.split_at(scheme_end + 3);
            let preview: String = rest.chars().take(3).collect();
            format!("{scheme}{preview}***")
        }
        None => {
            let preview: String = uri.chars().take(3).collect();
            format!("{preview}***")
        }
    }
}
+
#[cfg(test)]
mod tests {
    // Unit tests covering config resolution, BSON metadata value
    // construction, URI redaction, transient-error classification, and the
    // error-propagation contract of `process_messages`.
    use super::*;

    // Minimal valid config; individual tests override single fields.
    fn given_default_config() -> MongoDbSinkConfig {
        MongoDbSinkConfig {
            connection_uri: "mongodb://localhost:27017".to_string(),
            database: "test_db".to_string(),
            collection: "test_collection".to_string(),
            max_pool_size: None,
            auto_create_collection: None,
            batch_size: Some(100),
            include_metadata: None,
            include_checksum: None,
            include_origin_timestamp: None,
            payload_format: None,
            verbose_logging: None,
            max_retries: None,
            retry_delay: None,
        }
    }

    #[test]
    fn given_payload_format_inputs_should_map_expected_variant() {
        // Matching is case-insensitive; unknown and absent values default to
        // Binary.
        let cases = [
            (Some("json"), PayloadFormat::Json),
            (Some("JSON"), PayloadFormat::Json),
            (Some("string"), PayloadFormat::String),
            (Some("text"), PayloadFormat::String),
            (Some("TEXT"), PayloadFormat::String),
            (Some("binary"), PayloadFormat::Binary),
            (Some("unknown"), PayloadFormat::Binary),
            (None, PayloadFormat::Binary),
        ];

        for (input, expected) in cases {
            assert_eq!(PayloadFormat::from_config(input), expected);
        }
    }

    #[test]
    fn given_unknown_payload_format_should_mark_warning_contract() {
        // The classifier surfaces the unrecognized value so from_config can
        // log it.
        let (payload_format, unknown_value) = classify_payload_format_value(Some("surprise"));
        assert_eq!(payload_format, PayloadFormat::Binary);
        assert_eq!(unknown_value, Some("surprise"));
    }

    #[test]
    fn given_retry_configurations_should_use_expected_values() {
        // (max_retries config, retry_delay config, expected retries, expected delay)
        let cases = [
            (None, None, DEFAULT_MAX_RETRIES, Duration::from_secs(1)),
            (Some(5), None, 5, Duration::from_secs(1)),
            (
                None,
                Some("500ms"),
                DEFAULT_MAX_RETRIES,
                Duration::from_millis(500),
            ),
        ];

        for (max_retries, retry_delay, expected_retries, expected_delay) in cases {
            let mut config = given_default_config();
            config.max_retries = max_retries;
            config.retry_delay = retry_delay.map(std::string::ToString::to_string);

            let sink = MongoDbSink::new(1, config);
            assert_eq!(sink.max_retries, expected_retries);
            assert_eq!(sink.retry_delay, expected_delay);
        }
    }

    #[test]
    fn given_connection_uri_shapes_should_redact_consistently() {
        let cases = [
            (
                "mongodb://user:password@localhost:27017",
                "mongodb://use***",
            ),
            ("localhost:27017", "loc***"),
            (
                "mongodb+srv://admin:[email protected]",
                "mongodb+srv://adm***",
            ),
        ];

        for (uri, expected) in cases {
            assert_eq!(redact_connection_uri(uri), expected);
        }
    }

    #[test]
    fn given_payload_format_config_should_select_sink_format() {
        let cases = [
            (None, PayloadFormat::Binary),
            (Some("json"), PayloadFormat::Json),
            (Some("string"), PayloadFormat::String),
        ];

        for (payload_format, expected) in cases {
            let mut config = given_default_config();
            config.payload_format = payload_format.map(std::string::ToString::to_string);

            let sink = MongoDbSink::new(1, config);
            assert_eq!(sink.payload_format, expected);
        }
    }

    #[test]
    fn given_offset_values_should_use_expected_metadata_field() {
        // Offsets up to i64::MAX stay numeric; larger ones switch to the
        // string-typed field so no precision is lost.
        let (normal_key, normal_value) = build_offset_metadata_value(i64::MAX as u64);
        assert_eq!(normal_key, "iggy_offset");
        assert_eq!(normal_value, bson::Bson::Int64(i64::MAX));

        let oversized_offset = (i64::MAX as u64) + 1;
        let (oversized_key, oversized_value) = build_offset_metadata_value(oversized_offset);
        assert_eq!(oversized_key, "iggy_offset_str");
        assert_eq!(
            oversized_value,
            bson::Bson::String(oversized_offset.to_string())
        );
    }

    #[test]
    fn given_partition_values_should_use_expected_bson_type() {
        assert_eq!(
            build_partition_metadata_value(i32::MAX as u32),
            bson::Bson::Int32(i32::MAX)
        );

        let oversized_partition = (i32::MAX as u32) + 1;
        assert_eq!(
            build_partition_metadata_value(oversized_partition),
            bson::Bson::Int64(i64::from(oversized_partition))
        );
    }

    #[test]
    fn given_timestamp_values_should_use_bounded_bson_datetime() {
        // Microseconds are truncated to milliseconds.
        assert_eq!(
            build_bson_datetime_value(1_234_000),
            bson::DateTime::from_millis(1_234)
        );

        // u64::MAX / 1000 still fits i64, so no saturation occurs here.
        assert_eq!(
            build_bson_datetime_value(u64::MAX),
            bson::DateTime::from_millis((u64::MAX / 1_000) as i64)
        );
    }

    #[test]
    fn given_checksum_values_should_use_lossless_bson_value() {
        assert_eq!(
            build_checksum_metadata_value(i64::MAX as u64),
            bson::Bson::Int64(i64::MAX)
        );

        let oversized_checksum = (i64::MAX as u64) + 1;
        assert_eq!(
            build_checksum_metadata_value(oversized_checksum),
            bson::Bson::String(oversized_checksum.to_string())
        );
    }

    #[test]
    fn given_cross_topic_messages_should_build_unique_document_ids() {
        let first_topic = TopicMetadata {
            stream: "test_stream".to_string(),
            topic: "topic_primary".to_string(),
        };
        let second_topic = TopicMetadata {
            stream: "test_stream".to_string(),
            topic: "topic_secondary".to_string(),
        };
        let messages_metadata = MessagesMetadata {
            partition_id: 0,
            current_offset: 0,
            schema: iggy_connector_sdk::Schema::Raw,
        };

        let first_id = build_composite_document_id(&first_topic, &messages_metadata, 42);
        let second_id = build_composite_document_id(&second_topic, &messages_metadata, 42);
        assert_ne!(
            first_id, second_id,
            "Composite document ID must differ for different topics"
        );
    }

    #[test]
    fn given_auto_create_collection_config_should_store_expected_option() {
        let cases = [None, Some(true), Some(false)];

        for auto_create_collection in cases {
            let mut config = given_default_config();
            config.auto_create_collection = auto_create_collection;

            let sink = MongoDbSink::new(1, config);
            assert_eq!(sink.config.auto_create_collection, auto_create_collection);
        }
    }

    // ---- is_transient_error tests ----

    #[test]
    fn given_io_timeout_error_should_be_transient() {
        let io_err = std::io::Error::new(std::io::ErrorKind::TimedOut, "connection timed out");
        let e: mongodb::error::Error = io_err.into();
        assert!(is_transient_error(&e));
    }

    #[test]
    fn given_io_network_error_should_be_transient() {
        let io_err =
            std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused");
        let e: mongodb::error::Error = io_err.into();
        assert!(is_transient_error(&e));
    }

    #[test]
    fn given_string_timeout_error_should_be_transient() {
        // Custom errors exercise the message-substring fallback path.
        let e = mongodb::error::Error::custom(String::from("server selection timeout exceeded"));
        assert!(is_transient_error(&e));
    }

    #[test]
    fn given_string_pool_error_should_be_transient() {
        let e = mongodb::error::Error::custom(String::from("connection pool exhausted"));
        assert!(is_transient_error(&e));
    }

    #[test]
    fn given_auth_failure_string_should_not_be_transient() {
        let e =
            mongodb::error::Error::custom(String::from("authentication failed: bad credentials"));
        assert!(!is_transient_error(&e));
    }

    #[test]
    fn given_duplicate_key_string_should_not_be_transient() {
        let e = mongodb::error::Error::custom(String::from("duplicate key error on collection"));
        assert!(!is_transient_error(&e));
    }

    // ---- process_messages error propagation tests ----
    // These tests verify that the sink does NOT silently lose data when
    // inserts fail.

    /// Test contract: When MongoDB insert fails, process_messages MUST return
    /// Err. This prevents silent data loss where upstream commits while
    /// writes failed.
    ///
    /// Given: A sink with no client (will fail on get_client)
    /// When: process_messages is called with messages
    /// Then: Returns Err (not Ok) and does NOT count failed messages as
    /// processed
    #[tokio::test]
    async fn given_no_client_should_return_error_not_silent_ok() {
        let config = given_default_config();
        let sink = MongoDbSink::new(1, config);

        // Sink has no client - this simulates connection failure
        assert!(
            sink.client.is_none(),
            "Sink should not have client before connect"
        );

        let topic_metadata = TopicMetadata {
            stream: "test_stream".to_string(),
            topic: "test_topic".to_string(),
        };
        let messages_metadata = MessagesMetadata {
            partition_id: 1,
            current_offset: 0,
            schema: iggy_connector_sdk::Schema::Raw,
        };
        let messages = vec![ConsumedMessage {
            id: 1,
            offset: 0,
            timestamp: 1000,
            origin_timestamp: 1000,
            checksum: 0,
            headers: None,
            payload: iggy_connector_sdk::Payload::Raw(vec![1, 2, 3]),
        }];

        let result = sink
            .process_messages(&topic_metadata, &messages_metadata, &messages)
            .await;

        // CRITICAL: Must return Err, not Ok(())
        assert!(
            result.is_err(),
            "process_messages MUST return Err when client is unavailable - silent data loss bug!"
        );

        assert_eq!(
            sink.messages_processed.load(Ordering::Relaxed),
            0,
            "messages_processed must only count SUCCESSFUL inserts"
        );
    }

    /// Test contract: messages_processed only counts successfully inserted
    /// messages.
    ///
    /// Given: Multiple messages where some may fail
    /// When: process_messages handles them
    /// Then: messages_processed reflects only successful writes
    #[test]
    fn given_new_sink_should_have_zero_messages_processed() {
        let sink = MongoDbSink::new(1, given_default_config());
        assert_eq!(
            sink.messages_processed.load(Ordering::Relaxed),
            0,
            "New sink must start with zero processed count"
        );
        assert_eq!(
            sink.insertion_errors.load(Ordering::Relaxed),
            0,
            "New sink must start with zero error count"
        );
    }
}
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 13afb04dc..e10b7bbeb 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -47,6 +47,7 @@ iggy_connector_sdk = { workspace = true, features = ["api"] }
keyring = { workspace = true }
lazy_static = { workspace = true }
libc = { workspace = true }
+mongodb = { version = "3.0", features = ["rustls-tls"] }
once_cell = { workspace = true }
predicates = { workspace = true }
rand = { workspace = true }
diff --git a/core/integration/tests/connectors/fixtures/mod.rs
b/core/integration/tests/connectors/fixtures/mod.rs
index 3c0ac0d93..6deae4866 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -19,12 +19,17 @@
mod elasticsearch;
mod iceberg;
+mod mongodb;
mod postgres;
mod quickwit;
mod wiremock;
pub use elasticsearch::{ElasticsearchSinkFixture,
ElasticsearchSourcePreCreatedFixture};
pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps,
IcebergPreCreatedFixture};
+pub use mongodb::{
+ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture,
MongoDbSinkFailpointFixture,
+ MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
+};
pub use postgres::{
PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture,
PostgresSinkJsonFixture,
PostgresSourceByteaFixture, PostgresSourceDeleteFixture,
PostgresSourceJsonFixture,
diff --git a/core/integration/tests/connectors/fixtures/mongodb/container.rs
b/core/integration/tests/connectors/fixtures/mongodb/container.rs
new file mode 100644
index 000000000..4c2b5177d
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/mongodb/container.rs
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use integration::harness::TestBinaryError;
+use mongodb::{Client, bson::doc, options::ClientOptions};
+use std::time::Duration;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage,
ImageExt};
+use tokio::time::sleep;
+use tracing::info;
+
// Container image coordinates and readiness probe.
const MONGODB_IMAGE: &str = "mongo";
const MONGODB_TAG: &str = "7";
const MONGODB_PORT: u16 = 27017;
// Log line mongod prints once it accepts client connections.
const MONGODB_READY_MSG: &str = "Waiting for connections";
const MONGODB_REPLICA_SET_NAME: &str = "rs0";
// Replica-set initiation polling: 120 attempts x 250 ms = 30 s budget.
const MONGODB_INIT_ATTEMPTS: usize = 120;
const MONGODB_INIT_INTERVAL_MS: u64 = 250;

// Default Iggy stream/topic and MongoDB database/collection shared by all fixtures.
pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream";
pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic";
pub(super) const DEFAULT_SINK_COLLECTION: &str = "iggy_messages";
pub(super) const DEFAULT_TEST_DATABASE: &str = "iggy_test";

// Document-polling loop: 100 attempts x 50 ms = 5 s budget.
pub(super) const DEFAULT_POLL_ATTEMPTS: usize = 100;
pub(super) const DEFAULT_POLL_INTERVAL_MS: u64 = 50;

// Sink env vars (consumed by the connectors runtime to configure the plugin).
pub(super) const ENV_SINK_CONNECTION_URI: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_CONNECTION_URI";
pub(super) const ENV_SINK_DATABASE: &str = "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_DATABASE";
pub(super) const ENV_SINK_COLLECTION: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_COLLECTION";
pub(super) const ENV_SINK_PAYLOAD_FORMAT: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_PAYLOAD_FORMAT";
pub(super) const ENV_SINK_INCLUDE_METADATA: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_INCLUDE_METADATA";
pub(super) const ENV_SINK_AUTO_CREATE_COLLECTION: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_AUTO_CREATE_COLLECTION";
pub(super) const ENV_SINK_BATCH_SIZE: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_BATCH_SIZE";
pub(super) const ENV_SINK_MAX_RETRIES: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_MAX_RETRIES";
pub(super) const ENV_SINK_RETRY_DELAY: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_RETRY_DELAY";
pub(super) const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_STREAM";
pub(super) const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_TOPICS";
pub(super) const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_SCHEMA";
pub(super) const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str =
    "IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_CONSUMER_GROUP";
pub(super) const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_MONGODB_PATH";
+
/// Base container management for MongoDB fixtures.
pub struct MongoDbContainer {
    // Held only to keep the container alive for the fixture's lifetime;
    // never read directly, hence the allow.
    #[allow(dead_code)]
    container: ContainerAsync<GenericImage>,
    // URI both test code and the connectors runtime use to reach the container.
    pub(super) connection_uri: String,
}
+
+impl MongoDbContainer {
+ pub(super) async fn start() -> Result<Self, TestBinaryError> {
+ let container = GenericImage::new(MONGODB_IMAGE, MONGODB_TAG)
+ .with_exposed_port(MONGODB_PORT.tcp())
+ .with_wait_for(WaitFor::message_on_stdout(MONGODB_READY_MSG))
+ .with_mapped_port(0, MONGODB_PORT.tcp())
+ .start()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to start container: {e}"),
+ })?;
+
+ info!("Started MongoDB container");
+
+ let mapped_port = container
+ .ports()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to get ports: {e}"),
+ })?
+ .map_to_host_port_ipv4(MONGODB_PORT)
+ .ok_or_else(|| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: "No mapping for MongoDB port".to_string(),
+ })?;
+
+ // Standalone mode: plain URI. No ?directConnection=true needed
+ // (directConnection is only required for single-node replica sets).
+ let connection_uri = format!("mongodb://localhost:{mapped_port}");
+
+ info!("MongoDB container available at {connection_uri}");
+
+ Ok(Self {
+ container,
+ connection_uri,
+ })
+ }
+
+ pub(super) async fn start_single_node_replica_set(
+ enable_test_commands: bool,
+ ) -> Result<Self, TestBinaryError> {
+ let mut image = GenericImage::new(MONGODB_IMAGE, MONGODB_TAG)
+ .with_exposed_port(MONGODB_PORT.tcp())
+ .with_wait_for(WaitFor::message_on_stdout(MONGODB_READY_MSG))
+ .with_mapped_port(0, MONGODB_PORT.tcp())
+ .with_cmd(["--replSet", MONGODB_REPLICA_SET_NAME,
"--bind_ip_all"]);
+
+ if enable_test_commands {
+ image = image.with_cmd([
+ "--replSet",
+ MONGODB_REPLICA_SET_NAME,
+ "--bind_ip_all",
+ "--setParameter",
+ "enableTestCommands=1",
+ ]);
+ }
+
+ let container = image
+ .start()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to start replica-set container: {e}"),
+ })?;
+
+ let mapped_port = container
+ .ports()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to get ports: {e}"),
+ })?
+ .map_to_host_port_ipv4(MONGODB_PORT)
+ .ok_or_else(|| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: "No mapping for MongoDB port".to_string(),
+ })?;
+
+ let bootstrap_uri =
format!("mongodb://localhost:{mapped_port}/?directConnection=true");
+ let options = ClientOptions::parse(&bootstrap_uri).await.map_err(|e| {
+ TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to parse bootstrap URI: {e}"),
+ }
+ })?;
+ let bootstrap_client =
+ Client::with_options(options).map_err(|e|
TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to create bootstrap client: {e}"),
+ })?;
+
+ let _ = bootstrap_client
+ .database("admin")
+ .run_command(doc! {
+ "replSetInitiate": {
+ "_id": MONGODB_REPLICA_SET_NAME,
+ "members": [
+ {
+ "_id": 0,
+ "host": format!("localhost:{MONGODB_PORT}")
+ }
+ ]
+ }
+ })
+ .await;
+
+ let mut initialized = false;
+ for _ in 0..MONGODB_INIT_ATTEMPTS {
+ let status = bootstrap_client
+ .database("admin")
+ .run_command(doc! { "replSetGetStatus": 1 })
+ .await;
+
+ if let Ok(status) = status
+ && status.get_i32("myState").ok() == Some(1)
+ {
+ initialized = true;
+ break;
+ }
+
+ sleep(Duration::from_millis(MONGODB_INIT_INTERVAL_MS)).await;
+ }
+
+ if !initialized {
+ return Err(TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: "Replica set failed to reach PRIMARY
state".to_string(),
+ });
+ }
+
+ let connection_uri = format!(
+
"mongodb://localhost:{mapped_port}/?replicaSet={MONGODB_REPLICA_SET_NAME}&directConnection=true"
+ );
+ info!("MongoDB single-node replica set available at {connection_uri}");
+
+ Ok(Self {
+ container,
+ connection_uri,
+ })
+ }
+
+ pub async fn create_client(&self) -> Result<Client, TestBinaryError> {
+ let options = ClientOptions::parse(&self.connection_uri)
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to parse URI: {e}"),
+ })?;
+
+ Client::with_options(options).map_err(|e|
TestBinaryError::FixtureSetup {
+ fixture_type: "MongoDbContainer".to_string(),
+ message: format!("Failed to create client: {e}"),
+ })
+ }
+}
+
/// Common MongoDB operations for fixtures.
pub trait MongoDbOps: Sync {
    /// The container backing this fixture.
    fn container(&self) -> &MongoDbContainer;

    /// Create a client connected to the fixture's container.
    ///
    /// Default implementation simply delegates to the container.
    fn create_client(
        &self,
    ) -> impl std::future::Future<Output = Result<Client, TestBinaryError>> + Send {
        self.container().create_client()
    }
}
diff --git a/core/integration/tests/connectors/fixtures/mod.rs
b/core/integration/tests/connectors/fixtures/mongodb/mod.rs
similarity index 54%
copy from core/integration/tests/connectors/fixtures/mod.rs
copy to core/integration/tests/connectors/fixtures/mongodb/mod.rs
index 3c0ac0d93..cfaf97ec6 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mongodb/mod.rs
@@ -17,18 +17,11 @@
* under the License.
*/
-mod elasticsearch;
-mod iceberg;
-mod postgres;
-mod quickwit;
-mod wiremock;
+mod container;
+mod sink;
-pub use elasticsearch::{ElasticsearchSinkFixture,
ElasticsearchSourcePreCreatedFixture};
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps,
IcebergPreCreatedFixture};
-pub use postgres::{
- PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture,
PostgresSinkJsonFixture,
- PostgresSourceByteaFixture, PostgresSourceDeleteFixture,
PostgresSourceJsonFixture,
- PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps,
+pub use container::MongoDbOps;
+pub use sink::{
+ MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture,
MongoDbSinkFailpointFixture,
+ MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
};
-pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture};
-pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
diff --git a/core/integration/tests/connectors/fixtures/mongodb/sink.rs
b/core/integration/tests/connectors/fixtures/mongodb/sink.rs
new file mode 100644
index 000000000..8fa978275
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/mongodb/sink.rs
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::container::{
+ DEFAULT_POLL_ATTEMPTS, DEFAULT_POLL_INTERVAL_MS, DEFAULT_SINK_COLLECTION,
+ DEFAULT_TEST_DATABASE, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC,
+ ENV_SINK_AUTO_CREATE_COLLECTION, ENV_SINK_BATCH_SIZE, ENV_SINK_COLLECTION,
+ ENV_SINK_CONNECTION_URI, ENV_SINK_DATABASE, ENV_SINK_INCLUDE_METADATA,
ENV_SINK_MAX_RETRIES,
+ ENV_SINK_PATH, ENV_SINK_PAYLOAD_FORMAT, ENV_SINK_RETRY_DELAY,
+ ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA,
ENV_SINK_STREAMS_0_STREAM,
+ ENV_SINK_STREAMS_0_TOPICS, MongoDbContainer, MongoDbOps,
+};
+use async_trait::async_trait;
+use futures::TryStreamExt;
+use integration::harness::{TestBinaryError, TestFixture};
+use mongodb::{Client, bson::Document};
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
+
/// MongoDB sink connector fixture with binary payload format (default).
pub struct MongoDbSinkFixture {
    container: MongoDbContainer,
    // Value passed to the sink's payload_format config ("binary" or "json").
    payload_format: &'static str,
    // When true, AUTO_CREATE_COLLECTION=true is added to the runtime envs.
    auto_create: bool,
}

impl MongoDbOps for MongoDbSinkFixture {
    fn container(&self) -> &MongoDbContainer {
        &self.container
    }
}
+
impl MongoDbSinkFixture {
    /// Wait for documents to appear in a MongoDB collection, polling until expected count reached.
    ///
    /// Returns an error if the expected count is not reached within the poll attempts.
    pub async fn wait_for_documents(
        &self,
        client: &Client,
        collection_name: &str,
        expected: usize,
    ) -> Result<Vec<Document>, TestBinaryError> {
        let db = client.database(DEFAULT_TEST_DATABASE);
        let collection = db.collection::<Document>(collection_name);

        for _ in 0..DEFAULT_POLL_ATTEMPTS {
            // Sort by iggy_offset so callers can assert on message ordering.
            let cursor = collection
                .find(mongodb::bson::doc! {})
                .sort(mongodb::bson::doc! { "iggy_offset": 1 })
                .await;

            // Query/collect errors are deliberately treated as "not ready yet"
            // and retried on the next iteration.
            if let Ok(c) = cursor
                && let Ok(docs) = c.try_collect::<Vec<_>>().await
                && docs.len() >= expected
            {
                info!(
                    "Found {} documents in MongoDB collection '{collection_name}'",
                    docs.len()
                );
                return Ok(docs);
            }
            sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await;
        }

        Err(TestBinaryError::InvalidState {
            message: format!(
                "Expected at least {expected} documents in '{collection_name}' after {} attempts",
                DEFAULT_POLL_ATTEMPTS
            ),
        })
    }

    /// Count all documents in a collection.
    pub async fn count_documents_in_collection(
        &self,
        client: &Client,
        collection_name: &str,
    ) -> Result<u64, TestBinaryError> {
        let db = client.database(DEFAULT_TEST_DATABASE);
        let collection = db.collection::<Document>(collection_name);
        collection
            .count_documents(mongodb::bson::doc! {})
            .await
            .map_err(|e| TestBinaryError::InvalidState {
                message: format!("Failed to count documents: {e}"),
            })
    }

    /// Check whether a named collection exists in the test database.
    pub async fn collection_exists(
        &self,
        client: &Client,
        collection_name: &str,
    ) -> Result<bool, TestBinaryError> {
        let db = client.database(DEFAULT_TEST_DATABASE);
        let names =
            db.list_collection_names()
                .await
                .map_err(|e| TestBinaryError::InvalidState {
                    message: format!("Failed to list collections: {e}"),
                })?;
        Ok(names.contains(&collection_name.to_string()))
    }

    /// Enable `failCommand` failpoint once with caller-supplied failpoint data.
    ///
    /// Requires a server started with test commands enabled
    /// (see `MongoDbContainer::start_single_node_replica_set(true)`).
    pub async fn configure_fail_command_once(
        &self,
        client: &Client,
        data: Document,
    ) -> Result<(), TestBinaryError> {
        client
            .database("admin")
            .run_command(mongodb::bson::doc! {
                "configureFailPoint": "failCommand",
                // Fire exactly once, then disarm automatically.
                "mode": { "times": 1 },
                "data": data,
            })
            .await
            .map(|_| ())
            .map_err(|e| TestBinaryError::FixtureSetup {
                fixture_type: "MongoDbSinkFixture".to_string(),
                message: format!("Failed to configure failCommand failpoint: {e}"),
            })
    }

    /// Disable `failCommand` failpoint explicitly.
    pub async fn disable_fail_command(&self, client: &Client) -> Result<(), TestBinaryError> {
        client
            .database("admin")
            .run_command(mongodb::bson::doc! {
                "configureFailPoint": "failCommand",
                "mode": "off",
            })
            .await
            .map(|_| ())
            .map_err(|e| TestBinaryError::FixtureSetup {
                fixture_type: "MongoDbSinkFixture".to_string(),
                message: format!("Failed to disable failCommand failpoint: {e}"),
            })
    }
}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container = MongoDbContainer::start().await?;
+ Ok(Self {
+ container,
+ payload_format: "binary",
+ auto_create: false,
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = HashMap::new();
+ envs.insert(
+ ENV_SINK_CONNECTION_URI.to_string(),
+ self.container.connection_uri.clone(),
+ );
+ envs.insert(
+ ENV_SINK_DATABASE.to_string(),
+ DEFAULT_TEST_DATABASE.to_string(),
+ );
+ envs.insert(
+ ENV_SINK_COLLECTION.to_string(),
+ DEFAULT_SINK_COLLECTION.to_string(),
+ );
+ envs.insert(
+ ENV_SINK_PAYLOAD_FORMAT.to_string(),
+ self.payload_format.to_string(),
+ );
+ envs.insert(ENV_SINK_INCLUDE_METADATA.to_string(), "true".to_string());
+ envs.insert(
+ ENV_SINK_STREAMS_0_STREAM.to_string(),
+ DEFAULT_TEST_STREAM.to_string(),
+ );
+ envs.insert(
+ ENV_SINK_STREAMS_0_TOPICS.to_string(),
+ format!("[{}]", DEFAULT_TEST_TOPIC),
+ );
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), "raw".to_string());
+ envs.insert(
+ ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(),
+ "mongodb_sink_cg".to_string(),
+ );
+ envs.insert(
+ ENV_SINK_PATH.to_string(),
+ "../../target/debug/libiggy_connector_mongodb_sink".to_string(),
+ );
+ if self.auto_create {
+ envs.insert(
+ ENV_SINK_AUTO_CREATE_COLLECTION.to_string(),
+ "true".to_string(),
+ );
+ }
+ envs
+ }
+}
+
+/// MongoDB sink fixture with JSON payload format.
+pub struct MongoDbSinkJsonFixture {
+ inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkJsonFixture {
+ type Target = MongoDbSinkFixture;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkJsonFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container = MongoDbContainer::start().await?;
+ Ok(Self {
+ inner: MongoDbSinkFixture {
+ container,
+ payload_format: "json",
+ auto_create: false,
+ },
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = self.inner.connectors_runtime_envs();
+ // Schema must be "json" for the runtime to route messages correctly.
+ // Already set in base, but override STREAMS_0_SCHEMA explicitly.
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string());
+ envs
+ }
+}
+
+/// MongoDB sink fixture with auto_create_collection enabled.
+pub struct MongoDbSinkAutoCreateFixture {
+ inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkAutoCreateFixture {
+ type Target = MongoDbSinkFixture;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkAutoCreateFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container = MongoDbContainer::start().await?;
+ Ok(Self {
+ inner: MongoDbSinkFixture {
+ container,
+ payload_format: "binary",
+ auto_create: true,
+ },
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ // auto_create flag is set in the base connectors_runtime_envs()
+ self.inner.connectors_runtime_envs()
+ }
+}
+
+/// MongoDB sink fixture with batch_size set to 10 for large-batch tests.
+pub struct MongoDbSinkBatchFixture {
+ inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkBatchFixture {
+ type Target = MongoDbSinkFixture;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkBatchFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container = MongoDbContainer::start().await?;
+ Ok(Self {
+ inner: MongoDbSinkFixture {
+ container,
+ payload_format: "binary",
+ auto_create: false,
+ },
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = self.inner.connectors_runtime_envs();
+ envs.insert(ENV_SINK_BATCH_SIZE.to_string(), "10".to_string());
+ envs
+ }
+}
+
+/// MongoDB sink fixture backed by single-node replica set and write concern
timeout URI.
+pub struct MongoDbSinkWriteConcernFixture {
+ inner: MongoDbSinkFixture,
+ connection_uri: String,
+}
+
+impl std::ops::Deref for MongoDbSinkWriteConcernFixture {
+ type Target = MongoDbSinkFixture;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkWriteConcernFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container =
MongoDbContainer::start_single_node_replica_set(false).await?;
+ let connection_uri = format!(
+ "{}&w=2&wtimeoutMS=200&retryWrites=false",
+ container.connection_uri
+ );
+ Ok(Self {
+ inner: MongoDbSinkFixture {
+ container,
+ payload_format: "binary",
+ auto_create: false,
+ },
+ connection_uri,
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = self.inner.connectors_runtime_envs();
+ envs.insert(
+ ENV_SINK_CONNECTION_URI.to_string(),
+ self.connection_uri.clone(),
+ );
+ // Keep the signal clean: fail once and report failure instead of
retrying.
+ envs.insert(ENV_SINK_MAX_RETRIES.to_string(), "1".to_string());
+ envs.insert(ENV_SINK_RETRY_DELAY.to_string(), "50ms".to_string());
+ envs
+ }
+}
+
+/// MongoDB sink fixture backed by single-node replica set with test commands
enabled.
+pub struct MongoDbSinkFailpointFixture {
+ inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkFailpointFixture {
+ type Target = MongoDbSinkFixture;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkFailpointFixture {
+ async fn setup() -> Result<Self, TestBinaryError> {
+ let container =
MongoDbContainer::start_single_node_replica_set(true).await?;
+ Ok(Self {
+ inner: MongoDbSinkFixture {
+ container,
+ payload_format: "binary",
+ auto_create: false,
+ },
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+ let mut envs = self.inner.connectors_runtime_envs();
+ envs.insert(ENV_SINK_MAX_RETRIES.to_string(), "2".to_string());
+ envs.insert(ENV_SINK_RETRY_DELAY.to_string(), "50ms".to_string());
+ envs
+ }
+}
diff --git a/core/integration/tests/connectors/mod.rs
b/core/integration/tests/connectors/mod.rs
index a85453d52..0d9352904 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -22,6 +22,7 @@ mod elasticsearch;
mod fixtures;
mod http_config_provider;
mod iceberg;
+mod mongodb;
mod postgres;
mod quickwit;
mod random;
diff --git a/core/integration/tests/connectors/fixtures/mod.rs
b/core/integration/tests/connectors/mongodb/mod.rs
similarity index 54%
copy from core/integration/tests/connectors/fixtures/mod.rs
copy to core/integration/tests/connectors/mongodb/mod.rs
index 3c0ac0d93..d44706b49 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/mongodb/mod.rs
@@ -17,18 +17,8 @@
* under the License.
*/
-mod elasticsearch;
-mod iceberg;
-mod postgres;
-mod quickwit;
-mod wiremock;
+mod mongodb_sink;
-pub use elasticsearch::{ElasticsearchSinkFixture,
ElasticsearchSourcePreCreatedFixture};
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps,
IcebergPreCreatedFixture};
-pub use postgres::{
- PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture,
PostgresSinkJsonFixture,
- PostgresSourceByteaFixture, PostgresSourceDeleteFixture,
PostgresSourceJsonFixture,
- PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps,
-};
-pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture};
-pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
// Shared tuning for the MongoDB sink integration tests.
const TEST_MESSAGE_COUNT: usize = 3;
// Polling loop: 100 attempts x 50 ms = 5 s budget.
const POLL_ATTEMPTS: usize = 100;
const POLL_INTERVAL_MS: u64 = 50;
diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs
b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
new file mode 100644
index 000000000..0bf666a4b
--- /dev/null
+++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
@@ -0,0 +1,1162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT};
+use crate::connectors::fixtures::{
+ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture,
MongoDbSinkFailpointFixture,
+ MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
+};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_binary_protocol::MessageClient;
+use iggy_common::Identifier;
+use iggy_connector_sdk::api::{ConnectorRuntimeStats, ConnectorStatus,
SinkInfoResponse};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use mongodb::bson::{Document, doc};
+use reqwest::Client as HttpClient;
+use std::time::Duration;
+use tokio::time::sleep;
+
// Must match the fixture defaults in fixtures/mongodb/container.rs.
const DEFAULT_TEST_DATABASE: &str = "iggy_test";
const DEFAULT_SINK_COLLECTION: &str = "iggy_messages";
// Batch fixture sets batch_size=10, so 50 messages span several insert batches.
const LARGE_BATCH_COUNT: usize = 50;
const MONGODB_SINK_KEY: &str = "mongodb";
+
+fn build_expected_document_id(message_id: u128) -> String {
+ build_expected_document_id_for_topic(seeds::names::TOPIC, message_id)
+}
+
+fn build_expected_document_id_for_topic(topic_name: &str, message_id: u128) ->
String {
+ format!("{}:{}:{}:{message_id}", seeds::names::STREAM, topic_name, 0)
+}
+
// Verifies that JSON-formatted messages land in MongoDB as queryable BSON
// documents with Iggy metadata fields and contiguous offsets.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: MongoDbSinkJsonFixture) {
    let client = harness.root_client().await.unwrap();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();

    // Heterogeneous JSON objects: the sink must store each shape as-is.
    let json_payloads: Vec<serde_json::Value> = vec![
        serde_json::json!({"name": "Alice", "age": 30}),
        serde_json::json!({"name": "Bob", "score": 99}),
        serde_json::json!({"name": "Carol", "active": true}),
    ];

    let mut messages: Vec<IggyMessage> = json_payloads
        .iter()
        .enumerate()
        .map(|(i, payload)| {
            let bytes = serde_json::to_vec(payload).expect("Failed to serialize");
            IggyMessage::builder()
                .id((i + 1) as u128)
                .payload(Bytes::from(bytes))
                .build()
                .expect("Failed to build message")
        })
        .collect();

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send messages");

    // Wait for connector to consume and insert into MongoDB.
    let docs = fixture
        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, TEST_MESSAGE_COUNT)
        .await
        .expect("Documents did not appear in MongoDB");

    assert_eq!(docs.len(), TEST_MESSAGE_COUNT);

    // Verify metadata fields are present on first document.
    let first = &docs[0];
    assert!(
        first.contains_key("iggy_offset"),
        "Expected iggy_offset field"
    );
    assert!(
        first.contains_key("iggy_stream"),
        "Expected iggy_stream field"
    );
    assert!(
        first.contains_key("iggy_topic"),
        "Expected iggy_topic field"
    );
    assert!(
        first.contains_key("iggy_partition_id"),
        "Expected iggy_partition_id field"
    );
    assert!(
        first.contains_key("iggy_timestamp"),
        "Expected iggy_timestamp field"
    );

    // Verify offset sequence is contiguous.
    for (i, doc) in docs.iter().enumerate() {
        let offset = doc.get_i64("iggy_offset").expect("iggy_offset missing");
        assert_eq!(offset, i as i64, "Offset mismatch at document {i}");
    }

    // Verify payload is stored as a BSON Document (queryable) not Binary.
    let payload = first.get("payload").expect("payload field missing");
    assert!(
        matches!(payload, mongodb::bson::Bson::Document(_)),
        "Expected payload to be BSON Document for json format, got: {payload:?}"
    );
}
+
// Verifies that binary payloads (including non-UTF-8 bytes) are stored as BSON
// Binary with Generic subtype and survive the round trip byte-for-byte.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: MongoDbSinkFixture) {
    let client = harness.root_client().await.unwrap();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();

    // Mix of valid-UTF-8 and deliberately invalid byte sequences.
    let raw_payloads: Vec<Vec<u8>> = vec![
        b"plain text message".to_vec(),
        vec![0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD],
        vec![0xDE, 0xAD, 0xBE, 0xEF],
    ];

    let mut messages: Vec<IggyMessage> = raw_payloads
        .iter()
        .enumerate()
        .map(|(i, payload)| {
            IggyMessage::builder()
                .id((i + 1) as u128)
                .payload(Bytes::from(payload.clone()))
                .build()
                .expect("Failed to build message")
        })
        .collect();

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send messages");

    let docs = fixture
        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, raw_payloads.len())
        .await
        .expect("Documents did not appear");

    assert_eq!(docs.len(), raw_payloads.len());

    // Each stored payload must be BSON Binary/Generic with the exact input bytes.
    for (i, doc) in docs.iter().enumerate() {
        let payload = doc.get("payload").expect("payload field missing");
        match payload {
            mongodb::bson::Bson::Binary(bin) => {
                assert_eq!(
                    bin.subtype,
                    mongodb::bson::spec::BinarySubtype::Generic,
                    "Expected Generic subtype at doc {i}"
                );
                assert_eq!(
                    bin.bytes, raw_payloads[i],
                    "Payload bytes mismatch at doc {i}"
                );
            }
            other => panic!("Expected Binary, got {other:?} at doc {i}"),
        }
    }
}
+
// Verifies that a batch larger than the sink's batch_size (fixture sets 10)
// is fully processed with no lost messages and no offset gaps.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn large_batch_processed_correctly(harness: &TestHarness, fixture: MongoDbSinkBatchFixture) {
    let client = harness.root_client().await.unwrap();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();

    let mut messages: Vec<IggyMessage> = (0..LARGE_BATCH_COUNT)
        .map(|i| {
            let payload =
                serde_json::to_vec(&serde_json::json!({"idx": i})).expect("Failed to serialize");
            IggyMessage::builder()
                .id((i + 1) as u128)
                .payload(Bytes::from(payload))
                .build()
                .expect("Failed to build message")
        })
        .collect();

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send messages");

    let docs = fixture
        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, LARGE_BATCH_COUNT)
        .await
        .expect("Not all documents appeared");

    assert!(
        docs.len() >= LARGE_BATCH_COUNT,
        "Expected at least {LARGE_BATCH_COUNT} documents, got {}",
        docs.len()
    );

    // Verify offsets are contiguous (0..N).
    for (i, doc) in docs.iter().enumerate() {
        let offset = doc.get_i64("iggy_offset").expect("iggy_offset missing");
        assert_eq!(offset, i as i64, "Offset gap detected at position {i}");
    }
}
+
/// Verifies idempotent duplicate-key handling: a document pre-seeded at the
/// `_id` derived for message 2 collides with the sink's insert, yet the
/// surrounding messages still land (unordered insert_many), the pre-seeded
/// document is left untouched, and the runtime reports no sink error while
/// counting the whole batch as processed.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn duplicate_key_is_explicit_failure_and_not_silent_success(
    harness: &TestHarness,
    fixture: MongoDbSinkFixture,
) {
    let client = harness.root_client().await.unwrap();
    let api_address = harness
        .connectors_runtime()
        .expect("connector runtime should be available")
        .http_url();
    let http_client = HttpClient::new();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();

    let collection = mongo_client
        .database(DEFAULT_TEST_DATABASE)
        .collection::<Document>(DEFAULT_SINK_COLLECTION);

    // Pre-seed the collision target; seed_marker lets us prove later that the
    // sink did not overwrite it.
    let preseeded_document = doc! {
        "_id": build_expected_document_id(2),
        "seed_marker": "preseed-unchanged",
        "payload": "preseeded"
    };

    collection
        .insert_one(preseeded_document)
        .await
        .expect("Failed to pre-seed duplicate _id document");

    // Create deterministic insert set:
    // - "1" should be inserted
    // - "2" collides with pre-seeded _id
    // - "3" should be inserted because unordered insert_many continues past duplicates.
    let mut messages: Vec<IggyMessage> = vec![
        IggyMessage::builder()
            .id(1)
            .payload(Bytes::from_static(b"message-1"))
            .build()
            .expect("Failed to build message 1"),
        IggyMessage::builder()
            .id(2)
            .payload(Bytes::from_static(b"message-2"))
            .build()
            .expect("Failed to build message 2"),
        IggyMessage::builder()
            .id(3)
            .payload(Bytes::from_static(b"message-3"))
            .build()
            .expect("Failed to build message 3"),
    ];

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send messages");

    // Wait until the sink processes the batch and at least the first non-conflicting message appears.
    let mut first_message_inserted = false;
    for _ in 0..POLL_ATTEMPTS {
        if collection
            .find_one(doc! { "_id": build_expected_document_id(1) })
            .await
            .expect("Failed to query _id=1")
            .is_some()
        {
            first_message_inserted = true;
            break;
        }
        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }

    assert!(
        first_message_inserted,
        "Expected first non-conflicting message (_id=1) to be inserted"
    );

    // The conflicting _id must still hold the pre-seeded payload, untouched.
    let preseeded_after = collection
        .find_one(doc! { "_id": build_expected_document_id(2) })
        .await
        .expect("Failed to query pre-seeded document")
        .expect("Pre-seeded _id=2 document should still exist");

    assert_eq!(
        preseeded_after
            .get_str("seed_marker")
            .expect("seed_marker missing from pre-seeded document"),
        "preseed-unchanged",
        "Pre-seeded document must remain unchanged"
    );

    let duplicate_count = collection
        .count_documents(doc! { "_id": build_expected_document_id(2) })
        .await
        .expect("Failed to count _id=2 documents");
    assert_eq!(
        duplicate_count, 1,
        "Duplicate _id must not create extra documents"
    );

    // Poll the runtime HTTP API until the sink settles into the expected
    // terminal state: third message inserted, no last_error, zero error count,
    // and all three messages counted as processed.
    let mut sink_error_reported = false;
    // Sentinel start value: if the loop never reads stats, the final
    // processed_messages == 3 assertion cannot pass by accident.
    let mut processed_messages = u64::MAX;
    let mut sink_errors = 0_u64;
    let mut third_message_inserted = false;

    for _ in 0..POLL_ATTEMPTS {
        third_message_inserted = collection
            .find_one(doc! { "_id": build_expected_document_id(3) })
            .await
            .expect("Failed to query _id=3")
            .is_some();

        let sinks_response = http_client
            .get(format!("{}/sinks", api_address))
            .send()
            .await
            .expect("Failed to get sinks");
        assert_eq!(sinks_response.status(), 200);
        let sinks: Vec<SinkInfoResponse> = sinks_response.json().await.expect("Invalid sinks JSON");
        let sink = sinks
            .iter()
            .find(|s| s.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink not found in /sinks response");
        sink_error_reported = sink.last_error.is_some();

        let stats_response = http_client
            .get(format!("{}/stats", api_address))
            .send()
            .await
            .expect("Failed to get stats");
        assert_eq!(stats_response.status(), 200);
        let stats: ConnectorRuntimeStats = stats_response.json().await.expect("Invalid stats JSON");
        let sink_stats = stats
            .connectors
            .iter()
            .find(|c| c.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink stats not found");
        processed_messages = sink_stats
            .messages_processed
            .expect("messages_processed should be present for sink stats");
        sink_errors = sink_stats.errors;

        if third_message_inserted
            && !sink_error_reported
            && sink_errors == 0
            && processed_messages == 3
        {
            break;
        }

        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }

    assert!(
        third_message_inserted,
        "Message after duplicate conflict should still be inserted with unordered insert_many"
    );
    assert!(
        !sink_error_reported,
        "Duplicate key collision should be idempotent"
    );
    assert_eq!(
        sink_errors, 0,
        "Idempotent duplicate handling should not count as sink error"
    );
    assert_eq!(
        processed_messages, 3,
        "messages_processed should reflect the successful runtime batch processing count"
    );
}
+
/// Pre-seeds a document at the `_id` derived for message 2, sends a
/// three-message batch, and verifies exact accounting: the two
/// non-conflicting messages are inserted, the pre-seeded document is left
/// untouched, the collection holds exactly 3 documents, and the runtime
/// keeps the sink Running with zero errors and the full batch processed.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn ordered_duplicate_partial_insert_has_exact_accounting(
    harness: &TestHarness,
    fixture: MongoDbSinkFixture,
) {
    let client = harness.root_client().await.unwrap();
    let api_address = harness
        .connectors_runtime()
        .expect("connector runtime should be available")
        .http_url();
    let http_client = HttpClient::new();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
    let collection = mongo_client
        .database(DEFAULT_TEST_DATABASE)
        .collection::<Document>(DEFAULT_SINK_COLLECTION);

    // Seed the duplicate target; seed_marker proves later it was untouched.
    collection
        .insert_one(doc! {
            "_id": build_expected_document_id(2),
            "seed_marker": "preseeded-stable",
            "payload": "preseeded"
        })
        .await
        .expect("Failed to pre-seed duplicate key baseline");

    let mut messages: Vec<IggyMessage> = vec![
        IggyMessage::builder()
            .id(1)
            .payload(Bytes::from_static(b"ordered-accounting-1"))
            .build()
            .expect("Failed to build message 1"),
        IggyMessage::builder()
            .id(2)
            .payload(Bytes::from_static(b"ordered-accounting-2"))
            .build()
            .expect("Failed to build message 2"),
        IggyMessage::builder()
            .id(3)
            .payload(Bytes::from_static(b"ordered-accounting-3"))
            .build()
            .expect("Failed to build message 3"),
    ];

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send ordered duplicate batch");

    // Poll document presence plus the runtime /sinks and /stats endpoints
    // until the expected terminal state is observed or attempts run out.
    let mut id1_inserted = false;
    let mut id3_inserted = false;
    let mut sink_last_error = false;
    let mut sink_status = ConnectorStatus::Stopped;
    // Sentinel so a loop that never reads stats cannot satisfy the final
    // processed_messages == 3 assertion by accident.
    let mut processed_messages = u64::MAX;
    let mut sink_errors = 0_u64;

    for _ in 0..POLL_ATTEMPTS {
        id1_inserted = collection
            .find_one(doc! { "_id": build_expected_document_id(1) })
            .await
            .expect("Failed to query _id=1")
            .is_some();
        id3_inserted = collection
            .find_one(doc! { "_id": build_expected_document_id(3) })
            .await
            .expect("Failed to query _id=3")
            .is_some();

        let sinks_response = http_client
            .get(format!("{}/sinks", api_address))
            .send()
            .await
            .expect("Failed to fetch /sinks");
        assert_eq!(sinks_response.status(), 200);
        let sinks: Vec<SinkInfoResponse> = sinks_response.json().await.expect("Invalid sinks JSON");
        let sink = sinks
            .iter()
            .find(|s| s.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink not found in /sinks");
        sink_last_error = sink.last_error.is_some();
        sink_status = sink.status;

        let stats_response = http_client
            .get(format!("{}/stats", api_address))
            .send()
            .await
            .expect("Failed to fetch /stats");
        assert_eq!(stats_response.status(), 200);
        let stats: ConnectorRuntimeStats = stats_response.json().await.expect("Invalid stats JSON");
        let sink_stats = stats
            .connectors
            .iter()
            .find(|c| c.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink stats not found");
        processed_messages = sink_stats
            .messages_processed
            .expect("messages_processed must be present for sink stats");
        sink_errors = sink_stats.errors;

        if id1_inserted
            && id3_inserted
            && !sink_last_error
            && sink_status == ConnectorStatus::Running
        {
            break;
        }

        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }

    assert!(id1_inserted, "Prefix write (_id=1) must be inserted");
    assert!(
        id3_inserted,
        "Suffix write (_id=3) should be inserted with unordered duplicate handling"
    );

    let preseeded_after = collection
        .find_one(doc! { "_id": build_expected_document_id(2) })
        .await
        .expect("Failed to query pre-seeded _id=2")
        .expect("Pre-seeded _id=2 document must remain");
    assert_eq!(
        preseeded_after
            .get_str("seed_marker")
            .expect("seed_marker missing on pre-seeded document"),
        "preseeded-stable",
        "Pre-seeded duplicate target must remain unchanged"
    );

    // Exact accounting: 1 pre-seed + messages 1 and 3 => 3 total documents.
    let total_docs = collection
        .count_documents(doc! {})
        .await
        .expect("Failed to count total collection documents");
    assert_eq!(
        total_docs, 3,
        "Expected exact collection accounting (preseed + two successful inserts)"
    );

    let duplicate_docs = collection
        .count_documents(doc! { "_id": build_expected_document_id(2) })
        .await
        .expect("Failed to count duplicate _id entries");
    assert_eq!(duplicate_docs, 1, "Duplicate _id must remain single");

    assert!(
        !sink_last_error,
        "Duplicate collision should not produce sink last_error"
    );
    assert_eq!(
        sink_status,
        ConnectorStatus::Running,
        "Sink should remain Running on idempotent duplicate handling"
    );
    assert_eq!(
        sink_errors, 0,
        "Duplicate handling should not increment runtime errors"
    );
    assert_eq!(
        processed_messages, 3,
        "Idempotent duplicate handling should keep runtime batch processed count"
    );
}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
+ harness: &TestHarness,
+ fixture: MongoDbSinkFixture,
+) {
+ let client = harness.root_client().await.unwrap();
+ let mongo_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create MongoDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+ let database = mongo_client.database(DEFAULT_TEST_DATABASE);
+ let collection = database.collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+ // Enforce that only the first message in this batch is valid.
+ // iggy_offset=0 passes; iggy_offset>=1 fails validation.
+ database
+ .run_command(doc! {
+ "create": DEFAULT_SINK_COLLECTION,
+ "validator": {
+ "iggy_offset": { "$lt": 1 }
+ },
+ "validationLevel": "strict",
+ "validationAction": "error"
+ })
+ .await
+ .expect("Failed to create collection with schema validator");
+
+ let mut messages: Vec<IggyMessage> = vec![
+ IggyMessage::builder()
+ .id(11)
+ .payload(Bytes::from_static(b"schema-validation-1"))
+ .build()
+ .expect("Failed to build message 11"),
+ IggyMessage::builder()
+ .id(12)
+ .payload(Bytes::from_static(b"schema-validation-2"))
+ .build()
+ .expect("Failed to build message 12"),
+ IggyMessage::builder()
+ .id(13)
+ .payload(Bytes::from_static(b"schema-validation-3"))
+ .build()
+ .expect("Failed to build message 13"),
+ ];
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send schema-validation batch");
+
+ let mut id11_inserted = false;
+ let mut id12_inserted = true;
+ let mut id13_inserted = true;
+
+ for _ in 0..POLL_ATTEMPTS {
+ id11_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(11) })
+ .await
+ .expect("Failed to query _id=11")
+ .is_some();
+ id12_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(12) })
+ .await
+ .expect("Failed to query _id=12")
+ .is_some();
+ id13_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(13) })
+ .await
+ .expect("Failed to query _id=13")
+ .is_some();
+
+ if id11_inserted && !id12_inserted && !id13_inserted {
+ break;
+ }
+
+ sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+ }
+
+ assert!(
+ id11_inserted,
+ "Expected prefix message (_id=11) to be inserted before schema
validation failure"
+ );
+ assert!(
+ !id12_inserted,
+ "Expected mid-batch message (_id=12) to fail schema validation and not
insert"
+ );
+ assert!(
+ !id13_inserted,
+ "Expected suffix message (_id=13) to be skipped after ordered schema
validation failure"
+ );
+
+ let total_docs = collection
+ .count_documents(doc! {})
+ .await
+ .expect("Failed to count documents after schema validation test");
+ assert_eq!(
+ total_docs, 1,
+ "Expected exact prefix-only write accounting under schema validation
failure"
+ );
+}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn write_concern_timeout_does_not_report_full_success(
+ harness: &TestHarness,
+ fixture: MongoDbSinkWriteConcernFixture,
+) {
+ let client = harness.root_client().await.unwrap();
+ let mongo_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create MongoDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+ let collection = mongo_client
+ .database(DEFAULT_TEST_DATABASE)
+ .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+ let mut messages: Vec<IggyMessage> = vec![
+ IggyMessage::builder()
+ .id(101)
+ .payload(Bytes::from_static(b"wc-timeout-1"))
+ .build()
+ .expect("Failed to build message 101"),
+ IggyMessage::builder()
+ .id(102)
+ .payload(Bytes::from_static(b"wc-timeout-2"))
+ .build()
+ .expect("Failed to build message 102"),
+ IggyMessage::builder()
+ .id(103)
+ .payload(Bytes::from_static(b"wc-timeout-3"))
+ .build()
+ .expect("Failed to build message 103"),
+ ];
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send write concern timeout batch");
+
+ let mut total_docs = 0_u64;
+ let mut id101_inserted = false;
+ let mut id102_inserted = false;
+ let mut id103_inserted = false;
+
+ for _ in 0..POLL_ATTEMPTS {
+ total_docs = collection
+ .count_documents(doc! {})
+ .await
+ .expect("Failed to count documents after write concern timeout");
+ id101_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(101) })
+ .await
+ .expect("Failed to query _id=101")
+ .is_some();
+ id102_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(102) })
+ .await
+ .expect("Failed to query _id=102")
+ .is_some();
+ id103_inserted = collection
+ .find_one(doc! { "_id": build_expected_document_id(103) })
+ .await
+ .expect("Failed to query _id=103")
+ .is_some();
+
+ if total_docs == 3 && id101_inserted && id102_inserted &&
id103_inserted {
+ break;
+ }
+
+ sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+ }
+
+ assert_eq!(
+ total_docs, 3,
+ "write concern timeout should still leave exactly one persisted
document per message"
+ );
+ assert!(
+ id101_inserted && id102_inserted && id103_inserted,
+ "write concern timeout should not lose the primary-side writes for
this batch"
+ );
+}
+
/// Arms a one-shot failpoint that closes the connection on the next insert
/// command, then verifies the retry path converges: all three messages end
/// up persisted exactly once each, and the runtime reports the sink Running
/// with zero errors and the whole batch processed.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn retryable_write_failover_keeps_single_doc_per_id(
    harness: &TestHarness,
    fixture: MongoDbSinkFailpointFixture,
) {
    let client = harness.root_client().await.unwrap();
    let api_address = harness
        .connectors_runtime()
        .expect("connector runtime should be available")
        .http_url();
    let http_client = HttpClient::new();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    // closeConnection simulates an abrupt server-side drop mid-insert —
    // presumably surfaced to the driver as a retryable write error; see the
    // MongoDB failCommand failpoint docs to confirm.
    fixture
        .configure_fail_command_once(
            &mongo_client,
            doc! {
                "failCommands": ["insert"],
                "closeConnection": true
            },
        )
        .await
        .expect("Failed to configure failpoint for retryable failover simulation");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
    let collection = mongo_client
        .database(DEFAULT_TEST_DATABASE)
        .collection::<Document>(DEFAULT_SINK_COLLECTION);

    let mut messages: Vec<IggyMessage> = vec![
        IggyMessage::builder()
            .id(201)
            .payload(Bytes::from_static(b"retryable-failover-1"))
            .build()
            .expect("Failed to build message 201"),
        IggyMessage::builder()
            .id(202)
            .payload(Bytes::from_static(b"retryable-failover-2"))
            .build()
            .expect("Failed to build message 202"),
        IggyMessage::builder()
            .id(203)
            .payload(Bytes::from_static(b"retryable-failover-3"))
            .build()
            .expect("Failed to build message 203"),
    ];

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send retryable-failover batch");

    // Pessimistic sentinels: the loop must actually observe the healthy
    // state to flip these, otherwise the final assertions fail.
    let mut sink_last_error = true;
    let mut sink_status = ConnectorStatus::Stopped;
    let mut processed_messages = 0_u64;
    let mut sink_errors = u64::MAX;
    let mut total_docs = 0_u64;

    for _ in 0..POLL_ATTEMPTS {
        total_docs = collection
            .count_documents(doc! {})
            .await
            .expect("Failed to count documents after retryable failover");

        let sinks_response = http_client
            .get(format!("{}/sinks", api_address))
            .send()
            .await
            .expect("Failed to fetch /sinks");
        assert_eq!(sinks_response.status(), 200);
        let sinks: Vec<SinkInfoResponse> = sinks_response.json().await.expect("Invalid sinks JSON");
        let sink = sinks
            .iter()
            .find(|s| s.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink not found in /sinks");
        sink_last_error = sink.last_error.is_some();
        sink_status = sink.status;

        let stats_response = http_client
            .get(format!("{}/stats", api_address))
            .send()
            .await
            .expect("Failed to fetch /stats");
        assert_eq!(stats_response.status(), 200);
        let stats: ConnectorRuntimeStats = stats_response.json().await.expect("Invalid stats JSON");
        let sink_stats = stats
            .connectors
            .iter()
            .find(|c| c.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink stats not found");
        processed_messages = sink_stats
            .messages_processed
            .expect("messages_processed must be present for sink stats");
        sink_errors = sink_stats.errors;

        if total_docs == 3 && !sink_last_error && processed_messages == 3 && sink_errors == 0 {
            break;
        }

        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }

    // Best-effort cleanup; result deliberately ignored so teardown problems
    // cannot mask the assertions below.
    let _ = fixture.disable_fail_command(&mongo_client).await;

    assert_eq!(
        total_docs, 3,
        "expected exact document count after retryable failover retry"
    );
    assert!(
        !sink_last_error,
        "retryable failover path must recover without leaving sink in error state"
    );
    assert_eq!(
        sink_status,
        ConnectorStatus::Running,
        "sink should remain Running after successful retryable failover recovery"
    );
    assert_eq!(
        sink_errors, 0,
        "retryable failover recovery should not increment runtime errors"
    );
    assert_eq!(
        processed_messages, 3,
        "successful retryable failover recovery must count full batch as processed"
    );

    // Exactly-once check per message id: the retry must not double-insert.
    for id in [201_u128, 202, 203] {
        let count = collection
            .count_documents(doc! { "_id": build_expected_document_id(id) })
            .await
            .expect("Failed to count per-id documents after retryable failover");
        assert_eq!(count, 1, "Expected exactly one document for _id={id}");
    }
}
+
/// Arms a one-shot failpoint that fails the next insert with an error
/// explicitly labeled RetryableWriteError + NoWritesPerformed (nothing was
/// written server-side), then verifies the retry re-attempts the batch and
/// leaves exactly one document per message, with the sink Running, zero
/// errors, and the full batch counted as processed.
#[iggy_harness(
    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
    seed = seeds::connector_stream
)]
async fn no_writes_performed_label_path_preserves_state_accuracy(
    harness: &TestHarness,
    fixture: MongoDbSinkFailpointFixture,
) {
    let client = harness.root_client().await.unwrap();
    let api_address = harness
        .connectors_runtime()
        .expect("connector runtime should be available")
        .http_url();
    let http_client = HttpClient::new();
    let mongo_client = fixture
        .create_client()
        .await
        .expect("Failed to create MongoDB client");

    // errorCode 13 plus the two labels makes the failure retryable while
    // signalling that no writes happened before the error.
    fixture
        .configure_fail_command_once(
            &mongo_client,
            doc! {
                "failCommands": ["insert"],
                "errorCode": 13,
                "errorLabels": ["RetryableWriteError", "NoWritesPerformed"]
            },
        )
        .await
        .expect("Failed to configure failpoint with NoWritesPerformed label");

    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
    let collection = mongo_client
        .database(DEFAULT_TEST_DATABASE)
        .collection::<Document>(DEFAULT_SINK_COLLECTION);

    let mut messages: Vec<IggyMessage> = vec![
        IggyMessage::builder()
            .id(301)
            .payload(Bytes::from_static(b"no-writes-performed-1"))
            .build()
            .expect("Failed to build message 301"),
        IggyMessage::builder()
            .id(302)
            .payload(Bytes::from_static(b"no-writes-performed-2"))
            .build()
            .expect("Failed to build message 302"),
        IggyMessage::builder()
            .id(303)
            .payload(Bytes::from_static(b"no-writes-performed-3"))
            .build()
            .expect("Failed to build message 303"),
    ];

    client
        .send_messages(
            &stream_id,
            &topic_id,
            &Partitioning::partition_id(0),
            &mut messages,
        )
        .await
        .expect("Failed to send NoWritesPerformed label batch");

    // Pessimistic sentinels: the loop must actually observe the healthy
    // state to flip these, otherwise the final assertions fail.
    let mut sink_last_error = true;
    let mut sink_status = ConnectorStatus::Stopped;
    let mut processed_messages = 0_u64;
    let mut sink_errors = u64::MAX;
    let mut total_docs = 0_u64;

    for _ in 0..POLL_ATTEMPTS {
        total_docs = collection
            .count_documents(doc! {})
            .await
            .expect("Failed to count documents for NoWritesPerformed path");

        let sinks_response = http_client
            .get(format!("{}/sinks", api_address))
            .send()
            .await
            .expect("Failed to fetch /sinks");
        assert_eq!(sinks_response.status(), 200);
        let sinks: Vec<SinkInfoResponse> = sinks_response.json().await.expect("Invalid sinks JSON");
        let sink = sinks
            .iter()
            .find(|s| s.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink not found in /sinks");
        sink_last_error = sink.last_error.is_some();
        sink_status = sink.status;

        let stats_response = http_client
            .get(format!("{}/stats", api_address))
            .send()
            .await
            .expect("Failed to fetch /stats");
        assert_eq!(stats_response.status(), 200);
        let stats: ConnectorRuntimeStats = stats_response.json().await.expect("Invalid stats JSON");
        let sink_stats = stats
            .connectors
            .iter()
            .find(|c| c.key == MONGODB_SINK_KEY)
            .expect("MongoDB sink stats not found");
        processed_messages = sink_stats
            .messages_processed
            .expect("messages_processed must be present for sink stats");
        sink_errors = sink_stats.errors;

        if total_docs == 3 && !sink_last_error && processed_messages == 3 && sink_errors == 0 {
            break;
        }

        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
    }

    // Best-effort cleanup; result deliberately ignored so teardown problems
    // cannot mask the assertions below.
    let _ = fixture.disable_fail_command(&mongo_client).await;

    assert_eq!(
        total_docs, 3,
        "NoWritesPerformed path should result in exactly one committed batch after retry"
    );
    assert!(
        !sink_last_error,
        "NoWritesPerformed retry path should not leave sink in error state"
    );
    assert_eq!(
        sink_status,
        ConnectorStatus::Running,
        "sink should remain Running after NoWritesPerformed retry recovery"
    );
    assert_eq!(
        sink_errors, 0,
        "NoWritesPerformed retry recovery should not increment runtime errors"
    );
    assert_eq!(
        processed_messages, 3,
        "NoWritesPerformed retry recovery must count full batch as processed"
    );

    // Exactly-once check per message id: the retry must not double-insert.
    for id in [301_u128, 302, 303] {
        let count = collection
            .count_documents(doc! { "_id": build_expected_document_id(id) })
            .await
            .expect("Failed to count per-id documents for NoWritesPerformed path");
        assert_eq!(count, 1, "Expected exactly one document for _id={id}");
    }
}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn auto_create_collection_on_open(
+ harness: &TestHarness,
+ fixture: MongoDbSinkAutoCreateFixture,
+) {
+ let mongo_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create MongoDB client");
+
+ // The connector's open() creates the collection. Poll until it appears.
+ // No messages are sent in this test.
+ let mut found = false;
+ for _ in 0..POLL_ATTEMPTS {
+ if fixture
+ .collection_exists(&mongo_client, DEFAULT_SINK_COLLECTION)
+ .await
+ .unwrap_or(false)
+ {
+ found = true;
+ break;
+ }
+ sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+ }
+
+ assert!(
+ found,
+ "Collection '{DEFAULT_SINK_COLLECTION}' was not created by open()
within timeout"
+ );
+
+ // No messages sent -- collection should be empty.
+ let count = fixture
+ .count_documents_in_collection(&mongo_client, DEFAULT_SINK_COLLECTION)
+ .await
+ .expect("Failed to count");
+ assert_eq!(
+ count, 0,
+ "Collection should be empty after open() with no messages"
+ );
+
+ // Suppress unused harness warning.
+ let _ = harness;
+}
diff --git a/core/integration/tests/connectors/mongodb/sink.toml
b/core/integration/tests/connectors/mongodb/sink.toml
new file mode 100644
index 000000000..339ab38a6
--- /dev/null
+++ b/core/integration/tests/connectors/mongodb/sink.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
[connectors]
# Resolve connector configuration from local files rather than a remote source.
config_type = "local"
# Directory holding the MongoDB sink's config.toml.
# NOTE(review): path appears to be relative to the integration-test working
# directory — confirm against the connectors runtime's config loader.
config_dir = "../connectors/sinks/mongodb_sink"