diff --git a/inspect-ivs/pom.xml b/inspect-ivs/pom.xml index b016936..be88d47 100644 --- a/inspect-ivs/pom.xml +++ b/inspect-ivs/pom.xml @@ -62,6 +62,16 @@ com.inspect inspect-base-ivs + + + org.springframework.retry + spring-retry + + + + org.springframework.boot + spring-boot-starter-aop + ${project.artifactId} diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/InspectIvsApplication.java b/inspect-ivs/src/main/java/com/inspect/ivs/InspectIvsApplication.java index b23676d..132b2fa 100644 --- a/inspect-ivs/src/main/java/com/inspect/ivs/InspectIvsApplication.java +++ b/inspect-ivs/src/main/java/com/inspect/ivs/InspectIvsApplication.java @@ -8,7 +8,9 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.annotation.ComponentScan; +import org.springframework.retry.annotation.EnableRetry; +@EnableRetry @EnableCustomConfig @EnableRyFeignClients @SpringBootApplication( diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsControlController.java b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsControlController.java index 1a7df9b..de22e70 100644 --- a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsControlController.java +++ b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsControlController.java @@ -73,6 +73,7 @@ public class IvsControlController { redisService.setCacheMapValue(IvsConst.IVS_PRESET_MAP, ivsPresetVo.getPreset(), ivsPresetVo.getPreset()); return Response.ok(); } else if (command.equals("goto")) { + log.info("IVS_PRESET GOTO: {}, CameraCode: {}", ivsPresetVo.getCommand(), ivsPresetVo.getCameraCode()); JSONObject paramJson = new JSONObject(); paramJson.put("cameraCode", ivsPresetVo.getCameraCode()); log.debug("============================VERSION {}", version); diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java index 0948e58..bfa7298 100644 --- a/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java +++ b/inspect-ivs/src/main/java/com/inspect/ivs/controller/IvsDeviceController.java @@ -10,6 +10,7 @@ import com.inspect.ivs.constant.IvsConst; import com.inspect.ivs.domain.Temp; import com.inspect.ivs.domain.TempConfiguration; import com.inspect.ivs.service.IvsCommonService; +import com.inspect.ivs.service.PictureDownloadRetryableDelegate; import com.inspect.ivs.util.UriUtils; import com.inspect.ivs.view.IvsPlatformSnapshotView; import com.inspect.ivs.view.IvsSnapshotView; @@ -17,16 +18,10 @@ import com.inspect.ivs.vo.IvsChanSnapVo; import com.inspect.ivs.vo.IvsDevChanListVo; import com.inspect.ivs.vo.IvsDevChanSnapVo; -import java.awt.*; -import java.awt.image.BufferedImage; import java.io.*; import java.util.*; import java.util.List; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -34,7 +29,7 @@ import org.springframework.core.io.InputStreamResource; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import javax.imageio.ImageIO; +import javax.annotation.Resource; @RestController @RequestMapping({"/api/v1/device"}) @@ -53,6 +48,9 @@ public class IvsDeviceController { private final IvsCommonService ivsCommonService; + @Resource + private PictureDownloadRetryableDelegate retryDelegate; + public IvsDeviceController(IvsCommonService ivsCommonService) { this.ivsCommonService = ivsCommonService; } @@ -89,7 +87,7 @@ public class IvsDeviceController { ruleItem.put("alarmSettingList", alarmSettingListEx);//写死 radiometryRuleExList.add(ruleItem); // 遍历 FastJSON 的 JSONArray - if(StringUtils.isNotNull(radiometryRuleExListOrginl)){ + if (StringUtils.isNotNull(radiometryRuleExListOrginl)) { for (Object obj : radiometryRuleExListOrginl) { if (obj instanceof JSONObject) { JSONObject jsonObject = (JSONObject) obj; @@ -143,7 +141,7 @@ public class IvsDeviceController { radiometryRuleList.add(radiometryRule); // 遍历 FastJSON 的 JSONArray - if(StringUtils.isNotNull(radiometryRuleListOrginl)){ + if (StringUtils.isNotNull(radiometryRuleListOrginl)) { for (Object obj : radiometryRuleListOrginl) { if (obj instanceof JSONObject) { JSONObject jsonObject = (JSONObject) obj; @@ -188,7 +186,7 @@ public class IvsDeviceController { */ @PostMapping("/temper") public String temper(@RequestBody Temp temp) { - log.info( "[infrared_1800] 获取红外温度 temper param: {}", temp); + log.info("[infrared_1800] 获取红外温度 temper param: {}", temp); String cameraCode = temp.getCameraCode().split("#")[0]; //cameraCode摄像机编码 int presetId = temp.getPresetId(); //预设位ID int ruleId = temp.getRuleId(); //规则ID 暂时写死取12就行 @@ -196,12 +194,12 @@ public class IvsDeviceController { String requestMsg = "presetId=" + presetId + "&" + "ruleId=" + ruleId + "&" + "meterType=" + 3; //发送请求 String resp = ivsCommonService.sendsslGetCookie(address + "/device/radiometry-temper/" + cameraCode, requestMsg, "UTF-8"); - log.info( "获取红外温度: {}", resp); + log.info("获取红外温度: {}", resp); JSONObject jsonObject = JSON.parseObject(resp); String resultCode = jsonObject.getString("resultCode"); String temperMax = ""; if (resultCode.equals("0")) { - JSONObject radiometryInfo = (JSONObject) jsonObject.get("radiometryInfo"); + JSONObject radiometryInfo = (JSONObject) jsonObject.get("radiometryInfo"); temperMax = radiometryInfo.getString("temperMax"); return temperMax; } @@ -217,135 +215,171 @@ public class IvsDeviceController { */ @PostMapping("/getConfig") public JSONObject getConfig(@RequestBody Temp temp) { - log.info( "[infrared_1800] 获取红外温度 temper param: {}", temp); + log.info("[infrared_1800] 获取红外温度 temper param: {}", temp); String cameraCode = temp.getCameraCode().split("#")[0]; String nvrcode = temp.getCameraCode().split("#")[1]; - String requestMsg = ""; + String requestMsg = ""; //发送请求 - String resp = ivsCommonService.sendsslGetCookie(address + "/device/deviceconfig/500/"+cameraCode+"/"+nvrcode, requestMsg,"UTF-8"); + String resp = ivsCommonService.sendsslGetCookie(address + "/device/deviceconfig/500/" + cameraCode + "/" + nvrcode, requestMsg, "UTF-8"); JSONObject jsonObject = JSON.parseObject(resp); JSONObject configItem = jsonObject.getJSONObject("configItem"); - log.info( "configItem: {}", configItem); + log.info("configItem: {}", configItem); return configItem; // JSONArray radiometryRuleExList = configItem.getJSONArray("radiometryRuleExList"); // JSONArray radiometryRuleList = configItem.getJSONArray("radiometryRuleList"); // return "0"; } +// @GetMapping({"channelsnap"}) +// public ResponseEntity channelSnap(IvsDevChanSnapVo ivsDevChanSnapVo) throws Exception { +// if (version.equals("1800")) { +// byte[] bytes = Ivs1800channelSnap(ivsDevChanSnapVo); +//// String filePath = "/home/1.JPG"; // 替换为你想保存图片的本地路径 +//// try (FileOutputStream fos = new FileOutputStream(filePath)) { +//// fos.write(bytes); +//// log.info("图片已成功保存到本地:" + filePath); +//// } catch (IOException e) { +//// e.printStackTrace(); +//// log.info("保存图片时出现错误:" + e.getMessage()); +//// } +//// byte[] bytes = readStream(content); +// InputStream inputStream = new ByteArrayInputStream(bytes); +// return ResponseEntity.ok().body(new InputStreamResource(inputStream)); +// } else { +// int index = 0; +// +// final int TRY_TIMES = 1; +// final long mDelay = 3000; +// while (index < TRY_TIMES) { +// log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT: {}, TRY TIMES: {}" + com.inspect.base.core.constant.Color.END, ivsDevChanSnapVo.getCameraCode(), index); +// try { +// IvsSnapshotView ivsSnapshotView = ivsCommonService.get(UriUtils.parse(IvsConst.URI_PLATFORM_SNAPSHOT, ivsDevChanSnapVo), IvsSnapshotView.class); +// IvsChanSnapVo ivsChanSnapVo = new IvsChanSnapVo(ivsDevChanSnapVo.getCameraCode(), +// ivsDevChanSnapVo.getDomainCode(), +// ivsSnapshotView.getTaskID() +// ); +// +// //if ("prod".equals(activeProfile)) { +// if(!testMode) { +// log.info("[IVS] prod env get stream delay 9000ms"); +// Thread.sleep(9000L); +// } else { +// Thread.sleep(2000L); +// } +// +// IvsPlatformSnapshotView snapShotView = ivsCommonService.postJson( +// ivsChanSnapVo, +// IvsConst.URI_SNAPSHOT_LIST, +// IvsPlatformSnapshotView.class); +// if (snapShotView != null && snapShotView.getSnapshotInfoList() != null +// && snapShotView.getSnapshotInfoList().getSnapshotInfos() != null +// && !snapShotView.getSnapshotInfoList().getSnapshotInfos().isEmpty()) { +// String cameraCode = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getCameraCode(); +// String pictureUrl = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getPictureUrl(); +// log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] TRYS: {}, SNAPSHOT cameraCode: {}, pictureUrl: {}" + com.inspect.base.core.constant.Color.END, index, cameraCode, pictureUrl); +// try (CloseableHttpClient httpClient = HttpClients.createDefault()) { +// HttpResponse httpResponse = httpClient.execute(new HttpGet(pictureUrl)); +// InputStream content = httpResponse.getEntity().getContent(); +// byte[] bytes = readStream(content); +// InputStream inputStream = new ByteArrayInputStream(bytes); +// log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] tries: {}, pictureUrl: {}, size: {} END" + com.inspect.base.core.constant.Color.END, index, pictureUrl, bytes.length); +// +// if(bytes.length > 0) { +// return ResponseEntity.ok().body(new InputStreamResource(inputStream)); +// } +// } +// } +// } catch (Exception e) { +// log.warn(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT EXCEPTION: {}" + com.inspect.base.core.constant.Color.END, e.getMessage()); +// } +// +// index++; +// Thread.sleep(mDelay); +// } +// +// log.info(com.inspect.base.core.constant.Color.RED + "[IVS] GET PIC FAIL cameraCode: {}, channel: {}, domainCode: {}" + com.inspect.base.core.constant.Color.END, +// ivsDevChanSnapVo.getCameraCode(), +// ivsDevChanSnapVo.getChannel(), +// ivsDevChanSnapVo.getDomainCode() +// ); +// return ResponseEntity.ok().body(new InputStreamResource(generateErrorImage())); +// } +// } + @GetMapping({"channelsnap"}) - public ResponseEntity channelSnap(IvsDevChanSnapVo ivsDevChanSnapVo) throws Exception { + public ResponseEntity channelSnap(IvsDevChanSnapVo ivsDevChanSnapVo) { if (version.equals("1800")) { - byte[] bytes = Ivs1800channelSnap(ivsDevChanSnapVo); -// String filePath = "/home/1.JPG"; // 替换为你想保存图片的本地路径 -// try (FileOutputStream fos = new FileOutputStream(filePath)) { -// fos.write(bytes); -// log.info("图片已成功保存到本地:" + filePath); -// } catch (IOException e) { -// e.printStackTrace(); -// log.info("保存图片时出现错误:" + e.getMessage()); -// } -// byte[] bytes = readStream(content); - InputStream inputStream = new ByteArrayInputStream(bytes); - return ResponseEntity.ok().body(new InputStreamResource(inputStream)); + try { + byte[] bytes = getIvs1800ChannelSnapBuffer(ivsDevChanSnapVo); + InputStream inputStream = new ByteArrayInputStream(bytes); + return ResponseEntity.ok().body(new InputStreamResource(inputStream)); + } catch (Exception e) { + return ResponseEntity.ok().body(new InputStreamResource(retryDelegate.generateErrorImage())); + } } else { - int index = 0; - - final int TRY_TIMES = 1; - final long mDelay = 3000; - while (index < TRY_TIMES) { - log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT: {}, TRY TIMES: {}" + com.inspect.base.core.constant.Color.END, ivsDevChanSnapVo.getCameraCode(), index); + log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT: {}" + com.inspect.base.core.constant.Color.END, ivsDevChanSnapVo.getCameraCode()); + IvsSnapshotView ivsSnapshotView = ivsCommonService.get(UriUtils.parse(IvsConst.URI_PLATFORM_SNAPSHOT, ivsDevChanSnapVo), IvsSnapshotView.class); + IvsChanSnapVo ivsChanSnapVo = new IvsChanSnapVo(ivsDevChanSnapVo.getCameraCode(), + ivsDevChanSnapVo.getDomainCode(), + ivsSnapshotView.getTaskID()); + + if (!testMode) { + log.info("[IVS] prod env get stream delay 9000ms"); try { - IvsSnapshotView ivsSnapshotView = ivsCommonService.get(UriUtils.parse(IvsConst.URI_PLATFORM_SNAPSHOT, ivsDevChanSnapVo), IvsSnapshotView.class); - IvsChanSnapVo ivsChanSnapVo = new IvsChanSnapVo(ivsDevChanSnapVo.getCameraCode(), - ivsDevChanSnapVo.getDomainCode(), - ivsSnapshotView.getTaskID() - ); - - //if ("prod".equals(activeProfile)) { - if(!testMode) { - log.info("[IVS] prod env get stream delay 9000ms"); - Thread.sleep(9000L); - } else { - Thread.sleep(2000L); - } - - IvsPlatformSnapshotView snapShotView = ivsCommonService.postJson( - ivsChanSnapVo, - IvsConst.URI_SNAPSHOT_LIST, - IvsPlatformSnapshotView.class); - if (snapShotView != null && snapShotView.getSnapshotInfoList() != null - && snapShotView.getSnapshotInfoList().getSnapshotInfos() != null - && !snapShotView.getSnapshotInfoList().getSnapshotInfos().isEmpty()) { - String cameraCode = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getCameraCode(); - String pictureUrl = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getPictureUrl(); - log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] TRYS: {}, SNAPSHOT cameraCode: {}, pictureUrl: {}" + com.inspect.base.core.constant.Color.END, index, cameraCode, pictureUrl); - try (CloseableHttpClient httpClient = HttpClients.createDefault()) { - HttpResponse httpResponse = httpClient.execute(new HttpGet(pictureUrl)); - InputStream content = httpResponse.getEntity().getContent(); - byte[] bytes = readStream(content); - InputStream inputStream = new ByteArrayInputStream(bytes); - log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] tries: {}, pictureUrl: {}, size: {} END" + com.inspect.base.core.constant.Color.END, index, pictureUrl, bytes.length); - - if(bytes.length > 0) { - return ResponseEntity.ok().body(new InputStreamResource(inputStream)); - } - } - } + Thread.sleep(9000L); } catch (Exception e) { - log.warn(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT EXCEPTION: {}" + com.inspect.base.core.constant.Color.END, e.getMessage()); + } + } else { + try { + Thread.sleep(2000L); + } catch (Exception e) {} + } - index++; - Thread.sleep(mDelay); + IvsPlatformSnapshotView snapShotView = ivsCommonService.postJson( + ivsChanSnapVo, + IvsConst.URI_SNAPSHOT_LIST, + IvsPlatformSnapshotView.class); + + if (snapShotView == null + || snapShotView.getSnapshotInfoList() == null + || snapShotView.getSnapshotInfoList().getSnapshotInfos() == null + || snapShotView.getSnapshotInfoList().getSnapshotInfos().isEmpty()) { + log.info(com.inspect.base.core.constant.Color.RED + "[IVS] GET PIC FAIL cameraCode: {}, channel: {}, domainCode: {}" + com.inspect.base.core.constant.Color.END, + ivsDevChanSnapVo.getCameraCode(), + ivsDevChanSnapVo.getChannel(), + ivsDevChanSnapVo.getDomainCode() + ); + return ResponseEntity.ok().body(new InputStreamResource(retryDelegate.generateErrorImage())); } - log.info(com.inspect.base.core.constant.Color.RED + "[IVS] GET PIC FAIL cameraCode: {}, channel: {}, domainCode: {}" + com.inspect.base.core.constant.Color.END, - ivsDevChanSnapVo.getCameraCode(), - ivsDevChanSnapVo.getChannel(), - ivsDevChanSnapVo.getDomainCode() - ); - return ResponseEntity.ok().body(new InputStreamResource(generateErrorImage())); + String cameraCode = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getCameraCode(); + String pictureUrl = snapShotView.getSnapshotInfoList().getSnapshotInfos().get(0).getPictureUrl(); + log.info(com.inspect.base.core.constant.Color.MAGENTA + "[IVS] SNAPSHOT cameraCode: {}, pictureUrl: {}" + com.inspect.base.core.constant.Color.END, cameraCode, pictureUrl); + try { + return retryDelegate.downloadPicture(pictureUrl); + } catch (IOException e) { + log.info("所有重试失败2, 生成一个默认图片: {}, msg: {}", pictureUrl, e.getMessage()); + return ResponseEntity.ok().body(new InputStreamResource(retryDelegate.generateErrorImage())); + } } } - private InputStream generateErrorImage() { - int width = 400, height = 200; - BufferedImage bufferedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB); - Graphics2D g = bufferedImage.createGraphics(); - - // 背景 - g.setColor(Color.WHITE); - g.fillRect(0, 0, width, height); - - // 字体 - g.setColor(Color.RED); - g.setFont(new Font("Arial", Font.BOLD, 20)); - g.drawString("Get Picture Fail", 120, 100); - g.dispose(); - - try { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - ImageIO.write(bufferedImage, "png", os); - return new ByteArrayInputStream(os.toByteArray()); - } catch (IOException e) { - throw new RuntimeException("生成错误图片失败", e); - } - } +// public byte[] readStream(InputStream inStream) throws Exception { +// ByteArrayOutputStream outStream = new ByteArrayOutputStream(); +// byte[] buffer = new byte[1024]; +// int len; +// while ((len = inStream.read(buffer)) != -1) { +// outStream.write(buffer, 0, len); +// } +// outStream.close(); +// inStream.close(); +// return outStream.toByteArray(); +// } - public byte[] readStream(InputStream inStream) throws Exception { - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - int len; - while ((len = inStream.read(buffer)) != -1) { - outStream.write(buffer, 0, len); - } - outStream.close(); - inStream.close(); - return outStream.toByteArray(); - } - public byte[] Ivs1800channelSnap(IvsDevChanSnapVo ivsDevChanSnapVo) throws Exception { + public byte[] getIvs1800ChannelSnapBuffer(IvsDevChanSnapVo ivsDevChanSnapVo) throws Exception { log.info("++++++++++++++++++++++++执行1800channelsnap截图接口++++++++++++++++++++++++++++++"); String url = address + "/snapshot/manualsnapshot"; Map controlMap = new HashMap<>(); @@ -408,7 +442,7 @@ public class IvsDeviceController { @GetMapping({"channellist"}) public SipbDeviceListView channelList(IvsDevChanListVo ivsDevChanListVo) { - if(version.equals("1800")){ + if (version.equals("1800")) { ivsDevChanListVo.setDeviceType(2); } return ivsCommonService.get(UriUtils.parse(IvsConst.URI_DEVICE_LIST, ivsDevChanListVo), SipbDeviceListView.class); @@ -417,7 +451,7 @@ public class IvsDeviceController { @GetMapping({"channellistJson"}) public JSONObject channelListJson(IvsDevChanListVo ivsDevChanListVo) { - if(version.equals("1800")){ + if (version.equals("1800")) { ivsDevChanListVo.setDeviceType(2); } return ivsCommonService.getResultJson(UriUtils.parse(IvsConst.URI_DEVICE_LIST, ivsDevChanListVo)); diff --git a/inspect-ivs/src/main/java/com/inspect/ivs/service/PictureDownloadRetryableDelegate.java b/inspect-ivs/src/main/java/com/inspect/ivs/service/PictureDownloadRetryableDelegate.java new file mode 100644 index 0000000..ca39cc7 --- /dev/null +++ b/inspect-ivs/src/main/java/com/inspect/ivs/service/PictureDownloadRetryableDelegate.java @@ -0,0 +1,94 @@ +package com.inspect.ivs.service; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.InputStreamResource; +import org.springframework.http.ResponseEntity; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Recover; +import org.springframework.retry.annotation.Retryable; +import org.springframework.retry.support.RetrySynchronizationManager; +import org.springframework.stereotype.Component; + +import javax.imageio.ImageIO; +import java.awt.*; +import java.awt.image.BufferedImage; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +@Component +public class PictureDownloadRetryableDelegate { + private static final Logger log = LoggerFactory.getLogger(PictureDownloadRetryableDelegate.class); + + @Retryable( + value = IOException.class, + maxAttempts = 4, + backoff = @Backoff(delay = 2000)) // 每次重试间隔 2 秒 + public ResponseEntity downloadPicture(String pictureUrl) throws IOException { + int retryCount = RetrySynchronizationManager.getContext() != null + ? RetrySynchronizationManager.getContext().getRetryCount() + 1 + : 1; + + log.info("downloadPicture 尝试第 {} 次下载: {}", retryCount, pictureUrl); + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpResponse httpResponse = httpClient.execute(new HttpGet(pictureUrl)); + InputStream content = httpResponse.getEntity().getContent(); + byte[] bytes = readStream(content); + + if (bytes.length > 0) { + InputStream inputStream = new ByteArrayInputStream(bytes); + return ResponseEntity.ok().body(new InputStreamResource(inputStream)); + } else { + throw new IOException("图片为空: " + pictureUrl); + } + } + } + + @SuppressWarnings("unused") + @Recover + public ResponseEntity recover(IOException e, String pictureUrl) { + log.info("所有重试失败, 生成一个默认图片: {}, msg: {}", pictureUrl, e.getMessage()); + return ResponseEntity.ok().body(new InputStreamResource(generateErrorImage())); + } + + public static byte[] readStream(InputStream inputStream) throws IOException { + try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[4096]; // 4KB 缓冲区 + int len; + while ((len = inputStream.read(buffer)) != -1) { + outStream.write(buffer, 0, len); + } + return outStream.toByteArray(); + } + } + + public InputStream generateErrorImage() { + int width = 400, height = 200; + BufferedImage bufferedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB); + Graphics2D g = bufferedImage.createGraphics(); + + // 背景 + g.setColor(Color.WHITE); + g.fillRect(0, 0, width, height); + + // 字体 + g.setColor(Color.RED); + g.setFont(new Font("Arial", Font.BOLD, 20)); + g.drawString("Get Picture Fail", 120, 100); + g.dispose(); + + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ImageIO.write(bufferedImage, "png", os); + return new ByteArrayInputStream(os.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("生成错误图片失败", e); + } + } +} diff --git a/inspect-job/pom.xml b/inspect-job/pom.xml index 43774ba..b5a5c54 100644 --- a/inspect-job/pom.xml +++ b/inspect-job/pom.xml @@ -82,6 +82,16 @@ com.inspect inspect-base-websocket + + + org.springframework.retry + spring-retry + + + + org.springframework.boot + spring-boot-starter-aop + diff --git a/inspect-job/src/main/java/com/inspect/job/InspectJobApplication.java b/inspect-job/src/main/java/com/inspect/job/InspectJobApplication.java index 2a6dd02..5365ab8 100644 --- a/inspect-job/src/main/java/com/inspect/job/InspectJobApplication.java +++ b/inspect-job/src/main/java/com/inspect/job/InspectJobApplication.java @@ -8,7 +8,9 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; +import org.springframework.retry.annotation.EnableRetry; +@EnableRetry @EnableCustomConfig @EnableCustomSwagger2 @EnableRyFeignClients diff --git a/inspect-job/src/main/java/com/inspect/job/task/CameraOperateRetryableDelegate.java b/inspect-job/src/main/java/com/inspect/job/task/CameraOperateRetryableDelegate.java new file mode 100644 index 0000000..8244b7e --- /dev/null +++ b/inspect-job/src/main/java/com/inspect/job/task/CameraOperateRetryableDelegate.java @@ -0,0 +1,46 @@ +package com.inspect.job.task; + +import com.inspect.base.core.utils.HttpClientUtils; +import com.inspect.job.domain.task.PatrolPresetPos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.retry.RetryContext; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.retry.support.RetrySynchronizationManager; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +public class CameraOperateRetryableDelegate { + private static final Logger log = LoggerFactory.getLogger(CameraOperateRetryableDelegate.class); + + private String API_CONTROL_PRESET = "/api/v1/control/preset"; + + @Value("${liveSIPB.url}") + private String liveIVS_URL; + + @Retryable( + value = IOException.class, + maxAttempts = 5, + backoff = @Backoff(delay = 2000, multiplier = 1.0)) + public boolean sendCameraPresetRequest(PatrolPresetPos presetPos) throws IOException { + try { + RetryContext retryContext = RetrySynchronizationManager.getContext(); + int retryCount = retryContext != null ? retryContext.getRetryCount() : 0; + final String url = liveIVS_URL + API_CONTROL_PRESET; + final String param = "serial=" + presetPos.getVideoNvrCode() + "&command=goto&preset=" + presetPos.getPresetPosCode() + "&code=" + presetPos.getChannelCode(); + log.info("sendCameraPresetRequest retryCount: {}, param: {}, request: {}", + retryCount, + presetPos.getPatrolPointId() + "-" + presetPos.getPresetPosName(), + url + "?" + param); + HttpClientUtils.get(url, param); + return true; + } catch (Exception e) { + log.info("error: {}", e.getMessage()); + throw new IOException("设置失败: " + e.getMessage()); + } + } +} diff --git a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java index f79d234..2146386 100644 --- a/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java +++ b/inspect-job/src/main/java/com/inspect/job/task/JobMainTask.java @@ -31,11 +31,11 @@ import com.inspect.job.mapper.PatrolDeviceStateMapper; import com.inspect.job.util.HttpClientUtil; import com.inspect.system.base.domain.SysDictData; +import java.io.IOException; import java.net.URLConnection; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.time.LocalDateTime; -import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.*; @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; import feign.RetryableException; import org.slf4j.Logger; @@ -58,6 +57,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; +import javax.annotation.Resource; @Component("jobMainTask") @RestController @@ -116,6 +116,9 @@ public class JobMainTask { @Value("${task.test-mode:false}") private boolean testMode; + @Resource + private CameraOperateRetryableDelegate cameraOperateRetryableDelegate; + private ExecutorService threadPool; final ShaoXinBigModel shaoXinBigModel; @@ -453,6 +456,8 @@ public class JobMainTask { } } + + private PatrolTaskExecRecord prePointExec(PatrolTaskExecRecord taskExecRecord, PatrolTaskInfo patrolTaskInfo, int infoListSize) { String uuid = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); PatrolPresetPos patrolPresetPos = PatrolPresetPos.builder() @@ -472,8 +477,18 @@ public class JobMainTask { String taskPatrolId = taskExecRecord.getTaskPatrolId(); /*摄像头转到预置位*/ - boolean noError = setCameraToPreset(presetPos); -// if ("prod".equals(activeProfile) && noError) { + //boolean noError = setCameraToPreset(presetPos); + + boolean noError = true; + try { + noError = cameraOperateRetryableDelegate.sendCameraPresetRequest(presetPos); + } catch (IOException e) { + log.error("最终重试失败: channelCode: {}, presetPosName: {}, patrolPointId: {}, msg: {}", + presetPos.getChannelCode(), presetPos.getPresetPosName(), presetPos.getPatrolPointId(), e.getMessage()); + noError = false; + } + + if (!testMode && noError) { log.info("setCameraToPreset no error, delay 20 seconds"); myDelay(20000); @@ -482,178 +497,140 @@ public class JobMainTask { myDelay(2000); } -// boolean taskHalted = false; -// String taskPatrolledId = taskPatrolId.split(StringUtils.UNDERLINE)[1] + "_" + taskPatrolId.split(StringUtils.UNDERLINE)[2]; -// PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus(); -// patrolTaskStatus.setTaskPatrolledId(taskPatrolledId); -// List list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); -// if (!list.isEmpty()) { -// if ("3".equals(list.get(0).getTaskState())) {//暂停 -// while (true) { -// log.info("-----------------------task pause: {}", taskPatrolledId); -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// } -// -// list = taskExecClient.selectPatrolTaskStatusList(patrolTaskStatus); -// if (!list.isEmpty()) { -// log.info("-----------------------TaskState: {}", list.get(0).getTaskState()); -// if ("4".equals(list.get(0).getTaskState())) {//终止 -// log.info("-----------------------task terminate: {}", taskPatrolledId); -// taskHalted = true; -// break; -// } else if ("2".equals(list.get(0).getTaskState())) {//正在执行 -// log.info("-----------------------task resume: {}", taskPatrolledId); -// break; -// } -// } else { -// log.info("-----------------------selectPatrolTaskStatusList is empty"); -// } -// } -// } else if ("4".equals(list.get(0).getTaskState())) {//终止 -// log.info("-----------------------task terminate: {}", taskPatrolledId); -// taskHalted = true; -// } -// } -// -// if (!taskHalted) - { - StringBuffer fileTypes = new StringBuffer(); - StringBuffer filePaths = new StringBuffer(); - PatrolPresetAction patrolPresetAction = PatrolPresetAction.builder().isEnable("1").presetPosId(presetPos.getPresetPosId()).build(); - List patrolPresetActionList = taskExecClient.selectPatrolPresetActionList(patrolPresetAction); - PatrolPresetAction presetAction = patrolPresetActionList.isEmpty() ? - PatrolPresetAction.builder().presetPosId(presetPos.getPresetPosId()).actionType("1").photoCount(1L).photoGap(1).build() - : patrolPresetActionList.get(0); - - String basePath = (StringUtils.isEmpty(taskExecRecord.getStationCode()) ? stationCode : taskExecRecord.getStationCode()) - + "/" + DateUtils.getYearEven() - + "/" + DateUtils.getMonthEven() - + "/" + DateUtils.getDayEven() - + "/" + taskExecRecord.getTaskCode() - + "/"; - log.info("uuid: {}, basePath: {}", uuid, basePath); - if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { - final String chanType = presetPos.getChannelType(); - fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); - boolean bOk = false; - try { - log.info("PHOTO PresetType uuid: {}, chanType: {}, videoNvrCode: {}, channelCode: {}, patrolPointId: {}", uuid, chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode(), presetPos.getPatrolPointId()); - if ("vl".equals(chanType)) { + StringBuffer fileTypes = new StringBuffer(); + StringBuffer filePaths = new StringBuffer(); + PatrolPresetAction patrolPresetAction = PatrolPresetAction.builder().isEnable("1").presetPosId(presetPos.getPresetPosId()).build(); + List patrolPresetActionList = taskExecClient.selectPatrolPresetActionList(patrolPresetAction); + PatrolPresetAction presetAction = patrolPresetActionList.isEmpty() ? + PatrolPresetAction.builder().presetPosId(presetPos.getPresetPosId()).actionType("1").photoCount(1L).photoGap(1).build() + : patrolPresetActionList.get(0); + + String basePath = (StringUtils.isEmpty(taskExecRecord.getStationCode()) ? stationCode : taskExecRecord.getStationCode()) + + "/" + DateUtils.getYearEven() + + "/" + DateUtils.getMonthEven() + + "/" + DateUtils.getDayEven() + + "/" + taskExecRecord.getTaskCode() + + "/"; + log.info("uuid: {}, basePath: {}", uuid, basePath); + if (PresetAction.PHOTO.getCode().equals(presetAction.getActionType())) { + final String chanType = presetPos.getChannelType(); + fileTypes.append("ir".equals(chanType) ? "1" : "vl".equals(chanType) ? "2" : "").append(","); + boolean bOk = false; + try { + log.info("PHOTO PresetType uuid: {}, chanType: {}, videoNvrCode: {}, channelCode: {}, patrolPointId: {}", uuid, chanType, presetPos.getVideoNvrCode(), presetPos.getChannelCode(), presetPos.getPatrolPointId()); + if ("vl".equals(chanType)) { + String paramUrl = liveIVS_URL + "/api/v1/device/channelsnap?serial=" + presetPos.getVideoNvrCode() + "&realtime=true&code=" + presetPos.getChannelCode(); + String paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE + + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE + + presetPos.getPatrolPointId() + StringUtils.UNDERLINE + + patrolTaskInfo.getDevNo() + ".jpg"; + String paramFileDir = basePath + "CCD"; + log.info("paramUrl: {}, paramFileName: {}, paramFileDir: {}", paramUrl, paramFileName, paramFileDir); + FtpResult ftpResult = getFileAndSave(paramUrl, paramFileName, paramFileDir); + if (ftpResult != null && StringUtils.isNotEmpty(ftpResult.getFilepath())) { + log.info("filePath: {}", ftpResult.getFilepath()); + bOk = ftpResult.isOk(); + filePaths.append("/" + ftpResult.getFilepath()); + } + } else if ("ir".equals(chanType)) { + log.info("[JOB] 红外处理流程, chanType: {}, patrolPointId: {}", chanType, presetPos.getPatrolPointId()); + Map algSubtypeIdMap = taskExecClient.getAlgTypeListByPatrolPointId(String.valueOf(presetPos.getPatrolPointId())); + log.info("prePointExec algorithmTypeMap: {}", algSubtypeIdMap); + final String algSubtypeCode = algSubtypeIdMap == null ? StringUtils.EMPTY : + algSubtypeIdMap.getOrDefault(AlgConstants.ALG_SUBTYPE_CODE, StringUtils.EMPTY); + if (algSubtypeCode.contains(AlgConstants.INFRA_1800) + || algSubtypeCode.contains(AlgConstants.INFRA_YU3) + || algSubtypeCode.contains(AlgConstants.INFRA_CAMERA) + || algSubtypeCode.contains(AlgConstants.INFRA_CAMERA_REVERSE)) { + log.info("[infra_1800] 红外处理新流程: prePointExec algSubtypeCode: {}", algSubtypeCode); + String paramFileName; + if (algSubtypeCode.contains(AlgConstants.INFRA_1800)) { + Map threshold = patrolDeviceStateMapper.selectPatrolPresetParam(presetPos.getPatrolPointId()); + final String temperUrl = liveIVS_URL + "/api/v1/device/temper"; + JSONObject temperParam = new JSONObject(); + temperParam.put("ruleId", threshold.get("preset_param_code")); + log.info("[infra_1800] =======ruleId: {}", threshold.get("preset_param_code")); + temperParam.put("presetId", Integer.parseInt(presetPos.getPresetPosCode())); + temperParam.put("cameraCode", presetPos.getChannelCode()); + final String temperUrlRes = HttpClientUtils.sendPostAgain(temperUrl, temperParam.toString()); + log.info("[infra_1800] prePointExec temperUrlRes: {}", temperUrlRes); + paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE + + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE + + presetPos.getPatrolPointId() + StringUtils.UNDERLINE + + patrolTaskInfo.getDevNo() + "_" + temperUrlRes + ".jpg"; + } else { + paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE + + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE + + presetPos.getPatrolPointId() + StringUtils.UNDERLINE + + patrolTaskInfo.getDevNo() + ".jpg"; + } String paramUrl = liveIVS_URL + "/api/v1/device/channelsnap?serial=" + presetPos.getVideoNvrCode() + "&realtime=true&code=" + presetPos.getChannelCode(); - String paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE - + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE - + presetPos.getPatrolPointId() + StringUtils.UNDERLINE - + patrolTaskInfo.getDevNo() + ".jpg"; String paramFileDir = basePath + "CCD"; - log.info("paramUrl: {}, paramFileName: {}, paramFileDir: {}", paramUrl, paramFileName, paramFileDir); - FtpResult ftpResult = saveFile(paramUrl, paramFileName, paramFileDir); + log.info("红外处理新流程: paramUrl: {}, paramFileName: {}, paramFileDir: {}", paramUrl, paramFileName, paramFileDir); + FtpResult ftpResult = getFileAndSave(paramUrl, paramFileName, paramFileDir); if (ftpResult != null && StringUtils.isNotEmpty(ftpResult.getFilepath())) { - log.info("filePath: {}", ftpResult.getFilepath()); + log.info("红外处理新流程: filePath: {}", ftpResult.getFilepath()); bOk = ftpResult.isOk(); filePaths.append("/" + ftpResult.getFilepath()); } - } else if ("ir".equals(chanType)) { - log.info("[JOB] 红外处理流程, chanType: {}, patrolPointId: {}", chanType, presetPos.getPatrolPointId()); - Map algSubtypeIdMap = taskExecClient.getAlgTypeListByPatrolPointId(String.valueOf(presetPos.getPatrolPointId())); - log.info("prePointExec algorithmTypeMap: {}", algSubtypeIdMap); - final String algSubtypeCode = algSubtypeIdMap == null ? StringUtils.EMPTY : - algSubtypeIdMap.getOrDefault(AlgConstants.ALG_SUBTYPE_CODE, StringUtils.EMPTY); - if (algSubtypeCode.contains(AlgConstants.INFRA_1800) - || algSubtypeCode.contains(AlgConstants.INFRA_YU3) - || algSubtypeCode.contains(AlgConstants.INFRA_CAMERA) - || algSubtypeCode.contains(AlgConstants.INFRA_CAMERA_REVERSE)) { - log.info("[infra_1800] 红外处理新流程: prePointExec algSubtypeCode: {}", algSubtypeCode); - String paramFileName; - if (algSubtypeCode.contains(AlgConstants.INFRA_1800)) { - Map threshold = patrolDeviceStateMapper.selectPatrolPresetParam(presetPos.getPatrolPointId()); - final String temperUrl = liveIVS_URL + "/api/v1/device/temper"; - JSONObject temperParam = new JSONObject(); - temperParam.put("ruleId", threshold.get("preset_param_code")); - log.info("[infra_1800] =======ruleId: {}", threshold.get("preset_param_code")); - temperParam.put("presetId", Integer.parseInt(presetPos.getPresetPosCode())); - temperParam.put("cameraCode", presetPos.getChannelCode()); - final String temperUrlRes = HttpClientUtils.sendPostAgain(temperUrl, temperParam.toString()); - log.info("[infra_1800] prePointExec temperUrlRes: {}", temperUrlRes); - paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE - + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE - + presetPos.getPatrolPointId() + StringUtils.UNDERLINE - + patrolTaskInfo.getDevNo() + "_" + temperUrlRes + ".jpg"; - } else { - paramFileName = taskPatrolId.split(StringUtils.UNDERLINE)[1] + StringUtils.UNDERLINE - + taskPatrolId.split(StringUtils.UNDERLINE)[2] + StringUtils.UNDERLINE - + presetPos.getPatrolPointId() + StringUtils.UNDERLINE - + patrolTaskInfo.getDevNo() + ".jpg"; - } - String paramUrl = liveIVS_URL + "/api/v1/device/channelsnap?serial=" + presetPos.getVideoNvrCode() + "&realtime=true&code=" + presetPos.getChannelCode(); - String paramFileDir = basePath + "CCD"; - log.info("红外处理新流程: paramUrl: {}, paramFileName: {}, paramFileDir: {}", paramUrl, paramFileName, paramFileDir); - FtpResult ftpResult = saveFile(paramUrl, paramFileName, paramFileDir); - if (ftpResult != null && StringUtils.isNotEmpty(ftpResult.getFilepath())) { - log.info("红外处理新流程: filePath: {}", ftpResult.getFilepath()); - bOk = ftpResult.isOk(); - filePaths.append("/" + ftpResult.getFilepath()); - } - } else { // 古老的红外处理流程,保留勿删除 - InfraredModel infraredModel = new InfraredModel(); - infraredModel.setChannelSerial(presetPos.getChannelCode()); - infraredModel.setPresetId(Integer.parseInt(presetPos.getPresetPosCode())); - EqpBook eqpbook = new EqpBook(); - eqpbook.setPatrolDeviceCode(presetPos.getPatrolDeviceCode()); - List eqpBookListList = taskExecClient.selectBasedataEqpBookList(eqpbook); - if (!eqpBookListList.isEmpty()) { - Response ir_gen = remoteInvokeService.invoke("IR_GEN", presetPos.getPatrolDeviceCode().split("#")[0], new HashMap<>()); - log.info("ir_gen: {}", JSONObject.toJSONString(ir_gen)); - Object data = ir_gen.getData(); - String path = JSONObject.parseObject(JSONObject.toJSONString(data)).getString("filePath"); - path = path.split("33333")[1]; - filePaths.append(path).append(","); - log.info("INFRA:{}", taskExecRecord.getTaskName() + " path : " + path); - } + } else { // 古老的红外处理流程,保留勿删除 + InfraredModel infraredModel = new InfraredModel(); + infraredModel.setChannelSerial(presetPos.getChannelCode()); + infraredModel.setPresetId(Integer.parseInt(presetPos.getPresetPosCode())); + EqpBook eqpbook = new EqpBook(); + eqpbook.setPatrolDeviceCode(presetPos.getPatrolDeviceCode()); + List eqpBookListList = taskExecClient.selectBasedataEqpBookList(eqpbook); + if (!eqpBookListList.isEmpty()) { + Response ir_gen = remoteInvokeService.invoke("IR_GEN", presetPos.getPatrolDeviceCode().split("#")[0], new HashMap<>()); + log.info("ir_gen: {}", JSONObject.toJSONString(ir_gen)); + Object data = ir_gen.getData(); + String path = JSONObject.parseObject(JSONObject.toJSONString(data)).getString("filePath"); + path = path.split("33333")[1]; + filePaths.append(path).append(","); + log.info("INFRA:{}", taskExecRecord.getTaskName() + " path : " + path); } } - } catch (Exception e) { - log.error("error", e); } + } catch (Exception e) { + log.error("error", e); + } - recordPersist(bOk, taskExecRecord, infoListSize, patrolTaskInfo, presetPos, fileTypes, filePaths); - } else if (PresetAction.VIDEO.getCode().equals(presetAction.getActionType())) { - log.info("VIDEO PresetType videoNvrCode: {}, channelCode: {}", presetPos.getVideoNvrCode(), presetPos.getChannelCode()); - fileTypes.append(4).append(","); - String jsonStr = HttpClientUtil.getHttpResultJson(liveIVS_URL + "/api/v1/stream/start?serial=" + presetPos.getVideoNvrCode() + "&code=" + presetPos.getChannelCode()); - log.info("先调用直播开始,获取流对象:/api/v1/stream/start?serial= :{} ", presetPos.getVideoNvrCode() + "&code=" + presetPos.getChannelCode()); - log.info("/api/v1/stream/start 返回 json 字符串{} ", jsonStr); - JSONObject UserInfoReturn = JSON.parseObject(jsonStr); - Stream stream = JSON.toJavaObject(UserInfoReturn, Stream.class); - try { - String rel = HttpClientUtils.get(liveIVS_URL + "/api/v1/record/start", "streamid=" + stream.getStreamID() + "&code=" + presetPos.getChannelCode()); - log.info("视频录像开始:/api/v1/record/start streamid={} ", stream.getStreamID() + ",code=" + presetPos.getChannelCode() + ", 调用 /api/v1/record/start 返回值=" + rel); - if (rel.isEmpty()) { - rel = HttpClientUtils.get(liveIVS_URL + "/api/v1/record/start", "streamid=" + stream.getStreamID() + "&code=" + presetPos.getChannelCode()); - log.info("一次调用失败,第二次调用 视频录像开始:/api/v1/record/start streamid={} ", stream.getStreamID() + ",code=" + presetPos.getChannelCode() + ", 调用 /api/v1/record/start 返回值=" + rel); - } - } catch (Exception e) { - log.error("error", e); + recordPersist(bOk, taskExecRecord, infoListSize, patrolTaskInfo, presetPos, fileTypes, filePaths); + } else if (PresetAction.VIDEO.getCode().equals(presetAction.getActionType())) { + log.info("VIDEO PresetType videoNvrCode: {}, channelCode: {}", presetPos.getVideoNvrCode(), presetPos.getChannelCode()); + fileTypes.append(4).append(","); + String jsonStr = HttpClientUtil.getHttpResultJson(liveIVS_URL + "/api/v1/stream/start?serial=" + presetPos.getVideoNvrCode() + "&code=" + presetPos.getChannelCode()); + log.info("先调用直播开始,获取流对象:/api/v1/stream/start?serial= :{} ", presetPos.getVideoNvrCode() + "&code=" + presetPos.getChannelCode()); + log.info("/api/v1/stream/start 返回 json 字符串{} ", jsonStr); + JSONObject UserInfoReturn = JSON.parseObject(jsonStr); + Stream stream = JSON.toJavaObject(UserInfoReturn, Stream.class); + try { + String rel = HttpClientUtils.get(liveIVS_URL + "/api/v1/record/start", "streamid=" + stream.getStreamID() + "&code=" + presetPos.getChannelCode()); + log.info("视频录像开始:/api/v1/record/start streamid={} ", stream.getStreamID() + ",code=" + presetPos.getChannelCode() + ", 调用 /api/v1/record/start 返回值=" + rel); + if (rel.isEmpty()) { + rel = HttpClientUtils.get(liveIVS_URL + "/api/v1/record/start", "streamid=" + stream.getStreamID() + "&code=" + presetPos.getChannelCode()); + log.info("一次调用失败,第二次调用 视频录像开始:/api/v1/record/start streamid={} ", stream.getStreamID() + ",code=" + presetPos.getChannelCode() + ", 调用 /api/v1/record/start 返回值=" + rel); } + } catch (Exception e) { + log.error("error", e); + } - final int videoTime = presetAction.getVideoTime(); - Timer mTimer = new Timer(); - mTimer.schedule(new TimerTask() { - int mCount = 0; - - @Override - public void run() { - mCount++; - PatrolTaskExecRecord tempRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId); - if (mCount >= videoTime + 2 || !isRunningState(tempRecord.getTaskPatrolId())) { - log.info("视频录像结束:查询当前巡检任务执行状态,当状态为暂停或者终止时,退出当前执行方法:TaskPatrolledId :{}", taskPatrolId); - videotapeRecordStop(filePaths, stream, presetPos.getChannelCode(), basePath, presetPos.getPatrolPointId(), presetPos.getVideoNvrCode()); - mTimer.cancel(); - } + final int videoTime = presetAction.getVideoTime(); + Timer mTimer = new Timer(); + mTimer.schedule(new TimerTask() { + int mCount = 0; + + @Override + public void run() { + mCount++; + PatrolTaskExecRecord tempRecord = taskExecClient.selectPatrolTaskExecRecordByTaskPatrolId(taskPatrolId); + if (mCount >= videoTime + 2 || !isRunningState(tempRecord.getTaskPatrolId())) { + log.info("视频录像结束:查询当前巡检任务执行状态,当状态为暂停或者终止时,退出当前执行方法:TaskPatrolledId :{}", taskPatrolId); + videotapeRecordStop(filePaths, stream, presetPos.getChannelCode(), basePath, presetPos.getPatrolPointId(), presetPos.getVideoNvrCode()); + mTimer.cancel(); } - }, 0, 1000L); - } + } + }, 0, 1000L); } return taskExecRecord; @@ -950,7 +927,7 @@ public class JobMainTask { JSONObject jsonObject = JSON.parseObject(jsonStr); RecordData recordData = JSON.toJavaObject(jsonObject, RecordData.class); String path = recordData.getRecordList().get(0).getDownloadURL(); - FtpResult ftpResult = saveFile(path, patrolpointId + "_" + devNo + "_" + path.split("/")[path.split("/").length - 1].split("_")[2] + ".mp4", basePath + "Video"); + FtpResult ftpResult = getFileAndSave(path, patrolpointId + "_" + devNo + "_" + path.split("/")[path.split("/").length - 1].split("_")[2] + ".mp4", basePath + "Video"); filePaths.append(path).append(","); log.info("视频保存路径 path={} ", path); } catch (Exception e) { @@ -1411,7 +1388,7 @@ public class JobMainTask { return resList; } - public FtpResult saveFile(String url, String fileName, String fileTypeDir) { + public FtpResult getFileAndSave(String url, String fileName, String fileTypeDir) { try { log.info("saveFile STREAM url: {}", url); String[] split = url.split(StringUtils.QUESTION); diff --git a/inspect-main/inspect-main-start/src/main/java/com/inspect/InspectStartApplication.java b/inspect-main/inspect-main-start/src/main/java/com/inspect/InspectStartApplication.java index 2a1bd00..911d6df 100644 --- a/inspect-main/inspect-main-start/src/main/java/com/inspect/InspectStartApplication.java +++ b/inspect-main/inspect-main-start/src/main/java/com/inspect/InspectStartApplication.java @@ -8,7 +8,9 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.retry.annotation.EnableRetry; +@EnableRetry @EnableCustomConfig @EnableCustomSwagger2 @EnableRyFeignClients diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java index fad70f3..95fe93a 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/domain/AnalyseResult.java @@ -40,6 +40,9 @@ public class AnalyseResult implements Serializable { resultList.add(resItem); AnalyseReqItem analyseReqItem = analyseRequest.getObjectList().get(i); AnalyseResPoint resPoint = analyseResItem.getResults().get(0); + if("-9999".equals(resPoint.getValue())) { + resPoint.setValue(""); + } List results = new ArrayList<>(); resItem.setResults(results); for (String type : analyseReqItem.getTypeList()) { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/RetryableRequest.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/RetryableRequest.java new file mode 100644 index 0000000..63daf88 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/RetryableRequest.java @@ -0,0 +1,6 @@ +package com.inspect.analysis.service; + +public interface RetryableRequest { + int getRetryCount(); + void setRetryCount(int retryCount); +} diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java index e77bfb9..3056abc 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestConsumerManager.java @@ -13,7 +13,7 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import javax.annotation.PreDestroy; -@Component +//@Component public class AlgorithmRequestConsumerManager { private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestConsumerManager.class); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java index aa11c31..80cd031 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestProcessConsumer.java @@ -14,6 +14,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.io.IOException; import java.time.Duration; //@Component @@ -55,7 +56,11 @@ public class AlgorithmRequestProcessConsumer { 1000, // 每条消息间隔 1000ms 处理 500, // 空队列时等待 500ms request -> { - analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); + try { + analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } } ); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java new file mode 100644 index 0000000..3d09ab0 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AlgorithmRequestRetryableConsumerManager.java @@ -0,0 +1,100 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.base.core.utils.HttpClientUtils; +import com.inspect.base.redis.service.RedisService; +import com.inspect.partrolresult.domain.AnalyseRequest; +import com.inspect.partrolresult.service.AnalyseRemoteService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +@Component +public class AlgorithmRequestRetryableConsumerManager { + private static final Logger log = LoggerFactory.getLogger(AlgorithmRequestRetryableConsumerManager.class); + + @Value("${server.port}") + private String port; + + @Value("${task.redis.request.interval-ms:2000}") + private long intervalMs; + + @Value("${task.redis.request.idle-sleep-ms:500}") + private long idleSleepMs; + + private RedisQueueRetryableConsumerAsync consumer; + + @Resource + private RedisService redisService; + + @Resource + private AnalyseRemoteService analyseRemoteService; + + private final ExecutorService executor = Executors.newFixedThreadPool(10); + + @PostConstruct + public void initConsumer() { + consumer = new RedisQueueRetryableConsumerAsync( + redisService.redisTemplate, + AnalyseConstants.ALGORITHM_REQUEST_QUEUE, + AnalyseRequest.class, + intervalMs, + idleSleepMs, + request -> executor.submit(() -> processRequest(request)) // 交给线程池处理 + ); + + consumer.start(); // 应用启动时即开始消费 + } + + private void processRequest(AnalyseRequest request) { + try { + log.info("AlgorithmRequestRetryableConsumerManager Processing queueSize: {}, request: {}", getQueueSize(), request); + analyseRemoteService.sendRequest(request, request.getTypeList(), request.isFilter()); + } catch (IOException e) { + // 可以记录失败次数,避免无限重试 + int retryCount = Optional.of(request.getRetryCount()).orElse(0); + log.info("AlgorithmRequestRetryableConsumerManager Send failed, will retry: {}, retryCount: {}, msg: {}", request, request.getRetryCount(), e.getMessage()); + if (retryCount < 1) { + request.setRetryCount(retryCount + 1); + redisService.redisTemplate.opsForList().rightPush(AnalyseConstants.ALGORITHM_REQUEST_QUEUE, new Gson().toJson(request)); + } else { + log.info("AlgorithmRequestRetryableConsumerManager Max retry reached, dropping request: {}", request); + final String defaultUrl = "http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI; + HttpClientUtils.sendPostAgain(defaultUrl, request.toErrorResultStr()); + } + } + } + + @PreDestroy + public void stopConsumer() { + if (consumer != null) { + consumer.stop(); + } + } + + public long getQueueSize() { + return redisService.redisTemplate.opsForList().size(AnalyseConstants.ALGORITHM_REQUEST_QUEUE); + } + + @Scheduled(fixedDelay = 5000) + public void logThreadPoolStatus() { + ThreadPoolExecutor executor = consumer.getExecutor(); + log.info("AlgorithmRequestRetryableConsumerManager ThreadPool - active: {}, queue: {}, completed: {}", + executor.getActiveCount(), + executor.getQueue().size(), + executor.getCompletedTaskCount()); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java index d92de99..8f93977 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/AnalysisServiceImpl.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.io.IOException; import java.text.DecimalFormat; import java.util.*; import java.util.stream.Collectors; @@ -272,7 +273,11 @@ public class AnalysisServiceImpl implements IAnalysisService { // final long requestTimeout = 1L; // final String analyzeBigModelRequestIdRedisKey = AnalyseConstants.ANALYSE_AI_REQUEST.concat(requestId); // redisService.setCacheObject(analyzeBigModelRequestIdRedisKey, analyseRequest.clone(), requestTimeout, TimeUnit.DAYS); - analyseRemoteService.sendRequest(analyseRequest, algTypeList, Boolean.FALSE); + try { + analyseRemoteService.sendRequest(analyseRequest, algTypeList, Boolean.FALSE); + } catch (IOException e) { + log.error("大模型算法调用异常: ", e); + } } } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java new file mode 100644 index 0000000..a416926 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/service/impl/RedisQueueRetryableConsumerAsync.java @@ -0,0 +1,122 @@ +package com.inspect.analysis.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.inspect.analysis.service.RetryableRequest; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; + +import java.time.Duration; +import java.util.concurrent.*; +import java.util.function.Consumer; + +public class RedisQueueRetryableConsumerAsync implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(RedisQueueRetryableConsumerAsync.class); + + private final RedisTemplate redisTemplate; + private final Gson gson = new Gson(); + private final String queueKey; + private final Class clazz; + + private final long intervalMs; + private final long idleSleepMs; + + private final Consumer handler; + + private volatile boolean running = false; + + //private final ExecutorService executor = Executors.newFixedThreadPool(10); + + public ThreadPoolExecutor getExecutor() { + return executor; + } + + private final ThreadPoolExecutor executor = new ThreadPoolExecutor( + 4, // core threads + 10, // max threads + 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(100), // 队列容量 + new ThreadPoolExecutor.CallerRunsPolicy() // 背压策略:调用者线程执行任务 + ); + + public RedisQueueRetryableConsumerAsync(RedisTemplate redisTemplate, + String queueKey, + Class clazz, + long intervalMs, + long idleSleepMs, + Consumer handler) { + this.redisTemplate = redisTemplate; + this.queueKey = queueKey; + this.clazz = clazz; + this.intervalMs = intervalMs; + this.idleSleepMs = idleSleepMs; + this.handler = handler; + } + + public void start() { + if (!running) { + running = true; + new Thread(this, "RedisQueueRetryableConsumerAsync-" + queueKey).start(); + } + } + + public void stop() { + running = false; + executor.shutdown(); + } + + @Override + public void run() { + while (running) { + try { + String json = redisTemplate.opsForList().leftPop(queueKey, Duration.ofSeconds(1)); + if (StringUtils.isEmpty(json)) { + Thread.sleep(idleSleepMs); + continue; + } + + T obj = gson.fromJson(json, clazz); + + // 判断线程池是否拥堵 + if (executor.getQueue().size() > 80) { + logger.warn("RedisQueueRetryableConsumerAsync Thread pool queue is almost full, pushing back to Redis: {}", queueKey); + // 推回 Redis 保证不丢数据 + redisTemplate.opsForList().rightPush(queueKey, json); + Thread.sleep(1000); + continue; + } + + executor.submit(() -> { + try { + handler.accept(obj); + } catch (Exception e) { + logger.warn("RedisQueueRetryableConsumerAsync handler failed for obj={}, retrying...", json, e); + + // 反序列化带 retryCount 的对象 + if (obj instanceof RetryableRequest) { + RetryableRequest rr = (RetryableRequest) obj; + int retry = rr.getRetryCount(); + if (retry < 3) { + rr.setRetryCount(retry + 1); + redisTemplate.opsForList().rightPush(queueKey, gson.toJson(rr)); + } else { + logger.error("RedisQueueRetryableConsumerAsync max retries reached for: {}", json); + } + } else { + redisTemplate.opsForList().rightPush(queueKey, json); // 简单重入队 + } + } + }); + + Thread.sleep(intervalMs); + + } catch (Exception e) { + logger.error("RedisQueueRetryableConsumerAsync error", e); + } + } + + logger.info("RedisQueueRetryableConsumerAsync for [{}] stopped.", queueKey); + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/utils/SimpleHttpClient.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/utils/SimpleHttpClient.java new file mode 100644 index 0000000..36d9eea --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/analysis/utils/SimpleHttpClient.java @@ -0,0 +1,113 @@ +package com.inspect.analysis.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class SimpleHttpClient { + private static final Logger log = LoggerFactory.getLogger(SimpleHttpClient.class); + + private static final int DEFAULT_TIMEOUT_MS = 30000; + + public static String doPostJson(String url, String jsonBody) throws IOException { + return doPostJson(url, jsonBody, null, DEFAULT_TIMEOUT_MS); + } + + public static String doPostJson(String url, String jsonBody, Map headers, int timeoutMs) throws IOException { + HttpURLConnection conn = null; + BufferedReader reader = null; + try { + URL realUrl = new URL(url); + conn = (HttpURLConnection) realUrl.openConnection(); + conn.setRequestMethod("POST"); + conn.setConnectTimeout(timeoutMs); + conn.setReadTimeout(timeoutMs); + conn.setDoOutput(true); + conn.setDoInput(true); + conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + + if (headers != null) { + for (Map.Entry entry : headers.entrySet()) { + conn.setRequestProperty(entry.getKey(), entry.getValue()); + } + } + + try (OutputStream os = conn.getOutputStream()) { + os.write(jsonBody.getBytes(StandardCharsets.UTF_8)); + os.flush(); + } + + int responseCode = conn.getResponseCode(); + InputStream is = (responseCode >= 400) ? conn.getErrorStream() : conn.getInputStream(); + reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); + + StringBuilder result = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + result.append(line); + } + + if (responseCode >= 400) { + throw new IOException("HTTP POST failed with status " + responseCode + ": " + result); + } + + return result.toString(); + + } finally { + if (reader != null) try { reader.close(); } catch (IOException ignore) {} + if (conn != null) conn.disconnect(); + } + } + + public static String doGet(String url) throws IOException { + return doGet(url, null, DEFAULT_TIMEOUT_MS); + } + + public static String doGet(String url, Map headers, int timeoutMs) throws IOException { + HttpURLConnection conn = null; + BufferedReader reader = null; + try { + URL realUrl = new URL(url); + conn = (HttpURLConnection) realUrl.openConnection(); + conn.setRequestMethod("GET"); + conn.setConnectTimeout(timeoutMs); + conn.setReadTimeout(timeoutMs); + conn.setRequestProperty("Accept", "application/json"); + + if (headers != null) { + for (Map.Entry entry : headers.entrySet()) { + conn.setRequestProperty(entry.getKey(), entry.getValue()); + } + } + + int responseCode = conn.getResponseCode(); + InputStream is = (responseCode >= 400) ? conn.getErrorStream() : conn.getInputStream(); + reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); + + StringBuilder result = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + result.append(line); + } + + if (responseCode >= 400) { + log.error("HTTP GET Failed: {}", responseCode + " - " + result); + } + + return result.toString(); + + } catch (IOException ex) { + log.error("HTTP GET 请求异常: {}", ex.getClass().getSimpleName() + " - " + ex.getMessage()); + throw ex; + } finally { + if (reader != null) try { reader.close(); } catch (IOException ignore) {} + if (conn != null) conn.disconnect(); + } + } +} + diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java index 923768d..6069fed 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/controller/PatrolResultController.java @@ -1348,7 +1348,7 @@ public class PatrolResultController extends BaseController { List resultList = new ArrayList<>(); for (PatrolResult patrolResult : patrolResultList) { - logger.info(Color.GREEN + "PatrolResult: {}" + Color.END, patrolResult); + logger.info(Color.CYAN + "PatrolResult: {}" + Color.END, patrolResult); datetype = ""; String str; if (StringUtils.isNotEmpty(patrolResult.getPatrolDeviceCode())) { diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java index 1db09cc..a5ea3d9 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/domain/AnalyseRequest.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.inspect.analysis.domain.AnalyseResItem; import com.inspect.analysis.domain.AnalyseResPoint; import com.inspect.analysis.domain.AnalyseResult; +import com.inspect.analysis.service.RetryableRequest; import lombok.Getter; import lombok.Setter; @@ -14,7 +15,7 @@ import java.util.Objects; @Setter @Getter -public class AnalyseRequest implements Serializable { +public class AnalyseRequest implements RetryableRequest, Serializable { private List objectList; private String requestHostIp; private String requestHostPort; @@ -29,6 +30,18 @@ public class AnalyseRequest implements Serializable { private String[] typeList; private String algorithmType; private int totalNumber; + private int retryCount; + private String requestUrl; + + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } public AnalyseRequest clone() { return JSONObject.parseObject(JSONObject.toJSONString(this), AnalyseRequest.class); diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java index 73a98e5..58c8f8a 100644 --- a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRemoteService.java @@ -1,17 +1,14 @@ package com.inspect.partrolresult.service; -import com.alibaba.fastjson.JSONObject; import com.github.pagehelper.util.StringUtil; import com.inspect.analysis.constant.AnalyseConstants; -import com.inspect.analysis.domain.AnalyseLog; +import java.io.IOException; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; -import com.inspect.base.core.constant.Color; import com.inspect.base.core.constant.RedisConst; -import com.inspect.base.core.utils.HttpClientUtils; import com.inspect.base.core.utils.StringUtils; import com.inspect.base.redis.service.RedisService; import com.inspect.partrolresult.domain.AnalyseReqItem; @@ -35,13 +32,16 @@ public class AnalyseRemoteService { @Resource private IPatrolTaskService patrolTaskService; + @Resource + private AnalyseRequestRetryableDelegate retryDelegate; + //qinyl - public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) { + public void sendRequest(AnalyseRequest analyseReq, String[] typeList, boolean isFilter) throws IOException { final long requestTimeout = 1L; String requestId = UUID.randomUUID().toString().trim().replaceAll(StringUtils.DASH, StringUtils.EMPTY); String taskPatrolId = analyseReq.getTaskPatrolId(); redisService.setCacheObject(RedisConst.REQUEST_UUID + requestId, taskPatrolId, requestTimeout, TimeUnit.DAYS); - log.info("CALL_REMOTE_ANALYZE isFilter: {}, requestId: {}, taskPatrolId: {}", isFilter, requestId, taskPatrolId); + //log.info("CALL_REMOTE_ANALYZE isFilter: {}, requestId: {}, taskPatrolId: {}", isFilter, requestId, taskPatrolId); analyseReq.setRequestId(requestId); String taskSetKey = AnalyseConstants.ANALYSE_TASK_REQUEST + taskPatrolId; @@ -79,22 +79,19 @@ public class AnalyseRemoteService { requestUrl = "http://pleaseSetRemoteUrl:8080"; } - //this.logService.log(new AnalyseLog(analyseReq.toString(), "0", taskPatrolId, filter, requestId)); + analyseReq.setFilter(isFilter); + analyseReq.setRequestUrl(requestUrl.concat(AnalyseConstants.ANALYSE_URI)); try { - log.info("CALL_REMOTE_ANALYZE URL: {}, PARAMS: {}", requestUrl, analyseReq); - String result = HttpClientUtils.sendPostAgain(requestUrl.concat(AnalyseConstants.ANALYSE_URI), analyseReq.toString()); - log.info("CALL_REMOTE_ANALYZE RESULT: {}", result); -// log.info("[CALL REMOTE ANALYZE] <=========== CODE={} \n PARAMS={}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE), result); - if (!"200".equals(JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE))) { - log.info("CALL_REMOTE_ANALYZE FAIL: {}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE)); - } - } catch (Exception e) { - log.info("CALL_REMOTE_ANALYZE EXCEPTION: URL: {}, PARAMS: {}", requestUrl, analyseReq.getRequestId()); - log.error("error", e); - HttpClientUtils.sendPostAgain("http://localhost:" + this.port + AnalyseConstants.ANALYSE_RET_URI, analyseReq.toErrorResultStr()); + retryDelegate.doCallRemoteAnalyseService(analyseReq); + } catch (IOException e) { + log.info("FINALLY FAIL CALL REMOTE ANALYSE SERVICE, requestId: {}, taskPatrolId: {}, error: {}", + analyseReq.getRequestId(), + analyseReq.getTaskPatrolId(), + e.getMessage()); + throw e; } - } + } diff --git a/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java new file mode 100644 index 0000000..946c093 --- /dev/null +++ b/inspect-main/inspect-main-task/src/main/java/com/inspect/partrolresult/service/AnalyseRequestRetryableDelegate.java @@ -0,0 +1,36 @@ +package com.inspect.partrolresult.service; + +import com.alibaba.fastjson.JSONObject; +import com.inspect.analysis.constant.AnalyseConstants; +import com.inspect.analysis.utils.SimpleHttpClient; +import com.inspect.partrolresult.domain.AnalyseRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.retry.RetryContext; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.retry.support.RetrySynchronizationManager; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +public class AnalyseRequestRetryableDelegate { + private static final Logger log = LoggerFactory.getLogger(AnalyseRemoteService.class); + + @Retryable( + value = IOException.class, + maxAttempts = 5, + backoff = @Backoff(delay = 2000, multiplier = 1.0)) + public void doCallRemoteAnalyseService(final AnalyseRequest analyseReq) throws IOException { + RetryContext retryContext = RetrySynchronizationManager.getContext(); + int retryCount = retryContext != null ? retryContext.getRetryCount() : 0; + log.info("CALL_REMOTE_ANALYZE retryCount: {}, requestId: {}, isFilter: {}, URL: {}, PARAMS: {}", + retryCount, analyseReq.getRequestId(), analyseReq.isFilter(), analyseReq.getRequestUrl(), analyseReq); + String result = SimpleHttpClient.doPostJson(analyseReq.getRequestUrl(), analyseReq.toString()); + log.info("CALL_REMOTE_ANALYZE RESULT: {}", result); + if (!"200".equals(JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE))) { + log.info("CALL_REMOTE_ANALYZE FAIL: {}", JSONObject.parseObject(result).getString(AnalyseConstants.ANALYSE_CODE)); + } + } +} diff --git a/inspect-main/pom.xml b/inspect-main/pom.xml index b6e41be..f20466e 100644 --- a/inspect-main/pom.xml +++ b/inspect-main/pom.xml @@ -51,5 +51,15 @@ mysql mysql-connector-java + + + org.springframework.retry + spring-retry + + + + org.springframework.boot + spring-boot-starter-aop + \ No newline at end of file