《ZeroMq用户手册完整.pdf》由会员分享,可在线阅读,更多相关《ZeroMq用户手册完整.pdf(353页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、zguide-cn haozu/zguide-cn 页 1 zguide-cn haozu/zguide-cn 页 2 目录目录 第一章 ZeroMQ 基础.5 拯救世界.5 ZMQ 简介.6 需要具备的知识.6 获取示例.6 提问-回答.7 关于字符串.12 获取版本号.13 让消息流动起来.13 分布式处理.17 使用 ZMQ 编程.23 ZMQ 2.1 版.25 正确地使用上下文.26 正确地退出和清理.26 我们为什么需要 ZMQ.27 套接字的扩展性.31 如果解决丢失消息的问题.32 警告:你的想法可能会被颠覆!.34 第二章 ZeroMQ 进阶.34 零的哲学.35 套接字 AP
2、I.35 使用套接字构建拓扑结构.36 使用套接字传递数据.38 单播传输.39 ZMQ 不只是数据传输.40 I/O 线程.40 核心消息模式.41 上层消息模式.42 消息的使用方法.42 处理多个套接字.44 处理错误和 ETERM 信号.47 zguide-cn haozu/zguide-cn 页 3 处理中断信号.52 检测内存泄露.54 多帧消息.55 中间件和装置.56 ZMQ 多线程编程.68 线程间的信号传输.71 节点协调.74 零拷贝.78 瞬时套接字和持久套接字.79 发布-订阅消息信封.82(半)持久订阅者和阈值(HWM).84 这就是你想要的!.90 第三章 高级请
3、求-应答模式.90 Request-Reply Envelopes.91 自定义请求-应答路由.95 ROUTER-DEALER 路由.96 最近最少使用算法路由(LRU 模式).100 使用地址进行路由.104 请求-应答模式下的消息代理.107 MQ 上层 API 的封装.115 异步 C/S 结构.124 实战:跨代理路由.130 第四章 可靠的请求-应答模式.155 什么是可靠性?.155 可靠性设计.156 客户端的可靠性设计(懒惰海盗模式).157 基本的可靠队列(简单海盗模式).162 健壮的可靠队列(偏执海盗模式).166 心跳.175 约定和协议.176 面向服务的可靠队列(
4、管家模式).177 异步管家模式.203 zguide-cn haozu/zguide-cn 页 4 服务查询.213 幂等服务.215 脱机可靠性(巨人模式).215 高可靠对称节点(双子星模式).228 无中间件的可靠性(自由者模式).249 总结.272 第五章 高级发布-订阅模式.272 检测慢订阅者(自杀的蜗牛模式).272 高速订阅者(黑箱模式).275 共享键值缓存(克隆模式).278 zguide-cn haozu/zguide-cn 页 5 第一章第一章 ZeroMQZeroMQ 基础基础 拯救世界拯救世界 如何解释 ZMQ?有些人会先说一堆 ZMQ 的好:它是一套用于快速构
5、建的套接字组件;它的信箱系统有超强的路由能力;它太快了!而有些人则喜欢分享他们被 ZMQ 点悟的时刻,那些被灵感击中的瞬间:所有的事情突然变得简单明了,让人大开眼界。另一些人则会拿 ZMQ 同其他产品做个比较:它更小,更简单,但却让人觉得如此熟悉。对于我个人而言,我则更倾向于和别人分享 ZMQ 的诞生史,相信会和各位读者有所共鸣。编程是一门科学,但往往会乔装成一门艺术。我们从不去了解软件最底层的机理,或者说根本没有人在乎这些。软件并不只是算法、数据结构、编程语言、或者抽象云云,这些不过是一些工具而已,被我们创造、使用、最后抛弃。软件真正的本质,其实是人的本质。举例来说,当我们遇到一个高度复杂的
6、问题时,我们会群策群力,分工合作,将问题拆分为若干个部分,一起解决。这里就体现了编程的科学:创建一组小型的构建模块,让人们易于理解和使用,那么大家就会一起用它来解决问题。我们生活在一个普遍联系的世界里,需要现代的编程软件为我们做指引。所以,未来我们所需要的用于处理大规模计算的构建模块,必须是普遍联系的,而且能够并行运作。那时,程序代码不能再只关注自己,它们需要互相交流,变得足够健谈。程序代码需要像人脑一样,数以兆计的神经元高速地传输信号,在一个没有中央控制的环境下,没有单点故障的环境下,解决问题。这一点其实并不意外,因为就当今的网络来讲,每个节点其实就像是连接了一个人脑一样。如果你曾和线程、协
7、议、或网络打过交道,你会觉得我上面的话像是天方夜谭。因为在实际应用过程中,只是连接几个程序或网络就已经非常困难和麻烦了。数以兆计的节点?那真是无法想象的。现今只有资金雄厚的企业才能负担得起这种软件和服务。当今世界的网络结构已经远远超越了我们自身的驾驭能力。十九世纪八十年代的软件危机,弗莱德布鲁克斯曾说过,这个世上没有银弹。后来,免费和开源解决了这次软件危机,让我们能够高效地分享知识。如今,我们又面临一次新的软件危机,只不过我们谈论得不多。只有那些大型的、富足的企业才有财力建立高度联系的应用程序。那里有云的存在,但它是私有的。我们的数据和知识正在从我们的个人电脑中消失,流入云端,无法获得或与其竞
8、争。是谁坐拥我们的社交网络?这真像一次巨型主机的革命。zguide-cn haozu/zguide-cn 页 6 我们暂且不谈其中的政治因素,光那些就可以另外出本书了。目前的现状是,虽然互联网能够让千万个程序相连,但我们之中的大多数却无法做到这些。这样一来,那些真正有趣的大型问题(如健康、教育、经济、交通等领域),仍然无法解决。我们没有能力将代码连接起来,也就不能像大脑中的神经元一样处理那些大规模的问题。已经有人尝试用各种方法来连接应用程序,如数以千计的 IETF 规范,每种规范解决一个特定问题。对于开发人员来说,HTTP 协议是比较简单和易用的,但这也往往让问题变得更糟,因为它鼓励人们形成一
9、种重服务端、轻客户端的思想。所以迄今为止人们还在使用原始的 TCP/UDP 协议、私有协议、HTTP 协议、网络套接字等形式连接应用程序。这种做法依旧让人痛苦,速度慢又不易扩展,需要集中化管理。而分布式的 P2P 协议又仅仅适用于娱乐,而非真正的应用。有谁会使用 Skype 或者 Bittorrent 来交换数据呢?这就让我们回归到编程科学的问题上来。想要拯救这个世界,我们需要做两件事情:一,如何在任何地点连接任何两个应用程序;二、将这个解决方案用最为简单的方式包装起来,供程序员使用。也许这听起来太简单了,但事实确实如此。ZMQZMQ 简介简介 ZMQ(MQ、ZeroMQ,0MQ)看起来像是一
10、套嵌入式的网络链接库,但工作起来更像是一个并发式的框架。它提供的套接字可以在多种协议中传输消息,如线程间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。ZMQ 的快速足以胜任集群应用产品。它的异步 I/O 机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ 有着多语言支持,并能在几乎所有的操作系统上运行。ZMQ 是 iMatix 公司的产品,以 LGPL 开源协议发布。需要具备的知识需要具备的知识 使用最新的 ZMQ 稳定版本;使用 Linux 系统或其他相似的操作系统;能够阅读 C 语言代码,这是本指南示例程序的默认语言;当
11、我们书写诸如 PUSH 或 SUBSCRIBE 等常量时,你能够找到相应语言的实现,如 ZMQ_PUSH、ZMQ_SUBSCRIBE。获取示例获取示例 本指南的所有示例都存放于 github 仓库中,最简单的获取方式是运行以下代码:git clone git:/ zguide-cn haozu/zguide-cn 页 7 浏览 examples 目录,你可以看到多种语言的实现。如果其中缺少了某种你正在使用的语言,我们很希望你可以提交一份补充。这也是本指南实用的原因,要感谢所有做出过贡献的人。所有的示例代码都以 MIT/X11 协议发布,若在源代码中有其他限定的除外。提问提问-回答回答 让我们从
12、简单的代码开始,一段传统的 Hello World 程序。我们会创建一个客户端和一个服务端,客户端发送 Hello 给服务端,服务端返回 World。下文是 C语言编写的服务端,它在 5555 端口打开一个 ZMQ 套接字,等待请求,收到后应答 World。hwserver.c:Hello World serverhwserver.c:Hello World server/Hello World 服务端/绑定一个 REP 套接字至 tcp:/*:5555/从客户端接收 Hello,并应答 World/#include#include#include#include int main(void)
13、void*context=zmq_init(1);/与客户端通信的套接字 void*responder=zmq_socket(context,ZMQ_REP);zmq_bind(responder,tcp:/*:5555);while(1)/等待客户端请求 zmq_msg_t request;zmq_msg_init(&request);zmq_recv(responder,&request,0);printf(收到 Hellon);zmq_msg_close(&request);/做些“处理”sleep(1);zguide-cn haozu/zguide-cn 页 8 /返回应答 zmq_m
14、sg_t reply;zmq_msg_init_size(&reply,5);memcpy(zmq_msg_data(&reply),World,5);zmq_send(responder,&reply,0);zmq_msg_close(&reply);/程序不会运行到这里,以下只是演示我们应该如何结束 zmq_close(responder);zmq_term(context);return 0;使用 REQ-REP 套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用 zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类
15、似地,服务端必须先进行接收,后进行发送。ZMQ 使用 C 语言作为它参考手册的语言,本指南也以它作为示例程序的语言。如果你正在阅读本指南的在线版本,你可以看到示例代码的下方有其他语言的实现。如以下是 C+语言:hwserver.cpp:Hello World serverhwserver.cpp:Hello World server/Hello World 服务端 C+语言版/绑定一个 REP 套接字至 tcp:/*:5555/从客户端接收 Hello,并应答 World/#include zguide-cn haozu/zguide-cn 页 9#include#include#includ
16、e int main()/准备上下文和套接字 zmq:context_t context(1);zmq:socket_t socket(context,ZMQ_REP);socket.bind(tcp:/*:5555);while(true)zmq:message_t request;/等待客户端请求 socket.recv(&request);std:cout 收到 Hello std:endl;/做一些“处理”sleep(1);/应答 World zmq:message_t reply(5);memcpy(void*)reply.data(),World,5);socket.send(re
17、ply);return 0;可以看到 C 语言和 C+语言的 API 代码差不多,而在 PHP 这样的语言中,代码就会更为简洁:hwserver.php:Hello World serverhwserver.php:Hello World server?php/*Hello World 服务端 *绑定 REP 套接字至 tcp:/*:5555 *从客户端接收 Hello,并应答 World *author Ian Barber */$context=new ZMQContext(1);/与客户端通信的套接字 zguide-cn haozu/zguide-cn 页 10$responder=ne
18、w ZMQSocket($context,ZMQ:SOCKET_REP);$responder-bind(tcp:/*:5555);while(true)/等待客户端请求$request=$responder-recv();printf(Received request:%sn,$request);/做一些“处理”sleep(1);/应答 World$responder-send(World);下面是客户端的代码:hwclient:Hello World client in Chwclient:Hello World client in C/Hello World 客户端/连接 REQ 套接字
19、至 tcp:/localhost:5555/发送 Hello 给服务端,并接收 World/#include#include#include#include int main(void)void*context=zmq_init(1);/连接至服务端的套接字 printf(正在连接至 hello world 服务端.n);void*requester=zmq_socket(context,ZMQ_REQ);zmq_connect(requester,tcp:/localhost:5555);int request_nbr;for(request_nbr=0;request_nbr!=10;re
20、quest_nbr+)zmq_msg_t request;zmq_msg_init_size(&request,5);memcpy(zmq_msg_data(&request),Hello,5);printf(正在发送 Hello%d.n,request_nbr);zguide-cn haozu/zguide-cn 页 11 zmq_send(requester,&request,0);zmq_msg_close(&request);zmq_msg_t reply;zmq_msg_init(&reply);zmq_recv(requester,&reply,0);printf(接收到 Worl
21、d%dn,request_nbr);zmq_msg_close(&reply);zmq_close(requester);zmq_term(context);return 0;这看起来是否太简单了?ZMQ 就是这样一个东西,你往里加点儿料就能制作出一枚无穷能量的原子弹,用它来拯救世界吧!理论上你可以连接千万个客户端到这个服务端上,同时连接都没问题,程序仍会运作得很好。你可以尝试一下先打开客户端,再打开服务端,可以看到程序仍然会正常工作,想想这意味着什么。让我简单介绍一下这两段程序到底做了什么。首先,他们创建了一个 ZMQ 上下文,然后是一个套接字。不要被这些陌生的名词吓到,后面我们都会讲到。服
22、务端将 REP 套接字绑定到 5555 端口上,并开始等待请求,发出应答,如此循环。客户端则是发送请求并等待服务端的应答。这些代码背后其实发生了很多很多事情,但是程序员完全不必理会这些,只要知道这些代码短小精悍,极少出错,耐高压。这种通信模式我们称之为请求-应答模式,是 ZMQ 最直接的一种应用。你可以拿它和 RPC 及经典的 C/S 模型做类比。zguide-cn haozu/zguide-cn 页 12 关于字符串关于字符串 ZMQ 不会关心发送消息的内容,只要知道它所包含的字节数。所以,程序员需要做一些工作,保证对方节点能够正确读取这些消息。如何将一个对象或复杂数据类型转换成 ZMQ 可
23、以发送的消息,这有类似 Protocol Buffers 的序列化软件可以做到。但对于字符串,你也是需要有所注意的。在 C 语言中,字符串都以一个空字符结尾,你可以像这样发送一个完整的字符串:zmq_msg_init_data(&request,Hello,6,NULL,NULL);但是,如果你用其他语言发送这个字符串,很可能不会包含这个空字节,如你使用 Python 发送:socket.send(Hello)实际发送的消息是:如果你从 C 语言中读取该消息,你会读到一个类似于字符串的内容,甚至它可能就是一个字符串(第六位在内存中正好是一个空字符),但是这并不合适。这样一来,客户端和服务端对字
24、符串的定义就不统一了,你会得到一些奇怪的结果。当你用 C 语言从 ZMQ 中获取字符串,你不能够相信该字符串有一个正确的结尾。因此,当你在接受字符串时,应该建立多一个字节的缓冲区,将字符串放进去,并添加结尾。所以,让我们做如下假设:ZMQZMQ 的字符串是有长度的,且传送时不加结束符的字符串是有长度的,且传送时不加结束符。在最简单的情况下,ZMQ 字符串和 ZMQ 消息中的一帧是等价的,就如上图所展现的,由一个长度属性和一串字节表示。下面这个功能函数会帮助我们在 C 语言中正确的接受字符串消息:/从 ZMQ 套接字中接收字符串,并转换为 C 语言的字符串 static char*s_recv(
25、void*socket)zmq_msg_t message;zmq_msg_init(&message);zmq_recv(socket,&message,0);zguide-cn haozu/zguide-cn 页 13 int size=zmq_msg_size(&message);char*string=malloc(size+1);memcpy(string,zmq_msg_data(&message),size);zmq_msg_close(&message);string size=0;return(string);这段代码我们会在日后的示例中使用,我们可以顺手写一个 s_send
26、()方法,并打包成一个.h 文件供我们使用。这就诞生了 zhelpers.h,一个供 C 语言使用的 ZMQ 功能函数库。它的源代码比较长,而且只对 C 语言程序员有用,你可以在闲暇时看一看。获取版本号获取版本号 ZMQ 目前有多个版本,而且仍在持续更新。如果你遇到了问题,也许这在下一个版本中已经解决了。想知道目前的 ZMQ 版本,你可以在程序中运行如下:version:MQ version reporting in Cversion:MQ version reporting in C/返回当前 ZMQ 的版本号/#include zhelpers.h int main(void)int ma
27、jor,minor,patch;zmq_version(&major,&minor,&patch);printf(当前 ZMQ 版本号为%d.%d.%dn,major,minor,patch);return EXIT_SUCCESS;让消息流动起来让消息流动起来 第二种经典的消息模式是单向数据分发:服务端将更新事件发送给一组客户端。让我们看一个天气信息发布的例子,包括邮编、温度、相对湿度。我们随机生成这些信息,气象站好像也是这么干的。下面是服务端的代码,使用 5556 端口:wuserver:Weather update server in Cwuserver:Weather update s
28、erver in C zguide-cn haozu/zguide-cn 页 14/气象信息更新服务/绑定 PUB 套接字至 tcp:/*:5556 端点/发布随机气象信息/#include zhelpers.h int main(void)/准备上下文和 PUB 套接字 void*context=zmq_init(1);void*publisher=zmq_socket(context,ZMQ_PUB);zmq_bind(publisher,tcp:/*:5556);zmq_bind(publisher,ipc:/weather.ipc);/初始化随机数生成器 srandom(unsigne
29、d)time(NULL);while(1)/生成数据 int zipcode,temperature,relhumidity;zipcode =randof(100000);temperature=randof(215)-80;relhumidity=randof(50)+10;/向所有订阅者发送消息 char update 20;sprintf(update,%05d%d%d,zipcode,temperature,relhumidity);s_send(publisher,update);zmq_close(publisher);zmq_term(context);return 0;这项更
30、新服务没有开始、没有结束,就像永不消失的电波一样。zguide-cn haozu/zguide-cn 页 15 下面是客户端程序,它会接受发布者的消息,只处理特定邮编标注的信息,如纽约的邮编是 10001:wuclient:Weather update client in Cwuclient:Weather update client in C/气象信息客户端/连接 SUB 套接字至 tcp:/*:5556 端点/收集指定邮编的气象信息,并计算平均温度/#include zhelpers.h int main(int argc,char*argv)void*context=zmq_init(1
31、);/创建连接至服务端的套接字 printf(正在收集气象信息.n);void*subscriber=zmq_socket(context,ZMQ_SUB);zmq_connect(subscriber,tcp:/localhost:5556);/设置订阅信息,默认为纽约,邮编 10001 char*filter=(argc 1)?argv 1:10001;zguide-cn haozu/zguide-cn 页 16 zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE,filter,strlen(filter);/处理 100 条更新信息 int update_nb
32、r;long total_temp=0;for(update_nbr=0;update_nbr 100;update_nbr+)char*string=s_recv(subscriber);int zipcode,temperature,relhumidity;sscanf(string,%d%d%d,&zipcode,&temperature,&relhumidity);total_temp+=temperature;free(string);printf(地区邮编%s 的平均温度为%dFn,filter,(int)(total_temp/update_nbr);zmq_close(subs
33、criber);zmq_term(context);return 0;需要注意的是,在使用 SUB 套接字时,必须使用 zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB 套接字就会收到。订阅者可以选择不接收某类消息,也是通过 zmq_setsockopt()方法实现的。PUB-SUB 套接字组合是异步的。客户端在一个循环体中使用 zmq_recv()接收消息,如果向 SUB 套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发
34、送消息,但不能再 PUB 套接字上使用 zme_recv()。关于 PUB-SUB 套接字,还有一点需要注意:你无法得知 SUB 是何时开始接收消息的。就算你先打开了 SUB 套接字,后打开 PUB 发送消息,这时 SUB 还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。这种“慢连接”的症状一开始会让很多人困惑,所以这里我要详细解释一下。还记得 ZMQ 是在后台进行异步的 I/O 传输的,如果你有两个节点用以下顺序相连:订阅者连接至端点接收消息并计数;发布者绑定至端点并立刻发送 1000 条消息。运行的结果很可能是订阅者一条消息都收不到。这时你可能会傻眼,忙于检查有没有设
35、置订阅信息,并重新尝试,但结果还是一样。zguide-cn haozu/zguide-cn 页 17 我们知道在建立 TCP 连接时需要进行三次握手,会耗费几毫秒的时间,而当节点数增加时这个数字也会上升。在这么短的时间里,ZMQ 就可以发送很多很多消息了。举例来说,如果建立连接需要耗时 5 毫秒,而 ZMQ 只需要 1 毫秒就可以发送完这 1000 条消息。第二章中我会解释如何使发布者和订阅者同步,只有当订阅者准备好时发布者才会开始发送消息。有一种简单的方法来同步 PUB 和 SUB,就是让 PUB 延迟一段时间再发送消息。现实编程中我不建议使用这种方式,因为它太脆弱了,而且不好控制。不过这里
36、我们先暂且使用 sleep 的方式来解决,等到第二章的时候再讲述正确的处理方式。另一种同步的方式则是认为发布者的消息流是无穷无尽的,因此丢失了前面一部分信息也没有关系。我们的气象信息客户端就是这么做的。示例中的气象信息客户端会收集指定邮编的一千条信息,其间大约有 1000 万条信息被发布。你可以先打开客户端,再打开服务端,工作一段时间后重启服务端,这时客户端仍会正常工作。当客户端收集完所需信息后,会计算并输出平均温度。关于发布-订阅模式的几点说明:订阅者可以连接多个发布者,轮流接收消息;如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃;如果你使用的不是 TCP 协议,那当订阅者处理速度
37、过慢时,消息会在发布者处堆积。以后我们会讨论如何使用阈值(HWM)来保护发布者。在目前版本的 ZMQ 中,消息的过滤是在订阅者处进行的。也就是说,发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃。我在自己的四核计算机上尝试发布 1000 万条消息,速度很快,但没什么特别的:phws200901:/work/git/0MQGuide/examples/c$time wuclient Collecting updates from weather server.Average temperature for zipcode 10001 was 18F real 0m5.939s user
38、 0m1.590s sys 0m2.290s 分布式处理分布式处理 下面一个示例程序中,我们将使用 ZMQ 进行超级计算,也就是并行处理模型:任务分发器会生成大量可以并行计算的任务;有一组 worker 会处理这些任务;结果收集器会在末端接收所有 worker 的处理结果,进行汇总。zguide-cn haozu/zguide-cn 页 18 现实中,worker 可能散落在不同的计算机中,利用 GPU(图像处理单元)进行复杂计算。下面是任务分发器的代码,它会生成 100 个任务,任务内容是让收到的 worker 延迟若干毫秒。taskvent:Parallel task ventilator
39、 in Ctaskvent:Parallel task ventilator in C/任务分发器/绑定 PUSH 套接字至 tcp:/localhost:5557 端点/发送一组任务给已建立连接的 worker/#include zhelpers.h int main(void)void*context=zmq_init(1);/用于发送消息的套接字 void*sender=zmq_socket(context,ZMQ_PUSH);zmq_bind(sender,tcp:/*:5557);/用于发送开始信号的套接字 void*sink=zmq_socket(context,ZMQ_PUSH)
40、;zmq_connect(sink,tcp:/localhost:5558);printf(准备好 worker 后按任意键开始:);getchar();printf(正在向 worker 分配任务.n);/发送开始信号 s_send(sink,0);/初始化随机数生成器 srandom(unsigned)time(NULL);/发送 100 个任务 int task_nbr;int total_msec=0;/预计执行时间(毫秒)for(task_nbr=0;task_nbr 100;task_nbr+)int workload;/随机产生 1-100 毫秒的工作量 workload=ran
41、dof(100)+1;total_msec+=workload;char string 10;zguide-cn haozu/zguide-cn 页 19 sprintf(string,%d,workload);s_send(sender,string);printf(预计执行时间:%d 毫秒n,total_msec);sleep(1);/延迟一段时间,让任务分发完成 zmq_close(sink);zmq_close(sender);zmq_term(context);return 0;下面是 worker 的代码,它接受信息并延迟指定的毫秒数,并发送执行完毕的信号:zguide-cn ha
42、ozu/zguide-cn 页 20 taskwork:Parallel task worker in Ctaskwork:Parallel task worker in C/任务执行器/连接 PULL 套接字至 tcp:/localhost:5557 端点/从任务分发器处获取任务/连接 PUSH 套接字至 tcp:/localhost:5558 端点/向结果采集器发送结果/#include zhelpers.h int main(void)void*context=zmq_init(1);/获取任务的套接字 void*receiver=zmq_socket(context,ZMQ_PULL)
43、;zmq_connect(receiver,tcp:/localhost:5557);/发送结果的套接字 void*sender=zmq_socket(context,ZMQ_PUSH);zmq_connect(sender,tcp:/localhost:5558);/循环处理任务 while(1)char*string=s_recv(receiver);/输出处理进度 fflush(stdout);printf(%s.,string);/开始处理 s_sleep(atoi(string);free(string);/发送结果 s_send(sender,);zmq_close(receive
44、r);zmq_close(sender);zmq_term(context);return 0;zguide-cn haozu/zguide-cn 页 21 下面是结果收集器的代码。它会收集 100 个处理结果,并计算总的执行时间,让我们由此判别任务是否是并行计算的。tasksink:Parallel task sink in Ctasksink:Parallel task sink in C/任务收集器/绑定 PULL 套接字至 tcp:/localhost:5558 端点/从 worker 处收集处理结果/#include zhelpers.h int main(void)/准备上下文和套
45、接字 void*context=zmq_init(1);void*receiver=zmq_socket(context,ZMQ_PULL);zmq_bind(receiver,tcp:/*:5558);/等待开始信号 char*string=s_recv(receiver);free(string);/开始计时 int64_t start_time=s_clock();/确定 100 个任务均已处理 int task_nbr;for(task_nbr=0;task_nbr 100;task_nbr+)char*string=s_recv(receiver);free(string);if(t
46、ask_nbr/10)*10=task_nbr)printf(:);else printf(.);fflush(stdout);/计算并输出总执行时间 printf(执行时间:%d 毫秒n,(int)(s_clock()-start_time);zmq_close(receiver);zmq_term(context);return 0;zguide-cn haozu/zguide-cn 页 22 一组任务的平均执行时间在 5 秒左右,以下是分别开始 1 个、2 个、4 个worker 时的执行结果:#1 worker Total elapsed time:5034 msec#2 worker
47、s Total elapsed time:2421 msec#4 workers Total elapsed time:1018 msec 关于这段代码的几个细节:worker 上游和任务分发器相连,下游和结果收集器相连,这就意味着你可以开启任意多个 worker。但若 worker 是绑定至端点的,而非连接至端点,那我们就需要准备更多的端点,并配置任务分发器和结果收集器。所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分,因此应该由它们绑定至端点,而非 worker,因为它们较为动态。我们需要做一些同步的工作,等待 worker 全部启动之后再分发任务。这点在 ZMQ 中很重要,且
48、不易解决。连接套接字的动作会耗费一定的时间,因此当第一个 worker 连接成功时,它会一下收到很多任务。所以说,如果我们不进行同步,那这些任务根本就不会被并行地执行。你可以自己试验一下。任务分发器使用 PUSH 套接字向 worker 均匀地分发任务(假设所有的worker 都已经连接上了),这种机制称为_负载均衡_,以后我们会见得更多。结果收集器的 PULL 套接字会均匀地从 worker 处收集消息,这种机制称为_公平队列_:zguide-cn haozu/zguide-cn 页 23 管道模式也会出现慢连接的情况,让人误以为 PUSH 套接字没有进行负载均衡。如果你的程序中某个 wor
49、ker 接收到了更多的请求,那是因为它的 PULL 套接字连接得比较快,从而在别的 worker 连接之前获取了额外的消息。使用使用 ZMQZMQ 编程编程 看着这些示例程序后,你一定迫不及待想要用 ZMQ 进行编程了。不过在开始之前,我还有几条建议想给到你,这样可以省去未来的一些麻烦:学习 ZMQ 要循序渐进,虽然它只是一套 API,但却提供了无尽的可能。一步一步学习它提供的功能,并完全掌握。编写漂亮的代码。丑陋的代码会隐藏问题,让想要帮助你的人无从下手。比如,你会习惯于使用无意义的变量名,但读你代码的人并不知道。应使用有意义的变量名称,而不是随意起一个。代码的缩进要统一,布局清晰。漂亮的代
50、码可以让你的世界变得更美好。边写边测试,当代码出现问题,你就可以快速定位到某些行。这一点在编写 ZMQ 应用程序时尤为重要,因为很多时候你无法第一次就编写出正确的代码。当你发现自己编写的代码无法正常工作时,你可以将其拆分成一些代码片段,看看哪段没有正确地执行。ZMQ 可以让你构建非常模块化的代码,所以应该好好利用这一点。需要时应使用抽象的方法来编写程序(类、成员函数等等),不要随意拷贝代码,因为拷贝代码的同时也是在拷贝错误。我们看看下面这段代码,是某位同仁让我帮忙修改的:/注意:不要使用这段代码!static char*topic_str=msg.x|;void*pub_worker(void