我是靠谱客的博主 平常西装,最近开发中收集的这篇文章主要介绍CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

BindMessageListener

如下代码作为理解入口,安卓、IOS、web在连接成功后才会发起所谓登录,登录时会携带必要信息,例如个人账号唯一标识uid、设备类型deviceId等字段发起登录请求,目前重写的onMessage回调方法中只是对登录逻辑进行处理,若要实现单聊及群聊需要额外拓展参数msgType(信息参数类型),根据信息类型作出相应的逻辑处理,在登录逻辑中:
1.Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class); String uid = session.getUid(); 获取uid,后面根据uid及设备限定类型过滤筛选出此账号的所有频道(因为一个账号可以在不同终端登录,而同一终端只能一个账号登录)
2.channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));意思是将要通知下线的频道集合中过滤掉本次连接的频道,不然也会通知刚连接上的频道
3.Collection<Channel> 相同账号不同终端的频道都放在此
4.@Resource private SessionGroup sessionGroup;sessionGroup是存放所有频道信息的容器,后续单聊、群聊拓展需要从此理解及开发逻辑

package com.farsunset.cim.component.message;

import com.farsunset.cim.entity.Session;
import com.farsunset.cim.sdk.server.constant.ChannelAttr;
import com.farsunset.cim.sdk.server.group.SessionGroup;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.util.JSONUtils;
import io.netty.channel.Channel;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * 集群环境下,监控多设备登录情况,控制是否其余终端下线的逻辑
 */
@Component
public class BindMessageListener implements MessageListener {

    private static final String FORCE_OFFLINE_ACTION = "999";

    private static final String SYSTEM_ID = "0";

    /*
     一个账号只能在同一个类型的终端登录
     如: 多个android或ios不能同时在线
         一个android或ios可以和web,桌面同时在线
     */
    private final Map<String,String[]> conflictMap = new HashMap<>();

    @Resource
    private SessionGroup sessionGroup;

    public BindMessageListener(){
        conflictMap.put(Session.CHANNEL_ANDROID,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
        conflictMap.put(Session.CHANNEL_IOS,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
        conflictMap.put(Session.CHANNEL_WINDOWS,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
        conflictMap.put(Session.CHANNEL_WEB,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
        conflictMap.put(Session.CHANNEL_MAC,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
    }
    @Override
    public void onMessage(org.springframework.data.redis.connection.Message redisMessage, byte[] bytes) {

        Session session = JSONUtils.fromJson(redisMessage.getBody(), Session.class);
        String uid = session.getUid();
        String[] conflictChannels = conflictMap.get(session.getChannel());

        Collection<Channel> channelList = sessionGroup.find(uid,conflictChannels);

        channelList.removeIf(channel -> session.getNid().equals(channel.attr(ChannelAttr.ID).get()));

        /*
         * 获取到其他在线的终端连接,提示账号再其他终端登录
         */
        channelList.forEach(channel -> {

            if (Objects.equals(session.getDeviceId(),channel.attr(ChannelAttr.DEVICE_ID).get())){
                channel.close();
                return;
            }

            Message message = new Message();
            message.setAction(FORCE_OFFLINE_ACTION);
            message.setReceiver(uid);
            message.setSender(SYSTEM_ID);
            message.setContent(session.getDeviceName());
            channel.writeAndFlush(message);
            channel.close();
        });


    }
}

netty即时通讯SDK部分源码

package com.farsunset.cim.sdk.server.group;

import com.farsunset.cim.sdk.server.constant.ChannelAttr;
import com.farsunset.cim.sdk.server.model.Message;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {

    private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();

    private final transient ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future){
            future.removeListener(this);
            remove(future.channel());
        }
    };

    protected String getKey(Channel channel){
        return channel.attr(ChannelAttr.UID).get();
    }

    public void remove(Channel channel){

        String uid = getKey(channel);

        if(uid == null){
            return;
        }

        Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);

        collections.remove(channel);

        if (collections.isEmpty()){
            remove(uid);
        }
    }


    public void add(Channel channel){

        String uid = getKey(channel);

        if (uid == null || !channel.isActive()){
            return;
        }

        channel.closeFuture().addListener(remover);

        Collection<Channel> collections = this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));
        if (collections != null){
            collections.add(channel);
        }

        if (!channel.isActive()){
            remove(channel);
        }
    }


    public void write(String key,Message message){
        find(key).forEach(channel -> channel.writeAndFlush(message));
    }

    public void write(String key, Message message, Predicate<Channel> matcher){
        find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));
    }

    public void write(String key, Message message, Collection<String> excludedSet){
        find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));
    }

    public void write(Message message){
        this.write(message.getReceiver(),message);
    }

    public Collection<Channel> find(String key){
        return this.getOrDefault(key,EMPTY_LIST);
    }

    public Collection<Channel> find(String key,String... channel){
        List<String> channels = Arrays.asList(channel);
        return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());
    }
}

安卓发送信息接口

package com.farsunset.cim.http;

import retrofit2.Call;
import retrofit2.http.Field;
import retrofit2.http.FormUrlEncoded;
import retrofit2.http.POST;

public interface MessageService {

    @POST("/send")
    @FormUrlEncoded
    Call<Void> send(@Field("sender") String sender,
                            @Field("receiver") String receiver,
                            @Field("action") String action,
                            @Field("content") String content);
}

最后

以上就是平常西装为你收集整理的CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)的全部内容,希望文章能够帮你解决CIM即时通讯源码初步解析(一款个人推荐的带集群的开源项目)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部