diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index d8cc3403d..1ba3e70c6 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -2,3 +2,5 @@ * **[Philipp Born](https://github.com/tamcore)** * Helm Chart +* **[Ryan Russell](https://github.com/ryanrussell)** + * Docs & Code Readability \ No newline at end of file diff --git a/src/server/channel_slice.cc b/src/server/channel_slice.cc index 93f5ec637..d7e445a73 100644 --- a/src/server/channel_slice.cc +++ b/src/server/channel_slice.cc @@ -55,21 +55,21 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector { auto it = channels_.find(channel); if (it != channels_.end()) { res.reserve(it->second->subscribers.size()); - CopySubsribers(it->second->subscribers, string{}, &res); + CopySubscribers(it->second->subscribers, string{}, &res); } for (const auto& k_v : patterns_) { const string& pat = k_v.first; // 1 - match if (stringmatchlen(pat.data(), pat.size(), channel.data(), channel.size(), 0) == 1) { - CopySubsribers(k_v.second->subscribers, pat, &res); + CopySubscribers(k_v.second->subscribers, pat, &res); } } return res; } -void ChannelSlice::CopySubsribers(const SubsribeMap& src, const std::string& pattern, +void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern, vector* dest) { for (const auto& sub : src) { Subscriber s(sub.first, sub.second.thread_id); diff --git a/src/server/channel_slice.h b/src/server/channel_slice.h index 9403aabed..f2f3d5ec4 100644 --- a/src/server/channel_slice.h +++ b/src/server/channel_slice.h @@ -48,13 +48,13 @@ class ChannelSlice { } }; - using SubsribeMap = absl::flat_hash_map; + using SubscribeMap = absl::flat_hash_map; - static void CopySubsribers(const SubsribeMap& src, const std::string& pattern, + static void CopySubscribers(const SubscribeMap& src, const std::string& pattern, std::vector* dest); struct Channel { - SubsribeMap subscribers; + SubscribeMap subscribers; }; absl::flat_hash_map> channels_; diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index b34358175..4390b2e3a 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -27,7 +27,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis this->force_dispatch = true; } - // Gather all the channels we need to subsribe to / remove. + // Gather all the channels we need to subscribe to / remove. for (size_t i = 0; i < args.size(); ++i) { bool res = false; string_view channel = ArgS(args, i); @@ -71,7 +71,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis int32_t tid = util::ProactorBase::GetIndex(); DCHECK_GE(tid, 0); - // Update the subsribers on publisher's side. + // Update the subscribers on publisher's side. auto cb = [&](EngineShard* shard) { ChannelSlice& cs = shard->channel_slice(); unsigned start = shard_idx[shard->shard_id()]; @@ -100,8 +100,8 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis (*this)->SendBulkString(action[to_add]); (*this)->SendBulkString(ArgS(args, i)); // channel - // number of subsribed channels for this connection *right after* - // we subsribe. + // number of subscribed channels for this connection *right after* + // we subscribe. (*this)->SendLong(result[i]); } } @@ -121,7 +121,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args) this->force_dispatch = true; } - // Gather all the patterns we need to subsribe to / remove. + // Gather all the patterns we need to subscribe to / remove. for (size_t i = 0; i < args.size(); ++i) { bool res = false; string_view pattern = ArgS(args, i); @@ -147,7 +147,7 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args) int32_t tid = util::ProactorBase::GetIndex(); DCHECK_GE(tid, 0); - // Update the subsribers on publisher's side. + // Update the subscribers on publisher's side. auto cb = [&](EngineShard* shard) { ChannelSlice& cs = shard->channel_slice(); for (string_view pattern : patterns) { diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 9d4d95617..c3b90ee18 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -417,7 +417,7 @@ TEST_F(DflyEngineTest, PSubscribe) { resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); }); EXPECT_THAT(resp, IntArg(1)); - ASSERT_EQ(1, SubsriberMessagesLen("IO1")); + ASSERT_EQ(1, SubscriberMessagesLen("IO1")); facade::Connection::PubMessage msg = GetPublishedMessage("IO1", 0); EXPECT_EQ("foo", msg.message); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 4c70d497b..9558d4944 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -897,31 +897,31 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); }; - vector subsriber_arr = shard_set->Await(sid, std::move(cb)); + vector subscriber_arr = shard_set->Await(sid, std::move(cb)); atomic_uint32_t published{0}; - if (!subsriber_arr.empty()) { - sort(subsriber_arr.begin(), subsriber_arr.end(), + if (!subscriber_arr.empty()) { + sort(subscriber_arr.begin(), subscriber_arr.end(), [](const auto& left, const auto& right) { return left.thread_id < right.thread_id; }); vector slices(shard_set->pool()->size(), UINT_MAX); - for (size_t i = 0; i < subsriber_arr.size(); ++i) { - if (slices[subsriber_arr[i].thread_id] > i) { - slices[subsriber_arr[i].thread_id] = i; + for (size_t i = 0; i < subscriber_arr.size(); ++i) { + if (slices[subscriber_arr[i].thread_id] > i) { + slices[subscriber_arr[i].thread_id] = i; } } - fibers_ext::BlockingCounter bc(subsriber_arr.size()); + fibers_ext::BlockingCounter bc(subscriber_arr.size()); auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable { unsigned start = slices[idx]; - for (unsigned i = start; i < subsriber_arr.size(); ++i) { - const ChannelSlice::Subscriber& subscriber = subsriber_arr[i]; + for (unsigned i = start; i < subscriber_arr.size(); ++i) { + const ChannelSlice::Subscriber& subscriber = subscriber_arr[i]; if (subscriber.thread_id != idx) break; published.fetch_add(1, memory_order_relaxed); - facade::Connection* conn = subsriber_arr[i].conn_cntx->owner(); + facade::Connection* conn = subscriber_arr[i].conn_cntx->owner(); DCHECK(conn); facade::Connection::PubMessage pmsg; pmsg.channel = channel; @@ -936,10 +936,10 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { bc.Wait(); // Wait for all the messages to be sent. } - // If subsriber connections are closing they will wait + // If subscriber connections are closing they will wait // for the tokens to be reclaimed in OnClose(). This guarantees that subscribers we gathered // still exist till we finish publishing. - for (auto& s : subsriber_arr) { + for (auto& s : subscriber_arr) { s.borrow_token.Dec(); } diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index c90651640..f9386b50d 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -327,7 +327,7 @@ string BaseFamilyTest::GetId() const { return absl::StrCat("IO", id); } -size_t BaseFamilyTest::SubsriberMessagesLen(string_view conn_id) const { +size_t BaseFamilyTest::SubscriberMessagesLen(string_view conn_id) const { auto it = connections_.find(conn_id); if (it == connections_.end()) return 0; diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 0bfbbb687..fd11c9f9f 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -72,7 +72,7 @@ class BaseFamilyTest : public ::testing::Test { void UpdateTime(uint64_t ms); std::string GetId() const; - size_t SubsriberMessagesLen(std::string_view conn_id) const; + size_t SubscriberMessagesLen(std::string_view conn_id) const; // Returns message parts as returned by RESP: // pmessage, pattern, channel, message