ZeroMQ的PUB/SUB与REQ/REP是两种根本不同的通信模式:前者单向广播、无连接、需显式订阅;后者双向配对、强顺序、send/recv必须严格交替,socket类型不可混用或复用。
ZeroMQ 的 C++ 发布订阅(PUB/SUB)和请求响应(REQ/REP)模式,本质是两种完全不同的通信契约:前者是单向广播、无连接、无确认;后者是双向配对、隐式同步、强顺序。选错模式或混用语义,90% 的“收不到消息”“卡死”“乱序”问题都源于此。
zmq::socket_t 类型必须严格匹配模式,且不能复用ZeroMQ 不允许一个 socket 同时承担多种角色。比如用 ZMQ_REQ socket 去 connect 一个 ZMQ_PUB 端口,程序不会报错,但永远收不到任何数据——因为协议层根本不兼容。
ZMQ_REQ 只能 connect 到 ZMQ_REP(或 ZMQ_ROUTER),且每次 send 后必须紧跟 recv,否则下一次 send 会阻塞ZMQ_PUB 只能 bind(不建议 connect),ZMQ_SUB 只能 connect(不建议 bind),且 ZMQ_SUB 必须调用 setsockopt(ZMQ_SUBSCRIBE, ...) 才能收到消息,空订阅("")在较新版本中默认不生效zmq::context_t 可创建多个 socket,但每个 socket 的类型和行为由构造时第二个参数唯一决定,不可运行时变更zmq_setsockopt(..., ZMQ_SUBSCRIBE, ...) 是必填项这是新手最常踩的坑:SUB 端代码写了 connect,也写了 recv,但始终收不到 PUB 发出的消息。根本原因就是漏了订阅动作——ZeroMQ 的 SUB socket 默认屏蔽所有消息,哪怕 PUB 已经在发。
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 表示接收所有主题(注意长度传 0,不是 1)std::string topic = "stock.AAPL"; subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
setsockopt(ZMQ_SUBSCRIBE, ...) 实现多主题订阅,但不支持通配符(如 "stock.*")——需靠应用层过滤setsockopt(ZMQ_UNSUBSCRIBE, ...),但实际中极少需要send 和 recv 必须严格交替,且不能跳过响应ZMQ_REQ socket 内部维护一个“请求-响应”状态机。一旦发出请求未收到回复,socket 就会进入不可用状态,后续 send 直接返回 false 或抛异常(取决于 cppzmq 版本)。
ZMQ_REP 也必须先 recv 再 send,否则客户端会一直等待ZMQ_DEALER + ZMQ_ROUTER
setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)),避免无限阻塞;但注意:ZMQ_SNDTIMEO 对 REQ socket 无效#include#include #include // 客户端:REQ 模式(务必配对 recv) int main() { zmq::context_t ctx; zmq::socket_t sock(ctx, ZMQ_REQ); sock.connect("tcp://localhost:5555"); // 设置接收超时,防止永久卡住 int timeout_ms = 3000; sock.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)); zmq::message_t req(5); memcpy(req.data(), "HELLO", 5); sock.send(req); zmq::message_t reply; if (sock.recv(reply) == 0) { // 返回 0 表示成功 std::cout << "Got: " << std::string(static_cast (reply.data()), reply.size()) << "\n"; } else { std::cerr << "Timeout or error\n"; } }
PUB 端 bind 后立刻开始发消息,但 SUB 端 connect 是异步建立的,中间存在“连接窗口期”。若 PUB 在 SUB 还没连上来就已发消息,这些消息将永久丢失——ZeroMQ 默认不缓存(不像 Kafk
a)。这不是 bug,是设计选择。
ZMQ_XPUB + ZMQ_XSUB 设备做代理,并启用 ZMQ_XPUB_VERBOSE 监听订阅事件,实现动态流控ZMQ_CONFLATE 可开启“只保留最新消息”,但仅对单个订阅者有效,且不解决初始连接丢失问题真正难的不是写通代码,而是理解 ZeroMQ 每种 socket 类型背后的状态机和契约约束。REQ/REP 要求严格配对,PUB/SUB 要求显式订阅+容忍丢消息——把它们当普通 TCP socket 用,一定会掉进坑里。