diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index c61506b28..82f80f8b4 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -205,7 +205,11 @@ struct TL {
 thread_local TL tl;
 
 constexpr bool kUseSmallStrings = true;
-constexpr bool kUseAsciiEncoding = true;
+
+/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
+/// file and implement it with SIMD instructions.
+constexpr bool kUseAsciiEncoding = false;
+
 }  // namespace
 
 static_assert(sizeof(CompactObj) == 18);
diff --git a/src/core/segment_allocator.h b/src/core/segment_allocator.h
index b252e986b..05e89a9ca 100644
--- a/src/core/segment_allocator.h
+++ b/src/core/segment_allocator.h
@@ -63,8 +63,12 @@ class SegmentAllocator {
 };
 
 inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair<Ptr, uint8_t*> {
-  uint64_t ptr = (uint64_t)mi_heap_malloc(heap_, size);
-  uint64_t seg_ptr = ptr & kSegmentAlignMask;
+  void* ptr = mi_heap_malloc(heap_, size);
+  if (!ptr)
+    throw std::bad_alloc{};
+
+  uint64_t iptr = (uint64_t)ptr;
+  uint64_t seg_ptr = iptr & kSegmentAlignMask;
 
   // could be sped up using the last used seg_ptr.
   auto [it, inserted] = rev_indx_.emplace(seg_ptr, address_table_.size());
@@ -73,7 +77,7 @@ inline auto SegmentAllocator::Allocate(uint32_t size) -> std::pair<Ptr, uint8_t
     address_table_.push_back((uint8_t*)seg_ptr);
   }
 
-  Ptr res = (((ptr - seg_ptr) / 8) << kSegmentIdBits) | it->second;
+  Ptr res = (((iptr - seg_ptr) / 8) << kSegmentIdBits) | it->second;
 
   used_ += mi_good_size(size);
   return std::make_pair(res, (uint8_t*)ptr);
diff --git a/src/facade/op_status.h b/src/facade/op_status.h
index df5ea4e1d..cf1f9312a 100644
--- a/src/facade/op_status.h
+++ b/src/facade/op_status.h
@@ -17,6 +17,7 @@ enum class OpStatus : uint16_t {
   OUT_OF_RANGE,
   WRONG_TYPE,
   TIMED_OUT,
+  OUT_OF_MEMORY,
 };
 
 class OpResultBase {
diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index 4279a6fd2..2ba45e3d0 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -325,6 +325,15 @@ TEST_F(DflyEngineTest, Memcache) {
   EXPECT_THAT(resp, ElementsAre("END"));
 }
 
+TEST_F(DflyEngineTest, LimitMemory) {
+  mi_option_enable(mi_option_limit_os_alloc);
+  string blob(1 << 15, 'a');
+  for (size_t i = 0; i < 1000; ++i) {
+    auto resp = Run({"set", absl::StrCat(blob, i), blob});
+    ASSERT_THAT(resp, RespEq("OK"));
+  }
+}
+
 // TODO: test transactions with a single shard, since then all transactions become local.
 // Consider having a parameter in the dragonfly engine that controls the number of shards
 // independently of the number of cpus. Also test BLPOP under multi for the single/multi argument case.
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 823dcfbc7..e5b3f8447 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -24,7 +24,6 @@ std::atomic_uint64_t op_seq{1};
 
 [[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
 
-
 }  // namespace
 
 struct Transaction::FindFirstProcessor {
@@ -359,16 +358,25 @@ bool Transaction::RunInShard(EngineShard* shard) {
 
   /*************************************************************************/
   // Actually running the callback.
-  OpStatus status = cb_(this, shard);
-  /*************************************************************************/
+  try {
+    OpStatus status = cb_(this, shard);
 
-  if (unique_shard_cnt_ == 1) {
-    cb_ = nullptr;  // We can do it because only a single thread runs the callback.
-    local_result_ = status;
-  } else {
-    CHECK_EQ(OpStatus::OK, status);
+    if (unique_shard_cnt_ == 1) {
+      cb_ = nullptr;  // We can do it because only a single thread runs the callback.
+      local_result_ = status;
+    } else {
+      CHECK_EQ(OpStatus::OK, status);
+    }
+  } catch (std::bad_alloc&) {
+    // TODO: log at most once per second.
+    LOG_FIRST_N(ERROR, 16) << " out of memory";
+    local_result_ = OpStatus::OUT_OF_MEMORY;
+  } catch (std::exception& e) {
+    LOG(FATAL) << "Unexpected exception " << e.what();
   }
+  /*************************************************************************/
+
   // at least the coordinator thread owns the reference.
   DCHECK_GE(use_count(), 1u);
@@ -532,7 +540,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   // single hop -> concluding.
   coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
 
-  if (!multi_) { // for non-multi transactions we schedule exactly once.
+  if (!multi_) {  // for non-multi transactions we schedule exactly once.
     DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
   }
 
@@ -765,7 +773,15 @@ void Transaction::RunQuickie(EngineShard* shard) {
   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
   CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
 
-  local_result_ = cb_(this, shard);
+  // Call the callback in a somewhat safe way.
+  try {
+    local_result_ = cb_(this, shard);
+  } catch (std::bad_alloc&) {
+    LOG_FIRST_N(ERROR, 16) << " out of memory";
+    local_result_ = OpStatus::OUT_OF_MEMORY;
+  } catch (std::exception& e) {
+    LOG(FATAL) << "Unexpected exception " << e.what();
+  }
 
   sd.local_mask &= ~ARMED;
   cb_ = nullptr;  // We can do it because only a single shard runs the callback.
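
Note (reviewer sketch): the core of this patch is that SegmentAllocator::Allocate now surfaces allocation failure as std::bad_alloc, and the transaction layer converts that exception into OpStatus::OUT_OF_MEMORY at the two places a shard callback can run (RunInShard and RunQuickie), terminating loudly on any other exception type. Below is a minimal, self-contained sketch of that conversion boundary. The OpStatus values mirror facade/op_status.h, but RunCallback and Callback are illustrative names invented for this sketch, not part of the Dragonfly API, and plain std::cerr stands in for the glog macros used in the patch.

// oom_boundary_sketch.cc -- standalone illustration, not Dragonfly code.
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <new>

// Mirrors the enum in src/facade/op_status.h (only the members used here).
enum class OpStatus : uint16_t { OK, OUT_OF_MEMORY };

// Hypothetical stand-in for the transaction callback type.
using Callback = std::function<OpStatus()>;

// The conversion boundary: run a callback that may allocate, and map
// std::bad_alloc to OUT_OF_MEMORY so the caller sees a status code instead
// of a crash. Any other exception is treated as a logic bug and aborts,
// which is the role LOG(FATAL) plays in the patch.
OpStatus RunCallback(const Callback& cb) {
  try {
    return cb();
  } catch (const std::bad_alloc&) {
    std::cerr << "out of memory\n";  // the patch rate-limits this via LOG_FIRST_N
    return OpStatus::OUT_OF_MEMORY;
  } catch (const std::exception& e) {
    std::cerr << "Unexpected exception " << e.what() << '\n';
    std::terminate();
  }
}

int main() {
  OpStatus ok = RunCallback([] { return OpStatus::OK; });
  OpStatus oom = RunCallback([]() -> OpStatus { throw std::bad_alloc{}; });
  std::cout << (ok == OpStatus::OK) << ' ' << (oom == OpStatus::OUT_OF_MEMORY) << '\n';
  return 0;
}

One design point worth noting: the status is only propagated through local_result_ when a single shard runs the callback; in the multi-shard path the patch still CHECK-fails on any non-OK status, so only the bad_alloc path is recoverable there.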