This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 88407ae2c67 branch-4.0: [chore](third party) bump librdkafka from 1.9.2 to 2.11.0 for compatibility with newer Kafka versions (#57565) (#58844)
88407ae2c67 is described below
commit 88407ae2c670980915f963360bb5a0cfefc3d395
Author: hui lai <[email protected]>
AuthorDate: Wed Dec 24 14:22:31 2025 +0800
branch-4.0: [chore](third party) bump librdkafka from 1.9.2 to 2.11.0 for compatibility with newer Kafka versions (#57565) (#58844)
pick #57565
### What problem does this PR solve?
Routine load cannot consume data from Kafka 4.1.0 with SASL_SSL.
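For context, routine load drives librdkafka underneath, so the client-side setup that failed looks roughly like the sketch below. This is illustrative only: the broker address, credentials, SASL mechanism, and group id are placeholders, and in practice these values come from the routine load job's "property.*" settings rather than hard-coded C.

```c
#include <stdio.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>

static void set_or_die(rd_kafka_conf_t *conf, const char *k, const char *v) {
    char errstr[512];
    if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s: %s\n", k, errstr);
        exit(1);
    }
}

int main(void) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* Placeholder values; routine load forwards equivalent properties. */
    set_or_die(conf, "bootstrap.servers", "broker.example.com:9093");
    set_or_die(conf, "security.protocol", "SASL_SSL");
    set_or_die(conf, "sasl.mechanism", "SCRAM-SHA-512");
    set_or_die(conf, "sasl.username", "user");
    set_or_die(conf, "sasl.password", "secret");
    set_or_die(conf, "group.id", "doris_routine_load_demo");

    /* Creation succeeds either way; with librdkafka 1.9.2 the later SASL
     * handshake against Kafka 4.1.0 is what broke. 2.11.0 negotiates it.
     * On success rk takes ownership of conf. */
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "consumer creation failed: %s\n", errstr);
        return 1;
    }
    rd_kafka_destroy(rk);
    return 0;
}
```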
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
dist/LICENSE-dist.txt | 2 +-
thirdparty/CHANGELOG.md | 4 +
thirdparty/download-thirdparty.sh | 4 +-
thirdparty/patches/librdkafka-1.9.2.patch | 189 -----------------------------
thirdparty/patches/librdkafka-2.11.0.patch | 76 ++++++++++++
thirdparty/vars.sh | 10 +-
6 files changed, 88 insertions(+), 197 deletions(-)
diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt
index 03579a95472..0aabefe4df5 100644
--- a/dist/LICENSE-dist.txt
+++ b/dist/LICENSE-dist.txt
@@ -1508,7 +1508,7 @@ Other dependencies:
* unixodbc: 2.3.7 -- licenses/LICENSE-unixodbc.txt
* leveldb: 1.23 -- licenses/LICENSE-leveldb.txt
* cyrus-sasl: 2.1.27 -- licenses/LICENSE-cyrus-sasl.txt
- * librdkafka: 1.8.2 -- licenses/LICENSE-librdkafka.txt
+ * librdkafka: 2.11.0 -- licenses/LICENSE-librdkafka.txt
* zstd: 1.5.2 -- licenses/LICENSE-zstd.txt
* brotli: 1.0.9 -- licenses/LICENSE-brotli.txt
* bitshuffle: 0.5.1 -- licenses/LICENSE-bigshuffle.txt
diff --git a/thirdparty/CHANGELOG.md b/thirdparty/CHANGELOG.md
index adffe708362..e4ebec46004 100644
--- a/thirdparty/CHANGELOG.md
+++ b/thirdparty/CHANGELOG.md
@@ -2,6 +2,10 @@
This file contains version of the third-party dependency libraries in the
build-env image. The docker build-env image is apache/doris, and the tag is
`build-env-${version}`
+## 20251031
+
+- Modified: librdkafka 1.9.2 -> 2.11.0
+
## 20251021
- Modified: gtest 1.11.0 -> 1.12.1
diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh
index 4444b6a72c4..4ce27e02349 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -344,10 +344,10 @@ fi
# patch librdkafka to avoid crash
if [[ " ${TP_ARCHIVES[*]} " =~ " LIBRDKAFKA " ]]; then
- if [[ "${LIBRDKAFKA_SOURCE}" == "librdkafka-1.9.2" ]]; then
+ if [[ "${LIBRDKAFKA_SOURCE}" == "librdkafka-2.11.0" ]]; then
cd "${TP_SOURCE_DIR}/${LIBRDKAFKA_SOURCE}"
if [[ ! -f "${PATCHED_MARK}" ]]; then
- patch -p0 <"${TP_PATCH_DIR}/librdkafka-1.9.2.patch"
+ patch -p0 <"${TP_PATCH_DIR}/librdkafka-2.11.0.patch"
touch "${PATCHED_MARK}"
fi
cd -
diff --git a/thirdparty/patches/librdkafka-1.9.2.patch b/thirdparty/patches/librdkafka-1.9.2.patch
deleted file mode 100644
index 3caac08f79d..00000000000
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ /dev/null
@@ -1,189 +0,0 @@
---- lds-gen.py
-+++ lds-gen.py
-@@ -58,7 +58,7 @@ if __name__ == '__main__':
-
- # Special symbols not covered by above matches or not exposed in
- # the public header files.
-- funcs.append('rd_ut_coverage_check')
-+ # funcs.append('rd_ut_coverage_check')
-
- print('# Automatically generated by lds-gen.py - DO NOT EDIT')
- print('{\n global:')
---- mklove/modules/configure.base
-+++ mklove/modules/configure.base
-@@ -1741,7 +1741,7 @@ function mkl_pkg_config_check {
- mkl_check_begin "$cname" "$2" "no-cache" "$1 (by pkg-config)" && return $?
-
- local cflags=
-- local cmd="${PKG_CONFIG} --short-errors --cflags $libname"
-+ local cmd="${PKG_CONFIG} --static --short-errors --cflags $libname"
- mkl_dbg "pkg-config check $libname for CFLAGS ($2): $cmd"
-
- cflags=$($cmd 2>&1)
-@@ -1764,11 +1764,11 @@ $cflags"
- fi
-
- local libs=
-- cmd="${PKG_CONFIG} --short-errors --libs $libname"
-+ cmd="${PKG_CONFIG} --static --short-errors --libs $libname"
- mkl_dbg "pkg-config check $libname for LIBS ($2): $cmd"
- libs=$($cmd 2>&1)
- if [[ $? != 0 ]]; then
-- mkl_dbg "${PKG_CONFIG} --libs $libname failed: $libs"
-+ mkl_dbg "${PKG_CONFIG} --static --libs $libname failed: $libs"
- # Clear define name ($2): caller may have additional checks
- mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
- return 1
---- src/rdkafka.c
-+++ src/rdkafka.c
-@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
- struct rd_kafka_partition_leader *leader;
- rd_list_t leaders;
- rd_kafka_resp_err_t err;
-+ int tmout;
-
- partitions = rd_kafka_topic_partition_list_new(1);
- rktpar =
-@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
- rd_list_destroy(&leaders);
-
- /* Wait for reply (or timeout) */
-- while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-- rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-- rd_kafka_poll_cb,
-- NULL) != RD_KAFKA_OP_RES_YIELD)
-- ;
-+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
-+ tmout = rd_timeout_remains(ts_end);
-+ if (rd_timeout_expired(tmout)) {
-+ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-+ break;
-+ }
-+ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
-+ rd_kafka_poll_cb, NULL);
-+ }
-
- rd_kafka_q_destroy_owner(rkq);
-
---- src/rdkafka_broker.c
-+++ src/rdkafka_broker.c
-@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
- : (topic_err
- ? topic_err
- : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
-+
-+ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
-+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(
-+ rktp);
-+ }
- }
-
- rd_kafka_toppar_unlock(rktp);
-@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
- */
- void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
-
-- rd_assert(thrd_is_current(rkb->rkb_thread));
-+ // To avoid the error described in https://github.com/edenhill/librdkafka/issues/3608
-+ // comment this line to fix it temporarily.
-+ // rd_assert(thrd_is_current(rkb->rkb_thread));
- rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
- rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
- rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
---- src/rdkafka_cgrp.c
-+++ src/rdkafka_cgrp.c
-@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
- rd_kafka_toppar_lock(rktp);
- rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
- rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
-+
-+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
-+
- rd_kafka_toppar_unlock(rktp);
-
- rd_list_remove(&rkcg->rkcg_toppars, rktp);
---- src/rdkafka_partition.c
-+++ src/rdkafka_partition.c
-@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
- rd_kafka_toppar_unlock(rktp);
- }
-
-+/**
-+ * @brief Purge internal fetch queue if toppar is stopped
-+ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
-+ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
-+ * removed starting from a metadata response and stopped from a rebalance or a
-+ * consumer close.
-+ *
-+ * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same
-+ * toppar that stop destroying a consumer.
-+ *
-+ * @locks rd_kafka_toppar_lock() MUST be held
-+ */
-+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
-+ rd_kafka_q_t *rkq;
-+ rkq = rktp->rktp_fetchq;
-+ mtx_lock(&rkq->rkq_lock);
-+ if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
-+ !rktp->rktp_fetchq->rkq_fwdq) {
-+ rd_kafka_op_t *rko;
-+ int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
-+
-+ /* Partition is being removed from the cluster and it's stopped,
-+ * so rktp->rktp_fetchq->rkq_fwdq is NULL.
-+ * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
-+ * while holding lock, to avoid circular references */
-+ rko = TAILQ_FIRST(&rkq->rkq_q);
-+ while (rko) {
-+ if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
-+ rko->rko_type != RD_KAFKA_OP_FETCH) {
-+ rd_kafka_log(
-+ rktp->rktp_rkt->rkt_rk, LOG_WARNING,
-+ "PARTDEL",
-+ "Purging toppar fetch queue buffer op"
-+ "with unexpected type: %s",
-+ rd_kafka_op2str(rko->rko_type));
-+ }
-+
-+ if (rko->rko_type == RD_KAFKA_OP_BARRIER)
-+ barrier_cnt++;
-+ else if (rko->rko_type == RD_KAFKA_OP_FETCH)
-+ message_cnt++;
-+ else
-+ other_cnt++;
-
-+ rko = TAILQ_NEXT(rko, rko_link);
-+ cnt++;
-+ }
-+
-+ if (cnt) {
-+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
-+ "Purge toppar fetch queue buffer "
-+ "containing %d op(s) "
-+ "(%d barrier(s), %d message(s), %d
other)"
-+ " to avoid "
-+ "circular references",
-+ cnt, barrier_cnt, message_cnt, other_cnt);
-+ rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
-+ } else {
-+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
-+ "Not purging toppar fetch queue buffer."
-+ " No ops present in the buffer.");
-+ }
-+ }
-+ mtx_unlock(&rkq->rkq_lock);
-+}
-
- /**
- * Helper method for purging queues when removing a toppar.
---- src/rdkafka_partition.h
-+++ src/rdkafka_partition.h
-@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
- int64_t query_offset,
- int backoff_ms);
-
-+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp);
-+
- int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
- int purge_flags,
- rd_bool_t include_xmit_msgq);
diff --git a/thirdparty/patches/librdkafka-2.11.0.patch b/thirdparty/patches/librdkafka-2.11.0.patch
new file mode 100644
index 00000000000..21792174073
--- /dev/null
+++ b/thirdparty/patches/librdkafka-2.11.0.patch
@@ -0,0 +1,76 @@
+--- lds-gen.py
++++ lds-gen.py
+@@ -58,7 +58,7 @@ if __name__ == '__main__':
+
+ # Special symbols not covered by above matches or not exposed in
+ # the public header files.
+- funcs.append('rd_ut_coverage_check')
++ # funcs.append('rd_ut_coverage_check')
+
+ print('# Automatically generated by lds-gen.py - DO NOT EDIT')
+ print('{\n global:')
+--- mklove/modules/configure.base
++++ mklove/modules/configure.base
+@@ -1741,7 +1741,7 @@ function mkl_pkg_config_check {
+ mkl_check_begin "$cname" "$2" "no-cache" "$1 (by pkg-config)" && return $?
+
+ local cflags=
+- local cmd="${PKG_CONFIG} --short-errors --cflags $libname"
++ local cmd="${PKG_CONFIG} --static --short-errors --cflags $libname"
+ mkl_dbg "pkg-config check $libname for CFLAGS ($2): $cmd"
+
+ cflags=$($cmd 2>&1)
+@@ -1764,11 +1764,11 @@ $cflags"
+ fi
+
+ local libs=
+- cmd="${PKG_CONFIG} --short-errors --libs $libname"
++ cmd="${PKG_CONFIG} --static --short-errors --libs $libname"
+ mkl_dbg "pkg-config check $libname for LIBS ($2): $cmd"
+ libs=$($cmd 2>&1)
+ if [[ $? != 0 ]]; then
+- mkl_dbg "${PKG_CONFIG} --libs $libname failed: $libs"
++ mkl_dbg "${PKG_CONFIG} --static --libs $libname failed: $libs"
+ # Clear define name ($2): caller may have additional checks
+ mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
+ return 1
+--- src/rdkafka.c
++++ src/rdkafka.c
+@@ -3750,6 +3750,7 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+ struct rd_kafka_partition_leader *leader;
+ rd_list_t leaders;
+ rd_kafka_resp_err_t err;
++ int tmout;
+
+ partitions = rd_kafka_topic_partition_list_new(1);
+ rktpar =
+@@ -3796,9 +3797,13 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+
+ /* Wait for reply (or timeout) */
+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+- rd_kafka_q_serve(rkq, RD_POLL_INFINITE, 0,
+- RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
+- NULL);
++ tmout = rd_timeout_remains(ts_end);
++ if (rd_timeout_expired(tmout)) {
++ state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
++ break;
++ }
++ rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
++ rd_kafka_poll_cb, NULL);
+ }
+
+ rd_kafka_q_destroy_owner(rkq);
+--- src/rdkafka_broker.c
++++ src/rdkafka_broker.c
+@@ -4759,7 +4759,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
+ */
+ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
+
+- rd_assert(thrd_is_current(rkb->rkb_thread));
++ // To avoid the error described in https://github.com/edenhill/librdkafka/issues/3608
++ // comment this line to fix it temporarily.
++ // rd_assert(thrd_is_current(rkb->rkb_thread));
+ rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
+ rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
+ rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
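The 2.11.0 patch carries forward two of the 1.9.2 workarounds: it bounds the wait in rd_kafka_query_watermark_offsets() by the caller's deadline instead of blocking on RD_POLL_INFINITE, and it keeps the thrd_is_current() assert in rd_kafka_broker_destroy_final() disabled per librdkafka issue #3608. A minimal, self-contained sketch of the deadline-bounded wait pattern follows; serve_queue() and now_ms() are illustrative stand-ins for librdkafka internals such as rd_kafka_q_serve() and rd_timeout_remains(), not real APIs.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Stand-in for rd_kafka_q_serve(): serve queued ops for at most
 * timeout_ms; returns true once the reply op has been handled.
 * This stub returns immediately, so the demo always times out. */
static bool serve_queue(int timeout_ms) {
    (void)timeout_ms;
    return false;
}

static int64_t now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Same shape as the patched loop: recompute the remaining budget each
 * iteration and fail with a timeout rather than blocking forever. */
static int wait_for_reply(int64_t deadline_ms) {
    for (;;) {
        int64_t remains = deadline_ms - now_ms();
        if (remains <= 0)
            return -1;              /* maps to RD_KAFKA_RESP_ERR__TIMED_OUT */
        if (serve_queue((int)remains))
            return 0;               /* reply handled */
    }
}

int main(void) {
    /* 100 ms budget; the stub never delivers, so this returns -1. */
    printf("%s\n", wait_for_reply(now_ms() + 100) == 0 ? "ok" : "timed out");
    return 0;
}
```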
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index a357506876e..b47e8e33f73 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -215,11 +215,11 @@ CYRUS_SASL_NAME=cyrus-sasl-2.1.27.tar.gz
CYRUS_SASL_SOURCE=cyrus-sasl-2.1.27
CYRUS_SASL_MD5SUM="a33820c66e0622222c5aefafa1581083"
-# librdkafka-1.9.2
-LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/v1.9.2.tar.gz"
-LIBRDKAFKA_NAME=librdkafka-1.9.2.tar.gz
-LIBRDKAFKA_SOURCE=librdkafka-1.9.2
-LIBRDKAFKA_MD5SUM="fe9624e905abbf8324b0f6be520d9c24"
+# librdkafka-2.11.0
+LIBRDKAFKA_DOWNLOAD="https://github.com/confluentinc/librdkafka/archive/refs/tags/v2.11.0.tar.gz"
+LIBRDKAFKA_NAME=librdkafka-2.11.0.tar.gz
+LIBRDKAFKA_SOURCE=librdkafka-2.11.0
+LIBRDKAFKA_MD5SUM="bc611d0340e269abaa8886d42ff9c558"
# zstd
ZSTD_DOWNLOAD="https://github.com/facebook/zstd/releases/download/v1.5.7/zstd-1.5.7.tar.gz"
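After rebuilding the thirdparty prebuilts, a quick way to confirm the linked library matches the bump target is to print the runtime version via librdkafka's public API. A standalone check, assuming the librdkafka headers are on the include path and the binary is linked with -lrdkafka:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
    /* rd_kafka_version_str() reports the runtime library version;
     * expect "2.11.0" after this bump. */
    printf("librdkafka %s (0x%08x)\n",
           rd_kafka_version_str(), (unsigned)rd_kafka_version());
    return 0;
}
```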
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]