我是靠谱客的博主 快乐鸭子,最近开发中收集的这篇文章主要介绍java集成ZMQ,并使用应答模式发送接收数据;,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

官网地址

An open-source universal messaging library
看起来像一个可嵌入的网络库,但实际上像一个并发框架。
它为您提供了跨进程内、进程间、TCP和多播等各种传输方式传输原子消息的套接字。
您可以使用扇出、发布-订阅、任务分发和请求-应答等模式将套接字N对N连接起来。
它的速度足以成为集群产品的结构。
它的异步I/O模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。
它有许多语言API,并在大多数操作系统上运行。

项目实际使用在和C++ 构建的系统上做通讯;用于接收客户端发送过来的图片信息;

<!--maven版本-->
<dependency>
     <groupId>org.zeromq</groupId>
     <artifactId>jeromq</artifactId>
     <version>0.5.1</version>
 </dependency>

构建模拟客户端,实际开发中由客户端发送数据
其中AlarmParams 用于转换为json,这里只演示将图片转换为byte[],其余数据自行定义即可
ObjectMapper 来自于 com.fasterxml.jackson.databind;

public class ClientTest {
    public static void main(String args[]) throws Exception {
        int i = 0;
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.REQ);
        System.out.println("Connecting to hello world server...");
        socket.connect("tcp://localhost:9999");
        AlarmParams alarmParams = new AlarmParams();
        byte[] bytes = image2byte("E:\fileStroage\alarm_image\20211009\202110091515100720.mp4");
        alarmParams.setImage(bytes);
        ObjectMapper om = new ObjectMapper();
        String res = om.writeValueAsString(alarmParams);
        socket.send(res);
        Thread.sleep(100);
        byte[] reply = socket.recv(0);
        System.out.println("客户端接收的是: [" + new String(reply) + "]");
        i++;
        Thread.sleep(1000);

    }
    
    public static byte[] image2byte(String path) throws Exception {
        byte[] data = null;
        FileImageInputStream input = null;
        input = new FileImageInputStream(new File(path));
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int numBytesRead = 0;
        while ((numBytesRead = input.read(buf)) != -1) {
            output.write(buf, 0, numBytesRead);
        }
        data = output.toByteArray();
        output.close();
        input.close();
        return data;
    }
}

构建java服务端,由于采用的 REP-REQ 模式,客户端在收到服务端的回应以后才会继续进行下一步操作,所以不管成功或者失败都要将结果返回给客户端;

/**
 * @author zy
 * @Description: 接收zmq客户端消息
 * @Date 2021/9/22 9:45
 */
public class AlarmPhotoTCPServer {
    private static Logger log = Logger.getLogger(AlarmPhotoTCPServer.class);

    private static ZMQ.Socket socket = null;
    private static String head = null;
    private static int num = 1;

    public AlarmPhotoTCPServer() {
        ZMQ.Context context = ZMQ.context(1);
        socket = context.socket(SocketType.REP);
        System.out.println(socket.bind("tcp://*:" + ResourceProperty.serverPort));
        new Thread(() -> {
            start();
        }).start();
    }

    public void start() {
        while (true) {
            byte[] request = null;
            ZMsg msg = ZMsg.recvMsg(socket);
//            System.out.println( socket.get());
            if (msg == null)
                continue;
            Iterator<ZFrame> frame = msg.iterator();
            while (frame.hasNext()) {
                request = frame.next().getData();
                if (num == 1) {
                    try {
                        head = new String(request, "UTF-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("解析客户端数据出错:" + e);
                    }
                }
                num++;
            }
            try {
                dealAlarmParams(request);
            } catch (Exception e) {
                log.error("处理客户端数据出错:" + e);
                socket.send("图片处理失败;");
            }
        }
    }

    private static void dealAlarmParams(byte[] data) throws Exception {
        // 解析数据
        AlarmParams alarmParams = Constant.objectMapper.readValue(data, AlarmParams.class);
        if (alarmParams.getAlarmType() == 0) {
            log.info("与客户端建立连接成功;");
            socket.send(0 + "");
            return;
        }
        String path = FileUtils.saveImageData(alarmParams.getImage(), name);
        String alarmTime = getTime(alarmParams.getName(), name);
        if (StringUtils.isBlank(path)) {
            socket.send("save error");
            return;
        }
    }
}

其中FileUtils.saveImageData(alarmParams.getImage(), name);部分代码
由于客户端传输的可能是视频或者图片,这里还针对文件名结尾的.mp4或者.jpeg的不同作出对应处理;
其中由于接受的是海康的视频,直接取得流,转成视频以后不可用(具体原理自行百度),这边还通过ffmpeg对流进行了进一步处理;

/**
 * @author zy
 * @className:FileUtils
 * @Description: 文件工具类
 * @Date 2021/9/22 10:30
 */
public class FileUtils {
    private static Logger log = Logger.getLogger(FileUtils.class);

    public static String saveImageData(byte[] data, String target) {
        if (target.equals(".mp4")) {
            return saveVideo(DateUtil.formats.format(new Date()) + target, data);
        }
        return saveImageData(DateUtil.formats.format(new Date()) + target, data);
    }

    /**
     * @author zy
     * @Description 保存转码视频
     * @Param [s, data]
     * @return java.lang.String
     **/
    private static String saveVideo(String s, byte[] data) {
        // 保存接收文件
        String input = ResourceProperty.savePath + saveImageData("temp/" + s, data);
        String output = ResourceProperty.savePath + Constant.getImageDate() + s;
        String res =null;
        // 生成新文件
        if ( !StringUtils.isBlank(processMP4(input, output))){
            // 删除源文件
            delOriginalFile(input);
            // 新文件路径
            res = output.replace(ResourceProperty.savePath, "");
        }
        return res;
    }

	/**
     * @author zy
     * @Description 保存图片信息
     * @Param [s, data]
     * @return java.lang.String
     **/
    public static String saveImageData(String imageName, byte[] data) {
        FileOutputStream outputStream = null;
        String path = null;
        try {
            /*if (StringUtils.isBlank(imageName) || data == null || data.length <= 0) {
                return path;
            }*/
            // 获取图片文件保存的域路径,以及拼接文件名,形成完整的路径
            String domainPath = Constant.getImageDate();
            String filePath = ResourceProperty.savePath + domainPath + imageName;
            File file = new File(filePath);
            if (!file.getParentFile().exists()) {
                // 如果目标文件所在的文件夹不存在,则创建父文件夹
                log.debug("图片保存文件所在目录不存在,准备创建它!");
                // 判断创建目录是否成功
                if (!file.getParentFile().mkdirs()) {
                    log.error("创建图片保存文件所在的目录失败!");
                    return null;
                }
            }
            if (!file.exists()) {
                file.createNewFile();
                log.debug("目标文件创建成功: " + filePath);
            } else {
                log.info("目标文件创建失败");
                return null;
            }
            outputStream = new FileOutputStream(file);
            outputStream.write(data);
            outputStream.flush();
            // 图片保存以后才给返回数据赋值图片的存储路径,避免图片存储异常,提前赋值图片路径,导致后期查询异常
            path = domainPath + imageName;
            log.info("文件保存在:[" + filePath + "]");
        } catch (Exception e) {
            log.error("保存图片失败:" + e);
        } finally {
            IOUtils.closeQuietly(outputStream);
            outputStream = null;
            // 设置文件权限
//            Constant.setfileSystemPermission();

        }
        return path;
    }

    /**
     * 删除文件夹下的指定以外的文件
     *
     * @author zy
     * @param filepath
     * @param prefix 删除以外文件的后缀
     */
    public static void deleteExceptfile(String filepath, String... prefix) {
        File file = new File(filepath);
        setFileDirPermission(file);
        if (!file.isDirectory()) {
            String name = file.getName().toLowerCase();
            for (int i = 0, length = prefix.length; i < length; i++) {
                if (!name.endsWith(prefix[i].toLowerCase())) {
                    System.out.println("path=" + file.getPath());
                    file.delete();
                }
            }
        } else if (file.isDirectory()) {
            String[] filelist = file.list();
            for (int i = 0; i < filelist.length; i++) {
                File readfile = new File(filepath + File.separator + filelist[i]);
                if (!readfile.isDirectory()) {
                    String name = readfile.getName().toLowerCase();
                    for (int j = 0, length = prefix.length; j < length; j++) {
                        if (!name.endsWith(prefix[j].toLowerCase())) {
                            System.out.println("path=" + readfile.getPath());
                            readfile.delete();
                        }
                    }
                } else if (readfile.isDirectory()) {
                    deleteExceptfile(filepath + File.separator + filelist[i], prefix);
                }
            }
        }
    }

    /**
     * @author zy
     * @param filepath
     * @param prefix 删除文件的后缀
     */
    public static void deletefile(String filepath, String... prefix) {
        File file = new File(filepath);
        setFileDirPermission(file);
        if (!file.isDirectory()) {
            String name = file.getName().toLowerCase();
            for (int i = 0, length = prefix.length; i < length; i++) {
                if (name.endsWith(prefix[i].toLowerCase())) {
                    System.out.println("path=" + file.getPath());
                    file.delete();
                }
            }
        } else if (file.isDirectory()) {
            String[] filelist = file.list();
            for (int i = 0; i < filelist.length; i++) {
                File readfile = new File(filepath + File.separator + filelist[i]);
                if (!readfile.isDirectory()) {
                    String name = readfile.getName().toLowerCase();
                    for (int j = 0, length = prefix.length; j < length; j++) {
                        if (name.endsWith(prefix[j].toLowerCase())) {
                            System.out.println("path=" + readfile.getPath());
                            readfile.delete();
                        }
                    }
                } else if (readfile.isDirectory()) {
                    deletefile(filepath + File.separator + filelist[i], prefix);
                }
            }
        }
    }

    /**
     * 文件夹设置权限
     * 文件第一层文件夹的权限设置,只要在第一层文件夹下的文件,均有权限
     * @author zy
     * @param files
     */
    public static void setFileDirPermission(File... files) {
        try {
            String os = System.getProperty("os.name").toLowerCase();
            if (os.contains("windows")) {// windows
                for (File fil : files) {
                    String path = fil.getAbsolutePath();
                    String[] split = path.split("\" + File.separator);
                    if (split.length >= 2) {
                        path = split[0] + File.separator + split[1] + File.separator;
                    }
                    File file = new File(path);
                    file.setReadable(true);// 设置可读权限
                    file.setExecutable(true);// 设置可执行权限
                    file.setWritable(true);// 设置可写权限
                }
            } else {
                for (File fil : files) {
                    String path = fil.getAbsolutePath();
                    String[] split = path.split(File.separator);
                    if (split.length >= 2) {
                        path = split[0] + File.separator + split[1] + File.separator;
                    }
                    Runtime.getRuntime().exec("sudo chmod 777 -R " + path);
                }
            }
        } catch (IOException e) {
            System.err.println("文件夹设置权限异常");
        }
    }
}

最后

以上就是快乐鸭子为你收集整理的java集成ZMQ,并使用应答模式发送接收数据;的全部内容,希望文章能够帮你解决java集成ZMQ,并使用应答模式发送接收数据;所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部