package com.inspect.tcpserver.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import java.util.List; @Slf4j public class MyDecoder extends ByteToMessageDecoder { private final int BASE_LENGTH = 2 + 8 + 8 + 1 + 4 + 2; private Integer printRecvData = 0; public MyDecoder(Integer printRecvData) { this.printRecvData = printRecvData; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { try { // 获取当前可读字节数 int length = in.readableBytes(); if (length < BASE_LENGTH) { // log.error("not enough readableBytes: {}", length); return; } final String uuid = RandomStringUtils.randomAlphanumeric(16); if(printRecvData > 0) { ByteBuf forPrint = in.copy(); log.info("###### 会话:{}, 客户:{}, 上行原始报文 ######\n {}", uuid, ctx.channel().id().asShortText(), ByteBufUtil.hexDump(forPrint)); forPrint.release(); } while (in.readableBytes() >= 2) { // 标记当前位置 in.markReaderIndex(); byte[] start = new byte[2]; in.readBytes(start); if (start[0] == -21 && start[1] == -112) { // 检查是否有足够数据读取固定头部 if (in.readableBytes() < 21) { // 8+8+1+4 = 21字节 in.resetReaderIndex(); return; // 数据不足,等待下次 } long sendIndex = in.readLongLE(); long receiveIndex = in.readLongLE(); byte sourceFlag = in.readByte(); int xmlLength = in.readIntLE(); // 检查XML长度是否合理 if (xmlLength < 0 || xmlLength > 1024 * 1024) { log.warn("XML长度异常: {}", xmlLength); in.resetReaderIndex(); in.skipBytes(2); // 跳过错误的起始标志 continue; } // 检查是否有完整的XML数据 + 结束标志 if (in.readableBytes() < xmlLength + 2) { in.resetReaderIndex(); return; // 数据不完整,等待更多数据 } byte[] payload = new byte[xmlLength]; in.readBytes(payload); // 检查结束标志 if (in.readableBytes() < 2) { in.resetReaderIndex(); return; } byte[] end = new byte[2]; in.readBytes(end); if (end[0] != -21 || end[1] != -112) { // 结束标志错误,回退并跳过这个包 log.warn("报文结束标志不正确"); in.resetReaderIndex(); in.skipBytes(2); continue; } // 成功解析一个完整包 BinaryModel binaryModel = new BinaryModel(); binaryModel.receiveIndex = receiveIndex; binaryModel.sendIndex = sendIndex; binaryModel.sourceFlag = sourceFlag; binaryModel.dataLength = xmlLength; binaryModel.dataBuf = Unpooled.copiedBuffer(payload); binaryModel.uuid = uuid; binaryModel.id = ctx.channel().id().asShortText(); out.add(binaryModel); } else { // 不是起始标志,回退并跳过一个字节 in.resetReaderIndex(); in.skipBytes(1); } } } catch (Exception e) { log.error("解码发生异常", e); // 异常时,如果有未处理数据,跳过所有数据,避免无限循环 if (in.readableBytes() > 0) { in.skipBytes(in.readableBytes()); } } } }