概述
BindMessageListener
如下代码作为理解入口,安卓、IOS、web在连接成功后才会发起所谓登录,登录时会携带必要信息,例如个人账号唯一标识uid、设备类型deviceId等字段发起登录请求,目前重写的onMessage回调方法中只是对登录逻辑进行处理,若要实现单聊及群聊需要额外拓展参数msgType(信息参数类型),根据信息类型作出相应的逻辑处理,在登录逻辑中:
1.Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class); String uid = session.getUid();
获取uid,后面根据uid及设备限定类型过滤筛选出此账号的所有频道(因为一个账号可以在不同终端登录,而同一终端只能一个账号登录)
2.channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));
意思是将要通知下线的频道集合中过滤掉本次连接的频道,不然也会通知刚连接上的频道
3.Collection<Channel> 相同账号不同终端的频道都放在此
4.@Resource private SessionGroup sessionGroup;
sessionGroup是存放所有频道信息的容器,后续单聊、群聊拓展需要从此理解及开发逻辑
package com.farsunset.cim.component.message;
import com.farsunset.cim.entity.Session;
import com.farsunset.cim.sdk.server.constant.ChannelAttr;
import com.farsunset.cim.sdk.server.group.SessionGroup;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.util.JSONUtils;
import io.netty.channel.Channel;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* 集群环境下,监控多设备登录情况,控制是否其余终端下线的逻辑
*/
@Component
public class BindMessageListener implements MessageListener {
private static final String FORCE_OFFLINE_ACTION = "999";
private static final String SYSTEM_ID = "0";
/*
一个账号只能在同一个类型的终端登录
如: 多个android或ios不能同时在线
一个android或ios可以和web,桌面同时在线
*/
private final Map<String,String[]> conflictMap = new HashMap<>();
@Resource
private SessionGroup sessionGroup;
public BindMessageListener(){
conflictMap.put(Session.CHANNEL_ANDROID,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
conflictMap.put(Session.CHANNEL_IOS,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
conflictMap.put(Session.CHANNEL_WINDOWS,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
conflictMap.put(Session.CHANNEL_WEB,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
conflictMap.put(Session.CHANNEL_MAC,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
}
@Override
public void onMessage(org.springframework.data.redis.connection.Message redisMessage, byte[] bytes) {
Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class);
String uid = session.getUid();
String[] conflictChannels = conflictMap.get(session.getChannel());
Collection<Channel> channelList = sessionGroup.find(uid,conflictChannels);
channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));
/*
* 获取到其他在线的终端连接,提示账号再其他终端登录
*/
channelList.forEach(channel -> {
if (Objects.equals(session.getDeviceId(),channel.attr(ChannelAttr.DEVICE_ID).get())){
channel.close();
return;
}
Message message = new Message();
message.setAction(FORCE_OFFLINE_ACTION);
message.setReceiver(uid);
message.setSender(SYSTEM_ID);
message.setContent(session.getDeviceName());
channel.writeAndFlush(message);
channel.close();
});
}
}
netty即时通讯SDK部分源码
package com.farsunset.cim.sdk.server.group;
import com.farsunset.cim.sdk.server.constant.ChannelAttr;
import com.farsunset.cim.sdk.server.model.Message;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {
private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();
private final transient ChannelFutureListener remover = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future){
future.removeListener(this);
remove(future.channel());
}
};
protected String getKey(Channel channel){
return channel.attr(ChannelAttr.UID).get();
}
public void remove(Channel channel){
String uid = getKey(channel);
if(uid == null){
return;
}
Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);
collections.remove(channel);
if (collections.isEmpty()){
remove(uid);
}
}
public void add(Channel channel){
String uid = getKey(channel);
if (uid == null || !channel.isActive()){
return;
}
channel.closeFuture().addListener(remover);
Collection<Channel> collections = this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));
if (collections != null){
collections.add(channel);
}
if (!channel.isActive()){
remove(channel);
}
}
public void write(String key,Message message){
find(key).forEach(channel -> channel.writeAndFlush(message));
}
public void write(String key, Message message, Predicate<Channel> matcher){
find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));
}
public void write(String key, Message message, Collection<String> excludedSet){
find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));
}
public void write(Message message){
this.write(message.getReceiver(),message);
}
public Collection<Channel> find(String key){
return this.getOrDefault(key,EMPTY_LIST);
}
public Collection<Channel> find(String key,String... channel){
List<String> channels = Arrays.asList(channel);
return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());
}
}
安卓发送信息接口
package com.farsunset.cim.http;
import retrofit2.Call;
import retrofit2.http.Field;
import retrofit2.http.FormUrlEncoded;
import retrofit2.http.POST;
public interface MessageService {
@POST("/send")
@FormUrlEncoded
Call<Void> send(@Field("sender") String sender,
@Field("receiver") String receiver,
@Field("action") String action,
@Field("content") String content);
}
最后
以上就是平常西装为你收集整理的CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)的全部内容,希望文章能够帮你解决CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复