26 Star 109 Fork 26

xxssyyyyssxx / jfinal-websocket

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 7.32 KB
一键复制 编辑 原始数据 按行查看 历史
xxssyyyyssxx 提交于 2021-04-23 16:10 . jitpack-badge

jfinal-websocket

项目介绍

jfinal-websocket to develop websocket based on javax.websocket

项目主要适用于服务端有数据变动想主动通知客户端场景,例如web网络聊天室,服务端数据变动通知web页面等。解决服务端和客户端的双向通信,可以代替ajax轮询技术。在服务端主动通知的场景下可以大幅度降低架构复杂度。 在开发WebSocket的时候一般会遇到两个问题:

  1. 如何存储Session?
  2. 如何集群?

我们一般都用一个Map来存储Session,你会在@ServerEndPoint类中见到类似public static final Map<String , Session> sessions = new ConcurrentHashMap<>();这样的代码。基于此也能开发, 但是一个很大的问题就是不易扩展。基于此考虑,本项目将针对Session的处理的代码提成接口WebSocketManager,实现了单机版的和集群的,于是在两种场景下使用方式完全一样。

public interface WebSocketManager {
    /**
     * 在容器中的名字
     */
    String WEBSOCKET_MANAGER_NAME  = "webSocketManager";
    /**
     * 根据标识获取websocket session
     * @param identifier 标识
     * @return WebSocket
     */
    WebSocket get(String identifier);

    /**
     * 放入一个 websocket session
     * @param identifier 标识
     * @param webSocket websocket
     */
    void put(String identifier, WebSocket webSocket);

    /**
     * 删除
     * @param identifier 标识
     */
    void remove(String identifier);

    /**
     * 获取当前机器上的保存的WebSocket
     * @return WebSocket Map
     */
    Map<String , WebSocket> localWebSocketMap();

    /**
     * 统计所有在线人数
     * @return 所有在线人数
     */
    default int size(){
        return localWebSocketMap().size();
    }

    /**
     * 给某人发送消息
     * @param identifier 标识
     * @param message 消息
     */
    void sendMessage(String identifier, String message);

    /**
     * 广播
     * @param message 消息
     */
    void broadcast(String message);

    /**
     * WebSocket接收到消息的函数调用
     * @param identifier 标识
     * @param message 消息内容
     */
    void onMessage(String identifier , String message);

    /**
     * 在OnMessage中判断是否是心跳,
     * 从客户端的消息判断是否是ping消息
     * @param identifier 标识
     * @param message 消息
     * @return 是否是ping消息
     */
    default boolean isPing(String identifier , String message){
        return "ping".equalsIgnoreCase(message);
    }

    /**
     * 返回心跳信息
     * @param identifier 标识
     * @param message 消息
     * @return 返回的pong消息
     */
    default String pong(String identifier , String message){
        return "pong";
    }
}

集群版的基于Redis的发布订阅功能,为什么要整这么复杂呢?不能像HttpSession一样直接存储到Redis吗?不能,因为WebSocket的Session无法序列化。java.io.NotSerializableException

使用时面向接口WebSocketManager,支持单机(基于内存)和集群(基于Redis的发布订阅)。

软件架构

1.基于 websocket 定制,主要完成的功能是WebSocket session的状态管理,具备单机和集群能力。

2.可以定制自己的 ServerEndPoint 和 WebSocketManager。

SpringBoot架构下开发WebSocket参见 https://gitee.com/xxssyyyyssxx/websocket-springboot-starter

安装教程

1.0.0版本之前使用maven中央仓库 compile 'top.jfunc.websocket:jfinal-websocket:1.0.0' 1.0.1版本之后使用jitpack maven { url 'https://jitpack.io' } compile 'com.gitee.xxssyyyyssxx:jfinal-websocket:v1.0.1'

使用方式参见 https://gitee.com/xxssyyyyssxx/jfinal-websocket-demo

使用说明

1.配置WebSocketHandler用于排除JFinal的路由拦截

```
/**
 * 接收处理跳转
 */
@Override
public void configHandler(Handlers me) {
      me.add(new WebSocketHandler("^/websocket"));
}
```

2.配置WebSocketManager 在JFinal的afterJFinalStart或者onStart方法中调用一下类的相应方法

```
public class WebSocketManagerConfig {
    /**
     * 基于内存的单机 WebSocketManager
     */
    public static void configMemory(){
        WebSocketManagerUtil.setWebSocketManager(new MemWebSocketManager());
    }
    
    /**
     * 基于Redis的集群 WebSocketManager
     */
    public static void configCluster(Jedis jedis){
        jedis.subscribe(new Subscriber() , RedisWebSocketManager.CHANNEL);
        WebSocketManagerUtil.setWebSocketManager(new RedisWebSocketManager(jedis));
    }
    
    /**
     * 配置心跳监测
     * @param period 多长时间执行一次
     * @param timeSpan 时间间隔
     * @param errorTolerant 错误容忍次数
     * @param todoAtRemoved 干什么
     */
    public static void configHeartBeatCheck(int period , long timeSpan , int errorTolerant , TodoAtRemoved todoAtRemoved){
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(()->{
            new WebSocketHeartBeatChecker().check(
                    WebSocketManagerUtil.getWebSocketManager() ,
                    timeSpan , errorTolerant, todoAtRemoved);
        } , 0 , period , TimeUnit.SECONDS);
    }
}
```

3.配置端点,调用相应的方法

```
@ServerEndpoint("/websocket/{identifier}")
public class WebSocketServerEndPoint extends BaseWebSocketEndpoint {
    @OnOpen
    public void onOpen(Session session , @PathParam(IDENTIFIER) String identifier) {
    }
    @OnClose
    public void onClose(Session session , @PathParam(IDENTIFIER) String identifier) {
    }
    @OnMessage
    public void onMessage(String message, Session session , @PathParam(IDENTIFIER) String identifier) {
    }
}
```

4.使用WebSocketManagerUtil获取到WebSocketManager

5.如果需要,可以写一个监听器用以监听WebSocket连接或者断开事件,做进一步处理[基于如梦技术 https://gitee.com/596392912/JFinal-event 改造]

5.1首先写个监听器


public class DemoListener implements top.jfunc.common.event.core.ApplicationListener<top.jfunc.websocket.WebSocketConnectEvent/WebSocketCloseEvent> {
    @Override
    public void onApplicationEvent(top.jfunc.websocket.WebSocketConnectEvent/WebSocketCloseEvent event) {
        //do with event
    }
}

5.2然后在afterJFinalStart或者onStart方法中初始化扫描此监听器


new EventInitializer().scanPackage("xxx listener包")/addListener(xxx listener).async().start();

6.如果使用了 ** Nginx ** 作为负载均衡器,则需要在配置中添加

```
Nginx反向代理要支持WebSocket,需要配置几个header,否则连接的时候就报404
       proxy_http_version 1.1;
       proxy_set_header Upgrade $http_upgrade;
       proxy_set_header Connection "upgrade";
       proxy_read_timeout 3600s; //这个时间不长的话就容易断开连接
```

此项目理论上可以应用于所有Web框架,而不仅仅适用于JFinal,只需要在某个时刻调用配置类的相应方法即可

Java
1
https://gitee.com/xxssyyyyssxx/jfinal-websocket.git
git@gitee.com:xxssyyyyssxx/jfinal-websocket.git
xxssyyyyssxx
jfinal-websocket
jfinal-websocket
master

搜索帮助