/*
 * Decompiled with CFR 0.152.
 */
package com.ddfplus.net;

import com.ddfplus.api.ConnectionEvent;
import com.ddfplus.api.ConnectionEventType;
import com.ddfplus.net.Cmd;
import com.ddfplus.net.Connection;
import com.ddfplus.net.SymbolProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base thread for an I/O channel bound to a {@link Connection}.
 *
 * <p>The class provides two services to its subclasses:
 * <ul>
 *   <li>an outbound command queue that is drained periodically on a dedicated
 *       single-thread scheduler, aggregating {@code GO} and {@code STOP}
 *       commands into batches before handing them to
 *       {@link #sendCommand(String)};</li>
 *   <li>{@link #distributeMessage(byte[])}, which cuts a received byte array
 *       into slices and passes each slice to
 *       {@link Connection#newQueueMessage}.</li>
 * </ul>
 *
 * <p>The scheduler is started in the constructor and keeps running until
 * {@link #stopCommandThread()} is called.
 */
abstract class IoChannel
extends Thread {
    protected static final long CONNECTION_TIMEOUT_SEC = 60L;
    protected static final int RECONNECTION_INTERVAL_SEC = 3;
    /** Delay before the command queue is drained for the first time. */
    private static final int CMD_INITIAL_DELAY_MS = 100;
    /** Period between two consecutive drains of the command queue. */
    private static final long CMD_TIMEOUT_MS = 250L;
    /** ASCII SOH (0x01): switches the splitter into "inside a frame" mode. */
    private static final byte SOH = 1;
    /** ASCII ETX (0x03): ends the current frame. */
    private static final byte ETX = 3;
    /** ASCII DC4 (0x14): when it directly follows ETX, a trailer is kept with the frame. */
    private static final byte DC4 = 20;
    /** Bytes kept after ETX when a DC4 trailer is present (presumably a timestamp - confirm against the protocol spec). */
    private static final int FRAME_TRAILER_LENGTH = 9;
    /** Source of the sequence number embedded in the thread name; atomic so concurrent construction never duplicates an id. */
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();

    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final Connection connection;
    /** Volatile because subclasses toggle it while other threads may observe it. */
    protected volatile boolean isRunning = false;
    /** Volatile because the command scheduler thread reads it to decide whether a command may be sent. */
    protected volatile ConnectionState connState = ConnectionState.NotConnected;
    protected SymbolProvider symbolProvider;
    private final BlockingQueue<Cmd> commandQ = new LinkedBlockingQueue<Cmd>();
    private final ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
    private final CmdThread cmdThread;

    /**
     * Creates the channel and starts the periodic command pump.
     *
     * @param connection the connection this channel serves; its id becomes part of the thread name
     */
    public IoChannel(Connection connection) {
        super("ConnectionListenerThread#" + INSTANCE_COUNTER.incrementAndGet() + " for " + connection.getId());
        this.connection = connection;
        this.cmdThread = new CmdThread(this.commandQ);
        this.es.scheduleAtFixedRate(this.cmdThread, CMD_INITIAL_DELAY_MS, CMD_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Transmits one fully assembled command line. Invoked by the command pump
     * and when subscriptions are replayed.
     *
     * @param var1 the command text to transmit
     */
    protected abstract void sendCommand(String var1);

    /** Disconnects the channel and releases its resources. */
    public abstract void disconnectAndShutdown();

    /**
     * Wraps the text in a {@link Cmd} and queues it for the command pump.
     *
     * @param cmd the command text
     */
    public final void enqueueCommand(String cmd) {
        this.enqueueCommand(new Cmd(cmd));
    }

    /**
     * Queues a command; it is picked up on the next run of the command pump.
     *
     * @param cmd the command to queue
     */
    public final void enqueueCommand(Cmd cmd) {
        this.commandQ.offer(cmd);
    }

    /**
     * Produces a randomized read timeout.
     *
     * @return a value between 25000 and 34000 milliseconds, in whole-second steps
     */
    public int createReadTimeoutMs() {
        int jitterSeconds = (int)(Math.random() * 10.0);
        return 25000 + jitterSeconds * 1000;
    }

    /**
     * @return {@code -1} in this base implementation
     */
    public int getMaxQueueSize() {
        return -1;
    }

    /**
     * @return {@code -1} in this base implementation
     */
    public int getQueueSize() {
        return -1;
    }

    /** Shuts the command scheduler down immediately; queued commands are no longer processed. */
    protected void stopCommandThread() {
        this.es.shutdownNow();
    }

    /**
     * Splits a received byte array into slices and forwards every slice to
     * {@link Connection#newQueueMessage}.
     *
     * <p>The scan alternates between two modes. Outside a frame, every byte
     * that is not SOH is forwarded as its own one-byte slice, and SOH opens a
     * frame. Inside a frame, bytes are accumulated until ETX; if ETX is
     * directly followed by DC4 and at least {@value #FRAME_TRAILER_LENGTH}
     * bytes remain after ETX, those bytes stay attached to the frame.
     * Whatever is left when the array ends is forwarded as a final slice.
     *
     * @param array the received bytes; {@code null} or empty input is ignored
     */
    protected void distributeMessage(byte[] array) {
        if (array == null || array.length < 1) {
            return;
        }
        boolean isStart = true;
        int pos2 = 0;
        int pos1;
        for (pos1 = 0; pos1 < array.length; ++pos1) {
            byte current = array[pos1];
            if (isStart) {
                // Everything scanned since the last cut is complete: hand it over.
                if (pos1 > pos2) {
                    this.queueSlice(array, pos2, pos1);
                    pos2 = pos1;
                }
                if (current == SOH) {
                    isStart = false;
                }
                continue;
            }
            if (current != ETX) {
                continue;
            }
            // Keep the DC4 trailer with the frame; the loop increment then steps past it.
            if (array.length > pos1 + FRAME_TRAILER_LENGTH && array[pos1 + 1] == DC4) {
                pos1 += FRAME_TRAILER_LENGTH;
            }
            isStart = true;
        }
        if (pos1 > pos2) {
            this.queueSlice(array, pos2, pos1);
        }
    }

    /** Copies {@code src[from, to)} into a fresh array and queues it on the connection. */
    private void queueSlice(byte[] src, int from, int to) {
        byte[] slice = new byte[to - from];
        System.arraycopy(src, from, slice, 0, slice.length);
        this.connection.newQueueMessage(slice);
    }

    /**
     * @return the connection passed to the constructor
     */
    protected Connection getConnection() {
        return this.connection;
    }

    /**
     * Pauses the calling thread without throwing.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early
     * and restores the interrupt flag so the caller can still detect the
     * request; loops built around this method should check that flag.
     *
     * @param ms time to sleep in milliseconds; negative values return immediately
     */
    protected void sleep(int ms) {
        if (ms < 0) {
            // Thread.sleep rejects negative values; callers have always been allowed to pass them.
            return;
        }
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Replays every recorded subscription command through {@link #sendCommand(String)}. */
    protected void resendSubscriptionsOnReconnection() {
        this.log.info("Resending subscriptons due to a reconnection.");
        this.cmdThread.resendSubscriptionRequests();
    }

    /**
     * Factory for connection events.
     *
     * @param type the event type
     * @param reconnection value forwarded to the {@link ConnectionEvent} constructor
     * @return a new event
     */
    protected ConnectionEvent makeConnectionEvent(ConnectionEventType type, boolean reconnection) {
        return new ConnectionEvent(type, reconnection);
    }

    /** Connection states tracked in {@link #connState}. */
    enum ConnectionState {
        NotConnected,
        Connecting,
        Connected,
        LoggedIn;

    }

    /**
     * Periodic task that drains the command queue.
     *
     * <p>{@code LOGIN}, {@code VERSION}, {@code LOGOFF} and {@code LOGOUT} are sent
     * immediately. {@code GO} and {@code STOP} requests are collected into
     * comma-separated batches that are sent once
     * {@value #COMMAND_AGGREGATION_COUNT} entries have accumulated or the queue
     * runs empty. {@code STR} is sent immediately. Sent {@code GO} batches and
     * {@code STR} commands are recorded so they can be replayed later.
     */
    private class CmdThread
    implements Runnable {
        private static final int COMMAND_AGGREGATION_COUNT = 25;
        private final BlockingQueue<Cmd> q;
        private final StringBuilder goBuf = new StringBuilder();
        private int numGo = 0;
        private final StringBuilder stopBuf = new StringBuilder();
        private int numStop = 0;
        /** Guarded by its own monitor: appended by the scheduler thread, read when subscriptions are replayed. */
        private final List<String> subscriptionCmdHistory = new ArrayList<String>();

        public CmdThread(BlockingQueue<Cmd> q) {
            this.q = q;
        }

        /**
         * Drains the queue once. No exception is allowed to escape: a task
         * scheduled with {@code scheduleAtFixedRate} is never run again after it
         * throws, which would silently stop all command processing.
         */
        @Override
        public void run() {
            this.initGoBuf();
            this.initStopBuf();
            while (this.q.peek() != null) {
                try {
                    Cmd cmd = this.q.poll();
                    if (cmd != null) {
                        this.process(cmd);
                    }
                }
                catch (Exception e) {
                    this.logSendFailure(e);
                }
            }
            // Send whatever partial batches remain; guard each one separately so a
            // failing GO batch cannot prevent the STOP batch from going out.
            try {
                this.flushGo();
            }
            catch (Exception e) {
                this.logSendFailure(e);
            }
            try {
                this.flushStop();
            }
            catch (Exception e) {
                this.logSendFailure(e);
            }
        }

        /** Sends every recorded subscription command again, in the order it was recorded. */
        public void resendSubscriptionRequests() {
            List<String> snapshot;
            synchronized (this.subscriptionCmdHistory) {
                // Iterate over a copy so the scheduler thread can keep recording meanwhile.
                snapshot = new ArrayList<String>(this.subscriptionCmdHistory);
            }
            for (String cmd : snapshot) {
                this.send(cmd);
            }
        }

        /** Handles one dequeued command, then flushes any batch that reached the aggregation limit. */
        private void process(Cmd cmd) {
            String name = cmd.getCmd();
            if (!name.startsWith("LOGIN") && IoChannel.this.connState != ConnectionState.LoggedIn) {
                IoChannel.this.log.warn("Not logined in, ignoring command: " + cmd);
                return;
            }
            if (name.startsWith("LOGIN") || name.startsWith("VERSION") || name.startsWith("LOGOFF") || name.startsWith("LOGOUT")) {
                this.send(name);
            } else if (name.equals("GO")) {
                ++this.numGo;
                if (this.numGo > 1) {
                    this.goBuf.append(',');
                }
                this.goBuf.append(cmd.getSymbol() + "=" + cmd.getSuffix());
            } else if (name.equals("STOP")) {
                ++this.numStop;
                if (this.numStop > 1) {
                    this.stopBuf.append(',');
                }
                this.stopBuf.append(cmd.getSymbol() + "=" + cmd.getSuffix());
            } else if (name.equals("STR")) {
                String text = name + " L " + cmd.getSymbol() + ";";
                this.send(text);
                this.recordSubscription(text);
            }
            if (this.numGo >= COMMAND_AGGREGATION_COUNT) {
                this.flushGo();
            }
            if (this.numStop >= COMMAND_AGGREGATION_COUNT) {
                this.flushStop();
            }
        }

        /** Sends and records the pending GO batch, if any, then resets it. */
        private void flushGo() {
            if (this.numGo > 0) {
                String text = this.goBuf.toString();
                this.send(text);
                this.recordSubscription(text);
                this.initGoBuf();
            }
        }

        /** Sends the pending STOP batch, if any, then resets it. */
        private void flushStop() {
            if (this.numStop > 0) {
                this.send(this.stopBuf.toString());
                this.initStopBuf();
            }
        }

        /** Remembers a command that was sent so it can be replayed later. */
        private void recordSubscription(String text) {
            synchronized (this.subscriptionCmdHistory) {
                this.subscriptionCmdHistory.add(text);
            }
        }

        /** Logs a failure together with its stack trace. */
        private void logSendFailure(Exception e) {
            IoChannel.this.log.error("Could not send command: " + e.getMessage(), e);
        }

        private void initStopBuf() {
            this.stopBuf.setLength(0);
            this.stopBuf.append("STOP ");
            this.numStop = 0;
        }

        private void initGoBuf() {
            this.goBuf.setLength(0);
            this.goBuf.append("GO ");
            this.numGo = 0;
        }

        /** Logs the outgoing command and passes it to the channel implementation. */
        private void send(String cmd) {
            IoChannel.this.log.info("> " + cmd);
            IoChannel.this.sendCommand(cmd);
        }
    }
}

