From a2ca791cca358fb8f0f04a6dddc51296a2830cb0 Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Thu, 12 Feb 2026 16:18:37 +0800 Subject: [PATCH] =?UTF-8?q?/*SIP=E5=8D=8F=E8=AE=AE=EF=BC=9A=E5=9C=A8?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=8E=AF=E5=A2=83=E6=89=93=E9=80=9AINVITE?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E6=B5=81=E6=92=AD=E6=94=BE=E6=B5=81=E7=A8=8B?= =?UTF-8?q?*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sip/media/Gb28181StreamService.java | 143 ++++++++++++++++++ .../tcpserver/sip/media/InviteSdpInfo.java | 12 ++ .../inspect/tcpserver/sip/media/PsMuxer.java | 26 ++++ .../tcpserver/sip/media/RtpSender.java | 49 ++++++ .../tcpserver/sip/media/RtspPuller.java | 82 ++++++++++ .../sip/service/SipClientService.java | 132 +++------------- .../tcpserver/sip/utils/Gb28181SdpParser.java | 36 +++++ src/main/resources/application-dev.yml | 2 +- 8 files changed, 374 insertions(+), 108 deletions(-) create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java diff --git a/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java new file mode 100644 index 0000000..5335d46 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java @@ -0,0 +1,143 @@ +package com.inspect.tcpserver.sip.media; + +import lombok.extern.slf4j.Slf4j; +import org.bytedeco.ffmpeg.global.avcodec; +import org.bytedeco.javacv.FFmpegFrameGrabber; +import org.bytedeco.javacv.FFmpegFrameRecorder; +import org.bytedeco.javacv.Frame; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class Gb28181StreamService { + + private Thread pushThread; + private volatile boolean isPushing = false; + + private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"; + private final String rtpOutputUrl= "rtp://192.168.1.116:30000"; + + public void startPushStream() { + if (isPushing) { + log.warn("Push is already running."); + return; + } + + pushThread = new Thread(() -> { + FFmpegFrameGrabber grabber = null; + FFmpegFrameRecorder recorder = null; + try { + // 初始化 grabber,拉取 RTSP 源 + grabber = new FFmpegFrameGrabber(rtspInputUrl); + grabber.setOption("rtsp_transport", "tcp"); // 使用 TCP 传输 + grabber.start(); + + // 初始化 recorder,输出 RTP over MPEG-TS + recorder = new FFmpegFrameRecorder(rtpOutputUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 表示无音频 + recorder.setFormat("rtp_mpegts"); // 输出格式为 rtp_mpegts + + // 配置视频编码器:libx264 + recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); + recorder.setVideoOption("preset", "ultrafast"); // 超快预设 + recorder.setVideoOption("g", "60"); // GOP 大小 60 + recorder.setVideoOption("keyint_min", "60"); // 最小关键帧间隔 60 + recorder.setVideoOption("sc_threshold", "0"); // 场景变化阈值 0(强制每 GOP 插入 I 帧) + + recorder.start(); + + isPushing = true; + log.info("Started pushing stream from {} to {}", rtspInputUrl, rtpOutputUrl); + + Frame frame; + while (isPushing && (frame = grabber.grabFrame()) != null) { + recorder.record(frame); // 推送帧 + } + + } catch (Exception e) { + log.error("Error during push: {}", e.getMessage(), e); + } finally { + try { + if (recorder != null) recorder.stop(); + if (grabber != null) grabber.stop(); + } catch (Exception e) { + log.error("Error stopping resources: {}", e.getMessage(), e); + } + isPushing = false; + log.info("Stopped pushing stream."); + } + }); + + pushThread.start(); + } + + public void startPushStream(String rtspSourceUrl, String rtpHost, int rtpPort) { + if (isPushing) { + log.warn("Push is already running."); + return; + } + + pushThread = new Thread(() -> { + FFmpegFrameGrabber grabber = null; + FFmpegFrameRecorder recorder = null; + try { + // 初始化 grabber,拉取 RTSP 源 + grabber = new FFmpegFrameGrabber(rtspSourceUrl); + grabber.setOption("rtsp_transport", "tcp"); // 使用 TCP 传输 + grabber.start(); + + // 初始化 recorder,输出 RTP over MPEG-TS + String rtpDestUrl = "rtp://" + rtpHost + ":" + rtpPort; + recorder = new FFmpegFrameRecorder(rtpDestUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 表示无音频 + recorder.setFormat("rtp_mpegts"); // 输出格式为 rtp_mpegts + + // 配置视频编码器:libx264 + recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); + recorder.setVideoOption("preset", "ultrafast"); // 超快预设 + recorder.setVideoOption("g", "60"); // GOP 大小 60 + recorder.setVideoOption("keyint_min", "60"); // 最小关键帧间隔 60 + recorder.setVideoOption("sc_threshold", "0"); // 场景变化阈值 0(强制每 GOP 插入 I 帧) + + recorder.start(); + + isPushing = true; + log.info("Started pushing rtsp stream from {} to {}", rtspSourceUrl, rtpDestUrl); + + Frame frame; + while (isPushing && (frame = grabber.grabFrame()) != null) { + recorder.record(frame); // 推送帧 + } + + } catch (Exception e) { + log.error("Error during push: {}", e.getMessage(), e); + } finally { + try { + if (recorder != null) recorder.stop(); + if (grabber != null) grabber.stop(); + } catch (Exception e) { + log.error("Error stopping resources: {}", e.getMessage(), e); + } + isPushing = false; + log.info("Stopped pushing stream."); + } + }); + + pushThread.start(); + } + + + public void stopPush() { + isPushing = false; + if (pushThread != null) { + try { + pushThread.join(); // 等待线程结束 + } catch (InterruptedException e) { + log.error("Error joining push thread: {}", e.getMessage(), e); + } + } + } + + public boolean isPushing() { + return isPushing; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java b/src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java new file mode 100644 index 0000000..870090a --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java @@ -0,0 +1,12 @@ +package com.inspect.tcpserver.sip.media; + +public class InviteSdpInfo { + public String ip; + public int port; + public long ssrc; + + @Override + public String toString() { + return "RTP推流目标 -> IP=" + ip + ", port=" + port + ", ssrc=" + ssrc; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java b/src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java new file mode 100644 index 0000000..1e45b93 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java @@ -0,0 +1,26 @@ +package com.inspect.tcpserver.sip.media; + +import java.io.ByteArrayOutputStream; + +public class PsMuxer { + + public byte[] pack(byte[] h264) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + // PS header (简化版) + out.write(new byte[]{ + 0x00,0x00,0x01,(byte)0xBA, + 0x44,0x00,0x04,0x00,0x04,0x01,(byte)0x89,(byte)0xC3,(byte)0xF8 + }); + + // PES header + out.write(new byte[]{ + 0x00,0x00,0x01,(byte)0xE0, + 0x00,0x00 + }); + + out.write(h264); + return out.toByteArray(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java b/src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java new file mode 100644 index 0000000..f1b3cea --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java @@ -0,0 +1,49 @@ +package com.inspect.tcpserver.sip.media; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +public class RtpSender { + + private DatagramSocket socket; + private InetAddress address; + private int port; + private int seq = 0; + private long timestamp = 0; + private int ssrc; + + public RtpSender(String ip, int port, int ssrc) throws Exception { + this.socket = new DatagramSocket(); + this.address = InetAddress.getByName(ip); + this.port = port; + this.ssrc = ssrc; + } + + public void send(byte[] payload, boolean mark) throws Exception { + byte[] rtp = new byte[12 + payload.length]; + + rtp[0] = (byte) 0x80; + rtp[1] = (byte) (96 | (mark ? 0x80 : 0)); + + rtp[2] = (byte) (seq >> 8); + rtp[3] = (byte) (seq); + seq++; + + timestamp += 3600; // 90k时钟 + rtp[4] = (byte) (timestamp >> 24); + rtp[5] = (byte) (timestamp >> 16); + rtp[6] = (byte) (timestamp >> 8); + rtp[7] = (byte) (timestamp); + + rtp[8] = (byte) (ssrc >> 24); + rtp[9] = (byte) (ssrc >> 16); + rtp[10] = (byte) (ssrc >> 8); + rtp[11] = (byte) (ssrc); + + System.arraycopy(payload, 0, rtp, 12, payload.length); + + socket.send(new DatagramPacket(rtp, rtp.length, address, port)); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java b/src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java new file mode 100644 index 0000000..c54e367 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java @@ -0,0 +1,82 @@ +package com.inspect.tcpserver.sip.media; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.javacv.FFmpegFrameGrabber; + +import java.nio.ByteBuffer; +import java.io.ByteArrayOutputStream; + +public class RtspPuller { + + public interface StreamCallback { + void onH264(byte[] data, boolean keyFrame) throws Exception; + } + + private final String rtspUrl; + + public RtspPuller(String rtspUrl) { + this.rtspUrl = rtspUrl; + } + + public void start(StreamCallback callback) throws Exception { + + FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl); + grabber.setOption("rtsp_transport", "tcp"); + grabber.setOption("stimeout", "5000000"); + grabber.start(); + + System.out.println("RTSP connected: " + rtspUrl); + + AVPacket pkt; + + while (true) { + pkt = grabber.grabPacket(); + + if (pkt == null || pkt.size() <= 0) + continue; + + if (pkt.stream_index() != grabber.getVideoStream()) + continue; + + // ===== 读取裸数据 ===== + ByteBuffer buffer = pkt.data().asByteBuffer(); + buffer.rewind(); + byte[] avcc = new byte[pkt.size()]; + buffer.get(avcc); + + // ===== AVCC → AnnexB 转换 ===== + byte[] annexb = avccToAnnexB(avcc); + + boolean keyFrame = (pkt.flags() & 1) != 0; + + callback.onH264(annexb, keyFrame); + } + } + + /** + * AVCC 转 AnnexB (核心函数) + */ + private byte[] avccToAnnexB(byte[] avcc) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + int offset = 0; + while (offset < avcc.length) { + + int naluSize = + ((avcc[offset] & 0xff) << 24) | + ((avcc[offset+1] & 0xff) << 16) | + ((avcc[offset+2] & 0xff) << 8) | + (avcc[offset+3] & 0xff); + + offset += 4; + + // 写入起始码 00 00 00 01 + out.write(new byte[]{0,0,0,1}); + out.write(avcc, offset, naluSize); + + offset += naluSize; + } + + return out.toByteArray(); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java index 30a890d..89e4da3 100644 --- a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java +++ b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java @@ -1,16 +1,16 @@ package com.inspect.tcpserver.sip.service; - -import com.inspect.tcpserver.sip.gb28181.MediaSession; -import com.inspect.tcpserver.sip.gb28181.MediaSessionEx; -import com.inspect.tcpserver.sip.gb28181.RtspClient; +import com.inspect.tcpserver.sip.media.Gb28181StreamService; +import com.inspect.tcpserver.sip.media.InviteSdpInfo; import com.inspect.tcpserver.sip.registry.SipEventRegistry; import com.inspect.tcpserver.sip.utils.DigestUtil; +import com.inspect.tcpserver.sip.utils.Gb28181SdpParser; import com.inspect.tcpserver.sip.utils.SipXmlEnvelope; import com.inspect.tcpserver.sip.utils.SipXmlParser; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; @@ -85,12 +85,19 @@ public class SipClientService implements SipListener { @Value("${sip.expires}") private int expires; + private String rtpStreamHost= "192.168.1.116"; + private int rtpStreamPort= 30000; + private long rtpStreamSSRC = 12345678; + private ClientTransaction lastRegisterTransaction; private Timer refreshTimer = new Timer(true); private String fromTag = Long.toHexString(System.currentTimeMillis()); private AtomicInteger cSeqCounter = new AtomicInteger(1); + @Autowired + Gb28181StreamService streamService; + private static final String[] IP_SERVICES = { "https://api.ipify.org", "https://checkip.amazonaws.com", @@ -755,6 +762,7 @@ public class SipClientService implements SipListener { return Long.toHexString(System.currentTimeMillis() & 0xffffffffL); } + private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"; @Override public void processRequest(RequestEvent requestEvent) { Request request = requestEvent.getRequest(); @@ -838,11 +846,15 @@ public class SipClientService implements SipListener { handleInvite(requestEvent); } else if(Request.ACK.equals(request.getMethod())) { // RFC 3261: The ACK request does not generate a response. - // ACK 是“事务内请求”,不是普通 SIP 方法,不生成任何 Response,ACK 是“事务内请求”,不是普通 SIP 方法 - - //Thread.sleep(30); + // ACK 是"事务内请求", 不是普通 SIP 方法, 不生成任何 Response log.info("CALL startRtspToRtp"); - //startRtspToRtp(); + byte[] raw = request.getRawContent(); + if (raw != null) { + String inviteAckContent = new String(raw, StandardCharsets.UTF_8); + log.info("INVITE ACK content: \n{}", inviteAckContent); + } + + streamService.startPushStream(rtspInputUrl, rtpStreamHost, rtpStreamPort); } else { Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request); @@ -860,105 +872,6 @@ public class SipClientService implements SipListener { } } - private Process ffmpegProcess; - -// private void startRtspToRtp() { -// try { -// String cmd = -// "ffmpeg -rtsp_transport tcp " + -// "-i \"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream\" " + -// "-an -vcodec copy " + -// "-f rtp -payload_type 96 " + -// "\"rtp://192.168.1.116:50000?tcp\""; -// -// ProcessBuilder pb = new ProcessBuilder( -// "bash", "-c", cmd -// ); -// pb.redirectErrorStream(true); -// -// ffmpegProcess = pb.start(); -// -// new Thread(() -> { -// try (BufferedReader br = -// new BufferedReader(new InputStreamReader( -// ffmpegProcess.getInputStream()))) { -// String line; -// while ((line = br.readLine()) != null) { -// System.out.println("[FFmpeg] " + line); -// } -// } catch (IOException ignored) {} -// }).start(); -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// try { -// MediaSession session = new MediaSession( -// "192.168.1.116", -// 50000, -// 1, -// false, -// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" -// ); -// } catch (Exception e) { -// -// } -// -// -// } - -// private void startRtspToRtp() { -// try { -// MediaSession session = new MediaSession( -// "192.168.1.116", -// 50000, -// 1 -// ); -// -// session.startRtsp( -// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" -// ); -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } - - private void startRtspToRtp() { - try { - MediaSessionEx session = new MediaSessionEx( - "192.168.1.116", - 50000, - 1 - ); - //session.start(); - - RtspClient client = new RtspClient( - "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" - ); - -// client.start((h264, pts90k) -> { -// try { -// session.onH264Frame(h264, pts90k); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// }); - client.start((h264, pts90k) -> { - try { - session.onH264Frame(h264, pts90k); - } catch (Exception e) { - e.printStackTrace(); - } - }); - - } catch (Exception e) { - e.printStackTrace(); - } - } - - private void handleInvite(RequestEvent requestEvent) throws Exception { Request request = requestEvent.getRequest(); log.info("Received INVITE:\n{}", request); @@ -978,6 +891,11 @@ public class SipClientService implements SipListener { if (raw != null) { String sdp = new String(raw, StandardCharsets.UTF_8); log.info("INVITE SDP:\n{}", sdp); + InviteSdpInfo sdpInfo = Gb28181SdpParser.parse(sdp); + log.info("INVITE sdpInfo: {}", sdpInfo); + rtpStreamHost = sdpInfo.ip; + rtpStreamPort = sdpInfo.port; + rtpStreamSSRC = sdpInfo.ssrc; } // ③ 构造 200 OK + SDP diff --git a/src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java b/src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java new file mode 100644 index 0000000..daf5f80 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java @@ -0,0 +1,36 @@ +package com.inspect.tcpserver.sip.utils; + +import com.inspect.tcpserver.sip.media.InviteSdpInfo; + +public class Gb28181SdpParser { + public static InviteSdpInfo parse(String sdp) { + InviteSdpInfo info = new InviteSdpInfo(); + + String[] lines = sdp.split("\r?\n"); + + for (String line : lines) { + line = line.trim(); + + // 解析IP + if (line.startsWith("c=IN IP4")) { + // c=IN IP4 192.168.1.116 + info.ip = line.substring("c=IN IP4".length()).trim(); + } + + // 解析端口 + if (line.startsWith("m=video")) { + // m=video 30000 RTP/AVP 98 + String[] arr = line.split(" "); + info.port = Integer.parseInt(arr[1]); + } + + // 解析SSRC + if (line.startsWith("y=")) { + // y=0700000100 + info.ssrc = Long.parseLong(line.substring(2)); + } + } + + return info; + } +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 4153a9b..f25e91f 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -52,7 +52,7 @@ sip: domain: 192.168.1.116 # 平台SIP服务IP或域名 port: 5060 transport: tcp # 也可以为udp - local-ip: 192.168.1.97 # 本服务的外网或可达IP + local-ip: 192.168.1.3 # 本服务的外网或可达IP local-port: 5061 expires: 3600