概述
在游戏行业游戏客户端和服务端需要大量的,快速的通讯,这里面就会用到websocket 。
Autobahn 是一个高性能的websocket 它采用了两种实现方案 。
1 基于Twisted
2 基于asyncio
开发语言: python
PS:twisted 的 reactor.callLater回调方法本身有bug,不稳定,过一段时间会假死,可能是因为我没深入了解的原因。所以我用tornado替代了twisted实现了websocket,会在下一篇详解,tornado做websocket是真NB,稳定高效简介,可参考我这篇博客【https://editor.csdn.net/md/?articleId=106819559】,而且更简单。
Autobahn 有如下的特点:
framework for WebSocket and WAMP clients (Websocket框架和WAMP框架)
compatible with Python 2.7 and 3.3+ (兼容python2.7和python3.3以上的版本)
runs on CPython, PyPy and Jython (可以运行在Cpython, PyPy 和Jython 上面)
runs under Twisted and asyncio (可以选择Twisted 或者是asyncio的方式来运行)
implements WebSocket RFC6455 (and draft versions Hybi-10+) (实现WebSocket RFC6455(以及草案版本Hybi-10 +))
implements WebSocket compression (实现WebSocket压缩)
implements WAMP, the Web Application Messaging Protocol (实现Web应用程序消息传递协议)
supports TLS (secure WebSocket) and proxies (支持TLS(安全WebSocket)和代理)
Open-source (MIT license) (开源)
Autobahn 可以用来做什么
Autobahn 非常适用于交易系统,多人游戏, 实时聊天等应用程序的开发
一个简单的webSocket服务端程序
from autobahn.twisted.websocket import WebSocketServerProtocol
#or: from autobahn.asyncio.websocket import WebSocketServerProtocol
class MyServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
print("Client connecting: {}".format(request.peer))
def onOpen(self):
print("WebSocket connection open.")
def onMessage(self, payload, isBinary):
if isBinary:
print("Binary message received: {} bytes".format(len(payload)))
else:
print("Text message received: {}".format(payload.decode('utf8')))
## echo back message verbatim
self.sendMessage(payload, isBinary)
def onClose(self, wasClean, code, reason):
print("WebSocket connection closed: {}".format(reason))
几个重要的方法:
onConnect: 当有新的客户端连接进来的时候调用该方法
onOpen: 当连接成功时调用该方法
onMessage:该方法是接收来自客户端的消息
onClose: 当关闭时调用该方法
安装Autobahn
Autobahn 的运行依赖于 Twisted 和 asyncio 因此安装Autobahn 需要确认你的python支持了
Twisted 和 asyncio
各个python 版本对 Twisted 和 asyncio 支持如下
使用pip 安装
pip install autobahn[twisted]
pip install autobahn[asyncio]
验证是否安装成功
from autobahn import version
print(version)
0.9.1
启动webScoketServer
twisted 版本:
import sys
from twisted.python import log
from twisted.internet import reactor
log.startLogging(sys.stdout)
from autobahn.twisted.websocket import WebSocketServerFactory
factory = WebSocketServerFactory()
factory.protocol = MyServerProtocol
reactor.listenTCP(9000, factory)
reactor.run()
asyncio 版本:
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio.websocket import WebSocketServerFactory
factory = WebSocketServerFactory()
factory.protocol = MyServerProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '127.0.0.1', 9000)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.close()
创建WebSocketClient
class MyClientProtocol(WebSocketClientProtocol):
def onOpen(self):
self.sendMessage(u"Hello, world!".encode('utf8'))
def onMessage(self, payload, isBinary):
if isBinary:
print("Binary message received: {0} bytes".format(len(payload)))
else:
print("Text message received: {0}".format(payload.decode('utf8')))
WebSocketClientProtocol 可以使用:
autobahn.twisted.websocket.WebSocketClientProtocol
autobahn.asyncio.websocket.WebSocketClientProtocol
使用Client
Twisted版本
import sys
from twisted.python import log
from twisted.internet import reactor
log.startLogging(sys.stdout)
from autobahn.twisted.websocket import WebSocketClientFactory
factory = WebSocketClientFactory()
factory.protocol = MyClientProtocol
reactor.connectTCP("127.0.0.1", 9000, factory)
reactor.run()
asyncio版本
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio.websocket import WebSocketClientFactory
factory = WebSocketClientFactory()
factory.protocol = MyClientProtocol
loop = asyncio.get_event_loop()
coro = loop.create_connection(factory, '127.0.0.1', 9000)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
发送消息
当我们的server或者client 继承了autobahn之后就可以使用 sendMessage 方法来发送消息
接收消息
当我们的server或者client 实现了onMessage(self, payload, isBinary): 定义了该方法之后,就可以接收到消息
更多详细介绍请查看: https://autobahn.readthedocs.io/en/latest/index.html
Twisted demo,本人写的测试脚本:
以下的爬取数据方法,以及组装数据结构到客户端组方法,需要你自己重写下,大致流程就是下面这样,亲测可用。
PS:twisted 的 reactor.callLater回调方法本身有bug,不稳定,过一段时间会假死。所以我用tornado替代了twisted实现了websocket,会在下一篇详解,tornado做websocket是真NB,不解释。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
File Name :views
Description :
Author : cjh
date :2020-02-20
-------------------------------------------------
"""
import sys
import json
import time
from utils.dbapi import DBPool
from twisted.python import log
from twisted.internet import reactor
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol, listenWS
log.startLogging(sys.stdout)
class BroadcastServerProtocol(WebSocketServerProtocol):
""" webSocket 服务端 """
def onOpen(self):
"""
打开连接
:return:
"""
self.factory.register(self)
print("WebSocket connection open.")
def onMessage(self, payload, isBinary):
"""
接收消息
:param payload:
:param isBinary: Boolean值,判断payload是否为二进制数据
:return:
"""
if isBinary:
print('WebSocket Data Type Error')
else:
obj = eval(payload.decode('utf-8'))
unit_id = obj.get('unit_id') # 工程id
handle = obj.get('handle') # 操作类型
db_code = obj.get('db_code') # 数据库 编码
config = eval(obj.get('config')) # 数据库 链接信息
self.factory.client_to_group(self, db_code, handle, unit_id, config)
def onConnect(self, request):
"""
打开链接
:param request:
:return:
"""
print("Client connecting: {}".format(request.peer))
def onClose(self, wasClean, code, reason):
"""
关闭连接
:param wasClean:
:param code:
:param reason:
:return:
"""
print("WebSocket connection closed: {}".format(reason))
self.factory.un_register(self)
def send_msg(self, payload, is_binary):
"""
重写 sendMessage
:param payload: 务必编码 .encode('utf-8')
:param is_binary:
:return:
"""
self.sendMessage(payload=payload, isBinary=is_binary)
class BroadcastServerFactory(WebSocketServerFactory, DBPool):
""" 工厂 创建 协议类的实例"""
def __init__(self, url):
WebSocketServerFactory.__init__(self, url)
self.data = [] # 待发送客户端的数据
self.send_list = [] # 待发送客户端 列表
self.client_list = [] # 客户端 列表
self.client_group = {} # 请求参数,组装后字典
self.polling() # 回调函数,定时主动链接客户端,广播数据
def polling(self):
"""
回调函数
:return:
"""
self.item_client_group()
reactor.callLater(5, self.polling) # 异步递归调用,5秒
def register(self, client):
"""
注册客户端,加入客户端 列表
:param client:
:return:
"""
if client not in self.client_list:
print("registered client {}".format(client.peer))
self.client_list.append(client)
print self.client_list
print self.client_group
def un_register(self, client):
"""
删除客户端
:param client:
:return:
"""
if client in self.client_list:
print("unregistered client {}".format(client.peer))
self.client_list.remove(client)
print self.client_list
print self.client_group
def send_msg(self):
"""
轮循服务列表,逐个发消息(会一直被回调函数触发)
:param data:
:return:
"""
self.data = json.dumps(self.data).encode('utf8')
for client in self.send_list:
client.sendMessage(self.data)
print("broadcasting message to {}".format(client.peer))
def client_to_group(self, client, db_code, handle, unit_id, config):
"""
组装请求参数,挂载到客户端
:param client:
:param db_code:
:param unit_id:
:param config:
:param handle:
:return:
"""
if db_code in self.client_group:
if handle in self.client_group[db_code]:
if unit_id in self.client_group[db_code][handle]:
self.client_group[db_code][handle][unit_id]['client'].append(client)
else:
self.client_group[db_code][handle].update({unit_id: {'client': [client], 'config': config}})
else:
self.client_group[db_code].update({handle: {unit_id: {'client': [client], 'config': config}}})
else:
self.client_group.update({db_code: {handle: {unit_id: {'client': [client], 'config': config}}}})
def item_client_group(self):
"""
迭代客户端及请求参数
:return:
"""
for db_code, v_1 in self.client_group.items():
for handle, v_2 in v_1.items():
for unit_id, v_3 in v_2.items():
self.get_close_client(v_3['client'])
v_3['client'] = self.send_list
self.get_data(handle, unit_id, v_3['config']) # 实际业务操作,爬取数据
self.send_msg() # 发送数据到客户端
def get_close_client(self, client_send):
"""
主动剔除已关闭 客户端
:param client_send:
:return:
"""
self.send_list = list(set(self.client_list) & set(client_send))
def get_data(self, handle, unit_id, config):
"""
爬取数据
:param db_code:
:param handle:
:param unit_id:
:param config:
:return:
"""
print 'start'
print time.time()
self.data = [] # 初始化
if handle == 'real' and unit_id and config:
config = dict(host=config.get('HOST'), port=eval(config.get('PORT')), db=config.get('NAME'),
user=config.get('USER'), passwd=config.get('PASSWORD'))
DBPool.__init__(self, config)
sql_1 = "select `code`,recode,`name`,`rename`,show_id,gateway_address address,gateway_id,lot_type "
"address_type from archive_device where status='a' and unit_id={}".format(unit_id)
sql_data = "select * from device_data_real where equip_address='{}' limit 1"
self.data = self.query(sql_1)
for i in self.data:
data = self.query(sql_data.format(i.get('address')))
if data:
value = data[0].get(i.get('code'))
i['value'] = value
else:
i['value'] = ''
print time.time()
print '_________________________________________'
def run():
factory = BroadcastServerFactory
factory = factory('ws://0.0.0.0:4053') # 实例化 WebSocketServerFactory 类
factory.protocol = BroadcastServerProtocol # 运行服务
listenWS(factory) # webSocket监听
reactor.run() # 监听
if __name__ == '__main__':
run()
最后
以上就是大胆雨为你收集整理的Python+Twisted+Autobahn实现Websocket(附完整demo)的全部内容,希望文章能够帮你解决Python+Twisted+Autobahn实现Websocket(附完整demo)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复