mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Fix Various Naming Conventions around Subscriber
(#87)
* Fix `SubscribeMap` Signed-off-by: Ryan Russell <git@ryanrussell.org> * Fix `subscriber_arr` Signed-off-by: Ryan Russell <git@ryanrussell.org> * Fix `CopySubscribers` Signed-off-by: Ryan Russell <git@ryanrussell.org> * Fix `SubscriberMessagesLen` Signed-off-by: Ryan Russell <git@ryanrussell.org> * Fix remaining `subscribe` variants Signed-off-by: Ryan Russell <git@ryanrussell.org> * Add Ryan Russell to Contributors Signed-off-by: Ryan Russell <git@ryanrussell.org>
This commit is contained in:
parent
e806e6ccd8
commit
4a8644559a
8 changed files with 29 additions and 27 deletions
|
@ -2,3 +2,5 @@
|
||||||
|
|
||||||
* **[Philipp Born](https://github.com/tamcore)**
|
* **[Philipp Born](https://github.com/tamcore)**
|
||||||
* Helm Chart
|
* Helm Chart
|
||||||
|
* **[Ryan Russell](https://github.com/ryanrussell)**
|
||||||
|
* Docs & Code Readability
|
|
@ -55,21 +55,21 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector<Subscriber> {
|
||||||
auto it = channels_.find(channel);
|
auto it = channels_.find(channel);
|
||||||
if (it != channels_.end()) {
|
if (it != channels_.end()) {
|
||||||
res.reserve(it->second->subscribers.size());
|
res.reserve(it->second->subscribers.size());
|
||||||
CopySubsribers(it->second->subscribers, string{}, &res);
|
CopySubscribers(it->second->subscribers, string{}, &res);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& k_v : patterns_) {
|
for (const auto& k_v : patterns_) {
|
||||||
const string& pat = k_v.first;
|
const string& pat = k_v.first;
|
||||||
// 1 - match
|
// 1 - match
|
||||||
if (stringmatchlen(pat.data(), pat.size(), channel.data(), channel.size(), 0) == 1) {
|
if (stringmatchlen(pat.data(), pat.size(), channel.data(), channel.size(), 0) == 1) {
|
||||||
CopySubsribers(k_v.second->subscribers, pat, &res);
|
CopySubscribers(k_v.second->subscribers, pat, &res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ChannelSlice::CopySubsribers(const SubsribeMap& src, const std::string& pattern,
|
void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern,
|
||||||
vector<Subscriber>* dest) {
|
vector<Subscriber>* dest) {
|
||||||
for (const auto& sub : src) {
|
for (const auto& sub : src) {
|
||||||
Subscriber s(sub.first, sub.second.thread_id);
|
Subscriber s(sub.first, sub.second.thread_id);
|
||||||
|
|
|
@ -48,13 +48,13 @@ class ChannelSlice {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
using SubsribeMap = absl::flat_hash_map<ConnectionContext*, SubscriberInternal>;
|
using SubscribeMap = absl::flat_hash_map<ConnectionContext*, SubscriberInternal>;
|
||||||
|
|
||||||
static void CopySubsribers(const SubsribeMap& src, const std::string& pattern,
|
static void CopySubscribers(const SubscribeMap& src, const std::string& pattern,
|
||||||
std::vector<Subscriber>* dest);
|
std::vector<Subscriber>* dest);
|
||||||
|
|
||||||
struct Channel {
|
struct Channel {
|
||||||
SubsribeMap subscribers;
|
SubscribeMap subscribers;
|
||||||
};
|
};
|
||||||
|
|
||||||
absl::flat_hash_map<std::string, std::unique_ptr<Channel>> channels_;
|
absl::flat_hash_map<std::string, std::unique_ptr<Channel>> channels_;
|
||||||
|
|
|
@ -27,7 +27,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
|
||||||
this->force_dispatch = true;
|
this->force_dispatch = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gather all the channels we need to subsribe to / remove.
|
// Gather all the channels we need to subscribe to / remove.
|
||||||
for (size_t i = 0; i < args.size(); ++i) {
|
for (size_t i = 0; i < args.size(); ++i) {
|
||||||
bool res = false;
|
bool res = false;
|
||||||
string_view channel = ArgS(args, i);
|
string_view channel = ArgS(args, i);
|
||||||
|
@ -71,7 +71,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
|
||||||
int32_t tid = util::ProactorBase::GetIndex();
|
int32_t tid = util::ProactorBase::GetIndex();
|
||||||
DCHECK_GE(tid, 0);
|
DCHECK_GE(tid, 0);
|
||||||
|
|
||||||
// Update the subsribers on publisher's side.
|
// Update the subscribers on publisher's side.
|
||||||
auto cb = [&](EngineShard* shard) {
|
auto cb = [&](EngineShard* shard) {
|
||||||
ChannelSlice& cs = shard->channel_slice();
|
ChannelSlice& cs = shard->channel_slice();
|
||||||
unsigned start = shard_idx[shard->shard_id()];
|
unsigned start = shard_idx[shard->shard_id()];
|
||||||
|
@ -100,8 +100,8 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
|
||||||
(*this)->SendBulkString(action[to_add]);
|
(*this)->SendBulkString(action[to_add]);
|
||||||
(*this)->SendBulkString(ArgS(args, i)); // channel
|
(*this)->SendBulkString(ArgS(args, i)); // channel
|
||||||
|
|
||||||
// number of subsribed channels for this connection *right after*
|
// number of subscribed channels for this connection *right after*
|
||||||
// we subsribe.
|
// we subscribe.
|
||||||
(*this)->SendLong(result[i]);
|
(*this)->SendLong(result[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
|
||||||
this->force_dispatch = true;
|
this->force_dispatch = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gather all the patterns we need to subsribe to / remove.
|
// Gather all the patterns we need to subscribe to / remove.
|
||||||
for (size_t i = 0; i < args.size(); ++i) {
|
for (size_t i = 0; i < args.size(); ++i) {
|
||||||
bool res = false;
|
bool res = false;
|
||||||
string_view pattern = ArgS(args, i);
|
string_view pattern = ArgS(args, i);
|
||||||
|
@ -147,7 +147,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
|
||||||
int32_t tid = util::ProactorBase::GetIndex();
|
int32_t tid = util::ProactorBase::GetIndex();
|
||||||
DCHECK_GE(tid, 0);
|
DCHECK_GE(tid, 0);
|
||||||
|
|
||||||
// Update the subsribers on publisher's side.
|
// Update the subscribers on publisher's side.
|
||||||
auto cb = [&](EngineShard* shard) {
|
auto cb = [&](EngineShard* shard) {
|
||||||
ChannelSlice& cs = shard->channel_slice();
|
ChannelSlice& cs = shard->channel_slice();
|
||||||
for (string_view pattern : patterns) {
|
for (string_view pattern : patterns) {
|
||||||
|
|
|
@ -417,7 +417,7 @@ TEST_F(DflyEngineTest, PSubscribe) {
|
||||||
resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); });
|
resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); });
|
||||||
EXPECT_THAT(resp, IntArg(1));
|
EXPECT_THAT(resp, IntArg(1));
|
||||||
|
|
||||||
ASSERT_EQ(1, SubsriberMessagesLen("IO1"));
|
ASSERT_EQ(1, SubscriberMessagesLen("IO1"));
|
||||||
|
|
||||||
facade::Connection::PubMessage msg = GetPublishedMessage("IO1", 0);
|
facade::Connection::PubMessage msg = GetPublishedMessage("IO1", 0);
|
||||||
EXPECT_EQ("foo", msg.message);
|
EXPECT_EQ("foo", msg.message);
|
||||||
|
|
|
@ -897,31 +897,31 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
|
||||||
auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); };
|
auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); };
|
||||||
|
|
||||||
vector<ChannelSlice::Subscriber> subsriber_arr = shard_set->Await(sid, std::move(cb));
|
vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb));
|
||||||
atomic_uint32_t published{0};
|
atomic_uint32_t published{0};
|
||||||
|
|
||||||
if (!subsriber_arr.empty()) {
|
if (!subscriber_arr.empty()) {
|
||||||
sort(subsriber_arr.begin(), subsriber_arr.end(),
|
sort(subscriber_arr.begin(), subscriber_arr.end(),
|
||||||
[](const auto& left, const auto& right) { return left.thread_id < right.thread_id; });
|
[](const auto& left, const auto& right) { return left.thread_id < right.thread_id; });
|
||||||
|
|
||||||
vector<unsigned> slices(shard_set->pool()->size(), UINT_MAX);
|
vector<unsigned> slices(shard_set->pool()->size(), UINT_MAX);
|
||||||
for (size_t i = 0; i < subsriber_arr.size(); ++i) {
|
for (size_t i = 0; i < subscriber_arr.size(); ++i) {
|
||||||
if (slices[subsriber_arr[i].thread_id] > i) {
|
if (slices[subscriber_arr[i].thread_id] > i) {
|
||||||
slices[subsriber_arr[i].thread_id] = i;
|
slices[subscriber_arr[i].thread_id] = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fibers_ext::BlockingCounter bc(subsriber_arr.size());
|
fibers_ext::BlockingCounter bc(subscriber_arr.size());
|
||||||
auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable {
|
auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable {
|
||||||
unsigned start = slices[idx];
|
unsigned start = slices[idx];
|
||||||
|
|
||||||
for (unsigned i = start; i < subsriber_arr.size(); ++i) {
|
for (unsigned i = start; i < subscriber_arr.size(); ++i) {
|
||||||
const ChannelSlice::Subscriber& subscriber = subsriber_arr[i];
|
const ChannelSlice::Subscriber& subscriber = subscriber_arr[i];
|
||||||
if (subscriber.thread_id != idx)
|
if (subscriber.thread_id != idx)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
published.fetch_add(1, memory_order_relaxed);
|
published.fetch_add(1, memory_order_relaxed);
|
||||||
facade::Connection* conn = subsriber_arr[i].conn_cntx->owner();
|
facade::Connection* conn = subscriber_arr[i].conn_cntx->owner();
|
||||||
DCHECK(conn);
|
DCHECK(conn);
|
||||||
facade::Connection::PubMessage pmsg;
|
facade::Connection::PubMessage pmsg;
|
||||||
pmsg.channel = channel;
|
pmsg.channel = channel;
|
||||||
|
@ -936,10 +936,10 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
|
||||||
bc.Wait(); // Wait for all the messages to be sent.
|
bc.Wait(); // Wait for all the messages to be sent.
|
||||||
}
|
}
|
||||||
|
|
||||||
// If subsriber connections are closing they will wait
|
// If subscriber connections are closing they will wait
|
||||||
// for the tokens to be reclaimed in OnClose(). This guarantees that subscribers we gathered
|
// for the tokens to be reclaimed in OnClose(). This guarantees that subscribers we gathered
|
||||||
// still exist till we finish publishing.
|
// still exist till we finish publishing.
|
||||||
for (auto& s : subsriber_arr) {
|
for (auto& s : subscriber_arr) {
|
||||||
s.borrow_token.Dec();
|
s.borrow_token.Dec();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -327,7 +327,7 @@ string BaseFamilyTest::GetId() const {
|
||||||
return absl::StrCat("IO", id);
|
return absl::StrCat("IO", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t BaseFamilyTest::SubsriberMessagesLen(string_view conn_id) const {
|
size_t BaseFamilyTest::SubscriberMessagesLen(string_view conn_id) const {
|
||||||
auto it = connections_.find(conn_id);
|
auto it = connections_.find(conn_id);
|
||||||
if (it == connections_.end())
|
if (it == connections_.end())
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -72,7 +72,7 @@ class BaseFamilyTest : public ::testing::Test {
|
||||||
void UpdateTime(uint64_t ms);
|
void UpdateTime(uint64_t ms);
|
||||||
|
|
||||||
std::string GetId() const;
|
std::string GetId() const;
|
||||||
size_t SubsriberMessagesLen(std::string_view conn_id) const;
|
size_t SubscriberMessagesLen(std::string_view conn_id) const;
|
||||||
|
|
||||||
// Returns message parts as returned by RESP:
|
// Returns message parts as returned by RESP:
|
||||||
// pmessage, pattern, channel, message
|
// pmessage, pattern, channel, message
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue