Browse Source

/*完成ffmpeg推流到zlmedia流媒体服务器功能。*/

master
htjcAdmin 4 weeks ago
parent
commit
1fbd323781
3 changed files with 161 additions and 3 deletions
  1. +83
    -0
      src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java
  2. +71
    -3
      src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java
  3. +7
    -0
      src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java

+ 83
- 0
src/main/java/com/inspect/tcpserver/sip/media/Gb28181RtpStreamer.java View File

@ -0,0 +1,83 @@
package com.inspect.tcpserver.sip.media;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacv.*;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Gb28181RtpStreamer {
private volatile boolean running = false;
private Thread worker;
public void start(String rtspUrl, String zlmIp, int zlmPort) {
if (running) return;
running = true;
worker = new Thread(() -> pushLoop(rtspUrl, zlmIp, zlmPort));
worker.start();
}
public void stop() {
running = false;
}
private void pushLoop(String rtspUrl, String ip, int port) {
while (running) {
try {
push(rtspUrl, ip, port);
} catch (Exception e) {
log.error("推流断开,5秒重连", e);
sleep(5000);
}
}
}
private void push(String rtspUrl, String ip, int port) throws Exception {
String outputUrl = "rtp://" + ip + ":" + port +
"?pkt_size=1400&localrtpport=50000";
log.info("开始推流 -> {}", outputUrl);
// RTSP
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);
grabber.setOption("rtsp_transport", "tcp");
grabber.setOption("stimeout", "5000000");
grabber.start();
// 推RTP(MPEGTS)
FFmpegFrameRecorder recorder =
new FFmpegFrameRecorder(outputUrl,
grabber.getImageWidth(),
grabber.getImageHeight());
// 关键配置等价ffmpeg命令
recorder.setFormat("rtp_mpegts"); // 核心
recorder.setVideoCodec(grabber.getVideoCodec());
recorder.setFrameRate(grabber.getFrameRate());
recorder.setVideoBitrate(grabber.getVideoBitrate());
// AnnexB + SPS PPS
recorder.setVideoOption("bsf:v", "h264_mp4toannexb");
recorder.setOption("mpegts_flags", "resend_headers");
recorder.start();
Frame frame;
while (running && (frame = grabber.grab()) != null) {
recorder.record(frame);
}
recorder.stop();
grabber.stop();
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (Exception ignored) {}
}
}

+ 71
- 3
src/main/java/com/inspect/tcpserver/sip/media/Gb28181StreamService.java View File

@ -2,9 +2,7 @@ package com.inspect.tcpserver.sip.media;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.*;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Slf4j @Slf4j
@ -14,9 +12,79 @@ public class Gb28181StreamService {
private Thread pushThread; private Thread pushThread;
private volatile boolean isPushing = false; private volatile boolean isPushing = false;
private volatile boolean running = false;
private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream"; private final String rtspInputUrl = "rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream";
private final String rtpOutputUrl= "rtp://192.168.1.116:30000"; private final String rtpOutputUrl= "rtp://192.168.1.116:30000";
public void startPush() {
if (running) {
log.warn("推流已在运行");
return;
}
running = true;
pushThread = new Thread(() -> {
FFmpegFrameGrabber grabber = null;
FFmpegFrameRecorder recorder = null;
try {
// ================= 第一步拉取 RTSP =================
grabber = new FFmpegFrameGrabber("rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream");
grabber.setOption("rtsp_transport", "tcp"); // 强制 TCP 拉流
grabber.setOption("fflags", "nobuffer"); // 降低延迟
grabber.setOption("flags", "low_delay");
grabber.setOption("analyzeduration", "2000000"); // 分析时长
grabber.setOption("probesize", "2000000");
grabber.start();
// ================= 第二步配置 RTP_MPEGTS 输出 =================
String outputUrl = "rtp://192.168.1.116:30000?pkt_size=1400&localrtpport=50000";
recorder = new FFmpegFrameRecorder(outputUrl, grabber.getImageWidth(), grabber.getImageHeight(), 0); // 0 = 无音频通道
recorder.setFormat("rtp_mpegts"); // 核心rtp_mpegts 封装
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); // H264
recorder.setVideoCodecName("copy"); // -c:v copy尽量不重新编码
recorder.setVideoBitrate(grabber.getVideoBitrate()); // 继承源码率
recorder.setFrameRate(grabber.getFrameRate()); // 继承帧率
// 模拟 -bsf:v h264_mp4toannexbJavaCV 会自动处理 Annex-B 输出
recorder.setVideoOption("bsf:v", "h264_mp4toannexb");
// 模拟 -mpegts_flags resend_headers重复发送 TS
//recorder.setFormatOption("mpegts_flags", "resend_headers");
recorder.setFormat("rtp_mpegts");
recorder.setVideoCodecName("copy"); // -c:v copy
// pkt_size localrtpport 通过输出 URL 参数已设置
recorder.start();
log.info("GB28181 PS over RTP 推流启动:{} -> {}", grabber.getFormat(), outputUrl);
Frame frame;
while (running && (frame = grabber.grabFrame()) != null) {
recorder.record(frame);
}
} catch (FrameGrabber.Exception | FrameRecorder.Exception e) {
log.error("推流异常", e);
} finally {
try {
if (recorder != null) recorder.stop();
if (grabber != null) grabber.stop();
} catch (Exception e) {
log.error("关闭资源异常", e);
}
running = false;
log.info("推流线程结束");
}
}, "rtp-push-thread");
pushThread.start();
}
public void startPushStream() { public void startPushStream() {
if (isPushing) { if (isPushing) {
log.warn("Push is already running."); log.warn("Push is already running.");


+ 7
- 0
src/main/java/com/inspect/tcpserver/sip/service/SipClientService.java View File

@ -1,5 +1,6 @@
package com.inspect.tcpserver.sip.service; package com.inspect.tcpserver.sip.service;
import com.inspect.tcpserver.sip.media.Gb28181RtpStreamer;
import com.inspect.tcpserver.sip.media.Gb28181StreamService; import com.inspect.tcpserver.sip.media.Gb28181StreamService;
import com.inspect.tcpserver.sip.media.InviteSdpInfo; import com.inspect.tcpserver.sip.media.InviteSdpInfo;
import com.inspect.tcpserver.sip.registry.SipEventRegistry; import com.inspect.tcpserver.sip.registry.SipEventRegistry;
@ -98,6 +99,9 @@ public class SipClientService implements SipListener {
@Autowired @Autowired
Gb28181StreamService streamService; Gb28181StreamService streamService;
@Autowired
Gb28181RtpStreamer gb28181RtpStreamer;
private static final String[] IP_SERVICES = { private static final String[] IP_SERVICES = {
"https://api.ipify.org", "https://api.ipify.org",
"https://checkip.amazonaws.com", "https://checkip.amazonaws.com",
@ -855,6 +859,8 @@ public class SipClientService implements SipListener {
} }
streamService.startPushStream(rtspInputUrl, rtpStreamHost, rtpStreamPort); streamService.startPushStream(rtspInputUrl, rtpStreamHost, rtpStreamPort);
//streamService.startPush();//PS OVER RTP
//gb28181RtpStreamer.start(rtspInputUrl, rtpStreamHost, rtpStreamPort);
} }
else { else {
Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request); Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request);
@ -902,6 +908,7 @@ public class SipClientService implements SipListener {
Response ok = messageFactory.createResponse(Response.OK, request); Response ok = messageFactory.createResponse(Response.OK, request);
String localSipId = "29001002120107000001"; String localSipId = "29001002120107000001";
// 这个端口几乎没有任何作用可以改成任何值只是为了协议的完整性因为这里的TCP是设备主动连接平台的不需要平台通过这个端口反过来连接设备
int localRtpPort = 554; int localRtpPort = 554;
// Contact 必须 // Contact 必须
Address contactAddress = addressFactory.createAddress( Address contactAddress = addressFactory.createAddress(


Loading…
Cancel
Save