mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Add OUT_OF_MEMORY status.
make sure that low-level allocators throw bad_alloc to allow handling oom situations. database level does not yet incorporate any OOM logic.
This commit is contained in:
parent
b197e2c78e
commit
d5281721bd
5 changed files with 48 additions and 14 deletions
|
@ -205,7 +205,11 @@ struct TL {
|
||||||
thread_local TL tl;
|
thread_local TL tl;
|
||||||
|
|
||||||
constexpr bool kUseSmallStrings = true;
|
constexpr bool kUseSmallStrings = true;
|
||||||
constexpr bool kUseAsciiEncoding = true;
|
|
||||||
|
/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
|
||||||
|
/// file and implement with SIMD instructions.
|
||||||
|
constexpr bool kUseAsciiEncoding = false;
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
static_assert(sizeof(CompactObj) == 18);
|
static_assert(sizeof(CompactObj) == 18);
|
||||||
|
|
|
@ -63,8 +63,12 @@ class SegmentAllocator {
|
||||||
};
|
};
|
||||||
|
|
||||||
inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair<Ptr, uint8_t*> {
|
inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair<Ptr, uint8_t*> {
|
||||||
uint64_t ptr = (uint64_t)mi_heap_malloc(heap_, size);
|
void* ptr = mi_heap_malloc(heap_, size);
|
||||||
uint64_t seg_ptr = ptr & kSegmentAlignMask;
|
if (!ptr)
|
||||||
|
throw std::bad_alloc{};
|
||||||
|
|
||||||
|
uint64_t iptr = (uint64_t)ptr;
|
||||||
|
uint64_t seg_ptr = iptr & kSegmentAlignMask;
|
||||||
|
|
||||||
// could be speed up using last used seg_ptr.
|
// could be speed up using last used seg_ptr.
|
||||||
auto [it, inserted] = rev_indx_.emplace(seg_ptr, address_table_.size());
|
auto [it, inserted] = rev_indx_.emplace(seg_ptr, address_table_.size());
|
||||||
|
@ -73,7 +77,7 @@ inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair<Ptr, uint8_t*
|
||||||
address_table_.push_back((uint8_t*)seg_ptr);
|
address_table_.push_back((uint8_t*)seg_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ptr res = (((ptr - seg_ptr) / 8) << kSegmentIdBits) | it->second;
|
Ptr res = (((iptr - seg_ptr) / 8) << kSegmentIdBits) | it->second;
|
||||||
used_ += mi_good_size(size);
|
used_ += mi_good_size(size);
|
||||||
|
|
||||||
return std::make_pair(res, (uint8_t*)ptr);
|
return std::make_pair(res, (uint8_t*)ptr);
|
||||||
|
|
|
@ -17,6 +17,7 @@ enum class OpStatus : uint16_t {
|
||||||
OUT_OF_RANGE,
|
OUT_OF_RANGE,
|
||||||
WRONG_TYPE,
|
WRONG_TYPE,
|
||||||
TIMED_OUT,
|
TIMED_OUT,
|
||||||
|
OUT_OF_MEMORY,
|
||||||
};
|
};
|
||||||
|
|
||||||
class OpResultBase {
|
class OpResultBase {
|
||||||
|
|
|
@ -325,6 +325,15 @@ TEST_F(DflyEngineTest, Memcache) {
|
||||||
EXPECT_THAT(resp, ElementsAre("END"));
|
EXPECT_THAT(resp, ElementsAre("END"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DflyEngineTest, LimitMemory) {
|
||||||
|
mi_option_enable(mi_option_limit_os_alloc);
|
||||||
|
string blob(1 << 15, 'a');
|
||||||
|
for (size_t i = 0; i < 1000; ++i) {
|
||||||
|
auto resp = Run({"set", absl::StrCat(blob, i), blob});
|
||||||
|
ASSERT_THAT(resp, RespEq("OK"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: to test transactions with a single shard since then all transactions become local.
|
// TODO: to test transactions with a single shard since then all transactions become local.
|
||||||
// To consider having a parameter in dragonfly engine controlling number of shards
|
// To consider having a parameter in dragonfly engine controlling number of shards
|
||||||
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
|
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
|
||||||
|
|
|
@ -24,7 +24,6 @@ std::atomic_uint64_t op_seq{1};
|
||||||
|
|
||||||
[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
|
[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
|
||||||
|
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
struct Transaction::FindFirstProcessor {
|
struct Transaction::FindFirstProcessor {
|
||||||
|
@ -359,16 +358,25 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
|
|
||||||
/*************************************************************************/
|
/*************************************************************************/
|
||||||
// Actually running the callback.
|
// Actually running the callback.
|
||||||
OpStatus status = cb_(this, shard);
|
try {
|
||||||
/*************************************************************************/
|
OpStatus status = cb_(this, shard);
|
||||||
|
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (unique_shard_cnt_ == 1) {
|
||||||
cb_ = nullptr; // We can do it because only a single thread runs the callback.
|
cb_ = nullptr; // We can do it because only a single thread runs the callback.
|
||||||
local_result_ = status;
|
local_result_ = status;
|
||||||
} else {
|
} else {
|
||||||
CHECK_EQ(OpStatus::OK, status);
|
CHECK_EQ(OpStatus::OK, status);
|
||||||
|
}
|
||||||
|
} catch (std::bad_alloc&) {
|
||||||
|
// TODO: to log at most once per sec.
|
||||||
|
LOG_FIRST_N(ERROR, 16) << " out of memory";
|
||||||
|
local_result_ = OpStatus::OUT_OF_MEMORY;
|
||||||
|
} catch (std::exception& e) {
|
||||||
|
LOG(FATAL) << "Unexpected exception " << e.what();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*************************************************************************/
|
||||||
|
|
||||||
// at least the coordinator thread owns the reference.
|
// at least the coordinator thread owns the reference.
|
||||||
DCHECK_GE(use_count(), 1u);
|
DCHECK_GE(use_count(), 1u);
|
||||||
|
|
||||||
|
@ -532,7 +540,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
// single hop -> concluding.
|
// single hop -> concluding.
|
||||||
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
|
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
|
||||||
|
|
||||||
if (!multi_) { // for non-multi transactions we schedule exactly once.
|
if (!multi_) { // for non-multi transactions we schedule exactly once.
|
||||||
DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
|
DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,7 +773,15 @@ void Transaction::RunQuickie(EngineShard* shard) {
|
||||||
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
|
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
|
||||||
CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
|
CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
|
||||||
|
|
||||||
local_result_ = cb_(this, shard);
|
// Calling the callback in somewhat safe way
|
||||||
|
try {
|
||||||
|
local_result_ = cb_(this, shard);
|
||||||
|
} catch (std::bad_alloc&) {
|
||||||
|
LOG_FIRST_N(ERROR, 16) << " out of memory";
|
||||||
|
local_result_ = OpStatus::OUT_OF_MEMORY;
|
||||||
|
} catch (std::exception& e) {
|
||||||
|
LOG(FATAL) << "Unexpected exception " << e.what();
|
||||||
|
}
|
||||||
|
|
||||||
sd.local_mask &= ~ARMED;
|
sd.local_mask &= ~ARMED;
|
||||||
cb_ = nullptr; // We can do it because only a single shard runs the callback.
|
cb_ = nullptr; // We can do it because only a single shard runs the callback.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue