/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.util;

import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSource;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base class that pulls records from a {@link RecordProvider} on a supplied {@link Executor}
 * and hands every non-empty batch to {@link #process(RecordSource)}.
 *
 * <p>The {@code taskScheduled} flag guards submission: a "records available" notification only
 * submits a task when the flag flips from {@code false} to {@code true}, so notifications that
 * arrive while a task is already scheduled or running do not submit another one.
 */
public abstract class RecordProcessor {
    private final Executor executor;
    private final RecordHandler handler = new RecordHandler();
    private final AtomicBoolean taskScheduled = new AtomicBoolean(false);

    /**
     * Creates a processor whose processing task runs on the given executor.
     *
     * @param executor the executor the processing task is submitted to
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    protected RecordProcessor(Executor executor) {
        if (executor == null) {
            throw new NullPointerException("executor is null");
        }
        this.executor = executor;
    }

    /**
     * Handles one batch of retrieved records. Called from the processing task, and only when
     * the buffer filled by the provider is not empty.
     *
     * @param var1 the source holding the retrieved records
     */
    protected abstract void process(RecordSource var1);

    /**
     * Attaches this processor to the given provider by installing its internal record listener.
     *
     * @param provider the provider to retrieve records from
     * @throws NullPointerException if {@code provider} is {@code null}
     * @throws IllegalStateException if a provider was already attached
     */
    public void startProcessing(RecordProvider provider) {
        if (provider == null) {
            throw new NullPointerException("provider is null");
        }
        if (handler.provider != null) {
            throw new IllegalStateException("startProcessing was already called");
        }
        handler.provider = provider;
        provider.setRecordListener(handler);
    }

    /**
     * Removes the internal record listener from the attached provider. The provider reference
     * itself is kept, and a task that is already scheduled is not cancelled here.
     *
     * @throws IllegalStateException if no provider was attached
     */
    public void stopProcessing() {
        RecordProvider attached = handler.provider;
        if (attached == null) {
            throw new IllegalStateException("startProcessing was not called");
        }
        attached.setRecordListener(null);
    }

    /**
     * Reports whether a processing task is currently scheduled or running.
     *
     * @return the current value of the {@code taskScheduled} flag
     */
    public boolean hasMoreToProcess() {
        return taskScheduled.get();
    }

    /**
     * Hook called from the processing task when a pass completes normally and no further
     * records are flagged as available. It runs before the {@code taskScheduled} flag is
     * cleared, so {@link #hasMoreToProcess()} still returns {@code true} while it executes.
     * The default implementation does nothing.
     */
    protected void signalNoMoreToProcess() {
    }

    /** Submits the processing task, but only if this call flips the flag from idle to scheduled. */
    private void scheduleTaskIfNeeded() {
        if (!taskScheduled.compareAndSet(false, true)) {
            return;
        }
        executor.execute(handler);
    }

    /**
     * One pass of the processing task: retrieve into a buffer, process it if non-empty, then
     * either resubmit the task or return to the idle state.
     */
    private void executeTask() {
        // Stays true unless the pass completes normally, so a throwing pass is resubmitted.
        boolean runAgain = true;
        try {
            RecordBuffer buffer = RecordBuffer.getInstance(handler.provider.getMode());
            buffer.setCapacityLimited(true);
            handler.retrieve(buffer);
            if (!buffer.isEmpty()) {
                process(buffer);
            }
            // Not reached when retrieval or processing throws.
            buffer.release();
            runAgain = handler.available;
        } finally {
            if (!runAgain) {
                try {
                    signalNoMoreToProcess();
                } finally {
                    taskScheduled.set(false);
                    // Re-check after clearing the flag: a notification that arrived while the
                    // flag was still set did not submit a task, so pick it up here.
                    if (handler.available) {
                        scheduleTaskIfNeeded();
                    }
                }
            } else {
                // The flag remains set; submit the task again directly.
                executor.execute(handler);
            }
        }
    }

    /**
     * Listener installed on the provider and, at the same time, the {@link Runnable} that is
     * submitted to the executor.
     */
    private final class RecordHandler implements RecordListener, Runnable {
        RecordProvider provider;
        volatile boolean available;

        /**
         * Retrieves records into {@code buf} if the provider was flagged as having records.
         * The flag is cleared before retrieval and set again when the provider reports more
         * records or when retrieval throws.
         */
        void retrieve(RecordBuffer buf) {
            if (available) {
                // Defaults to true so that an exception leaves the flag set.
                boolean hasMore = true;
                try {
                    available = false;
                    hasMore = provider.retrieve(buf);
                } finally {
                    if (hasMore) {
                        available = true;
                    }
                }
            }
        }

        /** Marks records as available and schedules the processing task if none is scheduled. */
        @Override
        public void recordsAvailable(RecordProvider provider) {
            available = true;
            scheduleTaskIfNeeded();
        }

        /** Runs one pass of the processing task. */
        @Override
        public void run() {
            executeTask();
        }
    }
}

