实例介绍
【实例简介】ZeroMQ示例
【实例截图】
【核心代码】
// Monotonic clock and time-point aliases used by the one-second throughput
// reports in the receiver threads below.
using Clock = std::chrono::steady_clock;
using TimePoint = Clock::time_point;
// Enables the duration literals used throughout this file (1s, 10ms, 2us, ...).
using namespace std::chrono_literals;
// NOTE(review): presumably these namespaces provide the ZeroMQ wrapper class and
// the LOGGER_* macros / g_Logger used below -- confirm against ZeroMQ.h / Logger.h.
using namespace ZMQ;
using namespace LOGGER;
// ZeroMQ context shared by the in-process test pair: InprocSendThread() creates
// it and InprocRevThread() polls until it becomes non-null.
// NOTE(review): this is a plain pointer read and written from two threads
// without synchronization; std::atomic<void*> would make the hand-off well defined.
void* contextInproc = nullptr;
void InprocRevThread()
{
while (contextInproc==nullptr)Sleep(100);
void* socket = zmq_socket(contextInproc, ZMQ_SUB);
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
int err = zmq_connect(socket, "inproc://cbbps");
TimePoint prevTimePoint = Clock::now();
int loop = 0;
int prevLoop = 0;
while (1) {
char buffer[64] = { 0 };
int size = zmq_recv(socket, buffer, 64, ZMQ_DONTWAIT);
if (size > 0)loop ;
if (Clock::now() - prevTimePoint >= 1s) {
std::cout << "INPROC RevCount:" << std::to_string(loop - prevLoop) << "\n";
prevTimePoint = Clock::now();
prevLoop = loop;
}
std::this_thread::sleep_for(1us);
}
}
void InprocSendThread()
{
contextInproc = nullptr;
contextInproc = zmq_ctx_new();
void* socket = zmq_socket(contextInproc, ZMQ_PUB);
int err= zmq_bind(socket, "inproc://cbbps");
while (1) {
char buffer[64] = "Inproc ZeroMQ Test";
int err = zmq_send(socket, buffer, 64, ZMQ_DONTWAIT);
std::this_thread::sleep_for(2us);
}
}
/// TCP SUB receiver used by the PUB/SUB throughput test.
///
/// Connects a SUB socket to tcp://10.23.33.88:5560 and prints, once per
/// second, the number of successful receives and the number of those whose
/// trailing 32-bit word was non-zero. Never returns.
///
/// @param port  Receiver index; only used to build the label (5560 + port)
///              printed in the statistics lines.
void TCPRev1Thread(int port)
{
    ZeroMQ TCP_SUB_recv;
    const int which_port = 5560 + port;
    // Every receiver connects to the same fixed port 5560; `which_port` is
    // only a display label that distinguishes the receiver threads.
    const std::string ip_port_str = "tcp://10.23.33.88:" + std::to_string(5560);
    TCP_SUB_recv.InitTCPReceiveMode_SUB(MAX_SIZE, ip_port_str.c_str());

    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;        // receives that returned size >= 0
    uint32_t prevLoop = 0;    // `loop` at the start of the current window
    uint32_t reved = 0;       // receives whose trailing counter word was non-zero
    uint32_t prev_reved = 0;  // `reved` at the start of the current window
    while (true) {
        char buffer[MAX_SIZE] = { 0 };
        const int size = TCP_SUB_recv.GetTCPReceiveData_SUB_More(buffer);
        if (size >= 0) {
            loop++;
            // The sender (TCPSend1Thread) stores its counter in the last four bytes.
            const uint32_t* p = reinterpret_cast<const uint32_t*>(&buffer[MAX_SIZE - 4]);
            if (*p != 0) {
                reved++;
            }
        }
        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev" << which_port << "Count:" << std::to_string(loop - prevLoop) << "\n";
            std::cout << "TCP RevData" << which_port << "Count:" << std::to_string(reved - prev_reved) << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(1us);
    }
}
/// TCP PUB sender used by the PUB/SUB throughput test.
///
/// Opens a PUB endpoint on tcp://10.23.33.88:5560 and sends MAX_SIZE-byte
/// messages in an endless loop, pausing 2 microseconds between sends. The
/// first byte carries the low bits of the running counter and the last four
/// bytes carry the full counter. Never returns.
///
/// @param port  Overwritten with 5560 before use, so the caller's value has no
///              effect on the endpoint.
void TCPSend1Thread(int port)
{
    ZeroMQ TCP_PUB_send;
    port = 5560 + 0;  // fixed endpoint port; the argument is ignored
    const std::string ip_port_str = "tcp://10.23.33.88:" + std::to_string(port);
    TCP_PUB_send.InitTCPSendMode_PUB(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;  // running message counter
    char buffer[MAX_SIZE] = { 1 };
    while (true) {
        buffer[0] = static_cast<char>(loop);
        // Trailing 32-bit word holds the counter; receivers test it for non-zero.
        int* p = reinterpret_cast<int*>(&buffer[MAX_SIZE - 4]);
        *p = loop++;
        TCP_PUB_send.SetTCPSendData_PUB_More((void*)buffer, MAX_SIZE, 1024);
        std::this_thread::sleep_for(2us);
    }
}
/// TCP PUSH sender used by the PUSH/PULL throughput test.
///
/// Sets up a PUSH socket for tcp://10.23.33.88:5560 and sends MAX_SIZE-byte
/// messages in an endless loop, pausing 2 microseconds between sends. The
/// first byte identifies the sender (port + 1) and the last four bytes carry
/// a running counter. Never returns.
///
/// @param port  Sender index; written (plus one) into the first payload byte.
void TCPSendThread_PUSH(int port)
{
    ZeroMQ TCP_send;
    const int which_port = 5560 + 0;  // all senders use the same endpoint port
    const std::string ip_port_str = "tcp://10.23.33.88:" + std::to_string(which_port);
    TCP_send.InitTCPSendMode_PUSH(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;  // running message counter
    char buffer[MAX_SIZE] = { 1 };
    while (true) {
        // One-based sender id, so the receiver never sees 0 in this byte.
        buffer[0] = static_cast<char>(port + 1);
        int* p = reinterpret_cast<int*>(&buffer[MAX_SIZE - 4]);
        *p = loop++;
        TCP_send.SetTCPSendData_PUSH((void*)buffer, MAX_SIZE);
        std::this_thread::sleep_for(2us);
    }
}
/// TCP PULL receiver used by the PUSH/PULL throughput test.
///
/// Sets up a PULL socket for tcp://10.23.33.88:5560, prints the sender id
/// found in the first byte of every received message, and once per second
/// prints the number of successful receives and the number of those whose
/// trailing 32-bit word was non-zero. Never returns.
///
/// @param port  Receiver index; only used to build the label (5560 + port)
///              printed in the statistics lines.
void TCPRevThread_PULL(int port)
{
    ZeroMQ TCP_recv;
    const int which_port = 5560 + port;
    // The endpoint port is fixed at 5560; `which_port` is only a display label.
    const std::string ip_port_str = "tcp://10.23.33.88:" + std::to_string(5560);
    TCP_recv.InitTCPReceiveMode_PULL(MAX_SIZE, ip_port_str.c_str());

    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;        // receives that returned size >= 0
    uint32_t prevLoop = 0;    // `loop` at the start of the current window
    uint32_t reved = 0;       // receives whose trailing counter word was non-zero
    uint32_t prev_reved = 0;  // `reved` at the start of the current window
    while (true) {
        char buffer[MAX_SIZE] = { 0 };
        const int size = TCP_recv.GetTCPReceiveData_PULL(buffer);
        if (size >= 0) {
            loop++;
            const uint32_t* p = reinterpret_cast<const uint32_t*>(&buffer[MAX_SIZE - 4]);
            if (*p != 0) {
                reved++;
            }
            // First payload byte is the sender id written by TCPSendThread_PUSH.
            // Named differently from the `port` parameter to avoid shadowing it.
            const uint8_t sender_id = buffer[0];
            std::cout << "TCP Rev Port" << std::to_string(sender_id) << "\n";
        }
        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev" << which_port << "Count:" << std::to_string(loop - prevLoop) << "\n";
            std::cout << "TCP RevData" << which_port << "Count:" << std::to_string(reved - prev_reved) << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(1us);
    }
}
/// DEALER sender: sends a numbered MAX_SIZE-byte message every 10 ms.
///
/// The first four bytes of each message hold a sequence number starting at 1.
/// The number advances only after a successful send, so a failed send is
/// retried with the same number on the next iteration. Every tenth successful
/// send is logged. Never returns.
void DEALER_Send_Thread()
{
    ZeroMQ _0MQ;
    const std::string ip_port_str = "tcp://192.168.203.127:" + std::to_string(52000);
    _0MQ.InitTCPSendMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 1;  // sequence number of the next message
    char buffer[MAX_SIZE] = { 0 };
    while (true) {
        // Sequence number lives in the first 32-bit word of the payload.
        uint32_t* p = reinterpret_cast<uint32_t*>(&buffer[0]);
        *p = loop;
        if (_0MQ.SetTCPSendData_DEALER((void*)buffer, MAX_SIZE)) {
            if ((*p) % 10 == 0) {
                LOGGER_INFO("send thread sended msg count{:d}", *p);
            }
            loop++;
        }
        else {
            LOGGER_WARN("send thread sended error");
        }
        std::this_thread::sleep_for(10ms);
    }
}
/// DEALER receiver: reads numbered messages and reports gaps in the sequence.
///
/// Only receives that return exactly MAX_SIZE bytes are processed. The first
/// 32-bit word of each message is treated as its sequence number; every tenth
/// number is logged, and an error is logged whenever the number advanced by
/// more than one since the previous message. Never returns.
///
/// @param id  Receiver index, used only in log messages.
void DEALER_RECV_Thread(size_t id)
{
    ZeroMQ _0MQ;
    const std::string ip_port_str = "tcp://192.168.203.127:" + std::to_string(52000);
    _0MQ.InitTCPReceiveMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t prev_head = 0;  // sequence number of the last processed message
    char buffer[MAX_SIZE] = { 0 };
    while (true) {
        if (_0MQ.GetTCPReceiveData_DEALER((void*)buffer) == MAX_SIZE) {
            const uint32_t* head = reinterpret_cast<const uint32_t*>(&buffer[0]);
            if ((*head) % 10 == 0) {
                LOGGER_INFO("recv thread{:d} recved msg count{:d}", id, *head);
            }
            // Unsigned subtraction: a sequence number lower than the previous
            // one wraps to a large value and is therefore also reported.
            if ((*head - prev_head) > 1) {
                LOGGER_ERROR("recv thread{:d} miss msg{:d}", id, *head);
            }
            prev_head = *head;
        }
        std::this_thread::sleep_for(1ms);
    }
}
/// First UDP DISH receiver of the RADIO/DISH test.
///
/// Sets up a DISH socket for udp://226.8.5.5:5555 with group "TV" and prints,
/// once per second, how many receives succeeded (size >= 0) in that window.
/// Never returns.
void UDPRev1Thread()
{
    ZeroMQ UDP_SUB_recv;
    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;      // total successful receives
    int prevLoop = 0;  // `loop` at the start of the current window
    while (true) {
        char buffer[MAX_SIZE] = { 0 };
        const int size = UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);
        if (size >= 0) {
            loop++;
        }
        if (Clock::now() - prevTimePoint >= 1s) {
            // Labelled "Rev1" so this output can be told apart from
            // UDPRev2Thread, which prints "UDP Rev2Count:".
            std::cout << "UDP Rev1Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(1us);
    }
}
/// Second UDP DISH receiver of the RADIO/DISH test.
///
/// Sets up a DISH socket for udp://226.8.5.5:5555 with group "TV" and prints,
/// once per second, how many receives succeeded (size >= 0) in that window.
/// Never returns.
void UDPRev2Thread()
{
    ZeroMQ UDP_SUB_recv;
    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;      // total successful receives
    int prevLoop = 0;  // `loop` at the start of the current window
    while (true) {
        char buffer[MAX_SIZE] = { 0 };
        const int size = UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);
        if (size >= 0) {
            loop++;
        }
        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "UDP Rev2Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(1us);
    }
}
/// UDP RADIO sender of the RADIO/DISH test.
///
/// Sets up a RADIO socket for udp://226.8.5.5:5555 with group "TV" and sends
/// MAX_SIZE-byte messages in an endless loop, pausing 2 microseconds between
/// sends. The first and last payload bytes carry the low bits of a running
/// counter. Never returns.
void UDPSendThread()
{
    ZeroMQ UDP_PUB_send;
    UDP_PUB_send.InitUDPSendMode_RADIO(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    int loop = 0;  // running message counter
    char buffer[MAX_SIZE] = { 1 };
    while (true) {
        // Same counter value at both ends of the payload; then advance it.
        buffer[0] = static_cast<char>(loop);
        buffer[MAX_SIZE - 1] = static_cast<char>(loop++);
        UDP_PUB_send.SetUDPSendData_RADIO(buffer, MAX_SIZE);
        std::this_thread::sleep_for(2us);
    }
}
/// REP side of the REQ/REP test on tcp://127.0.0.1:5566.
///
/// Whenever a request is received (receive returns > 0) it answers with a
/// MAX_SIZE-byte reply whose last byte carries the low bits of the request
/// counter, and once per second prints how many requests were handled in that
/// window. Never returns.
void TCPSendREPThread()
{
    ZeroMQ TCP_REP_send;
    TCP_REP_send.InitTCPSendMode_REP(MAX_SIZE, "tcp://127.0.0.1:5566");

    int loop = 0;      // total requests answered
    int prevLoop = 0;  // `loop` at the start of the current window
    uint8_t buffer[MAX_SIZE] = { 1,2,3 };
    TimePoint prevTimePoint = Clock::now();
    while (true) {
        buffer[MAX_SIZE - 1] = static_cast<uint8_t>(loop);
        uint8_t rev_buffer[MAX_SIZE] = { 1,2,3 };
        if (TCP_REP_send.GetTCPReceiveData_REP(rev_buffer) > 0) {
            //std::cout << "REP RevCount:" << "\n";
            loop++;
            TCP_REP_send.SetTCPSendData_REP((void*)buffer, MAX_SIZE);
        }
        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP REP RevCount:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(2us);
    }
}
/// REQ side of the REQ/REP test on tcp://127.0.0.1:5566.
///
/// Each time a receive returns > 0 it stamps the payload with (loop + 1) in
/// bytes 0, 1, 2 and the last byte, advances the counter and sends the buffer
/// as the next request. Never returns.
///
/// NOTE(review): the loop only sends after a successful receive; presumably
/// InitTCPReceiveMode_REQ (or the wrapper's receive call) issues the first
/// request -- confirm against ZeroMQ.cpp.
void TCPRevREQThread()
{
    ZeroMQ TCP_REQ_rev;
    // The original author notes that "localhost" must not be used in this address.
    TCP_REQ_rev.InitTCPReceiveMode_REQ(MAX_SIZE, "tcp://127.0.0.1:5566");

    uint8_t buffer[MAX_SIZE] = { 9,2,3 };
    int loop = 1;  // request counter
    while (true) {
        if (TCP_REQ_rev.GetTCPReceiveData_REQ(buffer) > 0) {
            //std::cout << "REQ RevCount:" << "\n";
            const uint8_t stamp = static_cast<uint8_t>(loop + 1);
            buffer[0] = stamp;
            buffer[1] = stamp;
            buffer[2] = stamp;
            buffer[MAX_SIZE - 1] = stamp;
            loop++;
            TCP_REQ_rev.SetTCPSendData_REQ((void*)buffer, MAX_SIZE);
        }
        std::this_thread::sleep_for(2us);
    }
}
// Handles for the DEALER receiver threads started by main() (up to 10).
std::array< std::thread, 10> rev_thr_array_list;
// Handle for the DEALER sender thread started by main().
std::thread send_thr;
// Not referenced anywhere in the visible code.
std::thread revc_thr;
/// Entry point of the ZeroMQ test program.
///
/// Creates the global logger if it does not exist yet, reads one character
/// from stdin and starts the DEALER test in the selected mode:
///   '1' - one sender thread,
///   '2' - ten receiver threads,
///   '3' - one sender thread and one receiver thread.
/// Any other input starts nothing. The function then sleeps forever, so the
/// worker threads keep running and their std::thread handles are never
/// destroyed while joinable. The other test scenarios (inproc, TCP PUB/SUB,
/// REQ/REP, UDP RADIO/DISH, PUSH/PULL) are kept below as commented-out code.
int main()
{
    std::cout << "Hello World!\n";
    if (!g_Logger) {
        g_Logger = std::make_unique<LogSPD>("log/ARJ21_HYS_panel.log", spdlog::level::trace);
    }
    // getchar() returns int; keep it as int so EOF is not truncated.
    const int c = getchar();
    // std::thread th1=std::thread(InprocSendThread);
    // th1.detach();
    // std::thread th2= std::thread(InprocRevThread);
    // th2.detach();
    //std::thread th2= std::thread(TCPRevREQThread);
    //th2.detach();
    //std::thread th3 = std::thread(TCPSendREPThread);
    //th3.detach();
    //TCP PUB
    /*
    if (c == '1')
    {
    for (int i = 0; i < 1; i++) {
    thr_array[i] = std::thread(TCPSend1Thread,i);
    thr_array[i].detach();
    }
    }
    else if (c == '2')
    {
    for (int i = 0; i < 1; i++) {
    thr_array[i] = std::thread(TCPRev1Thread,i);
    thr_array[i].detach();
    }
    }
    */
    //TCP PUB
    /*
    for (int i = 0; i < 1; i++) {
    thr_array[i] = std::thread(TCPSend1Thread, i);
    thr_array[i].detach();
    Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 3; i++) {
    thr_array[i] = std::thread(TCPRev1Thread, i);
    thr_array[i].detach();
    Sleep(3000);
    }
    */
    //TCP REP REQ
    //std::thread th6 = std::thread(TCPRevREQThread);
    //th6.detach();
    // Sleep(1000);
    // std::thread th7 = std::thread(TCPSendREPThread);
    //th7.detach();
    //UDP PUB SUB
    //std::thread th3 = std::thread(UDPSendThread);
    //th3.detach();
    //Sleep(1000);
    //std::thread th4 = std::thread(UDPRev1Thread);
    //th4.detach();
    //std::thread th5 = std::thread(UDPRev2Thread);
    //th5.detach();
    //TCP PUSH PULL
    /*
    for (int i = 0; i < 2; i++) {
    thr_array[i] = std::thread(TCPSendThread_PUSH, i);
    thr_array[i].detach();
    Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 1; i++) {
    thr_array[i] = std::thread(TCPRevThread_PULL, i);
    thr_array[i].detach();
    Sleep(3000);
    }
    */
    if (c == '1') {
        LOGGER_INFO("send mode");
        send_thr = std::thread(DEALER_Send_Thread);
    }
    else if (c == '2') {
        LOGGER_INFO("recv mode");
        // One receiver per slot of the handle array (10 threads).
        for (size_t i = 0; i < rev_thr_array_list.size(); i++) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }
    else if (c == '3') {
        LOGGER_INFO("send recv mode");
        send_thr = std::thread(DEALER_Send_Thread);
        // Single receiver alongside the sender.
        for (size_t i = 0; i < 1; i++) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }
    // Park the main thread forever; the workers never finish on their own.
    while (true) {
        Sleep(150);
    }
}
.
├── ZeroMQ_Test
│ ├── Debug
│ │ ├── ZeroMQ_Test.log
│ │ ├── ZeroMQ_Test.obj
│ │ ├── ZeroMQ_Test.tlog
│ │ │ ├── CL.command.1.tlog
│ │ │ ├── CL.read.1.tlog
│ │ │ ├── CL.write.1.tlog
│ │ │ ├── ZeroMQ_Test.lastbuildstate
│ │ │ ├── link-cvtres.read.1.tlog
│ │ │ ├── link-cvtres.write.1.tlog
│ │ │ ├── link-rc.read.1.tlog
│ │ │ ├── link-rc.write.1.tlog
│ │ │ ├── link.command.1.tlog
│ │ │ ├── link.read.1.tlog
│ │ │ ├── link.write.1.tlog
│ │ │ └── unsuccessfulbuild
│ │ ├── vc142.idb
│ │ └── vc142.pdb
│ ├── LogSPD.h
│ ├── Logger.h
│ ├── ZMQ.cpp
│ ├── ZeroMQ.cpp
│ ├── ZeroMQ.h
│ ├── ZeroMQ_Test.cpp
│ ├── ZeroMQ_Test.vcxproj
│ ├── ZeroMQ_Test.vcxproj.filters
│ ├── ZeroMQ_Test.vcxproj.user
│ ├── cbbps
│ ├── log
│ │ └── ARJ21_HYS_panel.log
│ ├── pch.cpp
│ ├── pch.h
│ ├── spdlog
│ │ ├── async.h
│ │ ├── async_logger-inl.h
│ │ ├── async_logger.h
│ │ ├── cfg
│ │ │ ├── argv.h
│ │ │ ├── env.h
│ │ │ ├── helpers-inl.h
│ │ │ └── helpers.h
│ │ ├── common-inl.h
│ │ ├── common.h
│ │ ├── details
│ │ │ ├── backtracer-inl.h
│ │ │ ├── backtracer.h
│ │ │ ├── circular_q.h
│ │ │ ├── console_globals.h
│ │ │ ├── file_helper-inl.h
│ │ │ ├── file_helper.h
│ │ │ ├── fmt_helper.h
│ │ │ ├── log_msg-inl.h
│ │ │ ├── log_msg.h
│ │ │ ├── log_msg_buffer-inl.h
│ │ │ ├── log_msg_buffer.h
│ │ │ ├── mpmc_blocking_q.h
│ │ │ ├── null_mutex.h
│ │ │ ├── os-inl.h
│ │ │ ├── os.h
│ │ │ ├── periodic_worker-inl.h
│ │ │ ├── periodic_worker.h
│ │ │ ├── registry-inl.h
│ │ │ ├── registry.h
│ │ │ ├── synchronous_factory.h
│ │ │ ├── tcp_client-windows.h
│ │ │ ├── tcp_client.h
│ │ │ ├── thread_pool-inl.h
│ │ │ ├── thread_pool.h
│ │ │ ├── udp_client-windows.h
│ │ │ ├── udp_client.h
│ │ │ └── windows_include.h
│ │ ├── fmt
│ │ │ ├── bin_to_hex.h
│ │ │ ├── bundled
│ │ │ │ ├── args.h
│ │ │ │ ├── chrono.h
│ │ │ │ ├── color.h
│ │ │ │ ├── compile.h
│ │ │ │ ├── core.h
│ │ │ │ ├── fmt.license.rst
│ │ │ │ ├── format-inl.h
│ │ │ │ ├── format.h
│ │ │ │ ├── locale.h
│ │ │ │ ├── os.h
│ │ │ │ ├── ostream.h
│ │ │ │ ├── printf.h
│ │ │ │ ├── ranges.h
│ │ │ │ └── xchar.h
│ │ │ ├── chrono.h
│ │ │ ├── compile.h
│ │ │ ├── fmt.h
│ │ │ ├── ostr.h
│ │ │ └── xchar.h
│ │ ├── formatter.h
│ │ ├── fwd.h
│ │ ├── logger-inl.h
│ │ ├── logger.h
│ │ ├── pattern_formatter-inl.h
│ │ ├── pattern_formatter.h
│ │ ├── sinks
│ │ │ ├── android_sink.h
│ │ │ ├── ansicolor_sink-inl.h
│ │ │ ├── ansicolor_sink.h
│ │ │ ├── base_sink-inl.h
│ │ │ ├── base_sink.h
│ │ │ ├── basic_file_sink-inl.h
│ │ │ ├── basic_file_sink.h
│ │ │ ├── daily_file_sink.h
│ │ │ ├── dist_sink.h
│ │ │ ├── dup_filter_sink.h
│ │ │ ├── hourly_file_sink.h
│ │ │ ├── mongo_sink.h
│ │ │ ├── msvc_sink.h
│ │ │ ├── null_sink.h
│ │ │ ├── ostream_sink.h
│ │ │ ├── qt_sinks.h
│ │ │ ├── ringbuffer_sink.h
│ │ │ ├── rotating_file_sink-inl.h
│ │ │ ├── rotating_file_sink.h
│ │ │ ├── sink-inl.h
│ │ │ ├── sink.h
│ │ │ ├── stdout_color_sinks-inl.h
│ │ │ ├── stdout_color_sinks.h
│ │ │ ├── stdout_sinks-inl.h
│ │ │ ├── stdout_sinks.h
│ │ │ ├── syslog_sink.h
│ │ │ ├── systemd_sink.h
│ │ │ ├── tcp_sink.h
│ │ │ ├── udp_sink.h
│ │ │ ├── win_eventlog_sink.h
│ │ │ ├── wincolor_sink-inl.h
│ │ │ └── wincolor_sink.h
│ │ ├── spdlog-inl.h
│ │ ├── spdlog.h
│ │ ├── stopwatch.h
│ │ ├── tweakme.h
│ │ └── version.h
│ ├── x64
│ │ └── Debug
│ │ ├── ZeroMQ.obj
│ │ ├── ZeroMQ_Test.Build.CppClean.log
│ │ ├── ZeroMQ_Test.exe.recipe
│ │ ├── ZeroMQ_Test.ilk
│ │ ├── ZeroMQ_Test.log
│ │ ├── ZeroMQ_Test.obj
│ │ ├── ZeroMQ_Test.tlog
│ │ │ ├── CL.command.1.tlog
│ │ │ ├── CL.read.1.tlog
│ │ │ ├── CL.write.1.tlog
│ │ │ ├── ZeroMQ_Test.lastbuildstate
│ │ │ ├── link.command.1.tlog
│ │ │ ├── link.read.1.tlog
│ │ │ └── link.write.1.tlog
│ │ ├── ZeroMQ_Test.vcxproj.FileListAbsolute.txt
│ │ ├── vc142.idb
│ │ └── vc142.pdb
│ ├── zmq.h
│ └── zmq_utils.h
├── ZeroMQ_d
│ ├── libzmq-v142-mt-gd-4_3_5.exp
│ ├── libzmq-v142-mt-gd-4_3_5.lib
│ ├── libzmq-v142-mt-sgd-4_3_5.lib
│ ├── testutil-static.lib
│ ├── testutil.lib
│ ├── unity.lib
│ └── unity.pdb
└── 好例子网_ZeroMQ_Test.7z
14 directories, 154 files
小贴士
感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。
- 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
- 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
- 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
- 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。
关于好例子网
本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整性、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责声明参见 版权及免责声明
网友评论
我要评论