This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new c617fc9 Fix the flush_status bug in flush-executor (#2933) c617fc9 is described below commit c617fc9064cc8475b7c6a5ecbe540e3a88e05663 Author: LingBin <lingbi...@gmail.com> AuthorDate: Wed Feb 19 06:23:19 2020 -0600 Fix the flush_status bug in flush-executor (#2933) For a tablet, there may be multiple memtables, which will be flushed to disk one by one in the order of generation. If a memtable flush fails, then the load job will definitely fail, but the previous implementation will overwrite `_flush_status`, so the error may go undetected and a failed load job may be reported as successful. This patch also has two other changes: 1. Use `std::bind` to replace `boost::bind`; 2. Remove some unneeded headers. --- be/src/olap/memtable_flush_executor.cpp | 44 ++++++++++++++++++--------------- be/src/olap/memtable_flush_executor.h | 23 +++++++++++------ be/src/olap/tablet_sync_service.cpp | 2 +- be/src/util/threadpool.cpp | 21 ++++++++-------- be/src/util/threadpool.h | 12 ++++----- be/test/olap/delta_writer_test.cpp | 21 ++++++++-------- be/test/util/threadpool_test.cpp | 28 ++++++++++----------- 7 files changed, 81 insertions(+), 70 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 00bf34e..e3f6c90 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -19,11 +19,7 @@ #include <functional> -#include "olap/data_dir.h" -#include "olap/delta_writer.h" #include "olap/memtable.h" -#include "runtime/exec_env.h" -#include "runtime/mem_tracker.h" #include "util/scoped_cleanup.h" namespace doris { @@ -34,36 +30,44 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { return os; } -OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) { - _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable)); +// The type of
parameter is safe to be a reference. Because the function object +// returned by std::bind() will increase the reference count of Memtable. i.e., +// after the submit() method returns, even if the caller immediately releases the +// passed shared_ptr object, the Memtable object will not be destructed because +// its reference count is not 0. +OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) { + RETURN_NOT_OK(_flush_status.load()); + _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable)); return OLAP_SUCCESS; } -void FlushToken::cancel() { +void FlushToken::cancel() { _flush_token->shutdown(); } -OLAPStatus FlushToken::wait() { +OLAPStatus FlushToken::wait() { _flush_token->wait(); - return _flush_status; + return _flush_status.load(); } void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) { + SCOPED_CLEANUP({ memtable.reset(); }); + + // If previous flush has failed, return directly + if (_flush_status.load() != OLAP_SUCCESS) { + return; + } + MonotonicStopWatch timer; timer.start(); - _flush_status = memtable->flush(); - SCOPED_CLEANUP({ - memtable.reset(); - }); - if (_flush_status != OLAP_SUCCESS) { + _flush_status.store(memtable->flush()); + if (_flush_status.load() != OLAP_SUCCESS) { return; } _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_count++; _stats.flush_size_bytes += memtable->memory_usage(); - LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000 - << " ms, status=" << _flush_status; } void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { @@ -71,12 +75,12 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { size_t min_threads = std::max(1, config::flush_thread_num_per_store); size_t max_threads = data_dir_num * min_threads; ThreadPoolBuilder("MemTableFlushThreadPool") - .set_min_threads(min_threads) - .set_max_threads(max_threads) - .build(&_flush_pool); + .set_min_threads(min_threads) + 
.set_max_threads(max_threads) + .build(&_flush_pool); } -// create a flush token +// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) { flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); return OLAP_SUCCESS; diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index bb58e3f..b3d0cbe 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -17,10 +17,9 @@ #pragma once -#include <cstdint> +#include <atomic> #include <memory> #include <vector> -#include <utility> #include "olap/olap_define.h" #include "util/threadpool.h" @@ -43,13 +42,19 @@ struct FlushStatistic { std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); // A thin wrapper of ThreadPoolToken to submit task. +// For a tablet, there may be multiple memtables, which will be flushed to disk +// one by one in the order of generation. +// If a memtable flush fails, then: +// 1. Immediately disallow submission of any subsequent memtable +// 2. For the memtables that have already been submitted, there is no need to flush, +// because the entire job will definitely fail; class FlushToken { public: - explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) - : _flush_status(OLAP_SUCCESS), - _flush_token(std::move(flush_pool_token)) {} + explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) : + _flush_token(std::move(flush_pool_token)), + _flush_status(OLAP_SUCCESS) { } - OLAPStatus submit(std::shared_ptr<MemTable> mem_table); + OLAPStatus submit(const std::shared_ptr<MemTable>& mem_table); // error has happpens, so we cancel this token // And remove all tasks in the queue. 
@@ -64,8 +69,12 @@ public: private: void _flush_memtable(std::shared_ptr<MemTable> mem_table); - OLAPStatus _flush_status; std::unique_ptr<ThreadPoolToken> _flush_token; + + // Records the current flush status of the tablet. + // Note: Once its value is set to Failed, it cannot return to SUCCESS. + std::atomic<OLAPStatus> _flush_status; + FlushStatistic _stats; }; diff --git a/be/src/olap/tablet_sync_service.cpp b/be/src/olap/tablet_sync_service.cpp index 329a67d..e61c67a 100644 --- a/be/src/olap/tablet_sync_service.cpp +++ b/be/src/olap/tablet_sync_service.cpp @@ -152,4 +152,4 @@ void TabletSyncService::_push_tablet_meta_thread(std::vector<PushTabletMetaTask> return; } -} // doris \ No newline at end of file +} // doris diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index cada3c2..582f224 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -36,23 +36,22 @@ using strings::Substitute; class FunctionRunnable : public Runnable { public: - explicit FunctionRunnable(boost::function<void()> func) - : _func(std::move(func)) {} + explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {} void run() OVERRIDE { _func(); } private: - boost::function<void()> _func; + std::function<void()> _func; }; -ThreadPoolBuilder::ThreadPoolBuilder(string name) - : _name(std::move(name)), - _min_threads(0), - _max_threads(base::NumCPUs()), - _max_queue_size(std::numeric_limits<int>::max()), - _idle_timeout(MonoDelta::FromMilliseconds(500)) {} +ThreadPoolBuilder::ThreadPoolBuilder(string name) : + _name(std::move(name)), + _min_threads(0), + _max_threads(base::NumCPUs()), + _max_queue_size(std::numeric_limits<int>::max()), + _idle_timeout(MonoDelta::FromMilliseconds(500)) {} ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { CHECK_GE(min_threads, 0); @@ -99,7 +98,7 @@ Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) { return _pool->do_submit(std::move(r), this); } -Status 
ThreadPoolToken::submit_func(boost::function<void()> f) { +Status ThreadPoolToken::submit_func(std::function<void()> f) { return submit(std::make_shared<FunctionRunnable>(std::move(f))); } @@ -353,7 +352,7 @@ Status ThreadPool::submit(std::shared_ptr<Runnable> r) { return do_submit(std::move(r), _tokenless.get()); } -Status ThreadPool::submit_func(boost::function<void()> f) { +Status ThreadPool::submit_func(std::function<void()> f) { return submit(std::make_shared<FunctionRunnable>(std::move(f))); } diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index b13f396..af60a69 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -19,12 +19,12 @@ #define DORIS_BE_SRC_UTIL_THREAD_POOL_H #include <deque> +#include <functional> #include <memory> #include <utility> #include <unordered_set> #include <string> -#include "boost/function.hpp" #include <boost/intrusive/list.hpp> #include <boost/intrusive/list_hook.hpp> @@ -150,7 +150,7 @@ private: // .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) // .Build(&thread_pool)); // thread_pool->Submit(shared_ptr<Runnable>(new Task())); -// thread_pool->SubmitFunc(boost::bind(&Func, 10)); +// thread_pool->SubmitFunc(std::bind(&Func, 10)); class ThreadPool { public: ~ThreadPool(); @@ -166,8 +166,8 @@ public: // Submits a Runnable class. Status submit(std::shared_ptr<Runnable> r); - // Submits a function bound using boost::bind(&FuncName, args...). - Status submit_func(boost::function<void()> f); + // Submits a function bound using std::bind(&FuncName, args...). + Status submit_func(std::function<void()> f); // Waits until all the tasks are completed. void wait(); @@ -341,8 +341,8 @@ public: // Submits a Runnable class. Status submit(std::shared_ptr<Runnable> r); - // Submits a function bound using boost::bind(&FuncName, args...). - Status submit_func(boost::function<void()> f); + // Submits a function bound using std::bind(&FuncName, args...). 
+ Status submit_func(std::function<void()> f); // Marks the token as unusable for future submissions. Any queued tasks not // yet running are destroyed. If tasks are in flight, Shutdown() will wait diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index c4175f1..537433f 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -310,7 +310,7 @@ TEST_F(TestDeltaWriter, open) { WriteRequest write_req = {10003, 270068375, WriteType::LOAD, 20001, 30001, load_id, false, tuple_desc}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); + DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); ASSERT_NE(delta_writer, nullptr); res = delta_writer->close(); ASSERT_EQ(OLAP_SUCCESS, res); @@ -345,7 +345,7 @@ TEST_F(TestDeltaWriter, write) { 20002, 30002, load_id, false, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); + DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); ASSERT_NE(delta_writer, nullptr); MemTracker tracker; @@ -362,18 +362,18 @@ TEST_F(TestDeltaWriter, write) { int128_t large_int_value = -90000; memcpy(tuple->get_slot(slots[4]->tuple_offset()), &large_int_value, sizeof(int128_t)); - ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10); - ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); + ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10); + ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); StringValue* char_ptr = (StringValue*)(tuple->get_slot(slots[7]->tuple_offset())); char_ptr->ptr = (char*)pool.allocate(4); memcpy(char_ptr->ptr, "abcd", 4); - char_ptr->len = 4; + char_ptr->len = 4; StringValue* var_ptr = 
(StringValue*)(tuple->get_slot(slots[8]->tuple_offset())); var_ptr->ptr = (char*)pool.allocate(5); memcpy(var_ptr->ptr, "abcde", 5); - var_ptr->len = 5; + var_ptr->len = 5; DecimalValue decimal_value(1.1); *(DecimalValue*)(tuple->get_slot(slots[9]->tuple_offset())) = decimal_value; @@ -385,13 +385,13 @@ TEST_F(TestDeltaWriter, write) { memcpy(tuple->get_slot(slots[14]->tuple_offset()), &large_int_value, sizeof(int128_t)); - ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10); - ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); + ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10); + ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); char_ptr = (StringValue*)(tuple->get_slot(slots[17]->tuple_offset())); char_ptr->ptr = (char*)pool.allocate(4); memcpy(char_ptr->ptr, "abcd", 4); - char_ptr->len = 4; + char_ptr->len = 4; var_ptr = (StringValue*)(tuple->get_slot(slots[18]->tuple_offset())); var_ptr->ptr = (char*)pool.allocate(5); @@ -425,7 +425,7 @@ TEST_F(TestDeltaWriter, write) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_rs.second; res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid, + write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid, version, version_hash); ASSERT_EQ(OLAP_SUCCESS, res); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() @@ -441,6 +441,7 @@ TEST_F(TestDeltaWriter, write) { auto schema_hash = 270068375; res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); ASSERT_EQ(OLAP_SUCCESS, res); + delete delta_writer; } } // namespace doris diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp index 
b752de3..f3ea989 100644 --- a/be/test/util/threadpool_test.cpp +++ b/be/test/util/threadpool_test.cpp @@ -33,7 +33,6 @@ #include <gflags/gflags_declare.h> #include <gtest/gtest.h> -#include <boost/bind.hpp> #include "common/logging.h" #include "common/status.h" #include "gutil/atomicops.h" @@ -119,9 +118,9 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) { std::atomic<int32_t> counter(0); std::shared_ptr<Runnable> task(new SimpleTask(15, &counter)); - ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok()); + ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 10, &counter)).ok()); ASSERT_TRUE(_pool->submit(task).ok()); - ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok()); + ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 20, &counter)).ok()); ASSERT_TRUE(_pool->submit(task).ok()); _pool->wait(); ASSERT_EQ(10 + 15 + 20 + 15, counter.load()); @@ -219,16 +218,19 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) { TEST_F(ThreadPoolTest, TestRace) { alarm(60); auto cleanup = MakeScopedCleanup([]() { - alarm(0); // Disable alarm on test exit. - }); + alarm(0); // Disable alarm on test exit. + }); ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) - .set_min_threads(0) - .set_max_threads(1) - .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok()); + .set_min_threads(0) + .set_max_threads(1) + .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok()); for (int i = 0; i < 500; i++) { CountDownLatch l(1); - ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok()); + // CountDownLatch::count_down has multiple overloaded version, + // so an cast is needed to use std::bind + ASSERT_TRUE(_pool->submit_func( + std::bind((void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)).ok()); l.wait(); // Sleeping a different amount in each iteration makes it more likely to hit // the bug. 
@@ -303,7 +305,6 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) { _pool->shutdown(); } -/* // Test that a thread pool will crash if asked to run its own blocking // functions in a pool thread. // @@ -319,20 +320,17 @@ TEST_F(ThreadPoolTest, TestDeadlocks) { const char* death_msg = "called pool function that would result in deadlock"; ASSERT_DEATH({ ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); - ASSERT_TRUE(_pool->submit_func( - Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok()); + ASSERT_TRUE(_pool->submit_func(std::bind((&ThreadPool::shutdown), _pool.get())).ok()); _pool->wait(); }, death_msg); ASSERT_DEATH({ ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); - ASSERT_TRUE(_pool->submit_func( - Bind(&ThreadPool::ok(), Unretained(_pool.get()))).ok()); + ASSERT_TRUE(_pool->submit_func(std::bind(&ThreadPool::wait, _pool.get())).ok()); _pool->wait(); }, death_msg); } #endif -*/ class SlowDestructorRunnable : public Runnable { public: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org