feat: Huge values breakdown in cluster migration (#4144)

* feat: Huge values breakdown in cluster migration

Before this PR we used `RESTORE` commands to transfer data between
source and target nodes in cluster slot migration.

While this _works_, it has the side effect of consuming 2x memory for huge
values (e.g. if a single key's value takes 10 GB, serializing it can take
20 GB or even 30 GB).

With this PR we break down huge keys into multiple commands (`RPUSH`,
`HSET`, etc), respecting the existing `--serialization_max_chunk_size`
flag.
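
For illustration, a list whose value exceeds the threshold would be
transferred as a stream of plain commands instead of a single `RESTORE`
(hypothetical key and elements; actual chunk boundaries depend on the
accumulated element sizes):

    RPUSH key elem1 ... elemN      # first chunk, ~serialization_max_chunk_size bytes
    RPUSH key elemN+1 ... elemM    # further chunks as needed
    PEXPIREAT key 1732542000000    # absolute expiry in ms, sent separately
    STICK key                      # only if the key is sticky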

Part of #4100
Shahar Mike, 2024-11-25 15:58:18 +02:00 (committed via GitHub)
parent 2b3c182cc9
commit 3c65651c69
8 changed files with 299 additions and 36 deletions

src/server/CMakeLists.txt

@@ -28,7 +28,7 @@ endif()
 add_library(dfly_transaction db_slice.cc blocking_controller.cc
             command_registry.cc cluster/cluster_utility.cc
-            journal/tx_executor.cc namespaces.cc
+            journal/cmd_serializer.cc journal/tx_executor.cc namespaces.cc
             common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
             server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
             serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc

src/server/container_utils.cc

@@ -270,6 +270,36 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
   return false;
 }
 
+bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) {
+  bool finished = true;
+  if (pv.Encoding() == kEncodingListPack) {
+    uint8_t intbuf[2][LP_INTBUF_SIZE];  // separate buffers: an integer-encoded key must survive reading val
+    uint8_t* lp = (uint8_t*)pv.RObjPtr();
+    uint8_t* fptr = lpFirst(lp);
+    while (fptr) {
+      string_view key = LpGetView(fptr, intbuf[0]);
+      fptr = lpNext(lp, fptr);
+      string_view val = LpGetView(fptr, intbuf[1]);
+      fptr = lpNext(lp, fptr);
+      if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) {
+        finished = false;
+        break;
+      }
+    }
+  } else {
+    StringMap* sm = static_cast<StringMap*>(pv.RObjPtr());
+    for (const auto& k_v : *sm) {
+      if (!func(ContainerEntry{k_v.first, sdslen(k_v.first)},
+                ContainerEntry{k_v.second, sdslen(k_v.second)})) {
+        finished = false;
+        break;
+      }
+    }
+  }
+  return finished;
+}
+
 StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) {
   DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
   StringMap* res = static_cast<StringMap*>(pv.RObjPtr());

src/server/container_utils.h

@@ -54,6 +54,7 @@ struct ContainerEntry {
 
 using IterateFunc = std::function<bool(ContainerEntry)>;
 using IterateSortedFunc = std::function<bool(ContainerEntry, double)>;
+using IterateKVFunc = std::function<bool(ContainerEntry, ContainerEntry)>;
 
 // Iterate over all values and call func(val). Iteration stops as soon
 // as func return false. Returns true if it successfully processed all elements
@@ -72,6 +73,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
                       int32_t start = 0, int32_t end = -1, bool reverse = false,
                       bool use_score = false);
 
+bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func);
+
 // Get StringMap pointer from primetable value. Sets expire time from db_context
 StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context);

src/server/journal/cmd_serializer.cc (new file)

@@ -0,0 +1,206 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/journal/cmd_serializer.h"

#include <absl/container/inlined_vector.h>
#include <absl/strings/str_cat.h>

#include "server/container_utils.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"

namespace dfly {
namespace {

using namespace std;

class CommandAggregator {
 public:
  using WriteCmdCallback = std::function<void(absl::Span<const string_view>)>;

  CommandAggregator(string_view key, WriteCmdCallback cb) : key_(key), cb_(cb) {
  }
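
  // Flushes any members still pending when the aggregator goes out of scope.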
  ~CommandAggregator() {
    CommitPending();
  }

  enum class CommitMode { kAuto, kNoCommit };

  void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
    agg_bytes_ += arg.size();
    members_.push_back(std::move(arg));

    if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= serialization_max_chunk_size) {
      CommitPending();
    }
  }

 private:
  void CommitPending() {
    if (members_.empty()) {
      return;
    }

    args_.clear();
    args_.reserve(members_.size() + 1);
    args_.push_back(key_);
    for (string_view member : members_) {
      args_.push_back(member);
    }
    cb_(args_);

    members_.clear();
  }

  string_view key_;
  WriteCmdCallback cb_;
  vector<string> members_;
  absl::InlinedVector<string_view, 5> args_;
  size_t agg_bytes_ = 0;
};
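
// Usage sketch (hypothetical key and values): paired args, such as a ZADD
// score and its member, are added with CommitMode::kNoCommit so that a chunk
// flush can never separate them:
//
//   CommandAggregator agg("zkey", [](absl::Span<const string_view> args) {
//     // args == {"zkey", "1.5", "member", ...}, forwarded as one command
//   });
//   agg.AddArg("1.5", CommandAggregator::CommitMode::kNoCommit);
//   agg.AddArg("member");  // may flush once agg_bytes_ >= serialization_max_chunk_size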
}  // namespace

CmdSerializer::CmdSerializer(FlushSerialized cb) : cb_(std::move(cb)) {
}

void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                   uint64_t expire_ms) {
  // We send RESTORE commands for small objects, or objects we don't support breaking.
  bool use_restore_serialization = true;
  if (serialization_max_chunk_size > 0 && pv.MallocUsed() > serialization_max_chunk_size) {
    switch (pv.ObjType()) {
      case OBJ_SET:
        SerializeSet(key, pv);
        use_restore_serialization = false;
        break;
      case OBJ_ZSET:
        SerializeZSet(key, pv);
        use_restore_serialization = false;
        break;
      case OBJ_HASH:
        SerializeHash(key, pv);
        use_restore_serialization = false;
        break;
      case OBJ_LIST:
        SerializeList(key, pv);
        use_restore_serialization = false;
        break;
      case OBJ_STRING:
      case OBJ_STREAM:
      case OBJ_JSON:
      case OBJ_SBF:
      default:
        // These types are unsupported wrt splitting huge values to multiple commands, so we send
        // them as a RESTORE command.
        break;
    }
  }

  if (use_restore_serialization) {
    // RESTORE sets STICK and EXPIRE as part of the command.
    SerializeRestore(key, pk, pv, expire_ms);
  } else {
    SerializeStickIfNeeded(key, pk);
    SerializeExpireIfNeeded(key, expire_ms);
  }
}

void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
  journal::Entry entry(0,                     // txid
                       journal::Op::COMMAND,  // single command
                       0,                     // db index
                       1,                     // shard count
                       0,                     // slot-id, but it is ignored at this level
                       journal::Entry::Payload(cmd, ArgSlice(args)));

  // Serialize into a string
  io::StringSink cmd_sink;
  JournalWriter writer{&cmd_sink};
  writer.Write(entry);

  cb_(std::move(cmd_sink).str());
}

void CmdSerializer::SerializeStickIfNeeded(string_view key, const PrimeValue& pk) {
  if (!pk.IsSticky()) {
    return;
  }

  SerializeCommand("STICK", {key});
}

void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms) {
  if (expire_ms == 0) {
    return;
  }
  // expire_ms is an absolute deadline (ms since epoch), matching RESTORE's ABSTTL.
  SerializeCommand("PEXPIREAT", {key, absl::StrCat(expire_ms)});
}

void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
  CommandAggregator aggregator(
      key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); });

  container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
    aggregator.AddArg(ce.ToString());
    return true;
  });
}

void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
  CommandAggregator aggregator(
      key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); });

  container_utils::IterateSortedSet(
      pv.GetRobjWrapper(),
      [&](container_utils::ContainerEntry ce, double score) {
        aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
        aggregator.AddArg(ce.ToString());
        return true;
      },
      /*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
}

void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
  CommandAggregator aggregator(
      key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); });

  container_utils::IterateMap(
      pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
        aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
        aggregator.AddArg(v.ToString());
        return true;
      });
}

void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
  CommandAggregator aggregator(
      key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); });

  container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
    aggregator.AddArg(ce.ToString());
    return true;
  });
}

void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                     uint64_t expire_ms) {
  absl::InlinedVector<string_view, 5> args;
  args.push_back(key);

  string expire_str = absl::StrCat(expire_ms);
  args.push_back(expire_str);

  io::StringSink value_dump_sink;
  SerializerBase::DumpObject(pv, &value_dump_sink);
  args.push_back(value_dump_sink.str());

  args.push_back("ABSTTL");  // Means expire string is since epoch

  if (pk.IsSticky()) {
    args.push_back("STICK");
  }

  SerializeCommand("RESTORE", args);
}

}  // namespace dfly

src/server/journal/cmd_serializer.h (new file)

@@ -0,0 +1,44 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/types/span.h>

#include <functional>
#include <string>
#include <string_view>

#include "server/table.h"

namespace dfly {
// CmdSerializer serializes DB entries (key+value) into RESP command strings.
// Small entries are serialized as RESTORE commands, while bigger ones (see
// serialization_max_chunk_size) are split into multiple commands (like RPUSH, HSET, etc).
// Expiration and stickiness are also serialized into commands.
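//
// Example (mirroring its use in RestoreStreamer::WriteEntry):
//
//   CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
//   serializer.SerializeEntry(key, pk, pv, expire_ms);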
class CmdSerializer {
 public:
  using FlushSerialized = std::function<void(std::string)>;

  explicit CmdSerializer(FlushSerialized cb);

  void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
                      uint64_t expire_ms);

 private:
  void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
  void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
  void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);

  void SerializeSet(std::string_view key, const PrimeValue& pv);
  void SerializeZSet(std::string_view key, const PrimeValue& pv);
  void SerializeHash(std::string_view key, const PrimeValue& pv);
  void SerializeList(std::string_view key, const PrimeValue& pv);

  void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
                        uint64_t expire_ms);

  FlushSerialized cb_;
};

}  // namespace dfly

src/server/journal/streamer.cc

@@ -9,6 +9,7 @@
 #include "base/flags.h"
 #include "base/logging.h"
 #include "server/cluster/cluster_defs.h"
+#include "server/journal/cmd_serializer.h"
 #include "util/fibers/synchronization.h"
 
 using namespace facade;
@@ -317,37 +318,8 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  absl::InlinedVector<string_view, 5> args;
-  args.push_back(key);
-
-  string expire_str = absl::StrCat(expire_ms);
-  args.push_back(expire_str);
-
-  io::StringSink restore_cmd_sink;
-  {  // to destroy extra copy
-    io::StringSink value_dump_sink;
-    SerializerBase::DumpObject(pv, &value_dump_sink);
-    args.push_back(value_dump_sink.str());
-
-    args.push_back("ABSTTL");  // Means expire string is since epoch
-
-    if (pk.IsSticky()) {
-      args.push_back("STICK");
-    }
-
-    journal::Entry entry(0,                     // txid
-                         journal::Op::COMMAND,  // single command
-                         0,                     // db index
-                         1,                     // shard count
-                         0,                     // slot-id, but it is ignored at this level
-                         journal::Entry::Payload("RESTORE", ArgSlice(args)));
-
-    JournalWriter writer{&restore_cmd_sink};
-    writer.Write(entry);
-  }
-
-  // TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
-  // will burn CPU for large values.
-  Write(restore_cmd_sink.str());
+  CmdSerializer serializer([&](std::string s) { Write(s); });
+  serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
 
 }  // namespace dfly

src/server/journal/streamer.h

@@ -98,7 +98,8 @@ class RestoreStreamer : public JournalStreamer {
   // Returns whether anything was written
   void WriteBucket(PrimeTable::bucket_iterator it);
 
-  void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
+  void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
+                  uint64_t expire_ms);
 
   DbSlice* db_slice_;
   DbTableArray db_array_;

tests/dragonfly/cluster_test.py

@@ -1315,10 +1315,14 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
 @pytest.mark.parametrize(
-    "node_count, segments, keys",
+    "node_count, segments, keys, huge_values",
     [
-        pytest.param(3, 16, 20_000),
-        pytest.param(5, 20, 30_000, marks=[pytest.mark.slow, pytest.mark.opt_only]),
+        pytest.param(3, 16, 20_000, 10),
+        # 1mb effectively disables breakdown of huge values.
+        # TODO: add a test that mixes huge and small values, see
+        # https://github.com/dragonflydb/dragonfly/pull/4144/files/11e5e387d31bcf1bc53dfbb28cf3bcaf094d77fa#r1850130930
+        pytest.param(3, 16, 20_000, 1_000_000),
+        pytest.param(5, 20, 30_000, 1_000_000, marks=[pytest.mark.slow, pytest.mark.opt_only]),
     ],
 )
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
@@ -1328,12 +1332,15 @@ async def test_cluster_fuzzymigration(
     node_count: int,
     segments: int,
     keys: int,
+    huge_values: int,
 ):
     instances = [
         df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
             vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+            serialization_max_chunk_size=huge_values,
+            replication_stream_output_limit=10,
         )
         for i in range(node_count)
     ]