Skip to content

Commit

Permalink
client
Browse files Browse the repository at this point in the history
  • Loading branch information
Ye-Yu-Mo committed Oct 13, 2024
1 parent 8bf1219 commit fd8eb63
Show file tree
Hide file tree
Showing 511 changed files with 4,833 additions and 3,312 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ TcpClient需要一个EventLoopThread模块进行IO事件监控

## TODO

* VirtualHost的创建删除
* 使用工厂模式创建对象
* 虚拟机管理
* 用户管理
* 服务端对发布用户的确认
* 客户端主动拉取消息功能
* 消息管理的方式
* 管理接口+页面
* 服务端交换机检测到当前所有客户端退出, 自动删除交换机\\队列
* 独占功能: 只有自己发布的消息才能消费 登录注册
* 独占功能: 只有自己发布的消息才能消费
13 changes: 13 additions & 0 deletions client/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.PHONY:all
all:publish_client consume_client
CFLAG= -I../third/muduo/include/
LFALG= -L../third/muduo/lib -lgtest -lsqlite3 -lprotobuf -pthread -lmuduo_net -lmuduo_base -lz
publish_client:publish_client.cc ../common/msg.pb.cc ../common/protocol.pb.cc ../third/muduo/include/muduo/protobuf/codec.cc
g++ -g $(CFLAG) $^ -o $@ -std=c++17 $(LFALG)

consume_client:consume_client.cc ../common/msg.pb.cc ../common/protocol.pb.cc ../third/muduo/include/muduo/protobuf/codec.cc
g++ -g $(CFLAG) $^ -o $@ -std=c++17 $(LFALG)

.PHONY:clean
clean:
rm publish_client consume_client -rf
23 changes: 14 additions & 9 deletions client/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,19 @@ namespace XuMQ
waitResponse(rid);
}
/// @brief 应答消息
/// @param qname 消息队列名称
/// @param msg_id 消息id
void basicAck(const std::string &qname, const std::string &msg_id)
void basicAck(const std::string &msg_id)
{
if (_consumer.get() == nullptr) // 消费者不存在 无法进行确认
{
error(logger, "消息确认时 当前信道未设置消费者!");
return;
}
basicAckRequest req;
std::string rid = UUIDHelper::uuid();
req.set_rid(rid);
req.set_cid(_cid);
req.set_queue_name(qname);
req.set_queue_name(_consumer->qname);
req.set_msg_id(msg_id);
_codec->send(_conn, req);
waitResponse(rid);
Expand All @@ -215,13 +219,11 @@ namespace XuMQ
/// @param auto_ack 自动应答标志
/// @param cb 消费者回调函数
/// @return 成功返回true 失败返回false
bool basicConsume(const std::string &tag, const std::string &qname, bool auto_ack, ConsumerCallback &cb)
bool basicConsume(const std::string &tag, const std::string &qname, bool auto_ack, const ConsumerCallback &cb)
{
if (_consumer.get() != nullptr)
{
warn(logger, "当前信道已订阅其他消息!");
if (_consumer.get() != nullptr) // 不为空时 说明消费者已经存在 不需要再创建
return false;
}

basicConsumeRequest req;
std::string rid = UUIDHelper::uuid();
req.set_rid(rid);
Expand All @@ -231,6 +233,7 @@ namespace XuMQ
req.set_auto_ack(auto_ack);
_codec->send(_conn, req);
basicResponsePtr resp = waitResponse(rid);
debug(logger,"请求创建一个消费者 消费者tag为%s 消费者所在队列是%s",tag.c_str(), qname.c_str());
if (resp->ok() == false)
{
error(logger, "添加订阅失败!");
Expand All @@ -242,7 +245,7 @@ namespace XuMQ
/// @brief 取消订阅
void basicCancel()
{
if (_consumer.get() != nullptr)
if (_consumer.get() == nullptr) // 消费者为空 无法取消订阅
return;
basicCancelRequest req;
std::string rid = UUIDHelper::uuid();
Expand Down Expand Up @@ -342,7 +345,9 @@ namespace XuMQ
std::unique_lock<std::mutex> lock(_mutex);
auto it = _channels.find(cid);
if (it == _channels.end())
{
return Channel::ptr();
}
return it->second;
}

Expand Down
9 changes: 9 additions & 0 deletions client/connection.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/**
* @file connection.hpp
* @brief 该文件定义了 Connection 类,负责管理客户端连接、信道的创建与关闭以及消息处理。
*
* 包含了对 Muduo 网络库、Protobuf 协议编码与分发、异步工作器和日志模块的引用。
* 主要功能是通过 TcpClient 管理与服务器的连接,利用分发器处理不同类型的消息,并支持信道的创建与关闭。
* 信道用于消息队列中消息的发送与消费,异步工作器则管理多线程任务。
*/
#pragma once

#include "muduo/protobuf/dispatcher.h"
Expand All @@ -20,6 +28,7 @@ namespace XuMQ
class Connection
{
public:
using ptr = std::shared_ptr<Connection>; ///< 连接管理句柄
/// @brief 连接构造函数
/// @param sip 服务器ip
/// @param sport 服务器端口号
Expand Down
Binary file added client/consume_client
Binary file not shown.
42 changes: 42 additions & 0 deletions client/consume_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "connection.hpp"
#include "../common/logger.hpp"

void callBack(XuMQ::Channel::ptr &channel, const std::string &consumer_tag, const XuMQ::BasicProperties *bp, const std::string &body)
{
info(XuMQ::logger, "消费者%s 消费的消息是:%s", consumer_tag.c_str(), body.c_str());
channel->basicAck(bp->id());
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
info(XuMQ::logger, "usage: ./consume_client queue1");
}
// 实例化异步工作线程对象
XuMQ::AsyncWorker::ptr awp = std::make_shared<XuMQ::AsyncWorker>();
// 实例化连接对象
XuMQ::Connection::ptr conn = std::make_shared<XuMQ::Connection>("127.0.0.1", 8888, awp);
// 通过连接创建信道
XuMQ::Channel::ptr channel = conn->openChannel();

// 声明交换机exchange1
google::protobuf::Map<std::string, std::string> tmp;
channel->declareExchange("exchange1", XuMQ::ExchangeType::TOPIC, true, false, tmp);
// 声明队列queue1
channel->declareQueue("queue1", true, false, false, tmp);
// 声明队列queue2
channel->declareQueue("queue2", true, false, false, tmp);
// 绑定queue1-exchange1 设置binding_key = queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 绑定queue2-exchange1 设置binding_key = news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
// 订阅指定队列消息
auto func = std::bind(callBack, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel->basicConsume("consumer1", argv[1], false, func);
while (true)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
}
conn->closeChannel(channel);
return 0;
}
Binary file added client/publish_client
Binary file not shown.
45 changes: 45 additions & 0 deletions client/publish_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "connection.hpp"
int main()
{
// 实例化异步工作线程对象
XuMQ::AsyncWorker::ptr awp = std::make_shared<XuMQ::AsyncWorker>();
// 实例化连接对象
XuMQ::Connection::ptr conn = std::make_shared<XuMQ::Connection>("127.0.0.1", 8888, awp);
// 通过连接创建信道
XuMQ::Channel::ptr channel = conn->openChannel();
// 完成服务
// 声明交换机exchange1
google::protobuf::Map<std::string, std::string> tmp;
channel->declareExchange("exchange1", XuMQ::ExchangeType::TOPIC, true, false, tmp);
// 声明队列queue1
channel->declareQueue("queue1", true, false, false, tmp);
// 声明队列queue2
channel->declareQueue("queue2", true, false, false, tmp);
// 绑定queue1-exchange1 设置binding_key = queue1
channel->queueBind("exchange1", "queue1", "queue1");
// 绑定queue2-exchange1 设置binding_key = news.music.#
channel->queueBind("exchange1", "queue2", "news.music.#");
// 发布消息

for (int i = 0; i < 10; i++)
{
XuMQ::BasicProperties bp;
bp.set_id(XuMQ::UUIDHelper::uuid());
bp.set_delivery_mode(XuMQ::DeliveryMode::DURABLE);
bp.set_routing_key("news.music.pop");
channel->basicPublish("exchange1", &bp, "hello-" + std::to_string(i));
}
XuMQ::BasicProperties bp1;
bp1.set_id(XuMQ::UUIDHelper::uuid());
bp1.set_delivery_mode(XuMQ::DeliveryMode::DURABLE);
bp1.set_routing_key("news.music.sport");
channel->basicPublish("exchange1", &bp1, "hello XU");
XuMQ::BasicProperties bp2;
bp2.set_id(XuMQ::UUIDHelper::uuid());
bp2.set_delivery_mode(XuMQ::DeliveryMode::DURABLE);
bp2.set_routing_key("news.sport");
channel->basicPublish("exchange1", &bp2, "hello ???");
// 关闭信道
conn->closeChannel(channel);
return 0;
}
5 changes: 0 additions & 5 deletions demo/random/Makefile

This file was deleted.

Binary file removed demo/random/random
Binary file not shown.
49 changes: 0 additions & 49 deletions demo/random/random.cpp

This file was deleted.

43 changes: 0 additions & 43 deletions demo/split/split.cpp

This file was deleted.

1 change: 0 additions & 1 deletion docs/annotated.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@
<tr id="row_9_" class="even"><td class="entry"><span style="width:16px;display:inline-block;">&#160;</span><span class="icona"><span class="icon">C</span></span><a class="el" href="class_route_test.html" target="_self">RouteTest</a></td><td class="desc"></td></tr>
<tr id="row_10_" class="odd"><td class="entry"><span style="width:16px;display:inline-block;">&#160;</span><span class="icona"><span class="icon">C</span></span><a class="el" href="struct_table_struct__msg__2eproto.html" target="_self">TableStruct_msg_2eproto</a></td><td class="desc"></td></tr>
<tr id="row_11_" class="even"><td class="entry"><span style="width:16px;display:inline-block;">&#160;</span><span class="icona"><span class="icon">C</span></span><a class="el" href="struct_table_struct__protocol__2eproto.html" target="_self">TableStruct_protocol_2eproto</a></td><td class="desc"></td></tr>
<tr id="row_12_" class="odd"><td class="entry"><span style="width:16px;display:inline-block;">&#160;</span><span class="icona"><span class="icon">C</span></span><a class="el" href="class_u_u_i_d_helper.html" target="_self">UUIDHelper</a></td><td class="desc"></td></tr>
</table>
</div><!-- directory -->
</div><!-- contents -->
Expand Down
3 changes: 1 addition & 2 deletions docs/annotated_dup.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions docs/binding_8hpp.html
Original file line number Diff line number Diff line change
Expand Up @@ -201,26 +201,26 @@
<div class="dyncontent">
<div class="center"><img src="binding_8hpp__dep__incl.png" border="0" usemap="#aserver_2binding_8hppdep" alt=""/></div>
<map name="aserver_2binding_8hppdep" id="aserver_2binding_8hppdep">
<area shape="rect" title="该文件包含消息队列绑定信息的管理类和结构体定义。" alt="" coords="210,5,353,31"/>
<area shape="rect" href="host_8hpp.html" title="XuMQ 虚拟机模块定义" alt="" coords="135,79,257,104"/>
<area shape="poly" title=" " alt="" coords="259,41,212,80,208,76,256,37"/>
<area shape="rect" href="mqbindingtest_8cpp.html" title=" " alt="" coords="282,79,454,104"/>
<area shape="poly" title=" " alt="" coords="307,37,356,76,352,81,304,41"/>
<area shape="rect" href="broker_8hpp.html" title="XuMQ消息队列服务器类的声明文件。" alt="" coords="7,299,142,324"/>
<area shape="poly" title=" " alt="" coords="160,113,134,130,113,154,94,192,83,233,77,299,72,298,78,232,89,190,108,150,131,126,157,108"/>
<area shape="rect" href="server_2channel_8hpp.html" title="信道与信道管理模块的头文件,包含了信道的声明和其管理类的定义。" alt="" coords="123,152,269,177"/>
<area shape="poly" title=" " alt="" coords="199,118,199,152,193,152,193,118"/>
<area shape="rect" href="mqhosttest_8cpp.html" title=" " alt="" coords="293,152,445,177"/>
<area shape="poly" title=" " alt="" coords="237,107,342,149,340,154,235,112"/>
<area shape="rect" href="mqserver_8cpp.html" title=" " alt="" coords="5,372,144,397"/>
<area shape="poly" title=" " alt="" coords="77,338,77,372,72,372,72,338"/>
<area shape="rect" href="server_2connection_8hpp.html" title="声明了连接类和连接管理类,主要用于管理连接的建立、维护与信道的操作。" alt="" coords="113,225,279,251"/>
<area shape="poly" title=" " alt="" coords="199,191,199,225,193,225,193,191"/>
<area shape="rect" href="mqchanneltest_8cpp.html" title=" " alt="" coords="303,225,478,251"/>
<area shape="poly" title=" " alt="" coords="241,179,360,223,358,228,239,184"/>
<area shape="poly" title=" " alt="" coords="166,260,96,301,93,296,163,255"/>
<area shape="rect" href="mqconnectiontest_8cpp.html" title=" " alt="" coords="167,299,361,324"/>
<area shape="poly" title=" " alt="" coords="218,259,255,297,251,300,215,262"/>
<area shape="rect" title="该文件包含消息队列绑定信息的管理类和结构体定义。" alt="" coords="218,5,361,31"/>
<area shape="rect" href="host_8hpp.html" title="XuMQ 虚拟机模块定义" alt="" coords="143,79,265,104"/>
<area shape="poly" title=" " alt="" coords="267,41,220,80,216,76,264,37"/>
<area shape="rect" href="mqbindingtest_8cpp.html" title=" " alt="" coords="290,79,462,104"/>
<area shape="poly" title=" " alt="" coords="315,37,364,76,360,81,312,41"/>
<area shape="rect" href="broker_8hpp.html" title="XuMQ消息队列服务器类的声明文件。" alt="" coords="15,299,150,324"/>
<area shape="poly" title=" " alt="" coords="168,113,142,130,121,154,102,192,91,233,85,299,80,298,86,232,97,190,116,150,139,126,165,108"/>
<area shape="rect" href="server_2channel_8hpp.html" title="信道与信道管理模块的头文件,包含了信道的声明和其管理类的定义。" alt="" coords="131,152,277,177"/>
<area shape="poly" title=" " alt="" coords="207,118,207,152,201,152,201,118"/>
<area shape="rect" href="mqhosttest_8cpp.html" title=" " alt="" coords="301,152,453,177"/>
<area shape="poly" title=" " alt="" coords="245,107,350,149,348,154,243,112"/>
<area shape="rect" href="mqserver_8cpp.html" title=" " alt="" coords="5,372,160,397"/>
<area shape="poly" title=" " alt="" coords="85,338,85,372,80,372,80,338"/>
<area shape="rect" href="server_2connection_8hpp.html" title="声明了连接类和连接管理类,主要用于管理连接的建立、维护与信道的操作。" alt="" coords="121,225,287,251"/>
<area shape="poly" title=" " alt="" coords="207,191,207,225,201,225,201,191"/>
<area shape="rect" href="mqchanneltest_8cpp.html" title=" " alt="" coords="311,225,486,251"/>
<area shape="poly" title=" " alt="" coords="249,179,368,223,366,228,247,184"/>
<area shape="poly" title=" " alt="" coords="174,260,104,301,101,296,171,255"/>
<area shape="rect" href="mqconnectiontest_8cpp.html" title=" " alt="" coords="175,299,369,324"/>
<area shape="poly" title=" " alt="" coords="226,259,263,297,259,300,223,262"/>
</map>
</div>
</div>
Expand Down
Loading

0 comments on commit fd8eb63

Please sign in to comment.