我是靠谱客的博主 怡然汽车,最近开发中收集的这篇文章主要介绍zmq是基于tcp实现的吗_python--zmq的三种形式实现,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

zeromq是什么?

这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”

zmq官方的API文档地址http://api.zeromq.org/

以下是由flask和zmq实现的广播和订阅的模型

广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤

-------------server-----------------------------------------------------------------------

# coding=utf8

import json

import time

import zmq

HOST = '127.0.0.1'

# HOST = '0.0.0.0'

PORT = '6931'

_context = zmq.Context()

_publisher = _context.socket(zmq.PUB)

url = 'tcp://{}:{}'.format(HOST, PORT)

def publish_message(message):

try:

# print(url, message)

_publisher.bind(url)

time.sleep(1)

_publisher.send_string(message)

except Exception as e:

print("error {}".format(e))

finally:

_publisher.unbind(url)

class ReturnJ(object):

def __init__(self):

#由于存在setattr,此处必须采用这种方式赋值

self.__dict__['res'] = {

# 'code': 200,

# 'msg': '请求成功!'

}

@property

def toJson(self):

return json.dumps(self.res, ensure_ascii=False)

def __setattr__(self, key, val):

self.res[key] = val

from flask import Flask

from flask import request

app = Flask(__name__)

# @app.route("/control/", methods=['GET'])

@app.route("/", methods=['GET'])

def lowerString():

ret = ReturnJ()

try:

data = {'SceFlag':1,'RoadMapX':122.324,'RoadMapY':123.123,'TargetSpeed':1}

ret.FiveG = data

# ret = {'longitude':234, "latitude":456, 'speed':30}

import json

# response = json.dumps(ret)

publish_message(ret.toJson)

except Exception as e:

print(e)

ret.code = 1000

ret.msg = '消息队列出错'

response = ret.toJson

return response

if __name__ == '__main__':

app.run(host='0.0.0.0',debug=False)

# app.run(debug=False,port=5555)

------------------------------------client---------------------------------

# coding=utf8

import zmq

import sys

import time

import logging

import json

import os

# 监听的ip和端口

HOST = '127.0.0.1'

PORT = '6931'

# 请求后端

url = 'http://192.168.50.205:8000/'

logging.basicConfig(filename='subscriber.log', level=logging.INFO)

def request_api(dict):

import urllib.parse

import urllib.request

headers = {

'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}

headers['Host'] = 'httpbin.org'

# dict = {'user': 'admin', 'pwd': '12345'}

print(111)

data = urllib.parse.urlencode(dict).encode('utf-8')

print(data)

# data参数如果要传必须传bytes(字节流)类型的,如果是一个字典,先用urllib.parse.urlencode()编码。

request = urllib.request.Request(url=url, data=data, headers=headers, method='POST')

urllib.request.urlopen(request)

class ZClient(object):

def __init__(self, host=HOST, port=PORT):

"""Initialize Worker"""

self.host = host

self.port = port

self._context = zmq.Context()

self._subscriber = self._context.socket(zmq.SUB)

print("Client Initiated")

def receive_message(self):

"""Start receiving messages"""

self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))

self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")

while True:

print ('listening on tcp://{}:{}'.format(self.host, self.port))

message = self._subscriber.recv()

print(json.loads(message))

# request_api(json.loads(message))

logging.info(

'{} - {}'.format(message, time.strftime("%Y-%m-%d %H:%M")))

if __name__ == '__main__':

zs = ZClient()

zs.receive_message()

第二种  .Request-Reply模式:

客户端在请求后,服务端必须回响应

server

#!/usr/bin/python

# -*- config:utf-8 -*-

# project: testenv

# user:kaikai136

# Author: 开开

# email: jienkai136@sina.com

# createtime: 2019/4/79:58

#!/usr/bin/python

#-*-coding:utf-8-*-

import time

import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)

socket.bind("tcp://*:6845")

while True:

message = socket.recv()

print(message)

#time.sleep(1)

socket.send("server response!".encode('utf-8'))

client

#!/usr/bin/python

# -*- config:utf-8 -*-

# project: testenv

# user:kaikai136

# Author: 开开

# email: jienkai136@sina.com

# createtime: 2019/4/79:58

#!/usr/bin/python

#-*-coding:utf-8-*-

import zmq

import sys

context = zmq.Context()

socket = context.socket(zmq.REQ)

socket.connect("tcp://localhost:6845")

while(True):

data = input("input your data:")

if data == 'q':

sys.exit()

# socket.send(data)

socket.send_unicode(data)

response = socket.recv();

print(response)

第三种.Parallel Pipeline模式:

由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。

当连接被断开,数据不会丢失,重连后数据继续发送到对端

server

#!/usr/bin/python

# -*- config:utf-8 -*-

# project: testenv

# user:kaikai136

# Author: 开开

# email: jienkai136@sina.com

# createtime: 2019/4/710:10

#!/usr/bin/python

#-*-coding:utf-8-*-

import zmq

context = zmq.Context()

socket = context.socket(zmq.PULL)

socket.bind('tcp://*:5558')

while True:

data = socket.recv()

print (data)

work

#!/usr/bin/python

# -*- config:utf-8 -*-

# project: testenv

# user:kaikai136

# Author: 开开

# email: jienkai136@sina.com

# createtime: 2019/4/710:11

#!/usr/bin/python

#-*-coding:utf-8-*-

import zmq

context = zmq.Context()

recive = context.socket(zmq.PULL)

recive.connect('tcp://127.0.0.1:5557')

sender = context.socket(zmq.PUSH)

sender.connect('tcp://127.0.0.1:5558')

while True:

data = recive.recv()

print(data)

sender.send(data)

client

#!/usr/bin/python

# -*- config:utf-8 -*-

# project: testenv

# user:kaikai136

# Author: 开开

# email: jienkai136@sina.com

# createtime: 2019/4/710:12

#!/usr/bin/python

#-*-coding:utf-8-*-

import zmq

import time

context = zmq.Context()

socket = context.socket(zmq.PUSH)

socket.bind('tcp://*:5557')

while True:

data = input('input your data:')

socket.send_unicode(data)

最后

以上就是怡然汽车为你收集整理的zmq是基于tcp实现的吗_python--zmq的三种形式实现的全部内容,希望文章能够帮你解决zmq是基于tcp实现的吗_python--zmq的三种形式实现所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部