This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch orc-2.1
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc-2.1 by this push:
new 8e7a0511980 [Optimize] Support pread cache for index/stripe/file for orc
8e7a0511980 is described below
commit 8e7a0511980039f29802c25f956e3a907967333a
Author: caiconghui1 <[email protected]>
AuthorDate: Thu Aug 14 00:11:33 2025 +0800
[Optimize] Support pread cache for index/stripe/file for orc
---
c++/include/orc/OrcFile.hh | 7 +++++++
c++/src/Reader.cc | 34 ++++++++++++----------------------
2 files changed, 19 insertions(+), 22 deletions(-)
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index d840e67cf28..d4d554a3501 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -36,6 +36,12 @@ namespace orc {
*/
class InputStream {
public:
+ enum class CacheType {
+ FILE,
+ STRIPE,
+ ROW_GROUP_INDEX,
+ };
+
virtual ~InputStream();
/**
@@ -65,6 +71,7 @@ namespace orc {
virtual void beforeReadStripe(std::unique_ptr<StripeInformation>
currentStripeInformation,
std::vector<bool>& selectedColumns);
+ virtual void preadCache(CacheType cacheType, uint64_t offset, uint64_t
length);
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 4a5fb8f73b5..4c33a3c9aff 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -540,22 +540,9 @@ namespace orc {
// reset all previous row indexes
rowIndexes.clear();
bloomFilterIndex.clear();
- static const uint64_t MAX_READ_STRIPE_INDEX_SIZE_FOR_BUFFER = 1024 * 1024;
- const char* stripeIndexBuffer = nullptr;
uint64_t offset = currentStripeInfo.offset();
- uint64_t startOffset = offset;
- uint64_t stripeIndexSize = currentStripeInfo.indexlength();
- std::unique_ptr<SeekableFileInputStream> stripeIndexInputStream = nullptr;
- if (stripeIndexSize < MAX_READ_STRIPE_INDEX_SIZE_FOR_BUFFER) {
- stripeIndexInputStream = std::unique_ptr<SeekableFileInputStream>(new
SeekableFileInputStream(
- contents->stream.get(), startOffset, stripeIndexSize, *contents->pool,
stripeIndexSize));
- int size = 0;
- const void* buffer = nullptr;
- if (!stripeIndexInputStream->Next(&buffer, &size) ||
static_cast<uint64_t>(size) != stripeIndexSize) {
- throw ParseError("Failed to read the stripe index");
- }
- stripeIndexBuffer = static_cast<const char*>(buffer);
- }
+ uint64_t rowIndexSize = currentStripeInfo.indexlength();
+ contents->stream->preadCache(InputStream::CacheType::ROW_GROUP_INDEX,
offset, rowIndexSize);
// obtain row indexes for selected columns
for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
const proto::Stream& pbStream = currentStripeFooter.streams(i);
@@ -563,13 +550,10 @@ namespace orc {
if (selectedColumns[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
- std::unique_ptr<SeekableInputStream> inputStream = stripeIndexBuffer ?
std::unique_ptr<SeekableInputStream>(
- new SeekableArrayInputStream(stripeIndexBuffer + offset -
startOffset, pbStream.length()))
- : std::unique_ptr<SeekableInputStream>(new
SeekableFileInputStream(contents->stream.get(), offset,
- pbStream.length(), *contents->pool));
std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
getCompression(),
- std::move(inputStream),
+ std::unique_ptr<SeekableInputStream>(new
SeekableFileInputStream(contents->stream.get(),
+ offset, pbStream.length(), *contents->pool)),
getCompressionSize(), *contents->pool, contents->readerMetrics);
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
@@ -1190,8 +1174,9 @@ namespace orc {
while (currentStripe < lastStripe) {
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
- if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
- currentStripeInfo.datalength() +
currentStripeInfo.footerlength() >=
+ size_t stripeSize = currentStripeInfo.indexlength() +
currentStripeInfo.datalength()
+ + currentStripeInfo.footerlength();
+ if (currentStripeInfo.offset() + stripeSize >=
fileLength) {
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe
@@ -1221,6 +1206,7 @@ namespace orc {
continue;
}
}
+ contents->stream->preadCache(InputStream::CacheType::STRIPE,
currentStripeInfo.offset(), stripeSize);
currentStripeFooter = getStripeFooter(currentStripeInfo,
*contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
processingStripe = currentStripe;
@@ -1678,6 +1664,7 @@ namespace orc {
throw ParseError("File size too small");
}
auto buffer = std::make_unique<DataBuffer<char>>(*contents->pool,
readSize);
+ stream->preadCache(InputStream::CacheType::FILE, 0, fileLength);
stream->read(buffer->data(), readSize, fileLength - readSize);
postscriptLength = buffer->data()[readSize - 1] & 0xff;
@@ -1780,4 +1767,7 @@ namespace orc {
void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation>
currentStripeInformation,
std::vector<bool>& selectedColumns) {}
+ void InputStream::preadCache(CacheType cacheType, uint64_t offset, uint64_t
length) {}
+
+
} // namespace orc
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]