bug(replica): support auth master (#1000)

* bug(replica): support auth master

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-03-28 12:17:53 +03:00 committed by GitHub
parent 4109529561
commit 89da9a99d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 23 deletions

View file

@ -29,7 +29,7 @@ extern "C" {
ABSL_FLAG(bool, enable_multi_shard_sync, false,
"Execute multi shards commands on replica syncrhonized");
ABSL_FLAG(std::string, masterauth, "", "password for authentication with master");
ABSL_DECLARE_FLAG(uint32_t, port);
namespace dfly {
@ -131,19 +131,23 @@ Replica::~Replica() {
static const char kConnErr[] = "could not connect to master: ";
bool Replica::Start(ConnectionContext* cntx) {
CHECK(!sock_);
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);
// 1. Connect socket.
error_code ec = ConnectSocket();
// 1. Resolve dns.
error_code ec = ResolveMasterDns();
if (ec) {
(*cntx)->SendError(StrCat("could not resolve master dns", ec.message()));
return false;
}
// 2. Connect socket.
ec = ConnectAndAuth();
if (ec) {
(*cntx)->SendError(StrCat(kConnErr, ec.message()));
return false;
}
// 2. Greet.
// 3. Greet.
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
last_io_time_ = mythread->GetMonotonicTimeNs();
ec = Greet();
@ -152,10 +156,10 @@ bool Replica::Start(ConnectionContext* cntx) {
return false;
}
// 3. Init basic context.
// 4. Init basic context.
cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this));
// 4. Spawn main coordination fiber.
// 5. Spawn main coordination fiber.
sync_fb_ = fibers_ext::Fiber(&Replica::MainReplicationFb, this);
(*cntx)->SendOk();
@ -196,7 +200,13 @@ void Replica::MainReplicationFb() {
if (is_paused_)
continue;
ec = ConnectSocket();
ec = ResolveMasterDns();
if (ec) {
LOG(ERROR) << "Error resolving dns " << ec;
continue;
}
ec = ConnectAndAuth();
if (ec) {
LOG(ERROR) << "Error connecting " << ec;
continue;
@ -251,9 +261,7 @@ void Replica::MainReplicationFb() {
VLOG(1) << "Main replication fiber finished";
}
error_code Replica::ConnectSocket() {
sock_.reset(ProactorBase::me()->CreateSocket());
error_code Replica::ResolveMasterDns() {
char ip_addr[INET6_ADDRSTRLEN];
int resolve_res = ResolveDns(master_context_.host, ip_addr);
if (resolve_res != 0) {
@ -263,6 +271,15 @@ error_code Replica::ConnectSocket() {
master_context_.endpoint = {ip::make_address(ip_addr), master_context_.port};
return error_code{};
}
error_code Replica::ConnectAndAuth() {
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);
sock_.reset(mythread->CreateSocket());
RETURN_ON_ERR(sock_->Connect(master_context_.endpoint));
/* These may help but require additional field testing to learn.
int yes = 1;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
@ -277,8 +294,20 @@ error_code Replica::ConnectSocket() {
intv = 3;
CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv)));
*/
return sock_->Connect(master_context_.endpoint);
auto masterauth = absl::GetFlag(FLAGS_masterauth);
if (!masterauth.empty()) {
ReqSerializer serializer{sock_.get()};
uint32_t consumed = 0;
base::IoBuf io_buf{128};
parser_.reset(new RedisParser{false});
RETURN_ON_ERR(SendCommand(StrCat("AUTH ", masterauth), &serializer));
RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
if (!CheckRespIsSimpleReply("OK")) {
LOG(ERROR) << "Failed authentication with masters " << ToSV(io_buf.InputBuffer());
return make_error_code(errc::bad_message);
}
}
return error_code{};
}
error_code Replica::Greet() {
@ -355,6 +384,7 @@ error_code Replica::Greet() {
master_context_.master_repl_id = param0;
master_context_.dfly_session_id = param1;
num_df_flows_ = param2;
io_buf.ConsumeInput(consumed);
// We need to send this because we may require to use this for cluster commands.
// this reason to send this here is that in other context we can get an error reply
// since we are budy with the replication
@ -705,14 +735,9 @@ error_code Replica::SendNextPhaseRequest(bool stable) {
}
error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* cntx) {
CHECK(!sock_);
DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);
sock_.reset(mythread->CreateSocket());
RETURN_ON_ERR(sock_->Connect(master_context_.endpoint));
RETURN_ON_ERR(ConnectAndAuth());
VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " "
<< master_context_.dfly_session_id << " " << master_context_.dfly_flow_id;
@ -1177,7 +1202,7 @@ std::string Replica::GetSyncId() const {
}
bool Replica::CheckRespIsSimpleReply(string_view reply) const {
return resp_args_.size() == 1 || resp_args_.front().type == RespExpr::STRING ||
return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::STRING &&
ToSV(resp_args_.front().GetBuf()) == reply;
}

View file

@ -116,8 +116,9 @@ class Replica {
// Coordinate state transitions. Spawned by start.
void MainReplicationFb();
std::error_code ConnectSocket(); // Connect to master.
std::error_code Greet(); // Send PING and REPLCONF.
std::error_code ResolveMasterDns(); // Resolve master dns
std::error_code ConnectAndAuth(); // Connect to master and authenticate if needed.
std::error_code Greet(); // Send PING and REPLCONF.
std::error_code InitiatePSync(); // Redis full sync.
std::error_code InitiateDflySync(); // Dragonfly full sync.

View file

@ -718,6 +718,7 @@ end
return 'OK'
"""
@pytest.mark.skip(reason='Failing')
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, num_ops, num_keys, num_par, flags", script_cases)
@ -753,3 +754,34 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys
for j, k in enumerate(key_set):
l = await c_replica.lrange(k, 0, -1)
assert l == [f'{j}'.encode()] * num_ops
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_auth_master(df_local_factory, n_keys=20):
masterpass = 'requirepass'
replicapass = 'replicapass'
master = df_local_factory.create(port=BASE_PORT, requirepass=masterpass)
replica = df_local_factory.create(
port=BASE_PORT+1, logtostdout=True, masterauth=masterpass, requirepass=replicapass)
df_local_factory.start_all([master, replica])
c_master = aioredis.Redis(port=master.port, password=masterpass)
c_replica = aioredis.Redis(port=replica.port, password=replicapass)
# Connect replica to master
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
# Set keys
pipe = c_master.pipeline(transaction=False)
batch_fill_data(pipe, gen_test_data(n_keys))
await pipe.execute()
# Check replica finished executing the replicated commands
await check_all_replicas_finished([c_replica], c_master)
# Check keys are on replica
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is not None for v in res)
await c_master.connection_pool.disconnect()
await c_replica.connection_pool.disconnect()