我是靠谱客的博主 快乐鸭子,这篇文章主要介绍java集成ZMQ,并使用应答模式发送接收数据;,现在分享给大家,希望可以做个参考。

官网地址

复制代码
1
2
3
4
5
6
7
8
An open-source universal messaging library 看起来像一个可嵌入的网络库,但实际上像一个并发框架。 它为您提供了跨进程内、进程间、TCP和多播等各种传输方式传输原子消息的套接字。 您可以使用扇出、发布-订阅、任务分发和请求-应答等模式将套接字N对N连接起来。 它的速度足以成为集群产品的结构。 它的异步I/O模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。 它有许多语言API,并在大多数操作系统上运行。

项目实际使用在和C++ 构建的系统上做通讯;用于接收客户端发送过来的图片信息;

复制代码
1
2
3
4
5
6
7
<!--maven版本--> <dependency> <groupId>org.zeromq</groupId> <artifactId>jeromq</artifactId> <version>0.5.1</version> </dependency>

构建模拟客户端,实际开发中由客户端发送数据
其中AlarmParams 用于转换为json,这里只演示将图片转换为byte[],其余数据自行定义即可
ObjectMapper 来自于 com.fasterxml.jackson.databind;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ClientTest { public static void main(String args[]) throws Exception { int i = 0; ZMQ.Context context = ZMQ.context(1); ZMQ.Socket socket = context.socket(ZMQ.REQ); System.out.println("Connecting to hello world server..."); socket.connect("tcp://localhost:9999"); AlarmParams alarmParams = new AlarmParams(); byte[] bytes = image2byte("E:\fileStroage\alarm_image\20211009\202110091515100720.mp4"); alarmParams.setImage(bytes); ObjectMapper om = new ObjectMapper(); String res = om.writeValueAsString(alarmParams); socket.send(res); Thread.sleep(100); byte[] reply = socket.recv(0); System.out.println("客户端接收的是: [" + new String(reply) + "]"); i++; Thread.sleep(1000); } public static byte[] image2byte(String path) throws Exception { byte[] data = null; FileImageInputStream input = null; input = new FileImageInputStream(new File(path)); ByteArrayOutputStream output = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int numBytesRead = 0; while ((numBytesRead = input.read(buf)) != -1) { output.write(buf, 0, numBytesRead); } data = output.toByteArray(); output.close(); input.close(); return data; } }

构建java服务端,由于采用的 REP-REQ 模式,客户端在收到服务端的回应以后才会继续进行下一步操作,所以不管成功或者失败都要将结果返回给客户端;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/** * @author zy * @Description: 接收zmq客户端消息 * @Date 2021/9/22 9:45 */ public class AlarmPhotoTCPServer { private static Logger log = Logger.getLogger(AlarmPhotoTCPServer.class); private static ZMQ.Socket socket = null; private static String head = null; private static int num = 1; public AlarmPhotoTCPServer() { ZMQ.Context context = ZMQ.context(1); socket = context.socket(SocketType.REP); System.out.println(socket.bind("tcp://*:" + ResourceProperty.serverPort)); new Thread(() -> { start(); }).start(); } public void start() { while (true) { byte[] request = null; ZMsg msg = ZMsg.recvMsg(socket); // System.out.println( socket.get()); if (msg == null) continue; Iterator<ZFrame> frame = msg.iterator(); while (frame.hasNext()) { request = frame.next().getData(); if (num == 1) { try { head = new String(request, "UTF-8"); } catch (UnsupportedEncodingException e) { log.error("解析客户端数据出错:" + e); } } num++; } try { dealAlarmParams(request); } catch (Exception e) { log.error("处理客户端数据出错:" + e); socket.send("图片处理失败;"); } } } private static void dealAlarmParams(byte[] data) throws Exception { // 解析数据 AlarmParams alarmParams = Constant.objectMapper.readValue(data, AlarmParams.class); if (alarmParams.getAlarmType() == 0) { log.info("与客户端建立连接成功;"); socket.send(0 + ""); return; } String path = FileUtils.saveImageData(alarmParams.getImage(), name); String alarmTime = getTime(alarmParams.getName(), name); if (StringUtils.isBlank(path)) { socket.send("save error"); return; } } }

其中FileUtils.saveImageData(alarmParams.getImage(), name);部分代码
由于客户端传输的可能是视频或者图片,这里还针对文件名结尾的.mp4或者.jpeg的不同作出对应处理;
其中由于接受的是海康的视频,直接取得流,转成视频以后不可用(具体原理自行百度),这边还通过ffmpeg对流进行了进一步处理;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/** * @author zy * @className:FileUtils * @Description: 文件工具类 * @Date 2021/9/22 10:30 */ public class FileUtils { private static Logger log = Logger.getLogger(FileUtils.class); public static String saveImageData(byte[] data, String target) { if (target.equals(".mp4")) { return saveVideo(DateUtil.formats.format(new Date()) + target, data); } return saveImageData(DateUtil.formats.format(new Date()) + target, data); } /** * @author zy * @Description 保存转码视频 * @Param [s, data] * @return java.lang.String **/ private static String saveVideo(String s, byte[] data) { // 保存接收文件 String input = ResourceProperty.savePath + saveImageData("temp/" + s, data); String output = ResourceProperty.savePath + Constant.getImageDate() + s; String res =null; // 生成新文件 if ( !StringUtils.isBlank(processMP4(input, output))){ // 删除源文件 delOriginalFile(input); // 新文件路径 res = output.replace(ResourceProperty.savePath, ""); } return res; } /** * @author zy * @Description 保存图片信息 * @Param [s, data] * @return java.lang.String **/ public static String saveImageData(String imageName, byte[] data) { FileOutputStream outputStream = null; String path = null; try { /*if (StringUtils.isBlank(imageName) || data == null || data.length <= 0) { return path; }*/ // 获取图片文件保存的域路径,以及拼接文件名,形成完整的路径 String domainPath = Constant.getImageDate(); String filePath = ResourceProperty.savePath + domainPath + imageName; File file = new File(filePath); if (!file.getParentFile().exists()) { // 如果目标文件所在的文件夹不存在,则创建父文件夹 log.debug("图片保存文件所在目录不存在,准备创建它!"); // 判断创建目录是否成功 if (!file.getParentFile().mkdirs()) { log.error("创建图片保存文件所在的目录失败!"); return null; } } if (!file.exists()) { file.createNewFile(); log.debug("目标文件创建成功: " + filePath); } else { log.info("目标文件创建失败"); return null; } outputStream = new FileOutputStream(file); outputStream.write(data); outputStream.flush(); // 图片保存以后才给返回数据赋值图片的存储路径,避免图片存储异常,提前赋值图片路径,导致后期查询异常 path = domainPath + imageName; log.info("文件保存在:[" + filePath + "]"); } catch (Exception e) { log.error("保存图片失败:" + e); } finally { IOUtils.closeQuietly(outputStream); outputStream = null; // 设置文件权限 // Constant.setfileSystemPermission(); } return path; } /** * 删除文件夹下的指定以外的文件 * * @author zy * @param filepath * @param prefix 删除以外文件的后缀 */ public static void deleteExceptfile(String filepath, String... prefix) { File file = new File(filepath); setFileDirPermission(file); if (!file.isDirectory()) { String name = file.getName().toLowerCase(); for (int i = 0, length = prefix.length; i < length; i++) { if (!name.endsWith(prefix[i].toLowerCase())) { System.out.println("path=" + file.getPath()); file.delete(); } } } else if (file.isDirectory()) { String[] filelist = file.list(); for (int i = 0; i < filelist.length; i++) { File readfile = new File(filepath + File.separator + filelist[i]); if (!readfile.isDirectory()) { String name = readfile.getName().toLowerCase(); for (int j = 0, length = prefix.length; j < length; j++) { if (!name.endsWith(prefix[j].toLowerCase())) { System.out.println("path=" + readfile.getPath()); readfile.delete(); } } } else if (readfile.isDirectory()) { deleteExceptfile(filepath + File.separator + filelist[i], prefix); } } } } /** * @author zy * @param filepath * @param prefix 删除文件的后缀 */ public static void deletefile(String filepath, String... prefix) { File file = new File(filepath); setFileDirPermission(file); if (!file.isDirectory()) { String name = file.getName().toLowerCase(); for (int i = 0, length = prefix.length; i < length; i++) { if (name.endsWith(prefix[i].toLowerCase())) { System.out.println("path=" + file.getPath()); file.delete(); } } } else if (file.isDirectory()) { String[] filelist = file.list(); for (int i = 0; i < filelist.length; i++) { File readfile = new File(filepath + File.separator + filelist[i]); if (!readfile.isDirectory()) { String name = readfile.getName().toLowerCase(); for (int j = 0, length = prefix.length; j < length; j++) { if (name.endsWith(prefix[j].toLowerCase())) { System.out.println("path=" + readfile.getPath()); readfile.delete(); } } } else if (readfile.isDirectory()) { deletefile(filepath + File.separator + filelist[i], prefix); } } } } /** * 文件夹设置权限 * 文件第一层文件夹的权限设置,只要在第一层文件夹下的文件,均有权限 * @author zy * @param files */ public static void setFileDirPermission(File... files) { try { String os = System.getProperty("os.name").toLowerCase(); if (os.contains("windows")) {// windows for (File fil : files) { String path = fil.getAbsolutePath(); String[] split = path.split("\" + File.separator); if (split.length >= 2) { path = split[0] + File.separator + split[1] + File.separator; } File file = new File(path); file.setReadable(true);// 设置可读权限 file.setExecutable(true);// 设置可执行权限 file.setWritable(true);// 设置可写权限 } } else { for (File fil : files) { String path = fil.getAbsolutePath(); String[] split = path.split(File.separator); if (split.length >= 2) { path = split[0] + File.separator + split[1] + File.separator; } Runtime.getRuntime().exec("sudo chmod 777 -R " + path); } } } catch (IOException e) { System.err.println("文件夹设置权限异常"); } } }

最后

以上就是快乐鸭子最近收集整理的关于java集成ZMQ,并使用应答模式发送接收数据;的全部内容,更多相关java集成ZMQ内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部