概述
组成:
- 路由: 分发rpc的服务器 可以现在go语言版的,https://github.com/jcelliott/turnpike
- server: 函数提供者
- client:函数调用方
server和client 可以选择 https://wamp-proto.org/implementations/index.html
python 例子 http://autobahn.readthedocs.io/en/latest/wamp/examples.html
python为例 server端
import datetime
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession,ApplicationRunner
# from autobahn_autoreconnect import ApplicationRunner
class Component(ApplicationSession):
"""
A simple time service application component.
"""
@inlineCallbacks
def onJoin(self, details):
print("session attached")
def utcnow():
now = datetime.datetime.utcnow()
return now.strftime("%Y-%m-%dT%H:%M:%SZ")
try:
yield self.register(utcnow, u'com.timeservice.now')
except Exception as e:
print("failed to register procedure: {}".format(e))
else:
print("procedure registered")
if __name__ == '__main__':
runner = ApplicationRunner(u"ws://xxx.xxx.com:31000/ws", u"pwd")
runner.run(Component, auto_reconnect=True)
客户端
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
class Component(ApplicationSession):
"""
An application component using the time service.
"""
@inlineCallbacks
def onJoin(self, details):
print("session attached")
try:
now = yield self.call(u'com.timeservice.now')
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
self.leave()
def onDisconnect(self):
print("disconnected")
reactor.stop()
if __name__ == '__main__':
runner = ApplicationRunner(u"ws://xxx.xxx.com:31000/ws", u"pwd")
runner.run(Component, auto_reconnect=True)
经过一段时间的测试发现turnpike路由存在一个问题:
当server意外断网后(拔网线这种), turnpike不会发现这个server已掉线, 重启server注册时提示函数已经存在, 必须重启turnpike
现改用https://github.com/gammazero/nexus
后测试发现nexus也有这个问题,测试方法,但拔掉客户端的网线再插上,报提示”REGISTER for already registered procedure”
目前通过修改nexus的代码解决,使得收到注册消息时永远是替换方式
dealer.go:register函数
var created string
var regID wamp.ID
// If no existing registration found for the procedure, then create a new
// registration.
if reg == nil { //改成 if true {
regID = d.idGen.Next()
created = wamp.NowISO8601()
reg = ®istration{
dealer.go delCalleeReg函数
if len(reg.callees) == 0 {
delete(d.registrations, regID)
switch reg.match {
default:
if d.procRegMap[reg.procedure] != nil && d.procRegMap[reg.procedure].id == regID {
delete(d.procRegMap, reg.procedure)
}
2018.12.10
使用未按上面修改过的最新的nexus,加上autobahn js或python版可以实现rpc callee的负载均衡
session.register('com.myapp.add2', add2, {
'invoke': "random" //负载方式 "single", "roundrobin", "random", "first", "last"
});
python
yield self.register(utcnow, u'com.timeservice.now1', RegisterOptions(invoke=u'random'))
当时使用最新的autobahn python版客户端会提示注册失败。
性能测试
单核本机caller +本机callee 每秒大约可以到2500个rpc请求 cpu跑满
异常问题
启动两个callee,然后caller循环调用rpc,这时kill掉其中一个callee,有几率caller的call函数不再返回卡死,估计是nexus的bug
用crossbar.io做router就不存在这个问题,而且crossbar.io貌似支持集群,可以规避router的单点故障,但crossbar.io的单核性能不如nexus
最后
以上就是知性砖头为你收集整理的wamp协议实现rpc调用的全部内容,希望文章能够帮你解决wamp协议实现rpc调用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复