This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new 6842191232 AVRO-4174: [C++] Support read and write key-value metadata (#3468)
6842191232 is described below
commit 6842191232cac9620d4b0b6d57375267a10f2d48
Author: Gang Wu <[email protected]>
AuthorDate: Tue Sep 2 18:54:56 2025 +0800
AVRO-4174: [C++] Support read and write key-value metadata (#3468)
* AVRO-4174: [C++] Support read and write key-value metadata
* refine docstring
---
lang/c++/impl/DataFile.cc | 43 ++++++------
lang/c++/include/avro/DataFile.hh | 41 +++++++++--
lang/c++/test/DataFileTests.cc | 143 ++++++++++++++++++++++++++++++++++++++
3 files changed, 200 insertions(+), 27 deletions(-)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index f633b959ca..08c6bdce23 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -66,30 +66,33 @@ const size_t zlibBufGrowSize = 128 * 1024;
} // namespace
DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema
&schema, size_t syncInterval,
- Codec codec) : filename_(filename),
- schema_(schema),
-
encoderPtr_(binaryEncoder()),
-
syncInterval_(syncInterval),
- codec_(codec),
-
stream_(fileOutputStream(filename)),
-
buffer_(memoryOutputStream()),
- sync_(makeSync()),
- objectCount_(0),
- lastSync_(0) {
+ Codec codec, const Metadata &metadata)
: filename_(filename),
+
schema_(schema),
+
encoderPtr_(binaryEncoder()),
+
syncInterval_(syncInterval),
+
codec_(codec),
+
stream_(fileOutputStream(filename)),
+
buffer_(memoryOutputStream()),
+
sync_(makeSync()),
+
objectCount_(0),
+
metadata_(metadata),
+
lastSync_(0) {
init(schema, syncInterval, codec);
}
DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream>
outputStream,
- const ValidSchema &schema, size_t
syncInterval, Codec codec) : filename_(),
-
schema_(schema),
-
encoderPtr_(binaryEncoder()),
-
syncInterval_(syncInterval),
-
codec_(codec),
-
stream_(std::move(outputStream)),
-
buffer_(memoryOutputStream()),
-
sync_(makeSync()),
-
objectCount_(0),
-
lastSync_(0) {
+ const ValidSchema &schema, size_t
syncInterval,
+ Codec codec, const Metadata &metadata)
: filename_(),
+
schema_(schema),
+
encoderPtr_(binaryEncoder()),
+
syncInterval_(syncInterval),
+
codec_(codec),
+
stream_(std::move(outputStream)),
+
buffer_(memoryOutputStream()),
+
sync_(makeSync()),
+
objectCount_(0),
+
metadata_(metadata),
+
lastSync_(0) {
init(schema, syncInterval, codec);
}
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index 8ff5d88c61..27f9c43913 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -54,6 +54,22 @@ const int SyncSize = 16;
*/
typedef std::array<uint8_t, SyncSize> DataFileSync;
+/**
+ * Avro files may include arbitrary user-specified metadata.
+ * File metadata is written as if defined by the following map schema:
+ *
+ * `{"type": "map", "values": "bytes"}`
+ *
+ * All metadata properties that start with "avro." are reserved.
+ * The following file metadata properties are currently used:
+ *
+ * - `avro.schema` contains the schema of objects stored in the file, as JSON data (required).
+ * - `avro.codec`, the name of the compression codec used to compress blocks, as a string.
+ * Implementations are required to support the following codecs: "null" and "deflate".
+ * If codec is absent, it is assumed to be "null". See avro.codecs for implementation details.
+ */
+typedef std::map<std::string, std::vector<uint8_t>> Metadata;
+
/**
* Type-independent portion of DataFileWriter.
* At any given point in time, at most one file can be written using
@@ -71,8 +87,6 @@ class AVRO_DECL DataFileWriterBase {
const DataFileSync sync_;
int64_t objectCount_;
- typedef std::map<std::string, std::vector<uint8_t>> Metadata;
-
Metadata metadata_;
int64_t lastSync_;
@@ -119,9 +133,11 @@ public:
* Constructs a data file writer with the given sync interval and name.
*/
DataFileWriterBase(const char *filename, const ValidSchema &schema,
- size_t syncInterval, Codec codec = NULL_CODEC);
+ size_t syncInterval, Codec codec = NULL_CODEC,
+ const Metadata &metadata = {});
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
- const ValidSchema &schema, size_t syncInterval, Codec
codec);
+ const ValidSchema &schema, size_t syncInterval, Codec
codec,
+ const Metadata &metadata = {});
DataFileWriterBase(const DataFileWriterBase &) = delete;
DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
@@ -156,10 +172,12 @@ public:
* Constructs a new data file.
*/
DataFileWriter(const char *filename, const ValidSchema &schema,
- size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC)
: base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {}
+ size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
+ const Metadata &metadata = {}) :
base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval,
codec, metadata)) {}
DataFileWriter(std::unique_ptr<OutputStream> outputStream, const
ValidSchema &schema,
- size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC)
: base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval,
codec)) {}
+ size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
+ const Metadata &metadata = {}) :
base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema,
syncInterval, codec, metadata)) {}
DataFileWriter(const DataFileWriter &) = delete;
DataFileWriter &operator=(const DataFileWriter &) = delete;
@@ -212,7 +230,6 @@ class AVRO_DECL DataFileReaderBase {
ValidSchema dataSchema_;
DecoderPtr dataDecoder_;
std::unique_ptr<InputStream> dataStream_;
- typedef std::map<std::string, std::vector<uint8_t>> Metadata;
Metadata metadata_;
DataFileSync sync_{};
@@ -306,6 +323,11 @@ public:
* Return the last synchronization point before our current position.
*/
int64_t previousSync() const;
+
+ /**
+ * Return the metadata for the data file.
+ */
+ const Metadata &metadata() const { return metadata_; }
};
/**
@@ -421,6 +443,11 @@ public:
* Return the last synchronization point before our current position.
*/
int64_t previousSync() { return base_->previousSync(); }
+
+ /**
+ * Return the metadata for the data file.
+ */
+ const Metadata &metadata() const { return base_->metadata(); }
};
} // namespace avro
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 07a6b6b9a3..3723615fb8 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -716,6 +716,60 @@ public:
ComplexDouble unused;
BOOST_CHECK_NO_THROW(df.write(unused)); // write has not effect on
closed stream
}
+
+ void testMetadata() {
+ avro::Metadata customMetadata;
+ std::string key1 = "author";
+ std::string value1 = "test-user";
+ customMetadata[key1] = std::vector<uint8_t>(value1.begin(),
value1.end());
+
+ std::string key2 = "version";
+ std::string value2 = "1.0.0";
+ customMetadata[key2] = std::vector<uint8_t>(value2.begin(),
value2.end());
+
+ std::string key3 = "description";
+ std::string value3 = "Test file with custom metadata";
+ customMetadata[key3] = std::vector<uint8_t>(value3.begin(),
value3.end());
+
+ // Write data with custom metadata
+ {
+ avro::DataFileWriter<ComplexInteger> df(filename, writerSchema,
100, avro::NULL_CODEC, customMetadata);
+ int64_t re = 10;
+ int64_t im = 20;
+ for (int i = 0; i < 5; ++i, re += 5, im += 10) {
+ ComplexInteger c(re, im);
+ df.write(c);
+ }
+ df.close();
+ }
+
+ // Read and verify metadata
+ {
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ const avro::Metadata &readMetadata = df.metadata();
+
+ // Check that our custom metadata is present
+ auto it1 = readMetadata.find(key1);
+ BOOST_CHECK(it1 != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(it1->second.begin(),
it1->second.end()), value1);
+
+ auto it2 = readMetadata.find(key2);
+ BOOST_CHECK(it2 != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(it2->second.begin(),
it2->second.end()), value2);
+
+ auto it3 = readMetadata.find(key3);
+ BOOST_CHECK(it3 != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(it3->second.begin(),
it3->second.end()), value3);
+
+ // Check that standard metadata is also present
+ auto schemaIt = readMetadata.find("avro.schema");
+ BOOST_CHECK(schemaIt != readMetadata.end());
+
+ auto codecIt = readMetadata.find("avro.codec");
+ BOOST_CHECK(codecIt != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(codecIt->second.begin(),
codecIt->second.end()), "null");
+ }
+ }
};
void addReaderTests(test_suite *ts, const shared_ptr<DataFileTest> &t) {
@@ -1082,6 +1136,79 @@ void testReadRecordEfficientlyUsingLastSyncZstdCodec() {
}
#endif
+void testMetadataWithCodec(avro::Codec codec) {
+ const char *filename = "test_metadata_codec.df";
+ avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+
+ avro::Metadata customMetadata;
+ std::string key1 = "test.key1";
+ std::string value1 = "test-value-1";
+ customMetadata[key1] = std::vector<uint8_t>(value1.begin(), value1.end());
+
+ std::string key2 = "test.key2";
+ std::string value2 = "test-value-2-with-special-chars: !@#$%^&*()";
+ customMetadata[key2] = std::vector<uint8_t>(value2.begin(), value2.end());
+
+ // Write data with custom metadata
+ {
+ avro::DataFileWriter<ComplexInteger> writer(filename, schema, 100,
codec, customMetadata);
+ for (int i = 0; i < 10; ++i) {
+ ComplexInteger c(i * 2, i * 3);
+ writer.write(c);
+ }
+ writer.close();
+ }
+
+ // Read and verify metadata
+ {
+ avro::DataFileReader<ComplexInteger> reader(filename, schema);
+ const avro::Metadata &readMetadata = reader.metadata();
+
+ // Verify custom metadata
+ auto it1 = readMetadata.find(key1);
+ BOOST_CHECK(it1 != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(it1->second.begin(), it1->second.end()),
value1);
+
+ auto it2 = readMetadata.find(key2);
+ BOOST_CHECK(it2 != readMetadata.end());
+ BOOST_CHECK_EQUAL(std::string(it2->second.begin(), it2->second.end()),
value2);
+
+ // Verify standard metadata
+ auto schemaIt = readMetadata.find("avro.schema");
+ BOOST_CHECK(schemaIt != readMetadata.end());
+
+ auto codecIt = readMetadata.find("avro.codec");
+ BOOST_CHECK(codecIt != readMetadata.end());
+ }
+
+ // Clean up
+ std::filesystem::remove(filename);
+}
+
+void testMetadataWithNullCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testMetadataWithCodec(avro::NULL_CODEC);
+}
+
+void testMetadataWithDeflateCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testMetadataWithCodec(avro::DEFLATE_CODEC);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testMetadataWithSnappyCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testMetadataWithCodec(avro::SNAPPY_CODEC);
+}
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testMetadataWithZstdCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testMetadataWithCodec(avro::ZSTD_CODEC);
+}
+#endif
+
test_suite *
init_unit_test_suite(int, char *[]) {
{
@@ -1232,6 +1359,13 @@ init_unit_test_suite(int, char *[]) {
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
boost::unit_test::framework::master_test_suite().add(ts);
}
+ {
+ auto *ts = BOOST_TEST_SUITE("DataFile tests: test15.df");
+ shared_ptr<DataFileTest> t(new DataFileTest("test15.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testMetadata, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringNullCodec));
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringDeflateCodec));
@@ -1260,5 +1394,14 @@ init_unit_test_suite(int, char *[]) {
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncZstdCodec));
#endif
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithNullCodec));
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithSnappyCodec));
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithZstdCodec));
+#endif
+
return nullptr;
}