/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp.socket;

import com.devexperts.io.Chunk;
import com.devexperts.io.ChunkList;
import com.devexperts.qd.qtp.QTPConstants;
import com.devexperts.qd.qtp.QTPWorkerThread;
import com.devexperts.qd.qtp.socket.SocketHandler;
import com.devexperts.util.SystemProperties;
import com.devexperts.util.TimePeriod;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.locks.LockSupport;

/**
 * Worker thread that reads bytes from the socket of a {@link SocketHandler},
 * collects them into a {@link ChunkList} and hands that list to the
 * connection for processing.
 *
 * <p>When the connection reports that it could not take the data
 * ({@code processChunks} returns {@code false}), the thread parks itself until
 * {@link #readyToProcess()} is invoked or the thread is closed.
 */
class SocketReader
extends QTPWorkerThread {
    /**
     * Threshold, in nanoseconds, above which the time spent between the last
     * socket read and the completion of {@code processChunks} is reported as a
     * warning. Configured via the system property
     * {@code com.devexperts.qd.qtp.socket.readerWarnTimeout} (default {@code 15s}).
     */
    private static final long WARN_TIMEOUT_NANOS = TimePeriod.valueOf(SystemProperties.getProperty("com.devexperts.qd.qtp.socket.readerWarnTimeout", "15s")).getNanos();

    private final SocketHandler handler;

    /**
     * {@code false} while the reader must wait before reading more data;
     * set back to {@code true} by {@link #readyToProcess()} or by a successful
     * {@code processChunks} call. Volatile because it is written by
     * {@link #readyToProcess()} and polled by {@link #doWork()}.
     */
    private volatile boolean isReadyToProcess = true;

    /**
     * Creates a reader bound to the given handler. The thread name is the
     * handler's string form followed by {@code "-Reader"}.
     *
     * @param handler socket handler that supplies the socket, chunk pool and connection
     */
    SocketReader(SocketHandler handler) {
        super(handler + "-Reader");
        this.handler = handler;
    }

    /**
     * Main read loop: repeatedly reads a batch of chunks from the socket input
     * stream, passes the batch to the connection, and waits if the connection
     * is not ready for more.
     *
     * <p>Returns immediately if the handler provides no thread data, and
     * returns whenever {@code isClosed()} is observed to be {@code true}.
     *
     * @throws EOFException if the stream reports end-of-stream (read returns a negative count)
     * @throws IOException if reading from the socket fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void doWork() throws IOException, InterruptedException {
        SocketHandler.ThreadData threadData = this.handler.initThreadData();
        if (threadData == null) {
            return;
        }
        InputStream in = threadData.socket.getInputStream();
        while (true) {
            ChunkList chunks = this.handler.chunkPool.getChunkList(this);
            int totalRead = 0;
            long timeNanos;
            // Aggregate reads into one chunk list until enough data is collected
            // or the stream has nothing more immediately available.
            while (true) {
                if (this.isClosed()) {
                    return;
                }
                Chunk chunk = this.handler.chunkPool.getChunk(this);
                int readCapacity = chunk.getLength();
                int bytesRead = in.read(chunk.getBytes(), chunk.getOffset(), readCapacity);
                // Timestamp of the most recent read; used below to time processing.
                timeNanos = System.nanoTime();
                // NOTE(review): this runs before the EOF check, so the helper may be
                // called with bytesRead == -1 - confirm it tolerates a negative length.
                if (this.isVerboseLogging()) {
                    this.log.debug(SocketHandler.verboseBytesToString("Read", chunk.getBytes(), chunk.getOffset(), bytesRead));
                }
                if (bytesRead < 0) {
                    throw new EOFException("Connection closed by remote side");
                }
                threadData.connectionStats.addReadBytes(bytesRead);
                chunk.setLength(bytesRead, this);
                chunks.add(chunk, this);
                totalRead += bytesRead;
                // Stop aggregating when the batch is large enough, the last read did
                // not fill the chunk, or no further bytes are available right now.
                if (totalRead >= QTPConstants.READ_AGGREGATION_SIZE || bytesRead < readCapacity || in.available() == 0) {
                    break;
                }
                if (this.isVerboseLogging()) {
                    this.log.debug("More data is available, will read");
                }
            }
            if (this.isClosed()) {
                return;
            }
            // Clear the flag BEFORE processing so that a readyToProcess() call made
            // while processChunks is running is not lost.
            this.isReadyToProcess = false;
            if (threadData.connection.processChunks(chunks, this)) {
                this.isReadyToProcess = true;
            }
            long deltaTimeNanos = System.nanoTime() - timeNanos;
            if (deltaTimeNanos > WARN_TIMEOUT_NANOS) {
                this.log.warn("processChunks took " + deltaTimeNanos + " ns");
            }
            // Wait until readyToProcess() flips the flag. park() may return without
            // an unpark, hence the flag is re-checked on every iteration.
            while (!this.isReadyToProcess) {
                if (this.isClosed()) {
                    return;
                }
                if (this.isVerboseLogging()) {
                    this.log.debug("Parking until more data can be processed");
                }
                LockSupport.park();
                if (this.isVerboseLogging()) {
                    this.log.debug("Unparked");
                }
            }
        }
    }

    /** Stops the handler's connector when this worker is shut down. */
    @Override
    protected void handleShutdown() {
        this.handler.stopConnector();
    }

    /**
     * Notifies the handler that the socket should be exited.
     *
     * @param reason the throwable passed on to {@code SocketHandler.exitSocket}
     */
    @Override
    protected void handleClose(Throwable reason) {
        this.handler.exitSocket(reason);
    }

    /**
     * Signals that processing may resume: sets the ready flag and unparks this
     * thread. Does nothing if the flag is already set.
     */
    void readyToProcess() {
        if (this.isReadyToProcess) {
            return;
        }
        this.isReadyToProcess = true;
        LockSupport.unpark(this);
    }

    /** Returns whether verbose byte-level debug messages should be logged. */
    private boolean isVerboseLogging() {
        return this.handler.verbose && this.log.debugEnabled();
    }
}

