From 1fbd323781dd980ff5e38a17ce4da50ad893c00d Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Thu, 26 Feb 2026 10:50:06 +0800 Subject: [PATCH] =?UTF-8?q?/*=E5=AE=8C=E6=88=90ffmpeg=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E5=88=B0zlmedia=E6=B5=81=E5=AA=92=E4=BD=93=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E5=8A=9F=E8=83=BD=E3=80=82*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sip/media/Gb28181RtpStreamer.java | 83 +++++++++++++++++++ .../sip/media/Gb28181StreamService.java | 74 ++++++++++++++++- .../sip/service/SipClientService.java | 7 ++ 3 files changed, 161 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java diff --git a/src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java new file mode 100644 index 0000000..6acc8bb --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java @@ -0,0 +1,83 @@ +package com.inspect.tcpserver.sip.media; + +import lombok.extern.slf4j.Slf4j; +import org.bytedeco.javacv.*; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class Gb28181RtpStreamer { + + private volatile boolean running = false; + private Thread worker; + + public void start(String rtspUrl, String zlmIp, int zlmPort) { + + if (running) return; + running = true; + + worker = new Thread(() -> pushLoop(rtspUrl, zlmIp, zlmPort)); + worker.start(); + } + + public void stop() { + running = false; + } + + private void pushLoop(String rtspUrl, String ip, int port) { + + while (running) { + try { + push(rtspUrl, ip, port); + } catch (Exception e) { + log.error("推流断开,5秒重连", e); + sleep(5000); + } + } + } + + private void push(String rtspUrl, String ip, int port) throws Exception { + + String outputUrl = "rtp://" + ip + ":" + port + + "?pkt_size=1400&localrtpport=50000"; + + log.info("开始推流 -> {}", outputUrl); + + // RTSP + FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl); + grabber.setOption("rtsp_transport", "tcp"); + grabber.setOption("stimeout", "5000000"); + grabber.start(); + + // 推RTP(MPEGTS) + FFmpegFrameRecorder recorder = + new FFmpegFrameRecorder(outputUrl, + grabber.getImageWidth(), + grabber.getImageHeight()); + + // 关键配置(等价ffmpeg命令) + recorder.setFormat("rtp_mpegts"); // ⭐核心 + recorder.setVideoCodec(grabber.getVideoCodec()); + recorder.setFrameRate(grabber.getFrameRate()); + recorder.setVideoBitrate(grabber.getVideoBitrate()); + + // AnnexB + SPS PPS + recorder.setVideoOption("bsf:v", "h264_mp4toannexb"); + recorder.setOption("mpegts_flags", "resend_headers"); + + recorder.start(); + + Frame frame; + while (running && (frame = grabber.grab()) != null) { + recorder.record(frame); + } + + recorder.stop(); + grabber.stop(); + } + + private void sleep(long ms) { + try { Thread.sleep(ms); } catch (Exception ignored) {} + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java index 5335d46..1c5e3bd 100644 --- a/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java +++ b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java @@ -2,9 +2,7 @@ package com.inspect.tcpserver.sip.media; import lombok.extern.slf4j.Slf4j; import org.bytedeco.ffmpeg.global.avcodec; -import org.bytedeco.javacv.FFmpegFrameGrabber; -import org.bytedeco.javacv.FFmpegFrameRecorder; -import org.bytedeco.javacv.Frame; +import org.bytedeco.javacv.*; import org.springframework.stereotype.Service; @Slf4j @@ -14,9 +12,79 @@ public class Gb28181StreamService { private Thread pushThread; private volatile boolean isPushing = false; + private volatile boolean running = false; + private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"; private final String rtpOutputUrl= "rtp://192.168.1.116:30000"; + public void startPush() { + if (running) { + log.warn("推流已在运行"); + return; + } + + running = true; + pushThread = new Thread(() -> { + FFmpegFrameGrabber grabber = null; + FFmpegFrameRecorder recorder = null; + + try { + // ================= 第一步:拉取 RTSP 源 ================= + grabber = new FFmpegFrameGrabber("rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"); + grabber.setOption("rtsp_transport", "tcp"); // 强制 TCP 拉流 + grabber.setOption("fflags", "nobuffer"); // 降低延迟 + grabber.setOption("flags", "low_delay"); + grabber.setOption("analyzeduration", "2000000"); // 分析时长 + grabber.setOption("probesize", "2000000"); + grabber.start(); + + // ================= 第二步:配置 RTP_MPEGTS 输出 ================= + String outputUrl = "rtp://192.168.1.116:30000?pkt_size=1400&localrtpport=50000"; + + recorder = new FFmpegFrameRecorder(outputUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 = 无音频通道 + recorder.setFormat("rtp_mpegts"); // 核心:rtp_mpegts 封装 + recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); // H264 + recorder.setVideoCodecName("copy"); // -c:v copy(尽量不重新编码) + recorder.setVideoBitrate(grabber.getVideoBitrate()); // 继承源码率 + recorder.setFrameRate(grabber.getFrameRate()); // 继承帧率 + + // 模拟 -bsf:v h264_mp4toannexb(JavaCV 会自动处理 Annex-B 输出) + recorder.setVideoOption("bsf:v", "h264_mp4toannexb"); + + // 模拟 -mpegts_flags resend_headers(重复发送 TS 头) + //recorder.setFormatOption("mpegts_flags", "resend_headers"); + recorder.setFormat("rtp_mpegts"); + recorder.setVideoCodecName("copy"); // -c:v copy + + + // pkt_size 和 localrtpport 通过输出 URL 参数已设置 + + recorder.start(); + + log.info("GB28181 PS over RTP 推流启动:{} -> {}", grabber.getFormat(), outputUrl); + + Frame frame; + while (running && (frame = grabber.grabFrame()) != null) { + recorder.record(frame); + } + + } catch (FrameGrabber.Exception | FrameRecorder.Exception e) { + log.error("推流异常", e); + } finally { + try { + if (recorder != null) recorder.stop(); + if (grabber != null) grabber.stop(); + } catch (Exception e) { + log.error("关闭资源异常", e); + } + running = false; + log.info("推流线程结束"); + } + }, "rtp-push-thread"); + + pushThread.start(); + } + public void startPushStream() { if (isPushing) { log.warn("Push is already running."); diff --git a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java index 89e4da3..7930c17 100644 --- a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java +++ b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java @@ -1,5 +1,6 @@ package com.inspect.tcpserver.sip.service; +import com.inspect.tcpserver.sip.media.Gb28181RtpStreamer; import com.inspect.tcpserver.sip.media.Gb28181StreamService; import com.inspect.tcpserver.sip.media.InviteSdpInfo; import com.inspect.tcpserver.sip.registry.SipEventRegistry; @@ -98,6 +99,9 @@ public class SipClientService implements SipListener { @Autowired Gb28181StreamService streamService; + @Autowired + Gb28181RtpStreamer gb28181RtpStreamer; + private static final String[] IP_SERVICES = { "https://api.ipify.org", "https://checkip.amazonaws.com", @@ -855,6 +859,8 @@ public class SipClientService implements SipListener { } streamService.startPushStream(rtspInputUrl, rtpStreamHost, rtpStreamPort); + //streamService.startPush();//PS OVER RTP + //gb28181RtpStreamer.start(rtspInputUrl, rtpStreamHost, rtpStreamPort); } else { Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request); @@ -902,6 +908,7 @@ public class SipClientService implements SipListener { Response ok = messageFactory.createResponse(Response.OK, request); String localSipId = "29001002120107000001"; + // 这个端口几乎没有任何作用,可以改成任何值,只是为了协议的完整性,因为这里的TCP是设备主动连接平台的,不需要平台通过这个端口反过来连接设备 int localRtpPort = 554; // Contact 必须 Address contactAddress = addressFactory.createAddress(