/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.util;

import com.devexperts.logging.Logging;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSource;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class SubscriptionProcessor {
    private static final boolean TRACE_LOG = SubscriptionProcessor.class.desiredAssertionStatus();
    private static final Logging log = Logging.getLogging(SubscriptionProcessor.class);
    private final Executor executor;
    private final QDContract contract;
    private final SubscriptionHandler addedHandler = new SubscriptionHandler();
    private final SubscriptionHandler removedHandler = new SubscriptionHandler();
    private final AtomicBoolean taskScheduled = new AtomicBoolean(false);

    protected SubscriptionProcessor(Executor executor, QDContract contract) {
        if (executor == null) {
            throw new NullPointerException("executor is null");
        }
        this.executor = executor;
        this.contract = contract;
    }

    protected abstract void processAddedSubscription(RecordSource var1);

    protected abstract void processRemovedSubscription(RecordSource var1);

    public void startProcessing(QDDistributor distributor) {
        this.startProcessing(distributor.getAddedRecordProvider(), distributor.getRemovedRecordProvider());
    }

    public void startProcessing(RecordProvider addedSubscriptionProvider, RecordProvider removedSubscriptionProvider) {
        if (addedSubscriptionProvider == null && removedSubscriptionProvider == null) {
            throw new NullPointerException("Both subscription providers are null");
        }
        if (this.addedHandler.provider != null || this.removedHandler.provider != null) {
            throw new IllegalStateException("startProcessing was already called");
        }
        this.addedHandler.provider = addedSubscriptionProvider;
        this.removedHandler.provider = removedSubscriptionProvider;
        this.addedHandler.setListener(this.addedHandler);
        this.removedHandler.setListener(this.removedHandler);
    }

    public void stopProcessing() {
        if (this.addedHandler.provider == null && this.removedHandler.provider == null) {
            throw new IllegalStateException("startProcessing was not called");
        }
        this.addedHandler.setListener(null);
        this.removedHandler.setListener(null);
    }

    public boolean hasMoreToProcess() {
        return this.taskScheduled.get();
    }

    protected void signalNoMoreToProcess() {
    }

    private void rescheduleTask() {
        this.executor.execute(this.addedHandler);
    }

    private void scheduleTaskIfNeeded() {
        if (this.taskScheduled.compareAndSet(false, true)) {
            this.rescheduleTask();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeTask() {
        boolean rescheduleTask = true;
        try {
            RecordBuffer buf = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(this.contract));
            buf.setCapacityLimited(true);
            this.addedHandler.retrieve(buf);
            if (TRACE_LOG) {
                log.trace("executeTask added size=" + buf.size());
            }
            if (!buf.isEmpty()) {
                this.processAddedSubscription(buf);
                buf.clear();
            }
            buf.setMode(RecordMode.SUBSCRIPTION);
            this.removedHandler.retrieve(buf);
            if (TRACE_LOG) {
                log.trace("executeTask removed size=" + buf.size());
            }
            if (!buf.isEmpty()) {
                this.processRemovedSubscription(buf);
            }
            buf.release();
            rescheduleTask = this.addedHandler.available || this.removedHandler.available;
        }
        finally {
            if (rescheduleTask) {
                this.rescheduleTask();
            } else {
                try {
                    this.signalNoMoreToProcess();
                }
                finally {
                    this.taskScheduled.set(false);
                    if (this.addedHandler.available || this.removedHandler.available) {
                        this.scheduleTaskIfNeeded();
                    }
                }
            }
        }
    }

    private final class SubscriptionHandler
    implements RecordListener,
    Runnable {
        RecordProvider provider;
        volatile boolean available;

        SubscriptionHandler() {
        }

        void setListener(RecordListener listener) {
            if (this.provider != null) {
                this.provider.setRecordListener(listener);
            }
        }

        void retrieve(RecordBuffer buf) {
            if (!this.available) {
                return;
            }
            boolean more = true;
            try {
                this.available = false;
                more = this.provider.retrieve(buf);
            }
            finally {
                if (more) {
                    this.available = true;
                }
            }
        }

        @Override
        public void recordsAvailable(RecordProvider provider) {
            if (TRACE_LOG) {
                log.trace("recordsAvailable from " + provider);
            }
            this.available = true;
            SubscriptionProcessor.this.scheduleTaskIfNeeded();
        }

        @Override
        public void run() {
            SubscriptionProcessor.this.executeTask();
        }
    }
}

