package com.inspect.tcpserver.sip.service; import com.inspect.tcpserver.sip.registry.SipEventRegistry; import com.inspect.tcpserver.sip.utils.DigestUtil; import com.inspect.tcpserver.sip.utils.SipXmlEnvelope; import com.inspect.tcpserver.sip.utils.SipXmlParser; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.net.*; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @ConditionalOnProperty(name = "sip.enable", havingValue = "true") public class SipClientService implements SipListener { private SipFactory sipFactory; private SipStack sipStack; private SipProvider sipProvider; private MessageFactory messageFactory; private HeaderFactory headerFactory; private AddressFactory addressFactory; // 保存上次请求信息,重发用 private String lastDeviceCode; private String lastServerIp; private int lastServerPort; private String lastXmlBody; // 保存 WWW-Authenticate 参数 private String lastNonce; private String lastRealm; private String lastOpaque; private String lastQop; @Value("${sip.username}") private String username; @Value("${sip.password}") private String password; @Value("${sip.domain}") private String domain; @Value("${sip.port}") private int port; @Value("${sip.transport}") private String transport; @Value("${sip.local-ip}") private String localIp; @Value("${sip.local-port}") private int localPort; @Value("${sip.expires}") private int expires; private ClientTransaction lastRegisterTransaction; private Timer refreshTimer = new Timer(true); private String fromTag = Long.toHexString(System.currentTimeMillis()); private AtomicInteger cSeqCounter = new AtomicInteger(1); private static final String[] IP_SERVICES = { "https://api.ipify.org", "https://checkip.amazonaws.com", "https://ifconfig.me/ip", "https://icanhazip.com" }; private static class SubscriptionInfo { String callId; FromHeader from; ToHeader to; int expires; String event; } private final Map subscriptions = new ConcurrentHashMap<>(); /* * B.2 资源上报 */ String resourcesReportTestXml = "\n" + "\n" + " 1234567890\n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + "\n"; /* * B.9.2.4.1 告警事件通知 */ String alarmEventTestXml = "\n" + "\n" + "\n" + "\n"; /* * B.9.2.4.2 状态事件通知 */ String statusEventTestXml = "\n" + "\n" + "\n" + "\n"; String notifyTestXml = "\n" + "\n" + "0\n" + "\n"; @PostConstruct public void init() throws Exception { sipFactory = SipFactory.getInstance(); sipFactory.setPathName("gov.nist"); headerFactory = sipFactory.createHeaderFactory(); addressFactory = sipFactory.createAddressFactory(); messageFactory = sipFactory.createMessageFactory(); // 获取本地实际IP地址(不是0.0.0.0) if (localIp == null || localIp.equals("0.0.0.0")) { try { // 尝试获取公网IP String publicIp = getPublicIp(); if (publicIp != null && !publicIp.isEmpty()) { log.info("Detected public IP: {}", publicIp); localIp = publicIp; } else { // 使用本地IP作为备选 localIp = InetAddress.getLocalHost().getHostAddress(); log.info("Using local IP: {}", localIp); } } catch (Exception e) { log.warn("Failed to get public IP, using local IP", e); localIp = InetAddress.getLocalHost().getHostAddress(); } } log.info("Local IP: " + localIp); Properties properties = new Properties(); properties.setProperty("javax.sip.STACK_NAME", "spring-boot-sip-stack"); properties.setProperty("gov.nist.javax.sip.IP_ADDRESS", localIp); // 新增这些(JAIN-SIP RI 支持的 NAT 参数) properties.setProperty("gov.nist.javax.sip.OUTBOUND_PROXY", domain + ":" + port + ";" + transport); // 可选:强制出站代理 properties.setProperty("gov.nist.javax.sip.AUTOMATIC_NAT_SUPPORT", "true"); // 如果版本支持 properties.setProperty("gov.nist.javax.sip.USE_RPORT_AS_OUTBOUND", "true"); // 使用 rport 作为出站 properties.setProperty("gov.nist.javax.sip.FIX_CONTACT_HEADER", "true"); // 修复 Contact properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sipdebug.txt"); properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sipserverlog.txt"); sipStack = sipFactory.createSipStack(properties); ListeningPoint lp = sipStack.createListeningPoint(localIp, localPort, transport); sipProvider = sipStack.createSipProvider(lp); sipProvider.addSipListener(this); sendRegister(null); } public static String getPublicIp() { for (String service : IP_SERVICES) { try { URL url = new URL(service); try (BufferedReader in = new BufferedReader( new InputStreamReader(url.openStream(), "UTF-8"))) { String ip = in.readLine(); if (ip != null && !ip.trim().isEmpty()) { return ip.trim(); } } } catch (Exception e) { // 尝试下一个服务 continue; } } return null; } // 获取本地IP地址的方法 private String getLocalIpAddress() throws SocketException { Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { NetworkInterface iface = interfaces.nextElement(); // 排除回环接口和未启用的接口 if (iface.isLoopback() || !iface.isUp()) { continue; } Enumeration addresses = iface.getInetAddresses(); while (addresses.hasMoreElements()) { InetAddress addr = addresses.nextElement(); // 优先使用IPv4地址 if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) { return addr.getHostAddress(); } } } return null; } public void sendRegister(AuthorizationHeader authHeader) throws Exception { SipURI requestUri = addressFactory.createSipURI(null, domain); requestUri.setTransportParam(transport.toUpperCase()); requestUri.setPort(port); Address fromAddress = addressFactory.createAddress(username, addressFactory.createSipURI(username, domain)); FromHeader fromHeader = headerFactory.createFromHeader(fromAddress, fromTag); Address toAddress = addressFactory.createAddress(username, addressFactory.createSipURI(username, domain)); ToHeader toHeader = headerFactory.createToHeader(toAddress, null); CallIdHeader callId = sipProvider.getNewCallId(); CSeqHeader cSeq = headerFactory.createCSeqHeader(1L, Request.REGISTER); MaxForwardsHeader maxForwards = headerFactory.createMaxForwardsHeader(70); //String natIp = "172.19.1.1"; // 避免硬编码:用环境变量注入 //int natPort = localPort; // 通常保持本地端口,或映射后端口 String natIp = System.getenv("SIP_NAT_IP") != null ? System.getenv("SIP_NAT_IP") : localIp; int natPort = System.getenv("SIP_NAT_PORT") != null ? Integer.parseInt(System.getenv("SIP_NAT_PORT")) : localPort; log.info("SEND_REGISTER NAT_IP: {}, NAT_PORT: {}", natIp, natPort); // 创建带自定义deviceid参数的Contact URI //SipURI contactUri = addressFactory.createSipURI(username, localIp); SipURI contactUri = addressFactory.createSipURI(username, natIp); //contactUri.setPort(localPort); contactUri.setPort(natPort); contactUri.setTransportParam(transport.toUpperCase()); contactUri.setParameter("deviceid", "123456"); Address contactAddress = addressFactory.createAddress(contactUri); ContactHeader contactHeader = headerFactory.createContactHeader(contactAddress); ExpiresHeader expiresHeader = headerFactory.createExpiresHeader(expires); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, transport.toUpperCase(), null); viaHeader.setRPort(); viaHeaders.add(viaHeader); Request request = messageFactory.createRequest( requestUri, Request.REGISTER, callId, cSeq, fromHeader, toHeader, viaHeaders, maxForwards ); request.addHeader(contactHeader); request.addHeader(expiresHeader); // 添加自定义头,携带设备编号 Header deviceIdHeader = headerFactory.createHeader("X-Device-ID", "123456"); // 设备编号 request.addHeader(deviceIdHeader); if (authHeader != null) { request.addHeader(authHeader); } log.info("Sending REGISTER request:\n" + request); lastRegisterTransaction = sipProvider.getNewClientTransaction(request); lastRegisterTransaction.sendRequest(); } /** * 发送 SUBSCRIBE 请求 * * @param deviceCode SIP 用户名/设备编号 * @param serverIp FreeSWITCH 服务器 IP * @param serverPort FreeSWITCH 端口 * @param event 订阅事件类型,例如 "presence", "message-summary" * @throws Exception */ public void sendSubscribe(String deviceCode, String serverIp, int serverPort, String event) throws Exception { SipURI requestUri = addressFactory.createSipURI(deviceCode, serverIp); requestUri.setPort(serverPort); requestUri.setTransportParam(transport.toUpperCase()); Address fromAddress = addressFactory.createAddress("sip:" + username + "@" + localIp); FromHeader fromHeader = headerFactory.createFromHeader(fromAddress, UUID.randomUUID().toString().substring(0, 8)); Address toAddress = addressFactory.createAddress("sip:" + deviceCode + "@" + serverIp); ToHeader toHeader = headerFactory.createToHeader(toAddress, null); CallIdHeader callId = sipProvider.getNewCallId(); CSeqHeader cSeq = headerFactory.createCSeqHeader(cSeqCounter.getAndIncrement(), Request.SUBSCRIBE); MaxForwardsHeader maxForwards = headerFactory.createMaxForwardsHeader(70); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, transport, null); viaHeaders.add(viaHeader); Address contactAddr = addressFactory.createAddress("sip:" + username + "@" + localIp + ":" + localPort); ContactHeader contactHeader = headerFactory.createContactHeader(contactAddr); EventHeader eventHeader = headerFactory.createEventHeader(event); ExpiresHeader expiresHeader = headerFactory.createExpiresHeader(expires); Request request = messageFactory.createRequest( requestUri, Request.SUBSCRIBE, callId, cSeq, fromHeader, toHeader, viaHeaders, maxForwards ); request.addHeader(contactHeader); request.addHeader(eventHeader); request.addHeader(expiresHeader); ClientTransaction transaction = sipProvider.getNewClientTransaction(request); transaction.sendRequest(); } /** * 发送带认证头的 SUBSCRIBE */ private void sendSubscribeWithAuth(String deviceCode, String serverIp, int serverPort, String event, AuthorizationHeader authHeader) throws Exception { SipURI requestUri = addressFactory.createSipURI(deviceCode, serverIp); requestUri.setPort(serverPort); requestUri.setTransportParam(transport.toUpperCase()); Address fromAddress = addressFactory.createAddress("sip:" + username + "@" + localIp); FromHeader fromHeader = headerFactory.createFromHeader(fromAddress, UUID.randomUUID().toString().substring(0, 8)); Address toAddress = addressFactory.createAddress("sip:" + deviceCode + "@" + serverIp); ToHeader toHeader = headerFactory.createToHeader(toAddress, null); CallIdHeader callId = sipProvider.getNewCallId(); CSeqHeader cSeq = headerFactory.createCSeqHeader(cSeqCounter.getAndIncrement(), Request.SUBSCRIBE); MaxForwardsHeader maxForwards = headerFactory.createMaxForwardsHeader(70); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, transport, null); viaHeaders.add(viaHeader); Address contactAddr = addressFactory.createAddress("sip:" + username + "@" + localIp + ":" + localPort); ContactHeader contactHeader = headerFactory.createContactHeader(contactAddr); EventHeader eventHeader = headerFactory.createEventHeader(event); ExpiresHeader expiresHeader = headerFactory.createExpiresHeader(expires); Request request = messageFactory.createRequest( requestUri, Request.SUBSCRIBE, callId, cSeq, fromHeader, toHeader, viaHeaders, maxForwards ); request.addHeader(contactHeader); request.addHeader(eventHeader); request.addHeader(expiresHeader); request.addHeader(authHeader); ClientTransaction transaction = sipProvider.getNewClientTransaction(request); transaction.sendRequest(); } public void sendXmlResource(String targetSipUri, String xml) throws Exception { log.info("Sending XML resource to {}", targetSipUri); SipURI fromUri = addressFactory.createSipURI(username, domain + ":" + port); Address fromAddress = addressFactory.createAddress(fromUri); FromHeader fromHeader = headerFactory.createFromHeader(fromAddress, fromTag); javax.sip.address.URI toUri = addressFactory.createURI(targetSipUri); Address toAddress = addressFactory.createAddress(toUri); ToHeader toHeader = headerFactory.createToHeader(toAddress, null); // 创建空的Via头列表 List viaHeaders = new ArrayList<>(); Request message = messageFactory.createRequest( toUri, Request.MESSAGE, sipProvider.getNewCallId(), headerFactory.createCSeqHeader(1L, Request.MESSAGE), fromHeader, toHeader, viaHeaders, headerFactory.createMaxForwardsHeader(70) ); // 手动创建Via头并添加 ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, "UDP", null); message.addHeader(viaHeader); ContentTypeHeader ct = headerFactory.createContentTypeHeader("Application", "MANSCDP+xml"); message.setContent(xml, ct); ClientTransaction tx = sipProvider.getNewClientTransaction(message); tx.sendRequest(); } @Override public void processResponse(ResponseEvent responseEvent) { Response response = responseEvent.getResponse(); int status = response.getStatusCode(); CSeqHeader cSeqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); if (cSeqHeader == null) return; String method = cSeqHeader.getMethod(); log.info("Received response: {} {}, method: {}", response.getStatusCode(), response.getReasonPhrase(), method); log.info("Response:\n {}", response); try { if (Request.REGISTER.equalsIgnoreCase(method)) { if (status == 401 || status == 407) { log.info("Received 401/407, attempt auth..."); WWWAuthenticateHeader wwwAuth = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME); if (wwwAuth == null) { log.error("No WWW-Authenticate header found, cannot authenticate"); return; } String realm = wwwAuth.getRealm(); String nonce = wwwAuth.getNonce(); String qop = wwwAuth.getQop(); String uri = "sip:" + domain + ":" + port; String nc = "00000001"; String cNonce = generateCNonce(); String responseDigest = DigestUtil.computeResponse( username, password, realm, nonce, "REGISTER", uri, nc, cNonce, qop ); AuthorizationHeader authHeader = headerFactory.createAuthorizationHeader("Digest"); authHeader.setUsername(username); authHeader.setRealm(realm); authHeader.setNonce(nonce); authHeader.setURI(addressFactory.createURI(uri)); authHeader.setResponse(responseDigest); if (qop != null) { authHeader.setQop(qop); authHeader.setCNonce(cNonce); authHeader.setNonceCount(Integer.parseInt(nc, 16)); } sendRegister(authHeader); } else if (status >= 200 && status < 300) { log.info("REGISTER success: {}", status); int delay = Math.max(5, expires - 60); refreshTimer.schedule(new TimerTask() { @Override public void run() { try { sendRegister(null); } catch (Exception e) { log.error("Failed to refresh register", e); } } }, delay * 1000L); //sendNotify(username, domain, port, testXml); // 向SIP服务器订阅消息不符合SIP/GB28181标准 //sendSubscribe(username, domain, port, "Push_Resource"); } else { log.warn("REGISTER response: {}", status); } } else if (Request.SUBSCRIBE.equalsIgnoreCase(method)) { if (status == 401 || status == 407) { // TODO: 处理认证,参考 REGISTER 的认证逻辑 log.info("SUBSCRIBE 需要认证,返回码 {}", status); WWWAuthenticateHeader wwwAuth = null; ProxyAuthenticateHeader proxyAuth = null; if (status == 401) { wwwAuth = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME); } else if (status == 407) { proxyAuth = (ProxyAuthenticateHeader) response.getHeader(ProxyAuthenticateHeader.NAME); } String realm = null; String nonce = null; String qop = null; if (wwwAuth != null) { realm = wwwAuth.getRealm(); nonce = wwwAuth.getNonce(); qop = wwwAuth.getQop(); } else if (proxyAuth != null) { realm = proxyAuth.getRealm(); nonce = proxyAuth.getNonce(); qop = proxyAuth.getQop(); } else { log.error("No Authenticate header found for SUBSCRIBE"); return; } // 保存上次 SUBSCRIBE 请求参数,供重发使用 ClientTransaction origTransaction = responseEvent.getClientTransaction(); Request origRequest = origTransaction.getRequest(); FromHeader fromHeader = (FromHeader) origRequest.getHeader(FromHeader.NAME); ToHeader toHeader = (ToHeader) origRequest.getHeader(ToHeader.NAME); String deviceCode = ((SipURI) toHeader.getAddress().getURI()).getUser(); String serverIp = ((SipURI) toHeader.getAddress().getURI()).getHost(); int serverPort = ((SipURI) toHeader.getAddress().getURI()).getPort(); serverPort = serverPort < 0 ? 5060 : serverPort; log.info("deviceCode: {}, serverIp: {}, serverPort: {}", deviceCode, serverIp, serverPort); EventHeader eventHeader = (EventHeader) origRequest.getHeader(EventHeader.NAME); String event = eventHeader.getEventType(); String uri = "sip:" + serverIp + ":" + serverPort; String nc = "00000001"; String cNonce = generateCNonce(); String responseDigest = DigestUtil.computeResponse( username, password, realm, nonce, Request.SUBSCRIBE, uri, nc, cNonce, qop ); AuthorizationHeader authHeader = headerFactory.createAuthorizationHeader("Digest"); authHeader.setUsername(username); authHeader.setRealm(realm); authHeader.setNonce(nonce); authHeader.setURI(addressFactory.createURI(uri)); authHeader.setResponse(responseDigest); if (qop != null) { authHeader.setQop(qop); authHeader.setCNonce(cNonce); authHeader.setNonceCount(Integer.parseInt(nc, 16)); } // 重发带认证的 SUBSCRIBE sendSubscribeWithAuth(deviceCode, serverIp, serverPort, event, authHeader); } else if (status >= 200 && status < 300) { log.info("SUBSCRIBE 成功: {}", status); // 成功订阅后,向服务器上报资源 NOTIFY // sendNotify(username, domain, port, resourcesReportTestXml); sendNotify(username, domain, port, notifyTestXml); } else { log.warn("SUBSCRIBE 响应: {}", status); } } else if (Request.MESSAGE.equalsIgnoreCase(method)) { log.info("MESSAGE response: {}", status); } else if (Request.NOTIFY.equalsIgnoreCase(method)) { log.info("NOTIFY response: {}", status); if (status == 401) { WWWAuthenticateHeader wwwAuth = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME); if (wwwAuth != null) { lastNonce = wwwAuth.getNonce(); lastRealm = wwwAuth.getRealm(); lastOpaque = wwwAuth.getOpaque(); lastQop = wwwAuth.getQop(); try { resendNotifyWithAuth(); } catch (Exception e) { e.printStackTrace(); } } } } } catch (Exception ex) { log.error("processResponse error", ex); } } private void resendNotifyWithAuth() throws Exception { String method = Request.NOTIFY; String uriStr = lastDeviceCode + "@" + lastServerIp; String nc = "00000001"; String cNonce = md5Hex(Long.toString(System.currentTimeMillis())); String responseDigest = computeResponse( username, lastRealm, password, method, "sip:" + uriStr, lastNonce, nc, cNonce, lastQop != null ? lastQop : "auth" ); AuthorizationHeader authHeader = headerFactory.createAuthorizationHeader("Digest"); authHeader.setUsername(username); authHeader.setRealm(lastRealm); authHeader.setNonce(lastNonce); authHeader.setURI(addressFactory.createURI("sip:" + uriStr)); authHeader.setResponse(responseDigest); authHeader.setAlgorithm("MD5"); authHeader.setCNonce(cNonce); if (lastOpaque != null) authHeader.setOpaque(lastOpaque); authHeader.setQop(lastQop != null ? lastQop : "auth"); authHeader.setNonceCount(1); Request requestWithAuth = createNotifyRequest(lastDeviceCode, lastServerIp, lastServerPort, lastXmlBody, authHeader); ClientTransaction transaction = sipProvider.getNewClientTransaction(requestWithAuth); transaction.sendRequest(); } // 构造NOTIFY请求 private Request createNotifyRequest(String deviceCode, String serverIp, int serverPort, String xmlBody, AuthorizationHeader authHeader) throws Exception { String notifierUser = "290010021201070000"; String localHost = this.localIp; int localPort = this.localPort; SipURI requestUri = addressFactory.createSipURI(deviceCode, serverIp); requestUri.setTransportParam("tcp"); requestUri.setPort(serverPort); Address fromAddress = addressFactory.createAddress("sip:" + username + "@" + localHost); FromHeader fromHeader = headerFactory.createFromHeader(fromAddress, UUID.randomUUID().toString().substring(0, 8)); Address toAddress = addressFactory.createAddress("sip:" + deviceCode + "@" + serverIp); ToHeader toHeader = headerFactory.createToHeader(toAddress, null); CallIdHeader callId = sipProvider.getNewCallId(); CSeqHeader cSeq = headerFactory.createCSeqHeader(cSeqCounter.getAndIncrement(), Request.NOTIFY); MaxForwardsHeader maxForwards = headerFactory.createMaxForwardsHeader(70); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localHost, localPort, transport, null); viaHeaders.add(viaHeader); Address contactAddr = addressFactory.createAddress("sip:" + username + "@" + localHost + ":" + localPort); ContactHeader contactHeader = headerFactory.createContactHeader(contactAddr); // EventHeader eventHeader = headerFactory.createEventHeader("Push_Resource"); EventHeader eventHeader = headerFactory.createEventHeader("Notify_Test"); SubscriptionStateHeader subscriptionStateHeader = headerFactory.createSubscriptionStateHeader(SubscriptionStateHeader.TERMINATED); ContentTypeHeader contentTypeHeader = headerFactory.createContentTypeHeader("application", "xml"); Request request = messageFactory.createRequest( requestUri, Request.NOTIFY, callId, cSeq, fromHeader, toHeader, viaHeaders, maxForwards ); request.addHeader(contactHeader); request.addHeader(eventHeader); request.addHeader(subscriptionStateHeader); request.addHeader(contentTypeHeader); request.setContent(xmlBody, contentTypeHeader); if (authHeader != null) { request.addHeader(authHeader); } return request; } private String generateCNonce() { return Long.toHexString(System.currentTimeMillis() & 0xffffffffL); } @Override public void processRequest(RequestEvent requestEvent) { Request request = requestEvent.getRequest(); String method = request.getMethod(); log.info("Received SIP request: {}", method); try { if (Request.MESSAGE.equalsIgnoreCase(method)) { ContentTypeHeader contentTypeHeader = (ContentTypeHeader) request.getHeader(ContentTypeHeader.NAME); log.info("Processing MESSAGE request from {}, contentType: {}, contentSubType: {}", ((FromHeader) request.getHeader(FromHeader.NAME)).getAddress(), contentTypeHeader != null ? contentTypeHeader.getContentType() : "null", contentTypeHeader != null ? contentTypeHeader.getContentSubType() : "null"); if (contentTypeHeader != null && "application".equalsIgnoreCase(contentTypeHeader.getContentType()) && "xml".equalsIgnoreCase(contentTypeHeader.getContentSubType())) { String xml = new String(request.getRawContent(), StandardCharsets.UTF_8); xml = xml.replace("”", "\"").replace("“", "\""); log.info("MESSAGE body:\n{}", xml); String eventType = SipXmlParser.peekEventType(xml); log.info("RECOGNIZE EventType = {}", eventType); Class itemClass = SipEventRegistry.getItemClass(eventType); if (itemClass == null) { log.warn("UNREGISTERED EventType: {}", eventType); return; } // 200 OK -> SIP Server ServerTransaction st = requestEvent.getServerTransaction(); if (st == null) { st = sipProvider.getNewServerTransaction(request); } Response ok = messageFactory.createResponse(Response.OK, request); st.sendResponse(ok); SipXmlEnvelope envelope = SipXmlParser.parse(xml, itemClass); //handleSipEvent(envelope); } } else if (Request.NOTIFY.equalsIgnoreCase(method)) { ContentTypeHeader ct = (ContentTypeHeader) request.getHeader(ContentTypeHeader.NAME); if (ct != null && "application".equalsIgnoreCase(ct.getContentType()) && "xml".equalsIgnoreCase(ct.getContentSubType())) { String xml = new String(request.getRawContent(), StandardCharsets.UTF_8); log.info("Received NOTIFY XML:\n{}", xml); // 可以解析 SUBSCRIBE 通知内容 String eventType = SipXmlParser.peekEventType(xml); log.info("NOTIFY EventType: {}", eventType); // 回复 200 OK ServerTransaction st = requestEvent.getServerTransaction(); if (st == null) { st = sipProvider.getNewServerTransaction(request); } Response ok = messageFactory.createResponse(Response.OK, request); st.sendResponse(ok); } } else if (Request.SUBSCRIBE.equals(request.getMethod())) { String xml = new String(request.getRawContent(), StandardCharsets.UTF_8); ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(ExpiresHeader.NAME); int expires = (expiresHeader != null) ? expiresHeader.getExpires() : 0; log.info("Received SUBSCRIBE expires: {}, XML:\n{}", expires, xml); String eventType = SipXmlParser.peekEventType(xml); log.info("SUBSCRIBE EventType: {}", eventType); ServerTransaction transaction = sipProvider.getNewServerTransaction(request); // 对应协议B.9.1.2 F2 前端系统返回200 OK响应,指示已经接受订阅请求 Response response = messageFactory.createResponse(Response.OK, request); response.setExpires(expiresHeader); transaction.sendResponse(response); if (expires > 0) { saveSubscription(request, expires); // 对应协议B.9.1.2 F3 SIP客户端(前端系统)发送没有消息体的NOTIFY给平台,其中Subscription-State头部字段值为active, 指示订阅关系建立。 sendInitialNotify(request, transaction, "initial"); } else { removeSubscription(request); } } else if (Request.INVITE.equals(request.getMethod())) { handleInvite(requestEvent); } else if(Request.ACK.equals(request.getMethod())) { // RFC 3261: The ACK request does not generate a response. // ACK 是“事务内请求”,不是普通 SIP 方法,不生成任何 Response,ACK 是“事务内请求”,不是普通 SIP 方法 startRtspToRtp(); } else { Response notImpl = messageFactory.createResponse(Response.NOT_IMPLEMENTED, request); requestEvent.getServerTransaction().sendResponse(notImpl); log.info("Method {} not implemented, replied 501", method); } } catch (Exception ex) { log.error("Error processing request: {}", ex.getMessage(), ex); try { Response errorResponse = messageFactory.createResponse(Response.SERVER_INTERNAL_ERROR, request); requestEvent.getServerTransaction().sendResponse(errorResponse); } catch (Exception e) { log.error("Failed to send error response", e); } } } private Process ffmpegProcess; private void startRtspToRtp() { try { String cmd = "ffmpeg -rtsp_transport tcp " + "-i \"rtsp://admin:wd19216811@192.168.1.244:554/h264/ch1/sub/av_stream\" " + "-an -vcodec copy " + "-f rtp -payload_type 96 " + "\"rtp://192.168.1.116:50000?tcp\""; ProcessBuilder pb = new ProcessBuilder( "bash", "-c", cmd ); pb.redirectErrorStream(true); ffmpegProcess = pb.start(); new Thread(() -> { try (BufferedReader br = new BufferedReader(new InputStreamReader( ffmpegProcess.getInputStream()))) { String line; while ((line = br.readLine()) != null) { System.out.println("[FFmpeg] " + line); } } catch (IOException ignored) {} }).start(); } catch (Exception e) { e.printStackTrace(); } } private void handleInvite(RequestEvent requestEvent) throws Exception { Request request = requestEvent.getRequest(); log.info("Received INVITE:\n{}", request); SipProvider provider = (SipProvider) requestEvent.getSource(); ServerTransaction st = requestEvent.getServerTransaction(); if (st == null) { st = provider.getNewServerTransaction(request); } // ① 100 Trying(非常推荐) Response trying = messageFactory.createResponse(Response.TRYING, request); st.sendResponse(trying); // ② 解析对方 SDP(可先只打印) byte[] raw = request.getRawContent(); if (raw != null) { String sdp = new String(raw, StandardCharsets.UTF_8); log.info("INVITE SDP:\n{}", sdp); } // ③ 构造 200 OK + SDP Response ok = messageFactory.createResponse(Response.OK, request); String localSipId = "29001002120107000001"; int localRtpPort = 554; // Contact 必须 Address contactAddress = addressFactory.createAddress( "sip:" + localSipId + "@" + localIp + ":" + localPort ); ContactHeader contactHeader = headerFactory.createContactHeader(contactAddress); ok.addHeader(contactHeader); // Content-Type ContentTypeHeader contentType = headerFactory.createContentTypeHeader("application", "sdp"); // 返回 SDP(示例) String sdp = "v=0\r\n" + "o=" + localSipId + " 0 0 IN IP4 " + localIp + "\r\n" + "s=Play\r\n" + "c=IN IP4 " + localIp + "\r\n" + "t=0 0\r\n" + "m=video " + localRtpPort + " TCP/RTP/AVP 96\r\n" + "a=sendonly\r\n" + "a=setup:active\r\n" + "a=connection:new\r\n" + "a=rtpmap:96 PS/90000\r\n"; ok.setContent(sdp, contentType); // ④ 发送 200 OK st.sendResponse(ok); log.info("INVITE handled: 200 OK sent"); } @Override public void processTimeout(TimeoutEvent timeoutEvent) { log.warn("SIP timeout: {}", timeoutEvent); } @Override public void processIOException(IOExceptionEvent exceptionEvent) { log.error("SIP IO Exception: {}", exceptionEvent); } @Override public void processTransactionTerminated(TransactionTerminatedEvent tte) { } @Override public void processDialogTerminated(DialogTerminatedEvent dte) { } @PreDestroy public void shutdown() { log.info("Shutting down SIP stack..."); refreshTimer.cancel(); if (sipProvider != null) sipProvider.removeSipListener(this); if (sipStack != null) sipStack.stop(); } /** * 发送 SIP NOTIFY 消息 * * @param deviceCode 客户端编码(也是 SIP 用户名) * @param serverIp 服务器 IP * @param serverPort 服务器 平台端口 * @param xmlBody XML 消息体 * @throws Exception */ // 公开调用,发送NOTIFY(首次不带认证) public void sendNotify(String deviceCode, String serverIp, int serverPort, String xmlBody) throws Exception { lastDeviceCode = deviceCode; lastServerIp = serverIp; lastServerPort = serverPort; lastXmlBody = xmlBody; Request request = createNotifyRequest(deviceCode, serverIp, serverPort, xmlBody, null); ClientTransaction transaction = sipProvider.getNewClientTransaction(request); transaction.sendRequest(); } /** * 分片 XML(保证每块 <= maxBytes) */ private List splitXml(String xml, int maxBytes) throws UnsupportedEncodingException { List parts = new ArrayList<>(); byte[] data = xml.getBytes("UTF-8"); int offset = 0; while (offset < data.length) { int len = Math.min(maxBytes, data.length - offset); parts.add(new String(data, offset, len, "UTF-8")); offset += len; } return parts; } // 计算Digest响应值 private String computeResponse(String username, String realm, String password, String method, String uri, String nonce, String nc, String cNonce, String qop) throws Exception { MessageDigest md = MessageDigest.getInstance("MD5"); String ha1 = md5Hex(md, username + ":" + realm + ":" + password); String ha2 = md5Hex(md, method + ":" + uri); return md5Hex(md, ha1 + ":" + nonce + ":" + nc + ":" + cNonce + ":" + qop + ":" + ha2); } private String md5Hex(String data) throws Exception { MessageDigest md = MessageDigest.getInstance("MD5"); return md5Hex(md, data); } private String md5Hex(MessageDigest md, String data) { byte[] digest = md.digest(data.getBytes(StandardCharsets.UTF_8)); StringBuilder sb = new StringBuilder(); for (byte b : digest) { sb.append(String.format("%02x", b & 0xff)); } return sb.toString(); } private void saveSubscription(Request request, int expires) { CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); ToHeader toHeader = (ToHeader) request.getHeader(ToHeader.NAME); FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME); String key = callIdHeader.getCallId() + "|" + fromHeader.getTag() + "|" + toHeader.getTag(); SubscriptionInfo info = new SubscriptionInfo(); info.callId = callIdHeader.getCallId(); info.from = fromHeader; info.to = toHeader; info.expires = expires; info.event = ((EventHeader) request.getHeader(EventHeader.NAME)).getEventType(); subscriptions.put(key, info); System.out.println("保存订阅: " + key + " 有效期 " + expires + " 秒"); } private void sendInitialNotify(Request subscribeRequest, ServerTransaction st, String bodyText) { try { // 获取必要头字段 CallIdHeader callIdHeader = (CallIdHeader) subscribeRequest.getHeader(CallIdHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) subscribeRequest.getHeader(CSeqHeader.NAME); FromHeader fromHeader = (FromHeader) subscribeRequest.getHeader(FromHeader.NAME); ToHeader toHeader = (ToHeader) subscribeRequest.getHeader(ToHeader.NAME); EventHeader eventHeader = (EventHeader) subscribeRequest.getHeader(EventHeader.NAME); ContactHeader contactHeader = (ContactHeader) subscribeRequest.getHeader(ContactHeader.NAME); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, transport.toUpperCase(), null); viaHeaders.add(viaHeader); // 创建 NOTIFY 请求 Request notifyRequest = messageFactory.createRequest( subscribeRequest.getRequestURI(), Request.NOTIFY, callIdHeader, headerFactory.createCSeqHeader(cseqHeader.getSeqNumber() + 1, Request.NOTIFY), fromHeader, toHeader, viaHeaders, headerFactory.createMaxForwardsHeader(70) ); // 加入 Contact notifyRequest.addHeader(contactHeader); // 必须的事件头 notifyRequest.addHeader(eventHeader); // Subscription-State 头 SubscriptionStateHeader ssHeader = headerFactory.createSubscriptionStateHeader("active"); ssHeader.setExpires(3600); notifyRequest.addHeader(ssHeader); // 消息体 // String xmlBody = "\n" + // "\n" + // " " + bodyText + "\n" + // ""; // ContentTypeHeader contentTypeHeader = headerFactory.createContentTypeHeader("application", "xml"); // notifyRequest.setContent(xmlBody, contentTypeHeader); // 发送 ClientTransaction ct = sipProvider.getNewClientTransaction(notifyRequest); ct.sendRequest(); System.out.println("发送初始 NOTIFY: " + callIdHeader.getCallId()); } catch (Exception e) { e.printStackTrace(); } } private void sendSubscribeNotifyToSipServer(Request subscribeRequest) { try { // 获取必要头字段 CallIdHeader callIdHeader = (CallIdHeader) subscribeRequest.getHeader(CallIdHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) subscribeRequest.getHeader(CSeqHeader.NAME); FromHeader fromHeader = (FromHeader) subscribeRequest.getHeader(FromHeader.NAME); ToHeader toHeader = (ToHeader) subscribeRequest.getHeader(ToHeader.NAME); EventHeader eventHeader = (EventHeader) subscribeRequest.getHeader(EventHeader.NAME); ContactHeader contactHeader = (ContactHeader) subscribeRequest.getHeader(ContactHeader.NAME); List viaHeaders = new ArrayList<>(); ViaHeader viaHeader = headerFactory.createViaHeader(localIp, localPort, transport.toUpperCase(), null); viaHeaders.add(viaHeader); // 创建 NOTIFY 请求 Request notifyRequest = messageFactory.createRequest( subscribeRequest.getRequestURI(), Request.NOTIFY, callIdHeader, headerFactory.createCSeqHeader(cseqHeader.getSeqNumber() + 1, Request.NOTIFY), fromHeader, toHeader, viaHeaders, headerFactory.createMaxForwardsHeader(70) ); // 加入 Contact notifyRequest.addHeader(contactHeader); // 必须的事件头 notifyRequest.addHeader(eventHeader); // Subscription-State 头 SubscriptionStateHeader ssHeader = headerFactory.createSubscriptionStateHeader("active"); ssHeader.setExpires(3600); notifyRequest.addHeader(ssHeader); ContentTypeHeader contentTypeHeader = headerFactory.createContentTypeHeader("application", "xml"); notifyRequest.setContent(null, contentTypeHeader); // 发送 ClientTransaction ct = sipProvider.getNewClientTransaction(notifyRequest); ct.sendRequest(); log.info("sendSubscribeNotifyToSipServer: callId: {}", callIdHeader.getCallId()); } catch (Exception e) { e.printStackTrace(); } } private void removeSubscription(Request request) { CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); ToHeader toHeader = (ToHeader) request.getHeader(ToHeader.NAME); FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME); String key = callIdHeader.getCallId() + "|" + fromHeader.getTag() + "|" + toHeader.getTag(); subscriptions.remove(key); System.out.println("删除订阅: " + key); } }