From 640c732b2bf97c32272fed54c76abcd54b14696a Mon Sep 17 00:00:00 2001 From: htjcAdmin Date: Mon, 12 Jan 2026 15:45:04 +0800 Subject: [PATCH] =?UTF-8?q?/*SIP=E5=8D=8F=E8=AE=AE=E5=AF=B9=E6=8E=A5?= =?UTF-8?q?=E9=9F=B3=E9=A2=91=E5=8D=8F=E8=AE=AE=E6=8E=A8=E6=B5=81=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=9A=84=E5=B0=9D=E8=AF=95=E5=92=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?*/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 + .../tcpserver/sip/gb28181/Gb28181PsMuxer.java | 45 ++++++ .../sip/gb28181/H264AnnexBExtractor.java | 71 +++++++++ .../tcpserver/sip/gb28181/MediaSession.java | 46 ++++++ .../tcpserver/sip/gb28181/MediaSessionEx.java | 28 ++++ .../tcpserver/sip/gb28181/PesPacket.java | 30 ++++ .../tcpserver/sip/gb28181/PsHeader.java | 35 +++++ .../tcpserver/sip/gb28181/PsMuxer.java | 21 +++ .../tcpserver/sip/gb28181/PsPackHeader.java | 10 ++ .../tcpserver/sip/gb28181/PsPesPacket.java | 28 ++++ .../sip/gb28181/PsProgramStreamMap.java | 12 ++ .../tcpserver/sip/gb28181/PsSystemHeader.java | 12 ++ .../tcpserver/sip/gb28181/RtpSender.java | 60 ++++++++ .../tcpserver/sip/gb28181/RtpSenderEx.java | 84 +++++++++++ .../tcpserver/sip/gb28181/RtspClient.java | 74 ++++++++++ .../tcpserver/sip/media/MediaSession.java | 45 ++++++ .../inspect/tcpserver/sip/media/SdpInfo.java | 138 ++++++++++++++++++ .../tcpserver/sip/media/SdpParser.java | 92 ++++++++++++ .../tcpserver/sip/media/rtcp/RtcpPacket.java | 24 +++ .../sip/media/rtcp/RtcpSrSender.java | 13 ++ .../tcpserver/sip/media/rtp/RtpPacket.java | 37 +++++ .../sip/media/rtp/RtpPacketizer.java | 48 ++++++ .../tcpserver/sip/media/rtp/RtpSender.java | 11 ++ .../tcpserver/sip/media/rtp/RtpTcpSender.java | 31 ++++ .../tcpserver/sip/media/rtp/RtpUdpSender.java | 32 ++++ .../sip/media/rtsp/H264AnnexBExtractor.java | 21 +++ .../tcpserver/sip/media/rtsp/RtspClient.java | 33 +++++ .../sip/service/SipClientService.java | 119 ++++++++++++--- .../sip/stream/H264AnnexBExtractor.java | 35 +++++ .../tcpserver/sip/stream/MediaSession.java | 35 +++++ .../tcpserver/sip/stream/RtpClock.java | 15 ++ .../tcpserver/sip/stream/RtpPacket.java | 39 +++++ .../tcpserver/sip/stream/RtpSender.java | 46 ++++++ .../tcpserver/sip/stream/RtpSocket.java | 26 ++++ .../tcpserver/sip/stream/RtspClient.java | 59 ++++++++ .../tcpserver/sip/stream/ps/PsConstants.java | 15 ++ .../tcpserver/sip/stream/ps/PsMuxer.java | 28 ++++ .../tcpserver/sip/stream/ps/PsPackHeader.java | 27 ++++ .../tcpserver/sip/stream/ps/PsPesPacket.java | 35 +++++ .../sip/stream/ps/PsProgramStreamMap.java | 26 ++++ .../sip/stream/ps/PsSystemHeader.java | 21 +++ src/main/resources/application-dev.yml | 2 +- 42 files changed, 1590 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/Gb28181PsMuxer.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/H264AnnexBExtractor.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSession.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSessionEx.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PesPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsHeader.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsMuxer.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsPackHeader.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsPesPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsProgramStreamMap.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/PsSystemHeader.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSenderEx.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/gb28181/RtspClient.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/MediaSession.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/SdpInfo.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/SdpParser.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpSrSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacketizer.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpTcpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpUdpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtsp/H264AnnexBExtractor.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/media/rtsp/RtspClient.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/H264AnnexBExtractor.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/MediaSession.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/RtpClock.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/RtpPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/RtpSender.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/RtpSocket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/RtspClient.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsConstants.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsMuxer.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPackHeader.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPesPacket.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsProgramStreamMap.java create mode 100644 src/main/java/com/inspect/tcpserver/sip/stream/ps/PsSystemHeader.java diff --git a/pom.xml b/pom.xml index 64bd741..5d5446d 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,11 @@ com.fasterxml.jackson.dataformat jackson-dataformat-xml + + org.bytedeco + javacv-platform + 1.5.9 + diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/Gb28181PsMuxer.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/Gb28181PsMuxer.java new file mode 100644 index 0000000..54a17dc --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/Gb28181PsMuxer.java @@ -0,0 +1,45 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; + +public class Gb28181PsMuxer { + + private boolean headerSent = false; + + public byte[] muxH264(byte[] annexB, long pts90k) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + if (!headerSent) { + out.write(PsHeader.packHeader()); + out.write(PsHeader.systemHeader()); + out.write(PsHeader.psmHeader()); + headerSent = true; + } + + out.write(PesPacket.buildH264Pes(annexB, pts90k)); + return out.toByteArray(); + + } catch (Exception e) { + return null; + } + } + + public byte[] muxOneFrame(byte[] h264, long pts90k) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + // ⚠️ 每一帧都必须有 + out.write(PsHeader.packHeader()); + out.write(PsHeader.systemHeader()); + out.write(PsHeader.psmHeader()); + + out.write(PesPacket.buildH264Pes(h264, pts90k)); + return out.toByteArray(); + + } catch (Exception e) { + return null; + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/H264AnnexBExtractor.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/H264AnnexBExtractor.java new file mode 100644 index 0000000..ca65b8a --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/H264AnnexBExtractor.java @@ -0,0 +1,71 @@ +package com.inspect.tcpserver.sip.gb28181; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.javacpp.BytePointer; + +import java.nio.ByteBuffer; + +public class H264AnnexBExtractor { + + private static final byte[] START_CODE = {0, 0, 0, 1}; + + /** + * 从 AVPacket 中提取 Annex-B H264 + */ + public static byte[] extractFromAvPacket(AVPacket pkt) { + if (pkt == null || pkt.size() <= 0 || pkt.data() == null) { + return null; + } + + BytePointer ptr = pkt.data(); + int size = pkt.size(); + + byte[] raw = new byte[size]; + ptr.get(raw); + + // 已经是 Annex-B + if (isAnnexB(raw)) { + return raw; + } + + // AVCC → Annex-B + return avccToAnnexB(raw); + } + + /** + * 判断是否 Annex-B + */ + private static boolean isAnnexB(byte[] data) { + if (data.length < 4) return false; + return data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1; + } + + /** + * AVCC(length-prefixed)→ Annex-B(start code) + */ + private static byte[] avccToAnnexB(byte[] avcc) { + ByteBuffer in = ByteBuffer.wrap(avcc); + ByteBuffer out = ByteBuffer.allocate(avcc.length + 64); + + while (in.remaining() >= 4) { + int nalLen = + ((in.get() & 0xff) << 24) | + ((in.get() & 0xff) << 16) | + ((in.get() & 0xff) << 8) | + (in.get() & 0xff); + + if (nalLen <= 0 || nalLen > in.remaining()) { + break; + } + + out.put(START_CODE); + out.put(in.array(), in.position(), nalLen); + in.position(in.position() + nalLen); + } + + byte[] annexB = new byte[out.position()]; + out.flip(); + out.get(annexB); + return annexB; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSession.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSession.java new file mode 100644 index 0000000..83fc511 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSession.java @@ -0,0 +1,46 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.net.*; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class MediaSession { + private final String remoteIp; + private final int remotePort; + private final int ssrc; + private DatagramSocket socket; + private final AtomicBoolean running = new AtomicBoolean(false); + + + private final RtpSender rtpSender; + private final PsMuxer psMuxer; + + + public MediaSession(String ip, int port, int ssrc) throws Exception { + this.remoteIp = ip; + this.remotePort = port; + this.ssrc = ssrc; + this.socket = new DatagramSocket(); + this.rtpSender = new RtpSender(socket, InetAddress.getByName(ip), port, ssrc); + this.psMuxer = new PsMuxer(); + } + + + public void start() { + running.set(true); + } + + + public void stop() { + running.set(false); + socket.close(); + } + + + // RTSP Client 回调此方法 + public void onH264Frame(byte[] annexB, long pts90k) throws Exception { + if (!running.get()) return; + byte[] ps = psMuxer.muxH264(annexB, pts90k); + rtpSender.send(ps, true); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSessionEx.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSessionEx.java new file mode 100644 index 0000000..6824d61 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/MediaSessionEx.java @@ -0,0 +1,28 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +public class MediaSessionEx { + + private final RtpSenderEx rtpSenderEx; + private final Gb28181PsMuxer psMuxer; + + public MediaSessionEx(String remoteIp, int remotePort, int ssrc) throws Exception { + this.rtpSenderEx = new RtpSenderEx(remoteIp, remotePort, ssrc, 96); + this.psMuxer = new Gb28181PsMuxer(); + } + + /** RTSP Client 回调 */ + public void onH264Frame(byte[] annexB, long pts90k) { + byte[] ps = psMuxer.muxOneFrame(annexB, pts90k); + if (ps != null) { + rtpSenderEx.sendOne(ps, pts90k); + } + } + + public void close() { + rtpSenderEx.close(); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PesPacket.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PesPacket.java new file mode 100644 index 0000000..f9c9be0 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PesPacket.java @@ -0,0 +1,30 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.io.ByteArrayOutputStream; + +public class PesPacket { + + public static byte[] buildH264Pes(byte[] h264, long pts90k) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + out.write(new byte[]{ + 0x00,0x00,0x01,(byte)0xE0, + 0x00,0x00, + (byte)0x80,(byte)0x80,0x05 + }); + + writePts(out, pts90k); + out.write(h264); + + return out.toByteArray(); + } + + private static void writePts(ByteArrayOutputStream out, long pts) { + long val = pts & 0x1FFFFFFFFL; + out.write((byte)(0x21 | ((val >> 29) & 0x0E))); + out.write((byte)(val >> 22)); + out.write((byte)(0x01 | ((val >> 14) & 0xFE))); + out.write((byte)(val >> 7)); + out.write((byte)(0x01 | ((val << 1) & 0xFE))); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsHeader.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsHeader.java new file mode 100644 index 0000000..7b87773 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsHeader.java @@ -0,0 +1,35 @@ +package com.inspect.tcpserver.sip.gb28181; + +public class PsHeader { + + public static byte[] packHeader() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBA, + 0x44,0x00,0x04,0x00, + 0x04,0x01,(byte)0x89,(byte)0xC3, + (byte)0xF8 + }; + } + + public static byte[] systemHeader() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBB, + 0x00,0x0C, + (byte)0x80,0x04, + (byte)0xE0,0x07, + (byte)0xE0,(byte)0xC0, + 0x20,(byte)0xBD,(byte)0xE0 + }; + } + + public static byte[] psmHeader() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBC, + 0x00,0x12, + 0x00,0x00, + (byte)0xE0,0x00, + 0x1B,0x00,0x00, + 0x00,0x00 + }; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsMuxer.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsMuxer.java new file mode 100644 index 0000000..76c43c4 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsMuxer.java @@ -0,0 +1,21 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.io.*; + + +public class PsMuxer { + private boolean sentHeader = false; + + + public byte[] muxH264(byte[] annexB, long pts90k) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + if (!sentHeader) { + out.write(PsPackHeader.packHeader()); + out.write(PsSystemHeader.systemHeader()); + out.write(PsProgramStreamMap.psm()); + sentHeader = true; + } + out.write(PsPesPacket.pes(annexB, pts90k)); + return out.toByteArray(); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPackHeader.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPackHeader.java new file mode 100644 index 0000000..02f9050 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPackHeader.java @@ -0,0 +1,10 @@ +package com.inspect.tcpserver.sip.gb28181; + +public class PsPackHeader { + public static byte[] packHeader() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBA, + 0x44,0x00,0x04,0x00,0x04,0x01,0x00,0x01 + }; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPesPacket.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPesPacket.java new file mode 100644 index 0000000..3b518bf --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsPesPacket.java @@ -0,0 +1,28 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.io.*; + + +public class PsPesPacket { + public static byte[] pes(byte[] h264, long pts) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(new byte[]{0x00,0x00,0x01,(byte)0xE0}); + int len = h264.length + 13; + out.write((len >> 8) & 0xFF); + out.write(len & 0xFF); + out.write(new byte[]{(byte)0x80,(byte)0x80,0x05}); + writePts(out, pts); + out.write(h264); + return out.toByteArray(); + } + + + private static void writePts(ByteArrayOutputStream out, long pts) throws IOException { + long val = pts & 0x1FFFFFFFFL; + out.write((byte)(0x21 | (((val >> 30) & 0x07) << 1))); + out.write((byte)((val >> 22) & 0xFF)); + out.write((byte)((((val >> 15) & 0x7F) << 1) | 1)); + out.write((byte)((val >> 7) & 0xFF)); + out.write((byte)(((val & 0x7F) << 1) | 1)); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsProgramStreamMap.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsProgramStreamMap.java new file mode 100644 index 0000000..53f6a23 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsProgramStreamMap.java @@ -0,0 +1,12 @@ +package com.inspect.tcpserver.sip.gb28181; + +public class PsProgramStreamMap { + public static byte[] psm() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBC, + 0x00,0x12, + (byte)0xE0,0x00,0x00,0x00, + 0x01,(byte)0xE0,0x00,0x00 + }; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/PsSystemHeader.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsSystemHeader.java new file mode 100644 index 0000000..71b7238 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/PsSystemHeader.java @@ -0,0 +1,12 @@ +package com.inspect.tcpserver.sip.gb28181; + +public class PsSystemHeader { + public static byte[] systemHeader() { + return new byte[]{ + 0x00,0x00,0x01,(byte)0xBB, + 0x00,0x0C, + (byte)0x80,(byte)0x04,(byte)0xE4,(byte)0x7F, + (byte)0xE0,0x07,(byte)0xE1,0x07 + }; + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSender.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSender.java new file mode 100644 index 0000000..60fa841 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSender.java @@ -0,0 +1,60 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.net.*; +import java.util.concurrent.atomic.AtomicInteger; + + +public class RtpSender { + private static final int RTP_HEADER_SIZE = 12; + private static final int MTU = 1400; + + + private final DatagramSocket socket; + private final InetAddress remote; + private final int port; + private final int ssrc; + private final AtomicInteger seq = new AtomicInteger(0); + private int timestamp = 0; + + + public RtpSender(DatagramSocket socket, InetAddress remote, int port, int ssrc) { + this.socket = socket; + this.remote = remote; + this.port = port; + this.ssrc = ssrc; + } + + + public void send(byte[] ps, boolean marker) throws Exception { + int offset = 0; + while (offset < ps.length) { + + int size = Math.min(MTU, ps.length - offset); + byte[] pkt = new byte[RTP_HEADER_SIZE + size]; + + + pkt[0] = (byte) 0x80; + pkt[1] = (byte) (96 | (marker ? 0x80 : 0x00)); + pkt[2] = (byte) (seq.get() >> 8); + pkt[3] = (byte) (seq.getAndIncrement()); + + + timestamp += 3600; + pkt[4] = (byte) (timestamp >> 24); + pkt[5] = (byte) (timestamp >> 16); + pkt[6] = (byte) (timestamp >> 8); + pkt[7] = (byte) (timestamp); + + + pkt[8] = (byte) (ssrc >> 24); + pkt[9] = (byte) (ssrc >> 16); + pkt[10] = (byte) (ssrc >> 8); + pkt[11] = (byte) (ssrc); + + + System.arraycopy(ps, offset, pkt, RTP_HEADER_SIZE, size); + socket.send(new DatagramPacket(pkt, pkt.length, remote, port)); + offset += size; + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSenderEx.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSenderEx.java new file mode 100644 index 0000000..12d6044 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtpSenderEx.java @@ -0,0 +1,84 @@ +package com.inspect.tcpserver.sip.gb28181; + +import java.net.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class RtpSenderEx { + + private final DatagramSocket socket; + private final InetSocketAddress target; + private final int ssrc; + private final int payloadType; + private final AtomicInteger seq = new AtomicInteger(0); + + public RtpSenderEx(String ip, int port, int ssrc, int pt) throws Exception { + this.socket = new DatagramSocket(); + this.target = new InetSocketAddress(ip, port); + this.ssrc = ssrc; + this.payloadType = pt; + } + +// public void send(byte[] ps, long ts90k) { +// int offset = 0; +// while (offset < ps.length) { +// int len = Math.min(1400, ps.length - offset); +// byte[] rtp = buildRtp(ps, offset, len, ts90k); +// socketSend(rtp); +// offset += len; +// } +// } +// +// private byte[] buildRtp(byte[] ps, int off, int len, long ts) { +// byte[] pkt = new byte[12 + len]; +// pkt[0] = (byte) 0x80; +// pkt[1] = (byte) payloadType; +// pkt[2] = (byte) (seq.getAndIncrement() >> 8); +// pkt[3] = (byte) seq.get(); +// pkt[4] = (byte) (ts >> 24); +// pkt[5] = (byte) (ts >> 16); +// pkt[6] = (byte) (ts >> 8); +// pkt[7] = (byte) ts; +// pkt[8] = (byte) (ssrc >> 24); +// pkt[9] = (byte) (ssrc >> 16); +// pkt[10] = (byte) (ssrc >> 8); +// pkt[11] = (byte) ssrc; +// +// System.arraycopy(ps, off, pkt, 12, len); +// return pkt; +// } + + public void sendOne(byte[] ps, long ts90k) { + byte[] rtp = buildRtp(ps, ts90k, true); + socketSend(rtp); + } + + private byte[] buildRtp(byte[] payload, long ts, boolean marker) { + byte[] pkt = new byte[12 + payload.length]; + pkt[0] = (byte) 0x80; + pkt[1] = (byte) (payloadType | (marker ? 0x80 : 0)); + pkt[2] = (byte) (seq.get() >> 8); + pkt[3] = (byte) seq.getAndIncrement(); + pkt[4] = (byte) (ts >> 24); + pkt[5] = (byte) (ts >> 16); + pkt[6] = (byte) (ts >> 8); + pkt[7] = (byte) ts; + pkt[8] = (byte) (ssrc >> 24); + pkt[9] = (byte) (ssrc >> 16); + pkt[10] = (byte) (ssrc >> 8); + pkt[11] = (byte) ssrc; + + System.arraycopy(payload, 0, pkt, 12, payload.length); + return pkt; + } + + + private void socketSend(byte[] pkt) { + try { + socket.send(new DatagramPacket(pkt, pkt.length, target)); + } catch (Exception ignored) {} + } + + public void close() { + socket.close(); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/gb28181/RtspClient.java b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtspClient.java new file mode 100644 index 0000000..273a557 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/gb28181/RtspClient.java @@ -0,0 +1,74 @@ +package com.inspect.tcpserver.sip.gb28181; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.javacv.FFmpegFrameGrabber; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +public class RtspClient { + + private final String url; + private FFmpegFrameGrabber grabber; + private final AtomicBoolean running = new AtomicBoolean(false); + + public RtspClient(String url) { + this.url = url; + } + + /** + * @param onH264 callback(h264AnnexB, pts90k) + */ + public void start(BiConsumer onH264) throws Exception { + grabber = new FFmpegFrameGrabber(url); + grabber.setOption("rtsp_transport", "tcp"); + grabber.setOption("stimeout", "5000000"); + grabber.start(); + + running.set(true); + + new Thread(() -> { + try { + while (running.get()) { + AVPacket pkt = grabber.grabPacket(); + if (pkt == null || pkt.size() <= 0) { + continue; + } + + // 1️⃣ 提取 Annex-B H264 + byte[] annexB = H264AnnexBExtractor.extractFromAvPacket(pkt); + if (annexB == null) { + continue; + } + + // 2️⃣ PTS → 90kHz + long pts90k = ptsTo90k(pkt.pts(), grabber.getTimestamp()); + + onH264.accept(annexB, pts90k); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + stop(); + } + }, "RtspClient-Thread").start(); + } + + private long ptsTo90k(long pts, double timeBase) { + if (pts <= 0 || timeBase <= 0) { + return System.nanoTime() / 11_111; // fallback + } + return (long) (pts * timeBase * 90_000); + } + + public void stop() { + running.set(false); + try { + if (grabber != null) { + grabber.stop(); + grabber.release(); + } + } catch (Exception ignored) { + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/media/MediaSession.java b/src/main/java/com/inspect/tcpserver/sip/media/MediaSession.java new file mode 100644 index 0000000..00c6987 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/MediaSession.java @@ -0,0 +1,45 @@ +package com.inspect.tcpserver.sip.media; + +import com.inspect.tcpserver.sip.media.rtcp.RtcpSrSender; +import com.inspect.tcpserver.sip.media.rtp.RtpPacketizer; +import com.inspect.tcpserver.sip.media.rtp.RtpSender; +import com.inspect.tcpserver.sip.media.rtp.RtpTcpSender; +import com.inspect.tcpserver.sip.media.rtp.RtpUdpSender; +import com.inspect.tcpserver.sip.media.rtsp.RtspClient; + +public class MediaSession { + + private final String remoteIp; + private final int remotePort; + private final long ssrc; + private final boolean tcp; + private final String rtspUrl; + + public MediaSession(String remoteIp, int remotePort, long ssrc, boolean tcp, String rtspUrl) { + this.remoteIp = remoteIp; + this.remotePort = remotePort; + this.ssrc = ssrc; + this.tcp = tcp; + this.rtspUrl = rtspUrl; + } + + public void start() throws Exception { + RtpSender sender = tcp + ? new RtpTcpSender(remoteIp, remotePort) + : new RtpUdpSender(remoteIp, remotePort); + + // 先发 RTCP SR(B.6.1) + RtcpSrSender.send(sender.getControlStream(), ssrc); + + // 启动 RTSP 拉流 → RTP + RtspClient rtspClient = new RtspClient(rtspUrl); + rtspClient.start(nal -> { + try { + RtpPacketizer.packetizerAndSend(nal, ssrc, sender); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/SdpInfo.java b/src/main/java/com/inspect/tcpserver/sip/media/SdpInfo.java new file mode 100644 index 0000000..a524b84 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/SdpInfo.java @@ -0,0 +1,138 @@ +package com.inspect.tcpserver.sip.media; + +import java.util.HashMap; +import java.util.Map; + +public class SdpInfo { + + /** 会话级 IP(c= 行) */ + private String remoteIp; + + /** 媒体端口(m=video) */ + private int mediaPort; + + /** RTP / TCP / UDP */ + private Transport transport; + + /** payload type,如 98 */ + private int payloadType; + + /** 编码,如 H264 */ + private String codec; + + /** 时钟频率,如 90000 */ + private int clockRate; + + /** SDP setup 属性(RFC4145) */ + private String setup; // active / passive + + /** SDP connection 属性 */ + private String connection; // new + + /** y= 行里的 SSRC */ + private long ssrc; + + /** 原始属性缓存(方便扩展) */ + private final Map attributes = new HashMap<>(); + + public enum Transport { + RTP_UDP, + RTP_TCP + } + + public String getRemoteIp() { + return remoteIp; + } + + public void setRemoteIp(String remoteIp) { + this.remoteIp = remoteIp; + } + + public int getMediaPort() { + return mediaPort; + } + + public void setMediaPort(int mediaPort) { + this.mediaPort = mediaPort; + } + + public Transport getTransport() { + return transport; + } + + public void setTransport(Transport transport) { + this.transport = transport; + } + + public int getPayloadType() { + return payloadType; + } + + public void setPayloadType(int payloadType) { + this.payloadType = payloadType; + } + + public String getCodec() { + return codec; + } + + public void setCodec(String codec) { + this.codec = codec; + } + + public int getClockRate() { + return clockRate; + } + + public void setClockRate(int clockRate) { + this.clockRate = clockRate; + } + + public String getSetup() { + return setup; + } + + public void setSetup(String setup) { + this.setup = setup; + } + + public String getConnection() { + return connection; + } + + public void setConnection(String connection) { + this.connection = connection; + } + + public long getSsrc() { + return ssrc; + } + + public void setSsrc(long ssrc) { + this.ssrc = ssrc; + } + + public void putAttribute(String key, String value) { + attributes.put(key, value); + } + + public String getAttribute(String key) { + return attributes.get(key); + } + + @Override + public String toString() { + return "SdpInfo{" + + "remoteIp='" + remoteIp + '\'' + + ", mediaPort=" + mediaPort + + ", transport=" + transport + + ", payloadType=" + payloadType + + ", codec='" + codec + '\'' + + ", clockRate=" + clockRate + + ", setup='" + setup + '\'' + + ", connection='" + connection + '\'' + + ", ssrc=" + ssrc + + '}'; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/SdpParser.java b/src/main/java/com/inspect/tcpserver/sip/media/SdpParser.java new file mode 100644 index 0000000..67c90c5 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/SdpParser.java @@ -0,0 +1,92 @@ +package com.inspect.tcpserver.sip.media; + +import java.util.StringTokenizer; + +public class SdpParser { + + public static SdpInfo parse(String sdp) { + SdpInfo info = new SdpInfo(); + + String[] lines = sdp.split("\\r?\\n"); + + for (String line : lines) { + line = line.trim(); + if (line.isEmpty()) continue; + + // ---------- 会话级 ---------- + if (line.startsWith("c=")) { + // c=IN IP4 192.168.1.116 + String[] parts = line.split("\\s+"); + if (parts.length >= 3) { + info.setRemoteIp(parts[2]); + } + } + + // ---------- media ---------- + else if (line.startsWith("m=video")) { + // m=video 50000 TCP/AVP 98 + StringTokenizer st = new StringTokenizer(line); + st.nextToken(); // m=video + + info.setMediaPort(Integer.parseInt(st.nextToken())); + + String proto = st.nextToken(); + if (proto.toUpperCase().contains("TCP")) { + info.setTransport(SdpInfo.Transport.RTP_TCP); + } else { + info.setTransport(SdpInfo.Transport.RTP_UDP); + } + + info.setPayloadType(Integer.parseInt(st.nextToken())); + } + + // ---------- rtpmap ---------- + else if (line.startsWith("a=rtpmap")) { + // a=rtpmap:98 H264/90000 + String value = line.substring("a=rtpmap:".length()); + String[] parts = value.split("\\s+"); + if (parts.length == 2) { + String[] codecInfo = parts[1].split("/"); + info.setCodec(codecInfo[0]); + info.setClockRate(Integer.parseInt(codecInfo[1])); + } + } + + // ---------- setup ---------- + else if (line.startsWith("a=setup:")) { + info.setSetup(line.substring("a=setup:".length())); + } + + // ---------- connection ---------- + else if (line.startsWith("a=connection:")) { + info.setConnection(line.substring("a=connection:".length())); + } + + // ---------- fmtp ---------- + else if (line.startsWith("a=fmtp:")) { + // 可留作扩展 + info.putAttribute("fmtp", line); + } + + // ---------- y= (SSRC) ---------- + else if (line.startsWith("y=")) { + // y=0700000038 + info.setSsrc(Long.parseLong(line.substring(2))); + } + + // ---------- 其他属性 ---------- + else if (line.startsWith("a=")) { + int idx = line.indexOf(':'); + if (idx > 0) { + info.putAttribute( + line.substring(2, idx), + line.substring(idx + 1) + ); + } + } + } + + return info; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpPacket.java b/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpPacket.java new file mode 100644 index 0000000..e094cf9 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpPacket.java @@ -0,0 +1,24 @@ +package com.inspect.tcpserver.sip.media.rtcp; + +import java.nio.ByteBuffer; + +public class RtcpPacket { + + public static byte[] buildSr(long ssrc) { + ByteBuffer buf = ByteBuffer.allocate(28); + + buf.put((byte) 0x80); + buf.put((byte) 200); + buf.putShort((short) 6); + buf.putInt((int) ssrc); + + long ntp = System.currentTimeMillis() * 2208988800L; + buf.putLong(ntp); + buf.putInt(0); + buf.putInt(0); + buf.putInt(0); + + return buf.array(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpSrSender.java b/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpSrSender.java new file mode 100644 index 0000000..290cfb4 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtcp/RtcpSrSender.java @@ -0,0 +1,13 @@ +package com.inspect.tcpserver.sip.media.rtcp; + +import java.io.OutputStream; + +public class RtcpSrSender { + + public static void send(OutputStream out, long ssrc) throws Exception { + byte[] sr = RtcpPacket.buildSr(ssrc); + out.write(sr); + out.flush(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacket.java b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacket.java new file mode 100644 index 0000000..1460f32 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacket.java @@ -0,0 +1,37 @@ +package com.inspect.tcpserver.sip.media.rtp; + +public class RtpPacket { + + public static byte[] build( + int payloadType, + int seq, + int timestamp, + long ssrc, + byte[] payload, + boolean marker + ) { + int headerSize = 12; + byte[] packet = new byte[headerSize + payload.length]; + + packet[0] = (byte) 0x80; + packet[1] = (byte) (payloadType & 0x7F); + if (marker) { + packet[1] |= 0x80; + } + + packet[2] = (byte) (seq >> 8); + packet[3] = (byte) seq; + packet[4] = (byte) (timestamp >> 24); + packet[5] = (byte) (timestamp >> 16); + packet[6] = (byte) (timestamp >> 8); + packet[7] = (byte) timestamp; + packet[8] = (byte) (ssrc >> 24); + packet[9] = (byte) (ssrc >> 16); + packet[10] = (byte) (ssrc >> 8); + packet[11] = (byte) ssrc; + + System.arraycopy(payload, 0, packet, headerSize, payload.length); + return packet; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacketizer.java b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacketizer.java new file mode 100644 index 0000000..d687356 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpPacketizer.java @@ -0,0 +1,48 @@ +package com.inspect.tcpserver.sip.media.rtp; + +public class RtpPacketizer { + + private static final int MTU = 1400; + private static int seq = 0; + private static int timestamp = 0; + + public static void packetizerAndSend(byte[] nal, long ssrc, RtpSender sender) throws Exception { + if (nal.length <= MTU) { + byte[] pkt = RtpPacket.build( + 98, seq++, timestamp, ssrc, nal, true + ); + sender.send(pkt, pkt.length); + } else { + // FU-A + int nalHeader = nal[0] & 0xFF; + int fuIndicator = (nalHeader & 0xE0) | 28; + int fuHeaderStart = 0x80 | (nalHeader & 0x1F); + int fuHeaderMid = nalHeader & 0x1F; + int fuHeaderEnd = 0x40 | (nalHeader & 0x1F); + + int offset = 1; + boolean start = true; + + while (offset < nal.length) { + int size = Math.min(MTU - 2, nal.length - offset); + byte[] payload = new byte[size + 2]; + + payload[0] = (byte) fuIndicator; + payload[1] = (byte) (start ? fuHeaderStart : + (offset + size >= nal.length ? fuHeaderEnd : fuHeaderMid)); + + System.arraycopy(nal, offset, payload, 2, size); + offset += size; + start = false; + + byte[] pkt = RtpPacket.build( + 98, seq++, timestamp, ssrc, payload, offset >= nal.length + ); + sender.send(pkt, pkt.length); + } + } + + timestamp += 3600; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpSender.java b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpSender.java new file mode 100644 index 0000000..45f632d --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpSender.java @@ -0,0 +1,11 @@ +package com.inspect.tcpserver.sip.media.rtp; + +import java.io.OutputStream; + +public interface RtpSender { + + void send(byte[] data, int len) throws Exception; + + OutputStream getControlStream(); +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpTcpSender.java b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpTcpSender.java new file mode 100644 index 0000000..9f9e7c9 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpTcpSender.java @@ -0,0 +1,31 @@ +package com.inspect.tcpserver.sip.media.rtp; + +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; + +public class RtpTcpSender implements RtpSender { + + private final Socket socket; + private final OutputStream out; + + public RtpTcpSender(String ip, int port) throws Exception { + this.socket = new Socket(ip, port); + this.out = socket.getOutputStream(); + } + + @Override + public void send(byte[] data, int len) throws Exception { + ByteBuffer buf = ByteBuffer.allocate(len + 2); + buf.putShort((short) len); + buf.put(data, 0, len); + out.write(buf.array()); + out.flush(); + } + + @Override + public OutputStream getControlStream() { + return out; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpUdpSender.java b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpUdpSender.java new file mode 100644 index 0000000..84a1b25 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtp/RtpUdpSender.java @@ -0,0 +1,32 @@ +package com.inspect.tcpserver.sip.media.rtp; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +public class RtpUdpSender implements RtpSender { + + private final DatagramSocket socket; + private final InetAddress addr; + private final int port; + + public RtpUdpSender(String ip, int port) throws Exception { + this.socket = new DatagramSocket(); + this.addr = InetAddress.getByName(ip); + this.port = port; + } + + @Override + public void send(byte[] data, int len) throws Exception { + DatagramPacket packet = new DatagramPacket(data, len, addr, port); + socket.send(packet); + } + + @Override + public OutputStream getControlStream() { + return new ByteArrayOutputStream(); // UDP 场景可忽略 + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtsp/H264AnnexBExtractor.java b/src/main/java/com/inspect/tcpserver/sip/media/rtsp/H264AnnexBExtractor.java new file mode 100644 index 0000000..21bb50d --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtsp/H264AnnexBExtractor.java @@ -0,0 +1,21 @@ +package com.inspect.tcpserver.sip.media.rtsp; + +import org.bytedeco.javacv.Frame; + +import java.nio.ByteBuffer; + +public class H264AnnexBExtractor { + + public static byte[] extract(Frame frame) { + if (frame.image == null || frame.image.length == 0) { + return null; + } + + ByteBuffer buffer = (ByteBuffer) frame.image[0]; + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); + + return data; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/media/rtsp/RtspClient.java b/src/main/java/com/inspect/tcpserver/sip/media/rtsp/RtspClient.java new file mode 100644 index 0000000..549018d --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/media/rtsp/RtspClient.java @@ -0,0 +1,33 @@ +package com.inspect.tcpserver.sip.media.rtsp; + +import org.bytedeco.javacv.FFmpegFrameGrabber; +import org.bytedeco.javacv.Frame; + +import java.util.function.Consumer; + +public class RtspClient { + + private final String url; + + public RtspClient(String url) { + this.url = url; + } + + public void start(Consumer onNal) throws Exception { + FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(url); + grabber.setOption("rtsp_transport", "tcp"); + grabber.start(); + + while (true) { + Frame frame = grabber.grabImage(); + if (frame == null || frame.image == null) { + continue; + } + + byte[] annexB = H264AnnexBExtractor.extract(frame); + if (annexB != null) { + onNal.accept(annexB); + } + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java index d247102..30a890d 100644 --- a/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java +++ b/src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java @@ -1,6 +1,12 @@ package com.inspect.tcpserver.sip.service; + +import com.inspect.tcpserver.sip.gb28181.MediaSession; +import com.inspect.tcpserver.sip.gb28181.MediaSessionEx; +import com.inspect.tcpserver.sip.gb28181.RtspClient; import com.inspect.tcpserver.sip.registry.SipEventRegistry; + + import com.inspect.tcpserver.sip.utils.DigestUtil; import com.inspect.tcpserver.sip.utils.SipXmlEnvelope; import com.inspect.tcpserver.sip.utils.SipXmlParser; @@ -20,7 +26,7 @@ import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; import java.io.BufferedReader; -import java.io.IOException; + import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.net.*; @@ -833,7 +839,10 @@ public class SipClientService implements SipListener { } else if(Request.ACK.equals(request.getMethod())) { // RFC 3261: The ACK request does not generate a response. // ACK 是“事务内请求”,不是普通 SIP 方法,不生成任何 Response,ACK 是“事务内请求”,不是普通 SIP 方法 - startRtspToRtp(); + + //Thread.sleep(30); + log.info("CALL startRtspToRtp"); + //startRtspToRtp(); } else { Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request); @@ -853,32 +862,96 @@ public class SipClientService implements SipListener { private Process ffmpegProcess; +// private void startRtspToRtp() { +// try { +// String cmd = +// "ffmpeg -rtsp_transport tcp " + +// "-i \"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream\" " + +// "-an -vcodec copy " + +// "-f rtp -payload_type 96 " + +// "\"rtp://192.168.1.116:50000?tcp\""; +// +// ProcessBuilder pb = new ProcessBuilder( +// "bash", "-c", cmd +// ); +// pb.redirectErrorStream(true); +// +// ffmpegProcess = pb.start(); +// +// new Thread(() -> { +// try (BufferedReader br = +// new BufferedReader(new InputStreamReader( +// ffmpegProcess.getInputStream()))) { +// String line; +// while ((line = br.readLine()) != null) { +// System.out.println("[FFmpeg] " + line); +// } +// } catch (IOException ignored) {} +// }).start(); +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// try { +// MediaSession session = new MediaSession( +// "192.168.1.116", +// 50000, +// 1, +// false, +// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" +// ); +// } catch (Exception e) { +// +// } +// +// +// } + +// private void startRtspToRtp() { +// try { +// MediaSession session = new MediaSession( +// "192.168.1.116", +// 50000, +// 1 +// ); +// +// session.startRtsp( +// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" +// ); +// +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } + private void startRtspToRtp() { try { - String cmd = - "ffmpeg -rtsp_transport tcp " + - "-i \"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream\" " + - "-an -vcodec copy " + - "-f rtp -payload_type 96 " + - "\"rtp://192.168.1.116:50000?tcp\""; - - ProcessBuilder pb = new ProcessBuilder( - "bash", "-c", cmd + MediaSessionEx session = new MediaSessionEx( + "192.168.1.116", + 50000, + 1 ); - pb.redirectErrorStream(true); + //session.start(); - ffmpegProcess = pb.start(); + RtspClient client = new RtspClient( + "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream" + ); - new Thread(() -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader( - ffmpegProcess.getInputStream()))) { - String line; - while ((line = br.readLine()) != null) { - System.out.println("[FFmpeg] " + line); - } - } catch (IOException ignored) {} - }).start(); +// client.start((h264, pts90k) -> { +// try { +// session.onH264Frame(h264, pts90k); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); + client.start((h264, pts90k) -> { + try { + session.onH264Frame(h264, pts90k); + } catch (Exception e) { + e.printStackTrace(); + } + }); } catch (Exception e) { e.printStackTrace(); diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/H264AnnexBExtractor.java b/src/main/java/com/inspect/tcpserver/sip/stream/H264AnnexBExtractor.java new file mode 100644 index 0000000..f79ecc4 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/H264AnnexBExtractor.java @@ -0,0 +1,35 @@ +package com.inspect.tcpserver.sip.stream; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.javacpp.BytePointer; + +public class H264AnnexBExtractor { + + private static final byte[] START_CODE = {0x00, 0x00, 0x00, 0x01}; + + public static byte[] extractFromAvPacket(AVPacket pkt) { + BytePointer dataPtr = pkt.data(); + int size = pkt.size(); + + if (dataPtr == null || size <= 0) { + return null; + } + + byte[] raw = new byte[size]; + dataPtr.get(raw); + + // 如果已经是 Annex-B(00 00 00 01) + if (raw.length >= 4 && + raw[0] == 0x00 && raw[1] == 0x00 && + raw[2] == 0x00 && raw[3] == 0x01) { + return raw; + } + + // AVCC → Annex-B(简单处理:前置 start code) + byte[] annexB = new byte[START_CODE.length + raw.length]; + System.arraycopy(START_CODE, 0, annexB, 0, START_CODE.length); + System.arraycopy(raw, 0, annexB, START_CODE.length, raw.length); + return annexB; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/MediaSession.java b/src/main/java/com/inspect/tcpserver/sip/stream/MediaSession.java new file mode 100644 index 0000000..02c79b4 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/MediaSession.java @@ -0,0 +1,35 @@ +package com.inspect.tcpserver.sip.stream; + +import com.inspect.tcpserver.sip.stream.ps.PsMuxer; + +public class MediaSession { + + private final PsMuxer psMuxer; + private final RtpSender rtpSender; + private RtspClient rtspClient; + + public MediaSession(String remoteIp, int remotePort, int ssrc) throws Exception { + this.psMuxer = new PsMuxer(); + this.rtpSender = new RtpSender(remoteIp, remotePort, ssrc); + } + + public void startRtsp(String rtspUrl) throws Exception { + rtspClient = new RtspClient(rtspUrl); + + rtspClient.start(h264 -> { + try { + byte[] ps = psMuxer.mux(h264); + rtpSender.sendPs(ps); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + public void stop() { + if (rtspClient != null) { + rtspClient.stop(); + } + rtpSender.close(); + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/RtpClock.java b/src/main/java/com/inspect/tcpserver/sip/stream/RtpClock.java new file mode 100644 index 0000000..49dd4ca --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/RtpClock.java @@ -0,0 +1,15 @@ +package com.inspect.tcpserver.sip.stream; + +public class RtpClock { + + private static final long CLOCK_90K = 90000; + private static final long FRAME_INTERVAL = 3600; // 40ms + + private long timestamp = 0; + + public long next() { + timestamp += FRAME_INTERVAL; + return timestamp; + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/RtpPacket.java b/src/main/java/com/inspect/tcpserver/sip/stream/RtpPacket.java new file mode 100644 index 0000000..3ddd67d --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/RtpPacket.java @@ -0,0 +1,39 @@ +package com.inspect.tcpserver.sip.stream; + +public class RtpPacket { + + public static final int HEADER_SIZE = 12; + + public byte[] buffer; + public int length; + + public RtpPacket(int payloadSize) { + buffer = new byte[HEADER_SIZE + payloadSize]; + } + + public void setHeader( + int payloadType, + int seq, + long timestamp, + int ssrc, + boolean marker + ) { + buffer[0] = (byte) 0x80; + buffer[1] = (byte) (payloadType & 0x7F); + if (marker) buffer[1] |= 0x80; + + buffer[2] = (byte) (seq >> 8); + buffer[3] = (byte) (seq); + + buffer[4] = (byte) (timestamp >> 24); + buffer[5] = (byte) (timestamp >> 16); + buffer[6] = (byte) (timestamp >> 8); + buffer[7] = (byte) (timestamp); + + buffer[8] = (byte) (ssrc >> 24); + buffer[9] = (byte) (ssrc >> 16); + buffer[10] = (byte) (ssrc >> 8); + buffer[11] = (byte) (ssrc); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/RtpSender.java b/src/main/java/com/inspect/tcpserver/sip/stream/RtpSender.java new file mode 100644 index 0000000..0f1acf2 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/RtpSender.java @@ -0,0 +1,46 @@ +package com.inspect.tcpserver.sip.stream; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RtpSender { + + private static final int MTU = 1400; + private static final int PAYLOAD_TYPE = 98; + + private int seq = 0; + private final int ssrc; + private final RtpSocket socket; + private final RtpClock clock = new RtpClock(); + + public RtpSender(String ip, int port, int ssrc) throws Exception { + this.ssrc = ssrc; + this.socket = new RtpSocket(ip, port); + } + + public void sendPs(byte[] ps) throws Exception { + long ts = clock.next(); + + int offset = 0; + while (offset < ps.length) { + int size = Math.min(MTU, ps.length - offset); + boolean marker = (offset + size) >= ps.length; + + RtpPacket pkt = new RtpPacket(size); + pkt.setHeader(PAYLOAD_TYPE, seq++, ts, ssrc, marker); + + System.arraycopy(ps, offset, pkt.buffer, RtpPacket.HEADER_SIZE, size); + pkt.length = RtpPacket.HEADER_SIZE + size; + + + socket.send(pkt.buffer, pkt.length); + offset += size; + log.info("sendPs offset = {}", offset); + } + } + + public void close() { + socket.close(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/RtpSocket.java b/src/main/java/com/inspect/tcpserver/sip/stream/RtpSocket.java new file mode 100644 index 0000000..43f5a93 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/RtpSocket.java @@ -0,0 +1,26 @@ +package com.inspect.tcpserver.sip.stream; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +public class RtpSocket { + + private final DatagramSocket socket; + private final InetSocketAddress remote; + + public RtpSocket(String ip, int port) throws Exception { + socket = new DatagramSocket(); + remote = new InetSocketAddress(ip, port); + } + + public void send(byte[] data, int len) throws Exception { + DatagramPacket packet = new DatagramPacket(data, len, remote); + socket.send(packet); + } + + public void close() { + socket.close(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/RtspClient.java b/src/main/java/com/inspect/tcpserver/sip/stream/RtspClient.java new file mode 100644 index 0000000..57310e8 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/RtspClient.java @@ -0,0 +1,59 @@ +package com.inspect.tcpserver.sip.stream; + +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.javacv.FFmpegFrameGrabber; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class RtspClient { + + private final String url; + private FFmpegFrameGrabber grabber; + private final AtomicBoolean running = new AtomicBoolean(false); + + public RtspClient(String url) { + this.url = url; + } + + public void start(Consumer onH264) throws Exception { + grabber = new FFmpegFrameGrabber(url); + grabber.setOption("rtsp_transport", "tcp"); + grabber.setOption("stimeout", "5000000"); + grabber.start(); + + running.set(true); + + new Thread(() -> { + try { + while (running.get()) { + AVPacket pkt = grabber.grabPacket(); + if (pkt == null || pkt.size() <= 0) { + continue; + } + + byte[] annexB = H264AnnexBExtractor.extractFromAvPacket(pkt); + if (annexB != null) { + onH264.accept(annexB); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + stop(); + } + }, "RtspClient-Thread").start(); + } + + public void stop() { + running.set(false); + try { + if (grabber != null) { + grabber.stop(); + grabber.release(); + } + } catch (Exception ignored) { + } + } +} diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsConstants.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsConstants.java new file mode 100644 index 0000000..286dfb0 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsConstants.java @@ -0,0 +1,15 @@ +package com.inspect.tcpserver.sip.stream.ps; + +public final class PsConstants { + + public static final int PACK_START_CODE = 0x000001BA; + public static final int SYSTEM_HEADER_START_CODE = 0x000001BB; + public static final int PROGRAM_STREAM_MAP = 0x000001BC; + public static final int PES_VIDEO_START_CODE = 0x000001E0; + + public static final int STREAM_ID_VIDEO = 0xE0; + public static final long CLOCK_90K = 90000L; + + private PsConstants() {} +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsMuxer.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsMuxer.java new file mode 100644 index 0000000..4eae578 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsMuxer.java @@ -0,0 +1,28 @@ +package com.inspect.tcpserver.sip.stream.ps; + +import java.io.ByteArrayOutputStream; + +public class PsMuxer { + + private boolean headerSent = false; + private long scr = 0; + + public byte[] mux(byte[] h264) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + scr += 3600; + + if (!headerSent) { + out.write(PsPackHeader.create(scr)); + out.write(PsSystemHeader.create()); + out.write(PsProgramStreamMap.create()); + headerSent = true; + } + + out.write(PsPackHeader.create(scr)); + out.write(PsPesPacket.create(h264, scr)); + + return out.toByteArray(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPackHeader.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPackHeader.java new file mode 100644 index 0000000..90202e1 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPackHeader.java @@ -0,0 +1,27 @@ +package com.inspect.tcpserver.sip.stream.ps; + +import java.io.ByteArrayOutputStream; + +public class PsPackHeader { + + public static byte[] create(long scr) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + out.write(new byte[]{0, 0, 1, (byte) 0xBA}); + + long b = scr & 0x1FFFFFFFFL; + + out.write((byte) (0x44 | ((b >> 27) & 0x38))); + out.write((byte) (b >> 19)); + out.write((byte) (0x04 | ((b >> 14) & 0xF8))); + out.write((byte) (b >> 6)); + out.write((byte) (0x04 | ((b << 2) & 0xF8))); + + out.write(0x01); + out.write(new byte[]{0x00, 0x01, (byte) 0x89, (byte) 0xC3}); + out.write(0xF8); + + return out.toByteArray(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPesPacket.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPesPacket.java new file mode 100644 index 0000000..d0410fe --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsPesPacket.java @@ -0,0 +1,35 @@ +package com.inspect.tcpserver.sip.stream.ps; + +import java.io.ByteArrayOutputStream; + +public class PsPesPacket { + + public static byte[] create(byte[] es, long pts) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + out.write(new byte[]{0, 0, 1, (byte) 0xE0}); + + int len = es.length + 8; + out.write((len >> 8) & 0xFF); + out.write(len & 0xFF); + + out.write(0x80); + out.write(0x80); + out.write(0x05); + + writePts(out, pts); + out.write(es); + + return out.toByteArray(); + } + + private static void writePts(ByteArrayOutputStream out, long pts) { + long v = pts & 0x1FFFFFFFFL; + out.write((byte) (0x21 | ((v >> 29) & 0x0E))); + out.write((byte) (v >> 22)); + out.write((byte) (0x01 | ((v >> 14) & 0xFE))); + out.write((byte) (v >> 7)); + out.write((byte) (0x01 | ((v << 1) & 0xFE))); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsProgramStreamMap.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsProgramStreamMap.java new file mode 100644 index 0000000..d21ae76 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsProgramStreamMap.java @@ -0,0 +1,26 @@ +package com.inspect.tcpserver.sip.stream.ps; + +import java.io.ByteArrayOutputStream; + +public class PsProgramStreamMap { + + public static byte[] create() throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + out.write(new byte[]{0, 0, 1, (byte) 0xBC}); + out.write(new byte[]{0x00, 0x12}); + + out.write(0xE0); + out.write(new byte[]{0x00, 0x00}); + out.write(new byte[]{0x00, 0x08}); + + out.write(0x1B); // H264 + out.write(PsConstants.STREAM_ID_VIDEO); + out.write(new byte[]{0x00, 0x00}); + + out.write(new byte[]{0, 0, 0, 0}); // CRC + + return out.toByteArray(); + } +} + diff --git a/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsSystemHeader.java b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsSystemHeader.java new file mode 100644 index 0000000..9844439 --- /dev/null +++ b/src/main/java/com/inspect/tcpserver/sip/stream/ps/PsSystemHeader.java @@ -0,0 +1,21 @@ +package com.inspect.tcpserver.sip.stream.ps; + +import java.io.ByteArrayOutputStream; + +public class PsSystemHeader { + + public static byte[] create() throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + out.write(new byte[]{0, 0, 1, (byte) 0xBB}); + out.write(new byte[]{0x00, 0x0C}); + + out.write(new byte[]{(byte) 0x80, 0x04, (byte) 0xE4}); + out.write(0x04); + out.write(0xE1); + out.write(new byte[]{0x7F, (byte) 0xE0, 0x7F}); + + return out.toByteArray(); + } +} + diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index b8fa741..4153a9b 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -52,7 +52,7 @@ sip: domain: 192.168.1.116 # 平台SIP服务IP或域名 port: 5060 transport: tcp # 也可以为udp - local-ip: 192.168.1.8 # 本服务的外网或可达IP + local-ip: 192.168.1.97 # 本服务的外网或可达IP local-port: 5061 expires: 3600