mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: Support replicating Valkey and Redis 7.2 (#3927)
Until now, we only tested Dragonfly against Redis 6.2. It appears that something has changed in the way Redis sends stable sync commands, and now they also forward `MULTI` and `EXEC` as part of their replication. Since we do not allow all commands to run under `MULTI`/`EXEC`, specifically `SELECT`, a Dragonfly replica of such servers failed these commands and became inconsistent with the data on the master. The proposed fix is to simply ignore (i.e. not execute) `MULTI`/`EXEC` coming from a Redis/Valkey master, and run the commands within those transactions individually, like we do for other transactions. To test this we randomly choose a redis/valkey server based on 3 available installed binaries and test against them.
This commit is contained in:
parent
d2a83121e4
commit
c868b27bbe
2 changed files with 18 additions and 10 deletions
|
@ -635,18 +635,24 @@ error_code Replica::ConsumeRedisStream() {
|
|||
}
|
||||
|
||||
if (!LastResponseArgs().empty()) {
|
||||
VLOG(2) << "Got command " << absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf()))
|
||||
<< "\n consumed: " << response->total_read;
|
||||
string_view cmd = absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf()));
|
||||
|
||||
if (LastResponseArgs()[0].GetBuf()[0] == '\r') {
|
||||
for (const auto& arg : LastResponseArgs()) {
|
||||
LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf()));
|
||||
// Valkey and Redis may send MULTI and EXEC as part of their replication commands.
|
||||
// Dragonfly disallows some commands, such as SELECT, inside of MULTI/EXEC, so here we simply
|
||||
// ignore MULTI/EXEC and execute their inner commands individually.
|
||||
if (!absl::EqualsIgnoreCase(cmd, "MULTI") && !absl::EqualsIgnoreCase(cmd, "EXEC")) {
|
||||
VLOG(2) << "Got command " << cmd << "\n consumed: " << response->total_read;
|
||||
|
||||
if (LastResponseArgs()[0].GetBuf()[0] == '\r') {
|
||||
for (const auto& arg : LastResponseArgs()) {
|
||||
LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector);
|
||||
CmdArgList arg_list{args_vector.data(), args_vector.size()};
|
||||
service_.DispatchCommand(arg_list, &conn_context);
|
||||
facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector);
|
||||
CmdArgList arg_list{args_vector.data(), args_vector.size()};
|
||||
service_.DispatchCommand(arg_list, &conn_context);
|
||||
}
|
||||
}
|
||||
|
||||
io_buf.ConsumeInput(response->left_in_buffer);
|
||||
|
|
|
@ -3,6 +3,7 @@ import os
|
|||
import threading
|
||||
import time
|
||||
import subprocess
|
||||
import random
|
||||
import aiohttp
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
|
@ -455,8 +456,9 @@ class RedisServer:
|
|||
self.proc = None
|
||||
|
||||
def start(self, **kwargs):
|
||||
servers = ["redis-server-6.2.11", "redis-server-7.2.2", "valkey-server-8.0.1"]
|
||||
command = [
|
||||
"redis-server-6.2.11",
|
||||
random.choice(servers),
|
||||
f"--port {self.port}",
|
||||
"--save ''",
|
||||
"--appendonly no",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue