/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.impl.matrix;

import com.devexperts.logging.Logging;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDAgent;
import com.devexperts.qd.QDCollector;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDErrorHandler;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.SubscriptionFilter;
import com.devexperts.qd.impl.AbstractCollector;
import com.devexperts.qd.impl.matrix.Agent;
import com.devexperts.qd.impl.matrix.AgentBuffer;
import com.devexperts.qd.impl.matrix.ClosingAgentsQueue;
import com.devexperts.qd.impl.matrix.CollectorDebug;
import com.devexperts.qd.impl.matrix.Distribution;
import com.devexperts.qd.impl.matrix.Distributor;
import com.devexperts.qd.impl.matrix.DistributorsList;
import com.devexperts.qd.impl.matrix.GlobalLock;
import com.devexperts.qd.impl.matrix.Hashing;
import com.devexperts.qd.impl.matrix.LockBoundTaskQueue;
import com.devexperts.qd.impl.matrix.Mapper;
import com.devexperts.qd.impl.matrix.Notification;
import com.devexperts.qd.impl.matrix.RecordCursorKeeper;
import com.devexperts.qd.impl.matrix.RecordsContainer;
import com.devexperts.qd.impl.matrix.SubMatrix;
import com.devexperts.qd.impl.matrix.SubSnapshot;
import com.devexperts.qd.impl.matrix.SubscriptionDumpVisitor;
import com.devexperts.qd.impl.matrix.TotalSubMatrix;
import com.devexperts.qd.impl.matrix.management.CollectorCounters;
import com.devexperts.qd.impl.matrix.management.CollectorManagement;
import com.devexperts.qd.impl.matrix.management.CollectorOperation;
import com.devexperts.qd.ng.EventFlag;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.util.ArrayUtil;
import com.devexperts.util.SystemProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.Executor;

public abstract class Collector
extends AbstractCollector
implements RecordsContainer {
    // Trace logging in this class is enabled exactly when assertions are enabled for it.
    public static final boolean TRACE_LOG = Collector.class.desiredAssertionStatus();
    protected final Logging log = Logging.getLogging(this.getClass());
    // Read from the "examineBatchSize" property; 10000 looks like the default and [1, Integer.MAX_VALUE]
    // like the accepted range -- TODO confirm the argument order of SystemProperties.getIntProperty.
    static final int EXAMINE_BATCH_SIZE = SystemProperties.getIntProperty(Collector.class, "examineBatchSize", 10000, 1, Integer.MAX_VALUE);
    // Initial length of the agents array allocated by the constructor.
    private static final int INITIAL_AGENTS_SIZE = 8;
    // Int offsets inside one subscription matrix entry. The methods of this class add these values
    // (as inlined literals) to an entry index: the key sits at the index itself, the record id at +1,
    // the number of the next agent in the chain at +2 and that agent's entry index at +3.
    static final int KEY = 0;
    static final int RID = 1;
    static final int NEXT_AGENT = 2;
    static final int NEXT_INDEX = 3;
    // Total matrix layout: the constructor passes 6 (with time) or 4 (without) as the matrix step,
    // and addSubInternal keeps the total subscription time as a long at offset 4.
    static final int TOTAL_AGENT_STEP = 4;
    static final int TIME_TOTAL = 4;
    static final int TIME_TOTAL_X = 5;
    static final int TOTAL_HISTORY_AGENT_STEP = 6;
    // Agent matrix layout: offset 4 holds the previous agent number (see PREV_AGENT_MASK / PREV_AGENT_SET).
    static final int PREV_AGENT = 4;
    static final int STREAM_AGENT_STEP = 5;
    // Offsets handed to fixQueue() by rehashAgent for the snapshot and the update queue respectively.
    static final int SNAPSHOT_QUEUE = 5;
    static final int UPDATE_QUEUE = 6;
    static final int TICKER_AGENT_STEP = 7;
    // Time-related agent entry fields written by addSubInternal / removeSubInternalExistingTime;
    // the values at 7, 9 and 11 are accessed as longs (the *_X names presumably mark the second int -- confirm).
    static final int HISTORY_SUB_FLAGS = 6;
    static final int TIME_SUB = 7;
    static final int TIME_SUB_X = 8;
    static final int TIME_KNOWN = 9;
    static final int TIME_KNOWN_X = 10;
    static final int LAST_RECORD = 11;
    static final int LAST_RECORD_X = 12;
    static final int HISTORY_AGENT_STEP = 13;
    // Object offsets / object steps of matrix entries; the agent attachment is stored at object offset 0.
    static final int HISTORY_BUFFER = 0;
    static final int TOTAL_OBJ_STEP = 0;
    static final int TOTAL_HISTORY_OBJ_STEP = 1;
    static final int ATTACHMENT = 0;
    static final int AGENT_OBJ_STEP = 0;
    static final int AGENT_ATTACHMENT_OBJ_STEP = 1;
    // Bit flags carried in the "notify" int of the subscription-change methods and in subNotifyAccumulator.
    // Bits 1..8 are dispatched by notifySubChange; 16 asks the caller to continue the batch; 32 marks the
    // cleanup phase of a set-subscription operation.
    static final int NOTIFY_SUB_TOTAL_ADDED = 1;
    static final int NOTIFY_SUB_TOTAL_REMOVED = 2;
    static final int NOTIFY_SUB_SNAPSHOT_AVAILABLE = 4;
    static final int NOTIFY_SUB_DATA_AVAILABLE = 8;
    static final int NOTIFY_SUB_HAS_MORE = 16;
    static final int NOTIFY_SUB_PHASE2 = 32;
    // The total agent is created with number 1; regular agents are searched for starting at index 2.
    static final int TOTAL_AGENT_INDEX = 1;
    static final int MIN_AGENT_INDEX = 2;
    static final int MIN_DISTRIBUTOR_INDEX = 1;
    // The previous-agent int keeps the agent number in its low 31 bits and a "set" mark in the sign bit.
    static final int PREV_AGENT_MASK = Integer.MAX_VALUE;
    static final int PREV_AGENT_SET = Integer.MIN_VALUE;
    static final int QUEUE_BIT = Integer.MIN_VALUE;
    final CollectorManagement management;
    final CollectorCounters counters;
    // Baseline used by getCountersSinceSnapshot(); stays null until snapshotCounters() is first called.
    CollectorCounters snapshotCounters;
    final RecordCursorKeeper keeper = new RecordCursorKeeper();
    final GlobalLock globalLock;
    final DataScheme scheme;
    final Mapper mapper;
    final boolean hasTime;
    // Records of the scheme indexed by record id.
    final DataRecord[] records;
    final QDStats stats;
    // Null when the collector was constructed without storage.
    final QDStats statsStorage;
    // Agent number 1; its matrix heads the per-(key, rid) chains of agent entries.
    final Agent total;
    // Agents indexed by their number; a slot is null when free.
    Agent[] agents;
    // Starting point for the next free-slot search in the agents array.
    private int lastAgentIndex = 2;
    final DistributorsList distributors = new DistributorsList();
    boolean storeEverything;
    QDFilter storeEverythingFilter = QDFilter.ANYTHING;
    volatile QDErrorHandler errorHandler;
    // Agents whose close has started but has not completed yet; drained by helpClose().
    private final ClosingAgentsQueue closingAgentsQueue = new ClosingAgentsQueue();
    // NOTIFY_SUB_* bits accumulated during the current subscription-change batch.
    int subNotifyAccumulator;
    // Remaining step budget of the current batch; initialised from management.getSubscriptionBucket().
    int subStepsRemaining;
    final LockBoundTaskQueue lockBoundTaskQueue = new LockBoundTaskQueue();

    /**
     * Initialises management, locking, the record table, statistics and the total agent,
     * then registers this collector with its management object.
     *
     * @param builder source of the scheme and statistics
     * @param hasTime whether subscription entries carry time (selects the larger total matrix layout)
     * @param has_storage whether a {@code STORAGE_DATA} statistics node is created
     */
    Collector(QDCollector.Builder<?> builder, boolean hasTime, boolean has_storage) {
        super(builder);
        management = CollectorManagement.getInstance(builder.getScheme(), getContract(), builder.getStats().getFullKeyProperties());
        counters = management.createCounters();
        globalLock = new GlobalLock(management, counters, keeper);
        scheme = builder.getScheme();
        mapper = new Mapper(this);
        this.hasTime = hasTime;
        records = new DataRecord[scheme.getRecordCount()];
        // Fill the table from the last record id down to 0.
        for (int rid = records.length - 1; rid >= 0; rid--) {
            records[rid] = scheme.getRecord(rid);
        }
        stats = builder.getStats();
        if (has_storage) {
            statsStorage = stats.create(QDStats.SType.STORAGE_DATA);
        } else {
            statsStorage = null;
        }
        mapper.incMaxCounter(scheme.getRecordCount());
        QDStats uniqueSubStats = stats.create(QDStats.SType.UNIQUE_SUB);
        total = new Agent(this, 1, agentBuilder(), uniqueSubStats);
        int totalStep = hasTime ? 6 : 4;
        int totalObjStep = hasTime ? 1 : 0;
        total.sub = new TotalSubMatrix(mapper, totalStep, totalObjStep, 2, 0, 0, 29, uniqueSubStats);
        agents = new Agent[8];
        agents[total.number] = total;
        // Prefer the handler published by the scheme; fall back to the default one.
        errorHandler = scheme.getService(QDErrorHandler.class);
        if (errorHandler == null) {
            errorHandler = QDErrorHandler.DEFAULT;
        }
        management.addCollector(this);
    }

    /**
     * Creates the concrete agent for a slot of the {@code agents} array.
     * {@code createAgentGLocked} calls it with the free index it found, the caller's builder
     * and a freshly created agent statistics node, and stores the result at that index.
     *
     * @param var1 agent number (index in the {@code agents} array)
     * @param var2 builder describing the agent
     * @param var3 statistics node created for the agent
     * @return the new agent
     */
    abstract Agent createAgentInternal(int var1, QDAgent.Builder var2, QDStats var3);

    /**
     * Creates a plain {@link AgentBuffer} bound to the given agent.
     *
     * @param agent agent that will own the buffer
     * @return a new buffer instance
     */
    AgentBuffer createAgentBuffer(Agent agent) {
        AgentBuffer buffer = new AgentBuffer(agent);
        return buffer;
    }

    /**
     * Not supported by this base class: always throws.
     *
     * @param agent agent whose buffer mode is requested
     * @return never returns normally here
     * @throws UnsupportedOperationException always
     */
    RecordMode getAgentBufferMode(Agent agent) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the management object this collector registered itself with in its constructor.
     *
     * @return the collector management instance
     */
    public CollectorManagement getManagement() {
        return management;
    }

    /**
     * Returns a snapshot of the live counters.
     *
     * @return result of {@code counters.snapshot()}
     */
    public CollectorCounters getCountersSinceStart() {
        CollectorCounters current = counters.snapshot();
        return current;
    }

    /**
     * Returns the counters relative to the baseline stored by {@link #snapshotCounters()}.
     * Runs under the monitor of the counters object, the same monitor that guards baseline updates.
     * NOTE(review): the baseline is {@code null} until {@code snapshotCounters()} has been called;
     * confirm that {@code CollectorCounters.since} accepts {@code null}.
     *
     * @return result of {@code counters.since(snapshotCounters)}
     */
    public CollectorCounters getCountersSinceSnapshot() {
        synchronized (counters) {
            CollectorCounters delta = counters.since(snapshotCounters);
            return delta;
        }
    }

    /**
     * Stores a snapshot of the live counters as the new baseline for
     * {@link #getCountersSinceSnapshot()}. Runs under the monitor of the counters object.
     */
    public void snapshotCounters() {
        synchronized (counters) {
            CollectorCounters baseline = counters.snapshot();
            this.snapshotCounters = baseline;
        }
    }

    /**
     * Returns the statistics node taken from the builder at construction time.
     *
     * @return this collector's statistics
     */
    @Override
    public QDStats getStats() {
        return stats;
    }

    /**
     * Reports the current subscription to the visitor: one collector header, then for every
     * total-matrix entry that heads an agent chain the record, the symbol and each agent in the chain.
     * The method acquires no lock itself; the chain walk re-validates every hop and stops on mismatch.
     *
     * @param visitor receiver of the dump events
     * @throws IOException declared for the visitor callbacks
     */
    public void dumpSubscription(SubscriptionDumpVisitor visitor) throws IOException {
        // Maps each agent to the sequential id under which it was first reported.
        HashMap<Agent, Integer> aid_map = new HashMap<Agent, Integer>();
        SubMatrix tsub = this.total.sub;
        visitor.visitCollector(System.identityHashCode(this), this.stats.getFullKeyProperties(), this.contract.toString(), this.hasTime);
        // Walk the total matrix entries from the end downwards; index 0 is never visited.
        int tindex = tsub.matrix.length;
        while ((tindex -= tsub.step) > 0) {
            Agent agent;
            // Offset 2: number of the first agent in the chain; non-positive means no chain to report.
            int nagent = tsub.getInt(tindex + 2);
            if (nagent <= 0) continue;
            // Offset 3: index of that agent's entry.
            int nindex = tsub.getInt(tindex + 3);
            int key = tsub.getVolatileInt(tindex);
            if (key == 0) continue;
            int cipher = this.getCipher(key);
            String symbol = null;
            // Keys without a cipher need a symbol from the mapping; skip the entry when none is present.
            if (cipher == 0 && (symbol = this.mapper.getMapping().getSymbolIfPresent(key)) == null) continue;
            int rid = tsub.getInt(tindex + 1);
            visitor.visitRecord(this.records[rid]);
            visitor.visitSymbol(cipher, symbol);
            Agent[] agents = this.agents;
            // Follow the chain while the agent number is in range and the slot is occupied.
            while (nagent > 0 && nagent < agents.length && (agent = agents[nagent]) != null) {
                SubMatrix nsub = agent.sub;
                // Stop when the agent's entry is out of range or no longer describes the same key/record.
                if (nindex >= nsub.matrix.length || key != nsub.getInt(nindex) || rid != nsub.getInt(nindex + 1)) break;
                Integer aid_ext = (Integer)aid_map.get(agent);
                if (aid_ext != null) {
                    visitor.visitAgentAgain(aid_ext);
                } else {
                    // First sighting: assign the next sequential id and report the agent's key properties.
                    aid_ext = aid_map.size();
                    aid_map.put(agent, aid_ext);
                    visitor.visitAgentNew(aid_ext, agent.getStats().getKeyProperties());
                }
                if (this.hasTime) {
                    // Offsets 7 and 8: the two ints of the agent's subscription time.
                    visitor.visitTime(nsub.getInt(nindex + 7), nsub.getInt(nindex + 8));
                }
                nagent = nsub.getInt(nindex + 2);
                nindex = nsub.getInt(nindex + 3);
            }
            visitor.visitEndOfChain();
        }
        visitor.visitEndOfCollector();
    }

    /**
     * Delegates to {@link #retrieveDataImpl}; any throwable is first reported to management
     * as a fatal error and then rethrown unchanged.
     *
     * @param agent agent whose data is retrieved
     * @param sink destination of the records
     * @param snapshotOnly passed through to the implementation
     * @return the implementation's result
     */
    boolean retrieveData(Agent agent, RecordSink sink, boolean snapshotOnly) {
        try {
            boolean result = retrieveDataImpl(agent, sink, snapshotOnly);
            return result;
        } catch (Throwable failure) {
            management.setFatalError(failure);
            throw failure;
        }
    }

    /**
     * Contract-specific retrieval invoked by {@code retrieveData}, which wraps it with fatal-error reporting.
     *
     * @param var1 agent whose data is retrieved
     * @param var2 sink that receives the records
     * @param var3 the {@code snapshotOnly} flag of {@code retrieveData}
     * @return value that {@code retrieveData} returns to its caller
     */
    abstract boolean retrieveDataImpl(Agent var1, RecordSink var2, boolean var3);

    /**
     * Opens a subscription-change batch: reloads the step budget from management and seeds the
     * notification accumulator with the caller's bits, with the HAS_MORE bit (0x10) cleared.
     *
     * @param notify notification bits carried over from the previous batch
     */
    private void startSubChangeBatch(int notify) {
        int carried = notify & ~0x10;
        subStepsRemaining = management.getSubscriptionBucket();
        subNotifyAccumulator = carried;
    }

    /**
     * Closes a subscription-change batch and returns the accumulated notification bits.
     * When the step budget is used up, the HAS_MORE bit (0x10) is added so that the caller
     * comes back for another batch instead of treating the operation as finished.
     *
     * @return accumulated {@code NOTIFY_SUB_*} bits
     */
    private int doneSubChangeBatch() {
        // Any non-positive budget counts as exhausted, matching helpClose() and the batch loops,
        // which only keep working while the budget is strictly positive.
        if (this.subStepsRemaining <= 0) {
            this.subNotifyAccumulator |= 0x10;
        }
        return this.subNotifyAccumulator;
    }

    /**
     * Finishing step of a subscription change. If the agent lost subscription during the change,
     * rehashes the agent and the total agent where needed, lets the stream-buffer hook refilter,
     * and clears the agent's {@code reducedSub} mark.
     *
     * @param agent agent whose change has completed
     */
    private void subscriptionChangeComplete(Agent agent) {
        if (!agent.reducedSub) {
            return;
        }
        rehashAgentIfNeeded(agent);
        rehashAgentIfNeeded(total);
        refilterStreamBuffersAfterSubscriptionChange(agent);
        agent.reducedSub = false;
    }

    /**
     * Dispatches the accumulated notification bits: bits 1 and 2 go to the distributors
     * (added / removed), bits 4 and 8 to the agent's snapshot and data listeners.
     *
     * @param notify accumulated {@code NOTIFY_SUB_*} bits
     * @param agent agent whose listeners are notified
     */
    private void notifySubChange(int notify, Agent agent) {
        boolean totalAdded = (notify & 1) != 0;
        boolean totalRemoved = (notify & 2) != 0;
        boolean snapshotAvailable = (notify & 4) != 0;
        boolean dataAvailable = (notify & 8) != 0;
        if (TRACE_LOG) {
            log.trace("notifySubChange"
                + (totalAdded ? " TOTAL_ADDED" : "")
                + (totalRemoved ? " TOTAL_REMOVED" : "")
                + (snapshotAvailable ? " SNAPSHOT_AVAILABLE" : "")
                + (dataAvailable ? " DATA_AVAILABLE" : "")
                + " for " + agent);
        }
        if (totalAdded) {
            distributors.notifyAdded();
        }
        if (totalRemoved) {
            distributors.notifyRemoved();
        }
        if (snapshotAvailable) {
            agent.notifySnapshotListener();
        }
        if (dataAvailable) {
            agent.notifyDataListener();
        }
    }

    /**
     * Runs one batch of an add-subscription operation; any throwable is reported to management
     * as a fatal error and rethrown unchanged.
     *
     * @param agent agent being subscribed
     * @param source subscription records to process
     * @param notify notification bits carried over from the previous batch
     * @return 0 when the operation finished, otherwise bits to pass to the next call
     */
    int addSubscriptionPart(Agent agent, RecordSource source, int notify) {
        try {
            int pending = addSubscriptionPartImpl(agent, source, notify);
            return pending;
        } catch (Throwable failure) {
            management.setFatalError(failure);
            throw failure;
        }
    }

    /**
     * One add-subscription batch under the global lock. Listeners are notified after the lock is
     * released, and only when the batch did not end with the HAS_MORE bit (0x10).
     *
     * @param agent agent being subscribed; nothing happens when it is already closed
     * @param source subscription records to process
     * @param notify notification bits carried over from the previous batch
     * @return 0 when finished, otherwise the accumulated bits including HAS_MORE
     */
    int addSubscriptionPartImpl(Agent agent, RecordSource source, int notify) {
        if (agent.isClosed()) {
            return 0;
        }
        int accumulated;
        globalLock.lock(CollectorOperation.ADD_SUBSCRIPTION);
        try {
            startSubChangeBatch(notify);
            addSubscriptionGLocked(agent, source);
            accumulated = doneSubChangeBatch();
        } finally {
            globalLock.unlock();
        }
        if ((accumulated & 0x10) == 0) {
            notifySubChange(accumulated, agent);
            return 0;
        }
        return accumulated;
    }

    /**
     * Add-subscription step executed while the global lock is held. Pending agent closes are
     * helped first; the work proceeds under the agent's local lock only if budget is left
     * and the agent is still open.
     *
     * @param agent agent being subscribed
     * @param source subscription records to process
     */
    private void addSubscriptionGLocked(Agent agent, RecordSource source) {
        if (helpClose()) {
            return;
        }
        if (agent.isClosed()) {
            return;
        }
        agent.localLock.lock(CollectorOperation.ADD_SUBSCRIPTION);
        try {
            addSubscriptionGLLocked(agent, source);
        } finally {
            agent.localLock.unlock();
        }
    }

    /**
     * Add-subscription body executed under both the global and the agent's local lock.
     * Records flagged with {@code REMOVE_SYMBOL} are removed; others are added when the agent's
     * filter allows them. Every record consumes one step; the method returns early once the
     * budget is used up.
     *
     * @param agent agent being subscribed
     * @param source subscription records to process
     */
    private void addSubscriptionGLLocked(Agent agent, RecordSource source) {
        agent.subModCount++;
        // Leftover setter cleanup has to finish before new records are processed.
        if (agent.performSetterCleanupSteps()) {
            return;
        }
        for (RecordCursor cur = source.next(); cur != null; cur = source.next()) {
            if (EventFlag.REMOVE_SYMBOL.in(cur.getEventFlags())) {
                removeSubInternal(agent, cur);
            } else if (isSubAllowed(agent, cur.getRecord(), cur.getCipher(), cur.getSymbol())) {
                addSubInternal(agent, cur, false);
            }
            if (--subStepsRemaining <= 0) {
                return;
            }
        }
        subscriptionChangeComplete(agent);
    }

    /**
     * Runs one batch of a remove-subscription operation; any throwable is reported to management
     * as a fatal error and rethrown unchanged.
     *
     * @param agent agent being unsubscribed
     * @param source subscription records to remove
     * @param notify notification bits carried over from the previous batch
     * @return 0 when the operation finished, otherwise bits to pass to the next call
     */
    int removeSubscriptionPart(Agent agent, RecordSource source, int notify) {
        try {
            int pending = removeSubscriptionPartImpl(agent, source, notify);
            return pending;
        } catch (Throwable failure) {
            management.setFatalError(failure);
            throw failure;
        }
    }

    /**
     * One remove-subscription batch under the global lock. Listeners are notified after the lock
     * is released, and only when the batch did not end with the HAS_MORE bit (0x10).
     *
     * @param agent agent being unsubscribed; nothing happens when it is already closed
     * @param source subscription records to remove
     * @param notify notification bits carried over from the previous batch
     * @return 0 when finished, otherwise the accumulated bits including HAS_MORE
     */
    int removeSubscriptionPartImpl(Agent agent, RecordSource source, int notify) {
        if (agent.isClosed()) {
            return 0;
        }
        int accumulated;
        globalLock.lock(CollectorOperation.REMOVE_SUBSCRIPTION);
        try {
            startSubChangeBatch(notify);
            removeSubscriptionGLocked(agent, source);
            accumulated = doneSubChangeBatch();
        } finally {
            globalLock.unlock();
        }
        if ((accumulated & 0x10) == 0) {
            notifySubChange(accumulated, agent);
            return 0;
        }
        return accumulated;
    }

    /**
     * Remove-subscription step executed while the global lock is held. Pending agent closes are
     * helped first; the work proceeds under the agent's local lock only if budget is left
     * and the agent is still open.
     *
     * @param agent agent being unsubscribed
     * @param source subscription records to remove
     */
    private void removeSubscriptionGLocked(Agent agent, RecordSource source) {
        if (helpClose()) {
            return;
        }
        if (agent.isClosed()) {
            return;
        }
        agent.localLock.lock(CollectorOperation.REMOVE_SUBSCRIPTION);
        try {
            removeSubscriptionGLLocked(agent, source);
        } finally {
            agent.localLock.unlock();
        }
    }

    /**
     * Remove-subscription body executed under both the global and the agent's local lock.
     * Every record consumes one step; the method returns early once the budget is used up.
     *
     * @param agent agent being unsubscribed
     * @param source subscription records to remove
     */
    private void removeSubscriptionGLLocked(Agent agent, RecordSource source) {
        agent.subModCount++;
        // Leftover setter cleanup has to finish before new records are processed.
        if (agent.performSetterCleanupSteps()) {
            return;
        }
        for (RecordCursor cur = source.next(); cur != null; cur = source.next()) {
            removeSubInternal(agent, cur);
            if (--subStepsRemaining <= 0) {
                return;
            }
        }
        subscriptionChangeComplete(agent);
    }

    /**
     * Runs one batch of a set-subscription operation; any throwable is reported to management
     * as a fatal error and rethrown unchanged.
     *
     * @param agent agent whose subscription is replaced
     * @param source the new subscription
     * @param notify notification bits carried over from the previous batch
     * @return 0 when the operation finished, otherwise bits to pass to the next call
     */
    int setSubscriptionPart(Agent agent, RecordSource source, int notify) {
        try {
            int pending = setSubscriptionPartImpl(agent, source, notify);
            return pending;
        } catch (Throwable failure) {
            management.setFatalError(failure);
            throw failure;
        }
    }

    /**
     * One set-subscription batch under the global lock. Listeners are notified after the lock is
     * released, and only when the batch did not end with the HAS_MORE bit (0x10).
     *
     * @param agent agent whose subscription is replaced; nothing happens when it is already closed
     * @param source the new subscription
     * @param notify notification bits carried over from the previous batch
     * @return 0 when finished, otherwise the accumulated bits including HAS_MORE
     */
    int setSubscriptionPartImpl(Agent agent, RecordSource source, int notify) {
        if (agent.isClosed()) {
            return 0;
        }
        int accumulated;
        globalLock.lock(CollectorOperation.SET_SUBSCRIPTION);
        try {
            startSubChangeBatch(notify);
            setSubscriptionGLocked(agent, source);
            accumulated = doneSubChangeBatch();
        } finally {
            globalLock.unlock();
        }
        if ((accumulated & 0x10) == 0) {
            notifySubChange(accumulated, agent);
            return 0;
        }
        return accumulated;
    }

    /**
     * Set-subscription step executed while the global lock is held. Pending agent closes are
     * helped first; the work proceeds under the agent's local lock only if budget is left
     * and the agent is still open.
     *
     * @param agent agent whose subscription is replaced
     * @param source the new subscription
     */
    private void setSubscriptionGLocked(Agent agent, RecordSource source) {
        if (helpClose()) {
            return;
        }
        if (agent.isClosed()) {
            return;
        }
        agent.localLock.lock(CollectorOperation.SET_SUBSCRIPTION);
        try {
            setSubscriptionGLLocked(agent, source);
        } finally {
            agent.localLock.unlock();
        }
    }

    /**
     * Set-subscription body executed under both the global and the agent's local lock.
     * While the PHASE2 bit (0x20) is not set, allowed records are added with the "set" mark;
     * each added record consumes one step (filtered-out records consume none). When the source
     * is drained, PHASE2 is set and the agent's setter cleanup is started. The change is
     * completed once cleanup reports no more work.
     *
     * @param agent agent whose subscription is replaced
     * @param source the new subscription
     */
    private void setSubscriptionGLLocked(Agent agent, RecordSource source) {
        agent.subModCount++;
        // Cleanup left from an earlier batch has to finish first.
        if (agent.performSetterCleanupSteps()) {
            return;
        }
        boolean settingPhase = (subNotifyAccumulator & 0x20) == 0;
        if (settingPhase) {
            for (RecordCursor cur = source.next(); cur != null; cur = source.next()) {
                if (isSubAllowed(agent, cur.getRecord(), cur.getCipher(), cur.getSymbol())) {
                    addSubInternal(agent, cur, true);
                    if (--subStepsRemaining <= 0) {
                        return;
                    }
                }
            }
            // Source drained: switch to the cleanup phase.
            subNotifyAccumulator |= 0x20;
            agent.startSetterCleanup();
            if (agent.performSetterCleanupSteps()) {
                return;
            }
        }
        subscriptionChangeComplete(agent);
    }

    /**
     * One close-agent batch under the global lock. Listeners are notified after the lock is
     * released, and only when the batch did not end with the HAS_MORE bit (0x10).
     *
     * @param agent agent being closed; nothing happens when its close has already completed
     * @param sink passed to {@code agent.startClose} when the close is started here
     * @param notify notification bits carried over from the previous batch
     * @return 0 when finished, otherwise the accumulated bits including HAS_MORE
     */
    int closeAgentPartImpl(Agent agent, RecordSink sink, int notify) {
        if (agent.isCloseCompleted()) {
            return 0;
        }
        int accumulated;
        globalLock.lock(CollectorOperation.CLOSE_AGENT);
        try {
            startSubChangeBatch(notify);
            closeAgentGLocked(agent, sink);
            accumulated = doneSubChangeBatch();
        } finally {
            globalLock.unlock();
        }
        if ((accumulated & 0x10) == 0) {
            notifySubChange(accumulated, agent);
            return 0;
        }
        return accumulated;
    }

    /**
     * Close step executed while the global lock is held: starts the close and queues the agent
     * if it is still open, then spends the remaining budget on queued closes.
     *
     * @param agent agent being closed
     * @param sink passed to {@code agent.startClose}
     */
    private void closeAgentGLocked(Agent agent, RecordSink sink) {
        boolean alreadyClosing = agent.isClosed();
        if (!alreadyClosing) {
            agent.startClose(sink);
            closingAgentsQueue.add(agent);
        }
        helpClose();
    }

    /**
     * Spends the current step budget on agents waiting in the closing queue.
     *
     * @return {@code false} when the queue is empty and budget remains for other work,
     *         {@code true} when the budget is used up
     */
    private boolean helpClose() {
        for (;;) {
            if (subStepsRemaining <= 0) {
                return true;
            }
            Agent closing = closingAgentsQueue.peek();
            if (closing == null) {
                return false;
            }
            closing.localLock.lock(CollectorOperation.CLOSE_AGENT);
            try {
                closeAgentGLLocked(closing);
            } finally {
                closing.localLock.unlock();
            }
        }
    }

    /**
     * Performs close steps for the agent under the global and local locks and frees its slot
     * in the {@code agents} array once no close steps remain.
     * NOTE(review): nothing in this method removes the agent from the closing queue that
     * {@code helpClose()} peeks at; presumably {@code performCloseSteps()} or the queue itself
     * handles that -- confirm.
     *
     * @param agent agent being closed
     */
    private void closeAgentGLLocked(Agent agent) {
        agent.subModCount++;
        boolean moreSteps = agent.performCloseSteps();
        if (!moreSteps) {
            agents[agent.number] = null;
        }
    }

    /**
     * Returns the data scheme taken from the builder at construction time.
     *
     * @return this collector's scheme
     */
    @Override
    public DataScheme getScheme() {
        return scheme;
    }

    /**
     * Returns the current value of the store-everything flag.
     *
     * @return {@code true} when store-everything mode is on
     */
    @Override
    public boolean isStoreEverything() {
        return storeEverything;
    }

    /**
     * Sets the store-everything flag consulted by the {@code shouldStoreEverything} methods.
     *
     * @param storeEverything new value of the flag
     */
    @Override
    public void setStoreEverything(boolean storeEverything) {
        this.storeEverything = storeEverything;
    }

    /**
     * Replaces the filter applied in store-everything mode with the {@link QDFilter}
     * obtained from the given subscription filter for this collector's scheme.
     *
     * @param filter subscription filter to convert
     */
    @Override
    public void setStoreEverythingFilter(SubscriptionFilter filter) {
        QDFilter converted = QDFilter.fromFilter(filter, scheme);
        storeEverythingFilter = converted;
    }

    /**
     * Tells whether the given record/symbol is covered by store-everything mode.
     *
     * @param record data record
     * @param cipher symbol cipher
     * @param symbol symbol string
     * @return {@code true} when the flag is on and the store-everything filter accepts the arguments
     */
    boolean shouldStoreEverything(DataRecord record, int cipher, String symbol) {
        if (!storeEverything) {
            return false;
        }
        return storeEverythingFilter.accept(contract, record, cipher, symbol);
    }

    /**
     * Key-based variant of the store-everything check. When the filter is
     * {@code QDFilter.ANYTHING} the key is not decoded at all.
     *
     * @param key symbol key
     * @param rid record id (index into {@code records})
     * @return {@code true} when the flag is on and the filter is ANYTHING or accepts the decoded arguments
     */
    boolean shouldStoreEverything(int key, int rid) {
        if (!storeEverything) {
            return false;
        }
        if (storeEverythingFilter == QDFilter.ANYTHING) {
            return true;
        }
        return storeEverythingFilter.accept(contract, records[rid], getCipher(key), getSymbol(key));
    }

    /**
     * Creates a new agent while holding the global lock.
     *
     * @param builder description of the agent to create
     * @return the newly created agent
     */
    @Override
    public QDAgent buildAgent(QDAgent.Builder builder) {
        globalLock.lock(CollectorOperation.CREATE_AGENT);
        try {
            return createAgentGLocked(builder);
        } finally {
            globalLock.unlock();
        }
    }

    /**
     * Agent creation under the global lock: picks a free slot (searching from the last used
     * index, never below 2), grows the agents array when the slot lies beyond its end, raises the
     * mapper's max counter by the scheme's record count, and stores the agent built by
     * {@link #createAgentInternal} in that slot.
     *
     * @param builder description of the agent to create
     * @return the newly created agent
     */
    private QDAgent createAgentGLocked(QDAgent.Builder builder) {
        int slot = ArrayUtil.findFreeIndex(agents, lastAgentIndex, 2);
        lastAgentIndex = slot;
        if (slot >= agents.length) {
            agents = ArrayUtil.grow(agents, 0);
        }
        mapper.incMaxCounter(scheme.getRecordCount());
        QDStats agentStats = stats.create(QDStats.SType.AGENT, builder.getKeyProperties(), builder.getKeyProperties() != null);
        Agent created = createAgentInternal(slot, builder, agentStats);
        agents[slot] = created;
        return created;
    }

    /**
     * Creates a distributor through this collector's distributors list.
     *
     * @param builder description of the distributor to create
     * @return the newly created distributor
     */
    @Override
    public QDDistributor buildDistributor(QDDistributor.Builder builder) {
        QDDistributor created = distributors.createDistributor(this, builder);
        return created;
    }

    /**
     * Looks up the symbol for a character range in the mapper's current mapping.
     *
     * @param chars character array holding the symbol
     * @param offset start of the symbol inside {@code chars}
     * @param length number of characters
     * @return result of {@code getSymbolIfPresent} on the current mapping
     */
    @Override
    public String getSymbol(char[] chars, int offset, int length) {
        String present = mapper.getMapping().getSymbolIfPresent(chars, offset, length);
        return present;
    }

    /**
     * Adds the task, together with its executor, to this collector's lock-bound task queue.
     *
     * @param executor executor associated with the task
     * @param task task to enqueue
     */
    @Override
    public void executeLockBoundTask(Executor executor, Runnable task) {
        lockBoundTaskQueue.add(executor, task);
    }

    /**
     * Unregisters this collector from its management object and then closes its statistics.
     */
    @Override
    public void close() {
        management.removeCollector(this);
        stats.close();
    }

    /**
     * Supplies notification bits for the given agent; implemented by concrete collectors.
     * NOTE(review): presumably a combination of the {@code NOTIFY_SUB_*} flags -- confirm against the subclasses.
     *
     * @param var1 agent to inspect
     * @return notification bits for the agent
     */
    abstract int getNotificationBits(Agent var1);

    /**
     * Extracts the cipher from a key: a key with either of its two highest bits set is itself
     * the cipher, any other key has no cipher.
     *
     * @param key symbol key
     * @return {@code key} when one of its top two bits is set, otherwise 0
     */
    final int getCipher(int key) {
        return (key & 0xC0000000) != 0 ? key : 0;
    }

    /**
     * Returns the mapped symbol for a key that carries no cipher.
     *
     * @param key symbol key
     * @return the mapper's symbol when neither of the key's top two bits is set, otherwise {@code null}
     */
    final String getSymbol(int key) {
        boolean isCipher = (key & 0xC0000000) != 0;
        return isCipher ? null : mapper.getSymbol(key);
    }

    /**
     * Returns the symbol string for any key: cipher keys are decoded with the scheme's codec,
     * other keys are resolved through the mapper.
     *
     * @param key symbol key
     * @return symbol for the key
     */
    final String getDecodedSymbol(int key) {
        if ((key & 0xC0000000) != 0) {
            return scheme.getCodec().decode(key);
        }
        return mapper.getSymbol(key);
    }

    /**
     * Returns the record stored at the given id in this collector's record table.
     *
     * @param rid record id (index into {@code records})
     * @return the record at that index
     */
    @Override
    public final DataRecord getRecord(int rid) {
        DataRecord record = records[rid];
        return record;
    }

    /**
     * Resolves the id of a record that belongs to this collector's record table.
     * The record is accepted only when the table slot at its id holds this very instance.
     *
     * @param record record to resolve
     * @return the record's id
     * @throws IllegalArgumentException if the id lies outside the record table or the slot
     *         holds a different record instance
     */
    final int getRid(DataRecord record) {
        int rid = record.getId();
        // Range-check before indexing: a record from another scheme may carry an id beyond this
        // table, which must be reported as "Unknown record" rather than as an array index failure.
        if (rid >= 0 && rid < this.records.length && this.records[rid] == record) {
            return rid;
        }
        throw new IllegalArgumentException("Unknown record");
    }

    /**
     * Converts a cipher/symbol pair to a key without adding new symbols: a cipher with one of
     * its top two bits set is the key itself, otherwise the key is looked up in the current mapping.
     *
     * @param cipher symbol cipher, or 0 when the symbol string is used
     * @param symbol symbol string
     * @return the key
     * @throws IllegalArgumentException if the cipher is non-zero but has neither of its top two bits set
     */
    final int getKey(int cipher, String symbol) {
        if ((cipher & 0xC0000000) != 0) {
            return cipher;
        }
        if (cipher != 0) {
            throw new IllegalArgumentException("Reserved cipher");
        }
        return mapper.getMapping().getKey(symbol);
    }

    /**
     * Converts a cipher/symbol pair to a key, obtaining the key from {@code mapper.addKey}
     * when the cipher is 0. A cipher with one of its top two bits set is the key itself.
     *
     * @param cipher symbol cipher, or 0 when the symbol string is used
     * @param symbol symbol string
     * @return the key
     * @throws IllegalArgumentException if the cipher is non-zero but has neither of its top two bits set
     */
    final int addKey(int cipher, String symbol) {
        if ((cipher & 0xC0000000) != 0) {
            return cipher;
        }
        if (cipher != 0) {
            throw new IllegalArgumentException("Reserved cipher");
        }
        return mapper.addKey(symbol);
    }

    /**
     * Rehashes the agent's subscription matrix when {@code Hashing.needRehash} says so for its
     * current shift, overall size and payload size.
     *
     * @param agent agent to check
     */
    final void rehashAgentIfNeeded(Agent agent) {
        SubMatrix sub = agent.sub;
        boolean needed = Hashing.needRehash(sub.shift, sub.overallSize, sub.payloadSize, 29);
        if (needed) {
            rehashAgent(agent);
        }
    }

    /**
     * Replaces the agent's subscription matrix with a rehashed copy and repairs everything that
     * refers to entry indices of the old matrix: the "next index" stored in each previous agent's
     * entry and the agent's snapshot/update queues.
     *
     * @param agent agent to rehash; the total agent takes a shorter path
     * @throws IllegalStateException if a previous agent has no entry for a chained key/record
     */
    final void rehashAgent(Agent agent) {
        if (agent == this.total) {
            // Total agent: give subclasses a chance to prepare, then simply swap the matrix.
            this.prepareTotalSubForRehash();
            this.total.sub = this.total.sub.rehash(29);
            return;
        }
        // Remember the queue heads before the matrix (and thus every entry index) changes.
        int oldSnapshotHead = agent.snapshotQueue == null ? 0 : agent.snapshotQueue.getHead();
        int oldUpdateHead = agent.updateQueue == null ? 0 : agent.updateQueue.getHead();
        SubMatrix osub = agent.sub;
        SubMatrix asub = agent.sub = osub.rehash(29);
        int aindex = asub.matrix.length;
        while ((aindex -= asub.step) >= 0) {
            // Offset 4: previous agent number in the low 31 bits; 0 means the entry is not chained.
            int pagent = asub.getInt(aindex + 4) & Integer.MAX_VALUE;
            if (pagent == 0) continue;
            SubMatrix psub = this.agents[pagent].sub;
            int pindex = psub.getIndex(asub.getInt(aindex), asub.getInt(aindex + 1), 0);
            if (pindex == 0) {
                throw new IllegalStateException("Previous agent misses entry");
            }
            // Point the previous agent's "next index" (offset 3) at this entry's new position.
            psub.setInt(pindex + 3, aindex);
        }
        // Let the queues translate their links from the old matrix (offsets 5 and 6 respectively).
        if (agent.snapshotQueue != null) {
            agent.snapshotQueue.fixQueue(agent, oldSnapshotHead, osub, 5);
        }
        if (agent.updateQueue != null) {
            agent.updateQueue.fixQueue(agent, oldUpdateHead, osub, 6);
        }
    }

    /**
     * Asks the agent's (updated) filter whether the given subscription item is accepted
     * for this collector's contract.
     *
     * @param agent agent whose filter is consulted
     * @param record data record
     * @param cipher symbol cipher
     * @param symbol symbol string
     * @return {@code true} when the filter accepts the item
     */
    protected boolean isSubAllowed(Agent agent, DataRecord record, int cipher, String symbol) {
        QDFilter current = agent.filter.getUpdatedFilter();
        return current.accept(contract, record, cipher, symbol);
    }

    /**
     * Returns the subscription time to store for the cursor; this base version returns the
     * cursor's time unchanged.
     *
     * @param cur subscription record cursor
     * @return {@code cur.getTime()}
     */
    protected long trimSubTime(RecordCursor cur) {
        long time = cur.getTime();
        return time;
    }

    /**
     * Adds (or updates) one subscription item of a regular agent and keeps the total matrix in
     * sync. A new item is linked at the head of the per-(key, record) agent chain that starts in
     * the total matrix; an existing item only has its time, flags and attachment updated.
     *
     * @param agent regular agent (must not be the total agent)
     * @param cur subscription record to add
     * @param setSub {@code true} when called from a set-subscription operation; the entry is then
     *        marked with the PREV_AGENT_SET bit and unchanged entries return early
     */
    void addSubInternal(Agent agent, RecordCursor cur, boolean setSub) {
        assert (agent != this.total);
        this.rehashAgentIfNeeded(agent);
        this.rehashAgentIfNeeded(this.total);
        int rid = this.getRid(cur.getRecord());
        int key = this.addKey(cur.getCipher(), cur.getSymbol());
        long time = this.trimSubTime(cur);
        SubMatrix asub = agent.sub;
        int aindex = asub.addIndexBegin(key, rid);
        // The entry is new for this agent when its previous-agent int (offset 4) is entirely zero.
        boolean newSub = asub.getInt(aindex + 4) == 0;
        boolean wasPayload = asub.isPayload(aindex);
        SubMatrix tsub = this.total.sub;
        int tindex = tsub.addIndexBegin(key, rid);
        boolean sameSub = false;
        boolean totalRecordAdded = false;
        boolean reduceTimeTotal = false;
        long timeTotal = 0L;
        if (agent.hasAttachmentStrategy()) {
            // New or "set" entries take the cursor's attachment; otherwise the agent merges it with the stored one.
            asub.setObj(aindex, 0, newSub || setSub ? cur.getAttachment() : agent.updateAttachment(asub.getObj(aindex, 0), cur, false));
        }
        if (newSub) {
            if (this.hasTime) {
                // Initialise flags (6), subscription time (7), known time (9) and last record (11).
                asub.setInt(aindex + 6, this.getHistoryTimeSubFlags(cur, time));
                asub.setLong(aindex + 7, time);
                asub.setLong(aindex + 9, Long.MAX_VALUE);
                asub.setLong(aindex + 11, 0L);
            }
            // Insert this entry right after the total entry, in front of the former chain head.
            int nagent = tsub.getInt(tindex + 2);
            int nindex = tsub.getInt(tindex + 3);
            asub.setInt(aindex + 2, nagent > 0 ? nagent : 0);
            asub.setInt(aindex + 3, nindex);
            asub.setInt(aindex + 4, this.total.number);
            tsub.setInt(tindex + 2, agent.number);
            tsub.setInt(tindex + 3, aindex);
            if (nagent > 0) {
                // Re-point the former head's previous agent at us, keeping its "set" bit.
                SubMatrix nsub = this.agents[nagent].sub;
                int nset = nsub.getInt(nindex + 4) & Integer.MIN_VALUE;
                nsub.setInt(nindex + 4, agent.number | nset);
                if (this.hasTime && time < tsub.getLong(tindex + 4)) {
                    // This agent asks for an earlier time than the current total time.
                    tsub.setLong(tindex + 4, time);
                    totalRecordAdded = true;
                }
            } else {
                // No agent was subscribed to this key/record before.
                if (this.hasTime) {
                    tsub.setLong(tindex + 4, time);
                }
                if (nagent == 0) {
                    tsub.updateAddedPayload(rid);
                }
                totalRecordAdded = true;
            }
            if (!wasPayload) {
                asub.updateAddedPayload(rid);
            }
        } else if (this.hasTime) {
            long timePrev = asub.getLong(aindex + 7);
            boolean timeSubFlagsChanged = this.updateHistoryTimeSubFlags(cur, time, asub, aindex);
            // "bl" is an unused temporary left by the decompiler.
            boolean bl = sameSub = time == timePrev && !timeSubFlagsChanged;
            if (!sameSub) {
                asub.setLong(aindex + 7, time);
                timeTotal = tsub.getLong(tindex + 4);
                if (time < timeTotal) {
                    tsub.setLong(tindex + 4, time);
                    totalRecordAdded = true;
                } else if (time > timePrev && timePrev == timeTotal) {
                    // This agent held the total time and now moves later: the total must be recomputed.
                    reduceTimeTotal = true;
                }
            }
        } else {
            sameSub = true;
        }
        if (TRACE_LOG) {
            this.log.trace("addSubInternal " + cur.getRecord().getName() + ":" + cur.getDecodedSymbol() + "@" + time + " setSub=" + setSub + " newSub=" + newSub + " wasPayload=" + wasPayload + " totalRecordAdded=" + totalRecordAdded + " reduceTimeTotal=" + reduceTimeTotal + " sameSub=" + sameSub + " for " + agent);
        }
        if (setSub) {
            // Mark the entry as touched by the current set operation (sign bit of the previous-agent int).
            int pagent = asub.getInt(aindex + 4);
            asub.setInt(aindex + 4, pagent | Integer.MIN_VALUE);
            if (sameSub) {
                return;
            }
        }
        asub.addIndexComplete(aindex, key, rid);
        tsub.addIndexComplete(tindex, key, rid);
        assert (!totalRecordAdded || !reduceTimeTotal);
        if (totalRecordAdded) {
            this.totalRecordAdded(key, rid, tsub, tindex, time);
        }
        if (reduceTimeTotal) {
            this.reduceTimeTotal(key, rid, tsub, tindex, timeTotal);
        }
        this.enqueueAddedRecord(agent, asub, aindex);
    }

    /**
     * Stores freshly computed time-subscription flags in the agent entry (offset 6) and reports
     * whether they differ from the sign bit of the previously stored value.
     *
     * @param cur subscription record cursor
     * @param time subscription time being stored
     * @param asub agent subscription matrix
     * @param aindex entry index in {@code asub}
     * @return {@code true} when the flags changed
     */
    private boolean updateHistoryTimeSubFlags(RecordCursor cur, long time, SubMatrix asub, int aindex) {
        int flagsOffset = aindex + 6;
        int before = asub.getInt(flagsOffset) & Integer.MIN_VALUE;
        int after = getHistoryTimeSubFlags(cur, time);
        asub.setInt(flagsOffset, after);
        return before != after;
    }

    /**
     * Computes the time-subscription flags: 0 when the stored time equals the cursor's own
     * time, otherwise the sign bit.
     *
     * @param cur subscription record cursor
     * @param time subscription time being stored
     * @return 0 or {@code Integer.MIN_VALUE}
     */
    private int getHistoryTimeSubFlags(RecordCursor cur, long time) {
        if (time == cur.getTime()) {
            return 0;
        }
        return Integer.MIN_VALUE;
    }

    /**
     * Hook called by {@code rehashAgent} right before the total matrix is rehashed.
     * Does nothing in this base class.
     */
    void prepareTotalSubForRehash() {
    }

    /**
     * Called when the total subscription gained an item (or an earlier time). Forwards the
     * item to the distributors and records the TOTAL_ADDED bit (1) when they report a change.
     *
     * @param key symbol key
     * @param rid record id
     * @param tsub total subscription matrix (unused here)
     * @param tindex entry index in {@code tsub} (unused here)
     * @param time subscription time
     */
    void totalRecordAdded(int key, int rid, SubMatrix tsub, int tindex, long time) {
        boolean changed = distributors.addSub(key, rid, time);
        if (changed) {
            subNotifyAccumulator |= 1;
        }
    }

    /**
     * Called when the last agent left a total subscription item. Forwards the removal to the
     * distributors and records the TOTAL_REMOVED bit (2) when they report a change.
     *
     * @param key symbol key
     * @param rid record id
     * @param tsub total subscription matrix (unused here)
     * @param tindex entry index in {@code tsub} (unused here)
     * @return always {@code true} in this base class; the caller then updates the removed payload
     */
    boolean totalRecordRemoved(int key, int rid, SubMatrix tsub, int tindex) {
        boolean changed = distributors.removeSub(key, rid);
        if (changed) {
            subNotifyAccumulator |= 2;
        }
        return true;
    }

    /**
     * Hook called at the end of {@code addSubInternal} for the affected agent entry.
     * Does nothing in this base class.
     *
     * @param agent agent that owns the entry
     * @param asub agent subscription matrix
     * @param aindex entry index in {@code asub}
     */
    void enqueueAddedRecord(Agent agent, SubMatrix asub, int aindex) {
    }

    /**
     * Hook called at the end of {@code removeSubInternalExisting} for the removed agent entry.
     * Does nothing in this base class.
     *
     * @param agent agent that owned the entry
     * @param asub agent subscription matrix
     * @param aindex entry index in {@code asub}
     */
    void dequeueRemovedRecord(Agent agent, SubMatrix asub, int aindex) {
    }

    /**
     * Overridable decision hook; this base version always answers {@code true}.
     * NOTE(review): judging by the name it is consulted when stream buffers are refiltered --
     * the caller is not in this part of the file, confirm.
     *
     * @param agent agent that owns the buffer
     * @param cur buffered record under consideration
     * @return {@code true} in this base class
     */
    boolean keepInStreamBufferOnRefilter(Agent agent, RecordCursor cur) {
        return true;
    }

    /**
     * Hook called by {@code subscriptionChangeComplete} when the agent's subscription was reduced.
     * Does nothing in this base class.
     *
     * @param agent agent whose subscription change has completed
     */
    void refilterStreamBuffersAfterSubscriptionChange(Agent agent) {
    }

    /**
     * Removes one subscription item of the agent, if the agent is subscribed to it.
     * With an attachment strategy the stored attachment is updated first, and the item stays
     * subscribed while the resulting attachment is non-null.
     *
     * @param agent agent to unsubscribe
     * @param cur subscription record to remove
     */
    void removeSubInternal(Agent agent, RecordCursor cur) {
        SubMatrix asub = agent.sub;
        int key = getKey(cur.getCipher(), cur.getSymbol());
        int rid = getRid(cur.getRecord());
        int aindex = asub.getIndex(key, rid, 0);
        // Low 31 bits of offset 4 hold the previous agent number; 0 means "not subscribed".
        int pagent = asub.getInt(aindex + 4) & Integer.MAX_VALUE;
        if (pagent == 0) {
            return;
        }
        if (agent.hasAttachmentStrategy()) {
            Object remaining = agent.updateAttachment(asub.getObj(aindex, 0), cur, true);
            asub.setObj(aindex, 0, remaining);
            if (remaining != null) {
                return;
            }
        }
        removeSubInternalExisting(agent, asub, aindex, pagent, key, rid);
    }

    /**
     * Removes an existing subscription entry identified by its index; the key and record id
     * are read from the entry itself (offsets 0 and 1).
     *
     * @param agent agent that owns the entry
     * @param aindex entry index in the agent's matrix
     * @param pagent previous agent number of the entry
     */
    void removeSubInternalExistingByIndex(Agent agent, int aindex, int pagent) {
        SubMatrix asub = agent.sub;
        int key = asub.getInt(aindex + 0);
        int rid = asub.getInt(aindex + 1);
        removeSubInternalExisting(agent, asub, aindex, pagent, key, rid);
    }

    /**
     * Unlinks an existing agent entry from the per-(key, record) agent chain, clears its link
     * fields, and informs the total side when the chain becomes empty.
     *
     * @param agent agent that owns the entry
     * @param asub the agent's subscription matrix
     * @param aindex entry index in {@code asub}
     * @param pagent number of the previous agent in the chain (the total agent for the chain head)
     * @param key symbol key of the entry
     * @param rid record id of the entry
     * @throws IllegalStateException if the previous agent has no entry for this key/record
     */
    void removeSubInternalExisting(Agent agent, SubMatrix asub, int aindex, int pagent, int key, int rid) {
        boolean totalRecordRemoved = false;
        SubMatrix psub = this.agents[pagent].sub;
        int pindex = psub.getIndex(key, rid, 0);
        if (pindex == 0) {
            throw new IllegalStateException("Previous agent misses entry");
        }
        // Bypass this entry: the previous entry now points at our successor.
        int nagent = asub.getInt(aindex + 2);
        int nindex = asub.getInt(aindex + 3);
        psub.setInt(pindex + 2, nagent);
        psub.setInt(pindex + 3, nindex);
        // Clear next agent, next index and previous agent of the removed entry.
        asub.setInt(aindex + 2, 0);
        asub.setInt(aindex + 3, 0);
        asub.setInt(aindex + 4, 0);
        if (nagent > 0) {
            // Re-point the successor's previous agent, keeping its "set" bit.
            SubMatrix nsub = this.agents[nagent].sub;
            int nset = nsub.getInt(nindex + 4) & Integer.MIN_VALUE;
            nsub.setInt(nindex + 4, pagent | nset);
        } else if (pagent == this.total.number) {
            // No successor and the predecessor is the total agent: the chain is now empty.
            totalRecordRemoved = true;
        }
        if (this.hasTime) {
            this.removeSubInternalExistingTime(asub, aindex, key, rid, psub, pindex, nagent > 0 || pagent != this.total.number);
        }
        agent.reducedSub = true;
        if (!asub.isPayload(aindex)) {
            asub.updateRemovedPayload(rid);
        }
        if (TRACE_LOG) {
            this.log.trace("removeSubInternal " + this.records[rid].getName() + ":" + this.getDecodedSymbol(key) + " totalRecordRemoved=" + totalRecordRemoved + " for " + agent);
        }
        // In the empty-chain case psub is the total matrix; its payload is updated when the hook agrees.
        if (totalRecordRemoved && this.totalRecordRemoved(key, rid, psub, pindex)) {
            psub.updateRemovedPayload(rid);
        }
        this.dequeueRemovedRecord(agent, asub, aindex);
    }

    /**
     * Updates the time fields after an entry has been unlinked from its agent chain.
     * <p>
     * When other agents remain in the chain and the removed entry's time (offset 7) equals the
     * total time (offset 4 of the total agent's entry), the total time is recomputed through
     * {@code reduceTimeTotal}. When no agents remain, the time at offset 4 of the previous
     * entry is reset to {@code Long.MAX_VALUE}. In both cases the removed entry's own fields at
     * offsets 6, 7 and 9 are reset.
     *
     * @param asub       subscription matrix of the removed entry
     * @param aindex     index of the removed entry
     * @param key        key of the entry
     * @param rid        record id of the entry
     * @param psub       subscription matrix of the previous agent
     * @param pindex     index of the previous agent's entry
     * @param moreAgents whether other agents remain in the chain
     * @throws IllegalStateException if agents remain but the total agent has no linked entry
     */
    private void removeSubInternalExistingTime(SubMatrix asub, int aindex, int key, int rid, SubMatrix psub, int pindex, boolean moreAgents) {
        if (!moreAgents) {
            psub.setLong(pindex + 4, Long.MAX_VALUE);
        } else {
            SubMatrix totalSub = this.total.sub;
            int totalIndex = totalSub.getIndex(key, rid, 0);
            if (totalIndex == 0 || totalSub.getInt(totalIndex + 2) <= 0) {
                throw new IllegalStateException("Total agent misses entry");
            }
            long totalTime = totalSub.getLong(totalIndex + 4);
            long removedTime = asub.getLong(aindex + 7);
            if (removedTime == totalTime) {
                // The removed entry may have been the one that defined the total time.
                this.reduceTimeTotal(key, rid, totalSub, totalIndex, totalTime);
            }
        }
        // Reset the removed entry's own fields.
        asub.setInt(aindex + 6, 0);
        asub.setLong(aindex + 7, Long.MAX_VALUE);
        asub.setLong(aindex + 9, Long.MAX_VALUE);
    }

    /**
     * Recomputes the total time of a key/record pair as the minimum time (offset 7) over all
     * agents chained from the total agent's entry.
     * <p>
     * The walk stops without any change as soon as an agent with a time not greater than
     * {@code timeTotal} is found. Otherwise the new minimum is stored at offset 4 of the total
     * entry and {@code totalRecordAdded} is called with it.
     *
     * @param key       key of the entry
     * @param rid       record id of the entry
     * @param tsub      subscription matrix of the total agent
     * @param tindex    index of the total agent's entry
     * @param timeTotal current total time
     */
    private void reduceTimeTotal(int key, int rid, SubMatrix tsub, int tindex, long timeTotal) {
        long minTime = Long.MAX_VALUE;
        int agentNumber = tsub.getInt(tindex + 2);
        int entry = tsub.getInt(tindex + 3);
        while (agentNumber > 0) {
            SubMatrix chained = this.agents[agentNumber].sub;
            long candidate = chained.getLong(entry + 7);
            if (candidate < minTime) {
                if (candidate <= timeTotal) {
                    // Another agent still needs the current total time: nothing changes.
                    return;
                }
                minTime = candidate;
            }
            // Follow the chain; read the next agent before overwriting the entry index.
            agentNumber = chained.getInt(entry + 2);
            entry = chained.getInt(entry + 3);
        }
        tsub.setLong(tindex + 4, minTime);
        this.totalRecordAdded(key, rid, tsub, tindex, minTime);
    }

    /**
     * Hook that receives an agent, an entry index and a sink.
     * The implementation in this class does nothing.
     * NOTE(review): presumably subclasses that store data write the entry's data into
     * {@code sink}; the call site is outside this section — confirm.
     *
     * @param agent  agent that owns the entry
     * @param aindex index of the entry in the agent's subscription matrix
     * @param sink   sink supplied by the caller
     */
    void examineSubDataInternalByIndex(Agent agent, int aindex, RecordSink sink) {
    }

    /**
     * Checks whether the agent is subscribed to the given record/symbol pair at the given time.
     * <p>
     * For collectors without time ({@code hasTime} is false) only the presence of the
     * subscription matters. Otherwise {@code time} must also be at least the value stored at
     * {@code timeOffset} within the entry.
     *
     * @param agent      agent whose subscription is checked
     * @param record     record of interest
     * @param cipher     symbol cipher
     * @param symbol     symbol string
     * @param time       time to test against the stored subscription time
     * @param timeOffset offset of the time field inside the entry
     * @return {@code true} if the agent is subscribed
     */
    boolean isSub(Agent agent, DataRecord record, int cipher, String symbol, long time, int timeOffset) {
        SubMatrix matrix = agent.sub;
        int key = this.getKey(cipher, symbol);
        int rid = this.getRid(record);
        int index = matrix.getVolatileIndex(key, rid, 0);
        if (!matrix.isSubscribed(index)) {
            return false;
        }
        return !this.hasTime || time >= matrix.getLong(index + timeOffset);
    }

    /**
     * Checks the total subscription of this collector for the given record/symbol pair,
     * comparing {@code time} with the time stored at offset 4 of the total agent's entry.
     *
     * @param record record of interest
     * @param cipher symbol cipher
     * @param symbol symbol string
     * @param time   time to test
     * @return {@code true} if the total agent is subscribed
     */
    @Override
    public boolean isSubscribed(DataRecord record, int cipher, String symbol, long time) {
        final int totalTimeOffset = 4;
        return this.isSub(this.total, record, cipher, symbol, time, totalTimeOffset);
    }

    /**
     * Retrieves the agent's subscription into the sink through a fresh {@code SubSnapshot}
     * built with the {@code QDFilter.ANYTHING} filter.
     *
     * @param agent agent whose subscription is examined
     * @param sink  sink that receives the subscription
     * @return the result of {@code SubSnapshot.retrieveSubscription}
     */
    boolean examineSub(Agent agent, RecordSink sink) {
        SubSnapshot snapshot = new SubSnapshot(agent, QDFilter.ANYTHING);
        return snapshot.retrieveSubscription(sink);
    }

    /**
     * Examines the total subscription of this collector.
     *
     * @param sink sink that receives the subscription
     * @return the result of {@code examineSub} for the total agent
     */
    @Override
    public boolean examineSubscription(RecordSink sink) {
        Agent totalAgent = this.total;
        return this.examineSub(totalAgent, sink);
    }

    /**
     * Returns the payload size of the total agent's subscription matrix.
     *
     * @return {@code total.sub.payloadSize}
     */
    @Override
    public int getSubscriptionSize() {
        SubMatrix totalSub = this.total.sub;
        return totalSub.payloadSize;
    }

    /**
     * Stores the error handler to be used by this collector.
     *
     * @param errorHandler handler to store; no validation is performed here
     */
    @Override
    public void setErrorHandler(QDErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Processes data from the source on behalf of the distributor.
     * Any throwable raised by {@code processDataImpl} is first recorded via
     * {@code management.setFatalError} and then rethrown unchanged.
     *
     * @param distributor distributor that supplies the data
     * @param source      source of records to process
     */
    void processData(Distributor distributor, RecordSource source) {
        try {
            this.processDataImpl(distributor, source);
        } catch (Throwable failure) {
            // Record the failure before letting it propagate to the caller.
            this.management.setFatalError(failure);
            throw failure;
        }
    }

    /**
     * Runs the data distribution: one or more passes, each made of a record-source phase
     * ({@code processRecordSource}) followed by an agent-update phase
     * ({@code processDataUpdate}), and a final listener notification.
     * <p>
     * While the distribution reports blocked agents, accumulated notifications are delivered
     * and cleared, the blocked agents are re-enqueued and the update phase is repeated.
     * {@code dist.done()} is called after every pass, even when the pass throws.
     *
     * @param distributor distributor that supplies the data
     * @param source      source of records to process
     */
    void processDataImpl(Distributor distributor, RecordSource source) {
        Distribution dist = Distribution.getInstance();
        dist.prepareCounters(this.records.length);
        Notification notif = null;
        boolean morePasses;
        do {
            dist.start(this.agents, this.management.getDistributionBucket());
            try {
                // Phase 1: read the source.
                morePasses = this.processRecordSource(distributor, dist, source);
                this.onBetweenProcessPhases();
                // Phase 2: update agents; the notification object is obtained on first use.
                if (notif == null) {
                    notif = Notification.getInstance();
                }
                boolean blocked;
                do {
                    this.processDataUpdate(dist, notif, source);
                    blocked = dist.hasBlocked();
                    if (blocked) {
                        this.processDataNotify(notif);
                        notif.clear();
                        dist.enqueueBlocked();
                    }
                } while (blocked);
            } finally {
                dist.done();
            }
        } while (morePasses);
        dist.flushAndClearCounters(this.counters);
        dist.release();
        // Deliver the remaining notifications.
        this.processDataNotify(notif);
        notif.release();
    }

    /**
     * Runs {@code processRecordSourceGLocked} while holding the global lock for the
     * {@code PROCESS_DATA} operation. The lock is always released afterwards.
     *
     * @param distributor distributor that supplies the data
     * @param dist        distribution state of the current pass
     * @param source      source of records to process
     * @return the value returned by {@code processRecordSourceGLocked}
     */
    private boolean processRecordSource(Distributor distributor, Distribution dist, RecordSource source) {
        this.globalLock.lock(CollectorOperation.PROCESS_DATA);
        try {
            return this.processRecordSourceGLocked(distributor, dist, source);
        } finally {
            this.globalLock.unlock();
        }
    }

    /**
     * Record-source phase of data processing; called by {@code processRecordSource} while the
     * global lock is held. The returned value is used by {@code processDataImpl} to decide
     * whether another pass is made.
     */
    abstract boolean processRecordSourceGLocked(Distributor var1, Distribution var2, RecordSource var3);

    /**
     * Per-agent update step; called by {@code processDataUpdateLLocked} for an agent that is
     * not closed. The returned value is passed on to {@code Distribution.removeAgent} as the
     * block index.
     */
    abstract int processAgentDataUpdate(Distribution var1, RecordSource var2, Agent var3);

    /**
     * Runs the update step for every agent currently held by the distribution.
     * <p>
     * Agents are visited in repeated spins. Within a spin a limited number of agents whose
     * local lock is busy are skipped and left for a later spin; once that budget is used up,
     * the lock is acquired unconditionally. The number of spins is reported to
     * {@code dist.countSpins}.
     *
     * @param dist   distribution state that supplies the agents
     * @param notif  accumulator for listener notifications
     * @param buffer source of records being processed
     */
    void processDataUpdate(Distribution dist, Notification notif, RecordSource buffer) {
        Agent agent;
        notif.ensureCapacity(dist.getMaxAgentNumber());
        int max_spins = this.management.getMaxDistributionSpins();
        int m_spins = 0;
        // Each iteration of the outer loop is one spin over the agents still held by dist.
        while ((agent = dist.firstAgent()) != null) {
            ++m_spins;
            // Number of agents that may be skipped in this spin when their lock is busy.
            int ignorable = dist.numberOfAgents() * (max_spins - 1) / max_spins;
            while (agent != null) {
                int blockIndex;
                if (ignorable <= 0) {
                    // Skip budget is exhausted: acquire the lock unconditionally.
                    agent.localLock.lock(CollectorOperation.PROCESS_DATA);
                } else if (!agent.localLock.tryLock(CollectorOperation.PROCESS_DATA)) {
                    // Lock is busy: leave this agent for a later spin.
                    --ignorable;
                    agent = dist.nextAgent();
                    continue;
                }
                try {
                    blockIndex = this.processDataUpdateLLocked(agent, dist, notif, buffer);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag and treat the agent as having block index 0.
                    Thread.currentThread().interrupt();
                    blockIndex = 0;
                }
                finally {
                    agent.localLock.unlock();
                }
                agent = dist.removeAgent(blockIndex);
            }
        }
        dist.countSpins(m_spins);
    }

    /**
     * Performs the update step for one agent; the caller holds the agent's local lock.
     * <p>
     * Closed agents are skipped. If the agent has a buffer, the method waits on the local lock
     * while that buffer reports itself blocked. Notification bits that are set after the update
     * but were not set before it are added to {@code notif}.
     *
     * @param agent  agent to update
     * @param dist   distribution state of the current pass
     * @param notif  accumulator for listener notifications
     * @param buffer source of records being processed
     * @return block index from {@code processAgentDataUpdate}, or 0 for a closed agent
     * @throws InterruptedException if interrupted while waiting on the local lock
     */
    private int processDataUpdateLLocked(Agent agent, Distribution dist, Notification notif, RecordSource buffer) throws InterruptedException {
        if (agent.isClosed()) {
            return 0;
        }
        if (agent.buffer != null) {
            while (agent.buffer.isBlocked()) {
                agent.localLock.await();
            }
        }
        int bitsBefore = this.getNotificationBits(agent);
        int blockIndex = this.processAgentDataUpdate(dist, buffer, agent);
        // Keep only the bits that this update newly raised.
        int raisedBits = ~bitsBefore & this.getNotificationBits(agent);
        if (raisedBits != 0) {
            notif.add(agent, raisedBits);
        }
        return blockIndex;
    }

    /**
     * Delivers the accumulated notifications to agent listeners.
     * The sign bit of an agent's bits triggers {@code notifySnapshotListener}; bit
     * {@code 0x40000000} triggers {@code notifyDataListener}.
     *
     * @param notif accumulated notifications
     */
    void processDataNotify(Notification notif) {
        for (Agent agent = notif.firstAgent(); agent != null; agent = notif.nextAgent(agent)) {
            int bits = notif.getBits(agent);
            boolean snapshot = (bits & Integer.MIN_VALUE) != 0;
            boolean data = (bits & 0x40000000) != 0;
            if (snapshot) {
                agent.notifySnapshotListener();
            }
            if (data) {
                agent.notifyDataListener();
            }
        }
    }

    /**
     * Flushes the agent's pending retrieval count into the collector counters and resets it.
     * Does nothing when the pending count is not positive.
     *
     * @param agent agent whose {@code nRetrieved} counter is flushed
     */
    void countRetrieval(Agent agent) {
        if (agent.nRetrieved <= 0) {
            return;
        }
        this.counters.countRetrieval(agent.nRetrieved);
        agent.nRetrieved = 0;
    }

    /**
     * Returns the contract followed by either the bracketed full key properties of the stats
     * (when stats are present) or {@code @} and the hexadecimal hash code.
     */
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.contract);
        if (this.stats == null) {
            text.append("@");
            text.append(Integer.toHexString(this.hashCode()));
        } else {
            text.append("[");
            text.append(this.stats.getFullKeyProperties());
            text.append("]");
        }
        return text.toString();
    }

    /**
     * Passes every non-null agent of this collector to the visitor.
     *
     * @param av visitor to call for each agent
     */
    public void visitAgents(CollectorDebug.AgentVisitor av) {
        for (Agent candidate : this.agents) {
            if (candidate != null) {
                av.visitAgent(candidate);
            }
        }
    }

    /**
     * Visits the symbol references of every non-null agent and then those of the distributors.
     *
     * @param srv visitor to apply
     * @param rci rehash crash information handed to {@code CollectorDebug.visitAgentSymbols}
     * @return the same visitor instance, {@code srv}
     */
    <T extends CollectorDebug.SymbolReferenceVisitor> T visitSymbols(T srv, CollectorDebug.RehashCrashInfo rci) {
        for (Agent candidate : this.agents) {
            if (candidate != null) {
                CollectorDebug.visitAgentSymbols(srv, rci, candidate);
            }
        }
        this.distributors.visitDistributorsSymbols(srv);
        return srv;
    }

    /**
     * Runs the debug verification of this collector and reports progress to the log.
     * <p>
     * Symbol references are verified against a freshly created mapper, then the subscription
     * of every agent except the total one is visited, and finally the total subscription is
     * verified against the agents.
     *
     * @param log destination for verification messages
     * @param rci rehash crash information handed to the symbol visitors
     */
    public void verify(CollectorDebug.Log log, CollectorDebug.RehashCrashInfo rci) {
        log.info("--- Verifying " + this);

        log.info("Verifying symbols...");
        Mapper freshMapper = new Mapper(this);
        CollectorDebug.VerifySymbolReferences symbolCheck =
            new CollectorDebug.VerifySymbolReferences(log, this.scheme, this.mapper, freshMapper);
        this.visitSymbols(symbolCheck, rci).printSummary();
        this.mapper.getMapping().verify(log, freshMapper.getMapping());

        log.info("Verifying agents subscription...");
        CollectorDebug.VerifySubscription subCheck = new CollectorDebug.VerifySubscription(log, this.scheme, this.mapper);
        for (Agent candidate : this.agents) {
            boolean regularAgent = candidate != null && candidate != this.total;
            if (regularAgent) {
                subCheck.agentNumber = candidate.number;
                CollectorDebug.visitAgentSymbols(subCheck, rci, candidate);
            }
        }
        log.info("Found " + subCheck.totalSize + " entries in agents subscription");

        log.info("Verifying total subscription...");
        subCheck.verifyTotal(this.total.sub, this.agents);
        log.info("Verify completed");
    }

    /**
     * Analyzes the queues that exist for this collector's contract: the "snapshot" queue
     * (offset 5) for TICKER and HISTORY, and additionally the "update" queue (offset 6) for
     * TICKER.
     *
     * @param log    destination for analysis messages
     * @param symbol symbol to restrict the analysis to, or {@code null} for any
     * @param record record name to restrict the analysis to, or {@code null} for any
     */
    public void analyzeQueue(CollectorDebug.Log log, String symbol, String record) {
        boolean ticker = this.getContract() == QDContract.TICKER;
        boolean history = this.getContract() == QDContract.HISTORY;
        if (ticker || history) {
            this.analyzeQueue(log, symbol, record, "snapshot", 5);
        }
        if (ticker) {
            this.analyzeQueue(log, symbol, record, "update", 6);
        }
    }

    /**
     * Analyzes one named queue of every agent except the total one.
     *
     * @param log    destination for analysis messages
     * @param symbol symbol to restrict the analysis to, or {@code null} for any
     * @param record record name to restrict the analysis to, or {@code null} for any
     * @param name   queue name used in the messages
     * @param offset offset of the queue link field inside a subscription entry
     */
    public void analyzeQueue(CollectorDebug.Log log, String symbol, String record, String name, int offset) {
        String symbolPart = symbol != null ? " symbol " + symbol : "";
        String recordPart = record != null ? " record " + record : "";
        log.info("-- Analyzing " + name + " queue" + symbolPart + recordPart + " in " + this);
        int filterKey = this.getKeyIfPresent(symbol);
        int filterRid = this.getRidIfPresent(record);
        for (Agent candidate : this.agents) {
            if (candidate != null && candidate != this.total) {
                this.analyzeQueueImpl(log, filterKey, filterRid, candidate, name, offset);
            }
        }
        log.info("Analyze queue completed");
    }

    /**
     * Scans the agent's subscription matrix and logs every payload entry that is linked into
     * the queue at the given offset and passes the key/record filters.
     * <p>
     * For collectors with time, the message also contains the raw ints at offsets 7/8
     * ("time sub") and 9/10 ("time known"), the TX_DIRTY flag (sign bit of the long at
     * offset 11) and whether the position stored in the remaining 63 bits of that long is
     * still in the agent's buffer.
     *
     * @param log       destination for analysis messages
     * @param filterKey key to match, or -1 to match any key
     * @param filterRid record id to match, or -1 to match any record
     * @param agent     agent whose matrix is scanned
     * @param name      queue name used in the messages
     * @param offset    offset of the queue link field inside an entry
     */
    private void analyzeQueueImpl(CollectorDebug.Log log, int filterKey, int filterRid, Agent agent, String name, int offset) {
        SubMatrix sub = agent.sub;
        int index = sub.matrix.length;
        while ((index -= sub.step) >= 0) {
            if (!sub.isPayload(index)) {
                continue;
            }
            int queueNext = sub.getInt(index + offset);
            if ((queueNext & Integer.MAX_VALUE) == 0) {
                continue; // entry is not linked into this queue
            }
            if (offset == 5 && (queueNext & Integer.MIN_VALUE) == 0) {
                continue; // at offset 5 only entries with the sign bit set are reported
            }
            int key = sub.getInt(index + 0);
            int rid = sub.getInt(index + 1);
            if (filterKey != -1 && filterKey != key || filterRid != -1 && filterRid != rid) {
                continue;
            }
            StringBuilder sb = new StringBuilder("Found in " + name + " queue " + CollectorDebug.fmtKeyRid(this.scheme, this.mapper, key, rid) + " at " + index);
            if (this.hasTime) {
                sb.append(" time sub ").append(sub.getInt(index + 7)).append(" ").append(sub.getInt(index + 8));
                sb.append(" time known ").append(sub.getInt(index + 9)).append(" ").append(sub.getInt(index + 10));
                long lastRecordWithBit = sub.getLong(index + 11);
                if ((lastRecordWithBit & Long.MIN_VALUE) != 0L) {
                    sb.append(" TX_DIRTY");
                }
                // Strip the TX_DIRTY flag (sign bit) to obtain the buffer position itself.
                // Masking with Long.MIN_VALUE here would keep only the flag and lose the position.
                long lastRecord = lastRecordWithBit & Long.MAX_VALUE;
                if (lastRecord == 0L) {
                    sb.append(" not in buffer");
                } else if (agent.buffer.isInBuffer(lastRecord)) {
                    sb.append(" still in buffer");
                } else {
                    sb.append(" dropped from buffer");
                }
            }
            sb.append(" of ").append(agent);
            log.info(sb.toString());
        }
    }

    /**
     * Logs the symbol references that match the optional symbol and record filters by running
     * a {@code CollectorDebug.AnalyzeKeyRid} visitor over this collector.
     *
     * @param log    destination for analysis messages
     * @param symbol symbol to restrict the analysis to, or {@code null} for any
     * @param record record name to restrict the analysis to, or {@code null} for any
     * @param rci    rehash crash information handed to the symbol visitors
     */
    public void analyzeSymbolRefs(CollectorDebug.Log log, String symbol, String record, CollectorDebug.RehashCrashInfo rci) {
        String symbolPart = symbol != null ? " symbol " + symbol : "";
        String recordPart = record != null ? " record " + record : "";
        log.info("-- Analyzing symbols refs" + symbolPart + recordPart + " in " + this);
        int filterKey = this.getKeyIfPresent(symbol);
        int filterRid = this.getRidIfPresent(record);
        CollectorDebug.AnalyzeKeyRid analyzer = new CollectorDebug.AnalyzeKeyRid(log, filterKey, filterRid, this.scheme, this.mapper);
        this.visitSymbols(analyzer, rci);
        log.info("Analyze completed");
    }

    /**
     * Resolves a record name to its id in the scheme.
     *
     * @param record record name, or {@code null} for "no filter"
     * @return the record id, or -1 when {@code record} is {@code null} or is not found in the
     *         scheme (a warning is logged in the latter case)
     */
    private int getRidIfPresent(String record) {
        if (record == null) {
            return -1;
        }
        DataRecord found = this.scheme.findRecordByName(record);
        if (found == null) {
            QDLog.log.warn("Record is not found for " + record);
            return -1;
        }
        return found.getId();
    }

    /**
     * Resolves a symbol to its key: first through the scheme codec and, when the codec yields
     * 0, through the mapper's current mapping.
     *
     * @param symbol symbol, or {@code null} for "no filter"
     * @return the key, or -1 when {@code symbol} is {@code null} or no key is found
     *         (a warning is logged in the latter case)
     */
    private int getKeyIfPresent(String symbol) {
        int key = symbol == null ? -1 : this.scheme.getCodec().encode(symbol);
        if (key == 0) {
            // The codec produced no key for the symbol: fall back to the mapping.
            key = this.mapper.getMapping().getKey(symbol);
        }
        if (key == 0) {
            QDLog.log.warn("Key is not found for " + symbol);
            return -1;
        }
        return key;
    }

    /**
     * Hook called by {@code processDataImpl} in every pass, after {@code processRecordSource}
     * has returned and before the agent-update phase starts.
     * The implementation in this class does nothing.
     */
    protected void onBetweenProcessPhases() {
    }
}

