chore: add -Wthread-analysis and annotate (part 1/2) (#3502)

* enable -Wthread-analysis
* add missing annotations
* small fixes

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-08-26 18:22:38 +03:00 committed by GitHub
parent 238bf3ee85
commit 839b1be82d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 163 additions and 147 deletions

View file

@ -22,6 +22,7 @@
#include "absl/strings/ascii.h"
#include "facade/error.h"
#include "slowlog.h"
#include "util/fibers/synchronization.h"
extern "C" {
#include "redis/redis_aux.h"
@ -899,7 +900,7 @@ void ServerFamily::JoinSnapshotSchedule() {
schedule_done_.Reset();
}
void ServerFamily::Shutdown() {
void ServerFamily::Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
VLOG(1) << "ServerFamily::Shutdown";
if (load_result_) {
@ -933,7 +934,7 @@ void ServerFamily::Shutdown() {
auto ec = journal_->Close();
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
if (replica_) {
replica_->Stop();
}
@ -1430,7 +1431,7 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
}
void ServerFamily::PauseReplication(bool pause) {
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
// Switch to primary mode.
if (!ServerState::tlocal()->is_master) {
@ -1441,7 +1442,7 @@ void ServerFamily::PauseReplication(bool pause) {
}
std::optional<ReplicaOffsetInfo> ServerFamily::GetReplicaOffsetInfo() {
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
// Switch to primary mode.
if (!ServerState::tlocal()->is_master) {
@ -1464,7 +1465,7 @@ vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
}
optional<Replica::Summary> ServerFamily::GetReplicaSummary() const {
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
if (replica_ == nullptr) {
return nullopt;
} else {
@ -1538,7 +1539,7 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas
StrCat(GlobalStateName(state), " - can not save database")};
}
{
std::lock_guard lk(save_mu_);
util::fb2::LockGuard lk(save_mu_);
if (save_controller_) {
return GenericError{make_error_code(errc::operation_in_progress),
"SAVING - can not save database"};
@ -1613,7 +1614,7 @@ error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
}
LastSaveInfo ServerFamily::GetLastSaveInfo() const {
lock_guard lk(save_mu_);
util::fb2::LockGuard lk(save_mu_);
return last_save_info_;
}
@ -2204,7 +2205,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
{
lock_guard lk{save_mu_};
util::fb2::LockGuard lk{save_mu_};
if (save_controller_) {
append("save_buffer_bytes", save_controller_->GetSaveBuffersSize());
}
@ -2291,7 +2292,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
bool is_saving = false;
uint32_t curent_durration_sec = 0;
{
lock_guard lk{save_mu_};
util::fb2::LockGuard lk{save_mu_};
if (save_controller_) {
is_saving = true;
curent_durration_sec = save_controller_->GetCurrentSaveDuration();
@ -2352,7 +2353,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("REPLICATION")) {
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
// ensuring eventual consistency of is_master. When determining if the server is a replica and
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
@ -2552,7 +2553,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
}
void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) {
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
if (ServerState::tlocal()->is_master) {
cntx->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
return;
@ -2578,70 +2579,72 @@ void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
unique_lock lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
std::shared_ptr<Replica> new_replica;
{
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
// We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) {
cntx->SendError(kLoadingErr);
return;
}
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
if (!replicaof_args.has_value()) {
return;
}
LOG(INFO) << "Replicating " << *replicaof_args;
// If NO ONE was supplied, just stop the current replica (if it exists)
if (replicaof_args->IsReplicaOfNoOne()) {
if (!ServerState::tlocal()->is_master) {
CHECK(replica_);
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
replica_->Stop();
replica_.reset();
StopAllClusterReplicas();
// We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) {
cntx->SendError(kLoadingErr);
return;
}
CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
if (!replicaof_args.has_value()) {
return;
}
return cntx->SendOk();
}
LOG(INFO) << "Replicating " << *replicaof_args;
// If any replication is in progress, stop it, cancellation should kick in immediately
if (replica_)
replica_->Stop();
StopAllClusterReplicas();
// If NO ONE was supplied, just stop the current replica (if it exists)
if (replicaof_args->IsReplicaOfNoOne()) {
if (!ServerState::tlocal()->is_master) {
CHECK(replica_);
// First, switch into the loading state
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
cntx->SendError("Invalid state");
return;
}
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
replica_->Stop();
replica_.reset();
// If we are called by "Replicate", cntx->transaction will be null but we do not need
// to flush anything.
if (cntx->transaction) {
Drakarys(cntx->transaction, DbSlice::kDbAll);
}
StopAllClusterReplicas();
}
// Create a new replica and assing it
auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
replica_ = new_replica;
return cntx->SendOk();
}
// TODO: disconnect pending blocked clients (pubsub, blocking commands)
SetMasterFlagOnAllThreads(false); // Flip flag after assiging replica
// If any replication is in progress, stop it, cancellation should kick in immediately
if (replica_)
replica_->Stop();
StopAllClusterReplicas();
// First, switch into the loading state
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
cntx->SendError("Invalid state");
return;
}
// If we are called by "Replicate", cntx->transaction will be null but we do not need
// to flush anything.
if (cntx->transaction) {
Drakarys(cntx->transaction, DbSlice::kDbAll);
}
// Create a new replica and assing it
new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
replica_ = new_replica;
// TODO: disconnect pending blocked clients (pubsub, blocking commands)
SetMasterFlagOnAllThreads(false); // Flip flag after assiging replica
} // release the lock, lk.unlock()
// We proceed connecting below without the lock to allow interrupting the replica immediately.
// From this point and onward, it should be highly responsive.
lk.unlock();
error_code ec{};
switch (on_err) {
@ -2655,7 +2658,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
// If the replication attempt failed, clean up global state. The replica should have stopped
// internally.
lk.lock();
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
if (ec && replica_ == new_replica) {
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
SetMasterFlagOnAllThreads(true);
@ -2697,7 +2700,7 @@ void ServerFamily::Replicate(string_view host, string_view port) {
void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "ReplTakeOver start";
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
CmdArgParser parser{args};
@ -2833,7 +2836,7 @@ err:
void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
unique_lock lk(replicaof_mu_);
util::fb2::LockGuard lk(replicaof_mu_);
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
// ensuring eventual consistency of is_master. When determining if the server is a replica and
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
@ -2884,7 +2887,7 @@ void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
time_t save_time;
{
lock_guard lk(save_mu_);
util::fb2::LockGuard lk(save_mu_);
save_time = last_save_info_.save_time;
}
cntx->SendLong(save_time);