我是靠谱客的博主 大胆雨,最近开发中收集的这篇文章主要介绍Python+Twisted+Autobahn实现Websocket(附完整demo),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在游戏行业游戏客户端和服务端需要大量的,快速的通讯,这里面就会用到websocket 。
Autobahn 是一个高性能的websocket 它采用了两种实现方案 。
1 基于Twisted
2 基于asyncio

开发语言: python
PS:twisted 的 reactor.callLater回调方法本身有bug,不稳定,过一段时间会假死,可能是因为我没深入了解的原因。所以我用tornado替代了twisted实现了websocket,会在下一篇详解,tornado做websocket是真NB,稳定高效简介,可参考我这篇博客【https://editor.csdn.net/md/?articleId=106819559】,而且更简单。

Autobahn 有如下的特点:

framework for WebSocket and WAMP clients 	(Websocket框架和WAMP框架)
compatible with Python 2.7 and 3.3+ 		(兼容python2.7和python3.3以上的版本)
runs on CPython, PyPy and Jython 			(可以运行在Cpython, PyPy 和Jython 上面)
runs under Twisted and asyncio 				(可以选择Twisted 或者是asyncio的方式来运行)
implements WebSocket RFC6455 (and draft versions Hybi-10+) (实现WebSocket RFC6455(以及草案版本Hybi-10 +)
implements WebSocket compression 			(实现WebSocket压缩)
implements WAMP, the Web Application Messaging Protocol 	(实现Web应用程序消息传递协议)
supports TLS (secure WebSocket) and proxies 				(支持TLS(安全WebSocket)和代理)
Open-source (MIT license) 									(开源)

Autobahn 可以用来做什么

Autobahn 非常适用于交易系统,多人游戏, 实时聊天等应用程序的开发
一个简单的webSocket服务端程序

from autobahn.twisted.websocket import WebSocketServerProtocol
#or: from autobahn.asyncio.websocket import WebSocketServerProtocol
class MyServerProtocol(WebSocketServerProtocol):
   def onConnect(self, request):
       print("Client connecting: {}".format(request.peer))

   def onOpen(self):
       print("WebSocket connection open.")

   def onMessage(self, payload, isBinary):
       if isBinary:
           print("Binary message received: {} bytes".format(len(payload)))
       else:
           print("Text message received: {}".format(payload.decode('utf8')))

       ## echo back message verbatim
       self.sendMessage(payload, isBinary)

   def onClose(self, wasClean, code, reason):
       print("WebSocket connection closed: {}".format(reason))

几个重要的方法:

onConnect: 当有新的客户端连接进来的时候调用该方法
onOpen: 当连接成功时调用该方法
onMessage:该方法是接收来自客户端的消息
onClose: 当关闭时调用该方法

安装Autobahn

Autobahn 的运行依赖于 Twisted 和 asyncio 因此安装Autobahn 需要确认你的python支持了
Twisted 和 asyncio

各个python 版本对 Twisted 和 asyncio 支持如下
使用pip 安装
pip install autobahn[twisted]
pip install autobahn[asyncio]

验证是否安装成功
from autobahn import version
print(version)
0.9.1

启动webScoketServer
twisted 版本:

import sys

   from twisted.python import log
   from twisted.internet import reactor
   log.startLogging(sys.stdout)

   from autobahn.twisted.websocket import WebSocketServerFactory
   factory = WebSocketServerFactory()
   factory.protocol = MyServerProtocol

   reactor.listenTCP(9000, factory)
   reactor.run()

asyncio 版本:

try:
      import asyncio
   except ImportError:
      ## Trollius >= 0.3 was renamed
      import trollius as asyncio

   from autobahn.asyncio.websocket import WebSocketServerFactory
   factory = WebSocketServerFactory()
   factory.protocol = MyServerProtocol

   loop = asyncio.get_event_loop()
   coro = loop.create_server(factory, '127.0.0.1', 9000)
   server = loop.run_until_complete(coro)

   try:
      loop.run_forever()
   except KeyboardInterrupt:
      pass
   finally:
      server.close()
      loop.close()

创建WebSocketClient

class MyClientProtocol(WebSocketClientProtocol):

   def onOpen(self):
      self.sendMessage(u"Hello, world!".encode('utf8'))

   def onMessage(self, payload, isBinary):
      if isBinary:
         print("Binary message received: {0} bytes".format(len(payload)))
      else:
         print("Text message received: {0}".format(payload.decode('utf8')))

WebSocketClientProtocol 可以使用:

autobahn.twisted.websocket.WebSocketClientProtocol
autobahn.asyncio.websocket.WebSocketClientProtocol

使用Client

Twisted版本

import sys

   from twisted.python import log
   from twisted.internet import reactor
   log.startLogging(sys.stdout)

   from autobahn.twisted.websocket import WebSocketClientFactory
   factory = WebSocketClientFactory()
   factory.protocol = MyClientProtocol

   reactor.connectTCP("127.0.0.1", 9000, factory)
   reactor.run()

asyncio版本

try:
      import asyncio
   except ImportError:
      ## Trollius >= 0.3 was renamed
      import trollius as asyncio

   from autobahn.asyncio.websocket import WebSocketClientFactory
   factory = WebSocketClientFactory()
   factory.protocol = MyClientProtocol

   loop = asyncio.get_event_loop()
   coro = loop.create_connection(factory, '127.0.0.1', 9000)
   loop.run_until_complete(coro)
   loop.run_forever()
   loop.close()

发送消息
当我们的server或者client 继承了autobahn之后就可以使用 sendMessage 方法来发送消息

接收消息
当我们的server或者client 实现了onMessage(self, payload, isBinary): 定义了该方法之后,就可以接收到消息

更多详细介绍请查看: https://autobahn.readthedocs.io/en/latest/index.html

Twisted demo,本人写的测试脚本:

以下的爬取数据方法,以及组装数据结构到客户端组方法,需要你自己重写下,大致流程就是下面这样,亲测可用。
PS:twisted 的 reactor.callLater回调方法本身有bug,不稳定,过一段时间会假死。所以我用tornado替代了twisted实现了websocket,会在下一篇详解,tornado做websocket是真NB,不解释。

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File Name :views
   Description :
   Author : cjh
   date :2020-02-20
-------------------------------------------------
"""
import sys
import json
import time
from utils.dbapi import DBPool
from twisted.python import log
from twisted.internet import reactor
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol, listenWS

log.startLogging(sys.stdout)


class BroadcastServerProtocol(WebSocketServerProtocol):
    """ webSocket 服务端 """
    def onOpen(self):
        """
        打开连接
        :return:
        """
        self.factory.register(self)
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """
        接收消息
        :param payload:
        :param isBinary: Boolean值,判断payload是否为二进制数据
        :return:
        """
        if isBinary:
            print('WebSocket Data Type Error')
        else:
            obj = eval(payload.decode('utf-8'))
            unit_id = obj.get('unit_id')            # 工程id
            handle = obj.get('handle')              # 操作类型
            db_code = obj.get('db_code')            # 数据库 编码
            config = eval(obj.get('config'))        # 数据库 链接信息
            self.factory.client_to_group(self, db_code, handle, unit_id, config)

    def onConnect(self, request):
        """
        打开链接
        :param request:
        :return:
        """
        print("Client connecting: {}".format(request.peer))

    def onClose(self, wasClean, code, reason):
        """
        关闭连接
        :param wasClean:
        :param code:
        :param reason:
        :return:
        """
        print("WebSocket connection closed: {}".format(reason))
        self.factory.un_register(self)

    def send_msg(self, payload, is_binary):
        """
        重写 sendMessage
        :param payload:   务必编码  .encode('utf-8')
        :param is_binary:
        :return:
        """
        self.sendMessage(payload=payload, isBinary=is_binary)


class BroadcastServerFactory(WebSocketServerFactory, DBPool):
    """ 工厂 创建 协议类的实例"""
    def __init__(self, url):
        WebSocketServerFactory.__init__(self, url)
        self.data = []                  # 待发送客户端的数据
        self.send_list = []             # 待发送客户端 列表
        self.client_list = []           # 客户端 列表
        self.client_group = {}          # 请求参数,组装后字典
        self.polling()                  # 回调函数,定时主动链接客户端,广播数据

    def polling(self):
        """
        回调函数
        :return:
        """
        self.item_client_group()
        reactor.callLater(5, self.polling)      # 异步递归调用,5秒

    def register(self, client):
        """
        注册客户端,加入客户端 列表
        :param client:
        :return:
        """
        if client not in self.client_list:
            print("registered client {}".format(client.peer))
            self.client_list.append(client)

        print self.client_list
        print self.client_group

    def un_register(self, client):
        """
        删除客户端
        :param client:
        :return:
        """
        if client in self.client_list:
            print("unregistered client {}".format(client.peer))
            self.client_list.remove(client)

        print self.client_list
        print self.client_group

    def send_msg(self):
        """
        轮循服务列表,逐个发消息(会一直被回调函数触发)
        :param data:
        :return:
        """
        self.data = json.dumps(self.data).encode('utf8')
        for client in self.send_list:
            client.sendMessage(self.data)
            print("broadcasting message to {}".format(client.peer))

    def client_to_group(self, client, db_code, handle, unit_id, config):
        """
        组装请求参数,挂载到客户端
        :param client:
        :param db_code:
        :param unit_id:
        :param config:
        :param handle:
        :return:
        """
        if db_code in self.client_group:
            if handle in self.client_group[db_code]:
                if unit_id in self.client_group[db_code][handle]:
                    self.client_group[db_code][handle][unit_id]['client'].append(client)
                else:
                    self.client_group[db_code][handle].update({unit_id: {'client': [client], 'config': config}})
            else:
                self.client_group[db_code].update({handle: {unit_id: {'client': [client], 'config': config}}})
        else:
            self.client_group.update({db_code: {handle: {unit_id: {'client': [client], 'config': config}}}})

    def item_client_group(self):
        """
        迭代客户端及请求参数
        :return:
        """
        for db_code, v_1 in self.client_group.items():
            for handle, v_2 in v_1.items():
                for unit_id, v_3 in v_2.items():
                    self.get_close_client(v_3['client'])
                    v_3['client'] = self.send_list
                    self.get_data(handle, unit_id, v_3['config'])		# 实际业务操作,爬取数据
                    self.send_msg()										# 发送数据到客户端

    def get_close_client(self, client_send):
        """
        主动剔除已关闭 客户端
        :param client_send:
        :return:
        """
        self.send_list = list(set(self.client_list) & set(client_send))

    def get_data(self, handle, unit_id, config):
        """
        爬取数据
        :param db_code:
        :param handle:
        :param unit_id:
        :param config:
        :return:
        """
        print 'start'
        print time.time()
        self.data = []          # 初始化
        if handle == 'real' and unit_id and config:
            config = dict(host=config.get('HOST'), port=eval(config.get('PORT')), db=config.get('NAME'),
                          user=config.get('USER'), passwd=config.get('PASSWORD'))
            DBPool.__init__(self, config)
            sql_1 = "select `code`,recode,`name`,`rename`,show_id,gateway_address address,gateway_id,lot_type " 
                    "address_type from archive_device where status='a' and unit_id={}".format(unit_id)
            sql_data = "select * from device_data_real where equip_address='{}' limit 1"
            self.data = self.query(sql_1)
            for i in self.data:
                data = self.query(sql_data.format(i.get('address')))
                if data:
                    value = data[0].get(i.get('code'))
                    i['value'] = value
                else:
                    i['value'] = ''

        print time.time()
        print '_________________________________________'


def run():
    factory = BroadcastServerFactory
    factory = factory('ws://0.0.0.0:4053')          # 实例化 WebSocketServerFactory 类
    factory.protocol = BroadcastServerProtocol      # 运行服务
    listenWS(factory)                               # webSocket监听
    reactor.run()                                   # 监听


if __name__ == '__main__':
    run()

最后

以上就是大胆雨为你收集整理的Python+Twisted+Autobahn实现Websocket(附完整demo)的全部内容,希望文章能够帮你解决Python+Twisted+Autobahn实现Websocket(附完整demo)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(32)

评论列表共有 0 条评论

立即
投稿
返回
顶部