ZMQ协议

2023-09-21 09:43:24

一、ZMQ特点

普通的socket是端对端的关系,ZMQ是N:M的关系,socket的连接需要显式地建立连接,销毁连接,选择协议(TCP/UDP)和错误处理,ZMQ屏蔽了这些细节,像是一个封装了的socket库,让网络编程变得更简单。ZMQ不光用于主机与主机之间的socket通信,还可以是线程和进程之间的通信。ZMQ提供的套接字可以在多种协议中传输消息,线程间,进程间,TCP等。可以使用套接字创建多种消息模式,如‘请求-应答模式’,‘发布-订阅模式’,‘分布式模式’等。

组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须先服务端启动,再启动客户端,否则会报错。

  • ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
  • ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
  • ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
  • ZMQ提供了多种模式进行消息路由。如请求-应答模式,发布-订阅模式等,这些模式可以用来搭建网络拓扑结构。
  • ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。

二、ZMQ消息模式

1、Reuqest-Reply(请求-应答模式)

  • 使用Request-Reply模式,需要遵循一定的规律。
  • 客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环
  • 服务端和客户端谁先启动,效果都是一样的。
  • 服务端在收到消息之前,会一直阻塞,等待客户端连上来。

server.py

import zmq
import time
 
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
count = 0
 
# 必须要先接收消息,然后再应答
if __name__ == '__main__':
    print('zmq server start....')
    while True:
        message = socket.recv().decode("UTF-8")
        count += 1
        print(f'received request. message:{message}; count:{count}')
        time.sleep(1)
        socket.send_string("world!")

client.py

import zmq
 
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
 
# 客户端必须要先发送消息,然后在接收消息
if __name__ == '__main__':
    print('zmq client start....')
    for i in range(1, 10):
        socket.send_string("hello")
        message = socket.recv().decode("UTF-8")
        print(f'received response. message:{message}')

常用数据发送和接收:

# 发送数据
socket.send_json(data)                      # data 会被json序列化后进行传输 (json.dumps)
socket.send_string(data, encoding="utf-8")  # data为unicode字符串,会进行编码成子节再传输
socket.send_pyobj(obj)                      # obj为python对象,采用pickle进行序列化后传输
socket.send_multipart(msg_parts)            # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,
                                            # 如[b"message1", b"message2", b"message2"]
 
# 接收数据
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

2、Publisher-Subscriber(发布-订阅模式)

Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息(可参考 Redis 的发布和订阅方式)。服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息。比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息,如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

“慢连接”: 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",再启动"发布者", “订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短时间段内,ZMQ就可以发送很多消息了。有种简单的方法来同步"发布者” 和"订阅者", 通过sleep让发布者延迟发布消息,等连接建立完成后再进行发送。
publisher.py

import zmq
import time
import random
 
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
 
if __name__ == '__main__':
    print("发布者启动.....")
    for i in range(1000):
        time.sleep(0.1)
        temperature = random.randint(-10, 40)
        message = f"我是publisher, 这是我发布给你们的第{i+1}个消息!今日温度{temperature}"
        socket.send_string(message)

subscriber.py

import zmq
 
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
 
# 客户端需要设定一个过滤,否则收不到任何信息
socket.setsockopt_string(zmq.SUBSCRIBE, '')
 
if __name__ == '__main__':
    print('订阅者一号启动....')
    while True:
        message = socket.recv_string()
        print(f"(subscriber1)接收到'发布者'发送的消息:{message}")

3、Push-Pull(平行管道模式/分布式处理)

Ventilator:任务发布器会生成大量可以并行运算的任务。

Worker:有一组worker会处理这些任务。

Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

Worker上游和"任务发布器"相连,下游和"结果接收器"相连,“任务发布器” 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点

Worker只是连接两个端点,需要等Worker全部启动后,再进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不进行同步的话,第一个Worker启动,会一下子接收很多任务。

“任务分发器” 会向Worker均匀的分发任务(负载均衡机制)

“结果接收器” 会均匀地从Worker处收集消息(公平队列机制)

更多推荐

Web 3.0 安全风险,您需要了解这些内容

随着技术的不断进步,我们正迎来一个全新的互联网时代,被称为Web3.0。Web3.0将带来许多令人兴奋的机会,但与之同时,也伴随着一系列新的安全风险。在这篇文章中,我们将探讨Web3.0的安全挑战,以帮助您更好地了解并准备迎接这个新时代。Web3.0简介Web3.0是互联网的下一代,将构建在区块链、分布式技术和智能合约

人工智能:人脸识别技术应用场景介绍

目录人脸识别介绍什么是人脸识别技术人脸识别的流程1、场景分类2、认证对比3、金融领保险应用3.1金融行业3.2保险行业4、安防交通领域4.1公园景点人脸识别闸机4.2高铁站进站人脸识别闸机5、警务领域5.1抓拍交通违法人脸识别介绍什么是人脸识别技术人脸识别技术是一种通过计算机技术和模式识别算法来识别和验证人脸的技术。它

SQLyog 各版本下载与安装(目前最新版本为13.2.0)

文章目录一、SQLyogUltimate各版本下载1.ForWindowsx642.ForWindowsx86二、SQLyogCommunity各版本下载1.ForWindowsx642.ForWindowsx863.ForLinuxx86_644.ForLinuxi386三、SQLyog安装四、如何解决SQLyog试

月木学途开发 1.后台用户模块

概述权限控制采用springsecurity数据库设计用户表DROPTABLEIFEXISTS`admin`;CREATETABLE`admin`(`aid`int(32)NOTNULLAUTO_INCREMENT,`email`varchar(50)DEFAULTNULL,`username`varchar(50)D

Java————List

一、顺序表和链表线性表(linearlist)是n个具有相同特性的数据元素的有限序列。线性表是一种在实际中广泛使用的数据结构,常见的线性表:顺序表、链表、栈、队列…线性表在逻辑上是线性结构,也就说是连续的一条直线。但是在物理结构上并不一定是连续的,线性表在物理上存储时,通常以数组和链式结构的形式存储。1.顺序表顺序表是

java版Spring Cloud+Mybatis+Oauth2+分布式+微服务+实现工程管理系统

鸿鹄工程项目管理系统SpringCloud+SpringBoot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统1.项目背景一、随着公司的快速发展,企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性,公司对内部工程管理的提升提出了更高的要求。二、企业通过

9.18号作业

完善登录框点击登录按钮后,判断账号(admin)和密码(123456)是否一致,如果匹配失败,则弹出错误对话框,文本内容“账号密码不匹配,是否重新登录”,给定两个按钮ok和cancel,点击ok后,会清除密码框中的内容,继续进行登录;如果点击cancel按钮,则关闭界面。如果账号和密码匹配,则弹出信息对话框,给出提示信

函数式编程汇总

目录一.Lambda表达式实例省略规则二.Stream流案例数据准备入门实例调试技巧常用操作创建流1.单例集合2.数组3.双列集合中间操作1.filter2.map3.distinct4.sorted5.limit7.flatMap终结操作1.forEach2.count3.max&min4.collect查找与匹配1

Vue中如何封装组件,如何进行跨组件通信

封装组件和跨组件通信是Vue.js中非常重要的概念,它们有助于构建模块化、可维护和可扩展的应用程序。本文将深入讨论如何在Vue中封装组件以及如何实现跨组件通信,同时提供示例代码来帮助您更好地理解这些概念。第一部分:Vue组件的封装在Vue中,组件是可复用的UI元素,可以将其封装成独立的模块,以便在应用程序中多次使用。组

WRF高精度气象模拟

气候是多个领域(生态、水资源、风资源及碳中和等问题)的主要驱动因素,合理认知气候变化有利于解释生态环境变化机理及过程,而了解现在、未来气候变化则是进行生态、环境及能源评估、碳政策规划的先决条件,而气候模拟是获取高精度气候信息的最主要手段,现代生态、水文、新能源及碳中和领域需要亚公里及更高分辨率的气象模拟,WRF模式是国

Java的abstract应用和代理模式应用

记录:474场景:Java的abstract应用,一个抽象类,多个实现类。Java代理模式应用,使用代理模式执行具体实现类,在代理类中做日志等记录。版本:JDK1.8。1.一个Java抽象类(1)说明使用Java关键字abstract修饰的类,就是抽象类。使用Java关键字abstract修饰的方法,就是抽象方法。抽象

热文推荐