This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b44c051649 Add datasketches HLL sketch aggregate functions (#63143)
1b44c051649 is described below

commit 1b44c051649f185a8b3cfd6e2fd5c81ed1879083
Author: nooneuse <[email protected]>
AuthorDate: Sat May 30 18:36:41 2026 +0800

    Add datasketches HLL sketch aggregate functions (#63143)
    
    ### What problem does this PR solve?
    
    > An aggregate function is required to process user data containing
    Datasketches HLL sketches. In many data aggregation scenarios, users
    pre‑aggregate detailed data in Hive using the sketching techniques
    provided by Apache Datasketches, and then analyze the resulting sketches
    across various OLAP engines. Compared with the HLL union aggregate
    functions natively offered by these engines, there are two key differences
    when using Datasketches HLL sketches: firstly, the use cases differ; and
    secondly, HLL sketches can be used seamlessly across different
    engines—for example, simultaneously in ES, Doris, and ClickHouse. Such
    requirements are common in many production environments.
    
    Issue Number:
    - #63142(https://github.com/apache/doris/issues/63142)
    - #26416
    - #56246
    
    Summary:
    Implemented a built-in aggregate function that integrates the
    Datasketches HLL sketch. This aggregate function cannot rely on the Java
    UDF environment: there, Strings are encoded in UTF-8, which corrupts
    the binary data of sketches, so the serialization/deserialization
    operations for sketches must be implemented on the BE side.
    (Additionally, since Apache Datasketches has been added to the contrib
    directory via a git submodule, it will become very easy to add other
    sketches, such as the theta sketch, in the future.)
    
    **see**: https://github.com/apache/doris/issues/63142
    **use case**: see regression test &
    https://github.com/apache/doris/issues/63142
    
    ---------
    
    Co-authored-by: yuanyuhao <[email protected]>
---
 .gitmodules                                        |    3 +
 ...gregate_function_datasketches_hll_union_agg.cpp |   44 +
 ...aggregate_function_datasketches_hll_union_agg.h |  243 +++++
 .../aggregate_function_simple_factory.cpp          |    3 +
 .../agg_datasketches_hll_union_agg_test.cpp        | 1097 ++++++++++++++++++++
 build.sh                                           |    9 +
 contrib/datasketches-cpp                           |    1 +
 .../doris/catalog/BuiltinAggregateFunctions.java   |    3 +
 .../functions/agg/DataSketchesHllUnionAgg.java     |  113 ++
 .../visitor/AggregateFunctionVisitor.java          |    5 +
 .../test_datasketches_hll_union_agg.out            |   28 +
 .../test_datasketches_hll_union_agg.groovy         |  170 +++
 run-be-ut.sh                                       |   14 +-
 13 files changed, 1732 insertions(+), 1 deletion(-)

diff --git a/.gitmodules b/.gitmodules
index 54c1a8a3636..eb8e703aa8a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -29,3 +29,6 @@
        path = contrib/openblas
        url = https://github.com/apache/doris-thirdparty.git
        branch = openblas
+[submodule "contrib/datasketches-cpp"]
+       path = contrib/datasketches-cpp
+       url = https://github.com/apache/datasketches-cpp.git
diff --git 
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp 
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
new file mode 100644
index 00000000000..c9b7013e7a9
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+
+#include <string>
+
+#include "core/data_type/data_type.h"
+#include "core/data_type/define_primitive_type.h"
+#include "exec/common/hash_table/hash.h" // IWYU pragma: keep
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+namespace doris {
+template <template <PrimitiveType> class Data> // Data: aggregate-state template, instantiated per input type.
+AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg(
+        const std::string& name, const DataTypes& argument_types, const 
DataTypePtr& result_type,
+        const bool result_is_nullable, const AggregateFunctionAttr& attr) { // name and result_type are not read below.
+    return creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, 
TYPE_VARBINARY>::create<
+            AggregateFunctionDataSketchesHllUnionAgg, Data>(argument_types, 
result_is_nullable,
+                                                            attr); // Candidate input types: STRING, VARCHAR, VARBINARY.
+}
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory) { // Registers the function and its two aliases with the factory.
+    AggregateFunctionCreator creator =
+            
create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>;
+    factory.register_function_both("datasketches_hll_union_agg", creator); // Primary name.
+    factory.register_alias("datasketches_hll_union_agg", "ds_hll_estimate"); // Alias 1 of the primary name.
+    factory.register_alias("datasketches_hll_union_agg", 
"datasketches_hll_estimate"); // Alias 2 of the primary name.
+}
+} // namespace doris
diff --git 
a/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h 
b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
new file mode 100644
index 00000000000..d9f82f193e8
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <stddef.h>
+
+#include <DataSketches/hll.hpp>
+#include <algorithm>
+#include <boost/iterator/iterator_facade.hpp>
+#include <memory>
+#include <optional>
+#include <type_traits>
+#include <vector>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "core/assert_cast.h"
+#include "core/column/column.h"
+#include "core/column/column_varbinary.h"
+#include "core/column/column_vector.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "core/string_ref.h"
+#include "core/types.h"
+#include "core/uint128.h"
+#include "exec/common/hash_table/hash.h"
+#include "exec/common/hash_table/phmap_fwd_decl.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "util/var_int.h"
+template <typename T>
+struct HashCRC32;
+namespace doris {
+class Arena;
+class BufferReadable;
+class BufferWritable;
+template <PrimitiveType T>
+class ColumnDecimal;
+/// Aggregation state for datasketches_hll_union_agg: a lazily created HLL union of input sketches.
+template <PrimitiveType T> // T is not referenced inside this struct.
+struct AggregateFunctionHllSketchData {
+    /** We set the default LgK to 12,
+      * as this value is used as a performance baseline in the relevant 
documentation.
+      * (https://datasketches.apache.org/docs/HLL/HllPerformance.html)
+      */
+    static constexpr uint8_t DEFAULT_LOG_K = 12;
+    using Alloc = CustomStdAllocator<uint8_t>;
+    using Sketch = datasketches::hll_sketch_alloc<Alloc>;
+    using Union = datasketches::hll_union_alloc<Alloc>;
+
+    std::optional<Union> hll_union_data; // Empty until the first merge(); emptied again by reset().
+
+    static String get_name() { return "datasketches_hll_union_agg"; }
+
+    void merge(const Sketch& sketch_data) { // Folds one sketch into the union, creating the union on first use.
+        if (!hll_union_data.has_value()) {
+            /** The union's max lg_k comes from the first merged sketch, clamped to [7, 21],
+              * considering that the code comment requires 7 to 21.
+              * See: datasketches-cpp/hll/include/hll.hpp:451
+              */
+            constexpr uint8_t MIN_UNION_LOG_K = 7;
+            const uint8_t union_lg_k =
+                    std::clamp<uint8_t>(sketch_data.get_lg_config_k(), 
MIN_UNION_LOG_K,
+                                        
datasketches::hll_constants::MAX_LOG_K);
+            hll_union_data.emplace(union_lg_k, Alloc());
+        }
+        try {
+            hll_union_data->update(sketch_data);
+        } catch (const doris::Exception& e) { // Keep the original error code; prefix the message.
+            throw Exception(e.code(), "Internal error happened when update HLL 
sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) { // Other std exceptions are reported as INTERNAL_ERROR.
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: 
{}", e.what());
+        } catch (...) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when update HLL sketch: 
unknown exception.");
+        }
+    }
+    void reset() { // Returns the state to "no sketch merged yet".
+        if (hll_union_data.has_value()) {
+            hll_union_data->reset();
+        }
+        hll_union_data.reset(); // Disengage the optional so the next merge() picks lg_k again.
+    }
+
+    void write_sketch(BufferWritable& buf, const Sketch& sk) const { // Writes sk in compact serialized form as one binary value.
+        auto serialized_bytes = sk.serialize_compact();
+        StringRef d(serialized_bytes.data(), serialized_bytes.size());
+        buf.write_binary(d);
+    }
+    void write(BufferWritable& buf) const { // Serializes the current union result (or an empty union's result).
+        if (!hll_union_data.has_value()) {
+            /** Using DEFAULT_LOG_K(12) here is surely sufficient,
+              * because in this case the union that actually needs to be 
serialized should contain no data.
+              */
+            Union u(DEFAULT_LOG_K, Alloc());
+            write_sketch(buf, u.get_result()); // NOTE(review): this path is not covered by the try/catch below.
+            return;
+        }
+        try {
+            auto cache = hll_union_data->get_result();
+            write_sketch(buf, cache);
+        } catch (const doris::Exception& e) { // Keep the original error code; prefix the message.
+            throw Exception(e.code(), "Internal error happened when serialize 
HLL sketch: {}",
+                            e.to_string());
+        } catch (const std::exception& e) { // Other std exceptions are reported as INTERNAL_ERROR.
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Internal error happened when serialize HLL 
sketch: {}", e.what());
+        } catch (...) {
+            throw Exception(
+                    ErrorCode::INTERNAL_ERROR,
+                    "Internal error happened when serialize HLL sketch: 
unknown exception.");
+        }
+    }
+    void read(BufferReadable& buf) { // Reads one serialized sketch from buf and merges it into this state.
+        StringRef d;
+        buf.read_binary(d);
+
+        auto cache = [&]() -> Sketch { // Immediately-invoked lambda: translate deserialization failures before merging.
+            try {
+                return Sketch::deserialize(d.data, d.size, Alloc());
+            } catch (const doris::Exception& e) { // Keep the original error code; prefix the message.
+                throw Exception(e.code(), "Failed to deserialize HLL sketch 
when read: {}",
+                                e.to_string());
+            } catch (const std::exception& e) { // Other std exceptions while parsing are reported as CORRUPTION.
+                throw Exception(ErrorCode::CORRUPTION, "HLL sketch data 
corrupted when read: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when read: unknown 
exception.");
+            }
+        }();
+
+        merge(cache); // read() accumulates into the existing union; it does not replace it.
+    }
+    double get_result() const { // Cardinality estimate of the union, or 0.0 when nothing was merged.
+        if (hll_union_data.has_value()) {
+            try {
+                return hll_union_data->get_estimate();
+            } catch (const doris::Exception& e) { // Keep the original error code; prefix the message.
+                throw Exception(e.code(),
+                                "Internal error happened when get HLL sketch 
estimate: {}",
+                                e.to_string());
+            } catch (const std::exception& e) { // Other std exceptions are reported as INTERNAL_ERROR.
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Internal error happened when get HLL sketch 
estimate: {}",
+                                e.what());
+            } catch (...) {
+                throw Exception(
+                        ErrorCode::INTERNAL_ERROR,
+                        "Internal error happened when get HLL sketch estimate: 
unknown exception.");
+            }
+        }
+        return 0.0; // No sketch has been merged into this state.
+    }
+};
+
+/// Unions serialized DataSketches HLL sketches (one per input row) and returns the approximate distinct count.
+template <PrimitiveType T, typename Data>
+class AggregateFunctionDataSketchesHllUnionAgg final
+        : public IAggregateFunctionDataHelper<Data,
+                                              
AggregateFunctionDataSketchesHllUnionAgg<T, Data>>,
+          UnaryExpression,
+          NotNullableAggregateFunction {
+public:
+    AggregateFunctionDataSketchesHllUnionAgg(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, 
AggregateFunctionDataSketchesHllUnionAgg<T, Data>>(
+                      argument_types_) {}
+    String get_name() const override { return Data::get_name(); }
+    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeFloat64>(); } // The result type is always Float64.
+    void reset(AggregateDataPtr __restrict place) const override { 
this->data(place).reset(); }
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
+             Arena&) const override {
+        add_one(this->data(place), *columns[0], row_num); // Only the first argument column is read.
+    }
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        const auto& rhs_data = this->data(rhs);
+        if (!rhs_data.hll_union_data.has_value()) {
+            return; // rhs has no union yet, so there is nothing to merge.
+        }
+        
this->data(place).merge(rhs_data.hll_union_data->get_result(datasketches::HLL_8)); // rhs is materialized as an HLL_8 sketch, then merged.
+    }
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
+        this->data(place).write(buf);
+    }
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        this->data(place).read(buf);
+    }
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
+        
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get_result()); // Appends one estimate to the Float64 column.
+    }
+
+private:
+    static void ALWAYS_INLINE add_one(Data& data, const IColumn& column, 
ssize_t row_num) { // Deserializes the sketch stored at row_num and merges it into data.
+        if constexpr (is_string_type(T) || is_varbinary(T)) { // For any other T this function does nothing.
+            const auto& src_column = assert_cast<const typename 
PrimitiveTypeTraits<T>::ColumnType&,
+                                                 
TypeCheckOnRelease::DISABLE>(column);
+            StringRef value = 
src_column.get_data_at(static_cast<size_t>(row_num));
+            if (value.empty()) { // Empty input is rejected before deserialization is attempted.
+                throw Exception(ErrorCode::CORRUPTION,
+                                "HLL sketch data corrupted when add: empty 
input.");
+            }
+
+            using Sketch = typename Data::Sketch;
+            using Alloc = typename Data::Alloc;
+
+            auto sketch_data = [&]() -> Sketch { // Immediately-invoked lambda: translate deserialization failures.
+                try {
+                    return Sketch::deserialize(value.begin(), value.size, 
Alloc());
+                } catch (const doris::Exception& e) { // Keep the original error code; prefix the message.
+                    throw Exception(e.code(), "Failed to deserialize HLL 
sketch when add: {}",
+                                    e.to_string());
+                } catch (const std::exception& e) { // Other std exceptions while parsing are reported as CORRUPTION.
+                    throw Exception(ErrorCode::CORRUPTION, "HLL sketch data 
corrupted when add: {}",
+                                    e.what());
+                } catch (...) {
+                    throw Exception(ErrorCode::CORRUPTION,
+                                    "HLL sketch data corrupted when add: 
unknown exception.");
+                }
+            }();
+
+            data.merge(sketch_data);
+        }
+    }
+};
+} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp 
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index 7083539de6b..1bef94e03e8 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -37,6 +37,8 @@ void 
register_aggregate_function_avg(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_count(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_count_by_enum(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& 
factory);
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
 void 
register_aggregate_function_uniq_distribute_key(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
@@ -127,6 +129,7 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_replace_reader_load(instance);
         register_aggregate_function_window_lead_lag_first_last(instance);
         register_aggregate_function_HLL_union_agg(instance);
+        register_aggregate_function_datasketches_HLL_union_agg(instance);
         register_aggregate_functions_corr(instance);
         register_aggregate_functions_corr_welford(instance);
         register_aggregate_function_covar_pop(instance);
diff --git a/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp 
b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
new file mode 100644
index 00000000000..7783ec89e2c
--- /dev/null
+++ b/be/test/exprs/aggregate/agg_datasketches_hll_union_agg_test.cpp
@@ -0,0 +1,1097 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <DataSketches/hll.hpp>
+
+#include "agent/be_exec_version_manager.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "core/column/column.h"
+#include "core/column/column_string.h"
+#include "core/column/column_varbinary.h"
+#include "core/custom_allocator.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_varbinary.h"
+#include "exec/common/hash_table/hash.h"
+#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+
+namespace doris {
+
+void register_aggregate_function_datasketches_HLL_union_agg(
+        AggregateFunctionSimpleFactory& factory);
+
+class AggregateFunctionDataSketchesHllUnionAggTest : public ::testing::Test { // Fixture that owns one Arena per test.
+protected:
+    void SetUp() override { arena = std::make_unique<Arena>(); } // Creates a fresh arena.
+
+    void TearDown() override { arena.reset(); } // Destroys the arena created in SetUp().
+
+    std::unique_ptr<Arena> arena; // Used by the tests to allocate aggregate state places.
+};
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testBasicUnion) { // add() of two overlapping sketches yields a plausible union estimate.
+    // Create test: union multiple hll sketches and get correct cardinality 
estimate
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    // Create 2 different hll sketches, each with 100 unique values
+    datasketches::hll_sketch sketch1(8); // lg_k=8
+    for (int i = 0; i < 100; i++) { // Values 0..99.
+        sketch1.update(i);
+    }
+
+    datasketches::hll_sketch sketch2(8);
+    for (int i = 50; i < 150; i++) { // Values 50..149; overlaps sketch1 on 50..99.
+        sketch2.update(i);
+    }
+
+    // Serialize both sketches into string column
+    auto column_string = ColumnString::create();
+    const auto ser1 = sketch1.serialize_compact();
+    column_string->insert_data((const char*)(ser1.data()), ser1.size());
+    const auto ser2 = sketch2.serialize_compact();
+    column_string->insert_data((const char*)(ser2.data()), ser2.size());
+
+    // Create aggregate data place
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    // Add both rows
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena);
+    agg_func->add(place, columns, 1, *arena);
+
+    // Get result
+    ColumnFloat64 result_column;
+    agg_func->insert_result_into(place, result_column);
+
+    double estimate = result_column.get_data()[0];
+    EXPECT_GE(estimate, 130.0); // The exact union cardinality is 150 (values 0..149).
+    EXPECT_LE(estimate, 170.0);
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMergeTwoAggStates) { // merge() combines two states that each saw one sketch.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     
AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Create two separate aggregate states
+    AggregateDataPtr place1 =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place1);
+
+    AggregateDataPtr place2 =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place2);
+
+    // Add different data to each state
+    datasketches::hll_sketch sketch1(8);
+    for (int i = 0; i < 100; i++) sketch1.update(i); // Values 0..99.
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8);
+    for (int i = 100; i < 200; i++) sketch2.update(i); // Values 100..199; disjoint from sketch1.
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser1.data()), ser1.size());
+    column_string->insert_data((const char*)(ser2.data()), ser2.size());
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place1, columns, 0, *arena); // Row 0 (sketch1) goes to place1.
+    agg_func->add(place2, columns, 1, *arena); // Row 1 (sketch2) goes to place2.
+
+    // Merge second state into first
+    agg_func->merge(place1, place2, *arena);
+
+    // Get result
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place1, result);
+    double estimate = result.get_data()[0];
+
+    EXPECT_GE(estimate, 170.0);
+    EXPECT_LE(estimate, 230.0); // 200 unique values expected
+
+    agg_func->destroy(place1);
+    agg_func->destroy(place2);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, 
testMergeEmptyStateDoesNotCrash) { // Merging a state that never saw add() must not throw or change results.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     
AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    AggregateDataPtr place_with_data =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place_with_data);
+
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_rhs_place); // Created but never passed to add().
+
+    datasketches::hll_sketch sketch(8, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place_with_data, columns, 0, *arena);
+
+    // Covers the "all NULL" style path: rhs exists but never saw add().
+    EXPECT_NO_THROW(agg_func->merge(place_with_data, empty_rhs_place, *arena));
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place_with_data, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], sketch.get_estimate()); // The estimate must equal that of the single input sketch.
+
+    // Empty string is invalid serialized sketch and should be rejected by 
add().
+    // Merge-empty-state coverage is handled by the "never saw add()" path 
above.
+
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena)); // Empty merged into empty.
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0); // Both sides empty, so the result stays 0.0.
+
+    agg_func->destroy(place_with_data);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testEmptyState) { // A freshly created state reports an estimate of 0.0.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result); // No add() or merge() happened before this call.
+    EXPECT_DOUBLE_EQ(result.get_data()[0], 0.0);
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserialize) 
{ // A state round-tripped through serialize()/deserialize() reports the same estimate.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    // Add some data
+    datasketches::hll_sketch sketch(8);
+    for (int i = 0; i < 100; i++) sketch.update(i);
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser.data()), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena);
+
+    // Serialize
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    agg_func->serialize(place, w);
+    w.commit(); // NOTE(review): presumably finalizes the written bytes as row 0 of buffer — confirm.
+
+    // Deserialize into new state
+    AggregateDataPtr new_place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(new_place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    agg_func->deserialize(new_place, r, *arena);
+
+    // Compare results
+    ColumnFloat64 result1, result2;
+    agg_func->insert_result_into(place, result1); // Estimate of the original state.
+    agg_func->insert_result_into(new_place, result2); // Estimate of the deserialized state.
+
+    EXPECT_DOUBLE_EQ(result1.get_data()[0], result2.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(new_place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testReset) { // reset() after add() brings the estimate back to 0.0.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(8);
+    for (int i = 0; i < 100; i++) sketch.update(i);
+    auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)(ser.data()), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    agg_func->add(place, columns, 0, *arena); // The state now holds 100 distinct values.
+
+    // Reset
+    agg_func->reset(place);
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], 0.0); // Same value as a never-used state.
+
+    agg_func->destroy(place);
+}
+
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, 
testResetThenAddReinitializesState) { // After reset(), add() must start from a clean state.
+    DataTypePtr input_type = std::make_shared<DataTypeString>();
+    DataTypes argument_types = {input_type};
+
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, 
AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch1(8, datasketches::HLL_8);
+    for (int i = 0; i < 7; ++i) { // Values 0..6.
+        sketch1.update(i);
+    }
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8, datasketches::HLL_8);
+    for (int i = 10; i < 17; ++i) { // Values 10..16; disjoint from sketch1.
+        sketch2.update(i);
+    }
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser1.data(), ser1.size());
+    column_string->insert_data((const char*)ser2.data(), ser2.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), 
agg_func->align_of_data());
+    agg_func->create(place);
+
+    agg_func->add(place, columns, 0, *arena); // sketch1 goes in first...
+    agg_func->reset(place); // ...and is discarded by the reset.
+    agg_func->add(place, columns, 1, *arena); // Only sketch2 is added after the reset.
+
+    ColumnFloat64 result;
+    agg_func->insert_result_into(place, result);
+    EXPECT_DOUBLE_EQ(result.get_data()[0], sketch2.get_estimate()); // The result must match sketch2 alone.
+
+    agg_func->destroy(place);
+}
+
+// Runs reset()/add() on the VARCHAR instantiation, then checks that merging an
+// empty right-hand state into a populated state, and into an empty state,
+// neither throws nor alters the estimate.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testVarcharResetThenAddAndMergeEmptyRhsDoesNotCrash) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_VARCHAR,
+                                                     AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Row 0: sketch of 0..6. Row 1: sketch of 10..16.
+    datasketches::hll_sketch sketch1(8, datasketches::HLL_8);
+    for (int i = 0; i < 7; ++i) {
+        sketch1.update(i);
+    }
+    const auto ser1 = sketch1.serialize_compact();
+
+    datasketches::hll_sketch sketch2(8, datasketches::HLL_8);
+    for (int i = 10; i < 17; ++i) {
+        sketch2.update(i);
+    }
+    const auto ser2 = sketch2.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser1.data(), ser1.size());
+    column_string->insert_data((const char*)ser2.data(), ser2.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    // Only row 1 should be reflected after the reset.
+    agg_func->add(place, columns, 0, *arena);
+    agg_func->reset(place);
+    agg_func->add(place, columns, 1, *arena);
+
+    ColumnFloat64 after_reset;
+    agg_func->insert_result_into(place, after_reset);
+    EXPECT_DOUBLE_EQ(after_reset.get_data()[0], sketch2.get_estimate());
+
+    // Merging a freshly created (empty) state must leave the estimate unchanged.
+    AggregateDataPtr empty_rhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_rhs_place);
+
+    EXPECT_NO_THROW(agg_func->merge(place, empty_rhs_place, *arena));
+
+    ColumnFloat64 after_merge;
+    agg_func->insert_result_into(place, after_merge);
+    EXPECT_DOUBLE_EQ(after_merge.get_data()[0], sketch2.get_estimate());
+
+    // Empty merged into empty must still report an estimate of 0.
+    AggregateDataPtr empty_lhs_place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_lhs_place);
+
+    EXPECT_NO_THROW(agg_func->merge(empty_lhs_place, empty_rhs_place, *arena));
+
+    ColumnFloat64 empty_merge_result;
+    agg_func->insert_result_into(empty_lhs_place, empty_merge_result);
+    EXPECT_DOUBLE_EQ(empty_merge_result.get_data()[0], 0.0);
+
+    agg_func->destroy(place);
+    agg_func->destroy(empty_rhs_place);
+    agg_func->destroy(empty_lhs_place);
+}
+
+// The factory must resolve the canonical name and both aliases, and all three
+// functions must produce the same estimate for the same input sketch.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testFactoryCreateAndAliases) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int be_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes string_args = {std::make_shared<DataTypeString>()};
+
+    auto canonical_fn =
+            factory.get("datasketches_hll_union_agg", string_args, nullptr, false, be_version);
+    auto short_alias_fn = factory.get("ds_hll_estimate", string_args, nullptr, false, be_version);
+    auto long_alias_fn =
+            factory.get("datasketches_hll_estimate", string_args, nullptr, false, be_version);
+
+    ASSERT_NE(canonical_fn, nullptr);
+    ASSERT_NE(short_alias_fn, nullptr);
+    ASSERT_NE(long_alias_fn, nullptr);
+
+    // Single input row: a sketch over the values 0..6.
+    datasketches::hll_sketch source(8, datasketches::HLL_8);
+    for (int v = 0; v < 7; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto sketch_col = ColumnString::create();
+    sketch_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {sketch_col.get()};
+
+    // Feeds row 0 into a fresh state of `fn` and returns the finalized estimate.
+    auto estimate_with = [&](const AggregateFunctionPtr& fn) {
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 out;
+        fn->insert_result_into(state, out);
+        fn->destroy(state);
+        return out.get_data()[0];
+    };
+
+    const double expected = source.get_estimate();
+    EXPECT_DOUBLE_EQ(estimate_with(canonical_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(short_alias_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(long_alias_fn), expected);
+}
+
+// Factory coverage for argument types: VARCHAR (canonical name and alias) must
+// work end to end, nullable VARCHAR must resolve, and INT32 must be rejected.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testFactoryCreateForVarcharAndNullableAndUnsupportedType) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int be_version = BeExecVersionManager::get_newest_version();
+
+    // --- VARCHAR ---
+    DataTypes varchar_args = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto varchar_fn =
+            factory.get("datasketches_hll_union_agg", varchar_args, nullptr, false, be_version);
+    auto varchar_alias_fn =
+            factory.get("ds_hll_estimate", varchar_args, nullptr, false, be_version);
+    ASSERT_NE(varchar_fn, nullptr);
+    ASSERT_NE(varchar_alias_fn, nullptr);
+
+    datasketches::hll_sketch source(8, datasketches::HLL_8);
+    for (int v = 0; v < 7; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto sketch_col = ColumnString::create();
+    sketch_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {sketch_col.get()};
+
+    // Feeds row 0 into a fresh state of `fn` and returns the finalized estimate.
+    auto estimate_with = [&](const AggregateFunctionPtr& fn) {
+        AggregateDataPtr state = arena->aligned_alloc(fn->size_of_data(), fn->align_of_data());
+        fn->create(state);
+        fn->add(state, columns, 0, *arena);
+        ColumnFloat64 out;
+        fn->insert_result_into(state, out);
+        fn->destroy(state);
+        return out.get_data()[0];
+    };
+
+    const double expected = source.get_estimate();
+    EXPECT_DOUBLE_EQ(estimate_with(varchar_fn), expected);
+    EXPECT_DOUBLE_EQ(estimate_with(varchar_alias_fn), expected);
+
+    // --- Nullable(VARCHAR): only creation is checked ---
+    DataTypes nullable_varchar_args = {
+            make_nullable(std::make_shared<DataTypeString>(-1, TYPE_VARCHAR))};
+    auto nullable_varchar_fn = factory.get("datasketches_hll_union_agg", nullable_varchar_args,
+                                           nullptr, false, be_version);
+    ASSERT_NE(nullable_varchar_fn, nullptr);
+
+    // --- INT32: the factory is expected to return nullptr ---
+    DataTypes int32_args = {std::make_shared<DataTypeInt32>()};
+    auto int32_fn =
+            factory.get("datasketches_hll_union_agg", int32_args, nullptr, false, be_version);
+    EXPECT_EQ(int32_fn, nullptr);
+}
+
+// A valid sketch built with a small lgK (4) must be accepted by both add() and
+// deserialize() without throwing, and both paths must yield the same estimate.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testLowLgKSketchDoesNotReportCorruption) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    datasketches::hll_sketch sketch(4, datasketches::HLL_8);
+    for (int i = 0; i < 100; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+    EXPECT_NO_THROW(agg_func->add(place, columns, 0, *arena));
+
+    ColumnFloat64 add_result;
+    agg_func->insert_result_into(place, add_result);
+
+    // 100 distinct values were inserted; only a loose range is asserted here.
+    EXPECT_GE(add_result.get_data()[0], 50);
+    EXPECT_LE(add_result.get_data()[0], 150);
+
+    // Feed the same bytes through the deserialize() path.
+    auto buf = ColumnString::create();
+    BufferWritable w(*buf);
+    StringRef d((const char*)ser.data(), ser.size());
+    w.write_binary(d);
+    w.commit();
+
+    AggregateDataPtr place2 =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place2);
+
+    BufferReadable r(buf->get_data_at(0));
+    EXPECT_NO_THROW(agg_func->deserialize(place2, r, *arena));
+
+    ColumnFloat64 deserialize_result;
+    agg_func->insert_result_into(place2, deserialize_result);
+    // Floating-point comparison: use the ULP-based matcher like the other
+    // round-trip tests in this file instead of exact EXPECT_EQ.
+    EXPECT_DOUBLE_EQ(deserialize_result.get_data()[0], add_result.get_data()[0]);
+
+    agg_func->destroy(place);
+    agg_func->destroy(place2);
+}
+
+// add() on a zero-length STRING value must raise doris::Exception with the
+// CORRUPTION error code.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body here and
+        // skip the destroy() call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// reset() on a state that never received any input must not throw, and the
+// state must still report an estimate of 0.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testResetOnEmptyState) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    // Covers the reset() branch taken when union_data is nullptr.
+    EXPECT_NO_THROW(agg_func->reset(state));
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], 0.0);
+
+    agg_func->destroy(state);
+}
+
+// A VARBINARY argument resolved through the factory must produce the same
+// estimate as the sketch the input bytes were serialized from.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryInput) {
+    AggregateFunctionSimpleFactory factory;
+    register_aggregate_function_datasketches_HLL_union_agg(factory);
+    int be_version = BeExecVersionManager::get_newest_version();
+
+    DataTypes varbinary_args = {std::make_shared<DataTypeVarbinary>()};
+    auto union_fn =
+            factory.get("datasketches_hll_union_agg", varbinary_args, nullptr, false, be_version);
+    ASSERT_NE(union_fn, nullptr);
+
+    // Sketch over the ten values 20..29.
+    datasketches::hll_sketch source(8, datasketches::HLL_8);
+    for (int v = 20; v < 30; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto binary_col = ColumnVarbinary::create();
+    binary_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+
+    const IColumn* columns[1] = {binary_col.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(union_fn->size_of_data(), union_fn->align_of_data());
+    union_fn->create(state);
+    union_fn->add(state, columns, 0, *arena);
+
+    ColumnFloat64 estimate;
+    union_fn->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], source.get_estimate());
+
+    union_fn->destroy(state);
+}
+
+// add() on a zero-length VARBINARY value must raise doris::Exception with the
+// CORRUPTION error code.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>>(argument_types);
+
+    auto column_varbinary = ColumnVarbinary::create();
+    column_varbinary->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_varbinary.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body here and
+        // skip the destroy() call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// A one-byte payload ("x") is not a valid sketch: both add() and deserialize()
+// on the VARBINARY instantiation must raise doris::Exception with CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinaryCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>>(argument_types);
+
+    auto column_varbinary = ColumnVarbinary::create();
+    column_varbinary->insert_data("x", 1);
+    const IColumn* columns[1] = {column_varbinary.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    // ADD_FAILURE() is used instead of FAIL() in both checks so that an
+    // unexpected success does not return early and skip destroy() below.
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // Same corrupted payload, this time through the deserialize() path.
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        agg_func->deserialize(place, r, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Round trip on the VARBINARY instantiation: a state rebuilt via
+// serialize() + deserialize() must report the same estimate as the original.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarbinarySerializeDeserialize) {
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARBINARY, AggregateFunctionHllSketchData<TYPE_VARBINARY>>;
+    DataTypes argument_types = {std::make_shared<DataTypeVarbinary>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Input row: a sketch over the values 0..99.
+    datasketches::hll_sketch source(8, datasketches::HLL_8);
+    for (int v = 0; v < 100; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto binary_col = ColumnVarbinary::create();
+    binary_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {binary_col.get()};
+
+    AggregateDataPtr original_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(original_state);
+    agg_func->add(original_state, columns, 0, *arena);
+
+    auto wire_buffer = ColumnString::create();
+    BufferWritable writer(*wire_buffer);
+    agg_func->serialize(original_state, writer);
+    writer.commit();
+
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire_buffer->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 original_estimate;
+    ColumnFloat64 restored_estimate;
+    agg_func->insert_result_into(original_state, original_estimate);
+    agg_func->insert_result_into(restored_state, restored_estimate);
+
+    EXPECT_DOUBLE_EQ(original_estimate.get_data()[0], restored_estimate.get_data()[0]);
+
+    agg_func->destroy(original_state);
+    agg_func->destroy(restored_state);
+}
+
+// Serializing a state that never received input must not throw, and the state
+// deserialized from that output must report an estimate of 0.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testSerializeDeserializeEmptyState) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    AggregateDataPtr empty_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(empty_state);
+
+    auto wire_buffer = ColumnString::create();
+    BufferWritable writer(*wire_buffer);
+    // Covers the empty-state branch of write().
+    EXPECT_NO_THROW(agg_func->serialize(empty_state, writer));
+    writer.commit();
+
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire_buffer->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(restored_state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], 0.0);
+
+    agg_func->destroy(empty_state);
+    agg_func->destroy(restored_state);
+}
+
+// A one-byte payload ("x") is not a valid sketch: both add() and deserialize()
+// on the STRING instantiation must raise doris::Exception with CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_STRING, AggregateFunctionHllSketchData<TYPE_STRING>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("x", 1); // definitely not a valid sketch
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    // ADD_FAILURE() is used instead of FAIL() in both checks so that an
+    // unexpected success does not return early and skip destroy() below.
+    try {
+        // Covers the CORRUPTION catch branch of add().
+        agg_func->add(place, columns, 0, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        // Covers the CORRUPTION catch branch of read().
+        agg_func->deserialize(place, r, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Input bytes produced by a sketch that uses doris::CustomStdAllocator must be
+// accepted by add() and yield the same estimate as that sketch.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAllocatorAwareSketchInput) {
+    using SketchData = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING, SketchData>;
+    using Alloc = doris::CustomStdAllocator<uint8_t>;
+    using Sketch = datasketches::hll_sketch_alloc<Alloc>;
+
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Sketch over the values 0..6, built with the custom allocator.
+    Sketch source(8, datasketches::HLL_8, false, Alloc());
+    for (int v = 0; v < 7; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto sketch_col = ColumnString::create();
+    sketch_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {sketch_col.get()};
+
+    AggregateDataPtr state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(state);
+
+    agg_func->add(state, columns, 0, *arena);
+
+    ColumnFloat64 estimate;
+    agg_func->insert_result_into(state, estimate);
+    EXPECT_DOUBLE_EQ(estimate.get_data()[0], source.get_estimate());
+
+    agg_func->destroy(state);
+}
+
+namespace {
+
+// Scope guard for tests: while an instance is alive,
+// doris::config::mem_alloc_fault_probability is overridden with the given
+// value and doris::enable_thread_catch_bad_alloc is incremented. The
+// destructor decrements the counter and restores the previous probability.
+class ScopedMemAllocFaultInjection {
+public:
+    // Not copyable: each instance owns exactly one override/restore pair.
+    ScopedMemAllocFaultInjection(const ScopedMemAllocFaultInjection&) = delete;
+    ScopedMemAllocFaultInjection& operator=(const ScopedMemAllocFaultInjection&) = delete;
+
+    explicit ScopedMemAllocFaultInjection(double probability)
+            : _saved_probability(doris::config::mem_alloc_fault_probability) {
+        doris::config::mem_alloc_fault_probability = probability;
+        ++doris::enable_thread_catch_bad_alloc;
+    }
+
+    ~ScopedMemAllocFaultInjection() {
+        --doris::enable_thread_catch_bad_alloc;
+        doris::config::mem_alloc_fault_probability = _saved_probability;
+    }
+
+private:
+    // Value of mem_alloc_fault_probability captured at construction.
+    double _saved_probability;
+};
+
+} // namespace
+
+// Checks the metadata accessors: the name reported by the data struct and by
+// the aggregate function, and that the return type equals DataTypeFloat64.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testMetaInfoCoversInlineMethods) {
+    using SketchData = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using AggFunc = AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING, SketchData>;
+
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    EXPECT_EQ(SketchData::get_name(), "datasketches_hll_union_agg");
+    EXPECT_EQ(agg_func->get_name(), "datasketches_hll_union_agg");
+
+    const auto expected_type = std::make_shared<DataTypeFloat64>();
+    EXPECT_TRUE(agg_func->get_return_type()->equals(*expected_type));
+}
+
+// With allocation fault injection set to probability 1.0, add() is expected to
+// raise doris::Exception with the MEM_ALLOC_FAILED error code.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testAddMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The input is built before fault injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 1000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->add(place, columns, 0, *arena);
+            // Non-fatal on purpose: FAIL() would return from the test body here
+            // and skip the destroy() call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// With allocation fault injection set to probability 1.0, deserialize() is
+// expected to raise doris::Exception with the MEM_ALLOC_FAILED error code.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testDeserializeMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The serialized payload is prepared before fault injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 1000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+    StringRef d((const char*)ser.data(), ser.size());
+    w.write_binary(d);
+    w.commit();
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    BufferReadable r(buffer->get_data_at(0));
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->deserialize(place, r, *arena);
+            // Non-fatal on purpose: FAIL() would return from the test body here
+            // and skip the destroy() call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// add() on a zero-length VARCHAR value must raise doris::Exception with the
+// CORRUPTION error code.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharAddEmptyStringThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("", 0);
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    const IColumn* columns[1] = {column_string.get()};
+
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        // Non-fatal on purpose: FAIL() would return from the test body here and
+        // skip the destroy() call below.
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// A one-byte payload ("x") is not a valid sketch: both add() and deserialize()
+// on the VARCHAR instantiation must raise doris::Exception with CORRUPTION.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharCorruptedInputThrows) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggregateFunctionDataSketchesHllUnionAgg<
+            TYPE_VARCHAR, AggregateFunctionHllSketchData<TYPE_VARCHAR>>>(argument_types);
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data("x", 1);
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+
+    // ADD_FAILURE() is used instead of FAIL() in both checks so that an
+    // unexpected success does not return early and skip destroy() below.
+    try {
+        agg_func->add(place, columns, 0, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    // Same corrupted payload, this time through the deserialize() path.
+    auto corrupt_buf = ColumnString::create();
+    BufferWritable corrupt_w(*corrupt_buf);
+    StringRef corrupted("x", 1);
+    corrupt_w.write_binary(corrupted);
+    corrupt_w.commit();
+
+    BufferReadable r(corrupt_buf->get_data_at(0));
+    try {
+        agg_func->deserialize(place, r, *arena);
+        ADD_FAILURE() << "Expected doris::Exception";
+    } catch (const doris::Exception& e) {
+        EXPECT_EQ(e.code(), doris::ErrorCode::CORRUPTION);
+    }
+
+    agg_func->destroy(place);
+}
+
+// Round trip on the VARCHAR instantiation: a state rebuilt via
+// serialize() + deserialize() must report the same estimate as the original.
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testVarcharSerializeDeserialize) {
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_VARCHAR,
+                                                     AggregateFunctionHllSketchData<TYPE_VARCHAR>>;
+    DataTypes argument_types = {std::make_shared<DataTypeString>(-1, TYPE_VARCHAR)};
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // Input row: a sketch over the values 0..99.
+    datasketches::hll_sketch source(8, datasketches::HLL_8);
+    for (int v = 0; v < 100; ++v) {
+        source.update(v);
+    }
+    const auto bytes = source.serialize_compact();
+
+    auto sketch_col = ColumnString::create();
+    sketch_col->insert_data(reinterpret_cast<const char*>(bytes.data()), bytes.size());
+    const IColumn* columns[1] = {sketch_col.get()};
+
+    AggregateDataPtr original_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(original_state);
+    agg_func->add(original_state, columns, 0, *arena);
+
+    auto wire_buffer = ColumnString::create();
+    BufferWritable writer(*wire_buffer);
+    agg_func->serialize(original_state, writer);
+    writer.commit();
+
+    AggregateDataPtr restored_state =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(restored_state);
+
+    BufferReadable reader(wire_buffer->get_data_at(0));
+    agg_func->deserialize(restored_state, reader, *arena);
+
+    ColumnFloat64 original_estimate;
+    ColumnFloat64 restored_estimate;
+    agg_func->insert_result_into(original_state, original_estimate);
+    agg_func->insert_result_into(restored_state, restored_estimate);
+
+    EXPECT_DOUBLE_EQ(original_estimate.get_data()[0], restored_estimate.get_data()[0]);
+
+    agg_func->destroy(original_state);
+    agg_func->destroy(restored_state);
+}
+
+// With allocation fault injection set to probability 1.0, serialize() is
+// expected to raise doris::Exception with MEM_ALLOC_FAILED and a message that
+// mentions "serialize HLL sketch".
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest,
+       testSerializeMemAllocFailedThrowsMemAllocFailed) {
+    DataTypes argument_types = {std::make_shared<DataTypeString>()};
+    using AggFunc =
+            AggregateFunctionDataSketchesHllUnionAgg<TYPE_STRING,
+                                                     AggregateFunctionHllSketchData<TYPE_STRING>>;
+    auto agg_func = std::make_shared<AggFunc>(argument_types);
+
+    // The state is populated before fault injection is switched on.
+    datasketches::hll_sketch sketch(12, datasketches::HLL_8);
+    for (int i = 0; i < 2000; ++i) {
+        sketch.update(i);
+    }
+    const auto ser = sketch.serialize_compact();
+
+    auto column_string = ColumnString::create();
+    column_string->insert_data((const char*)ser.data(), ser.size());
+    const IColumn* columns[1] = {column_string.get()};
+
+    AggregateDataPtr place =
+            arena->aligned_alloc(agg_func->size_of_data(), agg_func->align_of_data());
+    agg_func->create(place);
+    agg_func->add(place, columns, 0, *arena);
+
+    auto buffer = ColumnString::create();
+    BufferWritable w(*buffer);
+
+    {
+        ScopedMemAllocFaultInjection inject(1.0);
+        try {
+            agg_func->serialize(place, w);
+            // Non-fatal on purpose: FAIL() would return from the test body here
+            // and skip the destroy() call below.
+            ADD_FAILURE() << "Expected doris::Exception";
+        } catch (const doris::Exception& e) {
+            EXPECT_EQ(e.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(e.to_string().find("serialize HLL sketch"), std::string::npos);
+        }
+    }
+
+    agg_func->destroy(place);
+}
+
+// Drives AggregateFunctionHllSketchData::merge() directly: an lgK=12 sketch is
+// merged first, then an lgK=8 sketch is merged while allocation fault injection
+// is set to probability 1.0. The second merge is expected to raise
+// doris::Exception with MEM_ALLOC_FAILED and a message mentioning
+// "update HLL sketch".
+TEST_F(AggregateFunctionDataSketchesHllUnionAggTest, testDataMergeDownsampleMemAllocFailed) {
+    using Data = AggregateFunctionHllSketchData<TYPE_STRING>;
+    using Alloc = Data::Alloc;
+    using Sketch = Data::Sketch;
+
+    Data union_state;
+
+    // First merge happens without fault injection and must succeed.
+    Sketch wide_sketch(12, datasketches::HLL_8, true, Alloc());
+    for (int v = 0; v < 5000; ++v) {
+        wide_sketch.update(v);
+    }
+    EXPECT_NO_THROW(union_state.merge(wide_sketch));
+
+    // The smaller-lgK input is also built before injection is enabled.
+    Sketch narrow_sketch(8, datasketches::HLL_8, true, Alloc());
+    for (int v = 0; v < 5000; ++v) {
+        narrow_sketch.update(v);
+    }
+
+    {
+        ScopedMemAllocFaultInjection fault_scope(1.0);
+        try {
+            union_state.merge(narrow_sketch);
+            FAIL() << "Expected doris::Exception";
+        } catch (const doris::Exception& ex) {
+            EXPECT_EQ(ex.code(), doris::ErrorCode::MEM_ALLOC_FAILED);
+            EXPECT_NE(ex.to_string().find("update HLL sketch"), std::string::npos);
+        }
+    }
+}
+
+} // namespace doris
diff --git a/build.sh b/build.sh
index a5fe18b3e8d..eac9545fbf1 100755
--- a/build.sh
+++ b/build.sh
@@ -34,6 +34,7 @@ if [[ -z "${DORIS_THIRDPARTY}" ]]; then
     export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
 fi
 export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
 export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
 HADOOP_DEPS_NAME="hadoop-deps"
 . "${DORIS_HOME}/env.sh"
@@ -662,6 +663,14 @@ FE_MODULES="$(
 
 # Clean and build Backend
 if [[ "${BUILD_BE}" -eq 1 ]]; then
+
+    # Configure, build and install datasketches-cpp into the thirdparty install
+    # prefix (TP_INSTALLED_DIR) before the BE build starts.
+    # NOTE(review): the archive URL passed to update_submodule points at upstream
+    # master, while the submodule itself is pinned to a specific commit; confirm
+    # both are meant to resolve to the same sources.
+    echo "install datasketches-cpp to thirdparty path before build be"
+    update_submodule "contrib/datasketches-cpp" "datasketches-cpp" "https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz";
+    cd "${DORIS_HOME}/contrib/datasketches-cpp"
+    # The install prefix is quoted so a thirdparty path containing spaces or
+    # glob characters is passed to CMake as a single argument.
+    "${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release \
+        "-DCMAKE_INSTALL_PREFIX=${TP_INSTALLED_DIR}" -DBUILD_TESTS=OFF
+    "${CMAKE_CMD}" --build build/Release -t install
+    cd "${DORIS_HOME}"
+
     update_submodule "contrib/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
     update_submodule "contrib/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz";
     update_submodule "contrib/openblas" "openblas" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/openblas.tar.gz";
diff --git a/contrib/datasketches-cpp b/contrib/datasketches-cpp
new file mode 160000
index 00000000000..de8553ba372
--- /dev/null
+++ b/contrib/datasketches-cpp
@@ -0,0 +1 @@
+Subproject commit de8553ba372e618382c2e7b44b0ffc9422b9458c
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index aa136f3656a..8eb6870f11e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -38,6 +38,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -155,6 +156,8 @@ public class BuiltinAggregateFunctions implements 
FunctionHelper {
                 agg(Histogram.class, "hist", "histogram"),
                 agg(HllUnion.class, "hll_raw_agg", "hll_union"),
                 agg(HllUnionAgg.class, "hll_union_agg"),
+                agg(DataSketchesHllUnionAgg.class, 
"datasketches_hll_union_agg",
+                    "ds_hll_estimate", "datasketches_hll_estimate"),
                 agg(IntersectCount.class, "intersect_count"),
                 agg(Kurt.class, "kurt", "kurt_pop", "kurtosis"),
                 agg(LinearHistogram.class, "linear_histogram"),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
new file mode 100644
index 00000000000..9fe46d2a77d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/DataSketchesHllUnionAgg.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.agg;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.functions.FunctionTrait;
+import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral;
+import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.DoubleType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.VarBinaryType;
+import org.apache.doris.nereids.types.VarcharType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** datasketches_hll_union_agg agg function. */
+public class DataSketchesHllUnionAgg extends NotNullableAggregateFunction
+        implements UnaryExpression, ExplicitlyCastableSignature, 
FunctionTrait, RollUpTrait {
+    public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(StringType.INSTANCE),
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT),
+            
FunctionSignature.ret(DoubleType.INSTANCE).args(VarBinaryType.INSTANCE)
+    );
+
+    /**
+     * constructor with 1 argument.
+     */
+    public DataSketchesHllUnionAgg(Expression arg) {
+        super("datasketches_hll_union_agg", arg);
+    }
+
+    /**
+     * constructor with a distinct flag and 1 argument; the distinct flag is ignored.
+     */
+    public DataSketchesHllUnionAgg(boolean distinct, Expression arg) {
+        this(arg);
+    }
+
+    /** constructor for withChildren and reuse signature */
+    protected DataSketchesHllUnionAgg(AggregateFunctionParams functionParams) {
+        super(functionParams);
+    }
+
+    @Override
+    public void checkLegalityBeforeTypeCoercion() {
+        DataType inputType = getArgumentType(0);
+        if (!(inputType.isStringType() || inputType.isVarcharType() || 
inputType.isVarBinaryType()
+                || inputType.isNullType())) {
+            throw new AnalysisException(getName()
+                + " function's argument should be of STRING/VARCHAR/VARBINARY type, but was " + inputType);
+        }
+    }
+
+    @Override
+    protected List<DataType> intermediateTypes() {
+        return ImmutableList.of(StringType.INSTANCE);
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        return SIGNATURES;
+    }
+
+    @Override
+    public DataSketchesHllUnionAgg withDistinctAndChildren(boolean distinct, 
List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new DataSketchesHllUnionAgg(getFunctionParams(distinct, 
children));
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitDataSketchesHllUnionAgg(this, context);
+    }
+
+    @Override
+    public Function constructRollUp(Expression param, Expression... varParams) 
{
+        return new 
DataSketchesHllUnionAgg(getFunctionParams(ImmutableList.of(param)));
+    }
+
+    @Override
+    public boolean canRollUp() {
+        return false;
+    }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new DoubleLiteral(0);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 2a09e907fc3..d90c9a8eb82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -39,6 +39,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CountByEnum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Covar;
 import org.apache.doris.nereids.trees.expressions.functions.agg.CovarSamp;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.DataSketchesHllUnionAgg;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.ExponentialMovingAverage;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayIntersect;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.GroupArrayUnion;
@@ -246,6 +247,10 @@ public interface AggregateFunctionVisitor<R, C> {
         return visitAggregateFunction(histogram, context);
     }
 
+    default R visitDataSketchesHllUnionAgg(DataSketchesHllUnionAgg 
datasketchesHllUnionAgg, C context) {
+        return visitAggregateFunction(datasketchesHllUnionAgg, context);
+    }
+
     default R visitHllUnion(HllUnion hllUnion, C context) {
         return visitAggregateFunction(hllUnion, context);
     }
diff --git 
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
 
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
new file mode 100644
index 00000000000..ae1424305c7
--- /dev/null
+++ 
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.out
@@ -0,0 +1,28 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !basic_union --
+17
+
+-- !aliases --
+17     17      17
+
+-- !group_by --
+1      7
+2      10
+
+-- !distinct --
+17     17
+
+-- !basic_union_varchar --
+17
+
+-- !aliases_varchar --
+17     17      17
+
+-- !basic_union_varbinary --
+17
+
+-- !aliases_varbinary --
+17     17      17
+
+-- !empty_input --
+0
diff --git 
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
new file mode 100644
index 00000000000..8d86b8c82d6
--- /dev/null
+++ 
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_datasketches_hll_union_agg.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_datasketches_hll_union_agg") {
+    def tableName = "test_datasketches_hll_union_agg_tbl"
+    def varcharTableName = "test_datasketches_hll_union_agg_varchar_tbl"
+    def emptyTableName = "test_datasketches_hll_union_agg_empty_tbl"
+    def badTableName = "test_datasketches_hll_union_agg_bad_tbl"
+
+    // sk = new HllSketch(8, HLL_8); for (int i = 0; i < 7; i++) sk.update(i);
+    def sk1Base64 = "AgEHCAMIBwjL18IEK/L7BoYv+Q11gWYHgbxdBntl5gj8LUIK"
+
+    // sk = new HllSketch(8, HLL_8); for (int i = 20; i < 30; i++) sk.update(i);
+    def sk2Base64 = 
"AwEHCAUIAAkKAAAAIjvrBcS1nwfGGWoEyHokBO8t9wc1qTEENkcJB7hWqQxZf9QNnuSbGA=="
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+            (1, from_base64('${sk1Base64}')),
+            (2, from_base64('${sk2Base64}')),
+            (3, NULL)
+    """
+
+    // 1) Basic union: {0..6} U {20..29} => 17 distinct values
+    qt_basic_union """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT) FROM ${tableName}"""
+
+    // 2) Aliases should behave identically
+    qt_aliases """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM ${tableName}
+    """
+
+    // 3) Group-by
+    qt_group_by """SELECT id, CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT)
+        FROM ${tableName}
+        WHERE id IN (1, 2)
+        GROUP BY id
+        ORDER BY id
+    """
+
+    // 4) DISTINCT should not change result in this data set
+    sql "INSERT INTO ${tableName} VALUES (5, from_base64('${sk1Base64}'))"
+    qt_distinct """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_union_agg(DISTINCT sk)) AS BIGINT)
+        FROM ${tableName}
+    """
+
+    // 4.1) Input type coverage: VARCHAR
+    sql "DROP TABLE IF EXISTS ${varcharTableName}"
+    sql """
+        CREATE TABLE ${varcharTableName} (
+            id INT,
+            sk VARCHAR(256)
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${varcharTableName} VALUES
+            (1, from_base64('${sk1Base64}')),
+            (2, from_base64('${sk2Base64}')),
+            (3, NULL)
+    """
+
+    qt_basic_union_varchar """SELECT 
CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM 
${varcharTableName}"""
+
+    qt_aliases_varchar """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM ${varcharTableName}
+    """
+
+    // 4.2) Input type coverage: VARBINARY (no table column; VARBINARY is not supported for table storage)
+    qt_basic_union_varbinary """SELECT 
CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT) FROM (
+            SELECT from_base64_binary('${sk1Base64}') AS sk
+            UNION ALL SELECT from_base64_binary('${sk2Base64}')
+            UNION ALL SELECT NULL
+        ) t
+    """
+
+    qt_aliases_varbinary """SELECT
+            CAST(ROUND(datasketches_hll_union_agg(sk)) AS BIGINT),
+            CAST(ROUND(ds_hll_estimate(sk)) AS BIGINT),
+            CAST(ROUND(datasketches_hll_estimate(sk)) AS BIGINT)
+        FROM (
+            SELECT from_base64_binary('${sk1Base64}') AS sk
+            UNION ALL SELECT from_base64_binary('${sk2Base64}')
+            UNION ALL SELECT NULL
+        ) t
+    """
+
+    // 5) Empty input should return 0
+    sql "DROP TABLE IF EXISTS ${emptyTableName}"
+    sql """
+        CREATE TABLE ${emptyTableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    qt_empty_input """SELECT CAST(ROUND(datasketches_hll_union_agg(sk)) AS 
BIGINT) FROM ${emptyTableName}"""
+
+    // 6) Illegal input should throw (base64 is valid but bytes are not a datasketches HLL sketch)
+    test {
+        sql """SELECT datasketches_hll_union_agg(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+    test {
+        sql """SELECT ds_hll_estimate(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+    test {
+        sql """SELECT datasketches_hll_estimate(from_base64('AA=='))"""
+        exception "CORRUPTION"
+    }
+
+    // Empty string is a valid STRING value, but it is an invalid serialized DataSketches HLL sketch.
+    // It should not fail at INSERT time; it should fail when the aggregate function reads it.
+    sql "DROP TABLE IF EXISTS ${badTableName}"
+    sql """
+        CREATE TABLE ${badTableName} (
+            id INT,
+            sk STRING
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    sql """INSERT INTO ${badTableName} VALUES (1, '')"""
+    test {
+        sql """SELECT datasketches_hll_union_agg(sk) FROM ${badTableName}"""
+        exception "CORRUPTION"
+    }
+}
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 50b76f6d6f1..a4a463eccec 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -41,7 +41,12 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && 
pwd)"
 
 export ROOT
 export DORIS_HOME="${ROOT}"
-
+if [[ -z "${DORIS_THIRDPARTY}" ]]; then
+    export DORIS_THIRDPARTY="${DORIS_HOME}/thirdparty"
+fi
+export TP_INCLUDE_DIR="${DORIS_THIRDPARTY}/installed/include"
+export TP_INSTALLED_DIR="${DORIS_THIRDPARTY}/installed"
+export TP_LIB_DIR="${DORIS_THIRDPARTY}/installed/lib"
 . "${DORIS_HOME}/env.sh"
 
 # Check args
@@ -174,6 +179,13 @@ update_submodule() {
     fi
 }
 
+echo "install datasketches-cpp to thirdparty path before build backend ut"
+update_submodule "contrib/datasketches-cpp" "datasketches-cpp" 
"https://github.com/apache/datasketches-cpp/archive/refs/heads/master.tar.gz";
+cd "${DORIS_HOME}/contrib/datasketches-cpp"
+"${CMAKE_CMD}" -S . -B build/Release -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=$TP_INSTALLED_DIR -DBUILD_TESTS=OFF
+"${CMAKE_CMD}" --build build/Release -t install
+cd "${DORIS_HOME}"
+
 update_submodule "contrib/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
 update_submodule "contrib/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to