概述
官网地址
An open-source universal messaging library
看起来像一个可嵌入的网络库,但实际上像一个并发框架。
它为您提供了跨进程内、进程间、TCP和多播等各种传输方式传输原子消息的套接字。
您可以使用扇出、发布-订阅、任务分发和请求-应答等模式将套接字N对N连接起来。
它的速度足以成为集群产品的结构。
它的异步I/O模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。
它有许多语言API,并在大多数操作系统上运行。
项目实际使用在和C++ 构建的系统上做通讯;用于接收客户端发送过来的图片信息;
<!--maven版本-->
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.1</version>
</dependency>
构建模拟客户端,实际开发中由客户端发送数据
其中AlarmParams 用于转换为json,这里只演示将图片转换为byte[],其余数据自行定义即可
ObjectMapper 来自于 com.fasterxml.jackson.databind;
public class ClientTest {
public static void main(String args[]) throws Exception {
int i = 0;
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
System.out.println("Connecting to hello world server...");
socket.connect("tcp://localhost:9999");
AlarmParams alarmParams = new AlarmParams();
byte[] bytes = image2byte("E:\fileStroage\alarm_image\20211009\202110091515100720.mp4");
alarmParams.setImage(bytes);
ObjectMapper om = new ObjectMapper();
String res = om.writeValueAsString(alarmParams);
socket.send(res);
Thread.sleep(100);
byte[] reply = socket.recv(0);
System.out.println("客户端接收的是: [" + new String(reply) + "]");
i++;
Thread.sleep(1000);
}
public static byte[] image2byte(String path) throws Exception {
byte[] data = null;
FileImageInputStream input = null;
input = new FileImageInputStream(new File(path));
ByteArrayOutputStream output = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int numBytesRead = 0;
while ((numBytesRead = input.read(buf)) != -1) {
output.write(buf, 0, numBytesRead);
}
data = output.toByteArray();
output.close();
input.close();
return data;
}
}
构建java服务端,由于采用的 REP-REQ 模式,客户端在收到服务端的回应以后才会继续进行下一步操作,所以不管成功或者失败都要将结果返回给客户端;
/**
* @author zy
* @Description: 接收zmq客户端消息
* @Date 2021/9/22 9:45
*/
public class AlarmPhotoTCPServer {
private static Logger log = Logger.getLogger(AlarmPhotoTCPServer.class);
private static ZMQ.Socket socket = null;
private static String head = null;
private static int num = 1;
public AlarmPhotoTCPServer() {
ZMQ.Context context = ZMQ.context(1);
socket = context.socket(SocketType.REP);
System.out.println(socket.bind("tcp://*:" + ResourceProperty.serverPort));
new Thread(() -> {
start();
}).start();
}
public void start() {
while (true) {
byte[] request = null;
ZMsg msg = ZMsg.recvMsg(socket);
// System.out.println( socket.get());
if (msg == null)
continue;
Iterator<ZFrame> frame = msg.iterator();
while (frame.hasNext()) {
request = frame.next().getData();
if (num == 1) {
try {
head = new String(request, "UTF-8");
} catch (UnsupportedEncodingException e) {
log.error("解析客户端数据出错:" + e);
}
}
num++;
}
try {
dealAlarmParams(request);
} catch (Exception e) {
log.error("处理客户端数据出错:" + e);
socket.send("图片处理失败;");
}
}
}
private static void dealAlarmParams(byte[] data) throws Exception {
// 解析数据
AlarmParams alarmParams = Constant.objectMapper.readValue(data, AlarmParams.class);
if (alarmParams.getAlarmType() == 0) {
log.info("与客户端建立连接成功;");
socket.send(0 + "");
return;
}
String path = FileUtils.saveImageData(alarmParams.getImage(), name);
String alarmTime = getTime(alarmParams.getName(), name);
if (StringUtils.isBlank(path)) {
socket.send("save error");
return;
}
}
}
其中FileUtils.saveImageData(alarmParams.getImage(), name);
部分代码
由于客户端传输的可能是视频或者图片,这里还针对文件名结尾的.mp4或者.jpeg的不同作出对应处理;
其中由于接受的是海康的视频,直接取得流,转成视频以后不可用(具体原理自行百度),这边还通过ffmpeg对流进行了进一步处理;
/**
* @author zy
* @className:FileUtils
* @Description: 文件工具类
* @Date 2021/9/22 10:30
*/
public class FileUtils {
private static Logger log = Logger.getLogger(FileUtils.class);
public static String saveImageData(byte[] data, String target) {
if (target.equals(".mp4")) {
return saveVideo(DateUtil.formats.format(new Date()) + target, data);
}
return saveImageData(DateUtil.formats.format(new Date()) + target, data);
}
/**
* @author zy
* @Description 保存转码视频
* @Param [s, data]
* @return java.lang.String
**/
private static String saveVideo(String s, byte[] data) {
// 保存接收文件
String input = ResourceProperty.savePath + saveImageData("temp/" + s, data);
String output = ResourceProperty.savePath + Constant.getImageDate() + s;
String res =null;
// 生成新文件
if ( !StringUtils.isBlank(processMP4(input, output))){
// 删除源文件
delOriginalFile(input);
// 新文件路径
res = output.replace(ResourceProperty.savePath, "");
}
return res;
}
/**
* @author zy
* @Description 保存图片信息
* @Param [s, data]
* @return java.lang.String
**/
public static String saveImageData(String imageName, byte[] data) {
FileOutputStream outputStream = null;
String path = null;
try {
/*if (StringUtils.isBlank(imageName) || data == null || data.length <= 0) {
return path;
}*/
// 获取图片文件保存的域路径,以及拼接文件名,形成完整的路径
String domainPath = Constant.getImageDate();
String filePath = ResourceProperty.savePath + domainPath + imageName;
File file = new File(filePath);
if (!file.getParentFile().exists()) {
// 如果目标文件所在的文件夹不存在,则创建父文件夹
log.debug("图片保存文件所在目录不存在,准备创建它!");
// 判断创建目录是否成功
if (!file.getParentFile().mkdirs()) {
log.error("创建图片保存文件所在的目录失败!");
return null;
}
}
if (!file.exists()) {
file.createNewFile();
log.debug("目标文件创建成功: " + filePath);
} else {
log.info("目标文件创建失败");
return null;
}
outputStream = new FileOutputStream(file);
outputStream.write(data);
outputStream.flush();
// 图片保存以后才给返回数据赋值图片的存储路径,避免图片存储异常,提前赋值图片路径,导致后期查询异常
path = domainPath + imageName;
log.info("文件保存在:[" + filePath + "]");
} catch (Exception e) {
log.error("保存图片失败:" + e);
} finally {
IOUtils.closeQuietly(outputStream);
outputStream = null;
// 设置文件权限
// Constant.setfileSystemPermission();
}
return path;
}
/**
* 删除文件夹下的指定以外的文件
*
* @author zy
* @param filepath
* @param prefix 删除以外文件的后缀
*/
public static void deleteExceptfile(String filepath, String... prefix) {
File file = new File(filepath);
setFileDirPermission(file);
if (!file.isDirectory()) {
String name = file.getName().toLowerCase();
for (int i = 0, length = prefix.length; i < length; i++) {
if (!name.endsWith(prefix[i].toLowerCase())) {
System.out.println("path=" + file.getPath());
file.delete();
}
}
} else if (file.isDirectory()) {
String[] filelist = file.list();
for (int i = 0; i < filelist.length; i++) {
File readfile = new File(filepath + File.separator + filelist[i]);
if (!readfile.isDirectory()) {
String name = readfile.getName().toLowerCase();
for (int j = 0, length = prefix.length; j < length; j++) {
if (!name.endsWith(prefix[j].toLowerCase())) {
System.out.println("path=" + readfile.getPath());
readfile.delete();
}
}
} else if (readfile.isDirectory()) {
deleteExceptfile(filepath + File.separator + filelist[i], prefix);
}
}
}
}
/**
* @author zy
* @param filepath
* @param prefix 删除文件的后缀
*/
public static void deletefile(String filepath, String... prefix) {
File file = new File(filepath);
setFileDirPermission(file);
if (!file.isDirectory()) {
String name = file.getName().toLowerCase();
for (int i = 0, length = prefix.length; i < length; i++) {
if (name.endsWith(prefix[i].toLowerCase())) {
System.out.println("path=" + file.getPath());
file.delete();
}
}
} else if (file.isDirectory()) {
String[] filelist = file.list();
for (int i = 0; i < filelist.length; i++) {
File readfile = new File(filepath + File.separator + filelist[i]);
if (!readfile.isDirectory()) {
String name = readfile.getName().toLowerCase();
for (int j = 0, length = prefix.length; j < length; j++) {
if (name.endsWith(prefix[j].toLowerCase())) {
System.out.println("path=" + readfile.getPath());
readfile.delete();
}
}
} else if (readfile.isDirectory()) {
deletefile(filepath + File.separator + filelist[i], prefix);
}
}
}
}
/**
* 文件夹设置权限
* 文件第一层文件夹的权限设置,只要在第一层文件夹下的文件,均有权限
* @author zy
* @param files
*/
public static void setFileDirPermission(File... files) {
try {
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("windows")) {// windows
for (File fil : files) {
String path = fil.getAbsolutePath();
String[] split = path.split("\" + File.separator);
if (split.length >= 2) {
path = split[0] + File.separator + split[1] + File.separator;
}
File file = new File(path);
file.setReadable(true);// 设置可读权限
file.setExecutable(true);// 设置可执行权限
file.setWritable(true);// 设置可写权限
}
} else {
for (File fil : files) {
String path = fil.getAbsolutePath();
String[] split = path.split(File.separator);
if (split.length >= 2) {
path = split[0] + File.separator + split[1] + File.separator;
}
Runtime.getRuntime().exec("sudo chmod 777 -R " + path);
}
}
} catch (IOException e) {
System.err.println("文件夹设置权限异常");
}
}
}
最后
以上就是快乐鸭子为你收集整理的java集成ZMQ,并使用应答模式发送接收数据;的全部内容,希望文章能够帮你解决java集成ZMQ,并使用应答模式发送接收数据;所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复