diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 2909cf6bd..c5286c61b 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -43,18 +43,13 @@ using nonstd::make_unexpected;
 namespace facade {
 namespace {
 
-void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
+void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
   string res("-ERR Protocol error: ");
   if (pres == RedisParser::BAD_BULKLEN) {
-    res.append("invalid bulk length\r\n");
+    builder->SendProtocolError("invalid bulk length");
   } else {
     CHECK_EQ(RedisParser::BAD_ARRAYLEN, pres);
-    res.append("invalid multibulk length\r\n");
-  }
-
-  error_code ec = peer->Write(::io::Buffer(res));
-  if (ec) {
-    LOG(WARNING) << "Error " << ec;
+    builder->SendProtocolError("invalid multibulk length");
   }
 }
 
@@ -503,20 +498,17 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   // offending request.
   if (parse_status == ERROR) {
     VLOG(1) << "Error parser status " << parser_error_;
-    ++stats_->parser_err_cnt;
 
     if (redis_parser_) {
-      SendProtocolError(RedisParser::Result(parser_error_), peer);
+      SendProtocolError(RedisParser::Result(parser_error_), orig_builder);
     } else {
-      string_view sv{"CLIENT_ERROR bad command line format\r\n"};
-      error_code ec2 = peer->Write(::io::Buffer(sv));
-      if (ec2) {
-        LOG(WARNING) << "Error " << ec2;
-        ec = ec2;
-      }
+      DCHECK(memcache_parser_);
+      orig_builder->SendProtocolError("bad command line format");
     }
     error_code ec2 = peer->Shutdown(SHUT_RDWR);
     LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2;
+
+    FetchBuilderStats(stats_, orig_builder);
   }
 
   if (ec && !FiberSocketBase::IsConnClosed(ec)) {
diff --git a/src/facade/facade.cc b/src/facade/facade.cc
index 69873f4ac..8bc26ca99 100644
--- a/src/facade/facade.cc
+++ b/src/facade/facade.cc
@@ -21,7 +21,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
 
 ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   // To break this code deliberately if we add/remove a field to this struct.
-  static_assert(kSizeConnStats == 184);
+  static_assert(kSizeConnStats == 176);
 
   ADD(read_buf_capacity);
   ADD(pipeline_cache_capacity);
@@ -31,7 +31,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   ADD(io_write_bytes);
   ADD(command_cnt);
   ADD(pipelined_cmd_cnt);
-  ADD(parser_err_cnt);
   ADD(async_writes_cnt);
   ADD(conn_received_cnt);
   ADD(num_conns);
diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h
index 42a531a10..b25420876 100644
--- a/src/facade/facade_types.h
+++ b/src/facade/facade_types.h
@@ -44,7 +44,6 @@ struct ConnectionStats {
   size_t io_write_bytes = 0;
   uint64_t command_cnt = 0;
   uint64_t pipelined_cmd_cnt = 0;
-  uint64_t parser_err_cnt = 0;
 
   // Writes count that happened via DispatchOperations call.
   uint64_t async_writes_cnt = 0;
diff --git a/src/facade/redis_parser.cc b/src/facade/redis_parser.cc
index c3754c234..5949094fb 100644
--- a/src/facade/redis_parser.cc
+++ b/src/facade/redis_parser.cc
@@ -13,6 +13,7 @@ using namespace std;
 
 namespace {
 
+// When changing this constant, also update `test_large_cmd` test in connection_test.py.
 constexpr int kMaxArrayLen = 65536;
 constexpr int64_t kMaxBulkLen = 64 * (1ul << 20);  // 64MB.
 
diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc
index 517199f40..5bfb3be49 100644
--- a/src/facade/reply_builder.cc
+++ b/src/facade/reply_builder.cc
@@ -54,7 +54,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
   }
 
   // Allow batching with up to 8K of data.
-  if ((should_batch_ || should_aggregate_) && batch_.size() + bsize < kMaxBatchSize) {
+  if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
     for (unsigned i = 0; i < len; ++i) {
       std::string_view src((char*)v[i].iov_base, v[i].iov_len);
       DVLOG(2) << "Appending to stream " << src;
@@ -71,7 +71,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
   if (batch_.empty()) {
     ec = sink_->Write(v, len);
   } else {
-    DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
+    DVLOG(2) << "Sending batch to stream " << sink_ << "\n" << batch_;
 
     io_write_bytes_ += batch_.size();
 
@@ -162,6 +162,10 @@ void MCReplyBuilder::SendError(string_view str, std::string_view type) {
   SendSimpleString("ERROR");
 }
 
+void MCReplyBuilder::SendProtocolError(std::string_view str) {
+  SendSimpleString(absl::StrCat("CLIENT_ERROR ", str));
+}
+
 void MCReplyBuilder::SendClientError(string_view str) {
   iovec v[] = {IoVec("CLIENT_ERROR "), IoVec(str), IoVec(kCRLF)};
   Send(v, ABSL_ARRAYSIZE(v));
@@ -197,6 +201,8 @@ void RedisReplyBuilder::SetResp3(bool is_resp3) {
 }
 
 void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
+  VLOG(1) << "Error: " << str;
+
   if (err_type.empty()) {
     err_type = str;
     if (err_type == kSyntaxErr)
@@ -214,6 +220,10 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
   }
 }
 
+void RedisReplyBuilder::SendProtocolError(std::string_view str) {
+  SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
+}
+
 void RedisReplyBuilder::SendSimpleString(std::string_view str) {
   iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
 
@@ -483,7 +493,7 @@ void RedisReplyBuilder::SendStringArrInternal(WrappedStrSpan arr, CollectionType
 }
 
 void ReqSerializer::SendCommand(std::string_view str) {
-  VLOG(1) << "SendCommand: " << str;
+  VLOG(2) << "SendCommand: " << str;
 
   iovec v[] = {IoVec(str), IoVec(kCRLF)};
   ec_ = sink_->Write(v, ABSL_ARRAYSIZE(v));
diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h
index 8652c7d48..4dd51bd57 100644
--- a/src/facade/reply_builder.h
+++ b/src/facade/reply_builder.h
@@ -54,6 +54,8 @@ class SinkReplyBuilder {
     SendSimpleString("OK");
   }
 
+  virtual void SendProtocolError(std::string_view str) = 0;
+
   // In order to reduce interrupt rate we allow coalescing responses together using
   // Batch mode. It is controlled by Connection state machine because it makes sense only
   // when pipelined requests are arriving.
@@ -154,6 +156,7 @@ class MCReplyBuilder : public SinkReplyBuilder {
   void SendClientError(std::string_view str);
   void SendNotFound();
   void SendSimpleString(std::string_view str) final;
+  void SendProtocolError(std::string_view str) final;
 
   void SetNoreply(bool noreply) {
     noreply_ = noreply;
@@ -176,6 +179,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
   void SendStored() override;
   void SendSetSkipped() override;
   virtual void SendError(OpStatus status);
+  void SendProtocolError(std::string_view str) override;
 
   virtual void SendNullArray();   // Send *-1
   virtual void SendEmptyArray();  // Send *0
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 924ba7766..924c9e2b7 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1594,7 +1594,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("total_reads_processed", m.conn_stats.io_read_cnt);
    append("total_writes_processed", m.conn_stats.io_write_cnt);
     append("async_writes_count", m.conn_stats.async_writes_cnt);
-    append("parser_err_count", m.conn_stats.parser_err_cnt);
     append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
     append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
     append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index b51867d92..54396be79 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -133,8 +133,6 @@ void Transaction::InitShardData(absl::Span shard_index, siz
     auto& sd = shard_data_[i];
     auto& si = shard_index[i];
 
-    CHECK_LT(si.args.size(), 1u << 15);
-
     sd.arg_count = si.args.size();
     sd.arg_start = args_.size();
 
@@ -157,7 +155,7 @@ void Transaction::InitShardData(absl::Span shard_index, siz
     }
   }
 
-  CHECK(args_.size() == num_args);
+  CHECK_EQ(args_.size(), num_args);
 }
 
 void Transaction::InitMultiData(KeyIndex key_index) {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 94ce97e52..4e3bc7d57 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -329,16 +329,16 @@ class Transaction {
     char pad[46];  // to make sure PerShardData is 64 bytes and takes full cacheline.
 
     uint32_t arg_start = 0;  // Indices into args_ array.
-    uint16_t arg_count = 0;
-
-    // Accessed within shard thread.
-    // Bitmask of LocalState enums.
-    uint16_t local_mask = 0;
+    uint32_t arg_count = 0;
 
     // Needed to rollback inconsistent schedulings or remove OOO transactions from
     // tx queue.
     uint32_t pq_pos = TxQueue::kEnd;
 
+    // Accessed within shard thread.
+    // Bitmask of LocalState enums.
+    uint16_t local_mask = 0;
+
     // Index of key relative to args in shard that the shard was woken up after blocking wait.
     uint16_t wake_key_pos = UINT16_MAX;
   };
diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py
index b7c6d7845..5de3fa3a6 100644
--- a/tests/dragonfly/conftest.py
+++ b/tests/dragonfly/conftest.py
@@ -80,7 +80,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
         args=request.config.getoption("--df"),
         existing_port=int(existing) if existing else None,
         existing_admin_port=int(existing_admin) if existing_admin else None,
-        existing_mc_port=int(existing_mc) if existing else None,
+        existing_mc_port=int(existing_mc) if existing_mc else None,
         env=test_env
     )
 
diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py
index 059ee4d17..8b32e3e71 100644
--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py
@@ -4,7 +4,8 @@ import asyncio
 from redis import asyncio as aioredis
 import async_timeout
 
-from . import DflyInstance
+from . import DflyInstance, dfly_args
+
 
 async def run_monitor_eval(monitor, expected):
     async with monitor as mon:
@@ -346,7 +347,8 @@ async def test_subscribe_in_pipeline(async_client: aioredis.Redis):
     pipe.echo("three")
     res = await pipe.execute()
 
-    assert res == ['one', ['subscribe', 'ch1', 1], 'two', ['subscribe', 'ch2', 2], 'three']
+    assert res == ['one', ['subscribe', 'ch1', 1],
+                   'two', ['subscribe', 'ch2', 2], 'three']
 
 """
 This test makes sure that Dragonfly can receive blocks of pipelined commands even
@@ -376,6 +378,7 @@ PACKET3 = """
 PING
 """ * 500 + "ECHO DONE\n"
 
+
 async def test_parser_while_script_running(async_client: aioredis.Redis, df_server: DflyInstance):
     sha = await async_client.script_load(BUSY_SCRIPT)
 
@@ -399,3 +402,16 @@ async def test_parser_while_script_running(async_client: aioredis.Redis, df_serv
     await reader.readuntil(b"DONE")
     writer.close()
     await writer.wait_closed()
+
+
+@dfly_args({"proactor_threads": 1})
+async def test_large_cmd(async_client: aioredis.Redis):
+    MAX_ARR_SIZE = 65535
+    res = await async_client.hset('foo', mapping={f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)})
+    assert res == MAX_ARR_SIZE // 2
+
+    res = await async_client.mset({f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)})
+    assert res
+
+    res = await async_client.mget([f"key{i}" for i in range(MAX_ARR_SIZE)])
+    assert len(res) == MAX_ARR_SIZE
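
Not part of the patch above: a rough companion sketch, in the style of the tests in connection_test.py, of how the new protocol-error path could be exercised over a raw socket. It assumes the `df_server` fixture used by the existing tests, that `DflyInstance` exposes a `port` attribute, and that a multibulk header larger than kMaxArrayLen (65536) is rejected with BAD_ARRAYLEN; the reply text itself comes from RedisReplyBuilder::SendProtocolError in this diff.

# Hypothetical test, not included in the patch; fixture and attribute names are assumptions.
async def test_multibulk_protocol_error(df_server: DflyInstance):
    reader, writer = await asyncio.open_connection("localhost", df_server.port)

    # A multibulk header above kMaxArrayLen (65536) should fail parsing with BAD_ARRAYLEN.
    writer.write(b"*100000\r\n")
    await writer.drain()

    # The error is now sent through the reply builder before the socket is shut down.
    reply = await reader.readline()
    assert b"Protocol error: invalid multibulk length" in reply

    # After replying, the server shuts the connection down (SHUT_RDWR), so we hit EOF.
    assert await reader.read() == b""

    writer.close()
    await writer.wait_closed()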