This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d57851c8743 [fix](variant) reduce doc compaction writer peak memory
(#61955)
d57851c8743 is described below
commit d57851c8743e3d37e95a2159ec066cf240ce19bc
Author: lihangyu <[email protected]>
AuthorDate: Wed Apr 1 11:00:52 2026 +0800
[fix](variant) reduce doc compaction writer peak memory (#61955)
- cap the initial doc-value path reservation, and reuse the exact
unique-path count when it is already known
- release the doc write plans before writing the doc-value column, so the
two memory peaks no longer overlap
- eagerly finish and flush compact-writer subcolumns, and make
finish()/write_data() idempotent
- skip the cross-bucket merge/sort when there is only one bucket
- add unit tests for expected_unique_paths and for the compact-writer
roundtrip and idempotence
In the memory-usage chart below, the blue line is the improved version.
<img width="1578" height="570" alt="image"
src="https://github.com/user-attachments/assets/eb501829-627c-45e8-85a1-93c63d7148de"
/>
---
be/src/exec/common/variant_util.cpp | 10 +-
be/src/exec/common/variant_util.h | 2 +-
.../segment/variant/binary_column_reader.cpp | 12 +-
.../segment/variant/variant_column_writer_impl.cpp | 96 +++--
.../segment/variant/variant_column_writer_impl.h | 1 +
.../segment/variant_column_writer_reader_test.cpp | 386 +++++++++++----------
be/test/storage/segment/variant_util_test.cpp | 61 ++++
7 files changed, 343 insertions(+), 225 deletions(-)
diff --git a/be/src/exec/common/variant_util.cpp
b/be/src/exec/common/variant_util.cpp
index 73a31b95c7b..e2bede96fbf 100644
--- a/be/src/exec/common/variant_util.cpp
+++ b/be/src/exec/common/variant_util.cpp
@@ -2052,7 +2052,8 @@ void materialize_docs_to_subcolumns(ColumnVariant&
column_variant) {
// ============ Implementation from variant_util.cpp ============
phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn>
materialize_docs_to_subcolumns_map(
- const ColumnVariant& variant) {
+ const ColumnVariant& variant, size_t expected_unique_paths) {
+ constexpr size_t kInitialPathReserve = 8192;
phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn>
subcolumns;
const auto [column_key, column_value] =
variant.get_doc_value_data_paths_and_values();
@@ -2061,11 +2062,12 @@ phmap::flat_hash_map<std::string_view,
ColumnVariant::Subcolumn> materialize_doc
DCHECK_EQ(num_rows, variant.size()) << "doc snapshot offsets size mismatch
with variant rows";
- // Best-effort reserve: at most number of kv pairs.
- subcolumns.reserve(column_key->size());
+ subcolumns.reserve(expected_unique_paths != 0
+ ? expected_unique_paths
+ : std::min<size_t>(column_key->size(),
kInitialPathReserve));
for (size_t row = 0; row < num_rows; ++row) {
- const size_t start = (row == 0) ? 0 : column_offsets[row - 1];
+ const size_t start = column_offsets[row - 1];
const size_t end = column_offsets[row];
for (size_t i = start; i < end; ++i) {
const auto& key = column_key->get_data_at(i);
diff --git a/be/src/exec/common/variant_util.h
b/be/src/exec/common/variant_util.h
index dda580154b6..f4302146972 100644
--- a/be/src/exec/common/variant_util.h
+++ b/be/src/exec/common/variant_util.h
@@ -265,6 +265,6 @@ Status parse_and_materialize_variant_columns(Block& block,
const TabletSchema& t
// NOTE: Returned map keys are `std::string_view` pointing into the underlying
doc snapshot paths
// column, so the input `variant` must outlive the returned map.
phmap::flat_hash_map<std::string_view, ColumnVariant::Subcolumn>
materialize_docs_to_subcolumns_map(
- const ColumnVariant& variant);
+ const ColumnVariant& variant, size_t expected_unique_paths = 0);
} // namespace doris::variant_util
diff --git a/be/src/storage/segment/variant/binary_column_reader.cpp
b/be/src/storage/segment/variant/binary_column_reader.cpp
index fb9335b73c6..96de6c67507 100644
--- a/be/src/storage/segment/variant/binary_column_reader.cpp
+++ b/be/src/storage/segment/variant/binary_column_reader.cpp
@@ -107,6 +107,16 @@ BinaryColumnType SingleSparseColumnReader::get_type()
const {
}
Status
MultipleBinaryColumnReader::new_binary_column_iterator(ColumnIteratorUPtr*
iter) const {
+ // Single bucket can be read directly without cross-bucket merge/sort.
+ if (_multiple_column_readers.size() == 1) {
+ DCHECK(!_multiple_column_readers.empty());
+ auto it = _multiple_column_readers.begin();
+ ColumnIteratorUPtr single_iter;
+ RETURN_IF_ERROR(it->second->new_iterator(&single_iter, nullptr));
+ *iter = std::move(single_iter);
+ return Status::OK();
+ }
+
std::vector<std::unique_ptr<ColumnIterator>> iters;
iters.reserve(_multiple_column_readers.size());
for (const auto& [index, reader] : _multiple_column_readers) {
@@ -268,4 +278,4 @@ void
CombineMultipleBinaryColumnIterator::_collect_sparse_data_from_buckets(
}
#include "common/compile_check_end.h"
-} // namespace doris::segment_v2
\ No newline at end of file
+} // namespace doris::segment_v2
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.cpp
b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
index 4f87a1640b3..f70d1565788 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.cpp
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.cpp
@@ -190,7 +190,7 @@ struct SubcolumnWriteEntry {
std::string_view path;
ColumnVariant::Subcolumn* subcolumn = nullptr;
// nullptr means dense materialization; otherwise sparse row ids for this
path.
- const std::vector<uint32_t>* rowids = nullptr;
+ std::vector<uint32_t>* rowids = nullptr;
};
struct SubcolumnWritePlan {
@@ -203,6 +203,8 @@ struct SubcolumnWritePlan {
DocValuePathStats stats;
};
+constexpr size_t kInitialDocPathReserve = 8192;
+
// Build per-path non-null counts from the serialized doc-value representation.
void build_doc_value_stats(const ColumnVariant& variant, DocValuePathStats*
stats) {
auto [column_key, column_value] =
variant.get_doc_value_data_paths_and_values();
@@ -211,7 +213,7 @@ void build_doc_value_stats(const ColumnVariant& variant,
DocValuePathStats* stat
const size_t num_rows = column_offsets.size();
stats->clear();
- stats->reserve(column_key->size());
+ stats->reserve(std::min<size_t>(column_key->size(),
kInitialDocPathReserve));
for (size_t row = 0; row < num_rows; ++row) {
const size_t start = column_offsets[row - 1];
const size_t end = column_offsets[row];
@@ -223,35 +225,35 @@ void build_doc_value_stats(const ColumnVariant& variant,
DocValuePathStats* stat
}
}
-// Materialize sparse subcolumns for each path and build per-path non-null
counts.
+// Materialize sparse subcolumns for each path using precomputed per-path
non-null counts.
// For each row, we decode only present (path, value) pairs and append them to
the
// corresponding subcolumn, while recording the row id to allow gap filling
later.
-void build_sparse_subcolumns_and_stats(const ColumnVariant& variant,
- DocSparseSubcolumns* subcolumns,
DocValuePathStats* stats) {
+void build_sparse_subcolumns(const ColumnVariant& variant, const
DocValuePathStats& stats,
+ DocSparseSubcolumns* subcolumns) {
auto [column_key, column_value] =
variant.get_doc_value_data_paths_and_values();
const auto& column_offsets = variant.serialized_doc_value_column_offsets();
const size_t num_rows = column_offsets.size();
subcolumns->clear();
- stats->clear();
- subcolumns->reserve(column_key->size());
+ subcolumns->reserve(stats.size());
for (size_t row = 0; row < num_rows; ++row) {
const size_t start = column_offsets[row - 1];
const size_t end = column_offsets[row];
for (size_t i = start; i < end; ++i) {
const StringRef path = column_key->get_data_at(i);
- auto& data = subcolumns->try_emplace(path).first->second;
+ auto stat_it = stats.find(path);
+ DCHECK(stat_it != stats.end());
+ auto [data_it, inserted] = subcolumns->try_emplace(path);
+ auto& data = data_it->second;
+ if (inserted) {
+ data.rowids.reserve(stat_it->second);
+ }
data.rowids.push_back(cast_set<uint32_t>(row));
data.subcolumn.deserialize_from_binary_column(column_value, i);
++data.non_null_count;
}
}
-
- stats->reserve(subcolumns->size());
- for (const auto& [path, data] : *subcolumns) {
- stats->try_emplace(path, data.non_null_count);
- }
}
SubcolumnWritePlan build_subcolumn_write_plan(const ColumnVariant& variant,
size_t num_rows,
@@ -263,7 +265,8 @@ SubcolumnWritePlan build_subcolumn_write_plan(const
ColumnVariant& variant, size
}
if (config::enable_variant_doc_sparse_write_subcolumns) {
- build_sparse_subcolumns_and_stats(variant, &plan.sparse_subcolumns,
&plan.stats);
+ build_doc_value_stats(variant, &plan.stats);
+ build_sparse_subcolumns(variant, plan.stats, &plan.sparse_subcolumns);
plan.entries.reserve(plan.sparse_subcolumns.size());
for (auto& [path, sparse] : plan.sparse_subcolumns) {
SubcolumnWriteEntry entry;
@@ -277,7 +280,8 @@ SubcolumnWritePlan build_subcolumn_write_plan(const
ColumnVariant& variant, size
}
build_doc_value_stats(variant, &plan.stats);
- plan.dense_subcolumns =
variant_util::materialize_docs_to_subcolumns_map(variant);
+ plan.dense_subcolumns =
+ variant_util::materialize_docs_to_subcolumns_map(variant,
plan.stats.size());
plan.entries.reserve(plan.dense_subcolumns.size());
for (auto& [path, subcolumn] : plan.dense_subcolumns) {
SubcolumnWriteEntry entry;
@@ -295,20 +299,39 @@ Status execute_doc_write_pipeline(const ColumnVariant&
variant, size_t num_rows,
WriteMaterializedFn&& write_materialized_fn,
WriteDocValueFn&& write_doc_value_fn,
DocValuePathStats* out_column_stats) {
- SubcolumnWritePlan plan =
- build_subcolumn_write_plan(variant, num_rows,
variant_doc_materialization_min_rows);
- *out_column_stats = std::move(plan.stats);
- if (out_column_stats->empty()) {
- build_doc_value_stats(variant, out_column_stats);
- }
+ {
+ SubcolumnWritePlan plan =
+ build_subcolumn_write_plan(variant, num_rows,
variant_doc_materialization_min_rows);
+ *out_column_stats = std::move(plan.stats);
+ if (out_column_stats->empty()) {
+ build_doc_value_stats(variant, out_column_stats);
+ }
- for (auto& entry : plan.entries) {
- RETURN_IF_ERROR(write_materialized_fn(entry, column_id));
+ for (auto& entry : plan.entries) {
+ RETURN_IF_ERROR(write_materialized_fn(entry, column_id));
+ }
}
RETURN_IF_ERROR(write_doc_value_fn(column_id));
return Status::OK();
}
+Status finish_and_write_column_writer(ColumnWriter* writer) {
+ RETURN_IF_ERROR(writer->finish());
+ RETURN_IF_ERROR(writer->write_data());
+ return Status::OK();
+}
+
+void release_processed_subcolumn_write_entry(SubcolumnWriteEntry* entry) {
+ DCHECK(entry != nullptr);
+ DCHECK(entry->subcolumn != nullptr);
+ ColumnVariant::Subcolumn released_subcolumn;
+ std::swap(*entry->subcolumn, released_subcolumn);
+ if (entry->rowids != nullptr) {
+ std::vector<uint32_t> released_rowids;
+ released_rowids.swap(*entry->rowids);
+ }
+}
+
bool is_invalid_materialized_subcolumn_type(const DataTypePtr& type) {
return variant_util::get_base_type_of_array(type)->get_primitive_type() ==
PrimitiveType::INVALID_TYPE;
@@ -1736,6 +1759,9 @@ Status VariantDocCompactWriter::finish() {
if (!is_finalized()) {
RETURN_IF_ERROR(finalize());
}
+ if (_data_written) {
+ return Status::OK();
+ }
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
@@ -1746,10 +1772,14 @@ Status VariantDocCompactWriter::write_data() {
if (!is_finalized()) {
RETURN_IF_ERROR(finalize());
}
+ if (_data_written) {
+ return Status::OK();
+ }
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
RETURN_IF_ERROR(_doc_value_column_writer->write_data());
+ _data_written = true;
return Status::OK();
}
Status VariantDocCompactWriter::write_ordinal_index() {
@@ -1854,13 +1884,21 @@ Status VariantDocCompactWriter::finalize() {
*variant_column, num_rows, variant_doc_materialization_min_rows,
column_id,
[this, &parent_column, num_rows, &converter](SubcolumnWriteEntry&
entry,
int&
materialized_column_id) {
- return _write_materialized_subcolumn(parent_column,
entry.path, *entry.subcolumn,
- num_rows, converter.get(),
- materialized_column_id,
entry.rowids);
+ const size_t prev_writer_count = _subcolumn_writers.size();
+ RETURN_IF_ERROR(_write_materialized_subcolumn(
+ parent_column, entry.path, *entry.subcolumn, num_rows,
converter.get(),
+ materialized_column_id, entry.rowids));
+ DCHECK_EQ(_subcolumn_writers.size(), prev_writer_count + 1);
+
RETURN_IF_ERROR(finish_and_write_column_writer(_subcolumn_writers.back().get()));
+ release_processed_subcolumn_write_entry(&entry);
+ return Status::OK();
},
[this, &parent_column, variant_column, &converter, num_rows](int
doc_value_column_id) {
- return _write_doc_value_column(parent_column, variant_column,
converter.get(),
- doc_value_column_id, num_rows);
+ RETURN_IF_ERROR(_write_doc_value_column(parent_column,
variant_column,
+ converter.get(),
doc_value_column_id,
+ num_rows));
+
RETURN_IF_ERROR(finish_and_write_column_writer(_doc_value_column_writer.get()));
+ return Status::OK();
},
&column_stats));
@@ -1870,6 +1908,8 @@ Status VariantDocCompactWriter::finalize() {
for (const auto& [k, cnt] : column_stats) {
(*doc_value_column_non_null_size)[std::string(k)] = cnt;
}
+ _column = ColumnVariant::create(0, false);
+ _data_written = true;
_is_finalized = true;
return Status::OK();
}
diff --git a/be/src/storage/segment/variant/variant_column_writer_impl.h
b/be/src/storage/segment/variant/variant_column_writer_impl.h
index 15892ab6331..b4100388995 100644
--- a/be/src/storage/segment/variant/variant_column_writer_impl.h
+++ b/be/src/storage/segment/variant/variant_column_writer_impl.h
@@ -284,6 +284,7 @@ private:
const TabletColumn* _tablet_column = nullptr;
ColumnWriterOptions _opts;
bool _is_finalized = false;
+ bool _data_written = false;
std::unique_ptr<ColumnWriter> _doc_value_column_writer;
std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
std::vector<TabletIndexes> _subcolumns_indexes;
diff --git a/be/test/storage/segment/variant_column_writer_reader_test.cpp
b/be/test/storage/segment/variant_column_writer_reader_test.cpp
index e309aa6f2b1..a91eb450357 100644
--- a/be/test/storage/segment/variant_column_writer_reader_test.cpp
+++ b/be/test/storage/segment/variant_column_writer_reader_test.cpp
@@ -145,6 +145,9 @@ static Status create_variant_root_reader(const
SegmentFooterPB& footer,
return Status::OK();
}
+static std::string expected_doc_bucket_json_from_full(const std::string&
full_json, int bucket_num,
+ int bucket_index);
+
class VariantColumnWriterReaderTest : public testing::Test {
public:
void SetUp() override {
@@ -334,6 +337,193 @@ protected:
return Status::OK();
}
+ void validate_doc_compact_writer_roundtrip(bool repeat_finish_write_calls)
{
+ constexpr int kRows = 200;
+ constexpr int kDocBuckets = 4;
+ constexpr int kBucket = 0;
+
+ TabletSchemaPB schema_pb;
+ schema_pb.set_keys_type(KeysType::DUP_KEYS);
+ construct_column(schema_pb.add_column(), 1, "VARIANT", "V1", 3, false,
false,
+ /*variant_sparse_hash_shard_count=*/0,
+ /*variant_enable_doc_mode=*/true,
+ /*variant_doc_materialization_min_rows=*/0,
+ /*variant_doc_hash_shard_count=*/kDocBuckets);
+ _tablet_schema = std::make_shared<TabletSchema>();
+ _tablet_schema->init_from_pb(schema_pb);
+
+ TabletColumn parent_column = _tablet_schema->column(0);
+ TabletColumn extracted_doc_bucket =
+ variant_util::create_doc_value_column(parent_column, kBucket);
+ extracted_doc_bucket.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
+ extracted_doc_bucket.set_is_nullable(false);
+ _tablet_schema->append_column(extracted_doc_bucket);
+
+ TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
+ _tablet_schema->set_external_segment_meta_used_default(false);
+ tablet_meta->_tablet_id = 33000;
+ _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta,
_data_dir.get());
+ EXPECT_TRUE(_tablet->init().ok());
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
+
+ io::FileWriterPtr file_writer;
+ auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
+ auto st = io::global_local_filesystem()->create_file(file_path,
&file_writer);
+ EXPECT_TRUE(st.ok()) << st.msg();
+
+ SegmentFooterPB footer;
+
+ RowsetWriterContext rowset_ctx;
+ rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
+ rowset_ctx.tablet_schema = _tablet_schema;
+
+ ColumnWriterOptions root_opts;
+ root_opts.meta = footer.add_columns();
+ root_opts.compression_type = CompressionTypePB::LZ4;
+ root_opts.file_writer = file_writer.get();
+ root_opts.footer = &footer;
+ root_opts.rowset_ctx = &rowset_ctx;
+ _init_column_meta(root_opts.meta, 0, parent_column,
CompressionTypePB::LZ4);
+
+ std::unique_ptr<ColumnWriter> root_writer;
+ EXPECT_TRUE(ColumnWriter::create(root_opts, &parent_column,
file_writer.get(), &root_writer)
+ .ok());
+ EXPECT_TRUE(root_writer->init().ok());
+
+ TabletColumn extracted_doc_bucket_col = _tablet_schema->column(1);
+ ColumnWriterOptions doc_compact_opts = root_opts;
+ doc_compact_opts.meta = footer.add_columns();
+ _init_column_meta(doc_compact_opts.meta, 0, extracted_doc_bucket_col,
+ CompressionTypePB::LZ4);
+ std::unique_ptr<ColumnWriter> doc_compact_writer;
+ EXPECT_TRUE(ColumnWriter::create(doc_compact_opts,
&extracted_doc_bucket_col,
+ file_writer.get(),
&doc_compact_writer)
+ .ok());
+ EXPECT_TRUE(doc_compact_writer->init().ok());
+
+ std::unordered_map<int, std::string> inserted_full_json;
+ auto type_string = std::make_shared<DataTypeString>();
+ auto full_json_column = type_string->create_column();
+ auto* full_strings =
assert_cast<ColumnString*>(full_json_column.get());
+ VariantUtil::fill_string_column_with_test_data(full_strings, kRows,
&inserted_full_json);
+
+ std::unordered_map<int, std::string> expected_bucket_json;
+ auto bucket_json_column = type_string->create_column();
+ auto* bucket_strings =
assert_cast<ColumnString*>(bucket_json_column.get());
+ for (int i = 0; i < kRows; ++i) {
+ const std::string& full = inserted_full_json[i];
+ std::string bucket_json =
+ expected_doc_bucket_json_from_full(full, kDocBuckets,
kBucket);
+ expected_bucket_json.emplace(i, bucket_json);
+ bucket_strings->insert_data(bucket_json.data(),
bucket_json.size());
+ }
+
+ ParseConfig config;
+ config.deprecated_enable_flatten_nested = false;
+ config.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
+
+ MutableColumnPtr root_variant =
+
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
+ variant_util::parse_json_to_variant(*root_variant, *full_strings,
config);
+
+ MutableColumnPtr bucket_variant =
+
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
+ variant_util::parse_json_to_variant(*bucket_variant, *bucket_strings,
config);
+
+ auto root_data = std::make_unique<VariantColumnData>();
+ root_data->column_data = root_variant.get();
+ root_data->row_pos = 0;
+ const auto* root_ptr = reinterpret_cast<const
uint8_t*>(root_data.get());
+ EXPECT_TRUE(root_writer->append_data(&root_ptr, kRows).ok());
+
+ auto bucket_data = std::make_unique<VariantColumnData>();
+ bucket_data->column_data = bucket_variant.get();
+ bucket_data->row_pos = 0;
+ const auto* bucket_ptr = reinterpret_cast<const
uint8_t*>(bucket_data.get());
+ EXPECT_TRUE(doc_compact_writer->append_data(&bucket_ptr, kRows).ok());
+
+ EXPECT_TRUE(root_writer->finish().ok());
+ EXPECT_TRUE(doc_compact_writer->finish().ok());
+ if (repeat_finish_write_calls) {
+ EXPECT_TRUE(doc_compact_writer->finish().ok());
+ }
+ EXPECT_TRUE(root_writer->write_data().ok());
+ EXPECT_TRUE(doc_compact_writer->write_data().ok());
+ if (repeat_finish_write_calls) {
+ EXPECT_TRUE(doc_compact_writer->write_data().ok());
+ EXPECT_TRUE(doc_compact_writer->finish().ok());
+ }
+ EXPECT_TRUE(root_writer->write_ordinal_index().ok());
+ EXPECT_TRUE(doc_compact_writer->write_ordinal_index().ok());
+ EXPECT_TRUE(file_writer->close().ok());
+ footer.set_num_rows(kRows);
+
+ io::FileReaderSPtr file_reader;
+ st = io::global_local_filesystem()->open_file(file_path, &file_reader);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ std::shared_ptr<ColumnReader> column_reader;
+ st = create_variant_root_reader(footer, file_reader, _tablet_schema,
&column_reader);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ auto* variant_column_reader =
assert_cast<VariantColumnReader*>(column_reader.get());
+ EXPECT_TRUE(variant_column_reader != nullptr);
+
+ bool checked_one_key = false;
+ for (int j = 0; j < 10; ++j) {
+ const std::string key = "key" + std::to_string(j);
+ StringRef ref {key.data(), key.size()};
+ if (variant_util::variant_binary_shard_of(ref, kDocBuckets) ==
+ static_cast<uint32_t>(kBucket)) {
+
EXPECT_TRUE(variant_column_reader->get_subcolumn_meta_by_path(PathInData(key))
!=
+ nullptr);
+ checked_one_key = true;
+ break;
+ }
+ }
+ EXPECT_TRUE(checked_one_key);
+
+ MockColumnReaderCache column_reader_cache(footer, file_reader,
_tablet_schema);
+ StorageReadOptions storage_read_opts;
+ storage_read_opts.io_ctx.reader_type =
ReaderType::READER_BASE_COMPACTION;
+ storage_read_opts.tablet_schema = _tablet_schema;
+ OlapReaderStatistics stats;
+ storage_read_opts.stats = &stats;
+
+ TabletColumn doc_bucket_map =
variant_util::create_doc_value_column(parent_column, kBucket);
+ ColumnIteratorUPtr it;
+ st = variant_column_reader->new_iterator(&it, &doc_bucket_map,
&storage_read_opts,
+ &column_reader_cache);
+ EXPECT_TRUE(st.ok()) << st.msg();
+
EXPECT_TRUE(dynamic_cast<segment_v2::VariantDocValueCompactIterator*>(it.get())
!= nullptr);
+
+ ColumnIteratorOptions column_iter_opts;
+ column_iter_opts.stats = &stats;
+ column_iter_opts.file_reader = file_reader.get();
+ st = it->init(column_iter_opts);
+ EXPECT_TRUE(st.ok()) << st.msg();
+
+ DataTypeSerDe::FormatOptions options;
+ auto tz = cctz::utc_time_zone();
+ options.timezone = &tz;
+
+ MutableColumnPtr dst =
+
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), false);
+ size_t nrows = kRows;
+ st = it->seek_to_ordinal(0);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ st = it->next_batch(&nrows, dst);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_EQ(nrows, kRows);
+
+ for (int i = 0; i < kRows; ++i) {
+ std::string value;
+
assert_cast<ColumnVariant*>(dst.get())->serialize_one_row_to_string(i, &value,
options);
+ EXPECT_EQ(value, expected_bucket_json[i]);
+ }
+
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+ }
+
TabletSchemaSPtr _tablet_schema = nullptr;
StorageEngine* _engine_ref = nullptr;
std::unique_ptr<DataDir> _data_dir = nullptr;
@@ -1435,198 +1625,12 @@ TEST_F(VariantColumnWriterReaderTest,
test_read_doc_compact_from_doc_value_bucke
}
TEST_F(VariantColumnWriterReaderTest,
test_write_doc_compact_writer_and_read_doc_compact) {
- constexpr int kRows = 200;
- constexpr int kDocBuckets = 4;
- constexpr int kBucket = 0;
-
- // 1. create tablet_schema: root variant is in doc mode; plus one
extracted doc bucket column
- TabletSchemaPB schema_pb;
- schema_pb.set_keys_type(KeysType::DUP_KEYS);
- construct_column(schema_pb.add_column(), 1, "VARIANT", "V1", 3, false,
false,
- /*variant_sparse_hash_shard_count=*/0,
- /*variant_enable_doc_mode=*/true,
- /*variant_doc_materialization_min_rows=*/0,
- /*variant_doc_hash_shard_count=*/kDocBuckets);
- _tablet_schema = std::make_shared<TabletSchema>();
- _tablet_schema->init_from_pb(schema_pb);
-
- TabletColumn parent_column = _tablet_schema->column(0);
- TabletColumn extracted_doc_bucket =
- variant_util::create_doc_value_column(parent_column, kBucket);
- // This matches VariantCompactionUtil::get_extended_compaction_schema
behavior:
- // extracted doc bucket columns are represented as VARIANT to trigger
VariantDocCompactWriter.
- extracted_doc_bucket.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
- extracted_doc_bucket.set_is_nullable(false);
- _tablet_schema->append_column(extracted_doc_bucket);
-
- // 2. create tablet
- TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema));
- _tablet_schema->set_external_segment_meta_used_default(false);
- tablet_meta->_tablet_id = 33000;
- _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta,
_data_dir.get());
- EXPECT_TRUE(_tablet->init().ok());
-
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
-
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok());
-
- // 3. create file_writer
- io::FileWriterPtr file_writer;
- auto file_path = local_segment_path(_tablet->tablet_path(), "0", 0);
- auto st = io::global_local_filesystem()->create_file(file_path,
&file_writer);
- EXPECT_TRUE(st.ok()) << st.msg();
-
- // 4. create column writers: root VariantColumnWriter + extracted
VariantDocCompactWriter
- SegmentFooterPB footer;
-
- RowsetWriterContext rowset_ctx;
- rowset_ctx.write_type = DataWriteType::TYPE_DIRECT;
- rowset_ctx.tablet_schema = _tablet_schema;
-
- ColumnWriterOptions root_opts;
- root_opts.meta = footer.add_columns();
- root_opts.compression_type = CompressionTypePB::LZ4;
- root_opts.file_writer = file_writer.get();
- root_opts.footer = &footer;
- root_opts.rowset_ctx = &rowset_ctx;
- _init_column_meta(root_opts.meta, 0, parent_column,
CompressionTypePB::LZ4);
-
- std::unique_ptr<ColumnWriter> root_writer;
- EXPECT_TRUE(
- ColumnWriter::create(root_opts, &parent_column, file_writer.get(),
&root_writer).ok());
- EXPECT_TRUE(root_writer->init().ok());
-
- TabletColumn extracted_doc_bucket_col = _tablet_schema->column(1);
- ColumnWriterOptions doc_compact_opts = root_opts;
- doc_compact_opts.meta = footer.add_columns();
- _init_column_meta(doc_compact_opts.meta, 0, extracted_doc_bucket_col,
CompressionTypePB::LZ4);
- std::unique_ptr<ColumnWriter> doc_compact_writer;
- EXPECT_TRUE(ColumnWriter::create(doc_compact_opts,
&extracted_doc_bucket_col, file_writer.get(),
- &doc_compact_writer)
- .ok());
- EXPECT_TRUE(doc_compact_writer->init().ok());
-
- // 5. build doc-value-only data:
- // - root column uses the full JSON (doc values only is enough for this
test)
- // - extracted doc bucket column uses bucket-filtered JSON so that doc
bucket data matches
- // the bucket index expected by VariantDocCompactWriter.
- std::unordered_map<int, std::string> inserted_full_json;
- auto type_string = std::make_shared<DataTypeString>();
- auto full_json_column = type_string->create_column();
- auto* full_strings = assert_cast<ColumnString*>(full_json_column.get());
- VariantUtil::fill_string_column_with_test_data(full_strings, kRows,
&inserted_full_json);
-
- std::unordered_map<int, std::string> expected_bucket_json;
- auto bucket_json_column = type_string->create_column();
- auto* bucket_strings =
assert_cast<ColumnString*>(bucket_json_column.get());
- for (int i = 0; i < kRows; ++i) {
- const std::string& full = inserted_full_json[i];
- std::string bucket_json = expected_doc_bucket_json_from_full(full,
kDocBuckets, kBucket);
- expected_bucket_json.emplace(i, bucket_json);
- bucket_strings->insert_data(bucket_json.data(), bucket_json.size());
- }
-
- ParseConfig config;
- config.deprecated_enable_flatten_nested = false;
- config.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
-
- MutableColumnPtr root_variant =
-
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
- variant_util::parse_json_to_variant(*root_variant, *full_strings, config);
-
- MutableColumnPtr bucket_variant =
-
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), true);
- variant_util::parse_json_to_variant(*bucket_variant, *bucket_strings,
config);
-
- // 6. append and write
- {
- auto root_data = std::make_unique<VariantColumnData>();
- root_data->column_data = root_variant.get();
- root_data->row_pos = 0;
- const auto* data = reinterpret_cast<const uint8_t*>(root_data.get());
- EXPECT_TRUE(root_writer->append_data(&data, kRows).ok());
- }
- {
- auto bucket_data = std::make_unique<VariantColumnData>();
- bucket_data->column_data = bucket_variant.get();
- bucket_data->row_pos = 0;
- const auto* data = reinterpret_cast<const uint8_t*>(bucket_data.get());
- EXPECT_TRUE(doc_compact_writer->append_data(&data, kRows).ok());
- }
-
- EXPECT_TRUE(root_writer->finish().ok());
- EXPECT_TRUE(doc_compact_writer->finish().ok());
- EXPECT_TRUE(root_writer->write_data().ok());
- EXPECT_TRUE(doc_compact_writer->write_data().ok());
- EXPECT_TRUE(root_writer->write_ordinal_index().ok());
- EXPECT_TRUE(doc_compact_writer->write_ordinal_index().ok());
- EXPECT_TRUE(file_writer->close().ok());
- footer.set_num_rows(kRows);
-
- // 7. open reader and validate:
- // - doc bucket can be read via DOC_COMPACT iterator in flat-leaf
compaction mode
- // - materialized leaf meta exists for at least one key in this bucket
- io::FileReaderSPtr file_reader;
- st = io::global_local_filesystem()->open_file(file_path, &file_reader);
- EXPECT_TRUE(st.ok()) << st.msg();
- std::shared_ptr<ColumnReader> column_reader;
- st = create_variant_root_reader(footer, file_reader, _tablet_schema,
&column_reader);
- EXPECT_TRUE(st.ok()) << st.msg();
- auto* variant_column_reader =
assert_cast<VariantColumnReader*>(column_reader.get());
- EXPECT_TRUE(variant_column_reader != nullptr);
-
- bool checked_one_key = false;
- for (int j = 0; j < 10; ++j) {
- const std::string key = "key" + std::to_string(j);
- StringRef ref {key.data(), key.size()};
- if (variant_util::variant_binary_shard_of(ref, kDocBuckets) ==
- static_cast<uint32_t>(kBucket)) {
-
EXPECT_TRUE(variant_column_reader->get_subcolumn_meta_by_path(PathInData(key))
!=
- nullptr);
- checked_one_key = true;
- break;
- }
- }
- EXPECT_TRUE(checked_one_key);
-
- MockColumnReaderCache column_reader_cache(footer, file_reader,
_tablet_schema);
- StorageReadOptions storage_read_opts;
- storage_read_opts.io_ctx.reader_type = ReaderType::READER_BASE_COMPACTION;
- storage_read_opts.tablet_schema = _tablet_schema;
- OlapReaderStatistics stats;
- storage_read_opts.stats = &stats;
-
- TabletColumn doc_bucket_map =
variant_util::create_doc_value_column(parent_column, kBucket);
- ColumnIteratorUPtr it;
- st = variant_column_reader->new_iterator(&it, &doc_bucket_map,
&storage_read_opts,
- &column_reader_cache);
- EXPECT_TRUE(st.ok()) << st.msg();
-
EXPECT_TRUE(dynamic_cast<segment_v2::VariantDocValueCompactIterator*>(it.get())
!= nullptr);
-
- ColumnIteratorOptions column_iter_opts;
- column_iter_opts.stats = &stats;
- column_iter_opts.file_reader = file_reader.get();
- st = it->init(column_iter_opts);
- EXPECT_TRUE(st.ok()) << st.msg();
-
- DataTypeSerDe::FormatOptions options;
- auto tz = cctz::utc_time_zone();
- options.timezone = &tz;
-
- MutableColumnPtr dst =
-
ColumnVariant::create(parent_column.variant_max_subcolumns_count(), false);
- size_t nrows = kRows;
- st = it->seek_to_ordinal(0);
- EXPECT_TRUE(st.ok()) << st.msg();
- st = it->next_batch(&nrows, dst);
- EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_EQ(nrows, kRows);
-
- for (int i = 0; i < kRows; ++i) {
- std::string value;
- assert_cast<ColumnVariant*>(dst.get())->serialize_one_row_to_string(i,
&value, options);
- EXPECT_EQ(value, expected_bucket_json[i]);
- }
+ validate_doc_compact_writer_roundtrip(false);
+}
-
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok());
+TEST_F(VariantColumnWriterReaderTest,
+
test_write_doc_compact_writer_finish_write_data_idempotent_and_read_doc_compact)
{
+ validate_doc_compact_writer_roundtrip(true);
}
TEST_F(VariantColumnWriterReaderTest, test_doc_compact_sparse_write_array_gap)
{
diff --git a/be/test/storage/segment/variant_util_test.cpp
b/be/test/storage/segment/variant_util_test.cpp
index 9fb72343eba..e9f126faca0 100644
--- a/be/test/storage/segment/variant_util_test.cpp
+++ b/be/test/storage/segment/variant_util_test.cpp
@@ -95,6 +95,67 @@ TEST(VariantUtilTest,
ParseDocValueToSubcolumns_FillsDefaultsAndValues) {
EXPECT_EQ(fb.field.get_type(), PrimitiveType::TYPE_NULL); // missing
}
+TEST(VariantUtilTest,
MaterializeDocsToSubcolumnsMap_ExpectedUniquePathsPreservesValues) {
+ const std::vector<std::string_view> jsons = {
+ R"({"a":1,"b":"x"})", //
+ R"({"b":"y","c":2})", //
+ R"({"a":3,"c":4})", //
+ };
+
+ auto variant = ColumnVariant::create(0, true);
+ auto json_col = _make_json_column(jsons);
+
+ ParseConfig cfg;
+ cfg.deprecated_enable_flatten_nested = false;
+ cfg.parse_to = ParseConfig::ParseTo::OnlyDocValueColumn;
+ parse_json_to_variant(*variant, *json_col, cfg);
+
+ EXPECT_TRUE(variant->is_doc_mode());
+
+ auto default_subcolumns = materialize_docs_to_subcolumns_map(*variant);
+ auto subcolumns = materialize_docs_to_subcolumns_map(*variant, 3);
+ ASSERT_EQ(subcolumns.size(), default_subcolumns.size());
+ ASSERT_EQ(subcolumns.size(), 3);
+
+ auto& a = subcolumns.at("a");
+ auto& b = subcolumns.at("b");
+ auto& c = subcolumns.at("c");
+ a.finalize();
+ b.finalize();
+ c.finalize();
+ EXPECT_EQ(a.size(), jsons.size());
+ EXPECT_EQ(b.size(), jsons.size());
+ EXPECT_EQ(c.size(), jsons.size());
+
+ FieldWithDataType f;
+ a.get(0, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+ EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 1);
+ a.get(1, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL);
+ a.get(2, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+ EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 3);
+
+ b.get(0, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_STRING);
+ EXPECT_EQ(f.field.get<TYPE_STRING>(), "x");
+ b.get(1, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_STRING);
+ EXPECT_EQ(f.field.get<TYPE_STRING>(), "y");
+ b.get(2, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL);
+
+ c.get(0, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_NULL);
+ c.get(1, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+ EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 2);
+ c.get(2, f);
+ EXPECT_EQ(f.field.get_type(), PrimitiveType::TYPE_BIGINT);
+ EXPECT_EQ(f.field.get<TYPE_BIGINT>(), 4);
+}
+
TEST(VariantUtilTest, ParseOnlyDocValueColumn_SerializesMixedTypes) {
const std::vector<std::string_view> jsons = {
R"({"b":true,"d":1.5,"u":18446744073709551615,"arr":[1,2,3],"arr2":[[1],[2]],"s":"x"})",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]