同步操作将从 Xenos/SimpleMqttPool 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
在《用Spring Boot实现一个简易的MQTT客户端》中介绍了一个自定义的MQTT的客户端的实现,但单一的客户端并不能满足实际的开发需求,这里基于之前实现的自定义客户端实现一个简单的连接池
之前的版本是用一个List
存储所有的连接对象,每次获取时从List
中删除,释放连接时重新加入List
,然后用synchronized
关键字做并发控制。再参考了部分的Druid
连接池的代码后,有了这个版本的设计。
用一个数组
activeConnections
存储所有的连接,定义一个指针,指向当前数组中当前可用连接对象的最后一个下标对池中的连接对象进行操作时,只要维持指针的位置,保证指针左边的所有对象都是可用的(滑动窗口)
从池中取走一个对象时,向左移动指针;放回一个对象时,向右移动指针
之前的设计的连接对象在发送数据后会处于忙碌的状态,只有等待回调方法执行成功后,才会处于空闲可用的状态,所以用了一个数组
busyConnections
存储这些忙碌中的连接对象,等到这些对象空闲后,才放回到可用池中对于需要进行删除的对象同样也用一个数组
discardConnections
进行存储,以上两个数组都只需要知道连接对象在activeConnections
的下标即可,不需要存储一个完整的对象用
juc
包下的类做并发控制,提高并发的效率
/**
* 当前连接总数
*/
private int activeCount;
/**
* 当前可以从池中拿取的连接数
*/
private int poolingCount;
/**
* 存储所有连接的数组
*/
private volatile Connection[] activeConnections;
/**
* 存储连接在 activeConnection 中的下标
*/
private volatile Map<String, Integer> connectionIndexDict;
/**
* 处于忙碌状态的连接,存储 activeConnections 中连接的ID
*/
private volatile String[] busyConnections;
/**
* 处于忙碌状态的连接数
*/
private int busyCount;
/**
* 即将丢弃的连接, 存储 activeConnections 中连接的ID
*/
private volatile String[] discardConnections;
/**
* 将要丢弃的连接数
*/
private int discardingCount;
/**
* 连接池初始化
*/
public void initialize() throws PoolInitializeException {
if (isInitialized) {
return;
}
ReentrantLock lock = this.lock;
lock.lock();
try {
// 一些配置文件属性的判断
// ... ...
// 创建数组和字典
activeConnections = new Connection[maxSize];
connectionIndexDict = new ConcurrentHashMap<>(maxSize);
busyConnections = new String[maxSize];
discardConnections = new String[2 * maxSize];
// 创建最小数量的连接
while (activeCount < minSize) {
Connection connection = createConnection();
if (connection != null) {
activeConnections[activeCount] = connection;
connectionIndexDict.put(connection.getClientId(), activeCount++);
poolingCount++;
}
}
// 创建并启动 连接创建线程
createAndStartCreatorThread();
// 创建并启动 连接销毁线程
createAndStartDestroyThread();
// 创建并启动 连接状态刷新线程
createAndStartRefreshThread();
// 创建并启动 连接状态监视线程
createAndStartWatchThread();
// 等待所有线程启动
initLatch.await();
this.initTime = LocalDateTime.now();
this.isInitialized = true;
} catch (Exception e) {
throw new PoolInitializeException("连接池初始化发生异常", e);
} finally {
lock.unlock();
log.info("连接池初始化已完成,当前可用连接数为 " + poolingCount);
}
}
private Connection getLast() throws InterruptedException {
try {
// 如果当前可以弹出的连接数为 0
while (poolingCount == 0) {
// 唤醒创建线程
empty.signal();
// 当前等待获取连接的线程数 +1
waitThreadCount++;
try {
// 等待 连接创建线程 创建新的连接,获取有连接被释放时唤醒
notEmpty.await();
} finally {
// 执行到这里说明已经有新的连接可以被获取
waitThreadCount--;
}
}
} catch (InterruptedException e) {
notEmpty.signal();
throw e;
}
Connection last = null;
try {
// 判断当前连接是否可用,不可用的连接会被放入待删除的数组中等待删除
while (poolingCount > 0 && !testConnection(poolingCount - 1)) {
poolingCount--;
if (poolingCount == 0) {
empty.signal();
waitThreadCount++;
try {
notEmpty.await();
}
finally {
waitThreadCount--;
}
}
}
} catch (InterruptedException e) {
notEmpty.signal();
throw e;
}
last = activeConnections[--poolingCount];
return last;
}
如果当前连接对象不够,并且连接数未达到最大值,由连接创建线程去创建新的连接,过程如下
将连接放回去时,并不会直接将连接认为是可用的,而是放入到 忙碌连接数组中
public void releaseConnection(Connection connection) {
if (connection == null) {
return;
}
lock.lock();
try {
String clientId = connection.getClientId();
if (!connectionIndexDict.containsKey(clientId)) {
return;
}
// 放入忙碌池中
busyConnections[busyCount++] = clientId;
refreshCondition.signal();
} finally {
lock.unlock();
}
}
然后由忙碌连接处理线程去做连接状态的更新操作,过程如下:
无效连接指那些连接断开的连接或者是已经超时的连接(长时间没有被使用的连接),它由状态监视线程监视产生并放入到待删除连接数组中,删除过程如下:
检查连接是否可用以及监视线程的操作
/**
* 判断一个连接是否可用,如果不可用,则加入到待销毁列表中
* @param connectionIndex 连接的下标
* @return 是否可用
*/
private boolean testConnection(int connectionIndex) {
if (connectionIndex > activeConnections.length) {
return false;
}
Connection connection = activeConnections[connectionIndex];
long lastActiveTimeMillis = connection.lastActiveTimeMillis;
long currMillis = System.currentTimeMillis();
int minSize = properties.getPool().getMinPoolSize();
long maxWaitTime = properties.getPool().getMaxWaitTime();
boolean needDiscard = connection.isInTrouble() ||
(activeCount > minSize && currMillis - lastActiveTimeMillis > maxWaitTime);
if (needDiscard) {
// 加入到销毁列表中
discardConnections[discardingCount++] = connection.getClientId();
// 唤醒销毁线程
discardCondition.signal();
return false;
}
return true;
}
/**
* 创建并启动监视线程,定期扫描可用数组,将数组中超时的、不可用的连接移入到待销毁数组中
*/
private void createAndStartWatchThread() {
initLatch.countDown();
long maxWaitTime = properties.getPool().getMaxWaitTime();
WatcherThreadPool.THREAD_POOL.scheduledExecute(() -> {
lock.lock();
try {
int minPoolSize = properties.getPool().getMinPoolSize();
int index = poolingCount - 1;
while (poolingCount > minPoolSize) {
boolean isUsableConnection = testConnection(index);
if (!isUsableConnection) {
// 一旦检查到失效连接,更新可用连接的边界
swapAndUpdateIndex(index, --poolingCount);
}
index--;
}
} finally {
lock.unlock();
}
}, maxWaitTime, maxWaitTime);
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。