feat(server): Add missing fields to INFO PERSISTENCE command (#1408) (#1438)

* Add loading field
* Add rdb_changes_since_last_save field
Kostas Kyrimis 2023-06-21 23:35:36 +03:00 committed by GitHub
parent 5e479ed967
commit 99f3284910
7 changed files with 106 additions and 4 deletions
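Both new fields land in the PERSISTENCE section of INFO. As a quick check (a sketch, not part of this commit; it assumes a local Dragonfly instance on the default port 6379, the redis-py client, and no other writers), the counter grows with writes and drops back to zero after a snapshot:

# Sketch: reading the two new INFO PERSISTENCE fields with redis-py.
# Assumes a local Dragonfly on port 6379 and no concurrent writers.
import redis

r = redis.Redis(port=6379)
r.set("k", "1")

info = r.info("persistence")
print(info["loading"])                      # 0 unless a snapshot is being loaded
print(info["rdb_changes_since_last_save"])  # >= 1 after the SET above

r.save()
assert r.info("persistence")["rdb_changes_since_last_save"] == 0

The C++ and Python tests further down exercise the same behaviour through the test harness.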


@@ -215,7 +215,7 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}

SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
  static_assert(sizeof(SliceEvents) == 80, "You should update this function with new fields");
  static_assert(sizeof(SliceEvents) == 88, "You should update this function with new fields");

  ADD(evicted_keys);
  ADD(hard_evictions);
@@ -227,6 +227,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
  ADD(hits);
  ADD(misses);
  ADD(insertion_rejections);
  ADD(update);

  return *this;
}
@@ -873,6 +874,8 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
    }
  }

  ++events_.update;

  if (ClusterConfig::IsClusterEnabled()) {
    db.slots_stats[ClusterConfig::KeySlot(key)].total_writes += 1;
  }
@@ -1143,4 +1146,8 @@ void DbSlice::SetDocDeletionCallback(DocDeletionCallback ddcb) {
  doc_del_cb_ = move(ddcb);
}

void DbSlice::ResetUpdateEvents() {
  events_.update = 0;
}

}  // namespace dfly
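The counter is bumped in DbSlice::PostUpdate, so only insertions and in-place updates count as changes; deletions take a different path and leave rdb_changes_since_last_save untouched, which GenericFamilyTest.Info below also checks. The same behaviour, sketched from the client side (again assuming a fresh local instance, redis-py, and no concurrent writers):

# Sketch: deletions are not counted; insertions and updates are.
# Assumes a local Dragonfly on port 6379 and no concurrent writers.
import redis

r = redis.Redis(port=6379)
r.save()  # snapshot, so the counter starts again from zero


def changes():
    return r.info("persistence")["rdb_changes_since_last_save"]


r.set("k1", "v1")   # insertion       -> counted
r.set("k1", "v2")   # in-place update -> counted
r.delete("k1")      # deletion        -> not counted
assert changes() == 2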


@@ -51,6 +51,9 @@ struct SliceEvents {
  // how many insertions were rejected due to OOM.
  size_t insertion_rejections = 0;

  // how many updates and insertions of keys between snapshot intervals
  size_t update = 0;

  SliceEvents& operator+=(const SliceEvents& o);
};
@@ -318,6 +321,9 @@ class DbSlice {
  void SetDocDeletionCallback(DocDeletionCallback ddcb);

  // Resets the event counter for updates/insertions
  void ResetUpdateEvents();

 private:
  std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
                                                     PrimeValue obj, uint64_t expire_at_ms,


@@ -540,4 +540,44 @@ TEST_F(GenericFamilyTest, Restore) {
  EXPECT_EQ(CheckedInt({"ttl", "string-key"}), -1);
}

TEST_F(GenericFamilyTest, Info) {
  auto get_rdb_changes_since_last_save = [](const string& str) -> size_t {
    const string matcher = "rdb_changes_since_last_save:";
    const auto pos = str.find(matcher) + matcher.size();
    const auto sub = str.substr(pos, 1);
    return atoi(sub.c_str());
  };

  auto resp = Run({"set", "k", "1"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(1, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"set", "k", "1"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(2, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"set", "k2", "2"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(3, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"save"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(0, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"set", "k2", "2"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(1, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"bgsave"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(0, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"set", "k3", "3"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(1, get_rdb_changes_since_last_save(resp.GetString()));

  resp = Run({"del", "k3"});
  resp = Run({"info", "persistence"});
  EXPECT_EQ(1, get_rdb_changes_since_last_save(resp.GetString()));
}

}  // namespace dfly


@@ -1806,6 +1806,11 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
  return to;
}

GlobalState Service::GetGlobalState() const {
  lock_guard lk(mu_);
  return global_state_;
}

void Service::ConfigureHttpHandlers(util::HttpListenerBase* base) {
  server_family_.ConfigureMetrics(base);
  base->RegisterCb("/txz", TxTable);


@@ -85,6 +85,8 @@ class Service : public facade::ServiceInterface {
  // Upon switch, updates cached global state in threadlocal ServerState struct.
  GlobalState SwitchState(GlobalState from, GlobalState to);

  GlobalState GetGlobalState() const;

  void ConfigureHttpHandlers(util::HttpListenerBase* base) final;
  void OnClose(facade::ConnectionContext* cntx) final;
  std::string GetContextInfo(facade::ConnectionContext* cntx) final;


@@ -984,9 +984,14 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
// Run callback for all active RdbSnapshots (passed as index).
// .dfs format contains always `shard_set->size() + 1` snapshots (for every shard and summary
// file).
static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
static void RunStage(
    bool new_version, std::function<void(unsigned)> cb,
    std::function<void(EngineShard*)> maybe_reset_events = []([[maybe_unused]] auto*) {}) {
  if (new_version) {
    shard_set->RunBlockingInParallel([&](EngineShard* es) { cb(es->shard_id()); });
    shard_set->RunBlockingInParallel([&](EngineShard* es) {
      cb(es->shard_id());
      maybe_reset_events(es);
    });
    cb(shard_set->size());
  } else {
    cb(0);
@@ -1171,7 +1176,9 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
  is_saving_.store(false, memory_order_relaxed);

  RunStage(new_version, close_cb);
  auto reset_event_cb = [](EngineShard* es) { es->db_slice().ResetUpdateEvents(); };
  RunStage(new_version, close_cb, reset_event_cb);

  if (new_version) {
    ExtendDfsFilename("summary", &fpath);
@@ -1625,10 +1632,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
    append("last_save", save_info->save_time);
    append("last_save_duration_sec", save_info->duration_sec);
    append("last_save_file", save_info->file_name);

    size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
    append("loading", is_loading);

    for (const auto& k_v : save_info->freq_map) {
      append(StrCat("rdb_", k_v.first), k_v.second);
    }
    append("rdb_changes_since_last_save", m.events.update);
  }

  if (should_enter("REPLICATION")) {
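Putting the pieces above together: each shard's DbSlice keeps its own update counter, INFO reports the sum it receives via m.events.update, and DoSave hands reset_event_cb to RunStage so every shard's counter is cleared once its part of the snapshot is closed. A toy model of that sum-on-read / reset-on-save shape (illustrative Python only, not Dragonfly code; the class and method names are made up):

# Toy model (illustrative only): per-shard change counters that are summed
# when INFO is served and cleared when a snapshot completes, mirroring the
# reset_event_cb passed to RunStage above.
class Shard:
    def __init__(self):
        self.update_events = 0

    def post_update(self):            # rough analogue of DbSlice::PostUpdate
        self.update_events += 1

    def reset_update_events(self):    # rough analogue of DbSlice::ResetUpdateEvents
        self.update_events = 0


shards = [Shard() for _ in range(4)]
shards[0].post_update()
shards[2].post_update()

# INFO PERSISTENCE: rdb_changes_since_last_save is the sum over all shards.
assert sum(s.update_events for s in shards) == 2

# SAVE / BGSAVE: the close stage resets every shard's counter.
for s in shards:
    s.reset_update_events()
assert sum(s.update_events for s in shards) == 0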


@@ -191,3 +191,35 @@ class TestDflySnapshotOnShutdown(SnapshotTestBase):
        await a_client.connection_pool.disconnect()

        assert await seeder.compare(start_capture)


@dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})
class TestDflyInfoPersistenceLoadingField(SnapshotTestBase):
    """Test is_loading field on INFO PERSISTENCE during snapshot loading"""

    @pytest.fixture(autouse=True)
    def setup(self, tmp_dir: Path):
        self.tmp_dir = tmp_dir

    def extract_is_loading_field(self, res):
        matcher = b'loading:'
        start = res.find(matcher)
        pos = start + len(matcher)
        return chr(res[pos])

    @pytest.mark.asyncio
    async def test_snapshot(self, df_seeder_factory, df_server):
        seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS)
        await seeder.run(target_deviation=0.1)

        df_server.stop()
        df_server.start()

        a_client = aioredis.Redis(port=df_server.port)
        res = await a_client.execute_command("INFO PERSISTENCE")
        assert '1' == self.extract_is_loading_field(res)

        # Wait for snapshot to finish loading and retry INFO PERSISTENCE
        await wait_available_async(a_client)
        res = await a_client.execute_command("INFO PERSISTENCE")
        assert '0' == self.extract_is_loading_field(res)

        await a_client.connection_pool.disconnect()
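Outside the pytest harness, the same loading flag can be polled with a plain synchronous client while a restarted server replays its snapshot (a sketch; it assumes redis-py, a local Dragonfly restarted with an existing dump on port 6379, and that INFO keeps answering during loading, as the test above relies on):

# Sketch: wait until the restarted server has finished loading its snapshot.
import time

import redis

r = redis.Redis(port=6379)
while int(r.info("persistence")["loading"]) == 1:
    time.sleep(0.1)
print("snapshot loaded, INFO PERSISTENCE now reports loading:0")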