fix: remove bad check-fail in the transaction code (#1420)

fix: remove bad check-fail in the transaction code.

Fixes #1421.

The failure reproduces for dragonfly running with a single thread, where all the
arguments are grouped within the same ShardData.

Also, we improve verbosity levels inside reply_builder.cc.
For that we extend SinkReplyBuilder to support protocol error reporting,
and we remove the ad-hoc code for this from dragonfly_connection.
This is required to track errors easily with `--vmodule=reply_builder=1`.

Finally, a pytest is added to cover the issue.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-06-18 07:03:08 +03:00 committed by GitHub
parent 956b39c553
commit 69e6ad799a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 52 additions and 34 deletions

View file

@ -43,18 +43,13 @@ using nonstd::make_unexpected;
namespace facade {
namespace {
void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
string res("-ERR Protocol error: ");
if (pres == RedisParser::BAD_BULKLEN) {
res.append("invalid bulk length\r\n");
builder->SendProtocolError("invalid bulk length");
} else {
CHECK_EQ(RedisParser::BAD_ARRAYLEN, pres);
res.append("invalid multibulk length\r\n");
}
error_code ec = peer->Write(::io::Buffer(res));
if (ec) {
LOG(WARNING) << "Error " << ec;
builder->SendProtocolError("invalid multibulk length");
}
}
@ -503,20 +498,17 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// offending request.
if (parse_status == ERROR) {
VLOG(1) << "Error parser status " << parser_error_;
++stats_->parser_err_cnt;
if (redis_parser_) {
SendProtocolError(RedisParser::Result(parser_error_), peer);
SendProtocolError(RedisParser::Result(parser_error_), orig_builder);
} else {
string_view sv{"CLIENT_ERROR bad command line format\r\n"};
error_code ec2 = peer->Write(::io::Buffer(sv));
if (ec2) {
LOG(WARNING) << "Error " << ec2;
ec = ec2;
}
DCHECK(memcache_parser_);
orig_builder->SendProtocolError("bad command line format");
}
error_code ec2 = peer->Shutdown(SHUT_RDWR);
LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2;
FetchBuilderStats(stats_, orig_builder);
}
if (ec && !FiberSocketBase::IsConnClosed(ec)) {

View file

@ -21,7 +21,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 184);
static_assert(kSizeConnStats == 176);
ADD(read_buf_capacity);
ADD(pipeline_cache_capacity);
@ -31,7 +31,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(io_write_bytes);
ADD(command_cnt);
ADD(pipelined_cmd_cnt);
ADD(parser_err_cnt);
ADD(async_writes_cnt);
ADD(conn_received_cnt);
ADD(num_conns);

View file

@ -44,7 +44,6 @@ struct ConnectionStats {
size_t io_write_bytes = 0;
uint64_t command_cnt = 0;
uint64_t pipelined_cmd_cnt = 0;
uint64_t parser_err_cnt = 0;
// Writes count that happened via DispatchOperations call.
uint64_t async_writes_cnt = 0;

View file

@ -13,6 +13,7 @@ using namespace std;
namespace {
// When changing this constant, also update `test_large_cmd` test in connection_test.py.
constexpr int kMaxArrayLen = 65536;
constexpr int64_t kMaxBulkLen = 64 * (1ul << 20); // 64MB.

View file

@ -54,7 +54,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
}
// Allow batching with up to 8K of data.
if ((should_batch_ || should_aggregate_) && batch_.size() + bsize < kMaxBatchSize) {
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
@ -71,7 +71,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if (batch_.empty()) {
ec = sink_->Write(v, len);
} else {
DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
DVLOG(2) << "Sending batch to stream " << sink_ << "\n" << batch_;
io_write_bytes_ += batch_.size();
@ -162,6 +162,10 @@ void MCReplyBuilder::SendError(string_view str, std::string_view type) {
SendSimpleString("ERROR");
}
void MCReplyBuilder::SendProtocolError(std::string_view str) {
SendSimpleString(absl::StrCat("CLIENT_ERROR ", str));
}
void MCReplyBuilder::SendClientError(string_view str) {
iovec v[] = {IoVec("CLIENT_ERROR "), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
@ -197,6 +201,8 @@ void RedisReplyBuilder::SetResp3(bool is_resp3) {
}
void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
VLOG(1) << "Error: " << str;
if (err_type.empty()) {
err_type = str;
if (err_type == kSyntaxErr)
@ -214,6 +220,10 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
}
}
void RedisReplyBuilder::SendProtocolError(std::string_view str) {
SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
}
void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
@ -483,7 +493,7 @@ void RedisReplyBuilder::SendStringArrInternal(WrappedStrSpan arr, CollectionType
}
void ReqSerializer::SendCommand(std::string_view str) {
VLOG(1) << "SendCommand: " << str;
VLOG(2) << "SendCommand: " << str;
iovec v[] = {IoVec(str), IoVec(kCRLF)};
ec_ = sink_->Write(v, ABSL_ARRAYSIZE(v));

View file

@ -54,6 +54,8 @@ class SinkReplyBuilder {
SendSimpleString("OK");
}
virtual void SendProtocolError(std::string_view str) = 0;
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
@ -154,6 +156,7 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendClientError(std::string_view str);
void SendNotFound();
void SendSimpleString(std::string_view str) final;
void SendProtocolError(std::string_view str) final;
void SetNoreply(bool noreply) {
noreply_ = noreply;
@ -176,6 +179,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
void SendStored() override;
void SendSetSkipped() override;
virtual void SendError(OpStatus status);
void SendProtocolError(std::string_view str) override;
virtual void SendNullArray(); // Send *-1
virtual void SendEmptyArray(); // Send *0

View file

@ -1594,7 +1594,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("total_reads_processed", m.conn_stats.io_read_cnt);
append("total_writes_processed", m.conn_stats.io_write_cnt);
append("async_writes_count", m.conn_stats.async_writes_cnt);
append("parser_err_count", m.conn_stats.parser_err_cnt);
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);

View file

@ -133,8 +133,6 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
auto& sd = shard_data_[i];
auto& si = shard_index[i];
CHECK_LT(si.args.size(), 1u << 15);
sd.arg_count = si.args.size();
sd.arg_start = args_.size();
@ -157,7 +155,7 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
}
}
CHECK(args_.size() == num_args);
CHECK_EQ(args_.size(), num_args);
}
void Transaction::InitMultiData(KeyIndex key_index) {

View file

@ -329,16 +329,16 @@ class Transaction {
char pad[46]; // to make sure PerShardData is 64 bytes and takes full cacheline.
uint32_t arg_start = 0; // Indices into args_ array.
uint16_t arg_count = 0;
// Accessed within shard thread.
// Bitmask of LocalState enums.
uint16_t local_mask = 0;
uint32_t arg_count = 0;
// Needed to rollback inconsistent schedulings or remove OOO transactions from
// tx queue.
uint32_t pq_pos = TxQueue::kEnd;
// Accessed within shard thread.
// Bitmask of LocalState enums.
uint16_t local_mask = 0;
// Index of key relative to args in shard that the shard was woken up after blocking wait.
uint16_t wake_key_pos = UINT16_MAX;
};

View file

@ -80,7 +80,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
args=request.config.getoption("--df"),
existing_port=int(existing) if existing else None,
existing_admin_port=int(existing_admin) if existing_admin else None,
existing_mc_port=int(existing_mc) if existing else None,
existing_mc_port=int(existing_mc) if existing_mc else None,
env=test_env
)

View file

@ -4,7 +4,8 @@ import asyncio
from redis import asyncio as aioredis
import async_timeout
from . import DflyInstance
from . import DflyInstance, dfly_args
async def run_monitor_eval(monitor, expected):
async with monitor as mon:
@ -346,7 +347,8 @@ async def test_subscribe_in_pipeline(async_client: aioredis.Redis):
pipe.echo("three")
res = await pipe.execute()
assert res == ['one', ['subscribe', 'ch1', 1], 'two', ['subscribe', 'ch2', 2], 'three']
assert res == ['one', ['subscribe', 'ch1', 1],
'two', ['subscribe', 'ch2', 2], 'three']
"""
This test makes sure that Dragonfly can receive blocks of pipelined commands even
@ -376,6 +378,7 @@ PACKET3 = """
PING
""" * 500 + "ECHO DONE\n"
async def test_parser_while_script_running(async_client: aioredis.Redis, df_server: DflyInstance):
sha = await async_client.script_load(BUSY_SCRIPT)
@ -399,3 +402,16 @@ async def test_parser_while_script_running(async_client: aioredis.Redis, df_serv
await reader.readuntil(b"DONE")
writer.close()
await writer.wait_closed()
@dfly_args({"proactor_threads": 1})
async def test_large_cmd(async_client: aioredis.Redis):
MAX_ARR_SIZE = 65535
res = await async_client.hset('foo', mapping={f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)})
assert res == MAX_ARR_SIZE // 2
res = await async_client.mset({f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)})
assert res
res = await async_client.mget([f"key{i}" for i in range(MAX_ARR_SIZE)])
assert len(res) == MAX_ARR_SIZE