在好例子网,分享、交流、成长!
您当前所在位置:首页C/C++ 开发实例C/C++语言基础 → ZeroMQ示例

ZeroMQ示例

C/C++语言基础

下载此实例
  • 开发语言:C/C++
  • 实例大小:5.53M
  • 下载次数:10
  • 浏览次数:88
  • 发布时间:2022-02-02
  • 实例类别:C/C++语言基础
  • 发 布 人:mellkaisen
  • 文件格式:.7z
  • 所需积分:2
 相关标签: zero test ROM ST ES

实例介绍

【实例简介】ZeroMQ示例

【实例截图】

from clipboard

【核心代码】


using Clock = std::chrono::steady_clock;
using TimePoint = Clock::time_point;
using namespace std::chrono_literals;
using namespace ZMQ;
using namespace LOGGER;

void* contextInproc = nullptr;
void InprocRevThread()
{
    while (contextInproc==nullptr)Sleep(100);
    void* socket = zmq_socket(contextInproc, ZMQ_SUB);
    zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
    int err = zmq_connect(socket, "inproc://cbbps");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[64] = { 0 };
        int size = zmq_recv(socket, buffer, 64, ZMQ_DONTWAIT);
        if (size > 0)loop ;

        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "INPROC RevCount:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void InprocSendThread()
{
    contextInproc = nullptr;
    contextInproc = zmq_ctx_new();
    void* socket = zmq_socket(contextInproc, ZMQ_PUB);
    int err= zmq_bind(socket, "inproc://cbbps");

    while (1) {
        char  buffer[64] = "Inproc ZeroMQ Test";
        int err = zmq_send(socket, buffer, 64, ZMQ_DONTWAIT);
        std::this_thread::sleep_for(2us);
    }

}


void TCPRev1Thread(int port)
{
    ZeroMQ TCP_SUB_recv;
    int which_port = 5560 port;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(5560);
    TCP_SUB_recv.InitTCPReceiveMode_SUB(MAX_SIZE, ip_port_str.c_str());
    
    
    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;
    uint32_t prevLoop = 0;
    uint32_t reved = 0;
    uint32_t prev_reved = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = TCP_SUB_recv.GetTCPReceiveData_SUB_More(buffer);

        if (size >= 0) {
            loop ;
            uint32_t* p = (uint32_t*)&buffer[MAX_SIZE - 4];
            if (*p != 0)reved ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev"<< which_port<<"Count:" << std::to_string(loop - prevLoop) << "\n";
           
            std::cout << "TCP RevData" << which_port << "Count:"<<std::to_string(reved- prev_reved)  << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void TCPSend1Thread(int port)
{
    ZeroMQ TCP_PUB_send;
    port = 5560 0;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(port);
    TCP_PUB_send.InitTCPSendMode_PUB(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = loop;
        int* p = (int*)&buffer[MAX_SIZE - 4];
         *p= loop ;
        TCP_PUB_send.SetTCPSendData_PUB_More((void*)buffer, MAX_SIZE,1024);

        std::this_thread::sleep_for(2us);
    }
}

void TCPSendThread_PUSH(int port)
{
    ZeroMQ TCP_send;
    int which_port = 5560 0;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(which_port);
    TCP_send.InitTCPSendMode_PUSH(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = (char)port 1;
        int* p = (int*)&buffer[MAX_SIZE - 4];
        *p = loop ;
        TCP_send.SetTCPSendData_PUSH((void*)buffer, MAX_SIZE);

        std::this_thread::sleep_for(2us);
    }
}

void TCPRevThread_PULL(int port)
{
    ZeroMQ TCP_recv;
    int which_port = 5560 port;
    std::string ip_port_str = "tcp://10.23.33.88:" std::to_string(5560);
    TCP_recv.InitTCPReceiveMode_PULL(MAX_SIZE, ip_port_str.c_str());


    TimePoint prevTimePoint = Clock::now();
    uint32_t loop = 0;
    uint32_t prevLoop = 0;
    uint32_t reved = 0;
    uint32_t prev_reved = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = TCP_recv.GetTCPReceiveData_PULL(buffer);

        if (size >= 0) {
            loop ;
            uint32_t* p = (uint32_t*)&buffer[MAX_SIZE - 4];
            if (*p != 0)reved ;
            uint8_t port = buffer[0];
            std::cout << "TCP Rev Port" << std::to_string(port) << "\n";
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP Rev" << which_port << "Count:" << std::to_string(loop - prevLoop) << "\n";

            std::cout << "TCP RevData" << which_port << "Count:" << std::to_string(reved - prev_reved) << "\n";
            prev_reved = reved;
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void DEALER_Send_Thread()
{
    ZeroMQ _0MQ;
    std::string ip_port_str = "tcp://192.168.203.127:" std::to_string(52000);
    _0MQ.InitTCPSendMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t loop = 1;
    char buffer[MAX_SIZE] = { 0 };
    while (1) {
        uint32_t* p = (uint32_t*)&buffer[0];
        *p = loop;
        if (_0MQ.SetTCPSendData_DEALER((void*)buffer, MAX_SIZE)) {
            if ((*p) % 10 == 0) {
                LOGGER_INFO("send thread sended msg count{:d}", *p);
            }
            loop ;
        }
        else {
            LOGGER_WARN("send thread sended error");
        }
        
        std::this_thread::sleep_for(10ms);
    }
}

void DEALER_RECV_Thread(size_t id)
{
    ZeroMQ _0MQ;
    std::string ip_port_str = "tcp://192.168.203.127:" std::to_string(52000);
    _0MQ.InitTCPReceiveMode_DEALER(MAX_SIZE, ip_port_str.c_str());

    uint32_t prev_head = 0;
    char buffer[MAX_SIZE] = { 0 };
    while (1) {
        if (_0MQ.GetTCPReceiveData_DEALER((void*)buffer)== MAX_SIZE) {
            uint32_t* head = (uint32_t*)&buffer[0];
            if ((*head) % 10 == 0) {
                LOGGER_INFO("recv thread{:d} recved msg count{:d}",id,*head);
            }

            if ((*head - prev_head) > 1) {
                LOGGER_ERROR("recv thread{:d} miss msg{:d}", id, *head);
            }

            prev_head = *head;
        }

        std::this_thread::sleep_for(1ms);
    }
}

void UDPRev1Thread()
{
    ZeroMQ UDP_SUB_recv;

    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size = UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);

        if (size >= 0) {
            loop ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "UDP Rev2Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}


void UDPRev2Thread()
{
    ZeroMQ UDP_SUB_recv;

    UDP_SUB_recv.InitUDPReceiveMode_DISH(MAX_SIZE, "udp://226.8.5.5:5555", "TV");

    TimePoint prevTimePoint = Clock::now();
    int loop = 0;
    int prevLoop = 0;
    while (1) {
        char buffer[MAX_SIZE] = { 0 };

        int size=UDP_SUB_recv.GetUDPReceiveData_DISH(buffer);

        if (size >= 0) {
            loop ;
        }


        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "UDP Rev2Count:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }

        std::this_thread::sleep_for(1us);
    }
}

void UDPSendThread()
{
    ZeroMQ UDP_PUB_send;
    
    UDP_PUB_send.InitUDPSendMode_RADIO(MAX_SIZE, "udp://226.8.5.5:5555", "TV");
    
    int loop = 0;
    char buffer[MAX_SIZE] = { 1 };
    while (1) {
        buffer[0] = loop;
        buffer[MAX_SIZE-1] = loop ;
        UDP_PUB_send.SetUDPSendData_RADIO(buffer, MAX_SIZE);

        std::this_thread::sleep_for(2us);
    }
}


void TCPSendREPThread()
{
    ZeroMQ TCP_REP_send;

    TCP_REP_send.InitTCPSendMode_REP(MAX_SIZE, "tcp://127.0.0.1:5566");

    int loop = 0;
    int prevLoop = 0;
    uint8_t buffer[MAX_SIZE] = { 1,2,3 };
    TimePoint prevTimePoint = Clock::now();
    while (1) {
        buffer[MAX_SIZE-1] = loop;

        

        uint8_t rev_buffer[MAX_SIZE] = { 1,2,3 };
        if (TCP_REP_send.GetTCPReceiveData_REP(rev_buffer)>0) {
            //std::cout << "REP RevCount:"  << "\n";
            loop ;
            TCP_REP_send.SetTCPSendData_REP((void*)buffer, MAX_SIZE);
            
        }

        if (Clock::now() - prevTimePoint >= 1s) {
            std::cout << "TCP REP RevCount:" << std::to_string(loop - prevLoop) << "\n";
            prevTimePoint = Clock::now();
            prevLoop = loop;
        }
        std::this_thread::sleep_for(2us);
    }

}

void TCPRevREQThread()
{
    ZeroMQ TCP_REQ_rev;

    TCP_REQ_rev.InitTCPReceiveMode_REQ(MAX_SIZE, "tcp://127.0.0.1:5566");//不能写localhost

    uint8_t buffer[MAX_SIZE] = { 9,2,3 };
    int loop = 1;
    
    while (1) {

        if (TCP_REQ_rev.GetTCPReceiveData_REQ(buffer)>0) {
            //std::cout << "REQ RevCount:" << "\n";
            buffer[0] = (loop 1);
            buffer[1] = (loop 1);
            buffer[2] = (loop 1);
            buffer[MAX_SIZE-1] = (loop 1);
            loop ;
            TCP_REQ_rev.SetTCPSendData_REQ((void*)buffer, MAX_SIZE);
        }
        std::this_thread::sleep_for(2us);
    }

}

std::array< std::thread, 10> rev_thr_array_list;
std::thread send_thr;
std::thread revc_thr;
int main()
{
    std::cout << "Hello World!\n";
    if (!g_Logger) {
        g_Logger = std::make_unique<LogSPD>("log/ARJ21_HYS_panel.log", spdlog::level::trace);
    }
   
    char c=getchar();
   // std::thread th1=std::thread(InprocSendThread);
   // th1.detach();
   // std::thread th2= std::thread(InprocRevThread);
   // th2.detach();

   //std::thread th2= std::thread(TCPRevREQThread);
   //th2.detach();
   //std::thread th3 = std::thread(TCPSendREPThread);
   //th3.detach();

    //TCP PUB
    /*
    if (c == '1')
    {
        for (int i = 0; i < 1; i ) {
            thr_array[i] = std::thread(TCPSend1Thread,i);
            thr_array[i].detach();
        }
    }
    else if (c == '2')
    {
        for (int i = 0; i < 1; i ) {
            thr_array[i] = std::thread(TCPRev1Thread,i);
            thr_array[i].detach();
        }
    }
    */

    //TCP PUB
    /*
    for (int i = 0; i < 1; i ) {
        thr_array[i] = std::thread(TCPSend1Thread, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 3; i ) {
        thr_array[i] = std::thread(TCPRev1Thread, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    */
   
    //TCP REP REQ
   //std::thread th6 = std::thread(TCPRevREQThread);
   //th6.detach();
   // Sleep(1000);
   // std::thread th7 = std::thread(TCPSendREPThread);
   //th7.detach();

    //UDP PUB SUB
    //std::thread th3 = std::thread(UDPSendThread);
    //th3.detach();
    //Sleep(1000);
    //std::thread th4 = std::thread(UDPRev1Thread);
    //th4.detach();
    //std::thread th5 = std::thread(UDPRev2Thread);
    //th5.detach();

    //TCP PUSH PULL
    /*
    for (int i = 0; i < 2; i ) {
        thr_array[i] = std::thread(TCPSendThread_PUSH, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    Sleep(1000);
    for (int i = 0; i < 1; i ) {
        thr_array[i] = std::thread(TCPRevThread_PULL, i);
        thr_array[i].detach();
        Sleep(3000);
    }
    */


    if (c == '1') {
        LOGGER_INFO("send mode");
        send_thr = std::thread(DEALER_Send_Thread);
    }
    else if (c == '2') {
        LOGGER_INFO("recv mode");
        for (auto i = 0; i < 10; i ) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }
    else if (c == '3') {
        LOGGER_INFO("send recv mode");
        send_thr = std::thread(DEALER_Send_Thread);
        for (auto i = 0; i < 1; i ) {
            rev_thr_array_list[i] = std::thread(DEALER_RECV_Thread, i);
        }
    }

    while (1) {
        Sleep(150);
    }

}

.
├── ZeroMQ_Test
│   ├── Debug
│   │   ├── ZeroMQ_Test.log
│   │   ├── ZeroMQ_Test.obj
│   │   ├── ZeroMQ_Test.tlog
│   │   │   ├── CL.command.1.tlog
│   │   │   ├── CL.read.1.tlog
│   │   │   ├── CL.write.1.tlog
│   │   │   ├── ZeroMQ_Test.lastbuildstate
│   │   │   ├── link-cvtres.read.1.tlog
│   │   │   ├── link-cvtres.write.1.tlog
│   │   │   ├── link-rc.read.1.tlog
│   │   │   ├── link-rc.write.1.tlog
│   │   │   ├── link.command.1.tlog
│   │   │   ├── link.read.1.tlog
│   │   │   ├── link.write.1.tlog
│   │   │   └── unsuccessfulbuild
│   │   ├── vc142.idb
│   │   └── vc142.pdb
│   ├── LogSPD.h
│   ├── Logger.h
│   ├── ZMQ.cpp
│   ├── ZeroMQ.cpp
│   ├── ZeroMQ.h
│   ├── ZeroMQ_Test.cpp
│   ├── ZeroMQ_Test.vcxproj
│   ├── ZeroMQ_Test.vcxproj.filters
│   ├── ZeroMQ_Test.vcxproj.user
│   ├── cbbps
│   ├── log
│   │   └── ARJ21_HYS_panel.log
│   ├── pch.cpp
│   ├── pch.h
│   ├── spdlog
│   │   ├── async.h
│   │   ├── async_logger-inl.h
│   │   ├── async_logger.h
│   │   ├── cfg
│   │   │   ├── argv.h
│   │   │   ├── env.h
│   │   │   ├── helpers-inl.h
│   │   │   └── helpers.h
│   │   ├── common-inl.h
│   │   ├── common.h
│   │   ├── details
│   │   │   ├── backtracer-inl.h
│   │   │   ├── backtracer.h
│   │   │   ├── circular_q.h
│   │   │   ├── console_globals.h
│   │   │   ├── file_helper-inl.h
│   │   │   ├── file_helper.h
│   │   │   ├── fmt_helper.h
│   │   │   ├── log_msg-inl.h
│   │   │   ├── log_msg.h
│   │   │   ├── log_msg_buffer-inl.h
│   │   │   ├── log_msg_buffer.h
│   │   │   ├── mpmc_blocking_q.h
│   │   │   ├── null_mutex.h
│   │   │   ├── os-inl.h
│   │   │   ├── os.h
│   │   │   ├── periodic_worker-inl.h
│   │   │   ├── periodic_worker.h
│   │   │   ├── registry-inl.h
│   │   │   ├── registry.h
│   │   │   ├── synchronous_factory.h
│   │   │   ├── tcp_client-windows.h
│   │   │   ├── tcp_client.h
│   │   │   ├── thread_pool-inl.h
│   │   │   ├── thread_pool.h
│   │   │   ├── udp_client-windows.h
│   │   │   ├── udp_client.h
│   │   │   └── windows_include.h
│   │   ├── fmt
│   │   │   ├── bin_to_hex.h
│   │   │   ├── bundled
│   │   │   │   ├── args.h
│   │   │   │   ├── chrono.h
│   │   │   │   ├── color.h
│   │   │   │   ├── compile.h
│   │   │   │   ├── core.h
│   │   │   │   ├── fmt.license.rst
│   │   │   │   ├── format-inl.h
│   │   │   │   ├── format.h
│   │   │   │   ├── locale.h
│   │   │   │   ├── os.h
│   │   │   │   ├── ostream.h
│   │   │   │   ├── printf.h
│   │   │   │   ├── ranges.h
│   │   │   │   └── xchar.h
│   │   │   ├── chrono.h
│   │   │   ├── compile.h
│   │   │   ├── fmt.h
│   │   │   ├── ostr.h
│   │   │   └── xchar.h
│   │   ├── formatter.h
│   │   ├── fwd.h
│   │   ├── logger-inl.h
│   │   ├── logger.h
│   │   ├── pattern_formatter-inl.h
│   │   ├── pattern_formatter.h
│   │   ├── sinks
│   │   │   ├── android_sink.h
│   │   │   ├── ansicolor_sink-inl.h
│   │   │   ├── ansicolor_sink.h
│   │   │   ├── base_sink-inl.h
│   │   │   ├── base_sink.h
│   │   │   ├── basic_file_sink-inl.h
│   │   │   ├── basic_file_sink.h
│   │   │   ├── daily_file_sink.h
│   │   │   ├── dist_sink.h
│   │   │   ├── dup_filter_sink.h
│   │   │   ├── hourly_file_sink.h
│   │   │   ├── mongo_sink.h
│   │   │   ├── msvc_sink.h
│   │   │   ├── null_sink.h
│   │   │   ├── ostream_sink.h
│   │   │   ├── qt_sinks.h
│   │   │   ├── ringbuffer_sink.h
│   │   │   ├── rotating_file_sink-inl.h
│   │   │   ├── rotating_file_sink.h
│   │   │   ├── sink-inl.h
│   │   │   ├── sink.h
│   │   │   ├── stdout_color_sinks-inl.h
│   │   │   ├── stdout_color_sinks.h
│   │   │   ├── stdout_sinks-inl.h
│   │   │   ├── stdout_sinks.h
│   │   │   ├── syslog_sink.h
│   │   │   ├── systemd_sink.h
│   │   │   ├── tcp_sink.h
│   │   │   ├── udp_sink.h
│   │   │   ├── win_eventlog_sink.h
│   │   │   ├── wincolor_sink-inl.h
│   │   │   └── wincolor_sink.h
│   │   ├── spdlog-inl.h
│   │   ├── spdlog.h
│   │   ├── stopwatch.h
│   │   ├── tweakme.h
│   │   └── version.h
│   ├── x64
│   │   └── Debug
│   │       ├── ZeroMQ.obj
│   │       ├── ZeroMQ_Test.Build.CppClean.log
│   │       ├── ZeroMQ_Test.exe.recipe
│   │       ├── ZeroMQ_Test.ilk
│   │       ├── ZeroMQ_Test.log
│   │       ├── ZeroMQ_Test.obj
│   │       ├── ZeroMQ_Test.tlog
│   │       │   ├── CL.command.1.tlog
│   │       │   ├── CL.read.1.tlog
│   │       │   ├── CL.write.1.tlog
│   │       │   ├── ZeroMQ_Test.lastbuildstate
│   │       │   ├── link.command.1.tlog
│   │       │   ├── link.read.1.tlog
│   │       │   └── link.write.1.tlog
│   │       ├── ZeroMQ_Test.vcxproj.FileListAbsolute.txt
│   │       ├── vc142.idb
│   │       └── vc142.pdb
│   ├── zmq.h
│   └── zmq_utils.h
├── ZeroMQ_d
│   ├── libzmq-v142-mt-gd-4_3_5.exp
│   ├── libzmq-v142-mt-gd-4_3_5.lib
│   ├── libzmq-v142-mt-sgd-4_3_5.lib
│   ├── testutil-static.lib
│   ├── testutil.lib
│   ├── unity.lib
│   └── unity.pdb
└── 好例子网_ZeroMQ_Test.7z

14 directories, 154 files


标签: zero test ROM ST ES

实例下载地址

ZeroMQ示例

不能下载?内容有错? 点击这里报错 + 投诉 + 提问

好例子网口号:伸出你的我的手 — 分享

网友评论

发表评论

(您的评论需要经过审核才能显示)

查看所有0条评论>>

小贴士

感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。

  • 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
  • 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
  • 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
  • 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。

关于好例子网

本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整权、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责申明参见 版权及免责申明

;
报警