This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 88407ae2c67 branch-4.0: [chore](third party) bump librdkafka from 
1.9.2 to 2.11.0 for Kafka higher version compatibility (#57565) (#58844)
88407ae2c67 is described below

commit 88407ae2c670980915f963360bb5a0cfefc3d395
Author: hui lai <[email protected]>
AuthorDate: Wed Dec 24 14:22:31 2025 +0800

    branch-4.0: [chore](third party) bump librdkafka from 1.9.2 to 2.11.0 for 
Kafka higher version compatibility (#57565) (#58844)
    
    pick #57565
    
    ### What problem does this PR solve?
    
    Cannot use routine load to consume data from Kafka 4.1.0 with SASL_SSL
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 dist/LICENSE-dist.txt                      |   2 +-
 thirdparty/CHANGELOG.md                    |   4 +
 thirdparty/download-thirdparty.sh          |   4 +-
 thirdparty/patches/librdkafka-1.9.2.patch  | 189 -----------------------------
 thirdparty/patches/librdkafka-2.11.0.patch |  76 ++++++++++++
 thirdparty/vars.sh                         |  10 +-
 6 files changed, 88 insertions(+), 197 deletions(-)

diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt
index 03579a95472..0aabefe4df5 100644
--- a/dist/LICENSE-dist.txt
+++ b/dist/LICENSE-dist.txt
@@ -1508,7 +1508,7 @@ Other dependencies:
     * unixodbc: 2.3.7 -- licenses/LICENSE-unixodbc.txt
     * leveldb: 1.23 -- licenses/LICENSE-leveldb.txt
     * cyrus-sasl: 2.1.27 -- licenses/LICENSE-cyrus-sasl.txt
-    * librdkafka: 1.8.2 -- licenses/LICENSE-librdkafka.txt
+    * librdkafka: 2.11.0 -- licenses/LICENSE-librdkafka.txt
     * zstd: 1.5.2 -- licenses/LICENSE-zstd.txt
     * brotli: 1.0.9 -- licenses/LICENSE-brotli.txt
     * bitshuffle: 0.5.1 -- licenses/LICENSE-bigshuffle.txt
diff --git a/thirdparty/CHANGELOG.md b/thirdparty/CHANGELOG.md
index adffe708362..e4ebec46004 100644
--- a/thirdparty/CHANGELOG.md
+++ b/thirdparty/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 This file contains version of the third-party dependency libraries in the 
build-env image. The docker build-env image is apache/doris, and the tag is 
`build-env-${version}`
 
+## 20251031
+
+- Modified: librdkafka 1.9.2 -> 2.11.0
+
 ## 20251021
 
 - Modified: gtest 1.11.0 -> 1.12.1
diff --git a/thirdparty/download-thirdparty.sh 
b/thirdparty/download-thirdparty.sh
index 4444b6a72c4..4ce27e02349 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -344,10 +344,10 @@ fi
 
 # patch librdkafka to avoid crash
 if [[ " ${TP_ARCHIVES[*]} " =~ " LIBRDKAFKA " ]]; then
-    if [[ "${LIBRDKAFKA_SOURCE}" == "librdkafka-1.9.2" ]]; then
+    if [[ "${LIBRDKAFKA_SOURCE}" == "librdkafka-2.11.0" ]]; then
         cd "${TP_SOURCE_DIR}/${LIBRDKAFKA_SOURCE}"
         if [[ ! -f "${PATCHED_MARK}" ]]; then
-            patch -p0 <"${TP_PATCH_DIR}/librdkafka-1.9.2.patch"
+            patch -p0 <"${TP_PATCH_DIR}/librdkafka-2.11.0.patch"
             touch "${PATCHED_MARK}"
         fi
         cd -
diff --git a/thirdparty/patches/librdkafka-1.9.2.patch 
b/thirdparty/patches/librdkafka-1.9.2.patch
deleted file mode 100644
index 3caac08f79d..00000000000
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ /dev/null
@@ -1,189 +0,0 @@
---- lds-gen.py
-+++ lds-gen.py
-@@ -58,7 +58,7 @@ if __name__ == '__main__':
- 
-     # Special symbols not covered by above matches or not exposed in
-     # the public header files.
--    funcs.append('rd_ut_coverage_check')
-+    # funcs.append('rd_ut_coverage_check')
- 
-     print('# Automatically generated by lds-gen.py - DO NOT EDIT')
-     print('{\n global:')
---- mklove/modules/configure.base
-+++ mklove/modules/configure.base
-@@ -1741,7 +1741,7 @@ function mkl_pkg_config_check {
-     mkl_check_begin "$cname" "$2" "no-cache" "$1 (by pkg-config)" && return $?
- 
-     local cflags=
--    local cmd="${PKG_CONFIG} --short-errors --cflags $libname"
-+    local cmd="${PKG_CONFIG} --static --short-errors --cflags $libname"
-     mkl_dbg "pkg-config check $libname for CFLAGS ($2): $cmd"
- 
-     cflags=$($cmd 2>&1)
-@@ -1764,11 +1764,11 @@ $cflags"
-     fi
- 
-     local libs=
--    cmd="${PKG_CONFIG} --short-errors --libs $libname"
-+    cmd="${PKG_CONFIG} --static --short-errors --libs $libname"
-     mkl_dbg "pkg-config check $libname for LIBS ($2): $cmd"
-     libs=$($cmd 2>&1)
-     if [[ $? != 0 ]]; then
--        mkl_dbg "${PKG_CONFIG} --libs $libname failed: $libs"
-+        mkl_dbg "${PKG_CONFIG} --static --libs $libname failed: $libs"
-         # Clear define name ($2): caller may have additional checks
-         mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
-         return 1
---- src/rdkafka.c
-+++ src/rdkafka.c
-@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t 
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
-         struct rd_kafka_partition_leader *leader;
-         rd_list_t leaders;
-         rd_kafka_resp_err_t err;
-+        int tmout;
- 
-         partitions = rd_kafka_topic_partition_list_new(1);
-         rktpar =
-@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t 
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
-         rd_list_destroy(&leaders);
- 
-         /* Wait for reply (or timeout) */
--        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
--               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
--                                rd_kafka_poll_cb,
--                                NULL) != RD_KAFKA_OP_RES_YIELD)
--                ;
-+        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
-+                tmout = rd_timeout_remains(ts_end);
-+                if (rd_timeout_expired(tmout)) {
-+                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-+                        break;
-+                }
-+                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
-+                                 rd_kafka_poll_cb, NULL);
-+        }
- 
-         rd_kafka_q_destroy_owner(rkq);
- 
---- src/rdkafka_broker.c
-+++ src/rdkafka_broker.c
-@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, 
rd_kafka_op_t *rko) {
-                                 : (topic_err
-                                        ? topic_err
-                                        : 
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
-+
-+                        if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
-+                                
rd_kafka_toppar_purge_internal_fetch_queue_maybe(
-+                                    rktp);
-+                        }
-                 }
- 
-                 rd_kafka_toppar_unlock(rktp);
-@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
-  */
- void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
- 
--        rd_assert(thrd_is_current(rkb->rkb_thread));
-+        // To avoid the error describe in 
https://github.com/edenhill/librdkafka/issues/3608
-+        // comment this line to fix it temporarily.
-+        // rd_assert(thrd_is_current(rkb->rkb_thread));
-         rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
-         rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
-         rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
---- src/rdkafka_cgrp.c
-+++ src/rdkafka_cgrp.c
-@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t 
*rkcg,
-         rd_kafka_toppar_lock(rktp);
-         rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
-         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
-+
-+        rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
-+
-         rd_kafka_toppar_unlock(rktp);
- 
-         rd_list_remove(&rkcg->rkcg_toppars, rktp);
---- src/rdkafka_partition.c
-+++ src/rdkafka_partition.c
-@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
-         rd_kafka_toppar_unlock(rktp);
- }
- 
-+/**
-+ * @brief Purge internal fetch queue if toppar is stopped
-+ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
-+ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
-+ * removed starting from a metadata response and stopped from a rebalance or a
-+ * consumer close.
-+ *
-+ * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same
-+ * toppar that stop destroying a consumer.
-+ *
-+ * @locks rd_kafka_toppar_lock() MUST be held
-+ */
-+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t 
*rktp) {
-+        rd_kafka_q_t *rkq;
-+        rkq = rktp->rktp_fetchq;
-+        mtx_lock(&rkq->rkq_lock);
-+        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
-+            !rktp->rktp_fetchq->rkq_fwdq) {
-+                rd_kafka_op_t *rko;
-+                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
-+
-+                /* Partition is being removed from the cluster and it's 
stopped,
-+                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
-+                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
-+                 * while holding lock, to avoid circular references */
-+                rko = TAILQ_FIRST(&rkq->rkq_q);
-+                while (rko) {
-+                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
-+                            rko->rko_type != RD_KAFKA_OP_FETCH) {
-+                                rd_kafka_log(
-+                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
-+                                    "PARTDEL",
-+                                    "Purging toppar fetch queue buffer op"
-+                                    "with unexpected type: %s",
-+                                    rd_kafka_op2str(rko->rko_type));
-+                        }
-+
-+                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
-+                                barrier_cnt++;
-+                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
-+                                message_cnt++;
-+                        else
-+                                other_cnt++;
- 
-+                        rko = TAILQ_NEXT(rko, rko_link);
-+                        cnt++;
-+                }
-+
-+                if (cnt) {
-+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
-+                                     "Purge toppar fetch queue buffer "
-+                                     "containing %d op(s) "
-+                                     "(%d barrier(s), %d message(s), %d 
other)"
-+                                     " to avoid "
-+                                     "circular references",
-+                                     cnt, barrier_cnt, message_cnt, 
other_cnt);
-+                        rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
-+                } else {
-+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
-+                                     "Not purging toppar fetch queue buffer."
-+                                     " No ops present in the buffer.");
-+                }
-+        }
-+        mtx_unlock(&rkq->rkq_lock);
-+}
- 
- /**
-  * Helper method for purging queues when removing a toppar.
---- src/rdkafka_partition.h
-+++ src/rdkafka_partition.h
-@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t 
*rktp,
-                                     int64_t query_offset,
-                                     int backoff_ms);
- 
-+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t 
*rktp);
-+
- int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
-                                  int purge_flags,
-                                  rd_bool_t include_xmit_msgq);
diff --git a/thirdparty/patches/librdkafka-2.11.0.patch 
b/thirdparty/patches/librdkafka-2.11.0.patch
new file mode 100644
index 00000000000..21792174073
--- /dev/null
+++ b/thirdparty/patches/librdkafka-2.11.0.patch
@@ -0,0 +1,76 @@
+--- lds-gen.py
++++ lds-gen.py
+@@ -58,7 +58,7 @@ if __name__ == '__main__':
+ 
+     # Special symbols not covered by above matches or not exposed in
+     # the public header files.
+-    funcs.append('rd_ut_coverage_check')
++    # funcs.append('rd_ut_coverage_check')
+ 
+     print('# Automatically generated by lds-gen.py - DO NOT EDIT')
+     print('{\n global:')
+--- mklove/modules/configure.base
++++ mklove/modules/configure.base
+@@ -1741,7 +1741,7 @@ function mkl_pkg_config_check {
+     mkl_check_begin "$cname" "$2" "no-cache" "$1 (by pkg-config)" && return $?
+ 
+     local cflags=
+-    local cmd="${PKG_CONFIG} --short-errors --cflags $libname"
++    local cmd="${PKG_CONFIG} --static --short-errors --cflags $libname"
+     mkl_dbg "pkg-config check $libname for CFLAGS ($2): $cmd"
+ 
+     cflags=$($cmd 2>&1)
+@@ -1764,11 +1764,11 @@ $cflags"
+     fi
+ 
+     local libs=
+-    cmd="${PKG_CONFIG} --short-errors --libs $libname"
++    cmd="${PKG_CONFIG} --static --short-errors --libs $libname"
+     mkl_dbg "pkg-config check $libname for LIBS ($2): $cmd"
+     libs=$($cmd 2>&1)
+     if [[ $? != 0 ]]; then
+-        mkl_dbg "${PKG_CONFIG} --libs $libname failed: $libs"
++        mkl_dbg "${PKG_CONFIG} --static --libs $libname failed: $libs"
+         # Clear define name ($2): caller may have additional checks
+         mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
+         return 1
+--- src/rdkafka.c
++++ src/rdkafka.c
+@@ -3750,6 +3750,7 @@ rd_kafka_resp_err_t 
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+         struct rd_kafka_partition_leader *leader;
+         rd_list_t leaders;
+         rd_kafka_resp_err_t err;
++        int tmout;
+ 
+         partitions = rd_kafka_topic_partition_list_new(1);
+         rktpar =
+@@ -3796,9 +3797,13 @@ rd_kafka_resp_err_t 
rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+ 
+         /* Wait for reply (or timeout) */
+         while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+-                rd_kafka_q_serve(rkq, RD_POLL_INFINITE, 0,
+-                                 RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
+-                                 NULL);
++                tmout = rd_timeout_remains(ts_end);
++                if (rd_timeout_expired(tmout)) {
++                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
++                        break;
++                }
++                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
++                                 rd_kafka_poll_cb, NULL);
+         }
+ 
+         rd_kafka_q_destroy_owner(rkq);
+--- src/rdkafka_broker.c
++++ src/rdkafka_broker.c
+@@ -4759,7 +4759,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
+  */
+ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
+ 
+-        rd_assert(thrd_is_current(rkb->rkb_thread));
+        // To avoid the error described in 
https://github.com/edenhill/librdkafka/issues/3608
+        // comment out this line to fix it temporarily.
++        // rd_assert(thrd_is_current(rkb->rkb_thread));
+         rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
+         rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
+         rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index a357506876e..b47e8e33f73 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -215,11 +215,11 @@ CYRUS_SASL_NAME=cyrus-sasl-2.1.27.tar.gz
 CYRUS_SASL_SOURCE=cyrus-sasl-2.1.27
 CYRUS_SASL_MD5SUM="a33820c66e0622222c5aefafa1581083"
 
-# librdkafka-1.9.2
-LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/v1.9.2.tar.gz";
-LIBRDKAFKA_NAME=librdkafka-1.9.2.tar.gz
-LIBRDKAFKA_SOURCE=librdkafka-1.9.2
-LIBRDKAFKA_MD5SUM="fe9624e905abbf8324b0f6be520d9c24"
+# librdkafka-2.11.0
+LIBRDKAFKA_DOWNLOAD="https://github.com/confluentinc/librdkafka/archive/refs/tags/v2.11.0.tar.gz";
+LIBRDKAFKA_NAME=librdkafka-2.11.0.tar.gz
+LIBRDKAFKA_SOURCE=librdkafka-2.11.0
+LIBRDKAFKA_MD5SUM="bc611d0340e269abaa8886d42ff9c558"
 
 # zstd
 
ZSTD_DOWNLOAD="https://github.com/facebook/zstd/releases/download/v1.5.7/zstd-1.5.7.tar.gz";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to