Skip to content

Commit

Permalink
channel + fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Ye-Yu-Mo committed Oct 11, 2024
1 parent 3256281 commit fb65ae5
Show file tree
Hide file tree
Showing 2,142 changed files with 95,680 additions and 5,661 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,5 @@ TcpClient需要一个EventLoopThread模块进行IO事件监控

* VirtualHost的创建删除
* 客户端主动拉取消息功能
* 服务端交换机检测到当前所有客户端退出, 自动删除交换机\\队列
* 服务端交换机检测到当前所有客户端退出, 自动删除交换机\\队列
* 独占功能: 只有自己发布的消息才能消费 登录注册
5,537 changes: 5,537 additions & 0 deletions common/protocol.pb.cc

Large diffs are not rendered by default.

5,807 changes: 5,807 additions & 0 deletions common/protocol.pb.h

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions common/protocol.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
syntax = "proto3";
package XuMQ;
import "msg.proto";
// 信道的打开与关闭
message openChannelRequest{
string rid = 1;
string cid = 2;
};
message closeChannelRequest{
string rid = 1;
string cid = 2;
};
// 交换机的声明与删除
message declareExchangeRequest{
string rid = 1;
string cid = 2;
string exchange_name = 3;
ExchangeType exchange_type = 4;
bool durable = 5;
bool auto_delete =6;
map<string, string> args = 7;
};
message deleteExchangeRequest{
string rid = 1;
string cid = 2;
string exchange_name = 3;
};
// 队列的声明与删除
message declareQueueRequest{
string rid = 1;
string cid = 2;
string queue_name = 3;
bool exclusive = 4;
bool durable = 5;
bool auto_delete =6;
map<string, string> args = 7;
};
message deleteQueueRequest{
string rid = 1;
string cid = 2;
string queue_name = 3;
};
// 队列的绑定与解除绑定
message queueBindRequest{
string rid = 1;
string cid = 2;
string exchange_name = 3;
string queue_name = 4;
string binding_key = 5;
};
message queueUnBindRequest{
string rid = 1;
string cid = 2;
string exchange_name = 3;
string queue_name = 4;
};
// 消息的发布
message basicPublishRequest{
string rid = 1;
string cid = 2;
string exchange_name = 3;
string body = 4;
BasicProperties properties = 5;
};
// 消息的确认
message basicAckRequest{
string rid = 1;
string cid = 2;
string queue_name = 3;
string msg_id = 4;
};
// 队列的订阅
message basicConsumeRequest{
string rid = 1;
string cid = 2;
string consumer_tag = 3;
string queue_name = 4;
bool auto_ack = 5;
};
// 订阅的取消
message basicCancelRequest{
string rid = 1;
string cid = 2;
string consumer_tag = 3;
string queue_name = 4;
};
// 消息的推送
message basicConsumeResponse{
string cid = 1;
string consumer_tag = 2;
string body = 3;
BasicProperties properties = 4;
};
// 通用响应
message basicResponse{
string rid = 1;
string cid = 2;
bool ok = 3;
};
92 changes: 92 additions & 0 deletions common/threadpool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#pragma once
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <vector>
#include <condition_variable>
#include <atomic>
#include "../third/Xulog/logs/Xulog.h"

namespace XuMQ
{
class threadpool
{
public:
using ptr = std::shared_ptr<threadpool>;
using Functor = std::function<void(void)>;
threadpool(int thr_count = 1) : _stop(false)
{
for (int i = 0; i < thr_count; i++)
_threads.emplace_back(&threadpool::entry, this);
}
~threadpool()
{
stop();
}
void stop()
{
if (_stop == true)
return;
_stop = true;
_cv.notify_all(); // 唤醒线程
for (auto &thread : _threads)
thread.join();
}

// 自动推导返回值类型
template <typename F, typename... Args>
auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
{
// 将传入函数封装成packaged_task任务包
using return_type = decltype(func(args...));
auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
std::future<return_type> fu = task->get_future();
// 构造lambda表达式(捕获任务对象,函数内执行任务对象)
{
std::unique_lock<std::mutex> lock(_mutex);
// 将构造出来的匿名对象传入任务池
_taskpool.push_back([task]()
{ (*task)(); });
_cv.notify_one();
}
return fu;
}

private:
// 线程入口函数 从任务池中取出任务执行
void entry()
{
while (!_stop)
{
// 临时任务池
// 避免频繁加解锁
std::vector<Functor> tmp_taskpool;
{
// 加锁
std::unique_lock<std::mutex> lock(_mutex);
// 等待任务不为空或_stop被置为1
_cv.wait(lock, [this]()
{ return _stop || !_taskpool.empty(); });

// 取出任务进行执行
tmp_taskpool.swap(_taskpool);
}
for (auto &task : tmp_taskpool)
{
task();
}
}
}

private:
std::atomic<bool> _stop;
std::vector<Functor> _taskpool;
std::mutex _mutex;
std::condition_variable _cv;
std::vector<std::thread> _threads;
};
}
Loading

0 comments on commit fb65ae5

Please sign in to comment.