Skip to content

Commit

Permalink
bpop when rpoplpush and client closed
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSze1 committed Aug 18, 2024
1 parent 645829e commit 47252a4
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,21 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

PClient* BlockedClient = (*conn_blocked).GetBlockedClient();

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendStringLen(-1);
} else if (s.IsInvalidArgument()) {
Expand Down
2 changes: 1 addition & 1 deletion src/pikiwidb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void PikiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->is_done_->exchange(true)) {
if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
} else if (conn_node->IsExpired()) {
PClient* conn_ptr = conn_node->GetBlockedClient();
Expand Down

0 comments on commit 47252a4

Please sign in to comment.