优化设备批量离线性能

This commit is contained in:
lin
2026-01-24 22:51:04 +08:00
parent 519ccccd7b
commit 588de2d5ec
9 changed files with 93 additions and 24 deletions

View File

@@ -591,6 +591,9 @@ public interface CommonGBChannelMapper {
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId")
List<CommonGBChannel> queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId);
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceIds")
List<CommonGBChannel> queryOnlineListsByGbDeviceIds(List<Device> deviceList);
@SelectProvider(type = ChannelProvider.class, method = "queryCommonChannelByDeviceChannel")
CommonGBChannel queryCommonChannelByDeviceChannel(@Param("dataType") Integer dataType, @Param("dataDeviceId") Integer dataDeviceId, @Param("deviceId") String deviceId);

View File

@@ -610,5 +610,11 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id=#{deviceId}"})
void offlineByDeviceId(@Param("deviceId") int deviceId);
@Update(value = {"<script>" +
"UPDATE wvp_device_channel SET status = 'OFF' WHERE data_type = 1 and data_device_id in " +
" <foreach item='item' index='index' collection='deviceList' open='(' separator=',' close=')'> #{item.id} </foreach>" +
" </script>"})
void offlineByDeviceIds(List<Device> deviceList);
}

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.dao.provider;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.web.custom.bean.CameraGroup;
@@ -601,6 +602,28 @@ public class ChannelProvider {
return sqlBuild.toString();
}
public String queryOnlineListsByGbDeviceIds(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_TABLE_NAME);
sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 ");
List<Device> deviceList = (List<Device>)params.get("deviceList");
if (deviceList != null && !deviceList.isEmpty()) {
sqlBuild.append(" AND data_device_id in (");
boolean first = true;
for (Device device : deviceList) {
if (!first) {
sqlBuild.append(",");
}
sqlBuild.append("'" + device.getId() + "'");
first = false;
}
sqlBuild.append(" )");
}
return sqlBuild.toString();
}
public String queryListForSy(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_FOR_CAMERA_DEVICE);

View File

@@ -118,9 +118,9 @@ public class EventPublisher {
}
public void deviceOfflineEventPublish(String deviceId) {
public void deviceOfflineEventPublish(Set<String> deviceIds) {
DeviceOfflineEvent event = new DeviceOfflineEvent(this);
event.setDeviceId(deviceId);
event.setDeviceIds(deviceIds);
applicationEventPublisher.publishEvent(event);
}
}

View File

@@ -6,12 +6,13 @@ import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
import java.util.Set;
@Getter
@Setter
public class DeviceOfflineEvent extends ApplicationEvent {
private String deviceId;
private Set<String> deviceIds;
@Serial
private static final long serialVersionUID = 1L;

View File

@@ -278,15 +278,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Async
@EventListener
public void onApplicationEvent(DeviceOfflineEvent event) {
log.info("[设备状态] 到期, 编号: {}", event.getDeviceId());
Device device = getDeviceByDeviceId(event.getDeviceId());
Boolean deviceStatus = getDeviceStatus(device);
if (deviceStatus != null && deviceStatus) {
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device{}", event.getDeviceId());
online(device);
return;
}
offline(device);
log.info("[设备状态] 到期, 编号: {}", event.getDeviceIds().toString());
List<Device> deviceList = redisCatchStorage.getDeviceList(event.getDeviceIds());
offline(deviceList);
}
@Override
@@ -392,17 +386,44 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
if (isDevice(deviceId)) {
channelOfflineByDevice(device);
channelOfflineByDevice(List.of(device));
}
}
private void channelOfflineByDevice(Device device) {
public void offline(List<Device> deviceList) {
if (deviceList == null || deviceList.isEmpty()) {
log.warn("[设备不存在]");
return;
}
List<Device> realDeviceList = new ArrayList<>();
for (Device device : deviceList) {
log.info("[设备离线] device{} 心跳间隔: {},心跳超时次数: {} 上次心跳时间:{} 上次注册时间: {}", device.getDeviceId(),
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
device.setOnLine(false);
cleanOfflineDevice(device);
if (isDevice(device.getDeviceId())) {
realDeviceList.add(device);
}
redisCatchStorage.updateDevice(device);
if (userSetting.getDeviceStatusNotify()) {
// 发送 redis 消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
}
deviceMapper.offlineByList(deviceList);
if (!realDeviceList.isEmpty()) {
channelOfflineByDevice(realDeviceList);
}
}
private void channelOfflineByDevice(List<Device> deviceList) {
// 进行通道离线
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId());
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceIds(deviceList);
if (channelList.isEmpty()) {
return;
}
deviceChannelMapper.offlineByDeviceId(device.getId());
deviceChannelMapper.offlineByDeviceIds(deviceList);
// 发送通道离线通知
eventPublisher.channelEventPublish(channelList, ChannelEvent.ChannelEventMessageType.OFF);
}

View File

@@ -48,17 +48,15 @@ public class DeviceStatusManager {
if (expiredIds != null && !expiredIds.isEmpty()) {
redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray());
// 使用 JDK 21 虚拟线程异步分发事件
for (String deviceId : expiredIds) {
Thread.startVirtualThread(() -> {
// 获取详情后删除缓存
Thread.startVirtualThread(() -> {
// 获取详情后删除缓存
// Device device = redisCatchStorage.getDevice(deviceId);
// redisCatchStorage.removeDevice(deviceId);
// 发送 Spring 异步事件
eventPublisher.deviceOfflineEventPublish(deviceId);
});
}
// 发送 Spring 异步事件
eventPublisher.deviceOfflineEventPublish(expiredIds);
});
}
}

View File

@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface IRedisCatchStorage {
@@ -79,6 +80,11 @@ public interface IRedisCatchStorage {
*/
Device getDevice(String deviceId);
/**
* 获取Device
*/
List<Device> getDeviceList(Set<String> deviceIds);
void resetAllCSEQ();
void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo);

View File

@@ -184,6 +184,17 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return device;
}
@Override
public List<Device> getDeviceList(Set<String> deviceIds) {
String key = VideoManagerConstants.DEVICE_PREFIX;
List<Device> deviceList = new ArrayList<>();
List<Object> objectList = redisTemplate.opsForHash().multiGet(key, Arrays.asList(deviceIds.toArray()));
for (Object object : objectList) {
deviceList.add((Device)object);
}
return deviceList;
}
@Override
public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId();