/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp;

import com.devexperts.io.BufferedInput;
import com.devexperts.io.BufferedInputPart;
import com.devexperts.io.Chunk;
import com.devexperts.io.ChunkPool;
import com.devexperts.io.ChunkedInput;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.SymbolCodec;
import com.devexperts.qd.ng.EventFlag;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.qtp.AbstractQTPParser;
import com.devexperts.qd.qtp.BinaryRecordDesc;
import com.devexperts.qd.qtp.FieldReplacer;
import com.devexperts.qd.qtp.HeartbeatPayload;
import com.devexperts.qd.qtp.MessageConsumer;
import com.devexperts.qd.qtp.MessageConsumerAdapter;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.ProtocolOption;
import com.devexperts.util.IndexedSet;
import com.devexperts.util.IndexerFunction;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

public class BinaryQTPParser
extends AbstractQTPParser {
    protected final SymbolCodec.Reader symbolReader;
    private final BufferedInputPart msg = new BufferedInputPart();
    private SymbolCodec.Resolver symbolResolver;
    private ProtocolDescriptor protocolDescriptor;
    private HeartbeatPayload lastHeartbeatPayload;
    private BinaryRecordDesc[] recordMap;
    private Set<String> unknownRecordNames;
    private IndexedSet<Long, PartitionedMessage> partitionedMessages;
    private static final IndexerFunction.LongKey<PartitionedMessage> PARTITIONED_MESSAGE_BY_ID_INDEXER = message -> message.id;
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();
    private static final int DUMP_LAST_RECORDS = 10;

    public BinaryQTPParser(DataScheme scheme) {
        super(scheme);
        this.symbolReader = scheme.getCodec().createReader();
    }

    protected boolean isSchemeKnown() {
        return false;
    }

    @Override
    public void resetSession() {
        if (this.recordMap != null) {
            Arrays.fill(this.recordMap, null);
        }
        if (this.unknownRecordNames != null) {
            this.unknownRecordNames.clear();
        }
        this.protocolDescriptor = null;
        if (this.lastHeartbeatPayload != null) {
            this.lastHeartbeatPayload.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void parseImpl(BufferedInput in, MessageConsumer consumer) throws IOException {
        if (consumer instanceof SymbolCodec.Resolver) {
            this.symbolResolver = (SymbolCodec.Resolver)((Object)consumer);
        }
        try {
            while (in.hasAvailable()) {
                long longMessageLength;
                if (!this.resyncOnParse(in)) {
                    return;
                }
                long messageStartPosition = in.totalPosition();
                this.doBeforeMessageLength(in);
                in.mark();
                try {
                    longMessageLength = in.readCompactLong();
                }
                catch (EOFException e) {
                    in.reset();
                    in.unmark();
                    break;
                }
                if (longMessageLength < 0L || longMessageLength > Integer.MAX_VALUE) {
                    this.dumpParseHeaderErrorReport(in, "Invalid messageLength=" + longMessageLength);
                    if (this.resyncOnCorrupted(in)) continue;
                    this.processPending(consumer);
                    consumer.handleCorruptedStream();
                    break;
                }
                int messageLength = (int)longMessageLength;
                if (!in.hasAvailable(messageLength)) {
                    in.reset();
                    in.unmark();
                    break;
                }
                long messageEndPosition = in.totalPosition() + (long)messageLength;
                try {
                    this.parseMessageBody(in, consumer, messageLength, messageEndPosition);
                }
                catch (CorruptedException e) {
                    if (this.resyncOnCorrupted(in)) continue;
                    this.processPending(consumer);
                    if (e instanceof CorruptedMessageException) {
                        consumer.handleCorruptedMessage(((CorruptedMessageException)e).messageTypeId);
                    }
                    consumer.handleCorruptedStream();
                }
                in.seek(messageEndPosition);
                in.unmark();
                this.stats.updateIOReadBytes(messageEndPosition - messageStartPosition);
            }
        }
        finally {
            this.symbolResolver = null;
        }
    }

    protected boolean resyncOnParse(BufferedInput in) throws IOException {
        return true;
    }

    protected boolean resyncOnCorrupted(BufferedInput in) throws IOException {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void parseMessageBody(BufferedInput in, MessageConsumer consumer, int messageLength, long messageEndPosition) throws IOException, CorruptedException {
        this.msg.setInput(in, messageLength);
        try {
            this.msg.mark();
            int typeId = this.parseMessageType(this.msg);
            this.doAfterMessageType(this.msg);
            this.parseMessagePayload(this.msg, consumer, typeId, (int)(messageEndPosition - this.msg.totalPosition()));
            this.doAfterMessageBody(this.msg, typeId);
        }
        finally {
            this.msg.resetInput();
        }
    }

    private int parseMessageType(BufferedInput msg) throws IOException, CorruptedException {
        long longTypeId;
        if (!msg.hasAvailable()) {
            return 0;
        }
        try {
            longTypeId = msg.readCompactLong();
        }
        catch (EOFException e) {
            this.dumpParseMessageErrorReport(msg, "Not enough bytes in message to read message typeId", e, -1L);
            throw new CorruptedException();
        }
        if (longTypeId < 0L || longTypeId > Integer.MAX_VALUE) {
            this.dumpParseMessageErrorReport(msg, "Invalid typeId=" + longTypeId, null, -1L);
            throw new CorruptedException();
        }
        return (int)longTypeId;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void parseMessagePayload(BufferedInput msg, MessageConsumer consumer, int typeId, int payloadLength) throws IOException, CorruptedMessageException {
        MessageType messageType = MessageType.findById(typeId);
        try {
            if (messageType != null && messageType.hasRecords()) {
                RecordBuffer buf = this.nextRecordsMessage(consumer, messageType);
                if (!buf.isEmpty() && !buf.hasEstimatedCapacityForBytes(payloadLength)) {
                    this.processPending(consumer);
                    buf = this.nextRecordsMessage(consumer, messageType);
                }
                if (messageType.isData()) {
                    this.parseData(msg, buf);
                    return;
                } else {
                    if (!messageType.isSubscription()) throw new AssertionError((Object)messageType.toString());
                    this.parseSubscription(msg, buf, messageType);
                }
                return;
            } else {
                this.parseOther(msg, consumer, typeId, payloadLength);
            }
            return;
        }
        catch (CorruptedException e) {
            throw new CorruptedMessageException(e, typeId);
        }
    }

    private void parseOther(BufferedInput msg, MessageConsumer consumer, int typeId, int payloadLength) throws IOException, CorruptedException {
        switch (typeId) {
            case 0: {
                this.processPending(consumer);
                this.parseHeartbeat(msg, consumer);
                break;
            }
            case 2: {
                this.parseDescribeRecords(msg);
                break;
            }
            case 1: {
                this.processPending(consumer);
                this.parseDescribeProtocol(msg, consumer);
                break;
            }
            case 4: {
                this.parseMessagePart(msg, consumer, payloadLength);
                break;
            }
            case 3: 
            case 61: {
                break;
            }
            default: {
                if (consumer instanceof MessageConsumerAdapter) {
                    ((MessageConsumerAdapter)consumer).processOtherMessage(typeId, msg, payloadLength);
                    break;
                }
                byte[] bytes = new byte[payloadLength];
                msg.readFully(bytes);
                consumer.processOtherMessage(typeId, bytes, 0, payloadLength);
            }
        }
    }

    private void parseDescribeProtocol(BufferedInput msg, MessageConsumer consumer) throws CorruptedException {
        try {
            this.protocolDescriptor = ProtocolDescriptor.newPeerProtocolDescriptor(this.protocolDescriptor);
            this.protocolDescriptor.parseFrom(msg);
        }
        catch (IOException e) {
            this.dumpParseMessageErrorReport(msg, e.getMessage(), e, -1L);
            throw new CorruptedException(e);
        }
        ProtocolDescriptor desc = this.applyReadAs(this.protocolDescriptor);
        this.onDescribeProtocol(desc);
        consumer.processDescribeProtocol(desc, true);
    }

    void onDescribeProtocol(ProtocolDescriptor desc) {
    }

    private void parseHeartbeat(BufferedInput msg, MessageConsumer consumer) throws CorruptedException {
        try {
            if (!msg.hasAvailable()) {
                return;
            }
            if (this.lastHeartbeatPayload == null) {
                this.lastHeartbeatPayload = new HeartbeatPayload();
            } else {
                this.lastHeartbeatPayload.clear();
            }
            this.lastHeartbeatPayload.parseFrom(msg);
        }
        catch (IOException e) {
            this.dumpParseMessageErrorReport(msg, e.getMessage(), e, -1L);
            throw new CorruptedException(e);
        }
        consumer.processHeartbeat(this.lastHeartbeatPayload);
        this.onHeartbeat(this.lastHeartbeatPayload);
    }

    void onHeartbeat(HeartbeatPayload heartbeatPayload) {
    }

    private void parseDescribeRecords(BufferedInput msg) throws CorruptedException {
        long lastRecPosition = msg.totalPosition();
        try {
            while (msg.hasAvailable()) {
                int id = msg.readCompactInt();
                String recordName = msg.readUTFString();
                int nFld = msg.readCompactInt();
                if (id < 0 || recordName == null || recordName.isEmpty() || nFld < 0) {
                    throw new IOException("Corrupted record information");
                }
                String[] names = new String[nFld];
                int[] types = new int[nFld];
                for (int i = 0; i < nFld; ++i) {
                    String name = msg.readUTFString();
                    int type = msg.readCompactInt();
                    if (name == null || name.isEmpty() || type < 0 || type > 255) {
                        throw new IOException("Corrupted field information for field " + name + ", type " + Integer.toHexString(type) + " in record #" + id + " " + recordName);
                    }
                    names[i] = name;
                    types[i] = type;
                }
                DataRecord record = this.scheme.findRecordByName(recordName);
                if (record == null) {
                    if (this.unknownRecordNames == null) {
                        this.unknownRecordNames = new HashSet<String>();
                    }
                    if (this.unknownRecordNames.add(recordName)) {
                        QDLog.log.info("Record #" + id + " '" + recordName + "' is not found in data scheme. Incoming data and subscription will be skipped.");
                    }
                }
                try {
                    this.remapRecord(id, this.wrapRecordDesc(new BinaryRecordDesc(record, nFld, names, types, this.readEventTimeSequence, 1)));
                }
                catch (BinaryRecordDesc.InvalidDescException e) {
                    QDLog.log.info("Record #" + id + " '" + recordName + "' cannot be parsed: " + e.getMessage());
                }
                lastRecPosition = msg.totalPosition();
            }
        }
        catch (IOException e) {
            this.dumpParseMessageErrorReport(msg, e.getMessage(), e, lastRecPosition);
            throw new CorruptedException(e);
        }
    }

    void updateCursorTimeMark(RecordCursor cursor) {
    }

    void updateMoreIOReadSubRecordStats() {
    }

    void updateMoreIOReadDataRecordStats() {
    }

    private void parseData(BufferedInput msg, RecordBuffer buf) throws CorruptedException {
        this.symbolReader.reset(ProtocolOption.SUPPORTED_SET);
        long lastRecPosition = msg.totalPosition();
        long startBufLimit = buf.getLimit();
        try {
            while (msg.hasAvailable()) {
                this.readSymbol(msg);
                int id = this.readRecordId(msg);
                BinaryRecordDesc rr = this.getOrCreateRecordDesc(id);
                if (rr == null) {
                    throw new IOException("Unknown record #" + id);
                }
                RecordCursor cur = rr.readRecord(msg, buf, this.symbolReader.getCipher(), this.symbolReader.getSymbol(), this.symbolReader.getEventFlags());
                this.setEventTimeSequenceIfNeeded(cur);
                long position = msg.totalPosition();
                if (cur != null) {
                    this.updateCursorTimeMark(cur);
                    this.stats.updateIOReadRecordBytes(cur.getRecord().getId(), position - lastRecPosition);
                    this.stats.updateIOReadDataRecord();
                    this.updateMoreIOReadDataRecordStats();
                }
                lastRecPosition = position;
            }
        }
        catch (IOException | IllegalStateException e) {
            this.dumpParseDataErrorReport(msg, e, buf, startBufLimit, lastRecPosition);
            throw new CorruptedException(e);
        }
    }

    private void parseSubscription(BufferedInput msg, RecordBuffer buf, MessageType messageType) throws CorruptedException {
        this.symbolReader.reset(ProtocolOption.SUPPORTED_SET);
        long lastRecPosition = msg.totalPosition();
        long startBufLimit = buf.getLimit();
        boolean historySubscriptionAdd = messageType.isHistorySubscriptionAdd();
        try {
            while (msg.hasAvailable()) {
                this.readSymbol(msg);
                int id = this.readRecordId(msg);
                BinaryRecordDesc rr = this.getOrCreateRecordDesc(id);
                if (rr == null) {
                    throw new IOException("Unknown record #" + id);
                }
                long time = 0L;
                if (historySubscriptionAdd) {
                    time = this.readSubscriptionTime(msg);
                }
                long position = msg.totalPosition();
                DataRecord record = rr.getRecord();
                if (record != null) {
                    RecordCursor cur = buf.add(record, this.symbolReader.getCipher(), this.symbolReader.getSymbol());
                    this.setEventTimeSequenceIfNeeded(cur);
                    cur.setEventFlags(this.symbolReader.getEventFlags());
                    cur.setTime(time);
                    cur.setEventFlags(messageType.isSubscriptionRemove() ? EventFlag.REMOVE_SYMBOL.flag() : 0);
                    this.stats.updateIOReadRecordBytes(record.getId(), position - lastRecPosition);
                    this.stats.updateIOReadSubRecord();
                    this.updateMoreIOReadSubRecordStats();
                }
                lastRecPosition = position;
            }
        }
        catch (IOException | IllegalStateException e) {
            this.dumpParseSubscriptionErrorReport(msg, e, buf, startBufLimit, historySubscriptionAdd, lastRecPosition);
            throw new CorruptedException(e);
        }
    }

    protected void readSymbol(BufferedInput msg) throws IOException {
        this.symbolReader.readSymbol(msg, this.symbolResolver);
    }

    protected int readRecordId(BufferedInput msg) throws IOException {
        return msg.readCompactInt();
    }

    protected BinaryRecordDesc wrapRecordDesc(BinaryRecordDesc desc) {
        if (this.fieldReplacers == null || this.fieldReplacers.isEmpty()) {
            return desc;
        }
        ArrayList<Consumer<RecordCursor>> consumers = new ArrayList<Consumer<RecordCursor>>();
        for (FieldReplacer fieldReplacer : this.fieldReplacers) {
            Consumer<RecordCursor> replacer = fieldReplacer.createFieldReplacer(desc.getRecord());
            if (replacer == null) continue;
            consumers.add(replacer);
        }
        if (consumers.isEmpty()) {
            return desc;
        }
        final Consumer[] consumersArray = consumers.toArray(new Consumer[consumers.size()]);
        return new BinaryRecordDesc(desc){

            @Override
            protected void readFields(BufferedInput msg, RecordCursor cur, int nDesc) throws IOException {
                super.readFields(msg, cur, nDesc);
                for (Consumer replacer : consumersArray) {
                    replacer.accept(cur);
                }
            }
        };
    }

    protected long readSubscriptionTime(BufferedInput msg) throws IOException {
        return msg.readCompactLong();
    }

    @Nonnull
    protected BinaryRecordDesc[] newRecordMap(BinaryRecordDesc[] recordMap, int id) {
        int len = recordMap == null ? 0 : recordMap.length;
        int newLen = Math.max(Math.max(10, id + 1), len * 3 / 2);
        BinaryRecordDesc[] newRecordMap = new BinaryRecordDesc[newLen];
        if (recordMap != null) {
            System.arraycopy(recordMap, 0, newRecordMap, 0, len);
        }
        return newRecordMap;
    }

    protected void remapRecord(int id, BinaryRecordDesc rr) {
        if (this.recordMap == null || id >= this.recordMap.length) {
            this.recordMap = this.newRecordMap(this.recordMap, id);
        }
        this.recordMap[id] = rr;
    }

    private BinaryRecordDesc getRecordDesc(int id) {
        BinaryRecordDesc[] recordMap = this.recordMap;
        return recordMap != null && id >= 0 && id < recordMap.length ? recordMap[id] : null;
    }

    private BinaryRecordDesc getOrCreateRecordDesc(int id) {
        BinaryRecordDesc rr = this.getRecordDesc(id);
        if (rr != null) {
            return rr;
        }
        if (!this.isSchemeKnown()) {
            return null;
        }
        if (id >= 0 && id < this.scheme.getRecordCount()) {
            DataRecord record = this.scheme.getRecord(id);
            try {
                rr = this.wrapRecordDesc(new BinaryRecordDesc(record, false, 1, true));
                this.remapRecord(id, rr);
                return rr;
            }
            catch (BinaryRecordDesc.InvalidDescException e) {
                QDLog.log.info("Record #" + id + " '" + record.getName() + "' cannot be parsed: " + e.getMessage());
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void parseMessagePart(BufferedInput msg, MessageConsumer consumer, int payloadLength) throws IOException, CorruptedException {
        int chunkReadLen;
        int readLen;
        PartitionedMessage partitionedMessage;
        try {
            long idPosition = msg.totalPosition();
            long id = msg.readCompactLong();
            int partLength = payloadLength - (int)(msg.totalPosition() - idPosition);
            if (this.partitionedMessages == null) {
                this.partitionedMessages = IndexedSet.createLong(PARTITIONED_MESSAGE_BY_ID_INDEXER);
            }
            if ((partitionedMessage = this.partitionedMessages.getByKey(id)) == null) {
                if (partLength == 0) {
                    return;
                }
                long lengthPosition = msg.totalPosition();
                long totalLength = msg.readCompactLong();
                if (totalLength < 0L || totalLength > Integer.MAX_VALUE) {
                    this.dumpParseMessageErrorReport(msg, "Invalid totalLength=" + totalLength, null, -1L);
                    throw new CorruptedException();
                }
                int lengthLen = (int)(msg.totalPosition() - lengthPosition);
                int typeId = this.parseMessageType(msg);
                if (typeId < 0) {
                    return;
                }
                if (typeId == 4) {
                    this.dumpParseMessageErrorReport(msg, "Cannot have MESSAGE_PART inside MESSAGE_PART", null, -1L);
                    throw new CorruptedException();
                }
                msg.seek(lengthPosition);
                partitionedMessage = new PartitionedMessage(id, totalLength + (long)lengthLen);
                this.partitionedMessages.add(partitionedMessage);
            }
            if (partLength == 0) {
                this.partitionedMessages.removeKey(id);
                return;
            }
        }
        catch (EOFException e) {
            this.dumpParseMessageErrorReport(msg, e.getMessage(), e, -1L);
            throw new CorruptedException(e);
        }
        partitionedMessage.remaining -= (long)readLen;
        for (readLen = (int)Math.min((long)partLength, partitionedMessage.remaining); readLen > 0; readLen -= chunkReadLen) {
            Chunk chunk = ChunkPool.DEFAULT.getChunk(this);
            chunkReadLen = Math.min(chunk.getLength(), readLen);
            msg.readFully(chunk.getBytes(), chunk.getOffset(), chunkReadLen);
            chunk.setLength(chunkReadLen, this);
            partitionedMessage.in.addToInput(chunk, this);
        }
        if (partitionedMessage.remaining <= 0L) {
            try {
                this.parseImpl(partitionedMessage.in, consumer);
            }
            finally {
                this.partitionedMessages.remove(partitionedMessage);
                partitionedMessage.in.clear();
            }
        }
    }

    private void dumpParseHeaderErrorReport(BufferedInput in, String message) {
        StringBuilder sb = new StringBuilder();
        sb.append("Corrupted QTP byte stream: ").append(message);
        long lastPosition = in.totalPosition();
        in.reset();
        in.mark();
        int cnt = (int)(lastPosition - in.totalPosition());
        this.appendBytes(sb, in, cnt, -1L);
        QDLog.log.error(sb.toString());
    }

    private void dumpParseMessageErrorReport(BufferedInput msg, String message, Exception e, long lastRecPosition) {
        StringBuilder sb = new StringBuilder();
        BinaryQTPParser.appendMessageErrorHead(sb, message);
        this.appendMessageErrorTail(msg, sb, lastRecPosition);
        QDLog.log.error(sb.toString(), e);
    }

    private void dumpParseDataErrorReport(BufferedInput msg, Exception e, RecordBuffer buf, long startBufLimit, long lastRecPosition) {
        RecordCursor cur;
        StringBuilder sb = new StringBuilder();
        BinaryQTPParser.appendMessageErrorHead(sb, e.getMessage());
        sb.append("\n++> === Last parsed data ===");
        RecordBuffer rb = this.prepareToDumpLastRecords(buf);
        while ((cur = rb.next()) != null) {
            int i;
            sb.append("\n++> ");
            DataRecord record = cur.getRecord();
            sb.append(record.getName());
            String symbol = this.scheme.getCodec().decode(cur.getCipher(), cur.getSymbol());
            sb.append('\t').append(symbol);
            for (i = 0; i < record.getIntFieldCount(); ++i) {
                try {
                    sb.append('\t').append(record.getIntField(i).getString(cur));
                    continue;
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
            for (i = 0; i < record.getObjFieldCount(); ++i) {
                try {
                    sb.append('\t').append(record.getObjField(i).getString(cur));
                    continue;
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }
        this.appendLastSymbol(sb);
        this.appendMessageErrorTail(msg, sb, lastRecPosition);
        QDLog.log.error(sb.toString(), e);
        this.recoverBuffer(buf, startBufLimit);
    }

    private void dumpParseSubscriptionErrorReport(BufferedInput msg, Exception e, RecordBuffer buf, long startBufLimit, boolean history, long lastRecPosition) {
        RecordCursor cur;
        StringBuilder sb = new StringBuilder();
        BinaryQTPParser.appendMessageErrorHead(sb, e.getMessage());
        sb.append("\n++> === Last parsed subscription ===");
        RecordBuffer rb = this.prepareToDumpLastRecords(buf);
        while ((cur = rb.next()) != null) {
            sb.append("\n++> ");
            sb.append(cur.getRecord().getName());
            sb.append('\t').append(cur.getDecodedSymbol());
            if (!history) continue;
            sb.append('\t').append(cur.getTime());
        }
        this.appendLastSymbol(sb);
        this.appendMessageErrorTail(msg, sb, lastRecPosition);
        QDLog.log.error(sb.toString(), e);
        this.recoverBuffer(buf, startBufLimit);
    }

    private static void appendMessageErrorHead(StringBuilder sb, String message) {
        sb.append("Corrupted QTP message: ").append(message);
    }

    private void appendLastSymbol(StringBuilder sb) {
        sb.append("\n++> Last symbol: (").append(this.symbolReader.getCipher());
        sb.append(", ").append(this.symbolReader.getSymbol());
        sb.append(") = ").append(this.scheme.getCodec().decode(this.symbolReader.getCipher(), this.symbolReader.getSymbol()));
    }

    private void appendMessageErrorTail(BufferedInput msg, StringBuilder sb, long lastRecPosition) {
        try {
            long curPosition = msg.totalPosition();
            msg.reset();
            int parsedBytes = (int)(curPosition - msg.totalPosition());
            int skipBytes = Math.max(0, parsedBytes - 160);
            msg.skip(skipBytes);
            int cnt = parsedBytes - skipBytes;
            this.appendBytes(sb, msg, cnt, lastRecPosition);
        }
        catch (IOException e) {
            throw new AssertionError((Object)e);
        }
    }

    private void appendBytes(StringBuilder sb, BufferedInput in, int cnt, long lastRecPosition) {
        try {
            sb.append("\n++> === Last parsed bytes ===");
            if (lastRecPosition >= 0L) {
                sb.append("\n++> '|' shows where to last record was successfully parsed");
            }
            sb.append("\n++> '!' shows where to last byte was read");
            long startShowPosition = in.totalPosition();
            int wantToShowCnt = cnt + 32 - cnt % 16;
            int canShowCnt = Math.min(wantToShowCnt, in.available());
            char[] asciiBuf = new char[16];
            for (int i = 0; i < wantToShowCnt; ++i) {
                int b;
                if ((i & 0xF) == 0) {
                    sb.append(String.format(Locale.US, "\n++> 0x%08x: ", startShowPosition + (long)i));
                }
                int n = b = i < canShowCnt ? in.read() : -1;
                sb.append((char)(i == cnt ? 33 : (startShowPosition + (long)i == lastRecPosition ? 124 : 32)));
                if (b < 0) {
                    sb.append("  ");
                } else {
                    sb.append(HEX[b >> 4 & 0xF]).append(HEX[b & 0xF]);
                }
                int n2 = b < 0 ? 32 : (asciiBuf[i & 0xF] = b >= 32 ? (int)b : 46);
                if ((i & 0xF) != 15) continue;
                sb.append("   ").append(asciiBuf);
            }
            sb.append("\n++> === END ===");
        }
        catch (IOException e) {
            throw new AssertionError((Object)e);
        }
    }

    private RecordBuffer prepareToDumpLastRecords(RecordBuffer buf) {
        for (int i = 0; i < buf.size() - 10; ++i) {
            buf.next();
        }
        return buf;
    }

    private void recoverBuffer(RecordBuffer buf, long startBufLimit) {
        buf.rewind();
        buf.setLimit(startBufLimit);
    }

    protected void doBeforeMessageLength(BufferedInput in) {
    }

    protected void doAfterMessageType(BufferedInput in) {
    }

    protected void doAfterMessageBody(BufferedInput in, int messageType) {
    }

    protected static class CorruptedMessageException
    extends CorruptedException {
        private static final long serialVersionUID = 0L;
        protected final int messageTypeId;

        public CorruptedMessageException(Throwable cause, int messageTypeId) {
            super(cause);
            this.messageTypeId = messageTypeId;
        }
    }

    protected static class CorruptedException
    extends Exception {
        private static final long serialVersionUID = 0L;

        public CorruptedException() {
        }

        public CorruptedException(Throwable cause) {
            super(cause);
        }
    }

    private static class PartitionedMessage {
        final long id;
        final long totalLength;
        final ChunkedInput in;
        long remaining;

        PartitionedMessage(long id, long totalLength) {
            this.id = id;
            this.totalLength = totalLength;
            this.in = new ChunkedInput();
            this.remaining = totalLength;
        }
    }
}

