将多个响应处理逻辑更新为异步处理,调整订阅添加和移除逻辑。

This commit is contained in:
lin
2026-03-31 10:47:17 +08:00
parent 01c71f8800
commit 85c7ec3d3e
11 changed files with 132 additions and 63 deletions

View File

@@ -454,14 +454,11 @@ public class DeviceQuery {
return deviceService.getRegisterTimeStatistics(deviceId, count);
}
@GetMapping("/subscribe/alarm")
@Operation(summary = "开启/关闭报警订阅")
@Parameter(name = "id", description = "通道的Id", required = true)
@Parameter(name = "cycle", description = "订阅周期", required = true)
@Parameter(name = "interval", description = "报送间隔", required = true)
public void subscribeAlarm(int id, int cycle, int interval) {
deviceService.subscribeMobilePosition(id, cycle, interval);
public void subscribeAlarm(int id, int cycle) {
deviceService.subscribeAlarm(id, cycle);
}
}

View File

@@ -582,9 +582,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback<Boolean> callback) {
log.info("[移除目录订阅]: {}", device.getDeviceId());
String key = SubscribeTaskForCatalog.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
log.info("[移除目录订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
@@ -646,10 +646,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
String key = SubscribeTaskForMobilPosition.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
@@ -682,6 +681,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}else {
log.info("[报警订阅续期] 设备 {}", device.getDeviceId());
}
try {
sipCommander.alarmSubscribe(device, transactionInfo, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
@@ -711,10 +711,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean removeAlarmSubscribe(Device device, CommonCallback<Boolean> callback) {
log.info("[移除报警订阅]: {}", device.getDeviceId());
String key = SubscribeTaskForAlarm.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
log.info("[移除报警订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除报警订阅] 未找到事务信息,{}", device.getDeviceId());
@@ -995,22 +994,27 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
redisRpcService.subscribeCatalog(id, cycle);
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForCatalog() > 0) {
// 订阅周期不同,则先取消
removeCatalogSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
if (cycle > 0) {
if (cycle > 0) {
// 目录订阅相关的信息
if (device.getSubscribeCycleForCatalog() > 0) {
// 订阅周期不同,则先取消
removeCatalogSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
// 开启订阅
addCatalogSubscribe(device, null);
}
});
});
}else {
// 开启订阅
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
addCatalogSubscribe(device, null);
}
}else {
// 开启订阅
device.setSubscribeCycleForCatalog(cycle);
// 取消订阅
removeCatalogSubscribe(device, null);
device.setSubscribeCycleForCatalog(0);
updateDevice(device);
addCatalogSubscribe(device, null);
}
}
@@ -1036,25 +1040,30 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
redisRpcService.subscribeMobilePosition(id, cycle, interval);
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 订阅周期已经开启,则先取消
removeMobilePositionSubscribe(device, result->{
// 开启订阅
if (cycle > 0) {
// 目录订阅相关的信息
if (device.getSubscribeCycleForMobilePosition() > 0) {
// 订阅周期已经开启,则先取消
removeMobilePositionSubscribe(device, result->{
// 开启订阅
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
updateDevice(device);
addMobilePositionSubscribe(device, null);
});
}else {
// 订阅未开启
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
updateDevice(device);
if (cycle > 0) {
addMobilePositionSubscribe(device, null);
}
});
// 开启订阅
addMobilePositionSubscribe(device, null);
}
}else {
// 订阅未开启
device.setSubscribeCycleForMobilePosition(cycle);
device.setMobilePositionSubmissionInterval(interval);
// 取消订阅
removeMobilePositionSubscribe(device, null);
device.setSubscribeCycleForMobilePosition(0);
updateDevice(device);
// 开启订阅
addMobilePositionSubscribe(device, null);
}
}
@@ -1063,29 +1072,34 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
Assert.isTrue(device.isOnLine(), "设备已离线");
if (device.getSubscribeCycleForCatalog() == cycle) {
if (device.getSubscribeCycleForAlarm() == cycle) {
return;
}
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcService.subscribeAlarm(id, cycle);
return;
}
// 目录订阅相关的信息
if (device.getSubscribeCycleForAlarm() > 0) {
// 订阅周期不同,则先取消
removeAlarmSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
if (cycle > 0) {
if (cycle > 0) {
// 目录订阅相关的信息
if (device.getSubscribeCycleForAlarm() > 0) {
// 订阅周期不同,则先取消
removeAlarmSubscribe(device, result->{
device.setSubscribeCycleForCatalog(cycle);
updateDevice(device);
// 开启订阅
addAlarmSubscribe(device, null);
}
});
});
}else {
// 开启订阅
device.setSubscribeCycleForAlarm(cycle);
updateDevice(device);
addAlarmSubscribe(device, null);
}
}else {
// 开启订阅
device.setSubscribeCycleForAlarm(cycle);
// 取消订阅
removeAlarmSubscribe(device, null);
device.setSubscribeCycleForAlarm(0);
updateDevice(device);
addAlarmSubscribe(device, null);
}
}

View File

@@ -1240,8 +1240,8 @@ public class SIPCommander implements ISIPCommander {
// 退后一个月作为结束时间
String endTime = DateUtil.formatterISO8601.format(nowDateTime.plusMonths(1));
cmdXml.append("<StartAlarmTime>" + startTime + "</StartAlarmTime>\r\n");
cmdXml.append("<EndAlarmTime>" + endTime + "</EndAlarmTime>\r\n");
cmdXml.append("<StartTime>" + startTime + "</StartTime>\r\n");
cmdXml.append("<EndTime>" + endTime + "</EndTime>\r\n");
cmdXml.append("</Query>\r\n");

View File

@@ -42,7 +42,7 @@ public class AlarmResponseMessageHandler extends SIPRequestProcessorParent imple
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
}

View File

@@ -69,7 +69,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
public void handForDevice(RequestEvent evt, Device device, Element element) {
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
}

View File

@@ -0,0 +1,60 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
@Slf4j
@Component
public class DeviceControlResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "DeviceControl";
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private IDeviceService deviceService;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
}
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
log.info("[DeviceControl Response] \n {}", element.asXML());
// 检查设备是否存在, 不存在则不回复
if (device == null) {
return;
}
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
}
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {
}
}

View File

@@ -81,7 +81,7 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
}
try {
// 回复200 OK
responseAck(request, Response.OK);
responseAckAsync(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] DeviceInfo应答消息 200: {}", e.getMessage());
}

View File

@@ -47,7 +47,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
}
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
}

View File

@@ -63,7 +63,7 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
if (rootElement == null) {
log.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
try {
responseAck(request, Response.BAD_REQUEST);
responseAckAsync(request, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
}
@@ -124,7 +124,7 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
//回复 200 OK
try {
responseAck(request, Response.OK);
responseAckAsync(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
}

View File

@@ -94,14 +94,13 @@ export function subscribeMobilePosition(params) {
})
}
export function subscribeForAlarm(params) {
const { id, cycle, interval } = params
const { id, cycle } = params
return request({
method: 'get',
url: `/api/device/query/subscribe/alarm`,
url: '/api/device/query/subscribe/alarm',
params: {
id: id,
cycle: cycle,
interval: interval
cycle: cycle
}
})
}

View File

@@ -459,8 +459,7 @@ export default {
subscribeForAlarm: function(data, value) {
this.$store.dispatch('device/subscribeForAlarm', {
id: data,
cycle: value ? 60 : 0,
interval: value ? 5 : 0
cycle: value ? 60 : 0
}).then((data) => {
this.$message.success({
showClose: true,