Browse Source

/*SIP协议:在测试环境打通INVITE视频流播放流程*/

master
htjcAdmin 1 month ago
parent
commit
a2ca791cca
8 changed files with 374 additions and 108 deletions
  1. +143
    -0
      src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java
  2. +12
    -0
      src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java
  3. +26
    -0
      src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java
  4. +49
    -0
      src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java
  5. +82
    -0
      src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java
  6. +25
    -107
      src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java
  7. +36
    -0
      src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java
  8. +1
    -1
      src/main/resources/application-dev.yml

+ 143
- 0
src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java View File

@ -0,0 +1,143 @@
package com.inspect.tcpserver.sip.media;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.Frame;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Gb28181StreamService {
private Thread pushThread;
private volatile boolean isPushing = false;
private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream";
private final String rtpOutputUrl= "rtp://192.168.1.116:30000";
public void startPushStream() {
if (isPushing) {
log.warn("Push is already running.");
return;
}
pushThread = new Thread(() -> {
FFmpegFrameGrabber grabber = null;
FFmpegFrameRecorder recorder = null;
try {
// 初始化 grabber拉取 RTSP
grabber = new FFmpegFrameGrabber(rtspInputUrl);
grabber.setOption("rtsp_transport", "tcp"); // 使用 TCP 传输
grabber.start();
// 初始化 recorder输出 RTP over MPEG-TS
recorder = new FFmpegFrameRecorder(rtpOutputUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 表示无音频
recorder.setFormat("rtp_mpegts"); // 输出格式为 rtp_mpegts
// 配置视频编码器libx264
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setVideoOption("preset", "ultrafast"); // 超快预设
recorder.setVideoOption("g", "60"); // GOP 大小 60
recorder.setVideoOption("keyint_min", "60"); // 最小关键帧间隔 60
recorder.setVideoOption("sc_threshold", "0"); // 场景变化阈值 0强制每 GOP 插入 I
recorder.start();
isPushing = true;
log.info("Started pushing stream from {} to {}", rtspInputUrl, rtpOutputUrl);
Frame frame;
while (isPushing && (frame = grabber.grabFrame()) != null) {
recorder.record(frame); // 推送帧
}
} catch (Exception e) {
log.error("Error during push: {}", e.getMessage(), e);
} finally {
try {
if (recorder != null) recorder.stop();
if (grabber != null) grabber.stop();
} catch (Exception e) {
log.error("Error stopping resources: {}", e.getMessage(), e);
}
isPushing = false;
log.info("Stopped pushing stream.");
}
});
pushThread.start();
}
public void startPushStream(String rtspSourceUrl, String rtpHost, int rtpPort) {
if (isPushing) {
log.warn("Push is already running.");
return;
}
pushThread = new Thread(() -> {
FFmpegFrameGrabber grabber = null;
FFmpegFrameRecorder recorder = null;
try {
// 初始化 grabber拉取 RTSP
grabber = new FFmpegFrameGrabber(rtspSourceUrl);
grabber.setOption("rtsp_transport", "tcp"); // 使用 TCP 传输
grabber.start();
// 初始化 recorder输出 RTP over MPEG-TS
String rtpDestUrl = "rtp://" + rtpHost + ":" + rtpPort;
recorder = new FFmpegFrameRecorder(rtpDestUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 表示无音频
recorder.setFormat("rtp_mpegts"); // 输出格式为 rtp_mpegts
// 配置视频编码器libx264
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setVideoOption("preset", "ultrafast"); // 超快预设
recorder.setVideoOption("g", "60"); // GOP 大小 60
recorder.setVideoOption("keyint_min", "60"); // 最小关键帧间隔 60
recorder.setVideoOption("sc_threshold", "0"); // 场景变化阈值 0强制每 GOP 插入 I
recorder.start();
isPushing = true;
log.info("Started pushing rtsp stream from {} to {}", rtspSourceUrl, rtpDestUrl);
Frame frame;
while (isPushing && (frame = grabber.grabFrame()) != null) {
recorder.record(frame); // 推送帧
}
} catch (Exception e) {
log.error("Error during push: {}", e.getMessage(), e);
} finally {
try {
if (recorder != null) recorder.stop();
if (grabber != null) grabber.stop();
} catch (Exception e) {
log.error("Error stopping resources: {}", e.getMessage(), e);
}
isPushing = false;
log.info("Stopped pushing stream.");
}
});
pushThread.start();
}
public void stopPush() {
isPushing = false;
if (pushThread != null) {
try {
pushThread.join(); // 等待线程结束
} catch (InterruptedException e) {
log.error("Error joining push thread: {}", e.getMessage(), e);
}
}
}
public boolean isPushing() {
return isPushing;
}
}

+ 12
- 0
src/main/java/com/inspect/tcpserver/sip/media/InviteSdpInfo.java View File

@ -0,0 +1,12 @@
package com.inspect.tcpserver.sip.media;
public class InviteSdpInfo {
public String ip;
public int port;
public long ssrc;
@Override
public String toString() {
return "RTP推流目标 -> IP=" + ip + ", port=" + port + ", ssrc=" + ssrc;
}
}

+ 26
- 0
src/main/java/com/inspect/tcpserver/sip/media/PsMuxer.java View File

@ -0,0 +1,26 @@
package com.inspect.tcpserver.sip.media;
import java.io.ByteArrayOutputStream;
public class PsMuxer {
public byte[] pack(byte[] h264) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// PS header (简化版)
out.write(new byte[]{
0x00,0x00,0x01,(byte)0xBA,
0x44,0x00,0x04,0x00,0x04,0x01,(byte)0x89,(byte)0xC3,(byte)0xF8
});
// PES header
out.write(new byte[]{
0x00,0x00,0x01,(byte)0xE0,
0x00,0x00
});
out.write(h264);
return out.toByteArray();
}
}

+ 49
- 0
src/main/java/com/inspect/tcpserver/sip/media/RtpSender.java View File

@ -0,0 +1,49 @@
package com.inspect.tcpserver.sip.media;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
public class RtpSender {
private DatagramSocket socket;
private InetAddress address;
private int port;
private int seq = 0;
private long timestamp = 0;
private int ssrc;
public RtpSender(String ip, int port, int ssrc) throws Exception {
this.socket = new DatagramSocket();
this.address = InetAddress.getByName(ip);
this.port = port;
this.ssrc = ssrc;
}
public void send(byte[] payload, boolean mark) throws Exception {
byte[] rtp = new byte[12 + payload.length];
rtp[0] = (byte) 0x80;
rtp[1] = (byte) (96 | (mark ? 0x80 : 0));
rtp[2] = (byte) (seq >> 8);
rtp[3] = (byte) (seq);
seq++;
timestamp += 3600; // 90k时钟
rtp[4] = (byte) (timestamp >> 24);
rtp[5] = (byte) (timestamp >> 16);
rtp[6] = (byte) (timestamp >> 8);
rtp[7] = (byte) (timestamp);
rtp[8] = (byte) (ssrc >> 24);
rtp[9] = (byte) (ssrc >> 16);
rtp[10] = (byte) (ssrc >> 8);
rtp[11] = (byte) (ssrc);
System.arraycopy(payload, 0, rtp, 12, payload.length);
socket.send(new DatagramPacket(rtp, rtp.length, address, port));
}
}

+ 82
- 0
src/main/java/com/inspect/tcpserver/sip/media/RtspPuller.java View File

@ -0,0 +1,82 @@
package com.inspect.tcpserver.sip.media;
import org.bytedeco.ffmpeg.avcodec.AVPacket;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import java.nio.ByteBuffer;
import java.io.ByteArrayOutputStream;
public class RtspPuller {
public interface StreamCallback {
void onH264(byte[] data, boolean keyFrame) throws Exception;
}
private final String rtspUrl;
public RtspPuller(String rtspUrl) {
this.rtspUrl = rtspUrl;
}
public void start(StreamCallback callback) throws Exception {
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);
grabber.setOption("rtsp_transport", "tcp");
grabber.setOption("stimeout", "5000000");
grabber.start();
System.out.println("RTSP connected: " + rtspUrl);
AVPacket pkt;
while (true) {
pkt = grabber.grabPacket();
if (pkt == null || pkt.size() <= 0)
continue;
if (pkt.stream_index() != grabber.getVideoStream())
continue;
// ===== 读取裸数据 =====
ByteBuffer buffer = pkt.data().asByteBuffer();
buffer.rewind();
byte[] avcc = new byte[pkt.size()];
buffer.get(avcc);
// ===== AVCC AnnexB 转换 =====
byte[] annexb = avccToAnnexB(avcc);
boolean keyFrame = (pkt.flags() & 1) != 0;
callback.onH264(annexb, keyFrame);
}
}
/**
* AVCC AnnexB (核心函数)
*/
private byte[] avccToAnnexB(byte[] avcc) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
int offset = 0;
while (offset < avcc.length) {
int naluSize =
((avcc[offset] & 0xff) << 24) |
((avcc[offset+1] & 0xff) << 16) |
((avcc[offset+2] & 0xff) << 8) |
(avcc[offset+3] & 0xff);
offset += 4;
// 写入起始码 00 00 00 01
out.write(new byte[]{0,0,0,1});
out.write(avcc, offset, naluSize);
offset += naluSize;
}
return out.toByteArray();
}
}

+ 25
- 107
src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java View File

@ -1,16 +1,16 @@
package com.inspect.tcpserver.sip.service;
import com.inspect.tcpserver.sip.gb28181.MediaSession;
import com.inspect.tcpserver.sip.gb28181.MediaSessionEx;
import com.inspect.tcpserver.sip.gb28181.RtspClient;
import com.inspect.tcpserver.sip.media.Gb28181StreamService;
import com.inspect.tcpserver.sip.media.InviteSdpInfo;
import com.inspect.tcpserver.sip.registry.SipEventRegistry;
import com.inspect.tcpserver.sip.utils.DigestUtil;
import com.inspect.tcpserver.sip.utils.Gb28181SdpParser;
import com.inspect.tcpserver.sip.utils.SipXmlEnvelope;
import com.inspect.tcpserver.sip.utils.SipXmlParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@ -85,12 +85,19 @@ public class SipClientService implements SipListener {
@Value("${sip.expires}")
private int expires;
private String rtpStreamHost= "192.168.1.116";
private int rtpStreamPort= 30000;
private long rtpStreamSSRC = 12345678;
private ClientTransaction lastRegisterTransaction;
private Timer refreshTimer = new Timer(true);
private String fromTag = Long.toHexString(System.currentTimeMillis());
private AtomicInteger cSeqCounter = new AtomicInteger(1);
@Autowired
Gb28181StreamService streamService;
private static final String[] IP_SERVICES = {
"https://api.ipify.org",
"https://checkip.amazonaws.com",
@ -755,6 +762,7 @@ public class SipClientService implements SipListener {
return Long.toHexString(System.currentTimeMillis() & 0xffffffffL);
}
private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream";
@Override
public void processRequest(RequestEvent requestEvent) {
Request request = requestEvent.getRequest();
@ -838,11 +846,15 @@ public class SipClientService implements SipListener {
handleInvite(requestEvent);
} else if(Request.ACK.equals(request.getMethod())) {
// RFC 3261: The ACK request does not generate a response.
// ACK 事务内请求不是普通 SIP 方法,不生成任何 Response,ACK 事务内请求不是普通 SIP 方法
//Thread.sleep(30);
// ACK "事务内请求", 不是普通 SIP 方法, 不生成任何 Response
log.info("CALL startRtspToRtp");
//startRtspToRtp();
byte[] raw = request.getRawContent();
if (raw != null) {
String inviteAckContent = new String(raw, StandardCharsets.UTF_8);
log.info("INVITE ACK content: \n{}", inviteAckContent);
}
streamService.startPushStream(rtspInputUrl, rtpStreamHost, rtpStreamPort);
}
else {
Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request);
@ -860,105 +872,6 @@ public class SipClientService implements SipListener {
}
}
private Process ffmpegProcess;
// private void startRtspToRtp() {
// try {
// String cmd =
// "ffmpeg -rtsp_transport tcp " +
// "-i \"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream\" " +
// "-an -vcodec copy " +
// "-f rtp -payload_type 96 " +
// "\"rtp://192.168.1.116:50000?tcp\"";
//
// ProcessBuilder pb = new ProcessBuilder(
// "bash", "-c", cmd
// );
// pb.redirectErrorStream(true);
//
// ffmpegProcess = pb.start();
//
// new Thread(() -> {
// try (BufferedReader br =
// new BufferedReader(new InputStreamReader(
// ffmpegProcess.getInputStream()))) {
// String line;
// while ((line = br.readLine()) != null) {
// System.out.println("[FFmpeg] " + line);
// }
// } catch (IOException ignored) {}
// }).start();
//
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// try {
// MediaSession session = new MediaSession(
// "192.168.1.116",
// 50000,
// 1,
// false,
// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"
// );
// } catch (Exception e) {
//
// }
//
//
// }
// private void startRtspToRtp() {
// try {
// MediaSession session = new MediaSession(
// "192.168.1.116",
// 50000,
// 1
// );
//
// session.startRtsp(
// "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"
// );
//
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
private void startRtspToRtp() {
try {
MediaSessionEx session = new MediaSessionEx(
"192.168.1.116",
50000,
1
);
//session.start();
RtspClient client = new RtspClient(
"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"
);
// client.start((h264, pts90k) -> {
// try {
// session.onH264Frame(h264, pts90k);
// } catch (Exception e) {
// e.printStackTrace();
// }
// });
client.start((h264, pts90k) -> {
try {
session.onH264Frame(h264, pts90k);
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
private void handleInvite(RequestEvent requestEvent) throws Exception {
Request request = requestEvent.getRequest();
log.info("Received INVITE:\n{}", request);
@ -978,6 +891,11 @@ public class SipClientService implements SipListener {
if (raw != null) {
String sdp = new String(raw, StandardCharsets.UTF_8);
log.info("INVITE SDP:\n{}", sdp);
InviteSdpInfo sdpInfo = Gb28181SdpParser.parse(sdp);
log.info("INVITE sdpInfo: {}", sdpInfo);
rtpStreamHost = sdpInfo.ip;
rtpStreamPort = sdpInfo.port;
rtpStreamSSRC = sdpInfo.ssrc;
}
// 构造 200 OK + SDP


+ 36
- 0
src/main/java/com/inspect/tcpserver/sip/utils/Gb28181SdpParser.java View File

@ -0,0 +1,36 @@
package com.inspect.tcpserver.sip.utils;
import com.inspect.tcpserver.sip.media.InviteSdpInfo;
public class Gb28181SdpParser {
public static InviteSdpInfo parse(String sdp) {
InviteSdpInfo info = new InviteSdpInfo();
String[] lines = sdp.split("\r?\n");
for (String line : lines) {
line = line.trim();
// 解析IP
if (line.startsWith("c=IN IP4")) {
// c=IN IP4 192.168.1.116
info.ip = line.substring("c=IN IP4".length()).trim();
}
// 解析端口
if (line.startsWith("m=video")) {
// m=video 30000 RTP/AVP 98
String[] arr = line.split(" ");
info.port = Integer.parseInt(arr[1]);
}
// 解析SSRC
if (line.startsWith("y=")) {
// y=0700000100
info.ssrc = Long.parseLong(line.substring(2));
}
}
return info;
}
}

+ 1
- 1
src/main/resources/application-dev.yml View File

@ -52,7 +52,7 @@ sip:
domain: 192.168.1.116 # 平台SIP服务IP或域名
port: 5060
transport: tcp # 也可以为udp
local-ip: 192.168.1.97 # 本服务的外网或可达IP
local-ip: 192.168.1.3 # 本服务的外网或可达IP
local-port: 5061
expires: 3600

Loading…
Cancel
Save