概述
SDN实验(六)——SDN流量监控
- 一、流量监控原理
- 二、代码实现
- (一)代码
- (二)讲解
- 三、实验演示
- (一)开启Ryu
- (二)开启Mininet
- (三)Ryu显示结果
- 四、扩展
一、流量监控原理
解析:
- 流速公式:(t1时刻的流量-t0时刻的流量)/(t1-t0),其中 t1 > t0
- 剩余带宽公式:链路总带宽-流速
- 路径有效带宽:一条路径所经过的各条链路中的最小剩余带宽
基于流量的最优路径转发示意图:
二、代码实现
(一)代码
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER
from ryu.lib import hub
class MyMonitor(simple_switch_13.SimpleSwitch13):
    """Learning switch that periodically logs port and flow statistics.

    Forwarding behaviour is inherited unchanged from ``SimpleSwitch13``.
    On top of it this class keeps a registry of connected datapaths and
    runs a background loop that asks each of them for flow and port
    statistics; the replies are printed as tables through ``self.logger``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the parent switch and start the polling loop."""
        super(MyMonitor, self).__init__(*args, **kwargs)
        # datapath id -> datapath object, maintained by _state_change_handler.
        self.datapaths = {}
        # Run the poller in its own hub green thread so that it does not
        # block event handling.
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Register or unregister a datapath when its state changes.

        A datapath in ``MAIN_DISPATCHER`` state is added to
        ``self.datapaths``; one in ``DEAD_DISPATCHER`` state is removed.
        """
        datapath = ev.datapath
        if datapath.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.datapaths[datapath.id] = datapath
                self.logger.debug("Regist datapath: %16x", datapath.id)
        elif datapath.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                del self.datapaths[datapath.id]
                self.logger.debug("Unregist datapath: %16x", datapath.id)

    def _monitor(self):
        """Request statistics from every registered datapath every 5 seconds.

        Runs forever; intended to be the body of the green thread started
        in ``__init__``.
        """
        while True:
            # Iterate over a snapshot: _state_change_handler may add or
            # remove entries while a request is being sent, and mutating a
            # dict during iteration raises RuntimeError.
            for dp in list(self.datapaths.values()):
                self._request_stats(dp)
            # Sleep once per polling round (not once per datapath); this
            # also hands control back to the other green threads.
            hub.sleep(5)

    def _request_stats(self, datapath):
        """Send a flow-stats and a port-stats request to ``datapath``.

        The replies arrive asynchronously and are handled by
        ``_flow_stats_reply_handler`` and ``_port_stats_reply_handler``.
        """
        self.logger.debug(
            "send stats reques to datapath: %16x for port and flow info",
            datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # All optional arguments are left at their defaults.
        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        # flags=0, port_no=OFPP_ANY: statistics for every port.
        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        """Log one table row per port contained in a port-stats reply.

        Rows are sorted by port number. The port number is printed in
        hexadecimal, the counters in decimal.
        """
        # Debug aid: print(ev.msg) and print(dir(ev.msg)) dump the raw reply.
        # Header and rows share the same column widths so the table lines up.
        header_fmt = '%16s %8s %10s %10s %10s %10s %10s %10s'
        row_fmt = '%016x %8x %10d %10d %10d %10d %10d %10d'

        self.logger.info(header_fmt,
                         'datapath', 'port',
                         'rx_packets', 'tx_packets',
                         'rx_bytes', 'tx_bytes',
                         'rx_errors', 'tx_errors')
        self.logger.info(header_fmt,
                         '-' * 16, '-' * 8,
                         '-' * 10, '-' * 10,
                         '-' * 10, '-' * 10,
                         '-' * 10, '-' * 10)

        dpid = ev.msg.datapath.id
        for port_stat in sorted(ev.msg.body, key=attrgetter('port_no')):
            self.logger.info(row_fmt,
                             dpid, port_stat.port_no,
                             port_stat.rx_packets, port_stat.tx_packets,
                             port_stat.rx_bytes, port_stat.tx_bytes,
                             port_stat.rx_errors, port_stat.tx_errors)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Log one table row per priority-1 flow entry in a flow-stats reply.

        Entries with any other priority are skipped (presumably the
        table-miss entry of the parent class, which has no ``in_port`` /
        ``eth_src`` match fields to print -- verify against the parent).
        Rows are sorted by ``(in_port, eth_src)``.
        """
        # Debug aid: print(ev.msg) and print(dir(ev.msg)) dump the raw reply.
        # Header and rows share the same column widths so the table lines up.
        header_fmt = '%16s %8s %17s %8s %17s %12s %10s'
        row_fmt = '%016x %8x %17s %8x %17s %12d %10d'

        self.logger.info(header_fmt,
                         'datapath', 'in_port', 'eth_src',
                         'out_port', 'eth_dst',
                         'packet_count', 'byte_count')
        self.logger.info(header_fmt,
                         '-' * 16, '-' * 8, '-' * 17,
                         '-' * 8, '-' * 17,
                         '-' * 12, '-' * 10)

        dpid = ev.msg.datapath.id
        learned_flows = (flow for flow in ev.msg.body if flow.priority == 1)
        for flow_stat in sorted(
                learned_flows,
                key=lambda flow: (flow.match['in_port'],
                                  flow.match['eth_src'])):
            # The output port is read from the first action of the first
            # instruction of the entry.
            out_port = flow_stat.instructions[0].actions[0].port
            self.logger.info(row_fmt,
                             dpid,
                             flow_stat.match['in_port'],
                             flow_stat.match['eth_src'],
                             out_port,
                             flow_stat.match['eth_dst'],
                             flow_stat.packet_count, flow_stat.byte_count)
(二)讲解
1. simple_switch_13.SimpleSwitch13
simple_switch_13.SimpleSwitch13是样例代码,其中实现了和我们上一次实验中,自学习交换机类似的功能(稍微多了个关于交换机是否上传全部packet还是只上传buffer_id),所以我们直接继承,可以减少写代码时间。
2. 协程实现伪并发
self.monitor_thread = hub.spawn(self._monitor)
def __init__(self, *args, **kwargs):
    """Set up the parent switch, the datapath registry and the poller."""
    super().__init__(*args, **kwargs)
    # Registry of connected switches, keyed by datapath id.
    self.datapaths = dict()
    # The monitor loop runs in its own hub green thread.
    self.monitor_thread = hub.spawn(self._monitor)
3. 在协程中实现周期请求交换机信息
def _monitor(self):
    """Request statistics from every registered datapath every 5 seconds.

    Runs forever; intended to be the body of a hub green thread.
    """
    while True:
        # Iterate over a snapshot: the registry can be modified by another
        # green thread while a request is being sent, and mutating a dict
        # during iteration raises RuntimeError.
        for dp in list(self.datapaths.values()):
            self._request_stats(dp)
        # Sleep once per polling round (not once per datapath); this also
        # hands control back to the other green threads.
        hub.sleep(5)
4. 主动下发消息,请求交换机信息
OFPFlowStatsRequest 与 OFPPortStatsRequest——注意:这里一次发送两种请求(流表统计和端口统计),所以需要两个处理函数分别处理 flow 和 port 的响应。
def _request_stats(self, datapath):
    """Ask ``datapath`` for its flow statistics and its port statistics."""
    self.logger.debug("send stats reques to datapath: %16x for port and flow info", datapath.id)
    proto = datapath.ofproto
    msg_parser = datapath.ofproto_parser
    # Flow statistics: every optional argument keeps its default value.
    datapath.send_msg(msg_parser.OFPFlowStatsRequest(datapath))
    # Port statistics: the default arguments could be omitted here as well,
    # exactly as in the flow request above.
    datapath.send_msg(msg_parser.OFPPortStatsRequest(datapath, 0, proto.OFPP_ANY))
源码参数信息:
@_set_stats_type(ofproto.OFPMP_FLOW, OFPFlowStats)
@_set_msg_type(ofproto.OFPT_MULTIPART_REQUEST)
class OFPFlowStatsRequest(OFPFlowStatsRequestBase):
"""
Individual flow statistics request message
The controller uses this message to query individual flow statistics.
================ ======================================================
Attribute Description
================ ======================================================
flags Zero or ``OFPMPF_REQ_MORE``
table_id ID of table to read
out_port Require matching entries to include this as an output
port
out_group Require matching entries to include this as an output
group
cookie Require matching entries to contain this cookie value
cookie_mask Mask used to restrict the cookie bits that must match
match Instance of ``OFPMatch``
================ ======================================================
Example::
def send_flow_stats_request(self, datapath):
ofp = datapath.ofproto
ofp_parser = datapath.ofproto_parser
cookie = cookie_mask = 0
match = ofp_parser.OFPMatch(in_port=1)
req = ofp_parser.OFPFlowStatsRequest(datapath, 0,
ofp.OFPTT_ALL,
ofp.OFPP_ANY, ofp.OFPG_ANY,
cookie, cookie_mask,
match)
datapath.send_msg(req)
"""
def __init__(self, datapath, flags=0, table_id=ofproto.OFPTT_ALL,
out_port=ofproto.OFPP_ANY,
out_group=ofproto.OFPG_ANY,
cookie=0, cookie_mask=0, match=None, type_=None):
5. 获取端口响应信息
ofp_event.EventOFPPortStatsReply
@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
    """Log one table row per port contained in a port-stats reply.

    Rows are sorted by port number. The port number is printed in
    hexadecimal, the counters in decimal.
    """
    # Debug aid: print(ev.msg) and print(dir(ev.msg)) dump the raw reply.
    # Header and rows share the same column widths so the table lines up.
    header_fmt = '%16s %8s %10s %10s %10s %10s %10s %10s'
    row_fmt = '%016x %8x %10d %10d %10d %10d %10d %10d'

    self.logger.info(header_fmt,
                     'datapath', 'port',
                     'rx_packets', 'tx_packets',
                     'rx_bytes', 'tx_bytes',
                     'rx_errors', 'tx_errors')
    self.logger.info(header_fmt,
                     '-' * 16, '-' * 8,
                     '-' * 10, '-' * 10,
                     '-' * 10, '-' * 10,
                     '-' * 10, '-' * 10)

    dpid = ev.msg.datapath.id
    for port_stat in sorted(ev.msg.body, key=attrgetter('port_no')):
        self.logger.info(row_fmt,
                         dpid, port_stat.port_no,
                         port_stat.rx_packets, port_stat.tx_packets,
                         port_stat.rx_bytes, port_stat.tx_bytes,
                         port_stat.rx_errors, port_stat.tx_errors)
端口信息:
6666666666port info:
version=0x4,msg_type=0x13,msg_len=0x1d0,xid=0x8dcd9187,
OFPPortStatsReply(
body=[
OFPPortStats(port_no=4294967294,rx_packets=0,tx_packets=0,rx_bytes=0,tx_bytes=0,rx_dropped=65,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=331000000), OFPPortStats(port_no=1,rx_packets=154,tx_packets=225,rx_bytes=11660,tx_bytes=19503,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000), OFPPortStats(port_no=2,rx_packets=186,tx_packets=257,rx_bytes=14516,tx_bytes=22343,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=334000000), OFPPortStats(port_no=3,rx_packets=220,tx_packets=232,rx_bytes=18439,tx_bytes=19311,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000)
]
,flags=0,type=4)
OFPPortStats(
port_no=4294967294, ----------
rx_packets=0, ----------
tx_packets=0, ----------
rx_bytes=0, ----------
tx_bytes=0, ----------
rx_dropped=65,
tx_dropped=0,
rx_errors=0, ----------
tx_errors=0, ----------
rx_frame_err=0,
rx_over_err=0,
rx_crc_err=0,
collisions=0,
duration_sec=1912,
duration_nsec=331000000)
['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type',
'datapath' ----------
, 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']
6. 获取flow协议响应信息
ofp_event.EventOFPFlowStatsReply
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
    """Log one table row per priority-1 flow entry in a flow-stats reply.

    Entries with any other priority are skipped; the sample reply shown
    below contains one such entry (priority 0, empty match), which has no
    ``in_port`` / ``eth_src`` fields to print. Rows are sorted by
    ``(in_port, eth_src)``.
    """
    # Debug aid: print(ev.msg) and print(dir(ev.msg)) dump the raw reply.
    # Header and rows share the same column widths so the table lines up.
    header_fmt = '%16s %8s %17s %8s %17s %12s %10s'
    row_fmt = '%016x %8x %17s %8x %17s %12d %10d'

    self.logger.info(header_fmt,
                     'datapath', 'in_port', 'eth_src',
                     'out_port', 'eth_dst',
                     'packet_count', 'byte_count')
    self.logger.info(header_fmt,
                     '-' * 16, '-' * 8, '-' * 17,
                     '-' * 8, '-' * 17,
                     '-' * 12, '-' * 10)

    dpid = ev.msg.datapath.id
    learned_flows = (flow for flow in ev.msg.body if flow.priority == 1)
    for flow_stat in sorted(
            learned_flows,
            key=lambda flow: (flow.match['in_port'],
                              flow.match['eth_src'])):
        # The output port is read from the first action of the first
        # instruction of the entry.
        out_port = flow_stat.instructions[0].actions[0].port
        self.logger.info(row_fmt,
                         dpid,
                         flow_stat.match['in_port'],
                         flow_stat.match['eth_src'],
                         out_port,
                         flow_stat.match['eth_dst'],
                         flow_stat.packet_count, flow_stat.byte_count)
协议信息:
777777777flow info:
version=0x4,msg_type=0x13,msg_len=0x200,xid=0x9e448a1a,
OFPFlowStatsReply(
body=[
OFPFlowStats(byte_count=5446,cookie=0,duration_nsec=552000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
length=104,match=OFPMatch(oxm_fields={'in_port': 2, 'eth_src': '8a:06:6a:2c:10:fc', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=71,priority=1,table_id=0), OFPFlowStats(byte_count=5348,cookie=0,duration_nsec=549000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)],
length=104,match=OFPMatch(oxm_fields={'in_port': 1, 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': '8a:06:6a:2c:10:fc'}),packet_count=70,priority=1,table_id=0), OFPFlowStats(byte_count=8302,cookie=0,duration_nsec=438000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
length=104,match=OFPMatch(oxm_fields={'in_port': 2, 'eth_src': 'ca:9e:a1:af:b9:5f', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=103,priority=1,table_id=0), OFPFlowStats(byte_count=8204,cookie=0,duration_nsec=436000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)]
,length=104,match=OFPMatch(oxm_fields={'in_port': 1, 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': 'ca:9e:a1:af:b9:5f'}),packet_count=102,priority=1,table_id=0), OFPFlowStats(byte_count=6739,cookie=0,duration_nsec=807000000,duration_sec=9,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65535,port=4294967293,type=0)],len=24,type=4)],
length=80,match=OFPMatch(oxm_fields={}),packet_count=74,priority=0,table_id=0)
]
,flags=0,type=1)
OFPFlowStats(
byte_count=5446, ----------
cookie=0,
duration_nsec=552000000,
duration_sec=1893,
flags=0,
hard_timeout=0,
idle_timeout=0,
instructions=[
OFPInstructionActions(
actions=[
OFPActionOutput(
len=16,
max_len=65509,
port=1, ----------
type=0)
],
len=24,
type=4
)
],
length=104,
match=OFPMatch(oxm_fields={
'in_port': 2, ----------
'eth_src': '8a:06:6a:2c:10:fc', ----------
'eth_dst': '26:20:2f:85:5a:9a' ----------
}),
packet_count=71, ----------
priority=1,
table_id=0
)
['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type',
'datapath' ----------
, 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']
三、实验演示
(一)开启Ryu
ryu-manager my_monitor_13.py
(二)开启Mininet
sudo mn --topo=tree,2,2 --controller=remote --mac
(三)Ryu显示结果
四、扩展
python的协程……以后再补充了。。。
参考:
[1] 山上有风景:https://www.cnblogs.com/ssyfj/
[2] Ryubook:http://osrg.github.io/ryu-book/en/html/
[3] Ryu官方文档:https://ryu.readthedocs.io/en/latest/
[4] 未来网络学院:https://www.bilibili.com/video/BV1ft4y1a7ip/?spm_id_from=333.337.search-card.all.click&vd_source=f8206d9b2ac93039311dbe9fdd0bcc87
最后
以上就是整齐香烟为你收集整理的SDN实验(六)——SDN流量监控一、流量监控原理二、代码实现三、实验演示四、扩展的全部内容,希望文章能够帮你解决SDN实验(六)——SDN流量监控一、流量监控原理二、代码实现三、实验演示四、扩展所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复