/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp;

import com.devexperts.qd.QDAgent;
import com.devexperts.qd.QDCollector;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.QDStream;
import com.devexperts.qd.SubscriptionFilter;
import com.devexperts.qd.kit.CompositeFilters;
import com.devexperts.qd.ng.AbstractRecordProvider;
import com.devexperts.qd.ng.AbstractRecordSink;
import com.devexperts.qd.ng.EventFlag;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordFilter;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.AgentAdapter;
import com.devexperts.qd.qtp.ChannelShaper;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.MessageVisitor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class AgentChannel
implements RecordListener {
    // ---- Subscription filtering modes stored in Config.subFilterMode (selected in createNewConfig()) ----
    // 0: the complete subscription filter is QDFilter.ANYTHING.
    private static final byte SUB_NO_FILTER = 0;
    // 1: the complete subscription filter is handed to the agent itself (see createAgent()).
    private static final byte SUB_FILTER_AGENT = 1;
    // 2: incoming subscription is filtered directly inside processSubscription().
    private static final byte SUB_FILTER_PROCESS = 2;
    // 3: incoming subscription is queued unfiltered and filtered later by processSubscriptionFilterAction().
    private static final byte SUB_FILTER_EXECUTOR = 3;

    // ---- Action codes stored in SubAction.action and dispatched by processSubAction() ----
    private static final byte ACTION_CLOSE = 0;
    private static final byte ACTION_RECONFIGURE_1 = 1;
    private static final byte ACTION_RECONFIGURE_2 = 2;
    private static final byte ACTION_RECONFIGURE_3 = 3;
    private static final byte ACTION_RECONFIGURE_4 = 4;
    private static final byte ACTION_ADD_SUB_FILTER = 5;
    private static final byte ACTION_REMOVE_SUB_FILTER = 6;
    private static final byte ACTION_ADD_SUB = 7;
    private static final byte ACTION_REMOVE_SUB = 8;

    // ---- Values of dataAvailableState ----
    // Bit 0 (value 1) is set in recordsAvailable() when the agent reports data.
    // Bit 1 (value 2) is set by retrieveSnapshotOrData() when the config has a positive aggregation period.
    private static final int DATA_NOT_AVAILABLE = 0;
    private static final int DATA_AVAILABLE = 1;
    private static final int DATA_WAIT = 2;
    private static final int DATA_WAIT_AVAILABLE = 3;
    // Atomic compare-and-set access to the volatile dataAvailableState field.
    private static final AtomicIntegerFieldUpdater<AgentChannel> DATA_AVAILABLE_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AgentChannel.class, "dataAvailableState");
    // Sentinel config installed by close(); compared by identity throughout this class.
    private static final Config CLOSED_CONFIG = new Config(null, null, null, 0, 0L);
    final AgentAdapter adapter;
    final ChannelShaper shaper;
    // Config that newly queued sub-actions are tagged with; written only from synchronized methods and the constructor.
    private volatile Config subActionConfig;
    private final SubActionQueue subActionQueue = new SubActionQueue();
    // Config/agent pair currently in effect; null until initNewAgent() has run.
    private volatile AgentConfig agentConfig;
    // Lazily created by getOrCreateRejectedAgent(); receives subscription that the filter rejected
    // when shaper.isKeepRejected() is true.
    private QDAgent rejectedAgent;
    // Lazily created, reused wrapper used by retrieveFromProvider().
    private FilteringRecordProvider filteringRecordProvider;
    // Set in recordsAvailable() for the agent's snapshot provider, cleared in retrieveSnapshotOrData().
    private volatile boolean snapshotIsAvailable;
    private volatile int dataAvailableState = 0;
    // Earliest time at which aggregated data may be retrieved again (compared against currentTime).
    private volatile long nextDataTime;
    // NOTE(review): not read or written in the visible part of this class; presumably maintained by the owner - verify.
    double quota;

    /**
     * Creates a channel bound to the given adapter and shaper and takes an
     * initial configuration snapshot from the shaper.
     *
     * @param adapter the owning agent adapter
     * @param shaper the shaper that supplies collector, filters and aggregation settings
     */
    AgentChannel(AgentAdapter adapter, ChannelShaper shaper) {
        this.adapter = adapter;
        this.shaper = shaper;
        // Both fields above must be set first: createNewConfig() reads them.
        subActionConfig = createNewConfig();
    }

    /** Returns whether the shaper reports that it has a subscription executor. */
    private boolean hasSubscriptionExecutor() {
        return shaper.hasSubscriptionExecutor();
    }

    /**
     * Builds a new {@link Config} from the shaper's current settings combined
     * with the adapter's peer filter for the shaper's contract.
     *
     * <p>The sub-filter mode is chosen as follows: 0 when the combined filter is
     * {@code QDFilter.ANYTHING}; with a subscription executor, 2 for a fast filter
     * and 3 otherwise; without an executor, 1 for a fast filter when rejected
     * subscription is not kept, and 2 otherwise.
     *
     * @return the freshly created configuration snapshot
     */
    private Config createNewConfig() {
        QDFilter shaperFilter = shaper.getSubscriptionFilter();
        QDFilter combinedFilter = CompositeFilters.makeAnd(adapter.peerFilter[shaper.getContract().ordinal()], shaperFilter);
        byte mode;
        if (combinedFilter == QDFilter.ANYTHING) {
            mode = 0; // SUB_NO_FILTER
        } else if (hasSubscriptionExecutor()) {
            mode = combinedFilter.isFast() ? (byte)2 : (byte)3; // SUB_FILTER_PROCESS : SUB_FILTER_EXECUTOR
        } else if (combinedFilter.isFast() && !shaper.isKeepRejected()) {
            mode = 1; // SUB_FILTER_AGENT
        } else {
            mode = 2; // SUB_FILTER_PROCESS
        }
        return new Config(shaper.getCollector(), shaperFilter, combinedFilter, mode, shaper.getAggregationPeriod());
    }

    /**
     * Assertion helper. With a subscription executor it is true when the caller
     * does not hold this channel's monitor and the sub-action queue is marked
     * running; without one it is true when the caller holds this channel's monitor.
     */
    private boolean underLockOrInSubActionThread() {
        return hasSubscriptionExecutor()
            ? !Thread.holdsLock(this) && subActionQueue.running
            : Thread.holdsLock(this);
    }

    /**
     * Cheap unsynchronized pre-check: enters the synchronized reconfiguration
     * path only when the current config looks stale.
     */
    void reconfigureIfNeeded() {
        if (needToReconfigure(subActionConfig)) {
            reconfigureIfNeededSync();
        }
    }

    /**
     * Tells whether the given config is out of date.
     *
     * @param config the config to compare against the shaper's current settings
     * @return {@code false} for the closed sentinel; otherwise {@code true} when the
     *         collector, subscription filter or aggregation period differs from the
     *         shaper's, or when the adapter is closed
     */
    private boolean needToReconfigure(Config config) {
        if (config == CLOSED_CONFIG) {
            return false;
        }
        if (config.collector != shaper.getCollector()) {
            return true;
        }
        if (config.subscriptionFilter != shaper.getSubscriptionFilter()) {
            return true;
        }
        if (config.aggregationPeriod != shaper.getAggregationPeriod()) {
            return true;
        }
        return adapter.isClosed();
    }

    /**
     * Synchronized re-check of {@link #needToReconfigure(Config)}. Closes the
     * channel when the adapter is closed; otherwise installs a new config and
     * queues a phase-1 reconfigure action (code 1) for it.
     */
    private synchronized void reconfigureIfNeededSync() {
        if (!needToReconfigure(subActionConfig)) {
            return;
        }
        if (adapter.isClosed()) {
            close();
            return;
        }
        Config fresh = createNewConfig();
        subActionConfig = fresh;
        subActionQueue.addAction(fresh, (byte)1); // ACTION_RECONFIGURE_1
    }

    /**
     * First phase of reconfiguration: compares the action's config with the one
     * currently in effect and decides how much work is required.
     *
     * <ul>
     * <li>No agent yet: nothing to do.</li>
     * <li>Collector and filter unchanged: only the config reference (and, if the
     *     aggregation period changed, the snapshot listener) is updated.</li>
     * <li>Same collector and neither old nor new mode is 1 (agent-side filter):
     *     the light path in {@link #reconfigurePhase1Light(SubAction)} is used.</li>
     * <li>Otherwise the current subscription is examined into a buffer and the
     *     action is turned into phase 3 (code 3) carrying that buffer.</li>
     * </ul>
     *
     * @param a the phase-1 action; must not carry a subscription buffer
     * @return {@code a} converted to phase 3, or {@code null} when the action is complete
     */
    private SubAction processReconfigurePhase1Action(final SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.sub == null;
        if (agentConfig == null) {
            return null;
        }
        Config previous = agentConfig.config;
        boolean collectorChanged = previous.collector != a.config.collector;
        boolean filterChanged = !previous.completeSubscriptionFilter.equals(a.config.completeSubscriptionFilter)
            || previous.subFilterMode != a.config.subFilterMode;
        boolean aggregationChanged = previous.aggregationPeriod != a.config.aggregationPeriod;
        if (!collectorChanged && !filterChanged) {
            if (aggregationChanged) {
                updateAgentAggregation(a.config);
            } else {
                // Nothing observable changed: just remember the new config object.
                agentConfig = new AgentConfig(a.config, agentConfig.agent);
            }
            return null;
        }
        // 1 == SUB_FILTER_AGENT: an agent-side filter cannot be changed on a live agent.
        if (!collectorChanged && previous.subFilterMode != 1 && a.config.subFilterMode != 1) {
            reconfigurePhase1Light(a);
            return null;
        }
        QDContract contract = shaper.getContract();
        final RecordBuffer reincarnationBuffer = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract));
        final RecordBuffer rejectionBuffer = shaper.isKeepRejected()
            ? RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract))
            : null;
        RecordSink sink;
        byte newMode = a.config.subFilterMode;
        if (newMode == 2 || newMode == 3) { // SUB_FILTER_PROCESS or SUB_FILTER_EXECUTOR
            // Split the examined subscription into accepted and rejected parts.
            sink = new AbstractRecordSink() {

                @Override
                public void append(RecordCursor cur) {
                    if (a.config.completeSubscriptionFilter.accept(AgentChannel.this.shaper.getContract(), cur.getRecord(), cur.getCipher(), cur.getSymbol())) {
                        reincarnationBuffer.append(cur);
                    } else if (rejectionBuffer != null) {
                        rejectionBuffer.append(cur);
                    }
                }
            };
        } else {
            assert rejectionBuffer == null;
            sink = reincarnationBuffer;
        }
        agentConfig.agent.examineSubscription(sink);
        if (shaper.isKeepRejected()) {
            if (rejectedAgent != null) {
                rejectedAgent.examineSubscription(sink);
            }
            if (!rejectionBuffer.isEmpty()) {
                getOrCreateRejectedAgent().setSubscription(rejectionBuffer);
            }
            rejectionBuffer.release();
        }
        a.action = (byte)3; // ACTION_RECONFIGURE_3
        a.sub = reincarnationBuffer;
        return a;
    }

    /**
     * Light reconfiguration that keeps the existing agent: installs the new
     * config, collects every currently subscribed item that the new filter no
     * longer accepts into capacity-limited buffers flagged with
     * {@code REMOVE_SYMBOL}, and queues those buffers at the head of the
     * sub-action queue as phase-2 actions (code 2).
     *
     * @param a the reconfigure action that carries the new config
     */
    private void reconfigurePhase1Light(final SubAction a) {
        updateAgentAggregation(a.config);
        final ArrayList<RecordBuffer> pendingChanges = new ArrayList<RecordBuffer>();
        agentConfig.agent.examineSubscription(new AbstractRecordSink() {
            RecordBuffer chunk;
            QDContract contract = AgentChannel.this.shaper.getContract();

            @Override
            public void append(RecordCursor cur) {
                boolean stillAccepted = a.config.completeSubscriptionFilter.accept(contract, cur.getRecord(), cur.getCipher(), cur.getSymbol());
                if (stillAccepted) {
                    return;
                }
                if (chunk == null) {
                    chunk = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract).withEventFlags());
                    chunk.setCapacityLimited(true);
                    pendingChanges.add(chunk);
                }
                chunk.add(cur).setEventFlags(EventFlag.REMOVE_SYMBOL.flag());
                if (!chunk.hasCapacity()) {
                    // Buffer is full; the next rejected record starts a new one.
                    chunk = null;
                }
            }
        });
        if (shaper.isKeepRejected()) {
            processReconfigurePhase1LightRejectedAgent(a, pendingChanges);
        }
        if (!pendingChanges.isEmpty()) {
            subActionQueue.addActionListToHeadAndConsumeBuffers(a.config, (byte)2, pendingChanges); // ACTION_RECONFIGURE_2
        }
    }

    /**
     * Keep-rejected part of the light reconfiguration.
     *
     * <p>First, every item held by the rejected agent that the new filter now
     * accepts is appended to {@code changeSubList} in new capacity-limited
     * buffers. Then, for each buffer in the list, a mirror buffer with inverted
     * {@code REMOVE_SYMBOL} flags is applied to the rejected agent, and the
     * original buffer is rewound so it can be read again.
     *
     * @param a the reconfigure action that carries the new config
     * @param changeSubList buffers of pending subscription changes; extended in place
     */
    private void processReconfigurePhase1LightRejectedAgent(final SubAction a, final List<RecordBuffer> changeSubList) {
        if (rejectedAgent != null) {
            rejectedAgent.examineSubscription(new AbstractRecordSink() {
                RecordBuffer chunk;
                QDContract contract = AgentChannel.this.shaper.getContract();

                @Override
                public void append(RecordCursor cur) {
                    boolean nowAccepted = a.config.completeSubscriptionFilter.accept(contract, cur.getRecord(), cur.getCipher(), cur.getSymbol());
                    if (!nowAccepted) {
                        return;
                    }
                    if (chunk == null) {
                        chunk = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract));
                        chunk.setCapacityLimited(true);
                        changeSubList.add(chunk);
                    }
                    chunk.append(cur);
                    if (!chunk.hasCapacity()) {
                        chunk = null;
                    }
                }
            });
        }
        QDContract contract = shaper.getContract();
        for (RecordBuffer change : changeSubList) {
            RecordBuffer mirror = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract).withEventFlags());
            for (RecordCursor cur = change.next(); cur != null; cur = change.next()) {
                RecordCursor mirrored = mirror.add(cur);
                // Invert the flag: a removal in the change buffer becomes an addition here, and vice versa.
                mirrored.setEventFlags(EventFlag.REMOVE_SYMBOL.in(cur.getEventFlags()) ? 0 : EventFlag.REMOVE_SYMBOL.flag());
            }
            change.rewind();
            if (!mirror.isEmpty()) {
                getOrCreateRejectedAgent().addSubscription(mirror);
            }
            mirror.release();
        }
    }

    /**
     * Phase 2: applies one part of the action's subscription buffer to the current agent.
     *
     * @param a the action carrying the buffer and the continuation value in {@code notify}
     * @return {@code a} when {@code addSubscriptionPart} returned a non-zero value
     *         (the action is processed again), or {@code null} after releasing the buffer
     */
    private SubAction processReconfigurePhase2Action(SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.sub != null;
        assert agentConfig != null;
        a.notify = agentConfig.agent.addSubscriptionPart(a.sub, a.notify);
        if (a.notify == 0) {
            a.sub.release();
            return null;
        }
        return a;
    }

    /**
     * Phase 3: closes the current agent part by part. Once {@code closePart}
     * returns zero, the action is switched to phase 4 (code 4).
     *
     * @param a the action being processed
     * @return always {@code a}, either for another close step or as a phase-4 action
     */
    private SubAction processReconfigurePhase3Action(SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.sub != null;
        assert agentConfig != null;
        a.notify = agentConfig.agent.closePart(a.notify);
        if (a.notify == 0) {
            a.action = (byte)4; // ACTION_RECONFIGURE_4
        }
        return a;
    }

    /**
     * Phase 4: on the first step ({@code notify == 0}) creates the new agent for
     * the action's config, then feeds the saved subscription into it part by part.
     *
     * @param a the action carrying the saved subscription
     * @return {@code a} while more parts remain, or {@code null} after releasing the buffer
     */
    private SubAction processReconfigurePhase4Action(SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.sub != null;
        assert agentConfig != null;
        boolean firstStep = a.notify == 0;
        if (firstStep) {
            initNewAgent(a.config);
        }
        if (firstStep && a.sub.isEmpty()) {
            // Nothing to re-subscribe.
            a.sub.release();
            return null;
        }
        a.notify = agentConfig.agent.addSubscriptionPart(a.sub, a.notify);
        if (a.notify == 0) {
            a.sub.release();
            return null;
        }
        return a;
    }

    /**
     * Creates the agent for the given config. Without a collector a void agent
     * is built; otherwise the adapter creates the agent, and the complete
     * subscription filter is passed to it only in mode 1 (agent-side filter).
     *
     * @param config the config to create the agent for
     * @return the new agent
     */
    private QDAgent createAgent(Config config) {
        assert underLockOrInSubActionThread();
        QDCollector collector = config.collector;
        if (collector == null) {
            return QDFactory.getDefaultFactory().createVoidAgentBuilder(shaper.getContract(), adapter.getScheme()).build();
        }
        QDFilter agentFilter = config.subFilterMode == 1 ? config.completeSubscriptionFilter : QDFilter.ANYTHING;
        return adapter.createAgent(collector, agentFilter, adapter.getStats().getFullKeyProperties());
    }

    /**
     * Returns the agent that holds rejected subscription, building a void agent
     * on first use.
     */
    private synchronized QDAgent getOrCreateRejectedAgent() {
        if (rejectedAgent == null) {
            rejectedAgent = QDFactory.getDefaultFactory().createVoidAgentBuilder(shaper.getContract(), adapter.getScheme()).build();
        }
        return rejectedAgent;
    }

    /**
     * Closes the channel once: closes the shaper, installs the closed sentinel
     * config and queues a close action. Further calls are no-ops.
     */
    final synchronized void close() {
        if (subActionConfig != CLOSED_CONFIG) {
            shaper.close();
            subActionConfig = CLOSED_CONFIG;
            subActionQueue.addCloseAction();
        }
    }

    /**
     * Listener callback for the agent and its snapshot provider.
     *
     * <p>For the agent itself, bit 0 of {@code dataAvailableState} is set with a
     * CAS loop; the adapter's listener is notified only when this call set the
     * bit and bit 1 (wait) was clear. For the snapshot provider, and only when
     * the config has an aggregation period, the snapshot flag is raised and the
     * listener notified unless the flag was already set.
     *
     * @param provider the provider that reported available records
     */
    @Override
    public void recordsAvailable(RecordProvider provider) {
        AgentConfig current = agentConfig;
        QDAgent agent = current.agent;
        if (provider == agent) {
            int seen;
            while (true) {
                seen = dataAvailableState;
                if ((seen & 1) != 0) {
                    return; // already marked as available
                }
                if (DATA_AVAILABLE_STATE_UPDATER.compareAndSet(this, seen, seen | 1)) {
                    break;
                }
            }
            if ((seen & 2) == 0) {
                adapter.notifyListener();
            }
            return;
        }
        if (current.config.hasAggregationPeriod() && provider == agent.getSnapshotProvider()) {
            if (!snapshotIsAvailable) {
                snapshotIsAvailable = true;
                adapter.notifyListener();
            }
        }
    }

    /**
     * @param currentTime the time to evaluate against
     * @return {@code true} when a snapshot is flagged as available or regular
     *         data may be retrieved at {@code currentTime}
     */
    boolean hasSnapshotOrDataForNow(long currentTime) {
        if (snapshotIsAvailable) {
            return true;
        }
        return hasDataForNow(currentTime);
    }

    /**
     * Tells whether regular data may be retrieved at {@code currentTime}.
     *
     * @param currentTime the time to compare with {@code nextDataTime}
     * @return {@code true} in state 1; in state 3 only once {@code nextDataTime}
     *         has been reached; {@code false} in states 0 and 2
     * @throws IllegalStateException if the state holds any other value
     */
    private boolean hasDataForNow(long currentTime) {
        switch (dataAvailableState) {
            case 1: // DATA_AVAILABLE
                return true;
            case 3: // DATA_WAIT_AVAILABLE
                return currentTime >= nextDataTime;
            case 0: // DATA_NOT_AVAILABLE
            case 2: // DATA_WAIT
                return false;
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * Retrieves regular data if it is due at {@code currentTime}, otherwise a
     * pending snapshot, into the visitor.
     *
     * <p>Before retrieving regular data the state is reset to 0 (no aggregation
     * period) or 2 (wait). Afterwards, including when retrieval throws, the state
     * is set back to "available" (1 or 3) if more data remains; otherwise the
     * next retrieval time is advanced by the aggregation period.
     *
     * @param visitor the visitor that receives the data
     * @param currentTime the current time used for aggregation scheduling
     * @return the value returned by the retrieval that was performed, or
     *         {@code false} when there was nothing to retrieve
     */
    boolean retrieveSnapshotOrData(MessageVisitor visitor, long currentTime) {
        AgentConfig current = agentConfig;
        Config config = current.config;
        QDAgent agent = current.agent;
        if (hasDataForNow(currentTime)) {
            if (dataAvailableState == 1) {
                nextDataTime = 0L;
            }
            long period = config.aggregationPeriod;
            dataAvailableState = period <= 0L ? 0 : 2;
            // Stays true if retrieval throws, so the data is still reported as available.
            boolean more = true;
            try {
                more = retrieveFromProvider(config, agent, visitor);
            } finally {
                if (more) {
                    dataAvailableState = period <= 0L ? 1 : 3;
                } else {
                    long next;
                    if (period <= 0L) {
                        next = 0L;
                    } else if (currentTime < nextDataTime + period / 2L) {
                        // Still close to the previous schedule: step by exactly one period.
                        next = nextDataTime + period;
                    } else {
                        next = currentTime + period;
                    }
                    nextDataTime = next;
                }
            }
            return more;
        }
        if (!snapshotIsAvailable) {
            return false;
        }
        snapshotIsAvailable = false;
        boolean moreSnapshot = true;
        try {
            moreSnapshot = retrieveFromProvider(config, agent.getSnapshotProvider(), visitor);
        } finally {
            if (moreSnapshot) {
                snapshotIsAvailable = true;
            }
        }
        return moreSnapshot;
    }

    /**
     * Computes when this channel should next be asked for data.
     *
     * <p>In the wait state (2) the pending {@code nextDataTime} is returned while
     * it is still in the future. Once it has passed, the state is moved back to
     * "not available" (0) with a compare-and-set; if that fails because the
     * state changed concurrently, the state is read again.
     *
     * @param currentTime the current time
     * @return {@code 0} when a snapshot or data is available right away,
     *         {@code Long.MAX_VALUE} when nothing is pending, or the stored
     *         {@code nextDataTime} in the waiting states
     * @throws IllegalStateException if the state holds an unknown value
     */
    long nextRetrieveTime(long currentTime) {
        if (snapshotIsAvailable) {
            return 0L;
        }
        while (true) {
            int oldState = dataAvailableState;
            switch (oldState) {
                case 0: // DATA_NOT_AVAILABLE
                    return Long.MAX_VALUE;
                case 1: // DATA_AVAILABLE
                    return 0L;
                case 2: { // DATA_WAIT
                    long nextTime = nextDataTime;
                    if (currentTime < nextTime) {
                        return nextTime;
                    }
                    // The wait period elapsed without new data: leave the wait state.
                    if (DATA_AVAILABLE_STATE_UPDATER.compareAndSet(this, oldState, 0)) {
                        return Long.MAX_VALUE;
                    }
                    break; // lost a race with recordsAvailable(); re-read the state
                }
                case 3: // DATA_WAIT_AVAILABLE
                    return nextDataTime;
                default:
                    throw new IllegalStateException();
            }
        }
    }

    /**
     * Sends data from the provider to the visitor, wrapping the provider in the
     * reusable filtering provider when filtering is required.
     *
     * <p>The complete subscription filter is applied to data only when the mode
     * is not 1 (agent-side filter) and the collector is a {@code QDStream} with
     * wildcards enabled. The shaper's data filter is applied whenever it is
     * non-null. The filtering provider is always cleared on exit.
     *
     * @param config the config in effect for this retrieval
     * @param provider the source of records
     * @param visitor the visitor that receives the data
     * @return the value returned by {@code adapter.visitData}
     */
    private boolean retrieveFromProvider(Config config, RecordProvider provider, MessageVisitor visitor) {
        try {
            boolean wildcardStream = config.subFilterMode != 1
                && config.collector instanceof QDStream
                && ((QDStream)config.collector).getEnableWildcards();
            QDFilter remainingSubscriptionFilter = wildcardStream ? config.completeSubscriptionFilter : null;
            RecordFilter dataFilter = shaper.getDataFilter();
            boolean subscriptionFilterNeeded = remainingSubscriptionFilter != null && remainingSubscriptionFilter != QDFilter.ANYTHING;
            if (subscriptionFilterNeeded || dataFilter != null) {
                if (filteringRecordProvider == null) {
                    filteringRecordProvider = new FilteringRecordProvider();
                }
                filteringRecordProvider.set(provider, remainingSubscriptionFilter, dataFilter);
                provider = filteringRecordProvider;
            }
            return adapter.visitData(visitor, provider, MessageType.forData(shaper.getContract()));
        } finally {
            // Drop references to the wrapped provider and filters between calls.
            if (filteringRecordProvider != null) {
                filteringRecordProvider.set(null, null, null);
            }
        }
    }

    /**
     * Accepts an incoming add/remove subscription message for this channel.
     *
     * <p>After a synchronized reconfiguration check the message is turned into a
     * sub-action whose code depends on the filter mode: in mode 3 (filter in
     * executor) the add/remove <em>filter</em> codes 5/6 are used so that
     * filtering happens later in the sub-action; otherwise the plain add/remove
     * codes 7/8 are used. In mode 2 the source is filtered right here before
     * queueing. Nothing is done when the channel is closed.
     *
     * @param message the subscription message type; must match this channel's contract
     * @param source the subscription records
     */
    synchronized void processSubscription(MessageType message, RecordSource source) {
        assert message.getContract() == shaper.getContract() && message.isSubscription();
        reconfigureIfNeededSync();
        Config config = subActionConfig;
        if (config == CLOSED_CONFIG) {
            return;
        }
        boolean adding = message.isSubscriptionAdd();
        byte action;
        if (config.subFilterMode == 3) { // SUB_FILTER_EXECUTOR
            action = adding ? (byte)5 : (byte)6; // ACTION_ADD_SUB_FILTER : ACTION_REMOVE_SUB_FILTER
        } else {
            action = adding ? (byte)7 : (byte)8; // ACTION_ADD_SUB : ACTION_REMOVE_SUB
        }
        if (config.subFilterMode == 2) { // SUB_FILTER_PROCESS
            filterSubscriptionAndAddActions(source, config, action);
            return;
        }
        subActionQueue.addActionAndCopySource(config, action, source);
    }

    /**
     * Filters the incoming subscription in the calling thread. Accepted records
     * are queued in capacity-limited buffers; rejected records, when the shaper
     * keeps rejected subscription, are added to or removed from the rejected agent.
     *
     * @param source the incoming subscription records
     * @param config the config whose complete subscription filter is applied
     * @param action 7 (add subscription) or 8 (remove subscription)
     */
    private void filterSubscriptionAndAddActions(RecordSource source, Config config, byte action) {
        assert action == 7 || action == 8;
        RecordBuffer accepted = null;
        RecordBuffer rejected = null;
        QDContract contract = shaper.getContract();
        for (RecordCursor cur = source.next(); cur != null; cur = source.next()) {
            boolean pass = config.completeSubscriptionFilter.accept(contract, cur.getRecord(), cur.getCipher(), cur.getSymbol());
            if (pass) {
                if (accepted == null) {
                    accepted = RecordBuffer.getInstance(source.getMode());
                    accepted.setCapacityLimited(true);
                }
                accepted.append(cur);
                if (!accepted.hasCapacity()) {
                    // Buffer is full: hand it to the queue and start a new one on demand.
                    subActionQueue.addActionAndConsumeBuffer(subActionConfig, action, accepted);
                    accepted = null;
                }
            } else if (shaper.isKeepRejected()) {
                if (rejected == null) {
                    rejected = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract).withEventFlags());
                }
                rejected.append(cur);
            }
        }
        if (accepted != null) {
            subActionQueue.addActionAndConsumeBuffer(subActionConfig, action, accepted);
        }
        if (rejected == null) {
            return;
        }
        if (action == 7) {
            getOrCreateRejectedAgent().addSubscription(rejected);
        } else if (action == 8) {
            if (rejectedAgent != null) {
                rejectedAgent.removeSubscription(rejected);
            }
        } else {
            assert false;
        }
        rejected.release();
    }

    /**
     * Picks the collector associated with a queued action.
     *
     * @param a the queued action
     * @return the action's own config collector for code 4; the collector of the
     *         agent config currently in effect (or {@code null} if there is none)
     *         for codes 0, 2, 3, 7 and 8; {@code null} for every other code
     */
    QDCollector getSubActionCollector(SubAction a) {
        byte code = a.action;
        if (code == 4) {
            return a.config.collector;
        }
        boolean usesCurrentAgent = code == 0 || code == 2 || code == 3 || code == 7 || code == 8;
        if (!usesCurrentAgent) {
            return null;
        }
        AgentConfig current = agentConfig;
        return current == null ? null : current.config.collector;
    }

    /**
     * Dispatches a queued action to its handler by action code.
     *
     * @param a the action to process
     * @return the action to process next (possibly {@code a} itself), or
     *         {@code null} when the action is finished
     * @throws AssertionError if the action code is unknown
     */
    SubAction processSubAction(SubAction a) {
        switch (a.action) {
            case 0:
                return processCloseAction(a);
            case 1:
                return processReconfigurePhase1Action(a);
            case 2:
                return processReconfigurePhase2Action(a);
            case 3:
                return processReconfigurePhase3Action(a);
            case 4:
                return processReconfigurePhase4Action(a);
            case 5:
            case 6:
                return processSubscriptionFilterAction(a);
            case 7:
            case 8:
                return processSubscriptionChangeAction(a);
            default:
                throw new AssertionError();
        }
    }

    /**
     * Handles the close action: marks the agent config as closed (keeping the
     * agent) and closes the agent part by part.
     *
     * @param a the close action
     * @return {@code a} while {@code closePart} reports more work, else {@code null}
     */
    private SubAction processCloseAction(SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.action == 0 && a.sub == null;
        assert subActionConfig == CLOSED_CONFIG;
        AgentConfig current = agentConfig;
        if (current == null) {
            return null;
        }
        QDAgent agent = current.agent;
        if (current.config != CLOSED_CONFIG) {
            agentConfig = new AgentConfig(CLOSED_CONFIG, agent);
        }
        a.notify = agent.closePart(a.notify);
        return a.notify == 0 ? null : a;
    }

    /**
     * Filters the subscription carried by an add/remove-filter action (codes 5/6).
     *
     * <p>Accepted records are copied into a new buffer and the action is turned
     * into the matching plain add/remove action (codes 7/8). Rejected records,
     * when the shaper keeps rejected subscription, are added to or removed from
     * the rejected agent. The rejected part is applied before the "nothing
     * accepted" early return, so that a batch rejected in full is still recorded
     * and its buffer is released.
     *
     * @param a the action carrying the unfiltered subscription
     * @return {@code a} converted to code 7 or 8 with the filtered buffer, or
     *         {@code null} when no record was accepted
     */
    private SubAction processSubscriptionFilterAction(SubAction a) {
        assert underLockOrInSubActionThread();
        assert (a.action == 5 || a.action == 6) && a.sub != null;
        RecordBuffer sub = RecordBuffer.getInstance(a.sub.getMode());
        RecordBuffer rejectedSub = null;
        QDContract contract = shaper.getContract();
        RecordCursor cur;
        while ((cur = a.sub.next()) != null) {
            if (a.config.completeSubscriptionFilter.accept(contract, cur.getRecord(), cur.getCipher(), cur.getSymbol())) {
                sub.append(cur);
                continue;
            }
            if (!shaper.isKeepRejected()) {
                continue;
            }
            if (rejectedSub == null) {
                rejectedSub = RecordBuffer.getInstance(RecordMode.addedSubscriptionFor(contract).withEventFlags());
            }
            rejectedSub.append(cur);
        }
        a.sub.release();
        if (rejectedSub != null) {
            switch (a.action) {
                case 5: { // ACTION_ADD_SUB_FILTER
                    getOrCreateRejectedAgent().addSubscription(rejectedSub);
                    break;
                }
                case 6: { // ACTION_REMOVE_SUB_FILTER
                    if (rejectedAgent != null) {
                        rejectedAgent.removeSubscription(rejectedSub);
                    }
                    break;
                }
                default: {
                    assert false;
                    break;
                }
            }
            rejectedSub.release();
        }
        if (sub.isEmpty()) {
            sub.release();
            return null;
        }
        a.action = (byte)(a.action == 5 ? 7 : 8); // continue as ACTION_ADD_SUB / ACTION_REMOVE_SUB
        a.sub = sub;
        return a;
    }

    /**
     * Applies one part of an add (code 7) or remove (code 8) subscription action
     * to the agent. Adding creates the agent on demand; removing with no agent
     * just releases the buffer.
     *
     * @param a the action carrying the subscription buffer
     * @return {@code a} while the agent reports more parts to process, or
     *         {@code null} after releasing the buffer
     */
    private SubAction processSubscriptionChangeAction(SubAction a) {
        assert underLockOrInSubActionThread();
        assert a.sub != null;
        if (a.action == 7) {
            initNewAgentIfNeeded(a.config);
            a.notify = agentConfig.agent.addSubscriptionPart(a.sub, a.notify);
        } else if (a.action == 8) {
            if (agentConfig == null) {
                a.sub.release();
                return null;
            }
            a.notify = agentConfig.agent.removeSubscriptionPart(a.sub, a.notify);
        } else {
            assert false;
        }
        if (a.notify == 0) {
            a.sub.release();
            return null;
        }
        return a;
    }

    /**
     * Creates the agent for {@code config} unless an agent config already exists.
     *
     * @param config the config to use for a newly created agent
     */
    private void initNewAgentIfNeeded(Config config) {
        assert underLockOrInSubActionThread();
        if (agentConfig == null) {
            initNewAgent(config);
        }
    }

    /**
     * Creates a new agent for {@code config}, publishes the new agent config and
     * then registers this channel as the agent's record listener. The snapshot
     * provider also gets this listener when the config has an aggregation period.
     *
     * @param config the config to create the agent for
     */
    private void initNewAgent(Config config) {
        assert underLockOrInSubActionThread();
        QDAgent created = createAgent(config);
        // Publish the config first: recordsAvailable() reads agentConfig.
        agentConfig = new AgentConfig(config, created);
        created.setRecordListener(this);
        if (!config.hasAggregationPeriod()) {
            return;
        }
        created.getSnapshotProvider().setRecordListener(this);
    }

    /**
     * Swaps in a new config for the existing agent and sets or clears this
     * channel as the snapshot provider's listener, depending on whether the new
     * config has an aggregation period.
     *
     * @param config the new config
     */
    private void updateAgentAggregation(Config config) {
        QDAgent agent = agentConfig.agent;
        agentConfig = new AgentConfig(config, agent);
        agent.getSnapshotProvider().setRecordListener(config.hasAggregationPeriod() ? this : null);
    }

    /**
     * Sink wrapper that forwards a record to {@link #recordSink} only when it
     * passes both the optional subscription filter and the optional data filter.
     */
    private static class FilteringRecordSink
    extends AbstractRecordSink {
        private SubscriptionFilter subscriptionFilter;
        private RecordFilter dataFilter;
        /** Target sink; assigned and cleared by the enclosing provider around each retrieve. */
        RecordSink recordSink;

        FilteringRecordSink() {
        }

        /** Installs the filters to apply; {@code null} disables the respective filter. */
        void set(SubscriptionFilter subscriptionFilter, RecordFilter dataFilter) {
            this.subscriptionFilter = subscriptionFilter;
            this.dataFilter = dataFilter;
        }

        @Override
        public boolean hasCapacity() {
            return recordSink.hasCapacity();
        }

        @Override
        public void append(RecordCursor cursor) {
            if (subscriptionFilter != null && !subscriptionFilter.acceptRecord(cursor.getRecord(), cursor.getCipher(), cursor.getSymbol())) {
                return;
            }
            if (dataFilter != null && !dataFilter.accept(cursor)) {
                return;
            }
            recordSink.append(cursor);
        }
    }

    /**
     * Provider wrapper that retrieves from a delegate provider through a
     * {@link FilteringRecordSink}.
     */
    private static class FilteringRecordProvider
    extends AbstractRecordProvider {
        private final FilteringRecordSink filteringSink = new FilteringRecordSink();
        private RecordProvider dataSource;

        FilteringRecordProvider() {
        }

        /** Sets the delegate provider and the filters; all arguments may be {@code null} to reset. */
        void set(RecordProvider dataSource, SubscriptionFilter subscriptionFilter, RecordFilter dataFilter) {
            this.dataSource = dataSource;
            filteringSink.set(subscriptionFilter, dataFilter);
        }

        @Override
        public RecordMode getMode() {
            return dataSource.getMode();
        }

        @Override
        public boolean retrieve(RecordSink sink) {
            filteringSink.recordSink = sink;
            try {
                return dataSource.retrieve(filteringSink);
            } finally {
                // Do not keep the caller's sink referenced after the call.
                filteringSink.recordSink = null;
            }
        }
    }

    /**
     * Immutable pair of a {@link Config} and the agent it is in effect for.
     * Instances are replaced as a whole through the volatile {@code agentConfig} field.
     */
    private static class AgentConfig {
        final Config config;
        final QDAgent agent;

        private AgentConfig(Config config, QDAgent agent) {
            this.config = config;
            this.agent = agent;
        }
    }

    /**
     * Immutable snapshot of the channel settings, built by {@code createNewConfig()}.
     */
    private static class Config {
        // Collector to create agents on; createAgent() builds a void agent when this is null.
        final QDCollector collector;
        // Filter as reported by the shaper; compared by identity in needToReconfigure().
        final QDFilter subscriptionFilter;
        // The adapter's peer filter combined (AND) with subscriptionFilter.
        final QDFilter completeSubscriptionFilter;
        // One of the SUB_* mode constants of the enclosing class.
        final byte subFilterMode;
        // Aggregation period; retrieveSnapshotOrData() treats values <= 0 as "no waiting".
        final long aggregationPeriod;

        Config(QDCollector collector, QDFilter subscriptionFilter, QDFilter completeSubscriptionFilter, byte subFilterMode, long aggregationPeriod) {
            this.collector = collector;
            this.subscriptionFilter = subscriptionFilter;
            this.completeSubscriptionFilter = completeSubscriptionFilter;
            this.subFilterMode = subFilterMode;
            this.aggregationPeriod = aggregationPeriod;
        }

        /** Returns {@code true} when the aggregation period is non-zero. */
        boolean hasAggregationPeriod() {
            return this.aggregationPeriod != 0L;
        }
    }

    private class SubActionQueue
    implements Runnable {
        // Last finished action, stored by run() when processing returned null.
        private SubAction pooledAction;
        // True from scheduleIfNeeded() until finish() finds the queue empty.
        private boolean scheduled;
        // True between poll() and finish().
        private boolean running;
        // Pending actions; parameterized explicitly instead of the raw ArrayDeque type.
        private ArrayDeque<SubAction> queue = new ArrayDeque<SubAction>(2);

        private SubActionQueue() {
        }

        /**
         * Processes one queued action. The action returned by
         * {@code processSubAction} (if any) is handed to {@link #finish(SubAction)}
         * to be processed next; a finished action is stored in {@code pooledAction}.
         *
         * <p>{@code finish} runs in a {@code finally} block so that the
         * {@code running}/{@code scheduled} bookkeeping is restored even when
         * processing throws; the exception still propagates to the caller.
         */
        @Override
        public void run() {
            SubAction next = null;
            try {
                SubAction current = poll();
                if (current == null) {
                    return;
                }
                next = AgentChannel.this.processSubAction(current);
                if (next == null) {
                    pooledAction = current;
                }
            } finally {
                finish(next);
            }
        }

        /**
         * Marks the queue as running and takes the head action.
         *
         * @return the head action, or {@code null} when the queue is empty
         */
        private synchronized SubAction poll() {
            assert scheduled && !running;
            running = true;
            SubAction head = queue.poll();
            return head;
        }

        /**
         * Ends one processing step: clears the running flag, puts the follow-up
         * action (if any, and if the queue is not closed) back at the head, and
         * either clears the scheduled flag for an empty queue or reschedules in
         * the executor when one is configured.
         *
         * @param next the action to process next, or {@code null}
         */
        private synchronized void finish(SubAction next) {
            assert scheduled && running;
            running = false;
            if (!isClosed() && next != null) {
                queue.addFirst(next);
            }
            if (queue.isEmpty()) {
                scheduled = false;
                return;
            }
            if (AgentChannel.this.hasSubscriptionExecutor()) {
                scheduleInExecutor();
            }
        }

        private void scheduleIfNeeded() {
            assert (Thread.holdsLock(this));
            if (this.scheduled) {
                return;
            }
            this.scheduled = true;
            if (AgentChannel.this.hasSubscriptionExecutor()) {
                this.scheduleInExecutor();
            } else {
                this.runInPlace();
            }
        }

        private void scheduleInExecutor() {
            assert (Thread.holdsLock(this));
            assert (!this.queue.isEmpty());
            assert (AgentChannel.this.hasSubscriptionExecutor());
            QDCollector collector = AgentChannel.this.getSubActionCollector(this.queue.getFirst());
            if (collector == null) {
                AgentChannel.this.shaper.getSubscriptionExecutor().execute(this);
            } else {
                collector.executeLockBoundTask(AgentChannel.this.shaper.getSubscriptionExecutor(), this);
            }
        }

        private void runInPlace() {
            assert (Thread.holdsLock(AgentChannel.this) && Thread.holdsLock(this) && !AgentChannel.this.hasSubscriptionExecutor());
            while (this.scheduled) {
                this.run();
            }
        }

        synchronized void addAction(Config config, byte action) {
            assert (action == 1 || action == 0);
            if (this.isClosed()) {
                return;
            }
            if (!this.queue.isEmpty()) {
                SubAction last = this.queue.getLast();
                if (last.action == action) {
                    last.config = config;
                    return;
                }
            }
            SubAction a = this.getSubActionInstance();
            a.config = config;
            a.action = action;
            a.sub = null;
            a.notify = 0;
            this.queue.add(a);
            this.scheduleIfNeeded();
        }

        synchronized void addActionAndConsumeBuffer(Config config, byte action, RecordBuffer sub) {
            if (this.isClosed()) {
                return;
            }
            if (sub.hasCapacity() && !this.queue.isEmpty()) {
                SubAction last = this.queue.getLast();
                if (last.config == config && last.action == action) {
                    while (last.sub.hasCapacity()) {
                        RecordCursor cur = sub.next();
                        if (cur == null) {
                            sub.release();
                            return;
                        }
                        last.sub.append(cur);
                    }
                }
            }
            SubAction a = this.getSubActionInstance();
            a.config = config;
            a.action = action;
            a.sub = sub;
            a.notify = 0;
            this.queue.add(a);
            this.scheduleIfNeeded();
        }

        synchronized void addActionAndCopySource(Config config, byte action, RecordSource source) {
            RecordCursor cur;
            if (this.isClosed()) {
                return;
            }
            RecordBuffer sub = null;
            if (!this.queue.isEmpty()) {
                SubAction last = this.queue.getLast();
                if (last.config == config && last.action == action && last.sub.hasCapacity()) {
                    sub = last.sub;
                }
            }
            while ((cur = source.next()) != null) {
                if (sub == null) {
                    sub = RecordBuffer.getInstance(source.getMode());
                    sub.setCapacityLimited(true);
                    SubAction a = this.getSubActionInstance();
                    a.config = config;
                    a.action = action;
                    a.sub = sub;
                    a.notify = 0;
                    this.queue.add(a);
                }
                sub.append(cur);
                if (sub.hasCapacity()) continue;
                sub = null;
            }
            this.scheduleIfNeeded();
        }

        synchronized void addActionListToHeadAndConsumeBuffers(Config config, byte action, List<RecordBuffer> subList) {
            assert (this.running);
            if (this.isClosed()) {
                return;
            }
            int i = subList.size();
            while (--i >= 0) {
                RecordBuffer sub = subList.get(i);
                SubAction a = this.getSubActionInstance();
                a.config = config;
                a.action = action;
                a.sub = sub;
                a.notify = 0;
                this.queue.addFirst(a);
            }
        }

        private SubAction getSubActionInstance() {
            SubAction a = this.pooledAction;
            if (a == null) {
                a = new SubAction();
            } else {
                this.pooledAction = null;
            }
            return a;
        }

        synchronized void addCloseAction() {
            this.queue.clear();
            this.addAction(CLOSED_CONFIG, (byte)0);
        }

        private boolean isClosed() {
            assert (Thread.holdsLock(this));
            return !this.queue.isEmpty() && this.queue.getFirst().action == 0;
        }
    }

    /**
     * Mutable holder for one queued subscription action. Instances are created and
     * filled in by {@code SubActionQueue}; the implicit no-arg constructor is sufficient.
     */
    private static class SubAction {
        /** Configuration the action was enqueued with. */
        Config config;

        /** Action code (one of the {@code ACTION_*} byte constants of the enclosing class). */
        byte action;

        /** Records attached to the action; {@code null} for actions enqueued without a buffer. */
        RecordBuffer sub;

        /** Reset to zero whenever the action is enqueued; NOTE(review): its meaning is defined by the processing code outside this class. */
        int notify;
    }
}

