/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp;

import com.devexperts.connector.proto.ApplicationConnection;
import com.devexperts.connector.proto.TransportConnection;
import com.devexperts.io.ChunkList;
import com.devexperts.io.ChunkPool;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.qtp.BinaryRecordDesc;
import com.devexperts.qd.qtp.ConnectionQTPComposer;
import com.devexperts.qd.qtp.ConnectionQTPParser;
import com.devexperts.qd.qtp.HeartbeatPayload;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageAdapterConnectionFactory;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.qtp.MessageListener;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.ProtocolOption;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.qd.util.TimeMarkUtil;
import com.devexperts.util.SystemProperties;

class MessageAdapterConnection
extends ApplicationConnection<MessageAdapterConnectionFactory>
implements MessageListener,
MessageAdapter.CloseListener {
    private static final int BYTES_TO_HEARTBEAT = SystemProperties.getIntProperty(MessageAdapterConnection.class, "bytesToHeartbeat", 32768);
    private static final int DELTA_MARK_UNKNOWN = Integer.MAX_VALUE;
    private final MessageAdapter adapter;
    private final ConnectionQTPComposer composer;
    private final ConnectionQTPParser parser;
    private QDStats stats = QDStats.VOID;
    private volatile long nextHeartbeatTime;
    private volatile long heartbeatDisconnectTime;
    private long heartbeatPeriod;
    private long bytesToNextHeartbeat;
    private HeartbeatPayload heartbeatPayloadOut = new HeartbeatPayload();
    private volatile int lastDeltaMark = Integer.MAX_VALUE;
    private int connectionRttMark;
    private int incomingLagMark;

    MessageAdapterConnection(MessageAdapter adapter, MessageAdapterConnectionFactory factory, TransportConnection transportConnection) {
        super(factory, transportConnection);
        this.adapter = adapter;
        this.composer = new ConnectionQTPComposer(adapter.getScheme(), this);
        this.parser = new ConnectionQTPParser(adapter.getScheme(), this);
        this.parser.setMixedSubscription(adapter.supportsMixedSubscription());
        this.heartbeatPeriod = factory.getInitialHeartbeatPeriod().getTime();
        this.heartbeatDisconnectTime = System.currentTimeMillis() + factory.getHeartbeatTimeout().getTime();
        adapter.setMessageListener(this);
        adapter.setCloseListener(this);
        adapter.useDescribeProtocol();
    }

    ChunkPool getChunkPool() {
        return ((MessageAdapterConnectionFactory)this.factory).getChunkPool();
    }

    @Override
    protected void startImpl() {
        QDStats stats = this.transportConnection.variables().get(MessageConnectors.STATS_KEY);
        if (stats != null) {
            stats.addMBean(QDStats.SType.CONNECTION.getName(), this.adapter);
            this.composer.setStats(stats);
            this.parser.setStats(stats);
            this.stats = stats;
        }
        this.adapter.start();
    }

    @Override
    protected void closeImpl() {
        this.adapter.close();
    }

    @Override
    public long examine(long currentTime) {
        long nextRetrieveTime;
        if (currentTime >= this.heartbeatDisconnectTime) {
            QDLog.log.info(this.adapter + " heartbeat timeout exceeded: disconnecting");
            this.close();
        }
        if (currentTime >= (nextRetrieveTime = Math.min(this.adapter.nextRetrieveTime(currentTime), this.nextHeartbeatTime))) {
            this.notifyChunksAvailable();
            nextRetrieveTime = currentTime + 10L;
        }
        return Math.min(nextRetrieveTime, this.heartbeatDisconnectTime);
    }

    @Override
    public ChunkList retrieveChunks(Object owner) {
        int composingTimeMark = TimeMarkUtil.currentTimeMark();
        this.composer.setComposingTimeMark(composingTimeMark);
        if (this.composer.compose(this.adapter)) {
            this.notifyChunksAvailable();
        }
        long payloadBytes = this.composer.getProcessed();
        long currentTimeMillis = System.currentTimeMillis();
        int currentTimeMark = TimeMarkUtil.currentTimeMark();
        this.composer.addComposingLag(TimeMarkUtil.signedDeltaMark(currentTimeMark - composingTimeMark), this.stats);
        if (currentTimeMillis >= this.nextHeartbeatTime || this.bytesToNextHeartbeat <= payloadBytes) {
            ChunkList chunks = this.composer.getOutput(this);
            this.createOutgoingHeartbeat(currentTimeMillis, currentTimeMark, this.composer.getTotalAverageLagAndClear());
            if (chunks != null) {
                this.composer.writeAllFromChunkList(chunks, this);
            }
        }
        this.bytesToNextHeartbeat -= payloadBytes;
        return this.composer.getOutput(owner);
    }

    @Override
    public boolean processChunks(ChunkList chunks, Object owner) {
        this.parser.addChunks(chunks, owner);
        this.parser.setCurrentTimeMark(this.computeTimeMark(TimeMarkUtil.currentTimeMark()));
        this.parser.parse(this.adapter);
        this.heartbeatDisconnectTime = System.currentTimeMillis() + ((MessageAdapterConnectionFactory)this.factory).getHeartbeatTimeout().getTime();
        return true;
    }

    private void createOutgoingHeartbeat(long currentTimeMillis, int currentTimeMark, int lagMark) {
        this.heartbeatPayloadOut.setTimeMillis(currentTimeMillis);
        this.heartbeatPayloadOut.setTimeMark(currentTimeMark);
        this.heartbeatPayloadOut.setLagMark(lagMark);
        int deltaMark = this.lastDeltaMark;
        if (deltaMark != Integer.MAX_VALUE) {
            this.heartbeatPayloadOut.setDeltaMark(deltaMark);
        }
        this.composer.composeHeartbeatMessage(this.heartbeatPayloadOut);
        this.nextHeartbeatTime = currentTimeMillis + this.heartbeatPeriod;
        this.bytesToNextHeartbeat = BYTES_TO_HEARTBEAT;
        this.heartbeatPeriod = Math.min(this.heartbeatPeriod * 2L, ((MessageAdapterConnectionFactory)this.factory).getHeartbeatPeriod().getTime());
    }

    public BinaryRecordDesc getRequestedRecordDesc(DataRecord record) {
        return this.parser.getRequestedRecordDesc(record);
    }

    int getConnectionRttMark() {
        return this.connectionRttMark;
    }

    int getIncomingLagMark() {
        return this.incomingLagMark;
    }

    void processIncomingDescribeProtocol(ProtocolDescriptor desc) {
        this.composer.setOptSet(ProtocolOption.parseProtocolOptions(desc.getProperty("opt")));
        String version = desc.getProperty("version");
        if (version != null && version.startsWith("QDS-3.")) {
            int n;
            for (n = "QDS-3.".length(); n < version.length() && version.charAt(n) >= '0' && version.charAt(n) <= '9'; ++n) {
            }
            try {
                this.composer.wideDecimalSupported = Long.parseLong(version.substring("QDS-3.".length(), n)) >= 263L;
            }
            catch (NumberFormatException e) {
                this.composer.wideDecimalSupported = true;
            }
        } else {
            this.composer.wideDecimalSupported = true;
        }
    }

    void processIncomingHeartbeat(HeartbeatPayload heartbeatPayloadIn) {
        if (heartbeatPayloadIn == null) {
            return;
        }
        int currentTimeMark = TimeMarkUtil.currentTimeMark();
        if (heartbeatPayloadIn.hasTimeMark()) {
            this.lastDeltaMark = TimeMarkUtil.signedDeltaMark(currentTimeMark - heartbeatPayloadIn.getTimeMark());
            if (heartbeatPayloadIn.hasDeltaMark()) {
                this.connectionRttMark = TimeMarkUtil.signedDeltaMark(this.lastDeltaMark + heartbeatPayloadIn.getDeltaMark());
            }
        }
        this.incomingLagMark = heartbeatPayloadIn.getLagMark() + this.connectionRttMark;
        this.parser.setCurrentTimeMark(this.computeTimeMark(currentTimeMark));
    }

    private int computeTimeMark(int currentTimeMark) {
        int mark = currentTimeMark - this.incomingLagMark & 0x3FFFFFFF;
        if (mark == 0) {
            mark = 1;
        }
        return mark;
    }

    @Override
    public void messagesAvailable(MessageProvider provider) {
        this.notifyChunksAvailable();
    }

    @Override
    public void adapterClosed(MessageAdapter adapter) {
        if (adapter.isMarkedForImmediateRestart()) {
            this.markForImmediateRestart();
        }
        this.close();
    }
}

