package com.inspect.tcpserver.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.commons.lang3.RandomStringUtils; import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class MyDecoder extends ByteToMessageDecoder { private static final Logger log = LoggerFactory.getLogger(MyDecoder.class); private final String PACKET_FLAG = "EB90"; private final int BASE_LENGTH = 2 + 8 + 8 + 1 + 4 + 2; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if (in.readableBytes() >= BASE_LENGTH) { try { //skip if (in.readableBytes() > 512 * 1024) { in.skipBytes(in.readableBytes()); } final String uuid = RandomStringUtils.randomAlphanumeric(32); ByteBuf forPrint = in.copy(); log.info("################ 会话: {}, 客户: {}, 上行原始报文 ################ \n {}", uuid, ctx.channel().id().asShortText(), ByteBufUtil.hexDump(forPrint)); int index; String flag; while (true) { index = in.readerIndex(); in.markReaderIndex(); byte[] dst = new byte[2]; in.readBytes(dst, 0, 2); flag = ByteUtils.byte2Hex(dst); if (PACKET_FLAG.equalsIgnoreCase(flag)) { break; } in.resetReaderIndex(); if (in.readableBytes() < BASE_LENGTH) { return; } } long sendIndex = in.readLongLE(); long receiveIndex = in.readLongLE(); byte sourceFlag = in.readByte(); int xmlLength = in.readIntLE(); if (in.readableBytes() < xmlLength) { in.readerIndex(index); return; } byte[] payload = new byte[xmlLength]; in.readBytes(payload); in.readShortLE(); BinaryModel binaryModel = new BinaryModel(); binaryModel.receiveIndex = receiveIndex; binaryModel.sendIndex = sendIndex; binaryModel.sourceFlag = sourceFlag; binaryModel.dataLength = xmlLength; binaryModel.dataBuf = Unpooled.copiedBuffer(payload); binaryModel.uuid = uuid; out.add(binaryModel); } catch (Exception e) { e.printStackTrace(); } } } }