概述
前言
由于项目需要,目前需要使用Emqttd搭建一个聊天室,自己写了个demo,特记录下来
代码
使用IDEA搭建一个Spring Boot工程
pom.xml文件,此处我只列出dependencies部分
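<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.14</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>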
application.properties文件
# HTTP port of the Spring Boot application; also reused below to build the MQTT client id.
server.port=8080
# Host name or IP address of the Emqttd broker. The value below is a placeholder and must be replaced.
emqt.server=填你的Emqttd服务器地址
# Broker port used when assembling emqt.host.
emqt.port=1883
# Broker URI assembled from the two properties above; EmqtConfig injects it with @Value("${emqt.host}").
emqt.host=tcp://${emqt.server}:${emqt.port}
# MQTT client id used by the server-side client; EmqtConfig injects it with @Value("${emqt.clientId}").
emqt.clientId=spring-boot-client-${server.port}
# Topic filter the server-side client subscribes to.
# NOTE(review): the key is spelled "subcribe"; EmqtConfig reads it under exactly this name, so rename both together or not at all.
emqt.subcribe.topic=/live_test/#
EmqtConfig.java,用于初始化MQTT客户端,配置监听器等
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.mqtt.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Spring configuration for the server-side MQTT client.
 *
 * <p>It builds the {@link MQTT} client from the {@code emqt.*} properties, opens a
 * callback-style connection, subscribes to the configured chat topic and to
 * {@code /liveOnline/#}, and counts the messages received on {@code /liveOnline...}
 * topics per room id. The counter only ever grows: nothing in this class decrements it.
 *
 * @author yangxin
 * @date 2019/4/23
 */
@Configuration
public class EmqtConfig {

    /** Topic prefix on which clients announce that they entered a room. */
    private static final String ONLINE_TOPIC_PREFIX = "/liveOnline";

    /** Matches every character that is not an ASCII digit; compiled once and reused. */
    private static final Pattern NON_DIGITS = Pattern.compile("[^0-9]");

    /**
     * Room id to number of presence messages seen so far. Updates go through
     * {@link Map#merge}, which is atomic on a {@link ConcurrentHashMap}, so no
     * external lock is needed.
     */
    private static final Map<Long, Integer> onlineMap = new ConcurrentHashMap<>();

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${emqt.host}")
    private String host;

    @Value("${emqt.clientId}")
    private String clientId;

    @Value("${emqt.subcribe.topic}")
    private String topicName;

    /**
     * Creates the MQTT client configured with host, client id, reconnect delay and keep-alive.
     *
     * @return the configured client, or {@code null} when configuration fails
     *         (for example when the host URI cannot be parsed); the failure is logged
     */
    @Bean
    public MQTT mqtt() {
        try {
            logger.info("====连接到mqtt===");
            MQTT client = new MQTT();
            client.setHost(host);
            client.setClientId(clientId);
            client.setReconnectDelay(100);
            client.setKeepAlive((short) 20);
            return client;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Opens the callback connection, registers the message listener and subscribes to the
     * configured topic plus {@code /liveOnline/#}.
     *
     * @param mqtt the client created by {@link #mqtt()}
     * @return the connection, or {@code null} when setting it up throws; the failure is logged
     */
    @Bean
    public CallbackConnection callbackConnection(MQTT mqtt) {
        try {
            CallbackConnection connection = mqtt.callbackConnection();
            connection.listener(new Listener() {
                @Override
                public void onConnected() {
                    logger.info("连接成功");
                }

                @Override
                public void onDisconnected() {
                    logger.info("断开连接");
                }

                @Override
                public void onPublish(UTF8Buffer topic, Buffer message, Runnable callback) {
                    try {
                        String receivedTopic = topic.toString();
                        logger.info("收到topic:" + receivedTopic + "消息为:" + message.utf8());
                        if (receivedTopic.startsWith(ONLINE_TOPIC_PREFIX)) {
                            Long liveId = findNum(receivedTopic);
                            if (liveId == null) {
                                // A presence topic without a usable room id must not break message handling.
                                logger.warn("Ignoring presence message without a room id, topic={}", receivedTopic);
                            } else {
                                onlineMap.merge(liveId, 1, Integer::sum);
                            }
                        }
                    } finally {
                        // Always run the callback handed in by the client, even if handling failed.
                        callback.run();
                    }
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error(throwable.getMessage(), throwable);
                }
            });
            connection.connect(new Callback<Void>() {
                @Override
                public void onSuccess(Void aVoid) {
                    // Original author's note: after connecting, a default topic ($client/mengsu) is subscribed.
                    logger.info(clientId + "连接成功");
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error(throwable.getMessage(), throwable);
                }
            });
            // Subscribe to the chat topic from configuration and to the presence topic.
            Topic[] topics = new Topic[]{
                new Topic(topicName, QoS.EXACTLY_ONCE),
                new Topic("/liveOnline/#", QoS.EXACTLY_ONCE)
            };
            connection.subscribe(topics, new Callback<byte[]>() {
                @Override
                public void onSuccess(byte[] bytes) {
                    logger.info(clientId + " topic订阅成功");
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.info(clientId + " topic订阅 失败");
                    logger.error(throwable.getMessage(), throwable);
                }
            });
            /* Example of publishing a message from the server itself:
            connection.publish("/live/1", "这是服务器自己发出来的消息".getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() {
                @Override
                public void onSuccess(Void aVoid) {
                    System.out.println("发送成功");
                }
                @Override
                public void onFailure(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });*/
            DispatchQueue dispatchQueue = connection.getDispatchQueue();
            dispatchQueue.execute(new Runnable() {
                @Override
                public void run() {
                    // Placeholder: subscribe, publish or disconnect operations can be performed here.
                    System.out.println("在这里进行相应的订阅、发布、停止连接等等操作");
                }
            });
            return connection;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Extracts a number from {@code str} by dropping every non-digit character and parsing
     * what is left, so {@code "/liveOnline/12"} yields {@code 12}. All digits in the string
     * are concatenated, wherever they appear.
     *
     * @param str the text to scan, may be {@code null}
     * @return the parsed number, or {@code null} when {@code str} is {@code null}, contains
     *         no digit, or the digits do not fit into a {@code long}
     */
    private static Long findNum(String str) {
        if (str == null) {
            return null;
        }
        String digits = NON_DIGITS.matcher(str).replaceAll("");
        if (digits.isEmpty()) {
            return null;
        }
        try {
            return Long.valueOf(digits);
        } catch (NumberFormatException e) {
            // Only reachable when the digit run overflows a long.
            return null;
        }
    }

    /**
     * Returns how many presence messages have been counted for a room.
     *
     * @param liveId the room id; {@code null} is treated as an unknown room
     * @return the counter for {@code liveId}, or {@code 0} when nothing was counted
     */
    public int getOnlineCount(Long liveId) {
        if (liveId == null) {
            // ConcurrentHashMap rejects null keys with a NullPointerException.
            return 0;
        }
        Integer count = onlineMap.get(liveId);
        return count == null ? 0 : count;
    }
}
至此,MQTT客户端就配置好了,下面是controller
IndexController.java
import com.alibaba.fastjson.JSONObject;
import com.example.emqtdemo.emqt.EmqtConfig;
import org.fusesource.mqtt.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.nio.charset.StandardCharsets;
/**
 * Web endpoints of the chat-room demo: renders the room page, publishes chat messages
 * to the broker and reports the per-room counter kept by {@link EmqtConfig}.
 *
 * @author yangxin
 * @date 2019/4/23
 */
@Controller
public class IndexController {

    private static final Logger logger = LoggerFactory.getLogger(IndexController.class);

    @Autowired
    private HttpServletRequest request;

    @Resource
    private CallbackConnection callbackConnection;

    @Resource
    private EmqtConfig emqtConfig;

    /**
     * Renders the chat page and exposes the values the page script needs as request attributes.
     *
     * @param liveId   id of the room to enter
     * @param username name of the entering user
     * @return the view name {@code "index"}
     */
    @RequestMapping("/")
    public String index(Long liveId, String username) {
        request.setAttribute("liveId", liveId);
        request.setAttribute("username", username);
        request.setAttribute("clientId", "liveroom" + liveId + username);
        request.setAttribute("topic", "/live_test/" + liveId);
        return "index";
    }

    /**
     * Publishes a chat message as JSON ({@code clientId}, {@code msg}) to the given topic.
     * Publishing is asynchronous: the response is sent once the message has been handed to
     * the connection, and a later delivery failure is only logged.
     *
     * @param topic    target topic; must not be blank
     * @param clientId id of the sending client, copied into the payload
     * @param msg      message text, copied into the payload
     * @return a JSON object with {@code success} and {@code content}
     */
    @RequestMapping("/send")
    @ResponseBody
    public Object send(String topic, String clientId, String msg) {
        JSONObject response = new JSONObject();
        if (topic == null || topic.trim().isEmpty()) {
            response.put("success", false);
            response.put("content", "topic must not be empty");
            return response;
        }
        // NOTE(review): topic is taken from the request as-is, so a caller can publish to any topic.

        JSONObject payload = new JSONObject();
        payload.put("clientId", clientId);
        payload.put("msg", msg);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
        byte[] body = payload.toJSONString().getBytes(StandardCharsets.UTF_8);
        final String targetTopic = topic;
        callbackConnection.publish(targetTopic, body, QoS.EXACTLY_ONCE, false, new Callback<Void>() {
            @Override
            public void onSuccess(Void aVoid) {
                logger.debug("Published message to {}", targetTopic);
            }

            @Override
            public void onFailure(Throwable throwable) {
                logger.error("Failed to publish message to " + targetTopic, throwable);
            }
        });

        response.put("success", true);
        response.put("content", msg);
        return response;
    }

    /**
     * Returns the counter kept for a room.
     *
     * @param liveId id of the room
     * @return a JSON object with {@code success} and the count in {@code content}
     */
    @RequestMapping("/getOnlineCount")
    @ResponseBody
    public Object getOnlineCount(Long liveId) {
        int onlineCount = emqtConfig.getOnlineCount(liveId);
        JSONObject response = new JSONObject();
        response.put("success", true);
        response.put("content", onlineCount);
        return response;
    }
}
index.ftl文件,用于展示
title${username!}成功进入了${liveId!}直播间
当前在线人数:0
输入消息:
发送
取消
// Values handed from the server-side model to index.js.
// ?js_string escapes quotes, backslashes and "</" so a model value cannot break out of the
// string literal; ?c renders the numeric room id without locale grouping (1234, not 1,234).
var clientId = '${(clientId!)?js_string}';
var username = '${(username!)?js_string}';
var topic = '${(topic!)?js_string}';
var liveId = '${(liveId?c)!}';
index.js,用于控制页面的一些逻辑
// Page logic of the chat room: connects to the broker over WebSocket, subscribes to the
// room topic, announces presence once, renders incoming messages, sends messages through
// the /send endpoint and polls /getOnlineCount every three seconds.
// Relies on the globals clientId, username, topic and liveId set by the page, and on the
// jQuery ($) and MQTT.js (mqtt) globals.
$(document).ready(function () {
    // A global mqtt variable is provided by the MQTT.js bundle.
    console.log(mqtt)

    // Connection options
    const options = {
        connectTimeout: 4000, // connect timeout in milliseconds
        // Credentials
        clientId: clientId, // client id; should be unique per client
        username: username, // name of the current user
        password: '123',
    }
    const client = mqtt.connect('ws://your address:8083/mqtt', options)

    // The 'connect' event fires again after every reconnect; presence is announced only
    // once so that a reconnect is not counted as another visitor.
    let presenceAnnounced = false

    client.on('connect', () => {
        console.log('成功连接服务器')
        $("#connectionTip").html("成功连接到消息服务器")
        // Subscribe to the room topic. subscribe() takes (topic, options, callback); the
        // success handler has to be invoked from that callback.
        client.subscribe(topic, { qos: 2 }, (error) => {
            if (error) {
                console.log('订阅失败:' + error)
                return
            }
            console.log('订阅成功')
            onSubscribeSuccess()
        })
    })

    /*
    // Unsubscribe example
    client.unsubscribe(
        // topic, topic Array, topic Array-Object
        'hello',
        onUnubscribeSuccess,
    )
    */

    client.on('reconnect', () => {
        // The 'reconnect' event carries no argument.
        console.log('正在重连')
    })

    client.on('error', (error) => {
        console.log('连接失败:' + error)
    })

    // Announces this client on the presence topic of the room (once per page load).
    function onSubscribeSuccess() {
        if (presenceAnnounced) {
            return
        }
        presenceAnnounced = true
        client.publish('/liveOnline/' + liveId, liveId, { qos: 2, retain: false }, (error) => {
            console.log(error || '发布成功')
        })
    }

    function onUnubscribeSuccess() {
        console.log("onUnubscribeSuccess")
    }

    // Render every received message.
    client.on('message', (msgTopic, message) => {
        const text = message.toString()
        console.log('收到来自', msgTopic, '的消息', text)
        // Message content comes from other clients, so insert it as text, never as HTML.
        // NOTE(review): the original wrapper element was lost from the listing; a <div> is assumed.
        const line = $('<div>').text(' 收到来自 ' + msgTopic + ' 的消息:' + text)
        $("#msgContent").append(line)
    })

    // Send the typed message through the server.
    $("#send").click(function () {
        const content = $("input[name='content']").val()
        $.post("/send", { clientId: clientId, msg: content, topic: topic }, function (data) {
            if (data.success) {
                console.log("发送成功")
            }
        }, 'json')
    })

    // Refresh the displayed counter every three seconds.
    setInterval(function () {
        $.post("/getOnlineCount", { liveId: liveId }, function (data) {
            if (data.success) {
                $("#onlineCount").html(data.content)
            }
        }, 'json')
    }, 3000)
})
参考的文章
代码地址
最后
以上就是害怕鼠标为你收集整理的emqttd java 即时通讯_使用Emqttd搭建一个聊天室的全部内容,希望文章能够帮你解决emqttd java 即时通讯_使用Emqttd搭建一个聊天室所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复