1.原理
(1)基于ros订阅有关目标信息的消息
获取每帧中每个目标的id,位置坐标(px,py),尺寸大小(ox,oy)
(2)获取每个目标的状态量Object(id,px,py,vx,vy,ax,ay,r,ox,oy)
通过位置坐标的差分计算出速度v,再通过速度的差分计算加速度a,选取目标长宽的最大值一半作为有效半径r
(3)碰撞检测
为每帧中的所有目标两两之间进行时间T内的轨迹预测,并采用圆形检测(当两者间的相对位移小于两者的有效半径之和判定为有碰撞风险)判断是否有碰撞风险,并对有碰撞风险的目标进行标记。
轨迹预测与圆形检测(pzjc.py):
时间T的确定:计算两目标的x、y向速度最大值与最大刹车加速度a_brake之比t,将其作为预测时间T(至少为1s)
T=max(abs(object1[3]/self.a_brake), abs(object2[3]/self.a_brake), abs(object1[4]/self.a_brake), abs(object2[4]/self.a_brake), 1)
def judge_pre(self, object1, object2)函数为预判断是否具有发生碰撞的风险,即是否存在仅就x或y向考虑,可能会发生碰撞
def xv_cal(self, v0, x0, a, t, v_state)函数进行轨迹预测,更新预测过程中目标的位置坐标和速度信息
def distance_cal(self, x1, y1, x2, y2)函数计算轨迹预测过程中两目标间的相对位移
def collision_detect(self, object1, object2)函数对两目标间判断在预测时间T内是否会发生碰撞,具体地,预测过程中两目标间相对位移小于两目标的有效半径之和时判定为具有碰撞风险,并输出可能发生碰撞区域的信息
def collision_predict(self, information)函数对每帧中的全部目标两两间判断是否具有碰撞风险,输出全部具有碰撞趋势的目标信息和可能发生碰撞区域的信息
(4)发布碰撞预警信息
将具有碰撞风险目标的id,px,py,ox,oy以及预测会发生碰撞的区域(x,y,r)发布出去
2.代码
ros接受每帧中的目标信息:id、中心坐标和尺寸大小
对每个目标进行碰撞检测(pzjc2.py)
发出可能发生碰撞的目标和发生碰撞的区域(collision.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173#! /usr/bin/env python3 #python2 import roslib import rospy from std_msgs.msg import Header from std_msgs.msg import String from sensor_msgs.msg import CompressedImage from visualization_msgs.msg import Marker, MarkerArray from perception_msgs.msg import Obstacle, ObstacleArray # from ros_numpy import msgify # from cv_bridge import CvBridge import sys sys.path.remove('/opt/ros/melodic/lib/python2.7/dist-packages') from socketserver import BaseRequestHandler,ThreadingTCPServer import threading import time import numpy as np import pzjc2 as cd BUF_SIZE=5000 #python3 global_data="wo shi ni die"*100 class SubscribeAndPublish: def __init__(self): self.all_obstacle_str='' self.sub_marker_array_name=rospy.get_param("~sub_marker_array_name") print(self.sub_marker_array_name) self.sub_marker_array= rospy.Subscriber(self.sub_marker_array_name, MarkerArray,self.callback_marker_array) self.id_array=np.zeros(100,np.float32) self.id_arrayp=np.zeros(100,np.float32) self.infor=np.zeros((100,10),np.float64) self.inforp=np.zeros((100,10),np.float64) self.deltatime=0.1 # self.pub_marker_array_name=rospy.get_param("~pub_collision_array_name") self.pub = rospy.Publisher(self.pub_marker_array_name,MarkerArray,queue_size=20) def make_empty_marker_cylinder(self): marker = Marker() marker.header.frame_id = 'collision' marker.header.stamp = rospy.Time.now() marker.id = 0 # enumerate subsequent markers here marker.action = Marker.ADD # can be ADD, REMOVE, or MODIFY marker.ns = "collision" marker.type = Marker.CYLINDER marker.pose.position.x = 1.0 marker.pose.position.y = 1.0 marker.pose.position.z = 0.0 marker.pose.orientation.x = 0.0 marker.pose.orientation.y = 0.0 marker.pose.orientation.z = 0.0 marker.pose.orientation.w = 1.0 marker.scale.x = 1 # artifact of sketchup export marker.scale.y = 1 # artifact of sketchup export marker.scale.z = 1 # artifact of sketchup export marker.color.r = 1.0 marker.color.g = 0 marker.color.b = 0 marker.color.a = 1.0 # marker.lifetime = rospy.Duration() # will last forever unless modifie marker.lifetime = rospy.Duration(0.1) # will last forever unless modifie marker.text = "COLLISION" return marker def callback_marker_array(self,mka): # thistime=time.time() # print('time',thistime) print('') print('rec obs') num=len(mka.markers) Object=np.zeros((num,10),np.float32) state_collision=np.zeros(num,np.int8) print('num of markers',num) if num>0: for i in range(0,num): one_str=self.one_m_to_str(mka.markers[i]) #get information of object id=mka.markers[i].id id=int(id) px=mka.markers[i].pose.position.x py=mka.markers[i].pose.position.y ox=mka.markers[i].scale.x oy=mka.markers[i].scale.y r=max(ox,oy)/2 state=0 statep=0 for k in range(0,len(self.id_arrayp)): #judge whether object has appeared once if self.id_arrayp[k]==id: statep=k+1 for j in range(0,len(self.id_array)): #judge whether object has appeared twice if self.id_array[j]==id: state=j+1 #print(statep,state) if statep==0: vx=0 vy=0 ax=0 ay=0 self.inforp[i]=[id,px,py,0,0,0,0,r,ox,oy] self.infor[i]=[id,0,0,0,0,0,0,r,ox,oy] self.id_arrayp[i]=id self.id_array[i]=0 if state==0 and statep: #Differential calculation speed vx=(px-self.inforp[statep-1][1])/self.deltatime vy=(py-self.inforp[statep-1][2])/self.deltatime ax=0 ay=0 self.infor[i]=[id,px,py,vx,vy,ax,ay,r,ox,oy] self.id_arrayp[i]=id self.id_array[i]=id if state and statep: #Differential calculation acceleration vx=(px-self.infor[state-1][1])/self.deltatime vy=(py-self.infor[state-1][2])/self.deltatime ax=(vx-self.infor[state-1][3])/self.deltatime ay=(vy-self.infor[state-1][4])/self.deltatime self.inforp[i]=self.infor[i] self.infor[i]=[id,px,py,vx,vy,ax,ay,r,ox,oy] print(id,px,py,vx,vy,ax,ay,r,ox,oy) Object[i]=[id,px,py,vx,vy,ax,ay,r,ox,oy] d=cd.Collision_Detection() state_collision,area_collision=d.collision_predict(Object) count=0 #numbei of collision object Object_collision=[] for j in range(0,num): if state_collision[j]: print("collision object id",int(Object[j][0]),":") print('x:',Object[j][1],'y:',Object[j][2],'scale_x:',Object[j][8],'scale_y:',Object[j][9]) count+=1 Object_collision=np.zeros((count,5),np.float32) i=0 for j in range(0,num): if state_collision[j]: Object_collision[i]=[Object[j][0],Object[j][1],Object[j][2],Object[j][8],Object[j][9]] i+=1 print(Object_collision) print("area_collision:",area_collision) #发布collision信息 test_collision_array=MarkerArray() for i in range(0,count): tmp=self.make_empty_marker_cylinder() tmp.id =int(Object_collision[i][0]) tmp.pose.position.x=Object_collision[i][1] tmp.pose.position.y=Object_collision[i][2] tmp.scale.x =Object_collision[i][3] tmp.scale.y =Object_collision[i][4] test_collision_array.markers.append(tmp) #print("(len(area_collision)",(len(area_collision))) for i in range(len(area_collision)): tmp=self.make_empty_marker_cylinder() tmp.id=0 tmp.pose.position.x=area_collision[i][0] tmp.pose.position.y=area_collision[i][1] tmp.scale.x =area_collision[i][2] tmp.scale.y =area_collision[i][2] tmp.color.r=0 tmp.color.b=1.0 test_collision_array.markers.append(tmp) self.pub.publish(test_collision_array) print("publish collision once!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") def main(): rospy.init_node('road_mks_tcp_server', anonymous=True) ##################### #开启ros t=SubscribeAndPublish() ##################### rospy.spin() if __name__ == "__main__": main()
实现圆形检测原理过程(pzjc2.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179import os import math import numpy as np class Collision_Detection: #limit v to [-60,60](m/s) def __init__(self): self.v_max=60 self.v_min=-60 self.a_brake=7.5 #print("Collision_Detection init") def collision_predict(self, information): #information[i]=[id, x, y, vx, vy, ax, ay, r, ox, oy] num=len(information) Object=np.zeros((num, 10), dtype='float32') #object information Object_state=np.zeros(num, dtype='float32') #collision warning label #read information for each target count_object=0 for arr in information: Object[count_object]=arr count_object+=1 num_collision_area=0 collision_area=[] #collision warning detection between two object for i in range(num): for j in range(i+1, num, 1): state=0 if self.judge_pre(Object[i], Object[j])==1: #remove objects that are far away or do not collide d_rel=[] state, area=self.collision_detect(Object[i], Object[j]) #print(d_rel) if state==1: Object_state[i]=1 Object_state[j]=1 collision_area.append(area) return Object_state, collision_area def judge_pre(self, object1, object2): #If drive at the current speed, collision may occur in both X and Y directions in the next T. r1=object1[7]*1.1 r2=object2[7]*1.1 #calculate relative x, y , vx, vy x=object1[1]-object2[1] y=object1[2]-object2[2] vx=object1[3]-object2[3] vy=object1[4]-object2[4] T=max(abs(object1[3]/self.a_brake), abs(object2[3]/self.a_brake), abs(object1[4]/self.a_brake), abs(object2[4]/self.a_brake), 1) #judge time #print("T:",T) result_x=0 #if 1, maybe collision in x axis result_y=0 #if 1, maybe collision in y axis result=0 if x<0: x=x+r1+r2 if x>0: x=0 result=1 else: x=x-r1-r2 if x<0: x=0 result=1 if y<0: y=y+r1+r2 if y>0: y=0 result=1 else: y=y-r1-r2 if y<0: y=0 result=1 if vx!=0: tx=x/vx if tx<=T: result_x=1 if vy!=0: ty=y/vy if ty<=T: result_y=1 if result_x==1&result_y==1: result=1 return result def collision_detect(self, object1, object2): #object1:[id, x, y, vx, vy, ax, ay, r, ox, oy] x1=object1[1] y1=object1[2] x2=object2[1] y2=object2[2] v1_x=object1[3] v1_y=object1[4] v2_x=object2[3] v2_y=object2[4] a1_x=object1[5] a1_y=object1[6] a2_x=object2[5] a2_y=object2[6] r1=object1[7]*1.1 r2=object2[7]*1.1 T=max(abs(object1[3]/self.a_brake), abs(object2[3]/self.a_brake), abs(object1[4]/self.a_brake), abs(object2[4]/self.a_brake), 1) #predict time t=0.1 #time gap for detection state=0 #0:safe; 1:collision collision_area=[0,0,0] #[x,y,r],information of collision area num=int(T/t) d_rel=[] #relative distance for cars #judge whether it is deceleration. 0:speed up; 1:slow down v1_x_state=0 v1_y_state=0 v2_x_state=0 v2_y_state=0 state_coll=0 for i in range(num): #update x,v x1, v1_x, v1_x_state = self.xv_cal(v1_x, x1, a1_x, t, v1_x_state) y1, v1_y, v1_y_state = self.xv_cal(v1_y, y1, a1_y, t, v1_y_state) x2, v2_x, v2_x_state = self.xv_cal(v2_x, x2, a2_x, t, v2_x_state) y2, v2_y, v2_y_state = self.xv_cal(v2_y, y2, a2_y, t, v2_y_state) #calculate relativee distance distance = self.distance_cal(x1, y1, x2, y2) d_rel.append(distance) d_safe = r1+r2 if distance<d_safe: state=1 state_coll+=1 if state_coll==1: x=(x1+x2)/2 y=(y1+y2)/2 r=r1+r2 collision_area=[x,y,r] return state, collision_area def distance_cal(self, x1, y1, x2, y2): distance = math.sqrt(pow((x1-x2), 2) + pow((y1-y2), 2)) return distance def xv_cal(self, v0, x0, a, t, v_state): if a==0: #uniform velocity x=x0+v0*t v=v0 else: if v0*a<0: #slow down v_state=1 if v_state==1: v=v0+a*t if v*v0<=0: v=0 x1=(v*v-v0*v0)/(2*a) t1=(v-v0)/a x2=v*(t-t1) x=x0+x1+x2 else: x=(v*v-v0*v0)/(2*a)+x0 else: #speed up v = v0+a*t if v>self.v_max: v=self.v_max x1=(v*v-v0*v0)/(2*a) t1=(v-v0)/a x2=v*(t-t1) x=x0+x1+x2 elif v<self.v_min: v=self.v_min x1=(v*v-v0*v0)/(2*a) t1=(v-v0)/a x2=v*(t-t1) x=x0+x1+x2 else: x=(v*v-v0*v0)/(2*a)+x0 return x, v, v_state if __name__ == '__main__': #only o1 and o2 will collide #[id, x, y, vx, vy, ax, ay, r, ox, oy] o1=[1,0,0,10,10,1,1,1,1,1] o2=[2,10,10,-4,-4,-2,-2,1,1,1] o3=[3,15,15,10,10,1,1,1,1,1] o4=[1,1,0,10,10,1,1,1,1,1] o=[o1, o2, o3, o4] print(o) t=Collision_Detection() state,area=t.collision_predict(o) print(state) print(area)
3.如何启动碰撞预警代码
(1)前置准备
1
2roscore #启动ros rosbag play -l xxx.bag #循环播放xxx.bag
(2)碰撞预警代码启动:
1roslaunch collision collision.launch
(3)观测预警效果
1
2rviz #启动rviz
Fixed Frame改为collision
点击add,选中By topic中collision_marker下的MarkerArray,点击ok,获取观测结果
最后
以上就是阳光大炮最近收集整理的关于碰撞预警检测1.原理2.代码3.如何启动碰撞预警代码的全部内容,更多相关碰撞预警检测1内容请搜索靠谱客的其他文章。
发表评论 取消回复