同步通道任务完善

This commit is contained in:
lxj
2025-07-31 15:38:04 +08:00
parent 8f71f5f261
commit e016e6d44c
10 changed files with 272 additions and 76 deletions

View File

@@ -5,9 +5,10 @@ import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.sis.api.RemoteHikDeviceService;
import org.dromara.sis.api.RemoteDeviceService;
import org.dromara.sis.api.RemoteHikSdkService;
import org.dromara.sis.api.domain.RemoteSdkChannel;
import org.dromara.sis.api.domain.RemoteSisDeviceChannel;
@@ -15,8 +16,7 @@ import org.dromara.sis.api.domain.RemoteSisDeviceManage;
import org.dromara.sis.api.enums.DeviceTypeEnum;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
public class HikDeviceCheckStateTask {
@DubboReference
private RemoteHikDeviceService remoteHikDeviceService;
private RemoteDeviceService remoteDeviceService;
@DubboReference
private RemoteHikSdkService remoteHikSdkService;
@@ -44,7 +44,7 @@ public class HikDeviceCheckStateTask {
* @throws InterruptedException
*/
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
List<RemoteSisDeviceManage> device = remoteHikDeviceService.queryHikDevices();
List<RemoteSisDeviceManage> device = remoteDeviceService.queryHikDevices();
if (CollUtil.isEmpty(device)) {
SnailJobLog.REMOTE.info("需要同步的hik设备数量为0任务执行结束");
return ExecuteResult.success();
@@ -76,17 +76,71 @@ public class HikDeviceCheckStateTask {
*/
private void updateDeviceChannelStatus(RemoteSisDeviceManage sisDeviceManage) {
// 查询设备通道信息
List<RemoteSisDeviceChannel> ls = remoteHikDeviceService.queryDeviceChannels(sisDeviceManage.getDeviceIp());
List<RemoteSisDeviceChannel> ls = remoteDeviceService.queryDeviceChannels(sisDeviceManage.getDeviceIp());
SnailJobLog.REMOTE.info("设备[{}]本地通道数量={}", sisDeviceManage.getDeviceIp(), ls.size());
List<RemoteSdkChannel> sdkChannels = remoteHikSdkService.getDeviceChannel(sisDeviceManage.getDeviceIp());
SnailJobLog.REMOTE.info("设备[{}]sdk通道数量={}", sisDeviceManage.getDeviceIp(), sdkChannels.size());
Map<String, RemoteSisDeviceChannel> data1 = ls.stream().collect(Collectors.toMap(RemoteSisDeviceChannel::getDeviceIp, item -> item));
Map<String, RemoteSdkChannel> data2 = sdkChannels.stream().collect(Collectors.toMap(RemoteSdkChannel::getChannelIp, item -> item));
Set<String> keys = new HashSet<>(data1.size() + data2.size());
keys.addAll(data1.keySet());
keys.addAll(data2.keySet());
// 在data1 不在data2 的数据 -- 删除本体通道
List<RemoteSisDeviceChannel> deleteData = new ArrayList<>(data1.size());
// 在data2有 data1中没有的数据 -- 写入本地通道
List<RemoteSdkChannel> insertData = new ArrayList<>(data1.size());
// 在2个集合中都存在的数据 -- 更新本地通道
List<RemoteSisDeviceChannel> updateData = new ArrayList<>(data1.size());
keys.forEach(item -> {
RemoteSisDeviceChannel remoteSisDeviceChannel = data1.get(item);
RemoteSdkChannel remoteSdkChannel = data2.get(item);
if (remoteSisDeviceChannel == null) {
if (remoteSdkChannel != null) {
remoteSdkChannel.setDeviceId(sisDeviceManage.getId());
remoteSdkChannel.setNvrIp(sisDeviceManage.getDeviceIp());
remoteSdkChannel.setNvrPort(sisDeviceManage.getDevicePort());
remoteSdkChannel.setNvrAccount(sisDeviceManage.getDeviceAccount());
remoteSdkChannel.setNvrPwd(sisDeviceManage.getDevicePwd());
remoteSdkChannel.setNvrFactoryNo(sisDeviceManage.getFactoryNo());
remoteSdkChannel.setGroupId(sisDeviceManage.getGroupId());
remoteSdkChannel.setTenantId(sisDeviceManage.getTenantId());
// 写入
insertData.add(remoteSdkChannel);
}
} else {
if (remoteSdkChannel == null) {
// 删除
deleteData.add(remoteSisDeviceChannel);
} else {
// 更新
if (!Objects.equals(remoteSdkChannel.getChannelStatus(), remoteSisDeviceChannel.getChannelState())) {
updateData.add(remoteSisDeviceChannel);
}
}
}
});
// 开始执行更新操作
if (!deleteData.isEmpty()) {
List<String> delList = deleteData.stream().map(RemoteSisDeviceChannel::getDeviceIp).toList();
int num = remoteDeviceService.deleteByChannelIps(delList);
SnailJobLog.REMOTE.info("删除通道完成,删除数量={}", num);
SnailJobLog.REMOTE.info("删除的通道列表={}", JSONObject.toJSONString(delList));
}
// 开始写入操作
if (!insertData.isEmpty()) {
Boolean result = remoteDeviceService.insertChannel(insertData);
SnailJobLog.REMOTE.info("写入通道信息完成result={}", result);
}
// 开始更新操作
if (!updateData.isEmpty()) {
Integer i = remoteDeviceService.updateChannelInfo(updateData);
SnailJobLog.REMOTE.info("更新通道完成,通道数量={},完成数量={}", updateData.size(), i);
}
}
@@ -104,16 +158,17 @@ public class HikDeviceCheckStateTask {
if (item.getDeviceStatus() != onLineState) {
SnailJobLog.REMOTE.info("设备[{}]在线状态变更,开始更新状态。 old={}new ={} ", item.getDeviceIp(), item.getDeviceStatus(), onLineState);
item.setDeviceStatus(onLineState);
Boolean result = remoteHikDeviceService.updateDeviceState(item);
Boolean result = remoteDeviceService.updateDeviceState(item);
SnailJobLog.REMOTE.info("设备[{}]在线状态变更,状态更新完成。 result={} ", item.getDeviceIp(), result);
// 监测当前设备是否存在通道,如果有则跟新通道信息
List<RemoteSisDeviceChannel> ls = remoteHikDeviceService.queryDeviceChannels(item.getDeviceIp());
List<RemoteSisDeviceChannel> ls = remoteDeviceService.queryDeviceChannels(item.getDeviceIp());
if (CollUtil.isNotEmpty(ls)) {
Boolean r1 = remoteHikDeviceService.updateDeviceChannelState(item.getDeviceIp(), onLineState);
Boolean r1 = remoteDeviceService.updateDeviceChannelState(item.getDeviceIp(), onLineState);
SnailJobLog.REMOTE.info("设备通道[{}]在线状态变更,状态更新完成。 result={} ", item.getDeviceIp(), r1);
}
}
return isLogin;
}
}