This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 156b151301a [refine](Buffer) Rename and classify in BufferReadable and
BufferWritable (#52735)
156b151301a is described below
commit 156b151301a95525093e0ef3f535d3dd01a2d98f
Author: Mryange <[email protected]>
AuthorDate: Thu Jul 10 08:48:07 2025 +0800
[refine](Buffer) Rename and classify in BufferReadable and BufferWritable
(#52735)
### What problem does this PR solve?
### Release note
None
---
be/src/util/counts.h | 4 +-
.../aggregate_function_approx_count_distinct.h | 4 +-
.../aggregate_function_approx_top_k.h | 24 ++--
.../aggregate_function_approx_top_sum.h | 24 ++--
.../aggregate_function_array_agg.h | 24 ++--
.../aggregate_functions/aggregate_function_avg.h | 8 +-
.../aggregate_function_avg_weighted.h | 8 +-
.../aggregate_functions/aggregate_function_bit.h | 4 +-
.../aggregate_function_collect.h | 36 ++---
.../aggregate_functions/aggregate_function_corr.h | 48 +++----
.../aggregate_functions/aggregate_function_count.h | 8 +-
.../aggregate_function_count_by_enum.h | 28 ++--
.../aggregate_functions/aggregate_function_covar.h | 16 +--
.../aggregate_function_distinct.h | 16 +--
.../aggregate_function_foreach.h | 4 +-
.../aggregate_function_group_array_intersect.h | 32 ++---
.../aggregate_function_group_concat.h | 12 +-
.../aggregate_function_histogram.h | 16 +--
.../aggregate_function_hll_union_agg.h | 4 +-
.../aggregate_function_java_udaf.h | 4 +-
.../aggregate_function_linear_histogram.h | 28 ++--
.../aggregate_functions/aggregate_function_map.h | 15 +-
.../aggregate_function_map_v2.h | 8 +-
.../aggregate_function_min_max.h | 28 ++--
.../aggregate_function_min_max_by.h | 4 +-
.../aggregate_functions/aggregate_function_null.h | 6 +-
.../aggregate_function_orthogonal_bitmap.h | 36 ++---
.../aggregate_function_percentile.h | 28 ++--
.../aggregate_function_product.h | 12 +-
.../aggregate_function_regr_union.h | 20 +--
.../aggregate_functions/aggregate_function_rpc.h | 4 +-
.../aggregate_function_sequence_match.h | 28 ++--
.../aggregate_functions/aggregate_function_sort.h | 4 +-
.../aggregate_function_stddev.h | 12 +-
.../aggregate_functions/aggregate_function_sum.h | 4 +-
.../aggregate_functions/aggregate_function_topn.h | 20 +--
.../aggregate_functions/aggregate_function_uniq.h | 12 +-
.../aggregate_function_uniq_distribute_key.h | 4 +-
be/src/vec/aggregate_functions/moments.h | 4 +-
be/src/vec/common/hash_table/hash_table.h | 10 +-
be/src/vec/common/space_saving.h | 24 ++--
be/src/vec/common/string_buffer.hpp | 109 +++++++++++++++
be/src/vec/data_types/data_type_bitmap.cpp | 4 +-
be/src/vec/data_types/data_type_hll.cpp | 4 +-
be/src/vec/data_types/data_type_quantilestate.cpp | 4 +-
be/src/vec/io/io_helper.h | 154 ---------------------
be/src/vec/io/var_int.h | 43 +-----
be/test/vec/common/string_buffer_test.cpp | 73 ++++++++++
be/test/vec/core/field_test.cpp | 14 +-
49 files changed, 514 insertions(+), 526 deletions(-)
diff --git a/be/src/util/counts.h b/be/src/util/counts.h
index 43b815cf8ca..67880268da2 100644
--- a/be/src/util/counts.h
+++ b/be/src/util/counts.h
@@ -59,7 +59,7 @@ public:
if (!_nums.empty()) {
pdqsort(_nums.begin(), _nums.end());
size_t size = _nums.size();
- write_binary(size, buf);
+ buf.write_binary(size);
buf.write(reinterpret_cast<const char*>(_nums.data()), sizeof(Ty)
* size);
} else {
// convert _sorted_nums_vec to _nums and do seiralize again
@@ -70,7 +70,7 @@ public:
void unserialize(vectorized::BufferReadable& buf) {
size_t size;
- read_binary(size, buf);
+ buf.read_binary(size);
_nums.resize(size);
auto buff = buf.read(sizeof(Ty) * size);
memcpy(_nums.data(), buff.data, buff.size);
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
index be67927c0eb..0efcaaad22b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
@@ -63,12 +63,12 @@ struct AggregateFunctionApproxCountDistinctData {
std::string result;
result.resize(hll_data.max_serialized_size());
result.resize(hll_data.serialize((uint8_t*)result.data()));
- write_binary(result, buf);
+ buf.write_binary(result);
}
void read(BufferReadable& buf) {
StringRef result;
- read_binary(result, buf);
+ buf.read_binary(result);
Slice data = Slice(result.data, result.size);
hll_data.deserialize(data);
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
index 857df1ea3db..75adac6136a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
@@ -73,12 +73,12 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
this->data(place).value.write(buf);
- write_var_uint(_column_names.size(), buf);
+ buf.write_var_uint(_column_names.size());
for (const auto& column_name : _column_names) {
- write_string_binary(column_name, buf);
+ buf.write_binary(column_name);
}
- write_var_uint(_threshold, buf);
- write_var_uint(_reserved, buf);
+ buf.write_var_uint(_threshold);
+ buf.write_var_uint(_reserved);
}
// Deserializes the aggregate function's state from a buffer (including
the SpaceSaving structure and threshold).
@@ -86,7 +86,7 @@ public:
Arena* arena) const override {
auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) {
uint64_t size = 0;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string
size.");
@@ -102,7 +102,7 @@ public:
set.clear();
uint64_t size = 0;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
if (UNLIKELY(size > TOP_K_MAX_SIZE)) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Too large size ({}) for aggregate function '{}'
state (maximum is {})",
@@ -114,8 +114,8 @@ public:
auto ref = readStringBinaryInto(*arena, buf);
uint64_t count = 0;
uint64_t error = 0;
- read_var_uint(count, buf);
- read_var_uint(error, buf);
+ buf.read_var_uint(count);
+ buf.read_var_uint(error);
set.insert(ref, count, error);
arena->rollback(ref.size);
}
@@ -123,15 +123,15 @@ public:
set.read_alpha_map(buf);
uint64_t column_size = 0;
- read_var_uint(column_size, buf);
+ buf.read_var_uint(column_size);
_column_names.clear();
for (uint64_t i = 0; i < column_size; i++) {
std::string column_name;
- read_string_binary(column_name, buf);
+ buf.read_binary(column_name);
_column_names.emplace_back(std::move(column_name));
}
- read_var_uint(_threshold, buf);
- read_var_uint(_reserved, buf);
+ buf.read_var_uint(_threshold);
+ buf.read_var_uint(_reserved);
}
// Adds a new row of data to the aggregate function (inserts a new value
into the SpaceSaving structure).
diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
b/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
index 46df8dde22c..ced3ac6f760 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
@@ -76,12 +76,12 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
this->data(place).value.write(buf);
- write_var_uint(_column_names.size(), buf);
+ buf.write_var_uint(_column_names.size());
for (const auto& column_name : _column_names) {
- write_string_binary(column_name, buf);
+ buf.write_binary(column_name);
}
- write_var_uint(_threshold, buf);
- write_var_uint(_reserved, buf);
+ buf.write_var_uint(_threshold);
+ buf.write_var_uint(_reserved);
}
// Deserializes the aggregate function's state from a buffer (including
the SpaceSaving structure and threshold).
@@ -89,7 +89,7 @@ public:
Arena* arena) const override {
auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) {
uint64_t size = 0;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string
size.");
@@ -105,7 +105,7 @@ public:
set.clear();
uint64_t size = 0;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
if (UNLIKELY(size > TOP_K_MAX_SIZE)) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Too large size ({}) for aggregate function '{}'
state (maximum is {})",
@@ -117,8 +117,8 @@ public:
auto ref = readStringBinaryInto(*arena, buf);
uint64_t count = 0;
uint64_t error = 0;
- read_var_uint(count, buf);
- read_var_uint(error, buf);
+ buf.read_var_uint(count);
+ buf.read_var_uint(error);
set.insert(ref, count, error);
arena->rollback(ref.size);
}
@@ -126,15 +126,15 @@ public:
set.read_alpha_map(buf);
uint64_t column_size = 0;
- read_var_uint(column_size, buf);
+ buf.read_var_uint(column_size);
_column_names.clear();
for (uint64_t i = 0; i < column_size; i++) {
std::string column_name;
- read_string_binary(column_name, buf);
+ buf.read_binary(column_name);
_column_names.emplace_back(std::move(column_name));
}
- read_var_uint(_threshold, buf);
- read_var_uint(_reserved, buf);
+ buf.read_var_uint(_threshold);
+ buf.read_var_uint(_reserved);
}
// Adds a new row of data to the aggregate function (inserts a new value
into the SpaceSaving structure).
diff --git a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
index 1005053c1b3..f4fa2dfbd53 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
@@ -98,14 +98,14 @@ struct AggregateFunctionArrayAggData {
void write(BufferWritable& buf) const {
const size_t size = null_map->size();
- write_binary(size, buf);
+ buf.write_binary(size);
for (size_t i = 0; i < size; i++) {
- write_binary(null_map->data()[i], buf);
+ buf.write_binary(null_map->data()[i]);
}
for (size_t i = 0; i < size; i++) {
- write_binary(nested_column->get_data()[i], buf);
+ buf.write_binary(nested_column->get_data()[i]);
}
}
@@ -113,16 +113,16 @@ struct AggregateFunctionArrayAggData {
DCHECK(null_map);
DCHECK(null_map->empty());
size_t size = 0;
- read_binary(size, buf);
+ buf.read_binary(size);
null_map->resize(size);
nested_column->reserve(size);
for (size_t i = 0; i < size; i++) {
- read_binary(null_map->data()[i], buf);
+ buf.read_binary(null_map->data()[i]);
}
ElementType data_value;
for (size_t i = 0; i < size; i++) {
- read_binary(data_value, buf);
+ buf.read_binary(data_value);
nested_column->get_data().push_back(data_value);
}
}
@@ -201,12 +201,12 @@ struct AggregateFunctionArrayAggData<T> {
void write(BufferWritable& buf) const {
const size_t size = null_map->size();
- write_binary(size, buf);
+ buf.write_binary(size);
for (size_t i = 0; i < size; i++) {
- write_binary(null_map->data()[i], buf);
+ buf.write_binary(null_map->data()[i]);
}
for (size_t i = 0; i < size; i++) {
- write_string_binary(nested_column->get_data_at(i), buf);
+ buf.write_binary(nested_column->get_data_at(i));
}
}
@@ -214,16 +214,16 @@ struct AggregateFunctionArrayAggData<T> {
DCHECK(null_map);
DCHECK(null_map->empty());
size_t size = 0;
- read_binary(size, buf);
+ buf.read_binary(size);
null_map->resize(size);
nested_column->reserve(size);
for (size_t i = 0; i < size; i++) {
- read_binary(null_map->data()[i], buf);
+ buf.read_binary(null_map->data()[i]);
}
StringRef s;
for (size_t i = 0; i < size; i++) {
- read_string_binary(s, buf);
+ buf.read_binary(s);
nested_column->insert_data(s.data, s.size);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index 14b7e2bea27..1d7472eb358 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -96,13 +96,13 @@ struct AggregateFunctionAvgData {
}
void write(BufferWritable& buf) const {
- write_binary(sum, buf);
- write_binary(count, buf);
+ buf.write_binary(sum);
+ buf.write_binary(count);
}
void read(BufferReadable& buf) {
- read_binary(sum, buf);
- read_binary(count, buf);
+ buf.read_binary(sum);
+ buf.read_binary(count);
}
};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
index b1fa380ba53..0d702c0f811 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
@@ -60,13 +60,13 @@ struct AggregateFunctionAvgWeightedData {
}
void write(BufferWritable& buf) const {
- write_binary(data_sum, buf);
- write_binary(weight_sum, buf);
+ buf.write_binary(data_sum);
+ buf.write_binary(weight_sum);
}
void read(BufferReadable& buf) {
- read_binary(data_sum, buf);
- read_binary(weight_sum, buf);
+ buf.read_binary(data_sum);
+ buf.read_binary(weight_sum);
}
void merge(const AggregateFunctionAvgWeightedData& rhs) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bit.h
b/be/src/vec/aggregate_functions/aggregate_function_bit.h
index 1798943a8ba..62819cd86df 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bit.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bit.h
@@ -45,8 +45,8 @@ struct AggregateFunctionBaseData {
public:
AggregateFunctionBaseData(typename PrimitiveTypeTraits<T>::CppType
init_value)
: res_bit(init_value) {}
- void write(BufferWritable& buf) const { write_binary(res_bit, buf); }
- void read(BufferReadable& buf) { read_binary(res_bit, buf); }
+ void write(BufferWritable& buf) const { buf.write_binary(res_bit); }
+ void read(BufferReadable& buf) { buf.read_binary(res_bit); }
typename PrimitiveTypeTraits<T>::CppType get() const { return res_bit; }
protected:
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h
b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 4b48e6fe89d..50d841ef63e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -83,19 +83,19 @@ struct AggregateFunctionCollectSetData {
}
void write(BufferWritable& buf) const {
- write_var_uint(data_set.size(), buf);
+ buf.write_var_uint(data_set.size());
for (const auto& value : data_set) {
- write_binary(value, buf);
+ buf.write_binary(value);
}
write_var_int(max_size, buf);
}
void read(BufferReadable& buf) {
uint64_t new_size = 0;
- read_var_uint(new_size, buf);
+ buf.read_var_uint(new_size);
ElementType x;
for (size_t i = 0; i < new_size; ++i) {
- read_binary(x, buf);
+ buf.read_binary(x);
data_set.insert(x);
}
read_var_int(max_size, buf);
@@ -153,19 +153,19 @@ struct AggregateFunctionCollectSetData<T, HasLimit> {
}
void write(BufferWritable& buf) const {
- write_var_uint(size(), buf);
+ buf.write_var_uint(size());
for (const auto& elem : data_set) {
- write_string_binary(elem, buf);
+ buf.write_binary(elem);
}
write_var_int(max_size, buf);
}
void read(BufferReadable& buf) {
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
StringRef ref;
for (size_t i = 0; i < size; ++i) {
- read_string_binary(ref, buf);
+ buf.read_binary(ref);
data_set.insert(ref);
}
read_var_int(max_size, buf);
@@ -219,14 +219,14 @@ struct AggregateFunctionCollectListData {
}
void write(BufferWritable& buf) const {
- write_var_uint(size(), buf);
+ buf.write_var_uint(size());
buf.write(data.raw_data(), size() * sizeof(ElementType));
write_var_int(max_size, buf);
}
void read(BufferReadable& buf) {
UInt64 rows = 0;
- read_var_uint(rows, buf);
+ buf.read_var_uint(rows);
data.resize(rows);
buf.read(reinterpret_cast<char*>(data.data()), rows *
sizeof(ElementType));
read_var_int(max_size, buf);
@@ -278,10 +278,10 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
void write(BufferWritable& buf) const {
auto& col = assert_cast<ColVecType&>(*data);
- write_var_uint(col.size(), buf);
+ buf.write_var_uint(col.size());
buf.write(col.get_offsets().raw_data(), col.size() *
sizeof(IColumn::Offset));
- write_var_uint(col.get_chars().size(), buf);
+ buf.write_var_uint(col.get_chars().size());
buf.write(col.get_chars().raw_data(), col.get_chars().size());
write_var_int(max_size, buf);
}
@@ -289,13 +289,13 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
void read(BufferReadable& buf) {
auto& col = assert_cast<ColVecType&>(*data);
UInt64 offs_size = 0;
- read_var_uint(offs_size, buf);
+ buf.read_var_uint(offs_size);
col.get_offsets().resize(offs_size);
buf.read(reinterpret_cast<char*>(col.get_offsets().data()),
offs_size * sizeof(IColumn::Offset));
UInt64 chars_size = 0;
- read_var_uint(chars_size, buf);
+ buf.read_var_uint(chars_size);
col.get_chars().resize(chars_size);
buf.read(reinterpret_cast<char*>(col.get_chars().data()), chars_size);
read_var_int(max_size, buf);
@@ -349,7 +349,7 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
void write(BufferWritable& buf) const {
const size_t size = column_data->size();
- write_binary(size, buf);
+ buf.write_binary(size);
DataTypeSerDe::FormatOptions opt;
auto tmp_str = ColumnString::create();
@@ -363,7 +363,7 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
" error: " + st.to_string());
}
tmp_buf.commit();
- write_string_binary(tmp_str->get_data_at(0), buf);
+ buf.write_binary(tmp_str->get_data_at(0));
}
write_var_int(max_size, buf);
@@ -371,14 +371,14 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
void read(BufferReadable& buf) {
size_t size = 0;
- read_binary(size, buf);
+ buf.read_binary(size);
column_data->clear();
column_data->reserve(size);
StringRef s;
DataTypeSerDe::FormatOptions opt;
for (size_t i = 0; i < size; i++) {
- read_string_binary(s, buf);
+ buf.read_binary(s);
Slice slice(s.data, s.size);
if (Status st =
serde->deserialize_one_cell_from_json(*column_data, slice, opt); !st) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_corr.h
b/be/src/vec/aggregate_functions/aggregate_function_corr.h
index c3f67ef2b5a..7e3a1591e99 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_corr.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_corr.h
@@ -49,21 +49,21 @@ struct CorrMoment {
}
void write(BufferWritable& buf) const {
- write_binary(m0, buf);
- write_binary(x1, buf);
- write_binary(y1, buf);
- write_binary(xy, buf);
- write_binary(x2, buf);
- write_binary(y2, buf);
+ buf.write_binary(m0);
+ buf.write_binary(x1);
+ buf.write_binary(y1);
+ buf.write_binary(xy);
+ buf.write_binary(x2);
+ buf.write_binary(y2);
}
void read(BufferReadable& buf) {
- read_binary(m0, buf);
- read_binary(x1, buf);
- read_binary(y1, buf);
- read_binary(xy, buf);
- read_binary(x2, buf);
- read_binary(y2, buf);
+ buf.read_binary(m0);
+ buf.read_binary(x1);
+ buf.read_binary(y1);
+ buf.read_binary(xy);
+ buf.read_binary(x2);
+ buf.read_binary(y2);
}
T get() const {
@@ -130,21 +130,21 @@ struct CorrMomentWelford {
}
void write(BufferWritable& buf) const {
- write_binary(meanX, buf);
- write_binary(meanY, buf);
- write_binary(c2, buf);
- write_binary(m2X, buf);
- write_binary(m2Y, buf);
- write_binary(count, buf);
+ buf.write_binary(meanX);
+ buf.write_binary(meanY);
+ buf.write_binary(c2);
+ buf.write_binary(m2X);
+ buf.write_binary(m2Y);
+ buf.write_binary(count);
}
void read(BufferReadable& buf) {
- read_binary(meanX, buf);
- read_binary(meanY, buf);
- read_binary(c2, buf);
- read_binary(m2X, buf);
- read_binary(m2Y, buf);
- read_binary(count, buf);
+ buf.read_binary(meanX);
+ buf.read_binary(meanY);
+ buf.read_binary(c2);
+ buf.read_binary(m2X);
+ buf.read_binary(m2Y);
+ buf.read_binary(count);
}
double get() const {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 99e7422b108..57e4bacbdd0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -74,12 +74,12 @@ public:
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
- write_var_uint(data(place).count, buf);
+ buf.write_var_uint(data(place).count);
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
- read_var_uint(data(place).count, buf);
+ buf.read_var_uint(data(place).count);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
@@ -221,12 +221,12 @@ public:
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
- write_var_uint(data(place).count, buf);
+ buf.write_var_uint(data(place).count);
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
- read_var_uint(data(place).count, buf);
+ buf.read_var_uint(data(place).count);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
index 543ae55f872..27b71559f03 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
@@ -125,20 +125,20 @@ struct AggregateFunctionCountByEnumData {
}
void write(BufferWritable& buf) const {
- write_binary(data_vec.size(), buf);
+ buf.write_binary(data_vec.size());
for (const auto& data : data_vec) {
const MapType& unordered_map = data.cbe;
- write_binary(unordered_map.size(), buf);
+ buf.write_binary(unordered_map.size());
for (const auto& [key, value] : unordered_map) {
- write_binary(value, buf);
- write_binary(key, buf);
+ buf.write_binary(value);
+ buf.write_binary(key);
}
- write_binary(data.not_null, buf);
- write_binary(data.null, buf);
- write_binary(data.all, buf);
+ buf.write_binary(data.not_null);
+ buf.write_binary(data.null);
+ buf.write_binary(data.all);
}
}
@@ -146,27 +146,27 @@ struct AggregateFunctionCountByEnumData {
data_vec.clear();
uint64_t vec_size_number = 0;
- read_binary(vec_size_number, buf);
+ buf.read_binary(vec_size_number);
for (int idx = 0; idx < vec_size_number; idx++) {
uint64_t element_number = 0;
- read_binary(element_number, buf);
+ buf.read_binary(element_number);
MapType unordered_map;
unordered_map.reserve(element_number);
for (auto i = 0; i < element_number; i++) {
std::string key;
uint64_t value;
- read_binary(value, buf);
- read_binary(key, buf);
+ buf.read_binary(value);
+ buf.read_binary(key);
unordered_map.emplace(std::move(key), value);
}
CountByEnumData data;
data.cbe = std::move(unordered_map);
- read_binary(data.not_null, buf);
- read_binary(data.null, buf);
- read_binary(data.all, buf);
+ buf.read_binary(data.not_null);
+ buf.read_binary(data.null);
+ buf.read_binary(data.all);
data_vec.emplace_back(std::move(data));
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.h
b/be/src/vec/aggregate_functions/aggregate_function_covar.h
index 81c4973ddfa..04bd3760374 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.h
@@ -51,17 +51,17 @@ struct BaseData {
static DataTypePtr get_return_type() { return
std::make_shared<DataTypeFloat64>(); }
void write(BufferWritable& buf) const {
- write_binary(sum_x, buf);
- write_binary(sum_y, buf);
- write_binary(sum_xy, buf);
- write_binary(count, buf);
+ buf.write_binary(sum_x);
+ buf.write_binary(sum_y);
+ buf.write_binary(sum_xy);
+ buf.write_binary(count);
}
void read(BufferReadable& buf) {
- read_binary(sum_x, buf);
- read_binary(sum_y, buf);
- read_binary(sum_xy, buf);
- read_binary(count, buf);
+ buf.read_binary(sum_x);
+ buf.read_binary(sum_y);
+ buf.read_binary(sum_xy);
+ buf.read_binary(count);
}
void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
index 616d8a1e9a9..c697f3be0f2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
@@ -88,9 +88,9 @@ struct AggregateFunctionDistinctSingleNumericData {
void serialize(BufferWritable& buf) const {
DCHECK(!stable);
if constexpr (!stable) {
- write_var_uint(data.size(), buf);
+ buf.write_var_uint(data.size());
for (const auto& value : data) {
- write_binary(value, buf);
+ buf.write_binary(value);
}
}
}
@@ -99,10 +99,10 @@ struct AggregateFunctionDistinctSingleNumericData {
DCHECK(!stable);
if constexpr (!stable) {
uint64_t new_size = 0;
- read_var_uint(new_size, buf);
+ buf.read_var_uint(new_size);
typename PrimitiveTypeTraits<T>::CppType x;
for (size_t i = 0; i < new_size; ++i) {
- read_binary(x, buf);
+ buf.read_binary(x);
data.insert(x);
}
}
@@ -153,9 +153,9 @@ struct AggregateFunctionDistinctGenericData {
void serialize(BufferWritable& buf) const {
DCHECK(!stable);
if constexpr (!stable) {
- write_var_uint(data.size(), buf);
+ buf.write_var_uint(data.size());
for (const auto& elem : data) {
- write_string_binary(elem, buf);
+ buf.write_binary(elem);
}
}
}
@@ -164,11 +164,11 @@ struct AggregateFunctionDistinctGenericData {
DCHECK(!stable);
if constexpr (!stable) {
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
StringRef ref;
for (size_t i = 0; i < size; ++i) {
- read_string_binary(ref, buf);
+ buf.read_binary(ref);
data.insert(ref);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_foreach.h
b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
index 9a4801c43bd..35e9a7271cf 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_foreach.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
@@ -180,7 +180,7 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
const AggregateFunctionForEachData& state = data(place);
- write_binary(state.dynamic_array_size, buf);
+ buf.write_binary(state.dynamic_array_size);
const char* nested_state = state.array_of_aggregate_datas;
for (size_t i = 0; i < state.dynamic_array_size; ++i) {
nested_function->serialize(nested_state, buf);
@@ -193,7 +193,7 @@ public:
AggregateFunctionForEachData& state = data(place);
size_t new_size = 0;
- read_binary(new_size, buf);
+ buf.read_binary(new_size);
ensure_aggregate_data(place, new_size, *arena);
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
index f0111e6443c..fac2c7d060f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
@@ -222,15 +222,15 @@ public:
auto& init = data.init;
const bool is_set_contain_null = set->contain_null();
- write_pod_binary(is_set_contain_null, buf);
- write_pod_binary(init, buf);
- write_var_uint(set->size(), buf);
+ buf.write_binary(is_set_contain_null);
+ buf.write_binary(init);
+ buf.write_var_uint(set->size());
HybridSetBase::IteratorBase* it = set->begin();
while (it->has_next()) {
const typename PrimitiveTypeTraits<T>::CppType* value_ptr =
static_cast<const typename
PrimitiveTypeTraits<T>::CppType*>(it->get_value());
- write_int_binary((*value_ptr), buf);
+ buf.write_binary((*value_ptr));
it->next();
}
}
@@ -240,15 +240,15 @@ public:
auto& data = this->data(place);
bool is_set_contain_null;
- read_pod_binary(is_set_contain_null, buf);
+ buf.read_binary(is_set_contain_null);
data.value->change_contain_null_value(is_set_contain_null);
- read_pod_binary(data.init, buf);
+ buf.read_binary(data.init);
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
typename PrimitiveTypeTraits<T>::CppType element;
for (UInt64 i = 0; i < size; ++i) {
- read_int_binary(element, buf);
+ buf.read_binary(element);
data.value->insert(static_cast<void*>(&element));
}
}
@@ -461,14 +461,14 @@ public:
auto& init = data.init;
const bool is_set_contain_null = set->contain_null();
- write_pod_binary(is_set_contain_null, buf);
- write_pod_binary(init, buf);
- write_var_uint(set->size(), buf);
+ buf.write_binary(is_set_contain_null);
+ buf.write_binary(init);
+ buf.write_var_uint(set->size());
HybridSetBase::IteratorBase* it = set->begin();
while (it->has_next()) {
const auto* value = reinterpret_cast<const
StringRef*>(it->get_value());
- write_string_binary(*value, buf);
+ buf.write_binary(*value);
it->next();
}
}
@@ -478,15 +478,15 @@ public:
auto& data = this->data(place);
bool is_set_contain_null;
- read_pod_binary(is_set_contain_null, buf);
+ buf.read_binary(is_set_contain_null);
data.value->change_contain_null_value(is_set_contain_null);
- read_pod_binary(data.init, buf);
+ buf.read_binary(data.init);
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
StringRef element;
for (UInt64 i = 0; i < size; ++i) {
- element = read_string_binary_into(*arena, buf);
+ element = read_binary_into(*arena, buf);
data.value->insert((void*)element.data, element.size);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
index 5f598e411b2..98b6503e167 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
@@ -91,15 +91,15 @@ struct AggregateFunctionGroupConcatData {
StringRef get() const { return StringRef {data.data(), data.size()}; }
void write(BufferWritable& buf) const {
- write_binary(StringRef {data.data(), data.size()}, buf);
- write_binary(separator, buf);
- write_binary(inited, buf);
+ buf.write_binary(data);
+ buf.write_binary(separator);
+ buf.write_binary(inited);
}
void read(BufferReadable& buf) {
- read_binary(data, buf);
- read_binary(separator, buf);
- read_binary(inited, buf);
+ buf.read_binary(data);
+ buf.read_binary(separator);
+ buf.read_binary(inited);
}
void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_histogram.h
b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
index 2ca9856e7d9..a6b8a413063 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_histogram.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
@@ -94,30 +94,30 @@ struct AggregateFunctionHistogramData {
}
void write(BufferWritable& buf) const {
- write_binary(max_num_buckets, buf);
+ buf.write_binary(max_num_buckets);
auto element_number = (size_t)ordered_map.size();
- write_binary(element_number, buf);
+ buf.write_binary(element_number);
auto pair_vector = map_to_vector();
for (auto i = 0; i < element_number; i++) {
auto element = pair_vector[i];
- write_binary(element.second, buf);
- write_binary(element.first, buf);
+ buf.write_binary(element.second);
+ buf.write_binary(element.first);
}
}
void read(BufferReadable& buf) {
- read_binary(max_num_buckets, buf);
+ buf.read_binary(max_num_buckets);
size_t element_number = 0;
- read_binary(element_number, buf);
+ buf.read_binary(element_number);
ordered_map.clear();
std::pair<typename PrimitiveTypeTraits<T>::ColumnItemType, size_t>
element;
for (auto i = 0; i < element_number; i++) {
- read_binary(element.first, buf);
- read_binary(element.second, buf);
+ buf.read_binary(element.first);
+ buf.read_binary(element.second);
ordered_map.insert(element);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
b/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
index ef4eae8d7bc..f7e2fda4161 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
@@ -59,12 +59,12 @@ struct AggregateFunctionHLLData {
void write(BufferWritable& buf) const {
std::string result(dst_hll.max_serialized_size(), '0');
result.resize(dst_hll.serialize((uint8_t*)result.c_str()));
- write_binary(result, buf);
+ buf.write_binary(result);
}
void read(BufferReadable& buf) {
StringRef ref;
- read_binary(ref, buf);
+ buf.read_binary(ref);
dst_hll.deserialize(Slice(ref.data, ref.size));
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index f8815c30d70..f4b23786586 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -170,7 +170,7 @@ public:
int len = env->GetArrayLength(arr);
serialize_data.resize(len);
env->GetByteArrayRegion(arr, 0, len,
reinterpret_cast<jbyte*>(serialize_data.data()));
- write_binary(serialize_data, buf);
+ buf.write_binary(serialize_data);
jbyte* pBytes = env->GetByteArrayElements(arr, nullptr);
env->ReleaseByteArrayElements(arr, pBytes, JNI_ABORT);
env->DeleteLocalRef(arr);
@@ -184,7 +184,7 @@ public:
return JniUtil::GetJniExceptionMsg(env);
}
- void read(BufferReadable& buf) { read_binary(serialize_data, buf); }
+ void read(BufferReadable& buf) { buf.read_binary(serialize_data); }
Status destroy() {
JNIEnv* env = nullptr;
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
b/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
index 51440687d3f..8d7b7e100c1 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
@@ -100,30 +100,30 @@ public:
// write
void write(BufferWritable& buf) const {
- write_binary(offset, buf);
- write_binary(interval, buf);
- write_binary(lower, buf);
- write_binary(upper, buf);
- write_binary(buckets.size(), buf);
+ buf.write_binary(offset);
+ buf.write_binary(interval);
+ buf.write_binary(lower);
+ buf.write_binary(upper);
+ buf.write_binary(buckets.size());
for (const auto& [key, count] : buckets) {
- write_binary(key, buf);
- write_binary(count, buf);
+ buf.write_binary(key);
+ buf.write_binary(count);
}
}
// read
void read(BufferReadable& buf) {
- read_binary(offset, buf);
- read_binary(interval, buf);
- read_binary(lower, buf);
- read_binary(upper, buf);
+ buf.read_binary(offset);
+ buf.read_binary(interval);
+ buf.read_binary(lower);
+ buf.read_binary(upper);
size_t size;
- read_binary(size, buf);
+ buf.read_binary(size);
for (size_t i = 0; i < size; i++) {
int32_t key;
size_t count;
- read_binary(key, buf);
- read_binary(count, buf);
+ buf.read_binary(key);
+ buf.read_binary(count);
buckets[key] = count;
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map.h
b/be/src/vec/aggregate_functions/aggregate_function_map.h
index d23615e0ebe..d8bc506ce5d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map.h
@@ -140,23 +140,22 @@ struct AggregateFunctionMapAggData {
void write(BufferWritable& buf) const {
const size_t size = _key_column->size();
- write_binary(size, buf);
+ buf.write_binary(size);
for (size_t i = 0; i < size; i++) {
- write_binary(assert_cast<KeyColumnType&,
TypeCheckOnRelease::DISABLE>(*_key_column)
- .get_data_at(i),
- buf);
+ buf.write_binary(assert_cast<KeyColumnType&,
TypeCheckOnRelease::DISABLE>(*_key_column)
+ .get_data_at(i));
}
for (size_t i = 0; i < size; i++) {
- write_binary(_value_column->get_data_at(i), buf);
+ buf.write_binary(_value_column->get_data_at(i));
}
}
void read(BufferReadable& buf) {
size_t size = 0;
- read_binary(size, buf);
+ buf.read_binary(size);
StringRef key;
for (size_t i = 0; i < size; i++) {
- read_binary(key, buf);
+ buf.read_binary(key);
DCHECK(_map.find(key) == _map.cend());
key.data = _arena.insert(key.data, key.size);
assert_cast<KeyColumnType&,
TypeCheckOnRelease::DISABLE>(*_key_column)
@@ -164,7 +163,7 @@ struct AggregateFunctionMapAggData {
}
StringRef val;
for (size_t i = 0; i < size; i++) {
- read_binary(val, buf);
+ buf.read_binary(val);
_value_column->insert_data(val.data, val.size);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
index 53290056fa5..620873cf38d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
@@ -131,7 +131,7 @@ struct AggregateFunctionMapAggDataV2 {
DCHECK_LE(written_bytes, serialized_bytes);
serialized_buffer.resize(serialized_bytes);
- write_string_binary(serialized_buffer, buf);
+ buf.write_binary(serialized_buffer);
serialized_bytes =
_value_type->get_uncompressed_serialized_bytes(*_value_column,
_be_version);
@@ -143,20 +143,20 @@ struct AggregateFunctionMapAggDataV2 {
DCHECK_LE(written_bytes, serialized_bytes);
serialized_buffer.resize(written_bytes);
- write_string_binary(serialized_buffer, buf);
+ buf.write_binary(serialized_buffer);
}
void read(BufferReadable& buf) {
std::string deserialized_buffer;
- read_string_binary(deserialized_buffer, buf);
+ buf.read_binary(deserialized_buffer);
const auto* ptr =
_key_type->deserialize(deserialized_buffer.data(),
&_key_column, _be_version);
auto read_bytes = ptr - deserialized_buffer.data();
DCHECK_EQ(read_bytes, deserialized_buffer.size());
- read_string_binary(deserialized_buffer, buf);
+ buf.read_binary(deserialized_buffer);
ptr = _value_type->deserialize(deserialized_buffer.data(),
&_value_column, _be_version);
read_bytes = ptr - deserialized_buffer.data();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index fafbc2b36c0..02875e45d7e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -109,16 +109,16 @@ public:
}
void write(BufferWritable& buf) const {
- write_binary(has(), buf);
+ buf.write_binary(has());
if (has()) {
- write_binary(value, buf);
+ buf.write_binary(value);
}
}
void read(BufferReadable& buf, Arena*) {
- read_binary(has_value, buf);
+ buf.read_binary(has_value);
if (has()) {
- read_binary(value, buf);
+ buf.read_binary(value);
}
}
@@ -248,16 +248,16 @@ public:
}
void write(BufferWritable& buf) const {
- write_binary(has(), buf);
+ buf.write_binary(has());
if (has()) {
- write_binary(value, buf);
+ buf.write_binary(value);
}
}
void read(BufferReadable& buf, Arena*) {
- read_binary(has_value, buf);
+ buf.read_binary(has_value);
if (has()) {
- read_binary(value, buf);
+ buf.read_binary(value);
}
}
@@ -384,7 +384,7 @@ public:
}
void write(BufferWritable& buf) const {
- write_binary(size, buf);
+ buf.write_binary(size);
if (has()) {
buf.write(get_data(), size);
}
@@ -392,7 +392,7 @@ public:
void read(BufferReadable& buf, Arena*) {
Int32 rhs_size;
- read_binary(rhs_size, buf);
+ buf.read_binary(rhs_size);
if (rhs_size >= 0) {
if (rhs_size <= MAX_SMALL_STRING_SIZE) {
@@ -630,7 +630,7 @@ struct SingleValueDataComplexType {
}
void write(BufferWritable& buf) const {
- write_binary(has(), buf);
+ buf.write_binary(has());
if (!has()) {
return;
}
@@ -638,17 +638,17 @@ struct SingleValueDataComplexType {
column_type->get_uncompressed_serialized_bytes(*column_data,
be_exec_version);
std::string memory_buffer(size_bytes, '0');
auto* p = column_type->serialize(*column_data, memory_buffer.data(),
be_exec_version);
- write_binary(memory_buffer, buf);
+ buf.write_binary(memory_buffer);
DCHECK_EQ(p, memory_buffer.data() + size_bytes);
}
void read(BufferReadable& buf, Arena* arena) {
- read_binary(has_value, buf);
+ buf.read_binary(has_value);
if (!has()) {
return;
}
std::string memory_buffer;
- read_binary(memory_buffer, buf);
+ buf.read_binary(memory_buffer);
const auto* p =
column_type->deserialize(memory_buffer.data(), &column_data,
be_exec_version);
DCHECK_EQ(p, memory_buffer.data() + memory_buffer.size());
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
index 0adcaabd9c3..f76448465cc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
@@ -59,14 +59,14 @@ public:
}
void write(BufferWritable& buf) const {
- write_binary(has(), buf);
+ buf.write_binary(has());
if (has()) {
DataTypeBitMap::serialize_as_stream(value, buf);
}
}
void read(BufferReadable& buf, Arena*) {
- read_binary(has_value, buf);
+ buf.read_binary(has_value);
if (has()) {
DataTypeBitMap::deserialize_as_stream(value, buf);
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index 274a39d8409..62c2b4ddba1 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -135,7 +135,7 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
bool flag = get_flag(place);
if (result_is_nullable) {
- write_binary(flag, buf);
+ buf.write_binary(flag);
}
if (flag) {
nested_function->serialize(nested_place(place), buf);
@@ -146,7 +146,7 @@ public:
Arena* arena) const override {
bool flag = true;
if (result_is_nullable) {
- read_binary(flag, buf);
+ buf.read_binary(flag);
}
if (flag) {
set_flag(place);
@@ -158,7 +158,7 @@ public:
BufferReadable& buf, Arena* arena) const
override {
bool flag = true;
if (result_is_nullable) {
- read_binary(flag, buf);
+ buf.read_binary(flag);
}
if (flag) {
set_flag(rhs);
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
b/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
index b2dcca99a6a..1fc6f8a3985 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
@@ -122,13 +122,13 @@ public:
}
void write(BufferWritable& buf) {
- write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+ buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
result = AggOrthBitmapBaseData<T>::bitmap.intersect();
DataTypeBitMap::serialize_as_stream(result, buf);
}
void read(BufferReadable& buf) {
- read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+ buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
DataTypeBitMap::deserialize_as_stream(result, buf);
}
@@ -158,17 +158,17 @@ public:
}
void write(BufferWritable& buf) {
- write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+ buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
std::string data;
data.resize(AggOrthBitmapBaseData<T>::bitmap.size());
AggOrthBitmapBaseData<T>::bitmap.serialize(data.data());
- write_binary(data, buf);
+ buf.write_binary(data);
}
void read(BufferReadable& buf) {
- read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+ buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
std::string data;
- read_binary(data, buf);
+ buf.read_binary(data);
AggOrthBitmapBaseData<T>::bitmap.deserialize(data.data());
}
@@ -199,14 +199,14 @@ public:
}
void write(BufferWritable& buf) {
- write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+ buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
result = AggOrthBitmapBaseData<T>::bitmap.intersect_count();
- write_binary(result, buf);
+ buf.write_binary(result);
}
void read(BufferReadable& buf) {
- read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
- read_binary(result, buf);
+ buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
+ buf.read_binary(result);
}
void get(IColumn& to) const {
@@ -272,13 +272,13 @@ public:
}
void write(BufferWritable& buf) {
- write_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+ buf.write_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
result =
AggOrthBitmapExprCalBaseData<T>::bitmap_expr_cal.bitmap_calculate();
DataTypeBitMap::serialize_as_stream(result, buf);
}
void read(BufferReadable& buf) {
- read_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+ buf.read_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
DataTypeBitMap::deserialize_as_stream(result, buf);
}
@@ -314,14 +314,14 @@ public:
}
void write(BufferWritable& buf) {
- write_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+ buf.write_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
result =
AggOrthBitmapExprCalBaseData<T>::bitmap_expr_cal.bitmap_calculate_count();
- write_binary(result, buf);
+ buf.write_binary(result);
}
void read(BufferReadable& buf) {
- read_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
- read_binary(result, buf);
+ buf.read_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
+ buf.read_binary(result);
}
void get(IColumn& to) const {
@@ -357,10 +357,10 @@ struct OrthBitmapUnionCountData {
void write(BufferWritable& buf) {
result = value.cardinality();
- write_binary(result, buf);
+ buf.write_binary(result);
}
- void read(BufferReadable& buf) { read_binary(result, buf); }
+ void read(BufferReadable& buf) { buf.read_binary(result); }
void get(IColumn& to) const {
auto& column = assert_cast<ColumnInt64&>(to);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.h
b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
index 986ba277e17..eb62f1cb655 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
@@ -82,31 +82,31 @@ struct PercentileApproxState {
}
void write(BufferWritable& buf) const {
- write_binary(init_flag, buf);
+ buf.write_binary(init_flag);
if (!init_flag) {
return;
}
- write_binary(target_quantile, buf);
- write_binary(compressions, buf);
+ buf.write_binary(target_quantile);
+ buf.write_binary(compressions);
uint32_t serialize_size = digest->serialized_size();
std::string result(serialize_size, '0');
DCHECK(digest.get() != nullptr);
digest->serialize((uint8_t*)result.c_str());
- write_binary(result, buf);
+ buf.write_binary(result);
}
void read(BufferReadable& buf) {
- read_binary(init_flag, buf);
+ buf.read_binary(init_flag);
if (!init_flag) {
return;
}
- read_binary(target_quantile, buf);
- read_binary(compressions, buf);
+ buf.read_binary(target_quantile);
+ buf.read_binary(compressions);
std::string str;
- read_binary(str, buf);
+ buf.read_binary(str);
digest = TDigest::create_unique(compressions);
digest->unserialize((uint8_t*)str.c_str());
}
@@ -323,14 +323,14 @@ struct PercentileState {
bool inited_flag = false;
void write(BufferWritable& buf) const {
- write_binary(inited_flag, buf);
+ buf.write_binary(inited_flag);
if (!inited_flag) {
return;
}
int size_num = vec_quantile.size();
- write_binary(size_num, buf);
+ buf.write_binary(size_num);
for (const auto& quantile : vec_quantile) {
- write_binary(quantile, buf);
+ buf.write_binary(quantile);
}
for (auto& counts : vec_counts) {
counts.serialize(buf);
@@ -338,16 +338,16 @@ struct PercentileState {
}
void read(BufferReadable& buf) {
- read_binary(inited_flag, buf);
+ buf.read_binary(inited_flag);
if (!inited_flag) {
return;
}
int size_num = 0;
- read_binary(size_num, buf);
+ buf.read_binary(size_num);
double data = 0.0;
vec_quantile.clear();
for (int i = 0; i < size_num; ++i) {
- read_binary(data, buf);
+ buf.read_binary(data);
vec_quantile.emplace_back(data);
}
vec_counts.clear();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_product.h
b/be/src/vec/aggregate_functions/aggregate_function_product.h
index d9e6e6888a8..633d290d791 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_product.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_product.h
@@ -48,9 +48,9 @@ struct AggregateFunctionProductData {
product *= other.product;
}
- void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+ void write(BufferWritable& buffer) const { buffer.write_binary(product); }
- void read(BufferReadable& buffer) { read_binary(product, buffer); }
+ void read(BufferReadable& buffer) { buffer.read_binary(product); }
typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return
product; }
@@ -77,9 +77,9 @@ struct AggregateFunctionProductData<TYPE_DECIMALV2> {
memcpy(&product, &ret, sizeof(Decimal128V2));
}
- void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+ void write(BufferWritable& buffer) const { buffer.write_binary(product); }
- void read(BufferReadable& buffer) { read_binary(product, buffer); }
+ void read(BufferReadable& buffer) { buffer.read_binary(product); }
Decimal128V2 get() const { return product; }
@@ -104,9 +104,9 @@ struct AggregateFunctionProductData<T> {
product /= multiplier;
}
- void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+ void write(BufferWritable& buffer) const { buffer.write_binary(product); }
- void read(BufferReadable& buffer) { read_binary(product, buffer); }
+ void read(BufferReadable& buffer) { buffer.read_binary(product); }
typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return
product; }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
b/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
index ab28951de1b..7ac90f0e90b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
@@ -48,19 +48,19 @@ struct AggregateFunctionRegrData {
Float64 sum_of_x_squared {};
void write(BufferWritable& buf) const {
- write_binary(sum_x, buf);
- write_binary(sum_y, buf);
- write_binary(sum_of_x_mul_y, buf);
- write_binary(sum_of_x_squared, buf);
- write_binary(count, buf);
+ buf.write_binary(sum_x);
+ buf.write_binary(sum_y);
+ buf.write_binary(sum_of_x_mul_y);
+ buf.write_binary(sum_of_x_squared);
+ buf.write_binary(count);
}
void read(BufferReadable& buf) {
- read_binary(sum_x, buf);
- read_binary(sum_y, buf);
- read_binary(sum_of_x_mul_y, buf);
- read_binary(sum_of_x_squared, buf);
- read_binary(count, buf);
+ buf.read_binary(sum_x);
+ buf.read_binary(sum_y);
+ buf.read_binary(sum_of_x_mul_y);
+ buf.read_binary(sum_of_x_squared);
+ buf.read_binary(count);
}
void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
index 27837577801..14a826de0da 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
@@ -246,13 +246,13 @@ public:
} else {
LOG(ERROR) << "serialize empty buf";
}
- write_binary(serialize_data, buf);
+ buf.write_binary(serialize_data);
}
void deserialize(BufferReadable& buf) {
static_cast<void>(send_buffer_to_rpc_server());
std::string serialize_data;
- read_binary(serialize_data, buf);
+ buf.read_binary(serialize_data);
if (error_default_str != serialize_data) {
_res.ParseFromString(serialize_data);
set_last_result(true);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
b/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
index 6d4f5c39b9d..adabef0c27d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
@@ -137,47 +137,47 @@ public:
}
void write(BufferWritable& buf) const {
- write_binary(sorted, buf);
- write_binary(events_list.size(), buf);
+ buf.write_binary(sorted);
+ buf.write_binary(events_list.size());
for (const auto& events : events_list) {
- write_binary(events.first, buf);
- write_binary(events.second.to_ulong(), buf);
+ buf.write_binary(events.first);
+ buf.write_binary(events.second.to_ulong());
}
// This is std::bitset<32>, which will not exceed 32 bits.
UInt32 conditions_met_value = (UInt32)conditions_met.to_ulong();
- write_binary(conditions_met_value, buf);
+ buf.write_binary(conditions_met_value);
- write_binary(pattern, buf);
- write_binary(arg_count, buf);
+ buf.write_binary(pattern);
+ buf.write_binary(arg_count);
}
void read(BufferReadable& buf) {
- read_binary(sorted, buf);
+ buf.read_binary(sorted);
size_t events_list_size;
- read_binary(events_list_size, buf);
+ buf.read_binary(events_list_size);
events_list.clear();
events_list.reserve(events_list_size);
for (size_t i = 0; i < events_list_size; ++i) {
Timestamp timestamp;
- read_binary(timestamp, buf);
+ buf.read_binary(timestamp);
UInt64 events;
- read_binary(events, buf);
+ buf.read_binary(events);
events_list.emplace_back(timestamp, Events {events});
}
UInt32 conditions_met_value;
- read_binary(conditions_met_value, buf);
+ buf.read_binary(conditions_met_value);
conditions_met = conditions_met_value;
- read_binary(pattern, buf);
- read_binary(arg_count, buf);
+ buf.read_binary(pattern);
+ buf.read_binary(arg_count);
}
private:
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 84cc83cd24b..f574689cf26 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -82,12 +82,12 @@ struct AggregateFunctionSortData {
throw doris::Exception(st);
}
- write_string_binary(pblock.SerializeAsString(), buf);
+ buf.write_binary(pblock.SerializeAsString());
}
void deserialize(BufferReadable& buf) {
std::string data;
- read_binary(data, buf);
+ buf.read_binary(data);
PBlock pblock;
pblock.ParseFromString(data);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.h
b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
index 53ab6dcae7d..b04f52c44ca 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
@@ -49,15 +49,15 @@ struct BaseData {
virtual ~BaseData() = default;
void write(BufferWritable& buf) const {
- write_binary(mean, buf);
- write_binary(m2, buf);
- write_binary(count, buf);
+ buf.write_binary(mean);
+ buf.write_binary(m2);
+ buf.write_binary(count);
}
void read(BufferReadable& buf) {
- read_binary(mean, buf);
- read_binary(m2, buf);
- read_binary(count, buf);
+ buf.read_binary(mean);
+ buf.read_binary(m2);
+ buf.read_binary(count);
}
void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 8e4d647b20c..b35cc215c13 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -61,9 +61,9 @@ struct AggregateFunctionSumData {
void merge(const AggregateFunctionSumData& rhs) { sum += rhs.sum; }
- void write(BufferWritable& buf) const { write_binary(sum, buf); }
+ void write(BufferWritable& buf) const { buf.write_binary(sum); }
- void read(BufferReadable& buf) { read_binary(sum, buf); }
+ void read(BufferReadable& buf) { buf.read_binary(sum); }
typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return sum; }
};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_topn.h
b/be/src/vec/aggregate_functions/aggregate_function_topn.h
index edddb129f06..01982a3e9b4 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_topn.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_topn.h
@@ -137,33 +137,33 @@ struct AggregateFunctionTopNData {
}
void write(BufferWritable& buf) const {
- write_binary(top_num, buf);
- write_binary(capacity, buf);
+ buf.write_binary(top_num);
+ buf.write_binary(capacity);
uint64_t element_number = std::min(capacity,
(uint64_t)counter_map.size());
- write_binary(element_number, buf);
+ buf.write_binary(element_number);
auto counter_vector = get_remain_vector();
for (auto i = 0; i < element_number; i++) {
auto element = counter_vector[i];
- write_binary(element.second, buf);
- write_binary(element.first, buf);
+ buf.write_binary(element.second);
+ buf.write_binary(element.first);
}
}
void read(BufferReadable& buf) {
- read_binary(top_num, buf);
- read_binary(capacity, buf);
+ buf.read_binary(top_num);
+ buf.read_binary(capacity);
uint64_t element_number = 0;
- read_binary(element_number, buf);
+ buf.read_binary(element_number);
counter_map.clear();
std::pair<DataType, uint64_t> element;
for (auto i = 0; i < element_number; i++) {
- read_binary(element.first, buf);
- read_binary(element.second, buf);
+ buf.read_binary(element.first);
+ buf.read_binary(element.second);
counter_map.insert(element);
}
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index b68a202b965..76104e8cdd5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -202,9 +202,9 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
auto& set = this->data(place).set;
- write_var_uint(set.size(), buf);
+ buf.write_var_uint(set.size());
for (const auto& elem : set) {
- write_pod_binary(elem, buf);
+ buf.write_binary(elem);
}
}
@@ -212,13 +212,13 @@ public:
BufferReadable& buf, Arena*) const override {
auto& set = this->data(place).set;
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
set.rehash(size + set.size());
for (size_t i = 0; i < size; ++i) {
KeyType ref;
- read_pod_binary(ref, buf);
+ buf.read_binary(ref);
set.insert(ref);
}
}
@@ -227,13 +227,13 @@ public:
Arena*) const override {
auto& set = this->data(place).set;
UInt64 size;
- read_var_uint(size, buf);
+ buf.read_var_uint(size);
set.rehash(size + set.size());
for (size_t i = 0; i < size; ++i) {
KeyType ref;
- read_pod_binary(ref, buf);
+ buf.read_binary(ref);
set.insert(ref);
}
}
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
index 16feddaae0f..376db6ff768 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
@@ -153,12 +153,12 @@ public:
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
- write_var_uint(this->data(place).set.size(), buf);
+ buf.write_var_uint(this->data(place).set.size());
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
- read_var_uint(this->data(place).count, buf);
+ buf.read_var_uint(this->data(place).count);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
diff --git a/be/src/vec/aggregate_functions/moments.h
b/be/src/vec/aggregate_functions/moments.h
index 0628b3116e2..d4614f4f060 100644
--- a/be/src/vec/aggregate_functions/moments.h
+++ b/be/src/vec/aggregate_functions/moments.h
@@ -53,9 +53,9 @@ struct VarMoments {
if constexpr (_level >= 4) m[4] += rhs.m[4];
}
- void write(BufferWritable& buf) const { write_binary(*this, buf); }
+ void write(BufferWritable& buf) const { buf.write_binary(*this); }
- void read(BufferReadable& buf) { read_binary(*this, buf); }
+ void read(BufferReadable& buf) { buf.read_binary(*this); }
T get() const {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/common/hash_table/hash_table.h
b/be/src/vec/common/hash_table/hash_table.h
index 2330b18fbdf..e00986704d7 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -196,12 +196,10 @@ struct HashTableCell {
void set_mapped(const value_type& /*value*/) {}
/// Serialization, in binary and text form.
- void write(doris::vectorized::BufferWritable& wb) const {
- doris::vectorized::write_binary(key, wb);
- }
+ void write(doris::vectorized::BufferWritable& wb) const {
wb.write_binary(key); }
/// Deserialization, in binary and text form.
- void read(doris::vectorized::BufferReadable& rb) {
doris::vectorized::read_binary(key, rb); }
+ void read(doris::vectorized::BufferReadable& rb) { rb.read_binary(key); }
};
template <typename Key, typename Hash, typename State>
@@ -948,7 +946,7 @@ public:
void write(doris::vectorized::BufferWritable& wb) const {
Cell::State::write(wb);
- doris::vectorized::write_var_uint(m_size, wb);
+ wb.write_var_uint(m_size);
if (this->get_has_zero()) this->zero_value()->write(wb);
@@ -964,7 +962,7 @@ public:
m_size = 0;
doris::vectorized::UInt64 new_size = 0;
- doris::vectorized::read_var_uint(new_size, rb);
+ rb.read_var_uint(new_size);
free();
Grower new_grower = grower;
diff --git a/be/src/vec/common/space_saving.h b/be/src/vec/common/space_saving.h
index 4a05ab2b409..85d5d085216 100644
--- a/be/src/vec/common/space_saving.h
+++ b/be/src/vec/common/space_saving.h
@@ -88,15 +88,15 @@ public:
: key(k), hash(h), count(c), error(e) {}
void write(BufferWritable& wb) const {
- write_binary(key, wb);
- write_var_uint(count, wb);
- write_var_uint(error, wb);
+ wb.write_binary(key);
+ wb.write_var_uint(count);
+ wb.write_var_uint(error);
}
void read(BufferReadable& rb) {
- read_binary(key, rb);
- read_var_uint(count, rb);
- read_var_uint(error, rb);
+ rb.read_binary(key);
+ rb.read_var_uint(count);
+ rb.read_var_uint(error);
}
bool operator>(const Counter& b) const {
@@ -231,21 +231,21 @@ public:
}
void write(BufferWritable& wb) const {
- write_var_uint(size(), wb);
+ wb.write_var_uint(size());
for (auto& counter : counter_list) {
counter->write(wb);
}
- write_var_uint(alpha_map.size(), wb);
+ wb.write_var_uint(alpha_map.size());
for (auto alpha : alpha_map) {
- write_var_uint(alpha, wb);
+ wb.write_var_uint(alpha);
}
}
void read(BufferReadable& rb) {
destroy_elements();
uint64_t count = 0;
- read_var_uint(count, rb);
+ rb.read_var_uint(count);
for (UInt64 i = 0; i < count; ++i) {
std::unique_ptr counter = std::make_unique<Counter>();
@@ -260,10 +260,10 @@ public:
// Reads the alpha map data from the provided readable buffer.
void read_alpha_map(BufferReadable& rb) {
uint64_t alpha_size = 0;
- read_var_uint(alpha_size, rb);
+ rb.read_var_uint(alpha_size);
for (size_t i = 0; i < alpha_size; ++i) {
uint64_t alpha = 0;
- read_var_uint(alpha, rb);
+ rb.read_var_uint(alpha);
alpha_map.push_back(alpha);
}
}
diff --git a/be/src/vec/common/string_buffer.hpp
b/be/src/vec/common/string_buffer.hpp
index 980f70b2dc7..5cd981cc16d 100644
--- a/be/src/vec/common/string_buffer.hpp
+++ b/be/src/vec/common/string_buffer.hpp
@@ -24,6 +24,8 @@
#include "vec/common/string_ref.h"
namespace doris::vectorized {
+static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; // 1GB
+static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824; // 1GB
// store and commit data. only after commit the data is effective on its'
base(ColumnString)
// everytime commit, the _data add one row.
@@ -57,6 +59,46 @@ public:
write(buffer.data(), buffer.size());
}
+ // Write a variable-length unsigned integer to the buffer
+ // maybe it's better not to use this
+ void write_var_uint(UInt64 x) {
+ char bytes[9];
+ uint8_t i = 0;
+ while (i < 9) {
+ uint8_t byte = x & 0x7F;
+ if (x > 0x7F) {
+ byte |= 0x80;
+ }
+
+ bytes[i++] = byte;
+
+ x >>= 7;
+ if (!x) {
+ break;
+ }
+ }
+ write((char*)&i, 1);
+ write(bytes, i);
+ }
+
+ template <typename Type>
+ void write_binary(const Type& x) {
+ static_assert(std::is_standard_layout_v<Type>);
+ write(reinterpret_cast<const char*>(&x), sizeof(x));
+ }
+
+ template <typename Type>
+ requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
+ void write_binary(const Type& s) {
+ write_var_uint(s.size());
+ write(reinterpret_cast<const char*>(s.data()), s.size());
+ }
+
+ void write_binary(const StringRef& s) {
+ write_var_uint(s.size);
+ write(s.data, s.size);
+ }
+
private:
ColumnString::Chars& _data;
ColumnString::Offsets& _offsets;
@@ -84,6 +126,62 @@ public:
_data += len;
}
+ void read_var_uint(UInt64& x) {
+ x = 0;
+ // get length from first byte firstly
+ uint8_t len = 0;
+ read((char*)&len, 1);
+ auto ref = read(len);
+ // read data and set it to x per byte.
+ char* bytes = const_cast<char*>(ref.data);
+ for (size_t i = 0; i < 9; ++i) {
+ UInt64 byte = bytes[i];
+ x |= (byte & 0x7F) << (7 * i);
+
+ if (!(byte & 0x80)) {
+ return;
+ }
+ }
+ }
+
+ template <typename Type>
+ void read_binary(Type& x) {
+ static_assert(std::is_standard_layout_v<Type>);
+ read(reinterpret_cast<char*>(&x), sizeof(x));
+ }
+
+ template <typename Type>
+ requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
+ void read_binary(Type& s) {
+ UInt64 size = 0;
+ read_var_uint(size);
+
+ if (size > DEFAULT_MAX_STRING_SIZE) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+ "Too large string size."
+ " size: {}, max: {}",
+ size, DEFAULT_MAX_STRING_SIZE);
+ }
+
+ s.resize(size);
+ read((char*)s.data(), size);
+ }
+
+ // Note that the StringRef in this function is just a reference, it should
be copied outside
+ void read_binary(StringRef& s) {
+ UInt64 size = 0;
+ read_var_uint(size);
+
+ if (size > DEFAULT_MAX_STRING_SIZE) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+ "Too large string size. "
+ " size: {}, max: {}",
+ size, DEFAULT_MAX_STRING_SIZE);
+ }
+
+ s = read(size);
+ }
+
private:
const char* _data;
};
@@ -91,4 +189,15 @@ private:
using VectorBufferReader = BufferReadable;
using BufferReader = BufferReadable;
+///TODO: Currently this function is only called in one place, we might need to
convert all read_binary(StringRef) to this style? Or directly use
read_binary(String)
+inline StringRef read_binary_into(Arena& arena, BufferReadable& buf) {
+ UInt64 size = 0;
+ buf.read_var_uint(size);
+
+ char* data = arena.alloc(size);
+ buf.read(data, size);
+
+ return {data, size};
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_bitmap.cpp
b/be/src/vec/data_types/data_type_bitmap.cpp
index da7b537c0d3..fc13190e033 100644
--- a/be/src/vec/data_types/data_type_bitmap.cpp
+++ b/be/src/vec/data_types/data_type_bitmap.cpp
@@ -162,12 +162,12 @@ void DataTypeBitMap::serialize_as_stream(const
BitmapValue& cvalue, BufferWritab
size_t bytesize = value.getSizeInBytes();
memory_buffer.resize(bytesize);
value.write_to(const_cast<char*>(memory_buffer.data()));
- write_string_binary(memory_buffer, buf);
+ buf.write_binary(memory_buffer);
}
void DataTypeBitMap::deserialize_as_stream(BitmapValue& value, BufferReadable&
buf) {
StringRef ref;
- read_string_binary(ref, buf);
+ buf.read_binary(ref);
value.deserialize(ref.data);
}
diff --git a/be/src/vec/data_types/data_type_hll.cpp
b/be/src/vec/data_types/data_type_hll.cpp
index e543a42c697..6dcd8386e32 100644
--- a/be/src/vec/data_types/data_type_hll.cpp
+++ b/be/src/vec/data_types/data_type_hll.cpp
@@ -180,12 +180,12 @@ void DataTypeHLL::serialize_as_stream(const HyperLogLog&
cvalue, BufferWritable&
std::string memory_buffer(value.max_serialized_size(), '0');
size_t actual_size = value.serialize((uint8_t*)memory_buffer.data());
memory_buffer.resize(actual_size);
- write_string_binary(memory_buffer, buf);
+ buf.write_binary(memory_buffer);
}
void DataTypeHLL::deserialize_as_stream(HyperLogLog& value, BufferReadable&
buf) {
std::string str;
- read_string_binary(str, buf);
+ buf.read_binary(str);
value.deserialize(Slice(str));
}
diff --git a/be/src/vec/data_types/data_type_quantilestate.cpp
b/be/src/vec/data_types/data_type_quantilestate.cpp
index ec4d05b819e..a73f0de9b17 100644
--- a/be/src/vec/data_types/data_type_quantilestate.cpp
+++ b/be/src/vec/data_types/data_type_quantilestate.cpp
@@ -163,12 +163,12 @@ void DataTypeQuantileState::serialize_as_stream(const
QuantileState& cvalue, Buf
std::string memory_buffer;
memory_buffer.resize(value.get_serialized_size());
value.serialize(const_cast<uint8_t*>(reinterpret_cast<uint8_t*>(memory_buffer.data())));
- write_string_binary(memory_buffer, buf);
+ buf.write_binary(memory_buffer);
}
void DataTypeQuantileState::deserialize_as_stream(QuantileState& value,
BufferReadable& buf) {
StringRef ref;
- read_string_binary(ref, buf);
+ buf.read_binary(ref);
value.deserialize(ref.to_slice());
}
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index 8943c2f4fc1..8782a155b2e 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -41,12 +41,6 @@
namespace doris::vectorized {
-// Define in the namespace and avoid defining global macros,
-// because it maybe conflicts with other libs
-static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; // 1GB
-static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824; // 1GB
-static constexpr auto WRITE_HELPERS_MAX_INT_WIDTH = 40U;
-
inline std::string int128_to_string(int128_t value) {
return fmt::format(FMT_COMPILE("{}"), value);
}
@@ -96,154 +90,6 @@ void write_text(Decimal<T> value, UInt32 scale,
std::ostream& ostr) {
ostr.write(str_fractional.data(), scale);
}
}
-/// Methods for output in binary format.
-
-/// Write POD-type in native format. It's recommended to use only with packed
(dense) data types.
-template <typename Type>
-void write_pod_binary(const Type& x, BufferWritable& buf) {
- buf.write(reinterpret_cast<const char*>(&x), sizeof(x));
-}
-
-template <typename Type>
-void write_int_binary(const Type& x, BufferWritable& buf) {
- write_pod_binary(x, buf);
-}
-
-template <typename Type>
-void write_float_binary(const Type& x, BufferWritable& buf) {
- write_pod_binary(x, buf);
-}
-
-inline void write_string_binary(const std::string& s, BufferWritable& buf) {
- write_var_uint(s.size(), buf);
- buf.write(s.data(), s.size());
-}
-
-inline void write_string_binary(const StringRef& s, BufferWritable& buf) {
- write_var_uint(s.size, buf);
- buf.write(s.data, s.size);
-}
-
-inline void write_string_binary(const char* s, BufferWritable& buf) {
- write_string_binary(StringRef {std::string(s)}, buf);
-}
-
-inline void write_json_binary(const JsonbField& s, BufferWritable& buf) {
- write_string_binary(StringRef {s.get_value(), s.get_size()}, buf);
-}
-
-template <typename Type>
-void write_vector_binary(const std::vector<Type>& v, BufferWritable& buf) {
- write_var_uint(v.size(), buf);
-
- for (typename std::vector<Type>::const_iterator it = v.begin(); it !=
v.end(); ++it) {
- write_binary(*it, buf);
- }
-}
-
-inline void write_binary(const String& x, BufferWritable& buf) {
- write_string_binary(x, buf);
-}
-
-inline void write_binary(const StringRef& x, BufferWritable& buf) {
- write_string_binary(x, buf);
-}
-
-template <typename Type>
-void write_binary(const Type& x, BufferWritable& buf) {
- write_pod_binary(x, buf);
-}
-
-/// Read POD-type in native format
-template <typename Type>
-void read_pod_binary(Type& x, BufferReadable& buf) {
- buf.read(reinterpret_cast<char*>(&x), sizeof(x));
-}
-
-template <typename Type>
-void read_int_binary(Type& x, BufferReadable& buf) {
- read_pod_binary(x, buf);
-}
-
-template <typename Type>
-void read_float_binary(Type& x, BufferReadable& buf) {
- read_pod_binary(x, buf);
-}
-
-template <typename Type>
- requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
-inline void read_string_binary(Type& s, BufferReadable& buf,
- size_t MAX_STRING_SIZE =
DEFAULT_MAX_STRING_SIZE) {
- UInt64 size = 0;
- read_var_uint(size, buf);
-
- if (size > MAX_STRING_SIZE) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large string
size.");
- }
-
- s.resize(size);
- buf.read((char*)s.data(), size);
-}
-
-inline void read_string_binary(StringRef& s, BufferReadable& buf,
- size_t MAX_STRING_SIZE =
DEFAULT_MAX_STRING_SIZE) {
- UInt64 size = 0;
- read_var_uint(size, buf);
-
- if (size > MAX_STRING_SIZE) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large string
size.");
- }
-
- s = buf.read(size);
-}
-
-inline StringRef read_string_binary_into(Arena& arena, BufferReadable& buf) {
- UInt64 size = 0;
- read_var_uint(size, buf);
-
- char* data = arena.alloc(size);
- buf.read(data, size);
-
- return {data, size};
-}
-
-inline void read_json_binary(JsonbField& val, BufferReadable& buf,
- size_t MAX_JSON_SIZE = DEFAULT_MAX_JSON_SIZE) {
- StringRef result;
- read_string_binary(result, buf);
- val = JsonbField(result.data, result.size);
-}
-
-template <typename Type>
-void read_vector_binary(std::vector<Type>& v, BufferReadable& buf,
- size_t MAX_VECTOR_SIZE = DEFAULT_MAX_STRING_SIZE) {
- UInt64 size = 0;
- read_var_uint(size, buf);
-
- if (size > MAX_VECTOR_SIZE) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large vector
size.");
- }
-
- v.resize(size);
- for (size_t i = 0; i < size; ++i) {
- read_binary(v[i], buf);
- }
-}
-
-template <typename Type>
- requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
-inline void read_binary(Type& x, BufferReadable& buf) {
- read_string_binary(x, buf);
-}
-
-inline void read_binary(StringRef& x, BufferReadable& buf) {
- read_string_binary(x, buf);
-}
-
-template <typename Type>
-void read_binary(Type& x, BufferReadable& buf) {
- read_pod_binary(x, buf);
-}
template <typename T>
bool read_float_text_fast_impl(T& x, ReadBuffer& in) {
diff --git a/be/src/vec/io/var_int.h b/be/src/vec/io/var_int.h
index 561e7f59edb..44e07d8e1dd 100644
--- a/be/src/vec/io/var_int.h
+++ b/be/src/vec/io/var_int.h
@@ -24,53 +24,14 @@ namespace doris::vectorized {
/** Write Int64 in variable length format (base128) */
template <typename OUT>
void write_var_int(Int64 x, OUT& ostr) {
- write_var_uint(static_cast<UInt64>((x << 1) ^ (x >> 63)), ostr);
+ ostr.write_var_uint(static_cast<UInt64>((x << 1) ^ (x >> 63)));
}
/** Read Int64, written in variable length format (base128) */
template <typename IN>
void read_var_int(Int64& x, IN& istr) {
- read_var_uint(*reinterpret_cast<UInt64*>(&x), istr);
+ istr.read_var_uint(*reinterpret_cast<UInt64*>(&x));
x = (static_cast<UInt64>(x) >> 1) ^ -(x & 1);
}
-// TODO: do real implement in the future
-inline void read_var_uint(UInt64& x, BufferReadable& buf) {
- x = 0;
- // get length from first byte firstly
- uint8_t len = 0;
- buf.read((char*)&len, 1);
- auto ref = buf.read(len);
- // read data and set it to x per byte.
- char* bytes = const_cast<char*>(ref.data);
- for (size_t i = 0; i < 9; ++i) {
- UInt64 byte = bytes[i];
- x |= (byte & 0x7F) << (7 * i);
-
- if (!(byte & 0x80)) {
- return;
- }
- }
-}
-
-inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
- char bytes[9];
- uint8_t i = 0;
- while (i < 9) {
- uint8_t byte = x & 0x7F;
- if (x > 0x7F) {
- byte |= 0x80;
- }
-
- bytes[i++] = byte;
-
- x >>= 7;
- if (!x) {
- break;
- }
- }
- ostr.write((char*)&i, 1);
- ostr.write(bytes, i);
-}
-
} // namespace doris::vectorized
diff --git a/be/test/vec/common/string_buffer_test.cpp
b/be/test/vec/common/string_buffer_test.cpp
new file mode 100644
index 00000000000..4c80725b8ad
--- /dev/null
+++ b/be/test/vec/common/string_buffer_test.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/string_buffer.hpp"
+
+#include <gtest/gtest.h>
+
+#include "runtime/primitive_type.h"
+
+namespace doris::vectorized {
+
+TEST(StringBufferTest, TestWriteNumber) {
+ ColumnString column_string;
+ BufferWritable buffer(column_string);
+ buffer.write_number(12345);
+ buffer.write_number(true);
+ buffer.write_number(3.14159);
+ buffer.commit();
+
+ EXPECT_EQ(column_string.size(), 1);
+ auto str_ref = column_string.get_data_at(0);
+
+ EXPECT_EQ(str_ref.to_string(), "12345true3.14159");
+}
+
+TEST(StringBufferTest, TestWriteBinary) {
+ ColumnString column_string;
+ BufferWritable buffer(column_string);
+
+ {
+ String str = "Hello, World!";
+ buffer.write_binary(str);
+ }
+ {
+ int64_t x = 123456789;
+ buffer.write_binary(x);
+ }
+
+ buffer.commit();
+
+ EXPECT_EQ(column_string.size(), 1);
+ auto str_ref = column_string.get_data_at(0);
+
+ BufferReadable readable(str_ref);
+
+ {
+ String read_str;
+ readable.read_binary(read_str);
+ EXPECT_EQ(read_str, "Hello, World!");
+ }
+
+ {
+ int64_t read_x;
+ readable.read_binary(read_x);
+ EXPECT_EQ(read_x, 123456789);
+ }
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/core/field_test.cpp b/be/test/vec/core/field_test.cpp
index 9576f2963e7..bdb3eb7a3fe 100644
--- a/be/test/vec/core/field_test.cpp
+++ b/be/test/vec/core/field_test.cpp
@@ -123,7 +123,7 @@ TEST(VFieldTest, jsonb_field_io) {
// Write the JsonbField to the buffer
{
BufferWritable buf(column_str);
- write_json_binary(original, buf);
+ buf.write_binary(StringRef {original.get_value(),
original.get_size()});
buf.commit(); // Important: commit the write operation
}
@@ -139,8 +139,9 @@ TEST(VFieldTest, jsonb_field_io) {
BufferReadable read_buf(str_ref);
// Read the data back into a new JsonbField
- JsonbField read_field;
- read_json_binary(read_field, read_buf);
+ StringRef result;
+ read_buf.read_binary(result);
+ JsonbField read_field = JsonbField(result.data, result.size);
// Verify the data
ASSERT_NE(read_field.get_value(), nullptr);
@@ -156,7 +157,7 @@ TEST(VFieldTest, jsonb_field_io) {
// ser
{
BufferWritable field_buf(field_column);
- write_json_binary(original, field_buf);
+ field_buf.write_binary(StringRef {original.get_value(),
original.get_size()});
field_buf.commit();
}
@@ -169,8 +170,9 @@ TEST(VFieldTest, jsonb_field_io) {
BufferReadable read_field_buf(field_str_ref);
// we can't use read_binary because of the JsonbField is not POD
type
- JsonbField jsonb_from_field;
- read_json_binary(jsonb_from_field, read_field_buf);
+ StringRef result;
+ read_field_buf.read_binary(result);
+ JsonbField jsonb_from_field = JsonbField(result.data, result.size);
Field f2 = Field::create_field<TYPE_JSONB>(jsonb_from_field);
ASSERT_EQ(f2.get_type(), TYPE_JSONB);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]