This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f91486031 feat(connectors): add MongoDB sink connector (#2815)
f91486031 is described below

commit f91486031c8825cf02eae5924760e3a80e0ec781
Author: amuldotexe <[email protected]>
AuthorDate: Tue Mar 10 01:25:42 2026 +0530

    feat(connectors): add MongoDB sink connector (#2815)
    
    Co-authored-by: amuldotexe <[email protected]>
    Co-authored-by: Grzegorz Koszyk <[email protected]>
    Co-authored-by: Maciej Modzelewski <[email protected]>
    Co-authored-by: Hubert Gruszecki <[email protected]>
---
 Cargo.lock                                         |  319 ++++++
 Cargo.toml                                         |    1 +
 DEPENDENCIES.md                                    |   23 +
 core/connectors/runtime/src/sink.rs                |    3 +-
 core/connectors/sdk/src/lib.rs                     |    6 +
 core/connectors/sdk/src/sink.rs                    |    3 +-
 core/connectors/sinks/mongodb_sink/Cargo.toml      |   42 +
 core/connectors/sinks/mongodb_sink/README.md       |  146 +++
 core/connectors/sinks/mongodb_sink/config.toml     |   46 +
 core/connectors/sinks/mongodb_sink/src/lib.rs      |  912 +++++++++++++++
 core/integration/Cargo.toml                        |    1 +
 core/integration/tests/connectors/fixtures/mod.rs  |    5 +
 .../tests/connectors/fixtures/mongodb/container.rs |  244 ++++
 .../tests/connectors/fixtures/{ => mongodb}/mod.rs |   19 +-
 .../tests/connectors/fixtures/mongodb/sink.rs      |  390 +++++++
 core/integration/tests/connectors/mod.rs           |    1 +
 .../tests/connectors/{fixtures => mongodb}/mod.rs  |   18 +-
 .../tests/connectors/mongodb/mongodb_sink.rs       | 1162 ++++++++++++++++++++
 .../integration/tests/connectors/mongodb/sink.toml |   20 +
 19 files changed, 3330 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 23b34e3a8..80f8b9554 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1507,6 +1507,29 @@ dependencies = [
  "alloc-stdlib",
 ]
 
+[[package]]
+name = "bson"
+version = "2.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7969a9ba84b0ff843813e7249eed1678d9b6607ce5a3b8f0a47af3fcf7978e6e"
+dependencies = [
+ "ahash 0.8.12",
+ "base64 0.22.1",
+ "bitvec",
+ "getrandom 0.2.17",
+ "getrandom 0.3.4",
+ "hex",
+ "indexmap 2.13.0",
+ "js-sys",
+ "once_cell",
+ "rand 0.9.2",
+ "serde",
+ "serde_bytes",
+ "serde_json",
+ "time",
+ "uuid",
+]
+
 [[package]]
 name = "bstr"
 version = "1.12.1"
@@ -2984,6 +3007,28 @@ dependencies = [
  "syn 2.0.115",
 ]
 
+[[package]]
+name = "derive-syn-parse"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d65d7ce8132b7c0e54497a4d9a55a1c2a0912a0d786cf894472ba818fba45762"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "derive-where"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "derive_builder"
 version = "0.20.2"
@@ -3332,6 +3377,18 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "enum-as-inner"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "enum_dispatch"
 version = "0.3.13"
@@ -4534,6 +4591,52 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
+[[package]]
+name = "hickory-proto"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8a6fe56c0038198998a6f217ca4e7ef3a5e51f46163bd6dd60b5c71ca6c6502"
+dependencies = [
+ "async-trait",
+ "cfg-if",
+ "data-encoding",
+ "enum-as-inner",
+ "futures-channel",
+ "futures-io",
+ "futures-util",
+ "idna",
+ "ipnet",
+ "once_cell",
+ "rand 0.9.2",
+ "ring",
+ "thiserror 2.0.18",
+ "tinyvec",
+ "tokio",
+ "tracing",
+ "url",
+]
+
+[[package]]
+name = "hickory-resolver"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a"
+dependencies = [
+ "cfg-if",
+ "futures-util",
+ "hickory-proto",
+ "ipconfig",
+ "moka",
+ "once_cell",
+ "parking_lot",
+ "rand 0.9.2",
+ "resolv-conf",
+ "smallvec",
+ "thiserror 2.0.18",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "hkdf"
 version = "0.12.4"
@@ -5321,6 +5424,20 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "iggy_connector_mongodb_sink"
+version = "0.3.0"
+dependencies = [
+ "async-trait",
+ "humantime",
+ "iggy_connector_sdk",
+ "mongodb",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "iggy_connector_postgres_sink"
 version = "0.3.1-edge.1"
@@ -5640,6 +5757,7 @@ dependencies = [
  "keyring",
  "lazy_static",
  "libc",
+ "mongodb",
  "once_cell",
  "predicates",
  "rand 0.10.0",
@@ -5710,6 +5828,18 @@ dependencies = [
  "rustix 1.1.3",
 ]
 
+[[package]]
+name = "ipconfig"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
+dependencies = [
+ "socket2 0.5.10",
+ "widestring",
+ "windows-sys 0.48.0",
+ "winreg",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.11.0"
@@ -6304,6 +6434,54 @@ dependencies = [
  "twox-hash",
 ]
 
+[[package]]
+name = "macro_magic"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc33f9f0351468d26fbc53d9ce00a096c8522ecb42f19b50f34f2c422f76d21d"
+dependencies = [
+ "macro_magic_core",
+ "macro_magic_macros",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_core"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1687dc887e42f352865a393acae7cf79d98fab6351cde1f58e9e057da89bf150"
+dependencies = [
+ "const-random",
+ "derive-syn-parse",
+ "macro_magic_core_macros",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_core_macros"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b02abfe41815b5bd98dbd4260173db2c116dda171dc0fe7838cb206333b83308"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
+[[package]]
+name = "macro_magic_macros"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73ea28ee64b88876bf45277ed9a5817c1817df061a74f2b988971a12570e5869"
+dependencies = [
+ "macro_magic_core",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "macro_rules_attribute"
 version = "0.1.3"
@@ -6517,6 +6695,82 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "mongocrypt"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8da0cd419a51a5fb44819e290fbdb0665a54f21dead8923446a799c7f4d26ad9"
+dependencies = [
+ "bson",
+ "mongocrypt-sys",
+ "once_cell",
+ "serde",
+]
+
+[[package]]
+name = "mongocrypt-sys"
+version = "0.1.5+1.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "224484c5d09285a7b8cb0a0c117e847ebd14cb6e4470ecf68cdb89c503b0edb9"
+
+[[package]]
+name = "mongodb"
+version = "3.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "803dd859e8afa084c255a8effd8000ff86f7c8076a50cd6d8c99e8f3496f75c2"
+dependencies = [
+ "base64 0.22.1",
+ "bitflags 2.10.0",
+ "bson",
+ "derive-where",
+ "derive_more",
+ "futures-core",
+ "futures-io",
+ "futures-util",
+ "hex",
+ "hickory-proto",
+ "hickory-resolver",
+ "hmac",
+ "macro_magic",
+ "md-5",
+ "mongocrypt",
+ "mongodb-internal-macros",
+ "pbkdf2",
+ "percent-encoding",
+ "rand 0.9.2",
+ "rustc_version_runtime",
+ "rustls",
+ "rustversion",
+ "serde",
+ "serde_bytes",
+ "serde_with",
+ "sha1",
+ "sha2",
+ "socket2 0.6.2",
+ "stringprep",
+ "strsim",
+ "take_mut",
+ "thiserror 2.0.18",
+ "tokio",
+ "tokio-rustls",
+ "tokio-util",
+ "typed-builder 0.22.0",
+ "uuid",
+ "webpki-roots 1.0.6",
+]
+
+[[package]]
+name = "mongodb-internal-macros"
+version = "3.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a973ef3dd3dbc6f6e65bbdecfd9ec5e781b9e7493b0f369a7c62e35d8e5ae2c8"
+dependencies = [
+ "macro_magic",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "moxcms"
 version = "0.7.11"
@@ -6910,6 +7164,10 @@ name = "once_cell"
 version = "1.21.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+dependencies = [
+ "critical-section",
+ "portable-atomic",
+]
 
 [[package]]
 name = "once_cell_polyfill"
@@ -7328,6 +7586,15 @@ version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec"
 
+[[package]]
+name = "pbkdf2"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
+dependencies = [
+ "digest",
+]
+
 [[package]]
 name = "pear"
 version = "0.2.9"
@@ -8439,6 +8706,12 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "resolv-conf"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7"
+
 [[package]]
 name = "resvg"
 version = "0.45.1"
@@ -8724,6 +8997,16 @@ dependencies = [
  "semver",
 ]
 
+[[package]]
+name = "rustc_version_runtime"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d"
+dependencies = [
+ "rustc_version",
+ "semver",
+]
+
 [[package]]
 name = "rusticata-macros"
 version = "4.1.0"
@@ -10109,6 +10392,12 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
 
+[[package]]
+name = "take_mut"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
+
 [[package]]
 name = "tap"
 version = "1.0.1"
@@ -10880,6 +11169,15 @@ dependencies = [
  "typed-builder-macro 0.20.1",
 ]
 
+[[package]]
+name = "typed-builder"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "398a3a3c918c96de527dc11e6e846cd549d4508030b8a33e1da12789c856b81a"
+dependencies = [
+ "typed-builder-macro 0.22.0",
+]
+
 [[package]]
 name = "typed-builder"
 version = "0.23.2"
@@ -10900,6 +11198,17 @@ dependencies = [
  "syn 2.0.115",
 ]
 
+[[package]]
+name = "typed-builder-macro"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e48cea23f68d1f78eb7bc092881b6bb88d3d6b5b7e6234f6f9c911da1ffb221"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "typed-builder-macro"
 version = "0.23.2"
@@ -12109,6 +12418,16 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "winreg"
+version = "0.50.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
+dependencies = [
+ "cfg-if",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "winsafe"
 version = "0.0.19"
diff --git a/Cargo.toml b/Cargo.toml
index e0140f2b2..d49abb763 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ members = [
     "core/connectors/sdk",
     "core/connectors/sinks/elasticsearch_sink",
     "core/connectors/sinks/iceberg_sink",
+    "core/connectors/sinks/mongodb_sink",
     "core/connectors/sinks/postgres_sink",
     "core/connectors/sinks/quickwit_sink",
     "core/connectors/sinks/stdout_sink",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 31c1302ff..557e24288 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -122,6 +122,7 @@ borsh: 1.6.0, "Apache-2.0 OR MIT",
 borsh-derive: 1.6.0, "Apache-2.0",
 brotli: 8.0.2, "BSD-3-Clause AND MIT",
 brotli-decompressor: 5.0.0, "BSD-3-Clause OR MIT",
+bson: 2.15.0, "MIT",
 bstr: 1.12.1, "Apache-2.0 OR MIT",
 built: 0.8.0, "MIT",
 bumpalo: 3.19.1, "Apache-2.0 OR MIT",
@@ -254,6 +255,8 @@ der: 0.7.10, "Apache-2.0 OR MIT",
 der-parser: 10.0.0, "Apache-2.0 OR MIT",
 deranged: 0.5.6, "Apache-2.0 OR MIT",
 derive-new: 0.7.0, "MIT",
+derive-syn-parse: 0.2.0, "Apache-2.0 OR MIT",
+derive-where: 1.6.0, "Apache-2.0 OR MIT",
 derive_builder: 0.20.2, "Apache-2.0 OR MIT",
 derive_builder_core: 0.20.2, "Apache-2.0 OR MIT",
 derive_builder_macro: 0.20.2, "Apache-2.0 OR MIT",
@@ -289,6 +292,7 @@ embedded-io: 0.4.0, "Apache-2.0 OR MIT",
 embedded-io: 0.6.1, "Apache-2.0 OR MIT",
 encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
 encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
+enum-as-inner: 0.6.1, "Apache-2.0 OR MIT",
 enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
 enumset: 1.1.10, "Apache-2.0 OR MIT",
 enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
@@ -402,6 +406,8 @@ heapless: 0.7.17, "Apache-2.0 OR MIT",
 heck: 0.5.0, "Apache-2.0 OR MIT",
 hermit-abi: 0.5.2, "Apache-2.0 OR MIT",
 hex: 0.4.3, "Apache-2.0 OR MIT",
+hickory-proto: 0.25.2, "Apache-2.0 OR MIT",
+hickory-resolver: 0.25.2, "Apache-2.0 OR MIT",
 hkdf: 0.12.4, "Apache-2.0 OR MIT",
 hmac: 0.12.1, "Apache-2.0 OR MIT",
 home: 0.5.12, "Apache-2.0 OR MIT",
@@ -450,6 +456,7 @@ iggy_common: 0.9.1-edge.1, "Apache-2.0",
 iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0",
 iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0",
 iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0",
+iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
 iggy_connector_postgres_sink: 0.3.1-edge.1, "Apache-2.0",
 iggy_connector_postgres_source: 0.3.1-edge.1, "Apache-2.0",
 iggy_connector_quickwit_sink: 0.3.1-edge.1, "Apache-2.0",
@@ -478,6 +485,7 @@ interpolate_name: 0.2.4, "MIT",
 inventory: 0.3.21, "Apache-2.0 OR MIT",
 io-uring: 0.7.11, "Apache-2.0 OR MIT",
 io_uring_buf_ring: 0.2.3, "MIT",
+ipconfig: 0.3.2, "Apache-2.0 OR MIT",
 ipnet: 2.11.0, "Apache-2.0 OR MIT",
 iri-string: 0.7.10, "Apache-2.0 OR MIT",
 is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT",
@@ -542,6 +550,10 @@ loom: 0.7.2, "MIT",
 loop9: 0.1.5, "MIT",
 lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
 lz4_flex: 0.12.0, "MIT",
+macro_magic: 0.5.1, "MIT",
+macro_magic_core: 0.5.1, "MIT",
+macro_magic_core_macros: 0.5.1, "MIT",
+macro_magic_macros: 0.5.1, "MIT",
 macro_rules_attribute: 0.1.3, "MIT",
 macro_rules_attribute-proc_macro: 0.1.3, "MIT",
 matchers: 0.2.0, "MIT",
@@ -563,6 +575,10 @@ mio: 1.1.1, "MIT",
 mockall: 0.14.0, "Apache-2.0 OR MIT",
 mockall_derive: 0.14.0, "Apache-2.0 OR MIT",
 moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0",
+mongocrypt: 0.3.2, "Apache-2.0",
+mongocrypt-sys: 0.1.5+1.15.1, "Apache-2.0",
+mongodb: 3.5.1, "Apache-2.0",
+mongodb-internal-macros: 3.5.1, "Apache-2.0",
 moxcms: 0.7.11, "Apache-2.0 OR BSD-3-Clause",
 murmur3: 0.5.2, "Apache-2.0 OR MIT",
 never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib",
@@ -638,6 +654,7 @@ password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
 pastey: 0.1.1, "Apache-2.0 OR MIT",
 pastey: 0.2.1, "Apache-2.0 OR MIT",
+pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
 pear: 0.2.9, "Apache-2.0 OR MIT",
 pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
 peg: 0.6.3, "MIT",
@@ -738,6 +755,7 @@ reqwest: 0.12.28, "Apache-2.0 OR MIT",
 reqwest-middleware: 0.4.2, "Apache-2.0 OR MIT",
 reqwest-retry: 0.8.0, "Apache-2.0 OR MIT",
 reqwest-tracing: 0.5.8, "Apache-2.0 OR MIT",
+resolv-conf: 0.7.6, "Apache-2.0 OR MIT",
 resvg: 0.45.1, "Apache-2.0 OR MIT",
 retry-policies: 0.5.1, "Apache-2.0 OR MIT",
 rfc6979: 0.4.0, "Apache-2.0 OR MIT",
@@ -762,6 +780,7 @@ rust-ini: 0.21.3, "MIT",
 rust_decimal: 1.40.0, "MIT",
 rustc-hash: 2.1.1, "Apache-2.0 OR MIT",
 rustc_version: 0.4.1, "Apache-2.0 OR MIT",
+rustc_version_runtime: 0.3.0, "MIT",
 rusticata-macros: 4.1.0, "Apache-2.0 OR MIT",
 rustix: 0.38.44, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 rustix: 1.1.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
@@ -877,6 +896,7 @@ sys_traits_macros: 0.1.0, "MIT",
 sysinfo: 0.37.2, "MIT",
 sysinfo: 0.38.1, "MIT",
 tagptr: 0.2.0, "Apache-2.0 OR MIT",
+take_mut: 0.2.2, "MIT",
 tap: 1.0.1, "MIT",
 tar: 0.4.44, "Apache-2.0 OR MIT",
 tempfile: 3.25.0, "Apache-2.0 OR MIT",
@@ -943,8 +963,10 @@ ttf-parser: 0.25.1, "Apache-2.0 OR MIT",
 tungstenite: 0.28.0, "Apache-2.0 OR MIT",
 twox-hash: 2.1.2, "MIT",
 typed-builder: 0.20.1, "Apache-2.0 OR MIT",
+typed-builder: 0.22.0, "Apache-2.0 OR MIT",
 typed-builder: 0.23.2, "Apache-2.0 OR MIT",
 typed-builder-macro: 0.20.1, "Apache-2.0 OR MIT",
+typed-builder-macro: 0.22.0, "Apache-2.0 OR MIT",
 typed-builder-macro: 0.23.2, "Apache-2.0 OR MIT",
 typed-path: 0.12.3, "Apache-2.0 OR MIT",
 typenum: 1.19.0, "Apache-2.0 OR MIT",
@@ -1084,6 +1106,7 @@ windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT",
 windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT",
 winnow: 0.5.40, "MIT",
 winnow: 0.7.14, "MIT",
+winreg: 0.50.0, "MIT",
 winsafe: 0.0.19, "MIT",
 wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 2074f845c..48961aca6 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -20,7 +20,7 @@
 use crate::configs::connectors::SinkConfig;
 use crate::context::RuntimeContext;
 use crate::log::LOG_CALLBACK;
-use crate::metrics::{ConnectorType, Metrics};
+use crate::metrics::Metrics;
 use crate::{
     PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin,
     SinkConnectorWrapper, resolve_plugin_path, transform,
@@ -324,7 +324,6 @@ pub(crate) async fn consume_messages(
                 error!(
                     "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
                 );
-                metrics.increment_errors(plugin_key, ConnectorType::Sink);
                 return Err(error);
             }
         };
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 72bd781db..8ba37a083 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -48,6 +48,12 @@ pub mod transforms;
 pub use log::LogCallback;
 pub use transforms::Transform;
 
+#[doc(hidden)]
+pub mod connector_macro_support {
+    pub use dashmap::DashMap;
+    pub use once_cell::sync::Lazy;
+}
+
 static RUNTIME: OnceCell<Runtime> = OnceCell::new();
 
 pub fn get_runtime() -> &'static Runtime {
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index ad037b6f5..9d21f05ee 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -222,8 +222,7 @@ macro_rules! sink_connector {
             assert_trait::<$type>();
         };
 
-        use dashmap::DashMap;
-        use once_cell::sync::Lazy;
+        use $crate::connector_macro_support::{DashMap, Lazy};
         use $crate::LogCallback;
         use $crate::sink::SinkContainer;
 
diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml b/core/connectors/sinks/mongodb_sink/Cargo.toml
new file mode 100644
index 000000000..11a18a40b
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/Cargo.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_mongodb_sink"
+version = "0.3.0"
+description = "Iggy MongoDB sink connector for storing stream messages into MongoDB database"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "mongodb", "sink"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib", "lib"]
+
+[dependencies]
+async-trait = { workspace = true }
+humantime = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+mongodb = { version = "3.0", features = ["rustls-tls"] }
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
diff --git a/core/connectors/sinks/mongodb_sink/README.md b/core/connectors/sinks/mongodb_sink/README.md
new file mode 100644
index 000000000..550612cc3
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/README.md
@@ -0,0 +1,146 @@
+# MongoDB Sink Connector
+
+Consumes messages from Iggy streams and stores them in a MongoDB collection.
+
+## Try It
+
+Send a JSON message through Iggy and see it land in MongoDB.
+
+**Prerequisites**: Docker running, project built (`cargo build` from repo root).
+
+```bash
+# Start MongoDB
+docker run -d --name mongo-test -p 27017:27017 mongo:7
+
+# Start iggy-server (terminal 2)
+IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy ./target/debug/iggy-server
+
+# Create stream and topic
+./target/debug/iggy -u iggy -p iggy stream create demo_stream
+./target/debug/iggy -u iggy -p iggy topic create demo_stream demo_topic 1
+
+# Setup connector config
+mkdir -p /tmp/mdb-sink-test/connectors
+cat > /tmp/mdb-sink-test/config.toml << 'TOML'
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+[state]
+path = "/tmp/mdb-sink-test/state"
+[connectors]
+config_type = "local"
+config_dir = "/tmp/mdb-sink-test/connectors"
+TOML
+cat > /tmp/mdb-sink-test/connectors/sink.toml << 'TOML'
+type = "sink"
+key = "mongodb"
+enabled = true
+version = 0
+name = "test"
+path = "target/debug/libiggy_connector_mongodb_sink"
+[[streams]]
+stream = "demo_stream"
+topics = ["demo_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "100ms"
+consumer_group = "test_cg"
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "test_db"
+collection = "messages"
+payload_format = "json"
+auto_create_collection = true
+TOML
+
+# Start connector (terminal 3)
+IGGY_CONNECTORS_CONFIG_PATH=/tmp/mdb-sink-test/config.toml ./target/debug/iggy-connectors
+
+# Send a message
+./target/debug/iggy -u iggy -p iggy message send demo_stream demo_topic '{"hello":"mongodb"}'
+
+# Verify in MongoDB
+docker exec mongo-test mongosh --quiet --eval \
+  'db.getSiblingDB("test_db").messages.find().pretty()'
+```
+
+Expected:
+
+```json
+{ "payload": { "hello": "mongodb" }, "iggy_offset": 0, "iggy_stream": "demo_stream" }
+```
+
+Cleanup: `docker rm -f mongo-test && rm -rf /tmp/mdb-sink-test`
+
+## Quick Start
+
+```toml
+[[streams]]
+stream = "demo_stream"
+topics = ["demo_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "100ms"
+consumer_group = "mongodb_cg"
+
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "iggy_data"
+collection = "messages"
+payload_format = "json"
+```
+
+## Configuration
+
+| Option | Default | Description |
+| ------ | ------- | ----------- |
+| `connection_uri` | **required** | MongoDB URI |
+| `database` | **required** | Target database |
+| `collection` | **required** | Target collection |
+| `batch_size` | `100` | Documents per `insertMany` call |
+| `payload_format` | `binary` | `binary`, `json`, or `string` |
+| `include_metadata` | `true` | Add iggy offset, timestamp, stream, topic, partition |
+| `include_checksum` | `true` | Add message checksum |
+| `include_origin_timestamp` | `true` | Add origin timestamp |
+| `auto_create_collection` | `false` | Create collection if missing |
+| `max_pool_size` | driver default | Connection pool size |
+| `verbose_logging` | `false` | Log at info instead of debug |
+| `max_retries` | `3` | Retry attempts for transient errors |
+| `retry_delay` | `1s` | Base delay (`retry_delay * attempt`) |
+
+## Testing
+
+Requires Docker. Testcontainers starts MongoDB 7 + iggy-server automatically.
+
+```bash
+cargo test --test mod -- mongodb_sink
+```
+
+This runs 4 E2E tests against a real MongoDB instance:
+
+- `json_messages_sink_to_mongodb` — JSON payloads stored as embedded BSON documents
+- `binary_messages_sink_as_bson_binary` — binary payloads stored as BSON Binary
+- `large_batch_processed_correctly` — batch insertion with configurable batch size
+- `auto_create_collection_on_open` — collection created automatically when missing
+
+Unit tests (no Docker):
+
+```bash
+cargo test -p iggy_connector_mongodb_sink
+```
+
+## Delivery Semantics
+
+This connector provides **at-least-once** delivery semantics.
+
+### Behavior
+
+- Messages may be delivered more than once on retry or restart
+- Uses Iggy message ID as MongoDB `_id` for document identity
+- **Insert-only mode**: duplicate key error is a hard failure (not upsert)
+
+### Known Limitations
+
+- On network timeout during insert, retry may cause duplicate key error
+- Sink does not upsert on duplicate (future improvement)
diff --git a/core/connectors/sinks/mongodb_sink/config.toml b/core/connectors/sinks/mongodb_sink/config.toml
new file mode 100644
index 000000000..3b47b4ac1
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/config.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "mongodb"
+enabled = true
+version = 0
+name = "MongoDB sink"
+path = "../../target/release/libiggy_connector_mongodb_sink"
+verbose = false
+
+[[streams]]
+stream = "user_events"
+topics = ["users", "orders"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "mongodb_sink"
+
+[plugin_config]
+connection_uri = "mongodb://localhost:27017"
+database = "iggy_messages"
+collection = "messages"
+batch_size = 100
+include_metadata = true
+include_checksum = true
+include_origin_timestamp = true
+payload_format = "binary"
+auto_create_collection = false
+verbose_logging = false
+max_retries = 3
+retry_delay = "1s"
diff --git a/core/connectors/sinks/mongodb_sink/src/lib.rs b/core/connectors/sinks/mongodb_sink/src/lib.rs
new file mode 100644
index 000000000..34b01ca7b
--- /dev/null
+++ b/core/connectors/sinks/mongodb_sink/src/lib.rs
@@ -0,0 +1,912 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, 
sink_connector,
+};
+use mongodb::{Client, Collection, bson, options::ClientOptions};
+use serde::{Deserialize, Serialize};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+sink_connector!(MongoDbSink);
+
+const DEFAULT_MAX_RETRIES: u32 = 3;
+const DEFAULT_RETRY_DELAY: &str = "1s";
+
+/// Sink connector that writes consumed messages into a MongoDB collection.
+///
+/// The option fields are resolved once from [`MongoDbSinkConfig`] in
+/// `MongoDbSink::new`, so the per-message path does not re-read the `Option`s.
+#[derive(Debug)]
+pub struct MongoDbSink {
+    /// Connector instance identifier, included in log messages.
+    pub id: u32,
+    /// `None` until `connect` succeeds; cleared again by `close`.
+    client: Option<Client>,
+    /// Configuration as supplied by the caller.
+    config: MongoDbSinkConfig,
+    /// When true, the per-call insert summary is logged at info instead of debug.
+    verbose: bool,
+    /// Maximum number of documents per `insert_many` call (always at least 1).
+    batch_size: usize,
+    /// Whether offset, timestamp, stream, topic and partition fields are stored.
+    include_metadata: bool,
+    /// Whether the `iggy_checksum` field is stored.
+    include_checksum: bool,
+    /// Whether the `iggy_origin_timestamp` field is stored.
+    include_origin_timestamp: bool,
+    /// How the message payload is encoded in the `payload` field.
+    payload_format: PayloadFormat,
+    /// Upper bound on insert attempts per batch for transient errors.
+    max_retries: u32,
+    /// Base delay between attempts; it is multiplied by the attempt number.
+    retry_delay: Duration,
+    /// Number of documents counted as inserted since construction.
+    messages_processed: AtomicU64,
+    /// Number of batches that finished with an error.
+    insertion_errors: AtomicU64,
+}
+
+/// User-facing configuration of the MongoDB sink (the `[plugin_config]` table).
+///
+/// Every `Option` field is optional in the config file; the defaults listed
+/// below are the ones applied by `MongoDbSink::new` and `Sink::open`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MongoDbSinkConfig {
+    /// MongoDB connection string; it is redacted before being logged.
+    pub connection_uri: String,
+    /// Target database name.
+    pub database: String,
+    /// Target collection name.
+    pub collection: String,
+    /// When set, overrides the pool size derived from the connection string.
+    pub max_pool_size: Option<u32>,
+    /// Create the collection during `open` if it is missing (default: false).
+    pub auto_create_collection: Option<bool>,
+    /// Documents per `insert_many` call (default: 100, minimum: 1).
+    pub batch_size: Option<u32>,
+    /// Store offset/timestamp/stream/topic/partition fields (default: true).
+    pub include_metadata: Option<bool>,
+    /// Store the message checksum (default: true).
+    pub include_checksum: Option<bool>,
+    /// Store the origin timestamp (default: true).
+    pub include_origin_timestamp: Option<bool>,
+    /// `binary`, `json`, or `string`/`text`, case-insensitive (default: binary).
+    pub payload_format: Option<String>,
+    /// Log insert summaries at info level instead of debug (default: false).
+    pub verbose_logging: Option<bool>,
+    /// Maximum insert attempts per batch (default: `DEFAULT_MAX_RETRIES`).
+    pub max_retries: Option<u32>,
+    /// Human-readable base retry delay (default: `DEFAULT_RETRY_DELAY`);
+    /// values that fail to parse fall back to one second.
+    pub retry_delay: Option<String>,
+}
+
+/// Encoding used for the `payload` field of each stored document.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum PayloadFormat {
+    /// Raw bytes stored as BSON binary with the generic subtype (the default).
+    #[default]
+    Binary,
+    /// Payload parsed as JSON and converted to the equivalent BSON value.
+    Json,
+    /// Payload decoded as UTF-8 and stored as a BSON string.
+    String,
+}
+
+impl PayloadFormat {
+    /// Resolves the optional `payload_format` setting into a variant.
+    ///
+    /// Unrecognized values are reported with a warning and treated as binary.
+    fn from_config(s: Option<&str>) -> Self {
+        match classify_payload_format_value(s) {
+            (resolved, None) => resolved,
+            (resolved, Some(value)) => {
+                warn!("Unknown MongoDB sink payload format '{value}', defaulting to binary");
+                resolved
+            }
+        }
+    }
+}
+
+/// Result of writing one chunk of messages.
+///
+/// The count is reported alongside the error (rather than inside a `Result`)
+/// so that a failed batch can still contribute a non-zero inserted count.
+#[derive(Debug)]
+struct BatchInsertOutcome {
+    /// Number of documents counted as inserted for this batch.
+    inserted_count: u64,
+    /// The failure that ended the batch, if any.
+    error: Option<Error>,
+}
+
+impl MongoDbSink {
+    /// Creates a sink from `config` without connecting to MongoDB.
+    ///
+    /// Optional settings are resolved to their defaults here: batch size 100
+    /// (raised to at least 1), metadata/checksum/origin timestamp enabled,
+    /// binary payloads, `DEFAULT_MAX_RETRIES` attempts and a one second retry
+    /// delay. A `retry_delay` that cannot be parsed is reported with a warning
+    /// and replaced by one second instead of being swallowed silently.
+    pub fn new(id: u32, config: MongoDbSinkConfig) -> Self {
+        let verbose = config.verbose_logging.unwrap_or(false);
+        let payload_format = PayloadFormat::from_config(config.payload_format.as_deref());
+        let batch_size = config.batch_size.unwrap_or(100).max(1) as usize;
+        let include_metadata = config.include_metadata.unwrap_or(true);
+        let include_checksum = config.include_checksum.unwrap_or(true);
+        let include_origin_timestamp = config.include_origin_timestamp.unwrap_or(true);
+        let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+        let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY);
+        let retry_delay: Duration = match HumanDuration::from_str(delay_str) {
+            Ok(duration) => duration.into(),
+            Err(error) => {
+                // Make the misconfiguration visible; the fallback itself is unchanged.
+                warn!(
+                    "Invalid MongoDB sink retry_delay '{}': {}. Falling back to {}",
+                    delay_str, error, DEFAULT_RETRY_DELAY
+                );
+                Duration::from_secs(1)
+            }
+        };
+        MongoDbSink {
+            id,
+            client: None,
+            config,
+            verbose,
+            batch_size,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            payload_format,
+            max_retries,
+            retry_delay,
+            messages_processed: AtomicU64::new(0),
+            insertion_errors: AtomicU64::new(0),
+        }
+    }
+}
+
+#[async_trait]
+impl Sink for MongoDbSink {
+    /// Connects to MongoDB and, when `auto_create_collection` is enabled,
+    /// makes sure the target collection exists.
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opening MongoDB sink connector with ID: {}. Target: {}.{}",
+            self.id, self.config.database, self.config.collection
+        );
+        self.connect().await?;
+
+        // Eager creation makes the collection visible before the first insert.
+        match self.config.auto_create_collection {
+            Some(true) => self.ensure_collection_exists().await,
+            Some(false) | None => Ok(()),
+        }
+    }
+
+    /// Writes one polled batch of messages to the configured collection.
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        let (metadata, batch) = (&messages_metadata, messages.as_slice());
+        self.process_messages(topic_metadata, metadata, batch).await
+    }
+
+    /// Drops the client handle and logs the lifetime counters.
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Closing MongoDB sink connector with ID: {}", self.id);
+
+        // No explicit shutdown call is made; releasing the handle is enough here.
+        self.client = None;
+
+        let messages_processed = self.messages_processed.load(Ordering::Relaxed);
+        let insertion_errors = self.insertion_errors.load(Ordering::Relaxed);
+        info!(
+            "MongoDB sink ID: {} processed {} messages with {} errors",
+            self.id, messages_processed, insertion_errors
+        );
+        Ok(())
+    }
+}
+
+impl MongoDbSink {
+    /// Parses the configured URI into `ClientOptions`, applies the optional
+    /// `max_pool_size` override, and keeps the client once a ping succeeds.
+    async fn connect(&mut self) -> Result<(), Error> {
+        let uri = &self.config.connection_uri;
+        let redacted = redact_connection_uri(uri);
+        info!("Connecting to MongoDB: {redacted}");
+
+        let mut options = match ClientOptions::parse(uri).await {
+            Ok(parsed) => parsed,
+            Err(e) => {
+                return Err(Error::InitError(format!(
+                    "Failed to parse connection URI: {e}"
+                )));
+            }
+        };
+
+        // Only override the pool size when the config sets one explicitly.
+        if let Some(pool_size) = self.config.max_pool_size {
+            options.max_pool_size = Some(pool_size);
+        }
+
+        let client = match Client::with_options(options) {
+            Ok(created) => created,
+            Err(e) => return Err(Error::InitError(format!("Failed to create client: {e}"))),
+        };
+
+        // Verify connectivity with a ping before the client is stored.
+        let ping = client
+            .database(&self.config.database)
+            .run_command(mongodb::bson::doc! {"ping": 1})
+            .await;
+        if let Err(e) = ping {
+            return Err(Error::InitError(format!(
+                "Database connectivity test failed: {e}"
+            )));
+        }
+
+        self.client = Some(client);
+        info!("Connected to MongoDB database: {}", self.config.database);
+        Ok(())
+    }
+
+    /// Create the target collection explicitly if it does not already exist.
+    ///
+    /// The existence check and the creation are two separate round trips, so
+    /// another writer may create the collection in between. That outcome is
+    /// treated as success rather than failing `open`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::InitError` when the client is not connected, when the
+    /// collections cannot be listed, or when creation fails for any reason
+    /// other than the collection already existing.
+    async fn ensure_collection_exists(&self) -> Result<(), Error> {
+        // Server error code reported by `create` when the namespace already exists.
+        const NAMESPACE_EXISTS: i32 = 48;
+
+        let client = self.get_client()?;
+        let db = client.database(&self.config.database);
+
+        let existing = db
+            .list_collection_names()
+            .await
+            .map_err(|e| Error::InitError(format!("Failed to list collections: {e}")))?;
+
+        if existing.contains(&self.config.collection) {
+            debug!(
+                "Collection '{}' already exists, skipping creation",
+                self.config.collection
+            );
+            return Ok(());
+        }
+
+        match db.create_collection(&self.config.collection).await {
+            Ok(_) => {
+                info!("Created MongoDB collection '{}'", self.config.collection);
+                Ok(())
+            }
+            // Lost the race against a concurrent creator: the collection is there.
+            Err(e)
+                if matches!(
+                    e.kind.as_ref(),
+                    mongodb::error::ErrorKind::Command(cmd_err)
+                        if cmd_err.code == NAMESPACE_EXISTS
+                ) =>
+            {
+                debug!(
+                    "Collection '{}' was created concurrently, skipping creation",
+                    self.config.collection
+                );
+                Ok(())
+            }
+            Err(e) => Err(Error::InitError(format!(
+                "Failed to create collection '{}': {e}",
+                self.config.collection
+            ))),
+        }
+    }
+
+    /// Writes `messages` in chunks of `batch_size` and updates the counters.
+    ///
+    /// Every chunk is attempted even if an earlier one failed. The inserted
+    /// count is added to `messages_processed`, each failed chunk bumps
+    /// `insertion_errors`, and the error of the last failed chunk is returned.
+    async fn process_messages(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        messages: &[ConsumedMessage],
+    ) -> Result<(), Error> {
+        let collection = self
+            .get_client()?
+            .database(&self.config.database)
+            .collection(&self.config.collection);
+
+        let mut successful_inserts = 0u64;
+        let mut last_error: Option<Error> = None;
+
+        for chunk in messages.chunks(self.batch_size) {
+            let BatchInsertOutcome {
+                inserted_count,
+                error: chunk_failure,
+            } = self
+                .insert_batch(chunk, topic_metadata, messages_metadata, &collection)
+                .await;
+
+            successful_inserts += inserted_count;
+
+            let Some(batch_error) = chunk_failure else {
+                continue;
+            };
+            self.insertion_errors.fetch_add(1, Ordering::Relaxed);
+            error!(
+                "Failed to insert batch of {} messages: {batch_error}",
+                chunk.len()
+            );
+            last_error = Some(batch_error);
+        }
+
+        self.messages_processed
+            .fetch_add(successful_inserts, Ordering::Relaxed);
+
+        // Same summary either way; only the log level depends on `verbose`.
+        let coll = &self.config.collection;
+        if !self.verbose {
+            debug!(
+                "MongoDB sink ID: {} inserted {successful_inserts} messages to collection '{coll}'",
+                self.id
+            );
+        } else {
+            info!(
+                "MongoDB sink ID: {} inserted {successful_inserts} messages to collection '{coll}'",
+                self.id
+            );
+        }
+
+        last_error.map_or(Ok(()), Err)
+    }
+
+    /// Converts a chunk of messages into BSON documents and writes them via
+    /// `insert_batch_with_retry`.
+    ///
+    /// If any message cannot be converted, nothing from the chunk is sent and
+    /// the conversion error is returned with an inserted count of zero.
+    async fn insert_batch(
+        &self,
+        messages: &[ConsumedMessage],
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        collection: &Collection<mongodb::bson::Document>,
+    ) -> BatchInsertOutcome {
+        if messages.is_empty() {
+            return BatchInsertOutcome {
+                inserted_count: 0,
+                error: None,
+            };
+        }
+
+        // Collecting into `Result` stops at the first message that fails to convert.
+        let converted: Result<Vec<mongodb::bson::Document>, Error> = messages
+            .iter()
+            .map(|message| self.build_document(message, topic_metadata, messages_metadata))
+            .collect();
+
+        match converted {
+            Ok(docs) => self.insert_batch_with_retry(collection, &docs).await,
+            Err(failure) => BatchInsertOutcome {
+                inserted_count: 0,
+                error: Some(failure),
+            },
+        }
+    }
+
+    /// Builds the document stored for a single message.
+    ///
+    /// Field order is `_id`, the optional metadata fields, the optional
+    /// checksum, the optional origin timestamp, and finally `payload`.
+    fn build_document(
+        &self,
+        message: &ConsumedMessage,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+    ) -> Result<mongodb::bson::Document, Error> {
+        let mut doc = mongodb::bson::Document::new();
+
+        doc.insert(
+            "_id",
+            build_composite_document_id(topic_metadata, messages_metadata, message.id),
+        );
+
+        if self.include_metadata {
+            let (offset_key, offset_value) = build_offset_metadata_value(message.offset);
+            doc.insert(offset_key, offset_value);
+            doc.insert(
+                "iggy_timestamp",
+                build_bson_datetime_value(message.timestamp),
+            );
+            doc.insert("iggy_stream", &topic_metadata.stream);
+            doc.insert("iggy_topic", &topic_metadata.topic);
+            doc.insert(
+                "iggy_partition_id",
+                build_partition_metadata_value(messages_metadata.partition_id),
+            );
+        }
+
+        if self.include_checksum {
+            doc.insert(
+                "iggy_checksum",
+                build_checksum_metadata_value(message.checksum),
+            );
+        }
+
+        if self.include_origin_timestamp {
+            doc.insert(
+                "iggy_origin_timestamp",
+                build_bson_datetime_value(message.origin_timestamp),
+            );
+        }
+
+        // The payload is cloned because the message itself is only borrowed.
+        let payload_bytes = message.payload.clone().try_into_vec().map_err(|error| {
+            Error::CannotStoreData(format!("Failed to convert payload to bytes: {error}"))
+        })?;
+
+        let payload_value = match self.payload_format {
+            PayloadFormat::Binary => bson::Bson::Binary(bson::Binary {
+                subtype: bson::spec::BinarySubtype::Generic,
+                bytes: payload_bytes,
+            }),
+            PayloadFormat::Json => {
+                let json_value: serde_json::Value = serde_json::from_slice(&payload_bytes)
+                    .map_err(|error| {
+                        error!("Failed to parse payload as JSON: {error}");
+                        Error::CannotStoreData(format!("Failed to parse payload as JSON: {error}"))
+                    })?;
+                bson::to_bson(&json_value).map_err(|error| {
+                    error!("Failed to convert JSON to BSON: {error}");
+                    Error::CannotStoreData(format!("Failed to convert JSON to BSON: {error}"))
+                })?
+            }
+            PayloadFormat::String => {
+                let text_value = String::from_utf8(payload_bytes).map_err(|error| {
+                    error!("Failed to parse payload as UTF-8 text: {error}");
+                    Error::CannotStoreData(format!(
+                        "Failed to parse payload as UTF-8 text: {error}"
+                    ))
+                })?;
+                bson::Bson::String(text_value)
+            }
+        };
+        doc.insert("payload", payload_value);
+
+        Ok(doc)
+    }
+
+    /// Sends `docs` with an unordered `insert_many`, retrying transient failures.
+    ///
+    /// * A successful call reports the number of inserted ids.
+    /// * A failure consisting solely of duplicate-key write errors is treated
+    ///   as success: the duplicates are skipped and the remaining documents
+    ///   are counted as inserted.
+    /// * Any other failure is retried while `is_transient_error` holds and
+    ///   fewer than `max_retries` attempts have been made. `max_retries` is the
+    ///   total number of attempts, and at least one attempt is always made.
+    ///
+    /// The wait before the next attempt is `retry_delay` multiplied by the
+    /// attempt number. The multiplication saturates, so a very large
+    /// configured delay cannot panic on overflow.
+    async fn insert_batch_with_retry(
+        &self,
+        collection: &Collection<mongodb::bson::Document>,
+        docs: &[mongodb::bson::Document],
+    ) -> BatchInsertOutcome {
+        let mut attempts = 0u32;
+
+        loop {
+            let result = collection.insert_many(docs.to_vec()).ordered(false).await;
+
+            match result {
+                Ok(result) => {
+                    return BatchInsertOutcome {
+                        inserted_count: result.inserted_ids.len() as u64,
+                        error: None,
+                    };
+                }
+                Err(error) => {
+                    // Duplicates are documents already stored under the same `_id`.
+                    if let Some(duplicate_count) = count_duplicate_write_errors(&error) {
+                        let inserted_count = docs.len().saturating_sub(duplicate_count) as u64;
+                        if duplicate_count > 0 {
+                            warn!(
+                                "MongoDB sink ID: {} ignored {duplicate_count} duplicate writes in batch",
+                                self.id
+                            );
+                        }
+                        return BatchInsertOutcome {
+                            inserted_count,
+                            error: None,
+                        };
+                    }
+
+                    attempts = attempts.saturating_add(1);
+                    if !is_transient_error(&error) || attempts >= self.max_retries {
+                        let inserted_count = estimate_inserted_count_value(&error, docs.len());
+                        error!("Batch insert failed after {attempts} attempts: {error}");
+                        return BatchInsertOutcome {
+                            inserted_count,
+                            error: Some(Error::CannotStoreData(format!(
+                                "Batch insert failed after {attempts} attempts: {error}"
+                            ))),
+                        };
+                    }
+                    warn!(
+                        "Transient database error (attempt {attempts}/{}): {error}. Retrying...",
+                        self.max_retries
+                    );
+                    // Linear backoff; saturate instead of panicking on overflow.
+                    tokio::time::sleep(self.retry_delay.saturating_mul(attempts)).await;
+                }
+            }
+        }
+    }
+
+    fn get_client(&self) -> Result<&Client, Error> {
+        self.client
+            .as_ref()
+            .ok_or_else(|| Error::InitError("Database not 
connected".to_string()))
+    }
+}
+
+/// Builds the `_id` of a stored message as `stream:topic:partition:message_id`.
+fn build_composite_document_id(
+    topic_metadata: &TopicMetadata,
+    messages_metadata: &MessagesMetadata,
+    message_id: u128,
+) -> String {
+    let stream = &topic_metadata.stream;
+    let topic = &topic_metadata.topic;
+    let partition_id = messages_metadata.partition_id;
+    format!("{stream}:{topic}:{partition_id}:{message_id}")
+}
+
+/// Encodes a partition id as `Int32` when it fits, otherwise as `Int64`.
+fn build_partition_metadata_value(partition_id: u32) -> bson::Bson {
+    match i32::try_from(partition_id) {
+        Ok(narrow) => bson::Bson::Int32(narrow),
+        Err(_) => bson::Bson::Int64(i64::from(partition_id)),
+    }
+}
+
+/// Returns the field name and value used to store a message offset.
+///
+/// Offsets that fit in `i64` go to `iggy_offset` as `Int64`; larger ones are
+/// stored as a decimal string under `iggy_offset_str`.
+fn build_offset_metadata_value(offset: u64) -> (&'static str, bson::Bson) {
+    match i64::try_from(offset) {
+        Ok(signed) => ("iggy_offset", bson::Bson::Int64(signed)),
+        Err(_) => ("iggy_offset_str", bson::Bson::String(offset.to_string())),
+    }
+}
+
+/// Converts a microsecond timestamp into a millisecond-precision BSON datetime.
+///
+/// Sub-millisecond precision is discarded; a millisecond value that does not
+/// fit in `i64` is clamped to `i64::MAX`.
+fn build_bson_datetime_value(timestamp_micros: u64) -> bson::DateTime {
+    let millis = match i64::try_from(timestamp_micros / 1000) {
+        Ok(fits) => fits,
+        Err(_) => i64::MAX,
+    };
+    bson::DateTime::from_millis(millis)
+}
+
+/// Encodes a checksum without loss: `Int64` when it fits, else a decimal string.
+fn build_checksum_metadata_value(checksum: u64) -> bson::Bson {
+    match i64::try_from(checksum) {
+        Ok(signed) => bson::Bson::Int64(signed),
+        Err(_) => bson::Bson::String(checksum.to_string()),
+    }
+}
+
+/// Maps a configured payload format name to its variant.
+///
+/// The second tuple element carries the original text when it was not
+/// recognized, so the caller can decide how to report it. Matching ignores
+/// ASCII case, `text` is accepted as an alias for `string`, and both a missing
+/// and an unrecognized value resolve to binary.
+fn classify_payload_format_value(input: Option<&str>) -> (PayloadFormat, Option<&str>) {
+    let Some(value) = input else {
+        return (PayloadFormat::Binary, None);
+    };
+    let is = |name: &str| value.eq_ignore_ascii_case(name);
+
+    if is("json") {
+        (PayloadFormat::Json, None)
+    } else if is("string") || is("text") {
+        (PayloadFormat::String, None)
+    } else if is("binary") {
+        (PayloadFormat::Binary, None)
+    } else {
+        (PayloadFormat::Binary, Some(value))
+    }
+}
+
+/// Returns the number of write errors when an `insert_many` failure consists
+/// solely of code 11000 (duplicate) write errors.
+///
+/// Yields `None` for every other kind of error, when a write concern error is
+/// present, when there are no write errors, or when any write error has a
+/// different code.
+fn count_duplicate_write_errors(error: &mongodb::error::Error) -> Option<usize> {
+    use mongodb::error::ErrorKind;
+
+    match error.kind.as_ref() {
+        ErrorKind::InsertMany(failure) if failure.write_concern_error.is_none() => {
+            let write_errors = failure.write_errors.as_ref()?;
+            let only_duplicates = !write_errors.is_empty()
+                && write_errors.iter().all(|write_error| write_error.code == 11000);
+            only_duplicates.then(|| write_errors.len())
+        }
+        _ => None,
+    }
+}
+
+fn estimate_inserted_count_value(error: &mongodb::error::Error, total_docs: 
usize) -> u64 {
+    use mongodb::error::ErrorKind;
+
+    if let ErrorKind::InsertMany(insert_many_error) = error.kind.as_ref()
+        && let Some(write_errors) = &insert_many_error.write_errors
+    {
+        return total_docs.saturating_sub(write_errors.len()) as u64;
+    }
+
+    0
+}
+
+/// Decides whether a failed insert is worth retrying.
+///
+/// An error carrying the driver's retryable-write label is always transient.
+/// Otherwise the decision is made per error kind; kinds without a dedicated
+/// arm fall back to a case-insensitive substring check on the error message.
+///
+/// NOTE(review): codes 11000, 13 and 121 are presumably DuplicateKey,
+/// Unauthorized and DocumentValidationFailure - confirm against the MongoDB
+/// error code list. Every other command error, and an `InsertMany` failure
+/// without such a write error, is treated as transient here.
+fn is_transient_error(e: &mongodb::error::Error) -> bool {
+    use mongodb::error::ErrorKind;
+
+    // The label check takes precedence over the kind-based classification below.
+    if e.contains_label(mongodb::error::RETRYABLE_WRITE_ERROR) {
+        return true;
+    }
+
+    match e.kind.as_ref() {
+        // Connectivity problems are retried.
+        ErrorKind::Io(_) => true,
+        ErrorKind::ConnectionPoolCleared { .. } => true,
+        ErrorKind::ServerSelection { .. } => true,
+        // Credential and (de)serialization problems are not retried.
+        ErrorKind::Authentication { .. } => false,
+        ErrorKind::BsonDeserialization(_) => false,
+        ErrorKind::BsonSerialization(_) => false,
+        ErrorKind::InsertMany(insert_many_error) => {
+            // A single listed non-retryable write error makes the whole batch non-retryable.
+            let has_non_retryable_write_error = insert_many_error
+                .write_errors
+                .as_ref()
+                .is_some_and(|wes| wes.iter().any(|we| matches!(we.code, 11000 
| 13 | 121)));
+            !has_non_retryable_write_error
+        }
+        ErrorKind::Command(cmd_err) => !matches!(cmd_err.code, 11000 | 13 | 
121),
+        // Heuristic for all remaining kinds: look for transient-sounding words in the message.
+        _ => {
+            let msg = e.to_string().to_lowercase();
+            msg.contains("timeout")
+                || msg.contains("network")
+                || msg.contains("pool")
+                || msg.contains("server selection")
+        }
+    }
+}
+
+/// Produces a loggable form of a connection URI.
+///
+/// The scheme (up to and including the first `://`) is kept, followed by at
+/// most three characters of the remainder and a literal `***`. Input without
+/// a scheme is reduced to its first three characters plus `***`.
+///
+/// NOTE(review): for URIs of the form `scheme://user:password@host` the
+/// three-character preview is taken from the credentials part, so very short
+/// user names or passwords can end up in the log - consider dropping the
+/// preview.
+fn redact_connection_uri(uri: &str) -> String {
+    let (scheme, remainder) = match uri.split_once("://") {
+        Some((scheme, rest)) => (Some(scheme), rest),
+        None => (None, uri),
+    };
+    let preview: String = remainder.chars().take(3).collect();
+
+    match scheme {
+        Some(scheme) => format!("{scheme}://{preview}***"),
+        None => format!("{preview}***"),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn given_default_config() -> MongoDbSinkConfig {
+        MongoDbSinkConfig {
+            connection_uri: "mongodb://localhost:27017".to_string(),
+            database: "test_db".to_string(),
+            collection: "test_collection".to_string(),
+            max_pool_size: None,
+            auto_create_collection: None,
+            batch_size: Some(100),
+            include_metadata: None,
+            include_checksum: None,
+            include_origin_timestamp: None,
+            payload_format: None,
+            verbose_logging: None,
+            max_retries: None,
+            retry_delay: None,
+        }
+    }
+
+    #[test]
+    fn given_payload_format_inputs_should_map_expected_variant() {
+        let cases = [
+            (Some("json"), PayloadFormat::Json),
+            (Some("JSON"), PayloadFormat::Json),
+            (Some("string"), PayloadFormat::String),
+            (Some("text"), PayloadFormat::String),
+            (Some("TEXT"), PayloadFormat::String),
+            (Some("binary"), PayloadFormat::Binary),
+            (Some("unknown"), PayloadFormat::Binary),
+            (None, PayloadFormat::Binary),
+        ];
+
+        for (input, expected) in cases {
+            assert_eq!(PayloadFormat::from_config(input), expected);
+        }
+    }
+
+    #[test]
+    fn given_unknown_payload_format_should_mark_warning_contract() {
+        let (payload_format, unknown_value) = 
classify_payload_format_value(Some("surprise"));
+        assert_eq!(payload_format, PayloadFormat::Binary);
+        assert_eq!(unknown_value, Some("surprise"));
+    }
+
+    #[test]
+    fn given_retry_configurations_should_use_expected_values() {
+        let cases = [
+            (None, None, DEFAULT_MAX_RETRIES, Duration::from_secs(1)),
+            (Some(5), None, 5, Duration::from_secs(1)),
+            (
+                None,
+                Some("500ms"),
+                DEFAULT_MAX_RETRIES,
+                Duration::from_millis(500),
+            ),
+        ];
+
+        for (max_retries, retry_delay, expected_retries, expected_delay) in 
cases {
+            let mut config = given_default_config();
+            config.max_retries = max_retries;
+            config.retry_delay = 
retry_delay.map(std::string::ToString::to_string);
+
+            let sink = MongoDbSink::new(1, config);
+            assert_eq!(sink.max_retries, expected_retries);
+            assert_eq!(sink.retry_delay, expected_delay);
+        }
+    }
+
+    #[test]
+    fn given_connection_uri_shapes_should_redact_consistently() {
+        let cases = [
+            (
+                "mongodb://user:password@localhost:27017",
+                "mongodb://use***",
+            ),
+            ("localhost:27017", "loc***"),
+            (
+                "mongodb+srv://admin:[email protected]",
+                "mongodb+srv://adm***",
+            ),
+        ];
+
+        for (uri, expected) in cases {
+            assert_eq!(redact_connection_uri(uri), expected);
+        }
+    }
+
+    #[test]
+    fn given_payload_format_config_should_select_sink_format() {
+        let cases = [
+            (None, PayloadFormat::Binary),
+            (Some("json"), PayloadFormat::Json),
+            (Some("string"), PayloadFormat::String),
+        ];
+
+        for (payload_format, expected) in cases {
+            let mut config = given_default_config();
+            config.payload_format = 
payload_format.map(std::string::ToString::to_string);
+
+            let sink = MongoDbSink::new(1, config);
+            assert_eq!(sink.payload_format, expected);
+        }
+    }
+
+    #[test]
+    fn given_offset_values_should_use_expected_metadata_field() {
+        let (normal_key, normal_value) = build_offset_metadata_value(i64::MAX 
as u64);
+        assert_eq!(normal_key, "iggy_offset");
+        assert_eq!(normal_value, bson::Bson::Int64(i64::MAX));
+
+        let oversized_offset = (i64::MAX as u64) + 1;
+        let (oversized_key, oversized_value) = 
build_offset_metadata_value(oversized_offset);
+        assert_eq!(oversized_key, "iggy_offset_str");
+        assert_eq!(
+            oversized_value,
+            bson::Bson::String(oversized_offset.to_string())
+        );
+    }
+
+    #[test]
+    fn given_partition_values_should_use_expected_bson_type() {
+        assert_eq!(
+            build_partition_metadata_value(i32::MAX as u32),
+            bson::Bson::Int32(i32::MAX)
+        );
+
+        let oversized_partition = (i32::MAX as u32) + 1;
+        assert_eq!(
+            build_partition_metadata_value(oversized_partition),
+            bson::Bson::Int64(i64::from(oversized_partition))
+        );
+    }
+
+    #[test]
+    fn given_timestamp_values_should_use_bounded_bson_datetime() {
+        assert_eq!(
+            build_bson_datetime_value(1_234_000),
+            bson::DateTime::from_millis(1_234)
+        );
+
+        assert_eq!(
+            build_bson_datetime_value(u64::MAX),
+            bson::DateTime::from_millis((u64::MAX / 1_000) as i64)
+        );
+    }
+
+    #[test]
+    fn given_checksum_values_should_use_lossless_bson_value() {
+        assert_eq!(
+            build_checksum_metadata_value(i64::MAX as u64),
+            bson::Bson::Int64(i64::MAX)
+        );
+
+        let oversized_checksum = (i64::MAX as u64) + 1;
+        assert_eq!(
+            build_checksum_metadata_value(oversized_checksum),
+            bson::Bson::String(oversized_checksum.to_string())
+        );
+    }
+
+    #[test]
+    fn given_cross_topic_messages_should_build_unique_document_ids() {
+        let first_topic = TopicMetadata {
+            stream: "test_stream".to_string(),
+            topic: "topic_primary".to_string(),
+        };
+        let second_topic = TopicMetadata {
+            stream: "test_stream".to_string(),
+            topic: "topic_secondary".to_string(),
+        };
+        let messages_metadata = MessagesMetadata {
+            partition_id: 0,
+            current_offset: 0,
+            schema: iggy_connector_sdk::Schema::Raw,
+        };
+
+        let first_id = build_composite_document_id(&first_topic, 
&messages_metadata, 42);
+        let second_id = build_composite_document_id(&second_topic, 
&messages_metadata, 42);
+        assert_ne!(
+            first_id, second_id,
+            "Composite document ID must differ for different topics"
+        );
+    }
+
+    #[test]
+    fn given_auto_create_collection_config_should_store_expected_option() {
+        let cases = [None, Some(true), Some(false)];
+
+        for auto_create_collection in cases {
+            let mut config = given_default_config();
+            config.auto_create_collection = auto_create_collection;
+
+            let sink = MongoDbSink::new(1, config);
+            assert_eq!(sink.config.auto_create_collection, 
auto_create_collection);
+        }
+    }
+
+    // ---- is_transient_error tests ----
+
+    #[test]
+    fn given_io_timeout_error_should_be_transient() {
+        let io_err = std::io::Error::new(std::io::ErrorKind::TimedOut, 
"connection timed out");
+        let e: mongodb::error::Error = io_err.into();
+        assert!(is_transient_error(&e));
+    }
+
+    #[test]
+    fn given_io_network_error_should_be_transient() {
+        let io_err =
+            std::io::Error::new(std::io::ErrorKind::ConnectionRefused, 
"connection refused");
+        let e: mongodb::error::Error = io_err.into();
+        assert!(is_transient_error(&e));
+    }
+
+    #[test]
+    fn given_string_timeout_error_should_be_transient() {
+        let e = mongodb::error::Error::custom(String::from("server selection 
timeout exceeded"));
+        assert!(is_transient_error(&e));
+    }
+
+    #[test]
+    fn given_string_pool_error_should_be_transient() {
+        let e = mongodb::error::Error::custom(String::from("connection pool 
exhausted"));
+        assert!(is_transient_error(&e));
+    }
+
+    #[test]
+    fn given_auth_failure_string_should_not_be_transient() {
+        let e =
+            mongodb::error::Error::custom(String::from("authentication failed: 
bad credentials"));
+        assert!(!is_transient_error(&e));
+    }
+
+    #[test]
+    fn given_duplicate_key_string_should_not_be_transient() {
+        let e = mongodb::error::Error::custom(String::from("duplicate key 
error on collection"));
+        assert!(!is_transient_error(&e));
+    }
+
+    // ---- process_messages error propagation tests ----
+    // These tests verify that the sink does NOT silently lose data when 
inserts fail.
+
+    /// Test contract: When MongoDB insert fails, process_messages MUST return 
Err.
+    /// This prevents silent data loss where upstream commits while writes 
failed.
+    ///
+    /// Given: A sink with no client (will fail on get_client)
+    /// When: process_messages is called with messages
+    /// Then: Returns Err (not Ok) and does NOT count failed messages as 
processed
+    #[tokio::test]
+    async fn given_no_client_should_return_error_not_silent_ok() {
+        let config = given_default_config();
+        let sink = MongoDbSink::new(1, config);
+
+        // Sink has no client - this simulates connection failure
+        assert!(
+            sink.client.is_none(),
+            "Sink should not have client before connect"
+        );
+
+        let topic_metadata = TopicMetadata {
+            stream: "test_stream".to_string(),
+            topic: "test_topic".to_string(),
+        };
+        let messages_metadata = MessagesMetadata {
+            partition_id: 1,
+            current_offset: 0,
+            schema: iggy_connector_sdk::Schema::Raw,
+        };
+        let messages = vec![ConsumedMessage {
+            id: 1,
+            offset: 0,
+            timestamp: 1000,
+            origin_timestamp: 1000,
+            checksum: 0,
+            headers: None,
+            payload: iggy_connector_sdk::Payload::Raw(vec![1, 2, 3]),
+        }];
+
+        let result = sink
+            .process_messages(&topic_metadata, &messages_metadata, &messages)
+            .await;
+
+        // CRITICAL: Must return Err, not Ok(())
+        assert!(
+            result.is_err(),
+            "process_messages MUST return Err when client is unavailable - 
silent data loss bug!"
+        );
+
+        assert_eq!(
+            sink.messages_processed.load(Ordering::Relaxed),
+            0,
+            "messages_processed must only count SUCCESSFUL inserts"
+        );
+    }
+
+    /// Test contract: a freshly constructed sink starts with zeroed counters.
+    ///
+    /// Given: A sink built with `MongoDbSink::new` that has not processed anything
+    /// When: The counters are read
+    /// Then: Both `messages_processed` and `insertion_errors` are 0
+    #[test]
+    fn given_new_sink_should_have_zero_messages_processed() {
+        let sink = MongoDbSink::new(1, given_default_config());
+        assert_eq!(
+            sink.messages_processed.load(Ordering::Relaxed),
+            0,
+            "New sink must start with zero processed count"
+        );
+        assert_eq!(
+            sink.insertion_errors.load(Ordering::Relaxed),
+            0,
+            "New sink must start with zero error count"
+        );
+    }
+}
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 13afb04dc..e10b7bbeb 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -47,6 +47,7 @@ iggy_connector_sdk = { workspace = true, features = ["api"] }
 keyring = { workspace = true }
 lazy_static = { workspace = true }
 libc = { workspace = true }
+mongodb = { version = "3.0", features = ["rustls-tls"] }
 once_cell = { workspace = true }
 predicates = { workspace = true }
 rand = { workspace = true }
diff --git a/core/integration/tests/connectors/fixtures/mod.rs 
b/core/integration/tests/connectors/fixtures/mod.rs
index 3c0ac0d93..6deae4866 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -19,12 +19,17 @@
 
 mod elasticsearch;
 mod iceberg;
+mod mongodb;
 mod postgres;
 mod quickwit;
 mod wiremock;
 
 pub use elasticsearch::{ElasticsearchSinkFixture, 
ElasticsearchSourcePreCreatedFixture};
 pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, 
IcebergPreCreatedFixture};
+pub use mongodb::{
+    MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, 
MongoDbSinkFailpointFixture,
+    MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
+};
 pub use postgres::{
     PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture, 
PostgresSinkJsonFixture,
     PostgresSourceByteaFixture, PostgresSourceDeleteFixture, 
PostgresSourceJsonFixture,
diff --git a/core/integration/tests/connectors/fixtures/mongodb/container.rs 
b/core/integration/tests/connectors/fixtures/mongodb/container.rs
new file mode 100644
index 000000000..4c2b5177d
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/mongodb/container.rs
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use integration::harness::TestBinaryError;
+use mongodb::{Client, bson::doc, options::ClientOptions};
+use std::time::Duration;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tokio::time::sleep;
+use tracing::info;
+
+// Image name and tag passed to `GenericImage::new`.
+const MONGODB_IMAGE: &str = "mongo";
+const MONGODB_TAG: &str = "7";
+// Port MongoDB listens on inside the container; mapped to a host port at startup.
+const MONGODB_PORT: u16 = 27017;
+// Stdout line the container start waits for before it is considered ready.
+const MONGODB_READY_MSG: &str = "Waiting for connections";
+const MONGODB_REPLICA_SET_NAME: &str = "rs0";
+// Replica-set initialisation is polled up to 120 times, 250 ms apart (~30 s).
+const MONGODB_INIT_ATTEMPTS: usize = 120;
+const MONGODB_INIT_INTERVAL_MS: u64 = 250;
+
+// Names shared by the sink fixtures in the sibling `sink` module.
+pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream";
+pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic";
+pub(super) const DEFAULT_SINK_COLLECTION: &str = "iggy_messages";
+pub(super) const DEFAULT_TEST_DATABASE: &str = "iggy_test";
+
+// Document polling budget: up to 100 attempts, 50 ms apart (~5 s).
+pub(super) const DEFAULT_POLL_ATTEMPTS: usize = 100;
+pub(super) const DEFAULT_POLL_INTERVAL_MS: u64 = 50;
+
+// Sink env vars
+pub(super) const ENV_SINK_CONNECTION_URI: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_CONNECTION_URI";
+pub(super) const ENV_SINK_DATABASE: &str = 
"IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_DATABASE";
+pub(super) const ENV_SINK_COLLECTION: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_COLLECTION";
+pub(super) const ENV_SINK_PAYLOAD_FORMAT: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+pub(super) const ENV_SINK_INCLUDE_METADATA: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_INCLUDE_METADATA";
+pub(super) const ENV_SINK_AUTO_CREATE_COLLECTION: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_AUTO_CREATE_COLLECTION";
+pub(super) const ENV_SINK_BATCH_SIZE: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_BATCH_SIZE";
+pub(super) const ENV_SINK_MAX_RETRIES: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_MAX_RETRIES";
+pub(super) const ENV_SINK_RETRY_DELAY: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_PLUGIN_CONFIG_RETRY_DELAY";
+pub(super) const ENV_SINK_STREAMS_0_STREAM: &str = 
"IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_STREAM";
+pub(super) const ENV_SINK_STREAMS_0_TOPICS: &str = 
"IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_TOPICS";
+pub(super) const ENV_SINK_STREAMS_0_SCHEMA: &str = 
"IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_SCHEMA";
+pub(super) const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str =
+    "IGGY_CONNECTORS_SINK_MONGODB_STREAMS_0_CONSUMER_GROUP";
+pub(super) const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_MONGODB_PATH";
+
+/// Base container management for MongoDB fixtures.
+///
+/// Owns the running container handle together with the URI that clients and
+/// the sink connector use to reach it.
+pub struct MongoDbContainer {
+    // Never read after construction, hence the lint allowance.
+    // NOTE(review): presumably held so the container lives as long as the
+    // fixture does - confirm against testcontainers' drop behaviour.
+    #[allow(dead_code)]
+    container: ContainerAsync<GenericImage>,
+    /// MongoDB connection string pointing at the mapped host port.
+    pub(super) connection_uri: String,
+}
+
+impl MongoDbContainer {
+    /// Builds the `FixtureSetup` error that every failure in this impl reports.
+    fn setup_error(message: impl Into<String>) -> TestBinaryError {
+        TestBinaryError::FixtureSetup {
+            fixture_type: "MongoDbContainer".to_string(),
+            message: message.into(),
+        }
+    }
+
+    /// Looks up the host port that was mapped to the container's MongoDB port.
+    async fn mapped_host_port(
+        container: &ContainerAsync<GenericImage>,
+    ) -> Result<u16, TestBinaryError> {
+        container
+            .ports()
+            .await
+            .map_err(|e| Self::setup_error(format!("Failed to get ports: {e}")))?
+            .map_to_host_port_ipv4(MONGODB_PORT)
+            .ok_or_else(|| Self::setup_error("No mapping for MongoDB port"))
+    }
+
+    /// Starts a standalone MongoDB container and records its connection URI.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::FixtureSetup` when the container cannot be
+    /// started or its port mapping cannot be resolved.
+    pub(super) async fn start() -> Result<Self, TestBinaryError> {
+        let container = GenericImage::new(MONGODB_IMAGE, MONGODB_TAG)
+            .with_exposed_port(MONGODB_PORT.tcp())
+            .with_wait_for(WaitFor::message_on_stdout(MONGODB_READY_MSG))
+            .with_mapped_port(0, MONGODB_PORT.tcp())
+            .start()
+            .await
+            .map_err(|e| Self::setup_error(format!("Failed to start container: {e}")))?;
+
+        info!("Started MongoDB container");
+
+        let mapped_port = Self::mapped_host_port(&container).await?;
+
+        // Standalone mode: plain URI. No ?directConnection=true needed
+        // (directConnection is only required for single-node replica sets).
+        let connection_uri = format!("mongodb://localhost:{mapped_port}");
+
+        info!("MongoDB container available at {connection_uri}");
+
+        Ok(Self {
+            container,
+            connection_uri,
+        })
+    }
+
+    /// Starts MongoDB as a single-node replica set and waits until the node
+    /// reports itself as PRIMARY.
+    ///
+    /// When `enable_test_commands` is true the server is additionally started
+    /// with `--setParameter enableTestCommands=1`.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::FixtureSetup` when the container or bootstrap
+    /// client cannot be created, or when the node does not reach PRIMARY
+    /// within the polling budget. In the latter case the message includes the
+    /// `replSetInitiate` error, if that command failed.
+    pub(super) async fn start_single_node_replica_set(
+        enable_test_commands: bool,
+    ) -> Result<Self, TestBinaryError> {
+        // Assemble the server arguments once instead of overriding the command twice.
+        let mut server_args = vec!["--replSet", MONGODB_REPLICA_SET_NAME, "--bind_ip_all"];
+        if enable_test_commands {
+            server_args.extend(["--setParameter", "enableTestCommands=1"]);
+        }
+
+        let container = GenericImage::new(MONGODB_IMAGE, MONGODB_TAG)
+            .with_exposed_port(MONGODB_PORT.tcp())
+            .with_wait_for(WaitFor::message_on_stdout(MONGODB_READY_MSG))
+            .with_mapped_port(0, MONGODB_PORT.tcp())
+            .with_cmd(server_args)
+            .start()
+            .await
+            .map_err(|e| {
+                Self::setup_error(format!("Failed to start replica-set container: {e}"))
+            })?;
+
+        let mapped_port = Self::mapped_host_port(&container).await?;
+
+        let bootstrap_uri = format!("mongodb://localhost:{mapped_port}/?directConnection=true");
+        let options = ClientOptions::parse(&bootstrap_uri)
+            .await
+            .map_err(|e| Self::setup_error(format!("Failed to parse bootstrap URI: {e}")))?;
+        let bootstrap_client = Client::with_options(options)
+            .map_err(|e| Self::setup_error(format!("Failed to create bootstrap client: {e}")))?;
+        let admin = bootstrap_client.database("admin");
+
+        // Initiation stays best-effort (the status poll below is the real
+        // gate), but the error is kept so a later timeout can explain itself.
+        let initiate_error = admin
+            .run_command(doc! {
+                "replSetInitiate": {
+                    "_id": MONGODB_REPLICA_SET_NAME,
+                    "members": [
+                        {
+                            "_id": 0,
+                            "host": format!("localhost:{MONGODB_PORT}")
+                        }
+                    ]
+                }
+            })
+            .await
+            .err();
+
+        let mut initialized = false;
+        for _ in 0..MONGODB_INIT_ATTEMPTS {
+            let status = admin.run_command(doc! { "replSetGetStatus": 1 }).await;
+
+            // myState == 1 is the state this fixture treats as PRIMARY.
+            if let Ok(status) = status
+                && status.get_i32("myState").ok() == Some(1)
+            {
+                initialized = true;
+                break;
+            }
+
+            sleep(Duration::from_millis(MONGODB_INIT_INTERVAL_MS)).await;
+        }
+
+        if !initialized {
+            let cause = initiate_error
+                .map(|e| format!(" (replSetInitiate failed: {e})"))
+                .unwrap_or_default();
+            return Err(Self::setup_error(format!(
+                "Replica set failed to reach PRIMARY state{cause}"
+            )));
+        }
+
+        let connection_uri = format!(
+            "mongodb://localhost:{mapped_port}/?replicaSet={MONGODB_REPLICA_SET_NAME}&directConnection=true"
+        );
+        info!("MongoDB single-node replica set available at {connection_uri}");
+
+        Ok(Self {
+            container,
+            connection_uri,
+        })
+    }
+
+    /// Creates a MongoDB client from this container's connection URI.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::FixtureSetup` when the URI cannot be parsed
+    /// or the client cannot be constructed.
+    pub async fn create_client(&self) -> Result<Client, TestBinaryError> {
+        let options = ClientOptions::parse(&self.connection_uri)
+            .await
+            .map_err(|e| Self::setup_error(format!("Failed to parse URI: {e}")))?;
+
+        Client::with_options(options)
+            .map_err(|e| Self::setup_error(format!("Failed to create client: {e}")))
+    }
+}
+
+/// Common MongoDB operations for fixtures.
+///
+/// Implementors only supply `container()`; `create_client` is provided on top of it.
+pub trait MongoDbOps: Sync {
+    /// Returns the MongoDB container backing this fixture.
+    fn container(&self) -> &MongoDbContainer;
+
+    /// Creates a client by delegating to the container's `create_client`.
+    fn create_client(
+        &self,
+    ) -> impl std::future::Future<Output = Result<Client, TestBinaryError>> + 
Send {
+        self.container().create_client()
+    }
+}
diff --git a/core/integration/tests/connectors/fixtures/mod.rs 
b/core/integration/tests/connectors/fixtures/mongodb/mod.rs
similarity index 54%
copy from core/integration/tests/connectors/fixtures/mod.rs
copy to core/integration/tests/connectors/fixtures/mongodb/mod.rs
index 3c0ac0d93..cfaf97ec6 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mongodb/mod.rs
@@ -17,18 +17,11 @@
  * under the License.
  */
 
-mod elasticsearch;
-mod iceberg;
-mod postgres;
-mod quickwit;
-mod wiremock;
+mod container;
+mod sink;
 
-pub use elasticsearch::{ElasticsearchSinkFixture, 
ElasticsearchSourcePreCreatedFixture};
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, 
IcebergPreCreatedFixture};
-pub use postgres::{
-    PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture, 
PostgresSinkJsonFixture,
-    PostgresSourceByteaFixture, PostgresSourceDeleteFixture, 
PostgresSourceJsonFixture,
-    PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps,
+pub use container::MongoDbOps;
+pub use sink::{
+    MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, 
MongoDbSinkFailpointFixture,
+    MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
 };
-pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture};
-pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
diff --git a/core/integration/tests/connectors/fixtures/mongodb/sink.rs 
b/core/integration/tests/connectors/fixtures/mongodb/sink.rs
new file mode 100644
index 000000000..8fa978275
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/mongodb/sink.rs
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::container::{
+    DEFAULT_POLL_ATTEMPTS, DEFAULT_POLL_INTERVAL_MS, DEFAULT_SINK_COLLECTION,
+    DEFAULT_TEST_DATABASE, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC,
+    ENV_SINK_AUTO_CREATE_COLLECTION, ENV_SINK_BATCH_SIZE, ENV_SINK_COLLECTION,
+    ENV_SINK_CONNECTION_URI, ENV_SINK_DATABASE, ENV_SINK_INCLUDE_METADATA, 
ENV_SINK_MAX_RETRIES,
+    ENV_SINK_PATH, ENV_SINK_PAYLOAD_FORMAT, ENV_SINK_RETRY_DELAY,
+    ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, 
ENV_SINK_STREAMS_0_STREAM,
+    ENV_SINK_STREAMS_0_TOPICS, MongoDbContainer, MongoDbOps,
+};
+use async_trait::async_trait;
+use futures::TryStreamExt;
+use integration::harness::{TestBinaryError, TestFixture};
+use mongodb::{Client, bson::Document};
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
+
+/// MongoDB sink connector fixture.
+///
+/// Its own `TestFixture::setup` uses the "binary" payload format with
+/// auto-create disabled; the wrapper fixtures in this module build it with
+/// other settings.
+pub struct MongoDbSinkFixture {
+    container: MongoDbContainer,
+    // Value exported through the sink's PAYLOAD_FORMAT env var.
+    payload_format: &'static str,
+    // When true, the AUTO_CREATE_COLLECTION env var is exported as "true".
+    auto_create: bool,
+}
+
+impl MongoDbOps for MongoDbSinkFixture {
+    /// Exposes the fixture's container so the trait's `create_client` can use it.
+    fn container(&self) -> &MongoDbContainer {
+        &self.container
+    }
+}
+
+impl MongoDbSinkFixture {
+    /// Polls a collection in the test database until it holds at least
+    /// `expected` documents, returning them sorted by `iggy_offset` ascending.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::InvalidState` when the count is not reached
+    /// within `DEFAULT_POLL_ATTEMPTS` polls. The message includes what the
+    /// last poll observed (a document count or the query error), so a failing
+    /// query is distinguishable from a slow sink.
+    pub async fn wait_for_documents(
+        &self,
+        client: &Client,
+        collection_name: &str,
+        expected: usize,
+    ) -> Result<Vec<Document>, TestBinaryError> {
+        let collection = client
+            .database(DEFAULT_TEST_DATABASE)
+            .collection::<Document>(collection_name);
+        let mut last_observation: Option<String> = None;
+
+        for _ in 0..DEFAULT_POLL_ATTEMPTS {
+            let fetched = match collection
+                .find(mongodb::bson::doc! {})
+                .sort(mongodb::bson::doc! { "iggy_offset": 1 })
+                .await
+            {
+                Ok(cursor) => cursor.try_collect::<Vec<_>>().await,
+                Err(e) => Err(e),
+            };
+
+            match fetched {
+                Ok(docs) if docs.len() >= expected => {
+                    info!(
+                        "Found {} documents in MongoDB collection '{collection_name}'",
+                        docs.len()
+                    );
+                    return Ok(docs);
+                }
+                Ok(docs) => {
+                    last_observation = Some(format!("found only {} documents", docs.len()));
+                }
+                // Keep polling on errors, but remember why this poll failed.
+                Err(e) => last_observation = Some(format!("query failed: {e}")),
+            }
+            sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await;
+        }
+
+        let detail = last_observation
+            .map(|observation| format!(" (last poll: {observation})"))
+            .unwrap_or_default();
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {expected} documents in '{collection_name}' after {} attempts{detail}",
+                DEFAULT_POLL_ATTEMPTS
+            ),
+        })
+    }
+
+    /// Count all documents in a collection of the test database.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::InvalidState` when the count query fails.
+    pub async fn count_documents_in_collection(
+        &self,
+        client: &Client,
+        collection_name: &str,
+    ) -> Result<u64, TestBinaryError> {
+        client
+            .database(DEFAULT_TEST_DATABASE)
+            .collection::<Document>(collection_name)
+            .count_documents(mongodb::bson::doc! {})
+            .await
+            .map_err(|e| TestBinaryError::InvalidState {
+                message: format!("Failed to count documents: {e}"),
+            })
+    }
+
+    /// Check whether a named collection exists in the test database.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::InvalidState` when listing collections fails.
+    pub async fn collection_exists(
+        &self,
+        client: &Client,
+        collection_name: &str,
+    ) -> Result<bool, TestBinaryError> {
+        let names = client
+            .database(DEFAULT_TEST_DATABASE)
+            .list_collection_names()
+            .await
+            .map_err(|e| TestBinaryError::InvalidState {
+                message: format!("Failed to list collections: {e}"),
+            })?;
+        // Compare as &str rather than allocating a String for `contains`.
+        Ok(names.iter().any(|name| name.as_str() == collection_name))
+    }
+
+    /// Enable `failCommand` failpoint once with caller-supplied failpoint data.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::FixtureSetup` when the admin command fails.
+    pub async fn configure_fail_command_once(
+        &self,
+        client: &Client,
+        data: Document,
+    ) -> Result<(), TestBinaryError> {
+        client
+            .database("admin")
+            .run_command(mongodb::bson::doc! {
+                "configureFailPoint": "failCommand",
+                "mode": { "times": 1 },
+                "data": data,
+            })
+            .await
+            .map(|_| ())
+            .map_err(|e| TestBinaryError::FixtureSetup {
+                fixture_type: "MongoDbSinkFixture".to_string(),
+                message: format!("Failed to configure failCommand failpoint: {e}"),
+            })
+    }
+
+    /// Disable `failCommand` failpoint explicitly.
+    ///
+    /// # Errors
+    /// Returns `TestBinaryError::FixtureSetup` when the admin command fails.
+    pub async fn disable_fail_command(&self, client: &Client) -> Result<(), TestBinaryError> {
+        client
+            .database("admin")
+            .run_command(mongodb::bson::doc! {
+                "configureFailPoint": "failCommand",
+                "mode": "off",
+            })
+            .await
+            .map(|_| ())
+            .map_err(|e| TestBinaryError::FixtureSetup {
+                fixture_type: "MongoDbSinkFixture".to_string(),
+                message: format!("Failed to disable failCommand failpoint: {e}"),
+            })
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkFixture {
+    /// Starts a standalone MongoDB container and wraps it in a fixture that
+    /// uses the "binary" payload format with auto-create disabled.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        MongoDbContainer::start().await.map(|container| Self {
+            container,
+            payload_format: "binary",
+            auto_create: false,
+        })
+    }
+
+    /// Environment variables that configure the MongoDB sink for this fixture.
+    ///
+    /// The auto-create variable is only present when `auto_create` is set.
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let always = [
+            (
+                ENV_SINK_CONNECTION_URI,
+                self.container.connection_uri.clone(),
+            ),
+            (ENV_SINK_DATABASE, DEFAULT_TEST_DATABASE.to_string()),
+            (ENV_SINK_COLLECTION, DEFAULT_SINK_COLLECTION.to_string()),
+            (ENV_SINK_PAYLOAD_FORMAT, self.payload_format.to_string()),
+            (ENV_SINK_INCLUDE_METADATA, "true".to_string()),
+            (ENV_SINK_STREAMS_0_STREAM, DEFAULT_TEST_STREAM.to_string()),
+            (
+                ENV_SINK_STREAMS_0_TOPICS,
+                format!("[{}]", DEFAULT_TEST_TOPIC),
+            ),
+            (ENV_SINK_STREAMS_0_SCHEMA, "raw".to_string()),
+            (
+                ENV_SINK_STREAMS_0_CONSUMER_GROUP,
+                "mongodb_sink_cg".to_string(),
+            ),
+            (
+                ENV_SINK_PATH,
+                "../../target/debug/libiggy_connector_mongodb_sink".to_string(),
+            ),
+        ];
+        let auto_create = self
+            .auto_create
+            .then(|| (ENV_SINK_AUTO_CREATE_COLLECTION, "true".to_string()));
+
+        always
+            .into_iter()
+            .chain(auto_create)
+            .map(|(key, value)| (key.to_string(), value))
+            .collect()
+    }
+}
+
+/// MongoDB sink fixture with JSON payload format.
+pub struct MongoDbSinkJsonFixture {
+    inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkJsonFixture {
+    type Target = MongoDbSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkJsonFixture {
+    /// Starts a standalone MongoDB container and builds the inner fixture
+    /// with the "json" payload format.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let container = MongoDbContainer::start().await?;
+        Ok(Self {
+            inner: MongoDbSinkFixture {
+                container,
+                payload_format: "json",
+                auto_create: false,
+            },
+        })
+    }
+
+    /// Base fixture envs with the stream schema switched to "json".
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        // The base fixture exports STREAMS_0_SCHEMA as "raw"; replace it with
+        // "json", which the runtime needs to route these messages correctly.
+        envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string());
+        envs
+    }
+}
+
+/// MongoDB sink fixture with auto_create_collection enabled.
+pub struct MongoDbSinkAutoCreateFixture {
+    inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkAutoCreateFixture {
+    type Target = MongoDbSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkAutoCreateFixture {
+    /// Starts a standalone MongoDB container and builds the inner fixture
+    /// with `auto_create` switched on.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        MongoDbContainer::start().await.map(|container| Self {
+            inner: MongoDbSinkFixture {
+                container,
+                payload_format: "binary",
+                auto_create: true,
+            },
+        })
+    }
+
+    /// Same envs as the inner fixture, which already exports the
+    /// auto-create variable because `auto_create` is true.
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let inner = &self.inner;
+        inner.connectors_runtime_envs()
+    }
+}
+
+/// MongoDB sink fixture with batch_size set to 10 for large-batch tests.
+pub struct MongoDbSinkBatchFixture {
+    inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkBatchFixture {
+    type Target = MongoDbSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkBatchFixture {
+    /// Starts a standalone MongoDB container with the default binary sink settings.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        MongoDbContainer::start().await.map(|container| Self {
+            inner: MongoDbSinkFixture {
+                container,
+                payload_format: "binary",
+                auto_create: false,
+            },
+        })
+    }
+
+    /// Inner fixture envs plus a batch size of 10.
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        envs.extend([(ENV_SINK_BATCH_SIZE.to_string(), "10".to_string())]);
+        envs
+    }
+}
+
+/// MongoDB sink fixture backed by single-node replica set and write concern
+/// timeout URI.
+pub struct MongoDbSinkWriteConcernFixture {
+    inner: MongoDbSinkFixture,
+    // Container URI with the write-concern query parameters appended; this is
+    // the URI exported to the sink instead of the container's own.
+    connection_uri: String,
+}
+
+impl std::ops::Deref for MongoDbSinkWriteConcernFixture {
+    type Target = MongoDbSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkWriteConcernFixture {
+    /// Starts a single-node replica set (test commands disabled) and derives
+    /// a sink URI that adds `w=2`, `wtimeoutMS=200` and `retryWrites=false`.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let container = MongoDbContainer::start_single_node_replica_set(false).await?;
+
+        let mut connection_uri = container.connection_uri.clone();
+        connection_uri.push_str("&w=2&wtimeoutMS=200&retryWrites=false");
+
+        let inner = MongoDbSinkFixture {
+            container,
+            payload_format: "binary",
+            auto_create: false,
+        };
+        Ok(Self {
+            inner,
+            connection_uri,
+        })
+    }
+
+    /// Inner fixture envs with the write-concern URI and retry settings applied.
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        envs.extend([
+            (
+                ENV_SINK_CONNECTION_URI.to_string(),
+                self.connection_uri.clone(),
+            ),
+            // Keep the signal clean: fail once and report failure instead of retrying.
+            (ENV_SINK_MAX_RETRIES.to_string(), "1".to_string()),
+            (ENV_SINK_RETRY_DELAY.to_string(), "50ms".to_string()),
+        ]);
+        envs
+    }
+}
+
+/// MongoDB sink fixture backed by single-node replica set with test commands 
enabled.
+pub struct MongoDbSinkFailpointFixture {
+    inner: MongoDbSinkFixture,
+}
+
+impl std::ops::Deref for MongoDbSinkFailpointFixture {
+    type Target = MongoDbSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for MongoDbSinkFailpointFixture {
+    /// Starts a single-node replica set with test commands enabled.
+    async fn setup() -> Result<Self, TestBinaryError> {
+        MongoDbContainer::start_single_node_replica_set(true)
+            .await
+            .map(|container| Self {
+                inner: MongoDbSinkFixture {
+                    container,
+                    payload_format: "binary",
+                    auto_create: false,
+                },
+            })
+    }
+
+    /// Inner fixture envs plus two retries with a 50ms retry delay.
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        envs.extend([
+            (ENV_SINK_MAX_RETRIES.to_string(), "2".to_string()),
+            (ENV_SINK_RETRY_DELAY.to_string(), "50ms".to_string()),
+        ]);
+        envs
+    }
+}
diff --git a/core/integration/tests/connectors/mod.rs 
b/core/integration/tests/connectors/mod.rs
index a85453d52..0d9352904 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -22,6 +22,7 @@ mod elasticsearch;
 mod fixtures;
 mod http_config_provider;
 mod iceberg;
+mod mongodb;
 mod postgres;
 mod quickwit;
 mod random;
diff --git a/core/integration/tests/connectors/fixtures/mod.rs 
b/core/integration/tests/connectors/mongodb/mod.rs
similarity index 54%
copy from core/integration/tests/connectors/fixtures/mod.rs
copy to core/integration/tests/connectors/mongodb/mod.rs
index 3c0ac0d93..d44706b49 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/mongodb/mod.rs
@@ -17,18 +17,8 @@
  * under the License.
  */
 
-mod elasticsearch;
-mod iceberg;
-mod postgres;
-mod quickwit;
-mod wiremock;
+mod mongodb_sink;
 
-pub use elasticsearch::{ElasticsearchSinkFixture, 
ElasticsearchSourcePreCreatedFixture};
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, 
IcebergPreCreatedFixture};
-pub use postgres::{
-    PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture, 
PostgresSinkJsonFixture,
-    PostgresSourceByteaFixture, PostgresSourceDeleteFixture, 
PostgresSourceJsonFixture,
-    PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps,
-};
-pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture};
-pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
+const TEST_MESSAGE_COUNT: usize = 3;
+const POLL_ATTEMPTS: usize = 100;
+const POLL_INTERVAL_MS: u64 = 50;
diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs 
b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
new file mode 100644
index 000000000..0bf666a4b
--- /dev/null
+++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
@@ -0,0 +1,1162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT};
+use crate::connectors::fixtures::{
+    MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, 
MongoDbSinkFailpointFixture,
+    MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
+};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_binary_protocol::MessageClient;
+use iggy_common::Identifier;
+use iggy_connector_sdk::api::{ConnectorRuntimeStats, ConnectorStatus, 
SinkInfoResponse};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use mongodb::bson::{Document, doc};
+use reqwest::Client as HttpClient;
+use std::time::Duration;
+use tokio::time::sleep;
+
+// Database the tests open directly to inspect what the sink wrote.
+// NOTE(review): presumably has to match the database configured by the fixtures — confirm.
+const DEFAULT_TEST_DATABASE: &str = "iggy_test";
+// Collection the tests query for sink output.
+const DEFAULT_SINK_COLLECTION: &str = "iggy_messages";
+// Number of messages published by the large-batch test.
+const LARGE_BATCH_COUNT: usize = 50;
+// Key used to locate this sink in the runtime's `/sinks` and `/stats` responses.
+const MONGODB_SINK_KEY: &str = "mongodb";
+
+/// Returns the `_id` string the tests look up for `message_id` on the seeded
+/// default topic.
+fn build_expected_document_id(message_id: u128) -> String {
+    let default_topic = seeds::names::TOPIC;
+    build_expected_document_id_for_topic(default_topic, message_id)
+}
+
+/// Returns the `_id` string the tests look up for `message_id` on `topic_name`.
+///
+/// The id has the shape `<stream>:<topic>:0:<message_id>`, using the seeded
+/// stream name. The fixed `0` is presumably the partition id, since every test
+/// publishes to partition 0 — confirm against the sink's id scheme.
+fn build_expected_document_id_for_topic(topic_name: &str, message_id: u128) -> String {
+    let stream_name = seeds::names::STREAM;
+    let partition_segment = 0;
+    format!("{stream_name}:{topic_name}:{partition_segment}:{message_id}")
+}
+
+/// Publishes three JSON payloads to partition 0 and checks what the sink wrote:
+/// one document per message, the Iggy metadata fields on the first document,
+/// offsets equal to the document position, and the payload stored as a nested
+/// BSON document rather than binary.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: 
MongoDbSinkJsonFixture) {
+    let client = harness.root_client().await.unwrap();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let json_payloads: Vec<serde_json::Value> = vec![
+        serde_json::json!({"name": "Alice", "age": 30}),
+        serde_json::json!({"name": "Bob", "score": 99}),
+        serde_json::json!({"name": "Carol", "active": true}),
+    ];
+
+    // Message ids are 1-based (index + 1).
+    let mut messages: Vec<IggyMessage> = json_payloads
+        .iter()
+        .enumerate()
+        .map(|(i, payload)| {
+            let bytes = serde_json::to_vec(payload).expect("Failed to 
serialize");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(bytes))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    // Wait for connector to consume and insert into MongoDB.
+    let docs = fixture
+        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, 
TEST_MESSAGE_COUNT)
+        .await
+        .expect("Documents did not appear in MongoDB");
+
+    assert_eq!(docs.len(), TEST_MESSAGE_COUNT);
+
+    // Verify metadata fields are present on first document.
+    let first = &docs[0];
+    assert!(
+        first.contains_key("iggy_offset"),
+        "Expected iggy_offset field"
+    );
+    assert!(
+        first.contains_key("iggy_stream"),
+        "Expected iggy_stream field"
+    );
+    assert!(
+        first.contains_key("iggy_topic"),
+        "Expected iggy_topic field"
+    );
+    assert!(
+        first.contains_key("iggy_partition_id"),
+        "Expected iggy_partition_id field"
+    );
+    assert!(
+        first.contains_key("iggy_timestamp"),
+        "Expected iggy_timestamp field"
+    );
+
+    // Verify offset sequence is contiguous.
+    // NOTE(review): this compares each offset with the document's position, so it
+    // relies on wait_for_documents returning documents in offset order — confirm.
+    for (i, doc) in docs.iter().enumerate() {
+        let offset = doc.get_i64("iggy_offset").expect("iggy_offset missing");
+        assert_eq!(offset, i as i64, "Offset mismatch at document {i}");
+    }
+
+    // Verify payload is stored as a BSON Document (queryable) not Binary.
+    let payload = first.get("payload").expect("payload field missing");
+    assert!(
+        matches!(payload, mongodb::bson::Bson::Document(_)),
+        "Expected payload to be BSON Document for json format, got: 
{payload:?}"
+    );
+}
+
+/// Publishes three raw byte payloads (one ASCII text, two arbitrary byte
+/// sequences) and checks that each stored document holds the payload as BSON
+/// Binary with the Generic subtype and exactly the bytes that were sent.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: 
MongoDbSinkFixture) {
+    let client = harness.root_client().await.unwrap();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let raw_payloads: Vec<Vec<u8>> = vec![
+        b"plain text message".to_vec(),
+        vec![0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD],
+        vec![0xDE, 0xAD, 0xBE, 0xEF],
+    ];
+
+    // Message ids are 1-based (index + 1).
+    let mut messages: Vec<IggyMessage> = raw_payloads
+        .iter()
+        .enumerate()
+        .map(|(i, payload)| {
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload.clone()))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let docs = fixture
+        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, 
raw_payloads.len())
+        .await
+        .expect("Documents did not appear");
+
+    assert_eq!(docs.len(), raw_payloads.len());
+
+    // Documents are matched to payloads by position.
+    // NOTE(review): relies on wait_for_documents returning documents in send order — confirm.
+    for (i, doc) in docs.iter().enumerate() {
+        let payload = doc.get("payload").expect("payload field missing");
+        match payload {
+            mongodb::bson::Bson::Binary(bin) => {
+                assert_eq!(
+                    bin.subtype,
+                    mongodb::bson::spec::BinarySubtype::Generic,
+                    "Expected Generic subtype at doc {i}"
+                );
+                assert_eq!(
+                    bin.bytes, raw_payloads[i],
+                    "Payload bytes mismatch at doc {i}"
+                );
+            }
+            other => panic!("Expected Binary, got {other:?} at doc {i}"),
+        }
+    }
+}
+
+/// Publishes `LARGE_BATCH_COUNT` JSON messages in a single send and checks that
+/// at least that many documents arrive and that their offsets match their
+/// position in the returned list.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn large_batch_processed_correctly(harness: &TestHarness, fixture: 
MongoDbSinkBatchFixture) {
+    let client = harness.root_client().await.unwrap();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    // Payload is `{"idx": i}`; message ids are 1-based (i + 1).
+    let mut messages: Vec<IggyMessage> = (0..LARGE_BATCH_COUNT)
+        .map(|i| {
+            let payload =
+                serde_json::to_vec(&serde_json::json!({"idx": 
i})).expect("Failed to serialize");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    let docs = fixture
+        .wait_for_documents(&mongo_client, DEFAULT_SINK_COLLECTION, 
LARGE_BATCH_COUNT)
+        .await
+        .expect("Not all documents appeared");
+
+    assert!(
+        docs.len() >= LARGE_BATCH_COUNT,
+        "Expected at least {LARGE_BATCH_COUNT} documents, got {}",
+        docs.len()
+    );
+
+    // Verify offsets are contiguous (0..N).
+    // NOTE(review): relies on wait_for_documents returning documents in offset order — confirm.
+    for (i, doc) in docs.iter().enumerate() {
+        let offset = doc.get_i64("iggy_offset").expect("iggy_offset missing");
+        assert_eq!(offset, i as i64, "Offset gap detected at position {i}");
+    }
+}
+
+/// Pre-seeds a document whose `_id` collides with message 2, publishes messages
+/// 1-3, and checks that: messages 1 and 3 are inserted, the pre-seeded document
+/// is left untouched and not duplicated, and the runtime reports no error with
+/// all three messages counted as processed.
+///
+/// NOTE(review): the test name says "explicit failure", but the assertions
+/// below require the collision to be handled without any reported error.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn duplicate_key_is_explicit_failure_and_not_silent_success(
+    harness: &TestHarness,
+    fixture: MongoDbSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = HttpClient::new();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let collection = mongo_client
+        .database(DEFAULT_TEST_DATABASE)
+        .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    // The marker field lets the test detect later whether the sink overwrote this document.
+    let preseeded_document = doc! {
+        "_id": build_expected_document_id(2),
+        "seed_marker": "preseed-unchanged",
+        "payload": "preseeded"
+    };
+
+    collection
+        .insert_one(preseeded_document)
+        .await
+        .expect("Failed to pre-seed duplicate _id document");
+
+    // Create deterministic insert set:
+    // - "1" should be inserted
+    // - "2" collides with pre-seeded _id
+    // - "3" should be inserted because unordered insert_many continues past
+    //   duplicates.
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(1)
+            .payload(Bytes::from_static(b"message-1"))
+            .build()
+            .expect("Failed to build message 1"),
+        IggyMessage::builder()
+            .id(2)
+            .payload(Bytes::from_static(b"message-2"))
+            .build()
+            .expect("Failed to build message 2"),
+        IggyMessage::builder()
+            .id(3)
+            .payload(Bytes::from_static(b"message-3"))
+            .build()
+            .expect("Failed to build message 3"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    // Wait until the sink processes the batch and at least the first
+    // non-conflicting message appears.
+    let mut first_message_inserted = false;
+    for _ in 0..POLL_ATTEMPTS {
+        if collection
+            .find_one(doc! { "_id": build_expected_document_id(1) })
+            .await
+            .expect("Failed to query _id=1")
+            .is_some()
+        {
+            first_message_inserted = true;
+            break;
+        }
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert!(
+        first_message_inserted,
+        "Expected first non-conflicting message (_id=1) to be inserted"
+    );
+
+    let preseeded_after = collection
+        .find_one(doc! { "_id": build_expected_document_id(2) })
+        .await
+        .expect("Failed to query pre-seeded document")
+        .expect("Pre-seeded _id=2 document should still exist");
+
+    assert_eq!(
+        preseeded_after
+            .get_str("seed_marker")
+            .expect("seed_marker missing from pre-seeded document"),
+        "preseed-unchanged",
+        "Pre-seeded document must remain unchanged"
+    );
+
+    let duplicate_count = collection
+        .count_documents(doc! { "_id": build_expected_document_id(2) })
+        .await
+        .expect("Failed to count _id=2 documents");
+    assert_eq!(
+        duplicate_count, 1,
+        "Duplicate _id must not create extra documents"
+    );
+
+    // `processed_messages` starts at a sentinel that cannot satisfy the final
+    // assertion unless the loop overwrites it with a real reading.
+    let mut sink_error_reported = false;
+    let mut processed_messages = u64::MAX;
+    let mut sink_errors = 0_u64;
+    let mut third_message_inserted = false;
+
+    // Poll MongoDB and the runtime HTTP API together until the document and the
+    // runtime counters all reach the expected state, or attempts run out.
+    for _ in 0..POLL_ATTEMPTS {
+        third_message_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(3) })
+            .await
+            .expect("Failed to query _id=3")
+            .is_some();
+
+        let sinks_response = http_client
+            .get(format!("{}/sinks", api_address))
+            .send()
+            .await
+            .expect("Failed to get sinks");
+        assert_eq!(sinks_response.status(), 200);
+        let sinks: Vec<SinkInfoResponse> = 
sinks_response.json().await.expect("Invalid sinks JSON");
+        let sink = sinks
+            .iter()
+            .find(|s| s.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink not found in /sinks response");
+        sink_error_reported = sink.last_error.is_some();
+
+        let stats_response = http_client
+            .get(format!("{}/stats", api_address))
+            .send()
+            .await
+            .expect("Failed to get stats");
+        assert_eq!(stats_response.status(), 200);
+        let stats: ConnectorRuntimeStats = 
stats_response.json().await.expect("Invalid stats JSON");
+        let sink_stats = stats
+            .connectors
+            .iter()
+            .find(|c| c.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink stats not found");
+        processed_messages = sink_stats
+            .messages_processed
+            .expect("messages_processed should be present for sink stats");
+        sink_errors = sink_stats.errors;
+
+        if third_message_inserted
+            && !sink_error_reported
+            && sink_errors == 0
+            && processed_messages == 3
+        {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert!(
+        third_message_inserted,
+        "Message after duplicate conflict should still be inserted with 
unordered insert_many"
+    );
+    assert!(
+        !sink_error_reported,
+        "Duplicate key collision should be idempotent"
+    );
+    assert_eq!(
+        sink_errors, 0,
+        "Idempotent duplicate handling should not count as sink error"
+    );
+    assert_eq!(
+        processed_messages, 3,
+        "messages_processed should reflect the successful runtime batch 
processing count"
+    );
+}
+
+/// Pre-seeds a document whose `_id` collides with message 2, publishes messages
+/// 1-3, and checks exact accounting afterwards: messages 1 and 3 are inserted,
+/// the pre-seeded document is unchanged and unique, the collection holds
+/// exactly three documents, and the runtime reports a Running sink with no
+/// error, zero errors counted, and three messages processed.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn ordered_duplicate_partial_insert_has_exact_accounting(
+    harness: &TestHarness,
+    fixture: MongoDbSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = HttpClient::new();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let collection = mongo_client
+        .database(DEFAULT_TEST_DATABASE)
+        .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    // The marker field lets the test detect later whether the sink overwrote this document.
+    collection
+        .insert_one(doc! {
+            "_id": build_expected_document_id(2),
+            "seed_marker": "preseeded-stable",
+            "payload": "preseeded"
+        })
+        .await
+        .expect("Failed to pre-seed duplicate key baseline");
+
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(1)
+            .payload(Bytes::from_static(b"ordered-accounting-1"))
+            .build()
+            .expect("Failed to build message 1"),
+        IggyMessage::builder()
+            .id(2)
+            .payload(Bytes::from_static(b"ordered-accounting-2"))
+            .build()
+            .expect("Failed to build message 2"),
+        IggyMessage::builder()
+            .id(3)
+            .payload(Bytes::from_static(b"ordered-accounting-3"))
+            .build()
+            .expect("Failed to build message 3"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send ordered duplicate batch");
+
+    // `processed_messages` starts at a sentinel that cannot satisfy the final
+    // assertion unless the loop overwrites it with a real reading.
+    let mut id1_inserted = false;
+    let mut id3_inserted = false;
+    let mut sink_last_error = false;
+    let mut sink_status = ConnectorStatus::Stopped;
+    let mut processed_messages = u64::MAX;
+    let mut sink_errors = 0_u64;
+
+    for _ in 0..POLL_ATTEMPTS {
+        id1_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(1) })
+            .await
+            .expect("Failed to query _id=1")
+            .is_some();
+        id3_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(3) })
+            .await
+            .expect("Failed to query _id=3")
+            .is_some();
+
+        let sinks_response = http_client
+            .get(format!("{}/sinks", api_address))
+            .send()
+            .await
+            .expect("Failed to fetch /sinks");
+        assert_eq!(sinks_response.status(), 200);
+        let sinks: Vec<SinkInfoResponse> = 
sinks_response.json().await.expect("Invalid sinks JSON");
+        let sink = sinks
+            .iter()
+            .find(|s| s.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink not found in /sinks");
+        sink_last_error = sink.last_error.is_some();
+        sink_status = sink.status;
+
+        let stats_response = http_client
+            .get(format!("{}/stats", api_address))
+            .send()
+            .await
+            .expect("Failed to fetch /stats");
+        assert_eq!(stats_response.status(), 200);
+        let stats: ConnectorRuntimeStats = 
stats_response.json().await.expect("Invalid stats JSON");
+        let sink_stats = stats
+            .connectors
+            .iter()
+            .find(|c| c.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink stats not found");
+        processed_messages = sink_stats
+            .messages_processed
+            .expect("messages_processed must be present for sink stats");
+        sink_errors = sink_stats.errors;
+
+        // Keep polling until the runtime counters have settled as well. The
+        // documents can become visible in MongoDB before the runtime updates
+        // its stats, so exiting on document visibility alone would let the
+        // `processed_messages` assertion below race with the runtime.
+        if id1_inserted
+            && id3_inserted
+            && !sink_last_error
+            && sink_status == ConnectorStatus::Running
+            && sink_errors == 0
+            && processed_messages == 3
+        {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert!(id1_inserted, "Prefix write (_id=1) must be inserted");
+    assert!(
+        id3_inserted,
+        "Suffix write (_id=3) should be inserted with unordered duplicate 
handling"
+    );
+
+    let preseeded_after = collection
+        .find_one(doc! { "_id": build_expected_document_id(2) })
+        .await
+        .expect("Failed to query pre-seeded _id=2")
+        .expect("Pre-seeded _id=2 document must remain");
+    assert_eq!(
+        preseeded_after
+            .get_str("seed_marker")
+            .expect("seed_marker missing on pre-seeded document"),
+        "preseeded-stable",
+        "Pre-seeded duplicate target must remain unchanged"
+    );
+
+    let total_docs = collection
+        .count_documents(doc! {})
+        .await
+        .expect("Failed to count total collection documents");
+    assert_eq!(
+        total_docs, 3,
+        "Expected exact collection accounting (preseed + two successful 
inserts)"
+    );
+
+    let duplicate_docs = collection
+        .count_documents(doc! { "_id": build_expected_document_id(2) })
+        .await
+        .expect("Failed to count duplicate _id entries");
+    assert_eq!(duplicate_docs, 1, "Duplicate _id must remain single");
+
+    assert!(
+        !sink_last_error,
+        "Duplicate collision should not produce sink last_error"
+    );
+    assert_eq!(
+        sink_status,
+        ConnectorStatus::Running,
+        "Sink should remain Running on idempotent duplicate handling"
+    );
+    assert_eq!(
+        sink_errors, 0,
+        "Duplicate handling should not increment runtime errors"
+    );
+    assert_eq!(
+        processed_messages, 3,
+        "Idempotent duplicate handling should keep runtime batch processed 
count"
+    );
+}
+
+/// Creates the sink collection with a validator that only accepts
+/// `iggy_offset < 1`, publishes messages 11-13, and checks that only the first
+/// message's document is written and the collection holds exactly one document.
+///
+/// NOTE(review): despite "surfaces_hard_error" in the name, nothing here
+/// inspects the sink's error state; only collection contents are asserted.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
+    harness: &TestHarness,
+    fixture: MongoDbSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let database = mongo_client.database(DEFAULT_TEST_DATABASE);
+    let collection = database.collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    // Enforce that only the first message in this batch is valid.
+    // iggy_offset=0 passes; iggy_offset>=1 fails validation.
+    database
+        .run_command(doc! {
+            "create": DEFAULT_SINK_COLLECTION,
+            "validator": {
+                "iggy_offset": { "$lt": 1 }
+            },
+            "validationLevel": "strict",
+            "validationAction": "error"
+        })
+        .await
+        .expect("Failed to create collection with schema validator");
+
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(11)
+            .payload(Bytes::from_static(b"schema-validation-1"))
+            .build()
+            .expect("Failed to build message 11"),
+        IggyMessage::builder()
+            .id(12)
+            .payload(Bytes::from_static(b"schema-validation-2"))
+            .build()
+            .expect("Failed to build message 12"),
+        IggyMessage::builder()
+            .id(13)
+            .payload(Bytes::from_static(b"schema-validation-3"))
+            .build()
+            .expect("Failed to build message 13"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send schema-validation batch");
+
+    // The two flags for messages expected to be rejected start as `true` so the
+    // negative assertions below can only pass after a real query has run.
+    let mut id11_inserted = false;
+    let mut id12_inserted = true;
+    let mut id13_inserted = true;
+
+    // NOTE(review): the loop exits as soon as _id=11 is visible and the other two
+    // are absent; absence is not re-checked after a settling delay.
+    for _ in 0..POLL_ATTEMPTS {
+        id11_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(11) })
+            .await
+            .expect("Failed to query _id=11")
+            .is_some();
+        id12_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(12) })
+            .await
+            .expect("Failed to query _id=12")
+            .is_some();
+        id13_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(13) })
+            .await
+            .expect("Failed to query _id=13")
+            .is_some();
+
+        if id11_inserted && !id12_inserted && !id13_inserted {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert!(
+        id11_inserted,
+        "Expected prefix message (_id=11) to be inserted before schema 
validation failure"
+    );
+    assert!(
+        !id12_inserted,
+        "Expected mid-batch message (_id=12) to fail schema validation and not 
insert"
+    );
+    assert!(
+        !id13_inserted,
+        "Expected suffix message (_id=13) to be skipped after ordered schema 
validation failure"
+    );
+
+    let total_docs = collection
+        .count_documents(doc! {})
+        .await
+        .expect("Failed to count documents after schema validation test");
+    assert_eq!(
+        total_docs, 1,
+        "Expected exact prefix-only write accounting under schema validation 
failure"
+    );
+}
+
+/// Publishes messages 101-103 through a sink set up by
+/// `MongoDbSinkWriteConcernFixture` and checks that all three documents end up
+/// in the collection, with a total count of exactly three.
+///
+/// NOTE(review): the write-concern timeout itself is presumably induced by the
+/// fixture's configuration — confirm there. This test asserts only on the
+/// persisted documents, not on what the sink reports to the runtime.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn write_concern_timeout_does_not_report_full_success(
+    harness: &TestHarness,
+    fixture: MongoDbSinkWriteConcernFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let collection = mongo_client
+        .database(DEFAULT_TEST_DATABASE)
+        .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(101)
+            .payload(Bytes::from_static(b"wc-timeout-1"))
+            .build()
+            .expect("Failed to build message 101"),
+        IggyMessage::builder()
+            .id(102)
+            .payload(Bytes::from_static(b"wc-timeout-2"))
+            .build()
+            .expect("Failed to build message 102"),
+        IggyMessage::builder()
+            .id(103)
+            .payload(Bytes::from_static(b"wc-timeout-3"))
+            .build()
+            .expect("Failed to build message 103"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send write concern timeout batch");
+
+    let mut total_docs = 0_u64;
+    let mut id101_inserted = false;
+    let mut id102_inserted = false;
+    let mut id103_inserted = false;
+
+    // Poll until every expected document is present and the total is exactly
+    // three, or attempts run out.
+    for _ in 0..POLL_ATTEMPTS {
+        total_docs = collection
+            .count_documents(doc! {})
+            .await
+            .expect("Failed to count documents after write concern timeout");
+        id101_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(101) })
+            .await
+            .expect("Failed to query _id=101")
+            .is_some();
+        id102_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(102) })
+            .await
+            .expect("Failed to query _id=102")
+            .is_some();
+        id103_inserted = collection
+            .find_one(doc! { "_id": build_expected_document_id(103) })
+            .await
+            .expect("Failed to query _id=103")
+            .is_some();
+
+        if total_docs == 3 && id101_inserted && id102_inserted && 
id103_inserted {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert_eq!(
+        total_docs, 3,
+        "write concern timeout should still leave exactly one persisted 
document per message"
+    );
+    assert!(
+        id101_inserted && id102_inserted && id103_inserted,
+        "write concern timeout should not lose the primary-side writes for 
this batch"
+    );
+}
+
+/// Arms a fail point that closes the connection on an `insert` command,
+/// publishes messages 201-203, and checks that the sink recovers: exactly three
+/// documents exist (one per id), the sink is Running with no last error, zero
+/// errors are counted, and three messages are reported as processed.
+#[iggy_harness(
+    server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn retryable_write_failover_keeps_single_doc_per_id(
+    harness: &TestHarness,
+    fixture: MongoDbSinkFailpointFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = HttpClient::new();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    // Arm the fail point before publishing so the sink's insert hits it.
+    // NOTE(review): "once" is taken from the helper's name; its trigger count is
+    // defined in the fixture — confirm there.
+    fixture
+        .configure_fail_command_once(
+            &mongo_client,
+            doc! {
+                "failCommands": ["insert"],
+                "closeConnection": true
+            },
+        )
+        .await
+        .expect("Failed to configure failpoint for retryable failover 
simulation");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let collection = mongo_client
+        .database(DEFAULT_TEST_DATABASE)
+        .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(201)
+            .payload(Bytes::from_static(b"retryable-failover-1"))
+            .build()
+            .expect("Failed to build message 201"),
+        IggyMessage::builder()
+            .id(202)
+            .payload(Bytes::from_static(b"retryable-failover-2"))
+            .build()
+            .expect("Failed to build message 202"),
+        IggyMessage::builder()
+            .id(203)
+            .payload(Bytes::from_static(b"retryable-failover-3"))
+            .build()
+            .expect("Failed to build message 203"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send retryable-failover batch");
+
+    // Initial values are chosen so the final assertions fail unless the loop
+    // replaces them with real readings.
+    let mut sink_last_error = true;
+    let mut sink_status = ConnectorStatus::Stopped;
+    let mut processed_messages = 0_u64;
+    let mut sink_errors = u64::MAX;
+    let mut total_docs = 0_u64;
+
+    for _ in 0..POLL_ATTEMPTS {
+        total_docs = collection
+            .count_documents(doc! {})
+            .await
+            .expect("Failed to count documents after retryable failover");
+
+        let sinks_response = http_client
+            .get(format!("{}/sinks", api_address))
+            .send()
+            .await
+            .expect("Failed to fetch /sinks");
+        assert_eq!(sinks_response.status(), 200);
+        let sinks: Vec<SinkInfoResponse> = 
sinks_response.json().await.expect("Invalid sinks JSON");
+        let sink = sinks
+            .iter()
+            .find(|s| s.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink not found in /sinks");
+        sink_last_error = sink.last_error.is_some();
+        sink_status = sink.status;
+
+        let stats_response = http_client
+            .get(format!("{}/stats", api_address))
+            .send()
+            .await
+            .expect("Failed to fetch /stats");
+        assert_eq!(stats_response.status(), 200);
+        let stats: ConnectorRuntimeStats = 
stats_response.json().await.expect("Invalid stats JSON");
+        let sink_stats = stats
+            .connectors
+            .iter()
+            .find(|c| c.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink stats not found");
+        processed_messages = sink_stats
+            .messages_processed
+            .expect("messages_processed must be present for sink stats");
+        sink_errors = sink_stats.errors;
+
+        if total_docs == 3 && !sink_last_error && processed_messages == 3 && 
sink_errors == 0 {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    // Best-effort cleanup: the result is deliberately ignored so a cleanup
+    // failure does not mask the assertions below.
+    let _ = fixture.disable_fail_command(&mongo_client).await;
+
+    assert_eq!(
+        total_docs, 3,
+        "expected exact document count after retryable failover retry"
+    );
+    assert!(
+        !sink_last_error,
+        "retryable failover path must recover without leaving sink in error 
state"
+    );
+    assert_eq!(
+        sink_status,
+        ConnectorStatus::Running,
+        "sink should remain Running after successful retryable failover 
recovery"
+    );
+    assert_eq!(
+        sink_errors, 0,
+        "retryable failover recovery should not increment runtime errors"
+    );
+    assert_eq!(
+        processed_messages, 3,
+        "successful retryable failover recovery must count full batch as 
processed"
+    );
+
+    // A retried insert must not leave more than one document per message id.
+    for id in [201_u128, 202, 203] {
+        let count = collection
+            .count_documents(doc! { "_id": build_expected_document_id(id) })
+            .await
+            .expect("Failed to count per-id documents after retryable 
failover");
+        assert_eq!(count, 1, "Expected exactly one document for _id={id}");
+    }
+}
+
+// Exercises the sink's handling of an `insert` command that fails with the
+// `RetryableWriteError` and `NoWritesPerformed` error labels.
+//
+// A fail-command failpoint (error code 13) is armed through
+// `configure_fail_command_once`, three messages (ids 301-303) are sent with
+// `Partitioning::partition_id(0)`, and the test then polls MongoDB together
+// with the connector runtime's `/sinks` and `/stats` endpoints. It passes only
+// when exactly three documents exist (one per message id), the sink reports no
+// `last_error`, its status is `Running`, its error counter is 0 and
+// `messages_processed` is 3.
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn no_writes_performed_label_path_preserves_state_accuracy(
+    harness: &TestHarness,
+    fixture: MongoDbSinkFailpointFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let api_address = harness
+        .connectors_runtime()
+        .expect("connector runtime should be available")
+        .http_url();
+    let http_client = HttpClient::new();
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    // Arm the failpoint before any message is produced so that the sink's
+    // insert is the command that hits it.
+    fixture
+        .configure_fail_command_once(
+            &mongo_client,
+            doc! {
+                "failCommands": ["insert"],
+                "errorCode": 13,
+                "errorLabels": ["RetryableWriteError", "NoWritesPerformed"]
+            },
+        )
+        .await
+        .expect("Failed to configure failpoint with NoWritesPerformed label");
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let collection = mongo_client
+        .database(DEFAULT_TEST_DATABASE)
+        .collection::<Document>(DEFAULT_SINK_COLLECTION);
+
+    let mut messages: Vec<IggyMessage> = vec![
+        IggyMessage::builder()
+            .id(301)
+            .payload(Bytes::from_static(b"no-writes-performed-1"))
+            .build()
+            .expect("Failed to build message 301"),
+        IggyMessage::builder()
+            .id(302)
+            .payload(Bytes::from_static(b"no-writes-performed-2"))
+            .build()
+            .expect("Failed to build message 302"),
+        IggyMessage::builder()
+            .id(303)
+            .payload(Bytes::from_static(b"no-writes-performed-3"))
+            .build()
+            .expect("Failed to build message 303"),
+    ];
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send NoWritesPerformed label batch");
+
+    // Every observed value starts out as something the assertions below
+    // reject, so a poll loop that never records a good snapshot fails the test.
+    let mut sink_last_error = true;
+    let mut sink_status = ConnectorStatus::Stopped;
+    let mut processed_messages = 0_u64;
+    let mut sink_errors = u64::MAX;
+    let mut total_docs = 0_u64;
+
+    for _ in 0..POLL_ATTEMPTS {
+        total_docs = collection
+            .count_documents(doc! {})
+            .await
+            .expect("Failed to count documents for NoWritesPerformed path");
+
+        // Sink status and last error come from the runtime's /sinks endpoint.
+        let sinks_response = http_client
+            .get(format!("{api_address}/sinks"))
+            .send()
+            .await
+            .expect("Failed to fetch /sinks");
+        assert_eq!(sinks_response.status(), 200);
+        let sinks: Vec<SinkInfoResponse> = sinks_response.json().await.expect("Invalid sinks JSON");
+        let sink = sinks
+            .iter()
+            .find(|s| s.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink not found in /sinks");
+        sink_last_error = sink.last_error.is_some();
+        sink_status = sink.status;
+
+        // Processed-message and error counters come from /stats.
+        let stats_response = http_client
+            .get(format!("{api_address}/stats"))
+            .send()
+            .await
+            .expect("Failed to fetch /stats");
+        assert_eq!(stats_response.status(), 200);
+        let stats: ConnectorRuntimeStats = stats_response.json().await.expect("Invalid stats JSON");
+        let sink_stats = stats
+            .connectors
+            .iter()
+            .find(|c| c.key == MONGODB_SINK_KEY)
+            .expect("MongoDB sink stats not found");
+        processed_messages = sink_stats
+            .messages_processed
+            .expect("messages_processed must be present for sink stats");
+        sink_errors = sink_stats.errors;
+
+        // Stop polling as soon as one snapshot shows the fully recovered state.
+        if total_docs == 3 && !sink_last_error && processed_messages == 3 && sink_errors == 0 {
+            break;
+        }
+
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    // The result is deliberately discarded: a failure to disable the failpoint
+    // must not mask the assertions that follow.
+    let _ = fixture.disable_fail_command(&mongo_client).await;
+
+    assert_eq!(
+        total_docs, 3,
+        "NoWritesPerformed path should result in exactly one committed batch after retry"
+    );
+    assert!(
+        !sink_last_error,
+        "NoWritesPerformed retry path should not leave sink in error state"
+    );
+    assert_eq!(
+        sink_status,
+        ConnectorStatus::Running,
+        "sink should remain Running after NoWritesPerformed retry recovery"
+    );
+    assert_eq!(
+        sink_errors, 0,
+        "NoWritesPerformed retry recovery should not increment runtime errors"
+    );
+    assert_eq!(
+        processed_messages, 3,
+        "NoWritesPerformed retry recovery must count full batch as processed"
+    );
+
+    // A total of three documents is not enough on its own: each message id
+    // must map to exactly one document.
+    for id in [301_u128, 302, 303] {
+        let count = collection
+            .count_documents(doc! { "_id": build_expected_document_id(id) })
+            .await
+            .expect("Failed to count per-id documents for NoWritesPerformed path");
+        assert_eq!(count, 1, "Expected exactly one document for _id={id}");
+    }
+}
+
+// Checks that the collection named by `DEFAULT_SINK_COLLECTION` appears in
+// MongoDB without any message being sent, and that it is empty once it exists.
+//
+// NOTE(review): the test relies on `MongoDbSinkAutoCreateFixture` to set the
+// connector up so that it creates the collection -- confirm in the fixture.
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn auto_create_collection_on_open(
+    harness: &TestHarness,
+    fixture: MongoDbSinkAutoCreateFixture,
+) {
+    let mongo_client = fixture
+        .create_client()
+        .await
+        .expect("Failed to create MongoDB client");
+
+    // The connector's open() creates the collection. Poll until it appears.
+    // No messages are sent in this test.
+    let mut found = false;
+    for _ in 0..POLL_ATTEMPTS {
+        // A failed existence check is treated as "not there yet" and retried
+        // on the next attempt instead of aborting the test.
+        if fixture
+            .collection_exists(&mongo_client, DEFAULT_SINK_COLLECTION)
+            .await
+            .unwrap_or(false)
+        {
+            found = true;
+            break;
+        }
+        sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+    }
+
+    assert!(
+        found,
+        "Collection '{DEFAULT_SINK_COLLECTION}' was not created by open() within timeout"
+    );
+
+    // No messages sent -- collection should be empty.
+    let count = fixture
+        .count_documents_in_collection(&mongo_client, DEFAULT_SINK_COLLECTION)
+        .await
+        .expect("Failed to count");
+    assert_eq!(
+        count, 0,
+        "Collection should be empty after open() with no messages"
+    );
+
+    // Suppress unused harness warning.
+    let _ = harness;
+}
diff --git a/core/integration/tests/connectors/mongodb/sink.toml b/core/integration/tests/connectors/mongodb/sink.toml
new file mode 100644
index 000000000..339ab38a6
--- /dev/null
+++ b/core/integration/tests/connectors/mongodb/sink.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local" # NOTE(review): presumably makes the runtime read connector configs from config_dir -- confirm against the runtime's config schema
+config_dir = "../connectors/sinks/mongodb_sink" # relative path to the MongoDB sink's directory; the base it is resolved against is not shown here

Reply via email to