mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Merge branch 'main' into bobik/better_logging_around_replica
This commit is contained in:
commit
61fe1ff944
6 changed files with 96 additions and 18 deletions
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
#include "base/pod_array.h"
|
#include "base/pod_array.h"
|
||||||
#include "core/bloom.h"
|
#include "core/bloom.h"
|
||||||
#include "core/detail/bitpacking.h"
|
#include "core/detail/bitpacking.h"
|
||||||
|
#include "core/huff_coder.h"
|
||||||
#include "core/qlist.h"
|
#include "core/qlist.h"
|
||||||
#include "core/sorted_map.h"
|
#include "core/sorted_map.h"
|
||||||
#include "core/string_map.h"
|
#include "core/string_map.h"
|
||||||
|
@ -34,7 +35,6 @@ ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implement
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using absl::GetFlag;
|
|
||||||
using detail::binpacked_len;
|
using detail::binpacked_len;
|
||||||
using MemoryResource = detail::RobjWrapper::MemoryResource;
|
using MemoryResource = detail::RobjWrapper::MemoryResource;
|
||||||
|
|
||||||
|
@ -380,6 +380,8 @@ struct TL {
|
||||||
size_t small_str_bytes;
|
size_t small_str_bytes;
|
||||||
base::PODArray<uint8_t> tmp_buf;
|
base::PODArray<uint8_t> tmp_buf;
|
||||||
string tmp_str;
|
string tmp_str;
|
||||||
|
HuffmanEncoder huff_encoder;
|
||||||
|
HuffmanDecoder huff_decoder;
|
||||||
};
|
};
|
||||||
|
|
||||||
thread_local TL tl;
|
thread_local TL tl;
|
||||||
|
@ -750,6 +752,20 @@ void CompactObj::InitThreadLocal(MemoryResource* mr) {
|
||||||
tl.tmp_buf = base::PODArray<uint8_t>{mr};
|
tl.tmp_buf = base::PODArray<uint8_t>{mr};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool CompactObj::InitHuffmanThreadLocal(std::string_view hufftable) {
|
||||||
|
string err_msg;
|
||||||
|
if (!tl.huff_encoder.Load(hufftable, &err_msg)) {
|
||||||
|
LOG(DFATAL) << "Failed to load huffman table: " << err_msg;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tl.huff_decoder.Load(hufftable, &err_msg)) {
|
||||||
|
LOG(DFATAL) << "Failed to load huffman table: " << err_msg;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
CompactObj::~CompactObj() {
|
CompactObj::~CompactObj() {
|
||||||
if (HasAllocated()) {
|
if (HasAllocated()) {
|
||||||
Free();
|
Free();
|
||||||
|
@ -1402,7 +1418,47 @@ void CompactObj::EncodeString(string_view str) {
|
||||||
DCHECK_EQ(NONE_ENC, mask_bits_.encoding);
|
DCHECK_EQ(NONE_ENC, mask_bits_.encoding);
|
||||||
|
|
||||||
string_view encoded = str;
|
string_view encoded = str;
|
||||||
bool is_ascii = kUseAsciiEncoding && detail::validate_ascii_fast(str.data(), str.size());
|
bool huff_encoded = false;
|
||||||
|
|
||||||
|
// We chose such length that we can store the decoded length delta into 1 byte.
|
||||||
|
// The maximum huffman compression is 1/8, so 288 / 8 = 36.
|
||||||
|
// 288 - 36 = 252, which is smaller than 256.
|
||||||
|
constexpr unsigned kMaxHuffLen = 288;
|
||||||
|
|
||||||
|
// TODO: for sizes 17, 18 we would like to test ascii encoding first as it's more efficient.
|
||||||
|
// And if it succeeds we can squash into the inline buffer. Starting from 19,
|
||||||
|
// we can prioritize huffman encoding.
|
||||||
|
if (str.size() <= kMaxHuffLen && tl.huff_encoder.valid()) {
|
||||||
|
unsigned dest_len = tl.huff_encoder.CompressedBound(str.size());
|
||||||
|
// 1 byte for storing the size delta.
|
||||||
|
tl.tmp_buf.resize(1 + dest_len);
|
||||||
|
string err_msg;
|
||||||
|
bool res = tl.huff_encoder.Encode(str, tl.tmp_buf.data() + 1, &dest_len, &err_msg);
|
||||||
|
if (res) {
|
||||||
|
// we accept huffman encoding only if it is:
|
||||||
|
// 1. smaller than the original string by 20%
|
||||||
|
// 2. allows us to store the encoded string in the inline buffer
|
||||||
|
if (dest_len && (dest_len < kInlineLen || (dest_len + dest_len / 5) < str.size())) {
|
||||||
|
huff_encoded = true;
|
||||||
|
encoded = string_view{reinterpret_cast<char*>(tl.tmp_buf.data()), dest_len + 1};
|
||||||
|
unsigned delta = str.size() - dest_len;
|
||||||
|
DCHECK_LT(delta, 256u);
|
||||||
|
tl.tmp_buf[0] = static_cast<uint8_t>(delta);
|
||||||
|
mask_bits_.encoding = HUFFMAN_ENC;
|
||||||
|
if (encoded.size() <= kInlineLen) {
|
||||||
|
SetMeta(encoded.size(), mask_);
|
||||||
|
memcpy(u_.inline_str, tl.tmp_buf.data(), encoded.size());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Should not happen, means we have an internal buf.
|
||||||
|
LOG(DFATAL) << "Failed to encode string with huffman: " << err_msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_ascii =
|
||||||
|
kUseAsciiEncoding && !huff_encoded && detail::validate_ascii_fast(str.data(), str.size());
|
||||||
|
|
||||||
if (is_ascii) {
|
if (is_ascii) {
|
||||||
size_t encode_len = binpacked_len(str.size());
|
size_t encode_len = binpacked_len(str.size());
|
||||||
|
|
|
@ -116,7 +116,7 @@ class CompactObj {
|
||||||
CompactObj(const CompactObj&) = delete;
|
CompactObj(const CompactObj&) = delete;
|
||||||
|
|
||||||
// 0-16 is reserved for inline lengths of string type.
|
// 0-16 is reserved for inline lengths of string type.
|
||||||
enum TagEnum {
|
enum TagEnum : uint8_t {
|
||||||
INT_TAG = 17,
|
INT_TAG = 17,
|
||||||
SMALL_TAG = 18,
|
SMALL_TAG = 18,
|
||||||
ROBJ_TAG = 19,
|
ROBJ_TAG = 19,
|
||||||
|
@ -125,6 +125,11 @@ class CompactObj {
|
||||||
SBF_TAG = 22,
|
SBF_TAG = 22,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// String encoding types.
|
||||||
|
// With ascii compression it compresses 8 bytes to 7 but also 7 to 7.
|
||||||
|
// Therefore, in order to know the original length we introduce 2 states that
|
||||||
|
// correct the length upon decoding. ASCII1_ENC rounds down the decoded length,
|
||||||
|
// while ASCII2_ENC rounds it up. See DecodedLen implementation for more info.
|
||||||
enum Encoding : uint8_t {
|
enum Encoding : uint8_t {
|
||||||
NONE_ENC = 0,
|
NONE_ENC = 0,
|
||||||
ASCII1_ENC = 1,
|
ASCII1_ENC = 1,
|
||||||
|
@ -373,6 +378,7 @@ class CompactObj {
|
||||||
static Stats GetStats();
|
static Stats GetStats();
|
||||||
|
|
||||||
static void InitThreadLocal(MemoryResource* mr);
|
static void InitThreadLocal(MemoryResource* mr);
|
||||||
|
static bool InitHuffmanThreadLocal(std::string_view hufftable);
|
||||||
static MemoryResource* memory_resource(); // thread-local.
|
static MemoryResource* memory_resource(); // thread-local.
|
||||||
|
|
||||||
template <typename T, typename... Args> static T* AllocateMR(Args&&... args) {
|
template <typename T, typename... Args> static T* AllocateMR(Args&&... args) {
|
||||||
|
@ -490,10 +496,7 @@ class CompactObj {
|
||||||
uint8_t expire : 1;
|
uint8_t expire : 1;
|
||||||
uint8_t mc_flag : 1; // Marks keys that have memcache flags assigned.
|
uint8_t mc_flag : 1; // Marks keys that have memcache flags assigned.
|
||||||
|
|
||||||
// ascii encoding is not an injective function. it compresses 8 bytes to 7 but also 7 to 7.
|
// See the Encoding enum for the meaning of these bits.
|
||||||
// therefore, in order to know the original length we introduce 2 flags that
|
|
||||||
// correct the length upon decoding. ASCII1_ENC_BIT rounds down the decoded length,
|
|
||||||
// while ASCII2_ENC_BIT rounds it up. See DecodedLen implementation for more info.
|
|
||||||
uint8_t encoding : 2;
|
uint8_t encoding : 2;
|
||||||
|
|
||||||
// IO_PENDING is set when the tiered storage has issued an i/o request to save the value.
|
// IO_PENDING is set when the tiered storage has issued an i/o request to save the value.
|
||||||
|
|
|
@ -214,10 +214,10 @@ TEST_F(HuffCoderTest, Load) {
|
||||||
TEST_F(HuffCoderTest, Encode) {
|
TEST_F(HuffCoderTest, Encode) {
|
||||||
ASSERT_TRUE(encoder_.Load(good_table_, &error_msg_)) << error_msg_;
|
ASSERT_TRUE(encoder_.Load(good_table_, &error_msg_)) << error_msg_;
|
||||||
|
|
||||||
EXPECT_EQ(1, encoder_.BitCount('x'));
|
EXPECT_EQ(1, encoder_.GetNBits('x'));
|
||||||
EXPECT_EQ(3, encoder_.BitCount(':'));
|
EXPECT_EQ(3, encoder_.GetNBits(':'));
|
||||||
EXPECT_EQ(5, encoder_.BitCount('2'));
|
EXPECT_EQ(5, encoder_.GetNBits('2'));
|
||||||
EXPECT_EQ(5, encoder_.BitCount('3'));
|
EXPECT_EQ(5, encoder_.GetNBits('3'));
|
||||||
|
|
||||||
string data("x:23xx");
|
string data("x:23xx");
|
||||||
|
|
||||||
|
@ -225,6 +225,15 @@ TEST_F(HuffCoderTest, Encode) {
|
||||||
uint32_t dest_size = dest.size();
|
uint32_t dest_size = dest.size();
|
||||||
ASSERT_TRUE(encoder_.Encode(data, dest.data(), &dest_size, &error_msg_));
|
ASSERT_TRUE(encoder_.Encode(data, dest.data(), &dest_size, &error_msg_));
|
||||||
ASSERT_EQ(3, dest_size);
|
ASSERT_EQ(3, dest_size);
|
||||||
|
|
||||||
|
// testing small destination buffer.
|
||||||
|
data = "3333333333333333333";
|
||||||
|
dest_size = 16;
|
||||||
|
EXPECT_TRUE(encoder_.Encode(data, dest.data(), &dest_size, &error_msg_));
|
||||||
|
|
||||||
|
// destination too small
|
||||||
|
ASSERT_EQ(0, dest_size);
|
||||||
|
ASSERT_EQ("", error_msg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(HuffCoderTest, Decode) {
|
TEST_F(HuffCoderTest, Decode) {
|
||||||
|
|
|
@ -36,6 +36,9 @@ bool HuffmanEncoder::Load(std::string_view binary_data, std::string* error_msg)
|
||||||
huf_ctable_.reset();
|
huf_ctable_.reset();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
HUF_CTableHeader header = HUF_readCTableHeader(huf_ctable_.get());
|
||||||
|
num_bits_ = header.tableLog;
|
||||||
|
table_max_symbol_ = header.maxSymbolValue;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -83,11 +86,6 @@ unsigned HuffmanEncoder::GetNBits(uint8_t symbol) const {
|
||||||
return HUF_getNbBitsFromCTable(huf_ctable_.get(), symbol);
|
return HUF_getNbBitsFromCTable(huf_ctable_.get(), symbol);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned HuffmanEncoder::BitCount(uint8_t symbol) const {
|
|
||||||
DCHECK(huf_ctable_);
|
|
||||||
return HUF_getNbBitsFromCTable(huf_ctable_.get(), symbol);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t HuffmanEncoder::EstimateCompressedSize(const unsigned hist[], unsigned max_symbol) const {
|
size_t HuffmanEncoder::EstimateCompressedSize(const unsigned hist[], unsigned max_symbol) const {
|
||||||
DCHECK(huf_ctable_);
|
DCHECK(huf_ctable_);
|
||||||
size_t res = HUF_estimateCompressedSize(huf_ctable_.get(), hist, max_symbol);
|
size_t res = HUF_estimateCompressedSize(huf_ctable_.get(), hist, max_symbol);
|
||||||
|
@ -112,6 +110,11 @@ string HuffmanEncoder::Export() const {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Copied from HUF_tightCompressBound.
|
||||||
|
size_t HuffmanEncoder::CompressedBound(size_t src_size) const {
|
||||||
|
return ((src_size * num_bits_) >> 3) + 8;
|
||||||
|
}
|
||||||
|
|
||||||
bool HuffmanDecoder::Load(std::string_view binary_data, std::string* error_msg) {
|
bool HuffmanDecoder::Load(std::string_view binary_data, std::string* error_msg) {
|
||||||
DCHECK(!huf_dtable_);
|
DCHECK(!huf_dtable_);
|
||||||
huf_dtable_.reset(new HUF_DTable[HUF_DTABLE_SIZE(HUF_TABLELOG_MAX)]);
|
huf_dtable_.reset(new HUF_DTable[HUF_DTABLE_SIZE(HUF_TABLELOG_MAX)]);
|
||||||
|
|
|
@ -16,7 +16,6 @@ class HuffmanEncoder {
|
||||||
|
|
||||||
bool Encode(std::string_view data, uint8_t* dest, uint32_t* dest_size,
|
bool Encode(std::string_view data, uint8_t* dest, uint32_t* dest_size,
|
||||||
std::string* error_msg) const;
|
std::string* error_msg) const;
|
||||||
unsigned BitCount(uint8_t symbol) const;
|
|
||||||
|
|
||||||
size_t EstimateCompressedSize(const unsigned hist[], unsigned max_symbol) const;
|
size_t EstimateCompressedSize(const unsigned hist[], unsigned max_symbol) const;
|
||||||
|
|
||||||
|
@ -42,6 +41,10 @@ class HuffmanEncoder {
|
||||||
|
|
||||||
unsigned GetNBits(uint8_t symbol) const;
|
unsigned GetNBits(uint8_t symbol) const;
|
||||||
|
|
||||||
|
// Estimation of the size of the destination buffer needed to store the compressed data.
|
||||||
|
// destination of this size must be passed to Encode().
|
||||||
|
size_t CompressedBound(size_t src_size) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using HUF_CElt = size_t;
|
using HUF_CElt = size_t;
|
||||||
std::unique_ptr<HUF_CElt[]> huf_ctable_;
|
std::unique_ptr<HUF_CElt[]> huf_ctable_;
|
||||||
|
|
|
@ -65,6 +65,10 @@ ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text");
|
||||||
|
|
||||||
ABSL_FLAG(bool, tcp_nodelay, false, "If true, set nodelay option on tcp socket");
|
ABSL_FLAG(bool, tcp_nodelay, false, "If true, set nodelay option on tcp socket");
|
||||||
ABSL_FLAG(bool, noreply, false, "If true, does not wait for replies. Relevant only for memcached.");
|
ABSL_FLAG(bool, noreply, false, "If true, does not wait for replies. Relevant only for memcached.");
|
||||||
|
|
||||||
|
ABSL_FLAG(bool, probe_cluster, true,
|
||||||
|
"If false, skips cluster-mode probing and works only in single node mode");
|
||||||
|
|
||||||
ABSL_FLAG(bool, greet, true,
|
ABSL_FLAG(bool, greet, true,
|
||||||
"If true, sends a greeting command on each connection, "
|
"If true, sends a greeting command on each connection, "
|
||||||
"to make sure the connection succeeded");
|
"to make sure the connection succeeded");
|
||||||
|
@ -1121,7 +1125,7 @@ int main(int argc, char* argv[]) {
|
||||||
tcp::endpoint ep{address, GetFlag(FLAGS_p)};
|
tcp::endpoint ep{address, GetFlag(FLAGS_p)};
|
||||||
|
|
||||||
ClusterShards shards;
|
ClusterShards shards;
|
||||||
if (protocol == RESP) {
|
if (protocol == RESP && GetFlag(FLAGS_probe_cluster)) {
|
||||||
shards = proactor->Await([&] { return FetchClusterInfo(ep, proactor); });
|
shards = proactor->Await([&] { return FetchClusterInfo(ep, proactor); });
|
||||||
}
|
}
|
||||||
CONSOLE_INFO << "Connecting to "
|
CONSOLE_INFO << "Connecting to "
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue