我是靠谱客的博主 害怕鼠标,最近开发中收集的这篇文章主要介绍emqttd java 即时通讯_使用Emqttd搭建一个聊天室,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言

由于项目需要,目前需要使用Emqttd搭建一个聊天室,自己写了个demo,特记录下来

代码

使用IDEA搭建一个Spring Boot工程

pom.xml文件,此处我只列出dependencies部分

com.alibaba

fastjson

1.2.49

org.fusesource.mqtt-client

mqtt-client

1.14

org.springframework.boot

spring-boot-starter-freemarker

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-devtools

runtime

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

application.properties文件

server.port=8080

emqt.server=填你的Emqttd服务器地址

emqt.port=1883

emqt.host=tcp://${emqt.server}:${emqt.port}

emqt.clientId=spring-boot-client-${server.port}

emqt.subcribe.topic=/live_test/#

EmqtConfig.java,用于初始化MQTT客户端,配置监听器等

import org.fusesource.hawtbuf.Buffer;

import org.fusesource.hawtbuf.UTF8Buffer;

import org.fusesource.hawtdispatch.DispatchQueue;

import org.fusesource.mqtt.client.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

/**

* @author yangxin

* @date 2019/4/23

*/

@Configuration

public class EmqtConfig {

@Value("${emqt.host}")

private String host;

@Value("${emqt.clientId}")

private String clientId;

@Value("${emqt.subcribe.topic}")

private String topicName;

private static Lock lock = new ReentrantLock();

private static Map onlineMap = new ConcurrentHashMap<>();

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Bean

public MQTT mqtt() {

try {

logger.info("====连接到mqtt===");

MQTT mqtt = new MQTT();

mqtt.setHost(host);

mqtt.setClientId(clientId);

mqtt.setReconnectDelay(100);

mqtt.setKeepAlive((short) 20);

return mqtt;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

@Bean

public CallbackConnection callbackConnection(MQTT mqtt) {

try {

CallbackConnection connection = mqtt.callbackConnection();

connection.listener(new Listener() {

@Override

public void onConnected() {

logger.info("连接成功");

}

@Override

public void onDisconnected() {

logger.info("断开连接");

}

@Override

public void onPublish(UTF8Buffer topic, Buffer message, Runnable callback) {

try {

lock.lock();

logger.info("收到topic:" + topic.toString() + "消息为:" + message.utf8());

//表示监听成功

String topicName = topic.toString();

if (topicName.startsWith("/liveOnline")) {

Long liveId = findNum(topicName);

Integer integer = onlineMap.get(liveId);

if (integer == null) {

integer = 0;

}

onlineMap.put(liveId, ++integer);

}

}finally {

callback.run();

lock.unlock();

}

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

connection.connect(new Callback() {

@Override

public void onSuccess(Void aVoid) {

//连接成功后会默认订阅主题($client/mengsu)

logger.info(clientId + "连接成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

//新建一个主题

Topic[] topic = new Topic[]{new Topic(topicName, QoS.EXACTLY_ONCE),new Topic("/liveOnline/#",QoS.EXACTLY_ONCE)};

connection.subscribe(topic, new Callback() {

@Override

public void onSuccess(byte[] bytes) {

logger.info(clientId + " topic订阅成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.info(clientId + " topic订阅 失败");

logger.error(throwable.getMessage(), throwable);

}

});

/* connection.publish("/live/1", "这是服务器自己发出来的消息".getBytes(), QoS.AT_LEAST_ONCE, true,new Callback() {

@Override

public void onSuccess(Void aVoid) {

System.out.println("发送成功");

}

@Override

public void onFailure(Throwable throwable) {

throwable.printStackTrace();

}

});*/

DispatchQueue dispatchQueue = connection.getDispatchQueue();

dispatchQueue.execute(new Runnable() {

public void run() {

//在这里进行相应的订阅、发布、停止连接等等操作

System.out.println("在这里进行相应的订阅、发布、停止连接等等操作");

}

});

return connection;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

private static Long findNum(String str) {

String regEx="[^0-9]";

Pattern p = Pattern.compile(regEx);

Matcher m = p.matcher(str);

String result = m.replaceAll("").trim();

return Long.valueOf(result);

}

public int getOnlineCount(Long liveId){

try {

lock.lock();

Integer integer = onlineMap.get(liveId);

return integer == null ? 0 : integer;

}finally {

lock.unlock();

}

}

}

至此,MQTT客户端就配置好了,下面是controller

IndexController.java

import com.alibaba.fastjson.JSONObject;

import com.example.emqtdemo.emqt.EmqtConfig;

import org.fusesource.mqtt.client.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

import javax.servlet.http.HttpServletRequest;

/**

* @author yangxin

* @date 2019/4/23

*/

@Controller

public class IndexController {

@Autowired

private HttpServletRequest request;

@Resource

private CallbackConnection callbackConnection;

@Resource

private EmqtConfig emqtConfig;

@RequestMapping("/")

public String index(Long liveId,String username){

request.setAttribute("liveId", liveId);

request.setAttribute("username", username);

request.setAttribute("clientId","liveroom" + liveId + username);

request.setAttribute("topic","/live_test/" + liveId);

return "index";

}

// 发送消息

@RequestMapping("/send")

@ResponseBody

public Object send(String topic, String clientId,String msg) {

JSONObject content = new JSONObject();

content.put("clientId", clientId);

content.put("msg", msg);

callbackConnection.publish(topic, content.toJSONString().getBytes(), QoS.EXACTLY_ONCE, false,new Callback() {

@Override

public void onSuccess(Void aVoid) {

}

@Override

public void onFailure(Throwable throwable) {

}

});

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", msg);

return jsonObject;

}

// 获取在线人数

@RequestMapping("/getOnlineCount")

@ResponseBody

public Object getOnlineCount(Long liveId) {

int onlineCount = emqtConfig.getOnlineCount(liveId);

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", onlineCount);

return jsonObject;

}

}

index.ftl文件,用于展示

title

${username!}成功进入了${liveId!}直播间

当前在线人数:0


输入消息:

发送


取消

var clientId = '${clientId!}';

var username = '${username!}';

var topic = '${topic!}';

var liveId = '${liveId!}';

index.js,用于控制页面的一些逻辑

$(document).ready(function () {

// 将在全局初始化一个 mqtt 变量

console.log(mqtt)

// 连接选项

const options = {

connectTimeout: 4000, // 超时时间

// 认证信息

clientId: clientId, // 客户端id 这个自己填 尽量唯一

username: username, // 取当前用户的名字

password: '123',

}

const client = mqtt.connect('ws://your address:8083/mqtt', options)

// let topic = "/live_dev/${liveId}"

client.on('connect', (e) => {

console.log('成功连接服务器')

$("#connectionTip").html("成功连接到消息服务器")

// 订阅一个主题

client.subscribe(topic, { qos: 2 }, (error) => {

if (!error) {

console.log('订阅成功')

}

},onSubscribeSuccess)

})

/*

// 取消订阅

client.unubscribe(

// topic, topic Array, topic Array-Onject

'hello',

onUnubscribeSuccess,

)

*/

client.on('reconnect', (error) => {

console.log('正在重连:' + error)

})

client.on('error', (error) => {

console.log('连接失败:' + error)

})

function onSubscribeSuccess() {

client.publish('/liveOnline/' + liveId, liveId, { qos: 2, rein: false }, (error) => {

console.log(error || '发布成功')

})

}

function onUnubscribeSuccess() {

console.log("onUnubscribeSuccess")

}

// 监听接收消息事件

client.on('message', (topic, message,callback) => {

console.log('收到来自', topic, '的消息', message.toString());

let html = ` 收到来自 ${topic} 的消息:${ message.toString()}
`;

$("#msgContent").append(html);

})

$("#send").click(function () {

let conent = $("input[name='content']").val();

$.post("/send",{clientId:clientId,msg:conent,topic:topic},function (data) {

if (data.success){

console.log("发送成功")

}

},'json')

})

setInterval(function () {

$.post("/getOnlineCount",{liveId:liveId},function (data) {

if (data.success){

$("#onlineCount").html(data.content)

}

},'json')

},3000)

})

参考的文章

代码地址

最后

以上就是害怕鼠标为你收集整理的emqttd java 即时通讯_使用Emqttd搭建一个聊天室的全部内容,希望文章能够帮你解决emqttd java 即时通讯_使用Emqttd搭建一个聊天室所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部