1 Star 0 Fork 5

Hermit / SimpleMqttPool

forked from Xenos / SimpleMqttPool 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MulanPSL-2.0

重构 SimpleMqttPool

介绍

在《用Spring Boot实现一个简易的MQTT客户端》中介绍了一个自定义的MQTT的客户端的实现,但单一的客户端并不能满足实际的开发需求,这里基于之前实现的自定义客户端实现一个简单的连接池

设计思路

之前的版本是用一个List存储所有的连接对象,每次获取时从List中删除,释放连接时重新加入List,然后用synchronized 关键字做并发控制。再参考了部分的Druid连接池的代码后,有了这个版本的设计。

  1. 用一个数组 activeConnections 存储所有的连接,定义一个指针,指向当前数组中当前可用连接对象的最后一个下标

  2. 对池中的连接对象进行操作时,只要维持指针的位置,保证指针左边的所有对象都是可用的(滑动窗口)

  3. 从池中取走一个对象时,向左移动指针;放回一个对象时,向右移动指针

  4. 之前的设计的连接对象在发送数据后会处于忙碌的状态,只有等待回调方法执行成功后,才会处于空闲可用的状态,所以用了一个数组 busyConnections 存储这些忙碌中的连接对象,等到这些对象空闲后,才放回到可用池中

  5. 对于需要进行删除的对象同样也用一个数组 discardConnections 进行存储,以上两个数组都只需要知道连接对象在 activeConnections 的下标即可,不需要存储一个完整的对象

  6. juc 包下的类做并发控制,提高并发的效率

具体实现

  • 定义相关变量
    /**
     * 当前连接总数
     */
    private int activeCount;
    /**
     * 当前可以从池中拿取的连接数
     */
    private int poolingCount;
    /**
     * 存储所有连接的数组
     */
    private volatile Connection[] activeConnections;
    /**
     * 存储连接在 activeConnection 中的下标
     */
    private volatile Map<String, Integer> connectionIndexDict;
    /**
     * 处于忙碌状态的连接,存储 activeConnections 中连接的ID
     */
    private volatile String[] busyConnections;
    /**
     * 处于忙碌状态的连接数
     */
    private int busyCount;
    /**
     * 即将丢弃的连接, 存储 activeConnections 中连接的ID
     */
    private volatile String[] discardConnections;
    /**
     * 将要丢弃的连接数
     */
    private int discardingCount;
  • 连接池的初始化

init-pool

/**
     * 连接池初始化
     */
    public void initialize() throws PoolInitializeException {
        if (isInitialized) {
            return;
        }
        ReentrantLock lock = this.lock;
        lock.lock();
        try {

            // 一些配置文件属性的判断
            // ... ...

            // 创建数组和字典
            activeConnections = new Connection[maxSize];
            connectionIndexDict = new ConcurrentHashMap<>(maxSize);
            busyConnections = new String[maxSize];
            discardConnections = new String[2 * maxSize];

            // 创建最小数量的连接
            while (activeCount < minSize) {
                Connection connection = createConnection();
                if (connection != null) {
                    activeConnections[activeCount] = connection;
                    connectionIndexDict.put(connection.getClientId(), activeCount++);
                    poolingCount++;
                }
            }
            // 创建并启动 连接创建线程
            createAndStartCreatorThread();
            // 创建并启动 连接销毁线程
            createAndStartDestroyThread();
            // 创建并启动 连接状态刷新线程
            createAndStartRefreshThread();
            // 创建并启动 连接状态监视线程
            createAndStartWatchThread();

            // 等待所有线程启动
            initLatch.await();

            this.initTime = LocalDateTime.now();
            this.isInitialized = true;
        } catch (Exception e) {
            throw new PoolInitializeException("连接池初始化发生异常", e);
        } finally {
            lock.unlock();
            log.info("连接池初始化已完成,当前可用连接数为 " + poolingCount);
        }
    }
  • 获取连接

get-connection

    private Connection getLast() throws InterruptedException {
        try {
            // 如果当前可以弹出的连接数为 0
            while (poolingCount == 0) {
                // 唤醒创建线程
                empty.signal();
                // 当前等待获取连接的线程数 +1
                waitThreadCount++;
                try {
                    // 等待 连接创建线程 创建新的连接,获取有连接被释放时唤醒
                    notEmpty.await();
                }  finally {
                    // 执行到这里说明已经有新的连接可以被获取
                    waitThreadCount--;
                }
            }
        } catch (InterruptedException e) {
            notEmpty.signal();
            throw e;
        }
        Connection last = null;
        try {
            // 判断当前连接是否可用,不可用的连接会被放入待删除的数组中等待删除
            while (poolingCount > 0 && !testConnection(poolingCount - 1)) {
                poolingCount--;
                if (poolingCount == 0) {
                    empty.signal();
                    waitThreadCount++;
                    try {
                        notEmpty.await();
                    }
                    finally {
                        waitThreadCount--;
                    }
                }
            }
        } catch (InterruptedException e) {
            notEmpty.signal();
            throw e;
        }
        last  = activeConnections[--poolingCount];
        return last;
    }

如果当前连接对象不够,并且连接数未达到最大值,由连接创建线程去创建新的连接,过程如下

createConnection

  • 将连接放回池中

将连接放回去时,并不会直接将连接认为是可用的,而是放入到 忙碌连接数组中

    public void releaseConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        lock.lock();
        try {
            String clientId = connection.getClientId();
            if (!connectionIndexDict.containsKey(clientId)) {
                return;
            }
            // 放入忙碌池中
            busyConnections[busyCount++] = clientId;
            refreshCondition.signal();
        } finally {
            lock.unlock();
        }
    }

然后由忙碌连接处理线程去做连接状态的更新操作,过程如下:

refreshConnection

  • 清除无效连接

无效连接指那些连接断开的连接或者是已经超时的连接(长时间没有被使用的连接),它由状态监视线程监视产生并放入到待删除连接数组中,删除过程如下:

closeConnection

检查连接是否可用以及监视线程的操作

    /**
     * 判断一个连接是否可用,如果不可用,则加入到待销毁列表中
     * @param connectionIndex 连接的下标
     * @return 是否可用
     */
    private boolean testConnection(int connectionIndex) {
        if (connectionIndex > activeConnections.length) {
            return false;
        }
        Connection connection = activeConnections[connectionIndex];
        long lastActiveTimeMillis = connection.lastActiveTimeMillis;
        long currMillis = System.currentTimeMillis();
        int minSize = properties.getPool().getMinPoolSize();
        long maxWaitTime = properties.getPool().getMaxWaitTime();
        boolean needDiscard = connection.isInTrouble() ||
                (activeCount > minSize && currMillis - lastActiveTimeMillis > maxWaitTime);
        if (needDiscard) {
            // 加入到销毁列表中
            discardConnections[discardingCount++] = connection.getClientId();
            // 唤醒销毁线程
            discardCondition.signal();
            return false;
        }
        return true;
    }

     /**
     * 创建并启动监视线程,定期扫描可用数组,将数组中超时的、不可用的连接移入到待销毁数组中
     */
    private void createAndStartWatchThread() {
        initLatch.countDown();
        long maxWaitTime = properties.getPool().getMaxWaitTime();
        WatcherThreadPool.THREAD_POOL.scheduledExecute(() -> {
            lock.lock();
            try {
                int minPoolSize = properties.getPool().getMinPoolSize();
                int index = poolingCount - 1;
                while (poolingCount > minPoolSize) {
                    boolean isUsableConnection = testConnection(index);
                    if (!isUsableConnection) {
                        // 一旦检查到失效连接,更新可用连接的边界
                        swapAndUpdateIndex(index, --poolingCount);
                    }
                    index--;
                }
            } finally {
                lock.unlock();
            }

        }, maxWaitTime, maxWaitTime);
    }
木兰宽松许可证, 第2版 木兰宽松许可证, 第2版 2020年1月 http://license.coscl.org.cn/MulanPSL2 您对“软件”的复制、使用、修改及分发受木兰宽松许可证,第2版(“本许可证”)的如下条款的约束: 0. 定义 “软件”是指由“贡献”构成的许可在“本许可证”下的程序和相关文档的集合。 “贡献”是指由任一“贡献者”许可在“本许可证”下的受版权法保护的作品。 “贡献者”是指将受版权法保护的作品许可在“本许可证”下的自然人或“法人实体”。 “法人实体”是指提交贡献的机构及其“关联实体”。 “关联实体”是指,对“本许可证”下的行为方而言,控制、受控制或与其共同受控制的机构,此处的控制是指有受控方或共同受控方至少50%直接或间接的投票权、资金或其他有价证券。 1. 授予版权许可 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的版权许可,您可以复制、使用、修改、分发其“贡献”,不论修改与否。 2. 授予专利许可 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的(根据本条规定撤销除外)专利许可,供您制造、委托制造、使用、许诺销售、销售、进口其“贡献”或以其他方式转移其“贡献”。前述专利许可仅限于“贡献者”现在或将来拥有或控制的其“贡献”本身或其“贡献”与许可“贡献”时的“软件”结合而将必然会侵犯的专利权利要求,不包括对“贡献”的修改或包含“贡献”的其他结合。如果您或您的“关联实体”直接或间接地,就“软件”或其中的“贡献”对任何人发起专利侵权诉讼(包括反诉或交叉诉讼)或其他专利维权行动,指控其侵犯专利权,则“本许可证”授予您对“软件”的专利许可自您提起诉讼或发起维权行动之日终止。 3. 无商标许可 “本许可证”不提供对“贡献者”的商品名称、商标、服务标志或产品名称的商标许可,但您为满足第4条规定的声明义务而必须使用除外。 4. 分发限制 您可以在任何媒介中将“软件”以源程序形式或可执行形式重新分发,不论修改与否,但您必须向接收者提供“本许可证”的副本,并保留“软件”中的版权、商标、专利及免责声明。 5. 免责声明与责任限制 “软件”及其中的“贡献”在提供时不带任何明示或默示的担保。在任何情况下,“贡献者”或版权所有者不对任何人因使用“软件”或其中的“贡献”而引发的任何直接或间接损失承担责任,不论因何种原因导致或者基于何种法律理论,即使其曾被建议有此种损失的可能性。 6. 语言 “本许可证”以中英文双语表述,中英文版本具有同等法律效力。如果中英文版本存在任何冲突不一致,以中文版为准。 条款结束 如何将木兰宽松许可证,第2版,应用到您的软件 如果您希望将木兰宽松许可证,第2版,应用到您的新软件,为了方便接收者查阅,建议您完成如下三步: 1, 请您补充如下声明中的空白,包括软件名、软件的首次发表年份以及您作为版权人的名字; 2, 请您在软件包的一级目录下创建以“LICENSE”为名的文件,将整个许可证文本放入该文件中; 3, 请将如下声明文本放入每个源文件的头部注释中。 Copyright (c) [Year] [name of copyright holder] [Software Name] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. Mulan Permissive Software License,Version 2 Mulan Permissive Software License,Version 2 (Mulan PSL v2) January 2020 http://license.coscl.org.cn/MulanPSL2 Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions: 0. Definition Software means the program and related documents which are licensed under this License and comprise all Contribution(s). Contribution means the copyrightable work licensed by a particular Contributor under this License. Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License. Legal Entity means the entity making a Contribution and all its Affiliates. Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity. 1. Grant of Copyright License Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not. 2. Grant of Patent License Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken. 3. No Trademark License No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in Section 4. 4. Distribution Restriction You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software. 5. Disclaimer of Warranty and Limitation of Liability THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. 6. Language THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL. END OF THE TERMS AND CONDITIONS How to Apply the Mulan Permissive Software License,Version 2 (Mulan PSL v2) to Your Software To apply the Mulan PSL v2 to your work, for easy identification by recipients, you are suggested to complete following three steps: i Fill in the blanks in following statement, including insert your software name, the year of the first publication of your software, and your name identified as the copyright owner; ii Create a file named “LICENSE” which contains the whole context of this License in the first directory of your software package; iii Attach the statement to the appropriate annotated syntax at the beginning of each source file. Copyright (c) [Year] [name of copyright holder] [Software Name] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details.

简介

基于org.eclipse.paho.client.mqttv3实现的一个简易的Mqtt连接池 展开 收起
Java
MulanPSL-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/hermit2020/mqtt_pool.git
git@gitee.com:hermit2020/mqtt_pool.git
hermit2020
mqtt_pool
SimpleMqttPool
master

搜索帮助