chore: retire TEST_EnableHeartBeat (#3435)

Now unit tests run the same Heartbeat fiber as in prod.
The whole feature was redundant: with just a few explicit settings of max_memory_limit,
I succeeded in making all unit tests pass.
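
For example, the test base class now just caps memory explicitly in SetUp, since the
heartbeat fiber is always running (a minimal sketch based on the BaseFamilyTest hunk below):

    void BaseFamilyTest::SetUp() {
      // Explicit limit replaces the test-only heartbeat switch.
      max_memory_limit = INT_MAX;
      ResetService();
    }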

In addition, this change allows passing a global handler that the heartbeat calls from a single thread (shard 0).
It is not used yet - this is preparation for the next PR, which will break hung replication connections on a master.
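
A hypothetical wiring sketch (the lambda body is an illustration of the follow-up PR,
not code from this change - Service::Init currently passes nullptr as the handler):

    // Runs on shard 0's periodic heartbeat fiber, once per tick.
    shard_set->Init(shard_num, [] {
      // e.g. scan replication connections here and break the hung ones.
    });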

Finally, this change includes some non-functional clean-ups and warning fixes to improve code quality.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-08-03 20:17:23 +03:00 committed by GitHub
parent 82298b8122
commit c9ed3f7b2b
22 changed files with 56 additions and 100 deletions

@@ -897,7 +897,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   ClearPipelinedMessages();
   DCHECK(dispatch_q_.empty());

-  service_->OnClose(cc_.get());
+  service_->OnConnectionClose(cc_.get());
   DecreaseStatsOnClose();

   // We wait for dispatch_fb to finish writing the previous replies before replying to the last

@@ -39,7 +39,7 @@ class ServiceInterface {
   virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
   }

-  virtual void OnClose(ConnectionContext* cntx) {
+  virtual void OnConnectionClose(ConnectionContext* cntx) {
   }

   struct ContextInfo {

@@ -52,7 +52,7 @@ void BlockingControllerTest::SetUp() {
   });

   shard_set = new EngineShardSet(pp_.get());
-  shard_set->Init(kNumThreads, false);
+  shard_set->Init(kNumThreads, nullptr);

   trans_.reset(new Transaction{&cid_});

@@ -116,7 +116,7 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
     return OpStatus::WRONG_TYPE;

   // Order result by their keys position in the command arguments, push errors to back
-  auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
+  auto comp = [](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
     if (!lhs || !rhs)
       return lhs.ok();
     size_t i1 = std::get<1>(*lhs);

@@ -928,6 +928,7 @@ void DebugCmd::Stacktrace() {
     std::unique_lock lk(m);
     fb2::detail::FiberInterface::PrintAllFiberStackTraces();
   });
+  base::FlushLogs();
   cntx_->SendOk();
 }

@@ -186,8 +186,6 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }

-  Service::InitOpts opts;
-  opts.disable_time_update = false;

   const auto& bind = GetFlag(FLAGS_bind);
   const char* bind_addr = bind.empty() ? nullptr : bind.c_str();
@@ -292,7 +290,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
     listeners.push_back(listener.release());
   }

-  service.Init(acceptor, listeners, opts);
+  service.Init(acceptor, listeners);

   VersionMonitor version_monitor;

@@ -660,7 +660,7 @@ std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() const {
   lock_guard lk(mu_);

   vec.reserve(replica_infos_.size());
-  auto replication_lags = ReplicationLags();
+  auto replication_lags = ReplicationLagsLocked();

   for (const auto& [id, info] : replica_infos_) {
     LSN lag = replication_lags[id];
@@ -712,14 +712,13 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
 pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
     std::string_view id_str, RedisReplyBuilder* rb) {
-  unique_lock lk(mu_);
-
   uint32_t sync_id;
   if (!ToSyncId(id_str, &sync_id)) {
     rb->SendError(kInvalidSyncId);
     return {0, nullptr};
   }

+  lock_guard lk(mu_);
   auto sync_it = replica_infos_.find(sync_id);
   if (sync_it == replica_infos_.end()) {
     rb->SendError(kIdNotFound);
@@ -729,7 +728,7 @@ pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
   return {sync_id, sync_it->second};
 }

-std::map<uint32_t, LSN> DflyCmd::ReplicationLags() const {
+std::map<uint32_t, LSN> DflyCmd::ReplicationLagsLocked() const {
   DCHECK(!mu_.try_lock());  // expects to be under global lock
   if (replica_infos_.empty())
     return {};
@@ -785,9 +784,6 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& repl_info, SyncState e
   return true;
 }

-void DflyCmd::BreakOnShutdown() {
-}
-
 void DflyCmd::Shutdown() {
   ReplicaInfoMap pending;
   {

@@ -132,8 +132,6 @@ class DflyCmd {
   void OnClose(ConnectionContext* cntx);

-  void BreakOnShutdown();
-
   // Stop all background processes so we can exit in orderly manner.
   void Shutdown();
@@ -214,17 +212,17 @@ class DflyCmd {
   bool CheckReplicaStateOrReply(const ReplicaInfo& ri, SyncState expected,
                                 facade::RedisReplyBuilder* rb);

- private:
   // Return a map between replication ID to lag. lag is defined as the maximum of difference
   // between the master's LSN and the last acknowledged LSN in over all shards.
-  std::map<uint32_t, LSN> ReplicationLags() const;
+  std::map<uint32_t, LSN> ReplicationLagsLocked() const;

+ private:
   ServerFamily* sf_;  // Not owned
   uint32_t next_sync_id_ = 1;

   using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
-  ReplicaInfoMap replica_infos_;
+  ReplicaInfoMap replica_infos_ ABSL_GUARDED_BY(mu_);

   mutable util::fb2::Mutex mu_;  // Guard global operations. See header top for locking levels.
 };

@@ -402,7 +402,6 @@ TEST_F(DflyEngineTest, FlushAll) {
 }

 TEST_F(DflyEngineTest, OOM) {
-  shard_set->TEST_EnableHeartBeat();
   max_memory_limit = 300000;
   size_t i = 0;
   RespExpr resp;
@@ -444,7 +443,6 @@ TEST_F(DflyEngineTest, OOM) {
 /// and then written with the same key.
 TEST_F(DflyEngineTest, Bug207) {
   max_memory_limit = 300000;
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -474,7 +472,6 @@ TEST_F(DflyEngineTest, Bug207) {
 }

 TEST_F(DflyEngineTest, StickyEviction) {
-  shard_set->TEST_EnableHeartBeat();
   shard_set->TEST_EnableCacheMode();
   absl::FlagSaver fs;
   absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
@@ -583,11 +580,7 @@ TEST_F(DflyEngineTest, Bug468) {
 }

 TEST_F(DflyEngineTest, Bug496) {
-  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
-    EngineShard* shard = EngineShard::tlocal();
-    if (shard == nullptr)
-      return;
-
+  shard_set->RunBlockingInParallel([](EngineShard* shard) {
     auto& db = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id());

     int cb_hits = 0;

@@ -406,14 +406,15 @@ void EngineShard::StopPeriodicFiber() {
   }
 }

-void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
+void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
   uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
   if (clock_cycle_ms == 0)
     clock_cycle_ms = 1;

-  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms] {
+  fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
+                               handler = std::move(global_handler)] {
     ThisFiber::SetName(absl::StrCat("shard_periodic", index));
-    RunPeriodic(std::chrono::milliseconds(period_ms));
+    RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
   });
 }
@@ -671,7 +672,8 @@ void EngineShard::Heartbeat() {
   }
 }

-void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
+void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
+                              std::function<void()> global_handler) {
   VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";

   bool runs_global_periodic = (shard_id() == 0);  // Only shard 0 runs global periodic.
@@ -716,6 +718,10 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
           rss_mem_peak.store(total_rss, memory_order_relaxed);
         }
       }
+
+      if (global_handler) {
+        global_handler();
+      }
     }
   }
 }
@@ -762,12 +768,6 @@ size_t EngineShard::UsedMemory() const {
          search_indices()->GetUsedMemory();
 }

-void EngineShard::TEST_EnableHeartbeat() {
-  fiber_periodic_ = fb2::Fiber("shard_periodic_TEST", [this, period_ms = 1] {
-    RunPeriodic(std::chrono::milliseconds(period_ms));
-  });
-}
-
 bool EngineShard::ShouldThrottleForTiering() const {  // see header for formula justification
   if (!tiered_storage_)
     return false;
@@ -902,7 +902,7 @@ size_t GetTieredFileLimit(size_t threads) {
   return max_shard_file_size;
 }

-void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
+void EngineShardSet::Init(uint32_t sz, std::function<void()> global_handler) {
   CHECK_EQ(0u, size());

   shard_queue_.resize(sz);
@@ -920,10 +920,8 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
       auto* shard = EngineShard::tlocal();
       shard->InitTieredStorage(pb, max_shard_file_size);

-      if (update_db_time) {
-        // Must be last, as it accesses objects initialized above.
-        shard->StartPeriodicFiber(pb);
-      }
+      // Must be last, as it accesses objects initialized above.
+      shard->StartPeriodicFiber(pb, global_handler);
     }
   });
 }
@@ -949,10 +947,6 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
   shard_queue_[es->shard_id()] = es->GetFiberQueue();
 }

-void EngineShardSet::TEST_EnableHeartBeat() {
-  RunBriefInParallel([](EngineShard* shard) { shard->TEST_EnableHeartbeat(); });
-}
-
 void EngineShardSet::TEST_EnableCacheMode() {
   RunBlockingInParallel([](EngineShard* shard) {
     namespaces.GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode();

@@ -150,8 +150,6 @@ class EngineShard {
     return continuation_trans_;
   }

-  void TEST_EnableHeartbeat();
-
   void StopPeriodicFiber();

   struct TxQueueInfo {
@@ -205,10 +203,10 @@ class EngineShard {
   // blocks the calling fiber.
   void Shutdown();  // called before destructing EngineShard.

-  void StartPeriodicFiber(util::ProactorBase* pb);
+  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler);

   void Heartbeat();
-  void RunPeriodic(std::chrono::milliseconds period_ms);
+  void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> global_handler);

   void CacheStats();
@@ -288,7 +286,7 @@ class EngineShardSet {
     return pp_;
   }

-  void Init(uint32_t size, bool update_db_time);
+  void Init(uint32_t size, std::function<void()> global_handler);

   // Shutdown sequence:
   // - EngineShardSet.PreShutDown()
@@ -342,7 +340,6 @@ class EngineShardSet {
   }

   // Used in tests
-  void TEST_EnableHeartBeat();
   void TEST_EnableCacheMode();

  private:

@@ -842,8 +842,7 @@ Service::~Service() {
   shard_set = nullptr;
 }

-void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
-                   const InitOpts& opts) {
+void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
   InitRedisTables();

   config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
@@ -881,7 +880,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     ServerState::Init(index, shard_num, &user_registry_);
   });

-  shard_set->Init(shard_num, !opts.disable_time_update);
+  shard_set->Init(shard_num, nullptr);
   const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
   // We assume that listeners.front() is the main_listener
   // see dfly_main RunEngine
@@ -1600,10 +1599,9 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
   // a bit of a hack. I set up breaker callback here for the owner.
   // Should work though it's confusing to have it here.
-  owner->RegisterBreakHook([res, this](uint32_t) {
+  owner->RegisterBreakHook([res](uint32_t) {
     if (res->transaction)
       res->transaction->CancelBlocking(nullptr);
-    this->server_family().BreakOnShutdown();
   });

   return res;
@@ -2529,7 +2527,7 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privil
   }
 }

-void Service::OnClose(facade::ConnectionContext* cntx) {
+void Service::OnConnectionClose(facade::ConnectionContext* cntx) {
   ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
   ConnectionState& conn_state = server_cntx->conn_state;

@@ -29,18 +29,10 @@ using facade::MemcacheParser;
 class Service : public facade::ServiceInterface {
  public:
-  struct InitOpts {
-    bool disable_time_update;
-
-    InitOpts() : disable_time_update{false} {
-    }
-  };
-
   explicit Service(util::ProactorPool* pp);
   ~Service();

-  void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
-            const InitOpts& opts = InitOpts{});
+  void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners);

   void Shutdown();
@@ -93,7 +85,7 @@ class Service : public facade::ServiceInterface {
   GlobalState GetGlobalState() const;

   void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
-  void OnClose(facade::ConnectionContext* cntx) final;
+  void OnConnectionClose(facade::ConnectionContext* cntx) final;
   Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;

@@ -835,7 +835,7 @@ error_code SerializerBase::WriteRaw(const io::Bytes& buf) {
   return error_code{};
 }

-error_code SerializerBase::FlushToSink(io::Sink* s, SerializerBase::FlushState flush_state) {
+error_code SerializerBase::FlushToSink(io::Sink* sink, SerializerBase::FlushState flush_state) {
   auto bytes = PrepareFlush(flush_state);
   if (bytes.empty())
     return error_code{};
@@ -843,7 +843,7 @@ error_code SerializerBase::FlushToSink(io::Sink* s, SerializerBase::FlushState f
   DVLOG(2) << "FlushToSink " << bytes.size() << " bytes";

   // interrupt point.
-  RETURN_ON_ERR(s->Write(bytes));
+  RETURN_ON_ERR(sink->Write(bytes));
   mem_buf_.ConsumeInput(bytes.size());

   return error_code{};
@@ -1121,7 +1121,9 @@ class RdbSaver::Impl {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

-  error_code Flush() {
+  error_code FlushSerializer();
+
+  error_code FlushSink() {
     return aligned_buf_ ? aligned_buf_->Flush() : error_code{};
   }
@@ -1133,10 +1135,6 @@ class RdbSaver::Impl {
     return &meta_serializer_;
   }

-  io::Sink* sink() {
-    return sink_;
-  }
-
  private:
   unique_ptr<SliceSnapshot>& GetSnapshot(EngineShard* shard);
@@ -1252,6 +1250,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
   std::optional<SliceSnapshot::DbRecord> record;

   RecordsPopper records_popper(push_to_sink_with_order_, &channel_);
+  auto& stats = ServerState::tlocal()->stats;

   // we can not exit on io-error since we spawn fibers that push data.
   // TODO: we may signal them to stop processing and exit asap in case of the error.
@@ -1266,10 +1265,10 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
       DVLOG(2) << "Pulled " << record->id;
       auto before = absl::GetCurrentTimeNanos();
       io_error = sink_->Write(io::Buffer(record->value));
-      auto& stats = ServerState::tlocal()->stats;
       stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - before) / 1'000;
       stats.rdb_save_count++;
       if (io_error) {
+        VLOG(1) << "Error writing to sink " << io_error.message();
         break;
       }
     } while ((record = records_popper.TryPop()));
@@ -1369,6 +1368,10 @@ RdbSaver::SnapshotStats RdbSaver::Impl::GetCurrentSnapshotProgress() const {
   });
 }

+error_code RdbSaver::Impl::FlushSerializer() {
+  return serializer()->FlushToSink(sink_, SerializerBase::FlushState::kFlushMidEntry);
+}
+
 RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
   StringVec script_bodies, search_indices;
@@ -1471,8 +1474,7 @@ error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
 }

 error_code RdbSaver::SaveBody(Context* cntx, RdbTypeFreqMap* freq_map) {
-  RETURN_ON_ERR(
-      impl_->serializer()->FlushToSink(impl_->sink(), SerializerBase::FlushState::kFlushMidEntry));
+  RETURN_ON_ERR(impl_->FlushSerializer());

   if (save_mode_ == SaveMode::SUMMARY) {
     impl_->serializer()->SendFullSyncCut();
@@ -1547,9 +1549,9 @@ error_code RdbSaver::SaveEpilog() {
   absl::little_endian::Store64(buf, chksum);
   RETURN_ON_ERR(ser.WriteRaw(buf));

-  RETURN_ON_ERR(ser.FlushToSink(impl_->sink(), SerializerBase::FlushState::kFlushMidEntry));
+  RETURN_ON_ERR(impl_->FlushSerializer());

-  return impl_->Flush();
+  return impl_->FlushSink();
 }

 error_code RdbSaver::SaveAuxFieldStrInt(string_view key, int64_t val) {

@@ -86,7 +86,7 @@ class RdbSaver {
   ~RdbSaver();

   // Initiates the serialization in the shard's thread.
-  // TODO: to implement break functionality to allow stopping early.
+  // cll allows breaking in the middle.
   void StartSnapshotInShard(bool stream_journal, const Cancellation* cll, EngineShard* shard);

   // Send only the incremental snapshot since start_lsn.

@@ -45,6 +45,7 @@ class RdbTest : public BaseFamilyTest {
 void RdbTest::SetUp() {
   InitWithDbFilename();
+  max_memory_limit = 40000000;
 }

 inline const uint8_t* to_byte(const void* s) {

@@ -628,8 +628,6 @@ TEST_F(SearchFamilyTest, SimpleExpiry) {
   EXPECT_THAT(Run({"ft.search", "i1", "*"}), AreDocIds("d:1", "d:2", "d:3"));

-  shard_set->TEST_EnableHeartBeat();
-
   AdvanceTime(60);
   ThisFiber::SleepFor(5ms);  // Give heartbeat time to delete expired doc
   EXPECT_THAT(Run({"ft.search", "i1", "*"}), AreDocIds("d:1", "d:3"));

@@ -683,7 +683,7 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namesp
   // command that did not pause on the new state yet we will pause after waking up.
   DispatchTracker tracker{std::move(listeners), conn, true /* ignore paused commands */,
                           true /*ignore blocking*/};
-  shard_set->pool()->AwaitBrief([&tracker, pause_state, ns](unsigned, util::ProactorBase*) {
+  shard_set->pool()->AwaitBrief([&tracker, pause_state](unsigned, util::ProactorBase*) {
     // Commands don't suspend before checking the pause state, so
     // it's impossible to deadlock on waiting for a command that will be paused.
     tracker.TrackOnThread();
@@ -1562,10 +1562,6 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
   return cntx->SendLong(num_keys.load(memory_order_relaxed));
 }

-void ServerFamily::BreakOnShutdown() {
-  dfly_cmd_->BreakOnShutdown();
-}
-
 void ServerFamily::CancelBlockingOnThread(std::function<OpStatus(ArgSlice)> status_cb) {
   auto cb = [status_cb](unsigned thread_index, util::Connection* conn) {
     if (auto fcntx = static_cast<facade::Connection*>(conn)->cntx(); fcntx) {

@@ -227,8 +227,6 @@ class ServerFamily {
   void OnClose(ConnectionContext* cntx);

-  void BreakOnShutdown();
-
   void CancelBlockingOnThread(std::function<facade::OpStatus(ArgSlice)> = {});

   // Sets the server to replicate another instance. Does not flush the database beforehand!

@@ -167,10 +167,10 @@ void BaseFamilyTest::SetUpTestSuite() {
       SetTestFlag(flag, value);
     }
   }

-  max_memory_limit = INT_MAX;
 }

 void BaseFamilyTest::SetUp() {
+  max_memory_limit = INT_MAX;
   ResetService();
 }
@@ -207,9 +207,7 @@ void BaseFamilyTest::ResetService() {
   pp_->Run();
   service_ = std::make_unique<Service>(pp_.get());

-  Service::InitOpts opts;
-  opts.disable_time_update = true;
-  service_->Init(nullptr, {}, opts);
+  service_->Init(nullptr, {});

   used_mem_current = 0;
   TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;

@@ -192,7 +192,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
   for (auto [dbid, hash, item_segment] : ts_->bins_->DeleteBin(segment, page)) {
     // Search for key with the same hash and value pointing to the same segment.
     // If it still exists, it must correspond to the value stored in this bin
-    auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) {
+    auto predicate = [item_segment = item_segment](const PrimeKey& key, const PrimeValue& probe) {
       return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment;
     };
     auto it = db_slice_.GetDBTable(dbid)->prime.FindFirst(hash, predicate);

@@ -209,7 +209,6 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
   const int kNum = 500;
   max_memory_limit = kNum * 4096;
-  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); });

   // Stash all values
   string value = BuildString(3000);
@@ -302,10 +301,7 @@ TEST_F(TieredStorageTest, FlushPending) {
 TEST_F(TieredStorageTest, MemoryPressure) {
   max_memory_limit = 20_MB;
-  pp_->at(0)->AwaitBrief([] {
-    EngineShard::tlocal()->TEST_EnableHeartbeat();
-    EngineShard::tlocal()->tiered_storage()->SetMemoryLowLimit(2_MB);
-  });
+  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->tiered_storage()->SetMemoryLowLimit(2_MB); });

   constexpr size_t kNum = 10000;
   for (size_t i = 0; i < kNum; i++) {