This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch orc in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push: new db01184f765 [opt] support orc merge multi stripes io (#244) db01184f765 is described below commit db01184f765c03496e4107bd3ac37c077ac4bc5f Author: daidai <2017501...@qq.com> AuthorDate: Fri Oct 18 23:17:33 2024 +0800 [opt] support orc merge multi stripes io (#244) --- c++/include/orc/Reader.hh | 8 +++++- c++/src/Reader.cc | 71 +++++++++++++++++++++++++++++++++++++---------- c++/src/Reader.hh | 9 +++++- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh index 5843d88c059..c9be47e0d8b 100644 --- a/c++/include/orc/Reader.hh +++ b/c++/include/orc/Reader.hh @@ -40,7 +40,7 @@ namespace orc { // classes that hold data members so we can maintain binary compatibility struct ReaderOptionsPrivate; struct RowReaderOptionsPrivate; - + class InputStream; /** * Expose the reader metrics including the latency and * number of calls of the decompression/decoding/IO modules. 
@@ -633,6 +633,12 @@ namespace orc { */ virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters( uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0; + + virtual InputStream* getStream() const = 0; + + virtual void setStream(std::unique_ptr<InputStream>) = 0; + + virtual std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) = 0; }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 80a5cfd4b7d..eddeeee0b05 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -306,7 +306,8 @@ namespace orc { column_selector.updateSelected(selectedColumns, opts); // prepare SargsApplier if SearchArgument is available - if (opts.getSearchArgument() && footer->rowindexstride() > 0) { + sargsApplier = std::move(contents->sargsApplier); + if (sargsApplier == nullptr && opts.getSearchArgument() && footer->rowindexstride() > 0) { sargs = opts.getSearchArgument(); sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(), getWriterVersionImpl(_contents.get()), @@ -917,6 +918,37 @@ namespace orc { return std::make_unique<RowReaderImpl>(contents, opts, filter, stringDictFilter); } + std::vector<int> ReaderImpl::getNeedReadStripes(const RowReaderOptions& opts) { + if (opts.getSearchArgument() && !isMetadataLoaded) { + // load stripe statistics for PPD + readMetadata(); + } + + std::vector<int> allStripesNeeded(numberOfStripes,1); + + if (opts.getSearchArgument() && footer->rowindexstride() > 0) { + auto sargs = opts.getSearchArgument(); + sargsApplier.reset(new SargsApplier(*contents->schema, sargs.get(), footer->rowindexstride(), + getWriterVersionImpl(contents.get()), + contents->readerMetrics)); + + if (sargsApplier == nullptr || contents->metadata == nullptr) { + return allStripesNeeded; + } + + for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast<int>(currentStripeIndex)); + 
//Not need add mMetrics,so use 0. + allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);; + } + contents->sargsApplier = std::move(sargsApplier); + } + return allStripesNeeded; + } + + + uint64_t maxStreamsForType(const proto::Type& type) { switch (static_cast<int64_t>(type.kind())) { case proto::Type_Kind_STRUCT: @@ -1126,29 +1158,38 @@ namespace orc { << ", footerLength=" << currentStripeInfo.footerlength() << ")"; throw ParseError(msg.str()); } - currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); - rowsInCurrentStripe = currentStripeInfo.numberofrows(); - processingStripe = currentStripe; - - std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl( - currentStripeInfo.offset(), currentStripeInfo.indexlength(), - currentStripeInfo.datalength(), currentStripeInfo.footerlength(), - currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, - contents->compression, contents->blockSize, contents->readerMetrics)); - contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { - const auto& currentStripeStats = - contents->metadata->stripestats(static_cast<int>(currentStripe)); + const auto &currentStripeStats = + contents->metadata->stripestats(static_cast<int>(currentStripe)); // skip this stripe after stats fail to satisfy sargs uint64_t stripeRowGroupCount = - (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); + (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); isStripeNeeded = - sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); + sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } + if (!isStripeNeeded) { + // advance to next stripe when current stripe has no matching rows + currentStripe += 1; + currentRowInStripe = 0; + continue; 
+ } + } + currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); + rowsInCurrentStripe = currentStripeInfo.numberofrows(); + processingStripe = currentStripe; + std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl( + currentStripeInfo.offset(), currentStripeInfo.indexlength(), + currentStripeInfo.datalength(), currentStripeInfo.footerlength(), + currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, + contents->compression, contents->blockSize, contents->readerMetrics)); + contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); + + if (sargsApplier) { + bool isStripeNeeded = true; if (isStripeNeeded) { // read row group statistics and bloom filters of current stripe loadStripeIndex(); diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index c0f891ef27c..9505022c558 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -100,6 +100,7 @@ namespace orc { bool isDecimalAsLong; std::unique_ptr<proto::Metadata> metadata; ReaderMetrics* readerMetrics; + std::unique_ptr<SargsApplier> sargsApplier; }; proto::StripeFooter getStripeFooter(const proto::StripeInformation& info, @@ -314,6 +315,8 @@ namespace orc { // footer proto::Footer* footer; uint64_t numberOfStripes; + std::unique_ptr<SargsApplier> sargsApplier; + std::vector<int> getNeedReadStripes(const RowReaderOptions& opts) override; uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns); // internal methods @@ -418,10 +421,14 @@ namespace orc { return contents->schema.get(); } - InputStream* getStream() const { + InputStream* getStream() const override { return contents->stream.get(); } + void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{ + contents->stream = std::move(inputStreamUPtr); + } + uint64_t getMemoryUse(int stripeIx = -1) override; uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx = -1) override; 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org