/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.impl;

import com.devexperts.qd.QDAgent;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.QDHistory;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.QDTicker;
import com.devexperts.qd.kit.ArrayListAttachmentStrategy;
import com.devexperts.qd.ng.AbstractRecordSink;
import com.devexperts.qd.ng.EventFlag;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.util.RecordProcessor;
import com.devexperts.util.IndexedSet;
import com.devexperts.util.TimePeriod;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.impl.EventDelegate;
import com.dxfeed.api.impl.EventDelegateSet;
import com.dxfeed.api.osub.IndexedEventSubscriptionSymbol;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.EventType;
import com.dxfeed.event.IndexedEvent;
import com.dxfeed.event.IndexedEventSource;
import com.dxfeed.event.LastingEvent;
import com.dxfeed.event.TimeSeriesEvent;
import com.dxfeed.impl.AbstractIndexedList;
import com.dxfeed.promise.Promise;
import com.dxfeed.promise.PromiseHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

public class DXFeedImpl
extends DXFeed {
    // Initialised from Class.desiredAssertionStatus(), i.e. true when assertions would be enabled for this class.
    private static boolean TRACE_LOG = DXFeedImpl.class.desiredAssertionStatus();
    // Message used for the IllegalArgumentException that fails promises when no suitable delegate is found.
    private static final String INVALID_EVENT_MSG = "Invalid event type and/or role";
    // All QD contracts and their count; the count sizes the per-contract builder array below.
    private static final QDContract[] CONTRACTS = QDContract.values();
    private static final int N_CONTRACTS = CONTRACTS.length;
    // Attachment strategy installed on every event-processor agent builder in the constructor.
    private static final EventProcessorAttachmentStrategy EVENT_PROCESSOR_ATTACHMENT_STRATEGY = new EventProcessorAttachmentStrategy();
    // Attachment strategy installed on the ticker agent owned by LastEventsProcessor.
    private static final LastEventAttachmentStrategy LAST_EVENT_ATTACHMENT_STRATEGY = new LastEventAttachmentStrategy();
    // Per-thread scratch batch, created lazily by getLocalAddBatch().
    private static final ThreadLocal<LocalAddBatch> LOCAL_ADD_BATCH = new ThreadLocal();
    // Per-thread remove batch, installed for the duration of LastEventsProcessor.process().
    private static final ThreadLocal<LocalRemoveBatch> LOCAL_REMOVE_BATCH = new ThreadLocal();
    private final DXEndpointImpl endpoint;
    private final QDFilter filter;
    // Non-null only when the filter is dynamic (see the two-argument constructor).
    private final QDFilter.UpdateListener filterListener;
    private final RecordMode retrieveMode;
    // Indexed by QDContract.ordinal(); only contracts returned by endpoint.getContracts() are populated.
    private final QDAgent.Builder[] eventProcessorAgentBuilders = new QDAgent.Builder[N_CONTRACTS];
    // Event processors keyed by the subscription they serve.
    private final IndexedSet<DXFeedSubscription<?>, EventProcessor<?, ?>> eventProcessors = IndexedSet.create(value -> value.subscription);
    // Resources closed by closeComponents(); history promise completers are added in register().
    private final IndexedSet<Closeable, Closeable> closeables = new IndexedSet();
    // Null when the endpoint has no TICKER collector.
    private final LastEventsProcessor lastEventsProcessor;
    // Value of the "dxfeed.aggregationPeriod" endpoint property in milliseconds, or 0 when the property is absent.
    private final long aggregationPeriodMillis;
    // Bit layout: the low N_CONTRACTS bits, the next N_CONTRACTS bits, bit 30 and bit 31 respectively.
    // NOTE(review): the code using these masks is outside this part of the file — confirm their meaning there.
    private static final int STATE_AVAILABLE_DATA_MASK = (1 << N_CONTRACTS) - 1;
    private static final int STATE_AVAILABLE_SNAPSHOT_MASK = (1 << N_CONTRACTS) - 1 << N_CONTRACTS;
    private static final int STATE_SCHEDULED_DATA = 0x40000000;
    private static final int STATE_SCHEDULED_SNAPSHOT = Integer.MIN_VALUE;

    /**
     * Creates a feed for the given endpoint using {@link QDFilter#ANYTHING} as its filter.
     *
     * @param endpoint the endpoint this feed is bound to
     */
    public DXFeedImpl(DXEndpointImpl endpoint) {
        this(endpoint, QDFilter.ANYTHING);
    }

    /**
     * Creates a feed for the given endpoint restricted by the given filter.
     *
     * <p>Registers a filter update listener when the filter is dynamic, derives the record
     * retrieval mode, prepares one agent builder per endpoint contract, starts the last-events
     * processor when a TICKER collector exists, and reads the aggregation period property.
     *
     * @param endpoint the endpoint this feed is bound to
     * @param filter   the filter applied to every agent created by this feed
     * @deprecated marked deprecated in the original; no replacement is named in this file
     */
    @Deprecated
    public DXFeedImpl(DXEndpointImpl endpoint, QDFilter filter) {
        this.endpoint = endpoint;
        this.filter = filter;

        // Only a dynamic filter needs a listener to rebuild subscriptions on update.
        if (!filter.isDynamic()) {
            this.filterListener = null;
        } else {
            this.filterListener = this::updateSubscriptionsOnFilterUpdate;
            filter.getUpdated().addUpdateListener(this.filterListener);
        }

        RecordMode flaggedWithAttachment = RecordMode.FLAGGED_DATA.withAttachment();
        this.retrieveMode = endpoint.getQDEndpoint().hasEventTimeSequence()
            ? flaggedWithAttachment.withEventTimeSequence()
            : flaggedWithAttachment;

        // One pre-configured builder per contract, stored at the contract's ordinal.
        for (QDContract contract : endpoint.getContracts()) {
            this.eventProcessorAgentBuilders[contract.ordinal()] = endpoint.getCollector(contract)
                .agentBuilder()
                .withHistorySnapshot(true)
                .withFilter(filter)
                .withAttachmentStrategy(EVENT_PROCESSOR_ATTACHMENT_STRATEGY);
        }

        QDTicker ticker = (QDTicker) endpoint.getCollector(QDContract.TICKER);
        if (ticker != null) {
            this.lastEventsProcessor = new LastEventsProcessor(ticker, filter);
            this.lastEventsProcessor.start();
        } else {
            this.lastEventsProcessor = null;
        }

        if (endpoint.hasProperty("dxfeed.aggregationPeriod")) {
            this.aggregationPeriodMillis = TimePeriod.valueOf(endpoint.getProperty("dxfeed.aggregationPeriod")).getTime();
        } else {
            this.aggregationPeriodMillis = 0L;
        }
    }

    /**
     * Resets the fields of every record obtained from {@code buf.writeNext()}.
     *
     * <p>Int fields are set to {@code 0} and object fields to {@code null}. When
     * {@code keepTime} is {@code true} the int fields at indices 0 and 1 are left untouched
     * (presumably the record time; confirm against the record layout).
     *
     * @param buf      the buffer whose records are cleared in place
     * @param keepTime whether to preserve the first two int fields of each record
     */
    public static void clearDataInBuffer(RecordBuffer buf, boolean keepTime) {
        final int firstIntToClear = keepTime ? 2 : 0;
        for (RecordCursor cursor = buf.writeNext(); cursor != null; cursor = buf.writeNext()) {
            for (int intIndex = firstIntToClear; intIndex < cursor.getIntCount(); intIndex++) {
                cursor.setInt(intIndex, 0);
            }
            for (int objIndex = 0; objIndex < cursor.getObjCount(); objIndex++) {
                cursor.setObj(objIndex, null);
            }
        }
    }

    /**
     * Waits for every event processor in a snapshot of the current set to terminate, then
     * closes this feed via {@link #closeImpl()}.
     *
     * @throws InterruptedException if interrupted while waiting for a processor
     */
    public void awaitTerminationAndCloseImpl() throws InterruptedException {
        EventProcessor[] snapshot = eventProcessors.toArray(new EventProcessor[0]);
        // Stop at the first null slot, exactly as the original loop did.
        for (int i = 0; i < snapshot.length && snapshot[i] != null; i++) {
            snapshot[i].awaitTermination();
        }
        closeImpl();
    }

    /**
     * Closes all components of this feed.
     *
     * <p>While the endpoint is still open the components are closed under the endpoint lock;
     * once the endpoint reports itself closed they are closed without taking the lock.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     */
    public void closeImpl() {
        if (endpoint.isClosed()) {
            closeComponents();
            return;
        }
        synchronized (endpoint.getLock()) {
            closeComponents();
        }
    }

    /**
     * Closes and forgets every event processor and registered closeable, closes the
     * last-events processor if present, and unregisters the filter update listener if one
     * was installed.
     */
    private void closeComponents() {
        for (EventProcessor<?, ?> eventProcessor : eventProcessors) {
            eventProcessor.close(false);
        }
        eventProcessors.clear();

        for (Closeable closeable : closeables) {
            closeable.close();
        }
        closeables.clear();

        if (lastEventsProcessor != null) {
            lastEventsProcessor.close();
        }

        if (filterListener == null) {
            return;
        }
        // A listener is only ever installed for a dynamic filter.
        assert (filter.isDynamic());
        filter.getUpdated().removeUpdateListener(filterListener);
    }

    /**
     * Removes the event processor registered for the given subscription, unless the endpoint
     * is closed. The closed state is checked both before and after taking the endpoint lock.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     *
     * @param subscription key of the processor to remove
     */
    private void removeEventProcessor(DXFeedSubscription<?> subscription) {
        if (endpoint.isClosed()) {
            return;
        }
        synchronized (endpoint.getLock()) {
            if (endpoint.isClosed()) {
                return;
            }
            eventProcessors.removeKey(subscription);
        }
    }

    /**
     * Removes the given closeable from the registry, unless the endpoint is closed. The
     * closed state is checked both before and after taking the endpoint lock.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     *
     * @param c the closeable to unregister
     */
    private void removeCloseable(Closeable c) {
        if (endpoint.isClosed()) {
            return;
        }
        synchronized (endpoint.getLock()) {
            if (endpoint.isClosed()) {
                return;
            }
            closeables.remove(c);
        }
    }

    /**
     * Attaches the subscription to this feed by adding a non-clearing
     * {@code SubscriptionChangeListener} to it.
     *
     * @param subscription the subscription to attach
     */
    @Override
    public void attachSubscription(DXFeedSubscription<?> subscription) {
        SubscriptionChangeListener listener = new SubscriptionChangeListener(subscription, false);
        subscription.addChangeListener(listener);
    }

    /**
     * Detaches the subscription from this feed by removing a non-clearing
     * {@code SubscriptionChangeListener} from it.
     *
     * @param subscription the subscription to detach
     */
    @Override
    public void detachSubscription(DXFeedSubscription<?> subscription) {
        SubscriptionChangeListener listener = new SubscriptionChangeListener(subscription, false);
        subscription.removeChangeListener(listener);
    }

    /**
     * Detaches the subscription from this feed by removing a {@code SubscriptionChangeListener}
     * created with the clear flag set.
     *
     * @param subscription the subscription to detach and clear
     */
    @Override
    public void detachSubscriptionAndClear(DXFeedSubscription<?> subscription) {
        SubscriptionChangeListener clearingListener = new SubscriptionChangeListener(subscription, true);
        subscription.removeChangeListener(clearingListener);
    }

    /**
     * Fills the given event with the last data available in the TICKER collector.
     *
     * <p>The event is returned unchanged when no TICKER lasting delegate exists for it, when
     * the current filter rejects its symbol, or when the ticker has no data for it.
     *
     * @param event the event to update; its class and symbol select the record
     * @return the event produced by the delegate, or {@code event} itself when nothing was read
     */
    @Override
    public <E extends LastingEvent<?>> E getLastEvent(E event) {
        EventDelegate<?> delegate = this.getLastingEventDelegateOrNull(event.getClass(), event.getEventSymbol());
        if (delegate == null) {
            return event;
        }
        QDTicker ticker = (QDTicker)endpoint.getCollector(QDContract.TICKER);
        String qdSymbol = delegate.getQDSymbolByEvent(event);
        int cipher = endpoint.encode(qdSymbol);
        boolean accepted = filter.getUpdatedFilter().accept(QDContract.TICKER, delegate.getRecord(), cipher, qdSymbol);
        if (accepted) {
            LocalAddBatch batch = getLocalAddBatch();
            // The thread-local cursor owner receives the record when data is available.
            if (ticker.getDataIfAvailable(batch.owner, delegate.getRecord(), cipher, qdSymbol)) {
                return (E)((LastingEvent)delegate.getEvent(event, batch.owner.cursor()));
            }
        }
        return event;
    }

    /**
     * Returns a new event built from the ticker data for the given type and symbol, using
     * {@code getDataIfSubscribed} on the ticker.
     *
     * @param eventType the lasting event class
     * @param symbol    the event symbol
     * @return the created event, or {@code null} when there is no TICKER lasting delegate, the
     *         filter rejects the symbol, or the ticker call reports no data
     */
    @Override
    public <E extends LastingEvent<?>> E getLastEventIfSubscribed(Class<E> eventType, Object symbol) {
        EventDelegate<E> delegate = this.getLastingEventDelegateOrNull(eventType, symbol);
        if (delegate == null) {
            return null;
        }
        assert (this.lastEventsProcessor != null);
        String qdSymbol = delegate.getQDSymbolByEventSymbol(symbol);
        int cipher = endpoint.encode(qdSymbol);
        boolean accepted = filter.getUpdatedFilter().accept(QDContract.TICKER, delegate.getRecord(), cipher, qdSymbol);
        if (accepted) {
            LocalAddBatch batch = getLocalAddBatch();
            if (lastEventsProcessor.ticker.getDataIfSubscribed(batch.owner, delegate.getRecord(), cipher, qdSymbol)) {
                return (E)((LastingEvent)delegate.createEvent(symbol, batch.owner.cursor()));
            }
        }
        return null;
    }

    /**
     * Returns a promise for the last event of the given type and symbol.
     *
     * <p>The promise is already failed with {@link IllegalArgumentException} when no TICKER
     * lasting delegate exists, already failed with {@link CancellationException} when the
     * filter rejects the symbol, and already completed when the ticker has data. Otherwise a
     * pending promise is subscribed through the last-events processor; if that subscription
     * is refused the promise is cancelled before being returned.
     *
     * @param eventType the lasting event class
     * @param symbol    the event symbol
     * @return the promise for the last event
     * @throws NullPointerException if {@code eventType} or {@code symbol} is {@code null}
     */
    @Override
    public <E extends LastingEvent<?>> Promise<E> getLastEventPromise(Class<E> eventType, Object symbol) {
        if (eventType == null || symbol == null) {
            throw new NullPointerException();
        }
        EventDelegate<E> delegate = this.getLastingEventDelegateOrNull(eventType, symbol);
        if (delegate == null) {
            return Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG));
        }
        assert (this.lastEventsProcessor != null);
        String qdSymbol = delegate.getQDSymbolByEventSymbol(symbol);
        int cipher = endpoint.encode(qdSymbol);
        if (!filter.getUpdatedFilter().accept(QDContract.TICKER, delegate.getRecord(), cipher, qdSymbol)) {
            return Promise.failed(new CancellationException("cancel"));
        }
        LocalAddBatch batch = getLocalAddBatch();
        if (lastEventsProcessor.ticker.getDataIfAvailable(batch.owner, delegate.getRecord(), cipher, qdSymbol)) {
            return Promise.completed(delegate.createEvent(symbol, batch.owner.cursor()));
        }
        // No data yet: subscribe a pending promise as a one-element batch.
        LastEventPromise<E> pending = new LastEventPromise<E>(symbol, delegate, cipher, qdSymbol);
        batch.subscribeStartBatch();
        batch.subscribeAddBatch(pending);
        boolean subscribed = batch.completeAddSubBatch(lastEventsProcessor);
        if (!subscribed) {
            pending.cancel();
        }
        return pending;
    }

    /**
     * Returns one promise per symbol for the last event of the given type, in the iteration
     * order of {@code symbols}.
     *
     * <p>For each symbol the promise is failed with {@link IllegalArgumentException} when no
     * TICKER lasting delegate exists, failed with {@link CancellationException} when the filter
     * rejects the symbol, completed immediately when the ticker already has data, and otherwise
     * added to a single subscription batch. If that batch is refused by the last-events
     * processor, {@code cancel()} is invoked on every promise in the result.
     *
     * @param eventType the lasting event class
     * @param symbols   the event symbols
     * @return the list of promises, one per symbol
     * @throws NullPointerException if {@code eventType} or {@code symbols} is {@code null}
     */
    @Override
    public <E extends LastingEvent<?>> List<Promise<E>> getLastEventsPromises(Class<E> eventType, Collection<?> symbols) {
        if (eventType == null) {
            throw new NullPointerException();
        }
        // Element type restored to Promise<E>; the decompiled Promise<Promise<Object>> did not
        // match the declared return type.
        List<Promise<E>> result = new ArrayList<>(symbols.size());
        LocalAddBatch lb = null;
        for (Object obj : symbols) {
            EventDelegate<E> delegate = this.getLastingEventDelegateOrNull(eventType, obj);
            if (delegate == null) {
                result.add(Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG)));
                continue;
            }
            assert (this.lastEventsProcessor != null);
            String qdSymbol = delegate.getQDSymbolByEventSymbol(obj);
            int cipher = this.endpoint.encode(qdSymbol);
            if (!this.filter.getUpdatedFilter().accept(QDContract.TICKER, delegate.getRecord(), cipher, qdSymbol)) {
                result.add(Promise.failed(new CancellationException("cancel")));
                continue;
            }
            // The batch is started lazily, on the first symbol that passes the filter.
            if (lb == null) {
                lb = DXFeedImpl.getLocalAddBatch();
                lb.subscribeStartBatch();
            }
            LastEventPromise<E> promise = new LastEventPromise<E>(obj, delegate, cipher, qdSymbol);
            result.add(promise);
            if (this.lastEventsProcessor.ticker.getDataIfAvailable(lb.owner, delegate.getRecord(), cipher, qdSymbol)) {
                promise.complete(delegate.createEvent(obj, lb.owner.cursor()));
                continue;
            }
            lb.subscribeAddBatch(promise);
        }
        if (lb != null && !lb.completeAddSubBatch(this.lastEventsProcessor)) {
            for (Promise<E> pending : result) {
                pending.cancel();
            }
        }
        return result;
    }

    /**
     * Returns the calling thread's {@link LocalAddBatch}, creating and storing it in
     * {@code LOCAL_ADD_BATCH} on first use.
     *
     * @return the batch bound to the current thread
     */
    private static LocalAddBatch getLocalAddBatch() {
        LocalAddBatch batch = LOCAL_ADD_BATCH.get();
        if (batch != null) {
            return batch;
        }
        batch = new LocalAddBatch();
        LOCAL_ADD_BATCH.set(batch);
        return batch;
    }

    /**
     * Looks up the lasting delegate for the given event type and symbol.
     *
     * @param eventType the event class used to find the delegate set
     * @param symbol    the event symbol; it is converted by the delegate set before lookup
     * @return the delegate, or {@code null} when the endpoint has no delegate set for the type,
     *         the set has no lasting delegate for the symbol, or the delegate's contract is not
     *         {@link QDContract#TICKER}
     */
    @SuppressWarnings("unchecked") // the delegate set is looked up by eventType, so the delegate is expected to produce E
    private <E extends EventType<?>> EventDelegate<E> getLastingEventDelegateOrNull(Class<E> eventType, Object symbol) {
        EventDelegateSet<?, ?> delegateSet = this.endpoint.getDelegateSetByEventType(eventType);
        if (delegateSet == null) {
            return null;
        }
        Object candidate = delegateSet.getLastingDelegateByEventSymbol(delegateSet.convertSymbol(symbol));
        if (candidate == null) {
            return null;
        }
        // Single explicit cast; the decompiled code returned the untyped local directly.
        EventDelegate<E> delegate = (EventDelegate<E>) candidate;
        if (delegate.getContract() != QDContract.TICKER) {
            return null;
        }
        return delegate;
    }

    /**
     * Returns a promise for the list of indexed events of the given type, symbol and source.
     *
     * <p>The promise is failed with {@link IllegalArgumentException} unless exactly one
     * delegate matches the converted symbol and source id and that delegate uses the HISTORY
     * contract. Otherwise the request is delegated to {@code fetchOrSubscribeFromHistory}
     * with a time range from {@code 0} to {@link Long#MAX_VALUE}.
     *
     * @param eventType the indexed event class
     * @param symbol    the event symbol
     * @param source    the indexed event source
     * @return the promise for the list of events
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <E extends IndexedEvent<?>> Promise<List<E>> getIndexedEventsPromise(Class<E> eventType, Object symbol, IndexedEventSource source) {
        if (eventType == null || symbol == null || source == null) {
            throw new NullPointerException();
        }
        EventDelegateSet<?, ?> delegateSet = endpoint.getDelegateSetByEventType(eventType);
        if (delegateSet != null) {
            Object eventSymbol = delegateSet.convertSymbol(symbol);
            List<?> delegates = delegateSet.getSubDelegatesBySubscriptionSymbol(eventSymbol, source.id());
            if (delegates.size() == 1) {
                EventDelegate delegate = (EventDelegate)delegates.get(0);
                if (delegate.getContract() == QDContract.HISTORY) {
                    return this.fetchOrSubscribeFromHistory(eventSymbol, delegate, 0L, 0L, Long.MAX_VALUE);
                }
            }
        }
        return Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG));
    }

    /**
     * Returns the indexed events of the given type, symbol and source via
     * {@code fetchFromHistoryIfSubscribed}, using a time range from {@code 0} to
     * {@link Long#MAX_VALUE}.
     *
     * @param eventType the indexed event class
     * @param symbol    the event symbol
     * @param source    the indexed event source
     * @return the list of events, or {@code null} unless exactly one HISTORY delegate matches
     *         (the history lookup itself may also return {@code null})
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <E extends IndexedEvent<?>> List<E> getIndexedEventsIfSubscribed(Class<E> eventType, Object symbol, IndexedEventSource source) {
        if (eventType == null || symbol == null || source == null) {
            throw new NullPointerException();
        }
        EventDelegateSet<?, ?> delegateSet = endpoint.getDelegateSetByEventType(eventType);
        if (delegateSet != null) {
            Object eventSymbol = delegateSet.convertSymbol(symbol);
            List<?> delegates = delegateSet.getSubDelegatesBySubscriptionSymbol(eventSymbol, source.id());
            if (delegates.size() == 1) {
                EventDelegate delegate = (EventDelegate)delegates.get(0);
                if (delegate.getContract() == QDContract.HISTORY) {
                    return this.fetchFromHistoryIfSubscribed(eventSymbol, delegate, 0L, Long.MAX_VALUE);
                }
            }
        }
        return null;
    }

    /**
     * Returns a promise for the time-series events of the given type and symbol between
     * {@code fromTime} and {@code toTime}.
     *
     * <p>The promise is failed with {@link IllegalArgumentException} unless exactly one
     * time-series delegate matches the converted symbol and it uses the HISTORY contract. The
     * fetch start time comes from the delegate's fetch-time heuristic; all three times are
     * converted to QD time by the delegate before the history request.
     *
     * @param eventType the time-series event class
     * @param symbol    the event symbol
     * @param fromTime  start of the requested range (event time)
     * @param toTime    end of the requested range (event time)
     * @return the promise for the list of events
     * @throws NullPointerException if {@code eventType} or {@code symbol} is {@code null}
     */
    @Override
    public <E extends TimeSeriesEvent<?>> Promise<List<E>> getTimeSeriesPromise(Class<E> eventType, Object symbol, long fromTime, long toTime) {
        if (eventType == null || symbol == null) {
            throw new NullPointerException();
        }
        EventDelegateSet<?, ?> delegateSet = endpoint.getDelegateSetByEventType(eventType);
        if (delegateSet == null) {
            return Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG));
        }
        Object eventSymbol = delegateSet.convertSymbol(symbol);
        List<?> delegates = delegateSet.getTimeSeriesDelegatesByEventSymbol(eventSymbol);
        if (delegates.size() != 1) {
            return Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG));
        }
        EventDelegate delegate = (EventDelegate)delegates.get(0);
        if (delegate.getContract() != QDContract.HISTORY) {
            return Promise.failed(new IllegalArgumentException(INVALID_EVENT_MSG));
        }
        long fetchTime = delegate.getFetchTimeHeuristicByEventSymbolAndFromTime(eventSymbol, fromTime);
        long fetchQDTime = delegate.getQDTimeByEventTime(fetchTime);
        long fromQDTime = delegate.getQDTimeByEventTime(fromTime);
        long toQDTime = delegate.getQDTimeByEventTime(toTime);
        return this.fetchOrSubscribeFromHistory(eventSymbol, delegate, fetchQDTime, fromQDTime, toQDTime);
    }

    /**
     * Returns the time-series events of the given type and symbol between {@code fromTime} and
     * {@code toTime} via {@code fetchFromHistoryIfSubscribed}; both times are converted to QD
     * time by the delegate.
     *
     * @param eventType the time-series event class
     * @param symbol    the event symbol
     * @param fromTime  start of the requested range (event time)
     * @param toTime    end of the requested range (event time)
     * @return the list of events, or {@code null} unless exactly one HISTORY time-series
     *         delegate matches (the history lookup itself may also return {@code null})
     * @throws NullPointerException if {@code eventType} or {@code symbol} is {@code null}
     */
    @Override
    public <E extends TimeSeriesEvent<?>> List<E> getTimeSeriesIfSubscribed(Class<E> eventType, Object symbol, long fromTime, long toTime) {
        if (eventType == null || symbol == null) {
            throw new NullPointerException();
        }
        EventDelegateSet<?, ?> delegateSet = endpoint.getDelegateSetByEventType(eventType);
        if (delegateSet == null) {
            return null;
        }
        Object eventSymbol = delegateSet.convertSymbol(symbol);
        List<?> delegates = delegateSet.getTimeSeriesDelegatesByEventSymbol(eventSymbol);
        if (delegates.size() != 1) {
            return null;
        }
        EventDelegate delegate = (EventDelegate)delegates.get(0);
        if (delegate.getContract() != QDContract.HISTORY) {
            return null;
        }
        long fromQDTime = delegate.getQDTimeByEventTime(fromTime);
        long toQDTime = delegate.getQDTimeByEventTime(toTime);
        return this.fetchFromHistoryIfSubscribed(eventSymbol, delegate, fromQDTime, toQDTime);
    }

    /**
     * Reads events from the HISTORY collector for an already-subscribed record.
     *
     * @param symbol     the (converted) event symbol
     * @param delegate   the delegate supplying the record and QD symbol
     * @param fromQDTime start of the range in QD time; also used for the subscription check
     * @param toQDTime   end of the range in QD time
     * @return {@code null} when the filter rejects the symbol or history is not subscribed from
     *         {@code fromQDTime}; otherwise the examined events, or an empty list when the
     *         examination produced no result list
     */
    @Nullable
    private <E extends IndexedEvent<?>> List<E> fetchFromHistoryIfSubscribed(Object symbol, EventDelegate<E> delegate, long fromQDTime, long toQDTime) {
        QDHistory history = (QDHistory)endpoint.getCollector(QDContract.HISTORY);
        String qdSymbol = delegate.getQDSymbolByEventSymbol(symbol);
        int cipher = endpoint.encode(qdSymbol);
        // Short-circuit keeps the original order: filter check first, subscription check second.
        if (!filter.getUpdatedFilter().accept(QDContract.HISTORY, delegate.getRecord(), cipher, qdSymbol)
            || !history.isSubscribed(delegate.getRecord(), cipher, qdSymbol, fromQDTime))
        {
            return null;
        }
        HistoryFetchResult<E> fetch = new HistoryFetchResult<E>(symbol, 0L, delegate, false);
        history.examineData(delegate.getRecord(), cipher, qdSymbol, fromQDTime, toQDTime, fetch);
        if (fetch.result == null) {
            return Collections.emptyList();
        }
        return fetch.result;
    }

    /**
     * Returns a promise for history events, completing it from data already held by the
     * HISTORY collector when possible and subscribing a dedicated agent otherwise.
     *
     * <p>The promise is failed with {@link CancellationException} when the filter rejects the
     * symbol. If examining existing data yields a result with no pending transaction, that
     * result completes the promise at once. If the result exists but a transaction is pending,
     * the result list is cleared and handed to the completer. A new history agent is then
     * subscribed from {@code fetchQDTime} and a {@code HistoryPromiseCompleter} is registered.
     *
     * @param symbol      the (converted) event symbol
     * @param delegate    the delegate supplying the record and QD symbol
     * @param fetchQDTime time from which data is examined and subscribed (QD time)
     * @param fromQDTime  start of the requested range (QD time)
     * @param toQDTime    end of the requested range (QD time)
     * @return the promise for the list of events
     */
    private <E extends IndexedEvent<?>> Promise<List<E>> fetchOrSubscribeFromHistory(Object symbol, EventDelegate<E> delegate, long fetchQDTime, long fromQDTime, long toQDTime) {
        QDHistory history = (QDHistory)endpoint.getCollector(QDContract.HISTORY);
        String qdSymbol = delegate.getQDSymbolByEventSymbol(symbol);
        int cipher = endpoint.encode(qdSymbol);
        boolean accepted = filter.getUpdatedFilter().accept(QDContract.HISTORY, delegate.getRecord(), cipher, qdSymbol);
        if (!accepted) {
            return Promise.failed(new CancellationException("cancel"));
        }

        HistoryFetchResult<E> fetch = new HistoryFetchResult<E>(symbol, fromQDTime, delegate, true);
        history.examineData(delegate.getRecord(), cipher, qdSymbol, fetchQDTime, toQDTime, fetch);
        if (fetch.result != null) {
            if (!fetch.txPending) {
                return Promise.completed(fetch.result);
            }
            // A transaction is pending: reuse the list but discard what was collected so far.
            fetch.result.clearImpl();
        }

        QDAgent agent = history.agentBuilder().withHistorySnapshot(true).build();
        RecordBuffer subscriptionBuffer = RecordBuffer.getInstance(RecordMode.HISTORY_SUBSCRIPTION);
        subscriptionBuffer.add(delegate.getRecord(), cipher, qdSymbol).setTime(fetchQDTime);
        agent.addSubscription(subscriptionBuffer);
        subscriptionBuffer.release();

        HistoryPromiseCompleter completer = new HistoryPromiseCompleter(agent, symbol, fromQDTime, toQDTime, delegate, fetch.result);
        this.register(completer);
        return completer.promise;
    }

    /**
     * Returns the event processor for the given subscription, creating and registering one
     * when none exists.
     *
     * <p>An existing processor is returned without locking. Otherwise the endpoint lock is
     * taken, the lookup is repeated, and a new processor is created and added if still absent.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     *
     * @param subscription the subscription whose processor is requested
     * @return the processor, or {@code null} when the endpoint is closed and no processor exists
     */
    private EventProcessor<?, ?> getOrCreateEventProcessor(DXFeedSubscription<?> subscription) {
        EventProcessor<?, ?> existing = eventProcessors.getByKey(subscription);
        if (existing != null) {
            return existing;
        }
        synchronized (endpoint.getLock()) {
            if (endpoint.isClosed()) {
                return null;
            }
            EventProcessor<?, ?> processor = eventProcessors.getByKey(subscription);
            if (processor == null) {
                processor = new EventProcessor(subscription);
                eventProcessors.add(processor);
            }
            return processor;
        }
    }

    /**
     * Closes the event processor registered for the given subscription, if any, and then
     * removes it from the registry.
     *
     * @param subscription the subscription whose processor is closed
     * @param clear        passed through to the processor's {@code close} method
     */
    private void closeEventProcessor(DXFeedSubscription<?> subscription, boolean clear) {
        EventProcessor<?, ?> processor = eventProcessors.getByKey(subscription);
        if (processor != null) {
            processor.close(clear);
            removeEventProcessor(subscription);
        }
    }

    /**
     * Converts subscription symbols into one QD subscription buffer per contract.
     *
     * <p>For every event type of the subscription that has a delegate set, each symbol is
     * resolved to its delegates: time-series symbols use the time-series delegates and carry a
     * from-time, indexed-event symbols use the sub-delegates for their source id, and any other
     * symbol uses the sub-delegates for source id {@code -1}. Each delegate contributes one
     * record with a {@code SymbolDelegate} attachment to its contract's buffer.
     *
     * @param subscription the subscription providing the event types
     * @param symbols      the subscription symbols to convert
     * @param isAddSub     {@code true} to use the contract's added-subscription mode and set
     *                     non-zero from-times; {@code false} to use plain subscription mode
     * @return buffers keyed by contract; the caller is responsible for releasing them
     */
    private EnumMap<QDContract, RecordBuffer> toSubscription(DXFeedSubscription<?> subscription, Set<?> symbols, boolean isAddSub) {
        EnumMap<QDContract, RecordBuffer> buffersByContract = new EnumMap<QDContract, RecordBuffer>(QDContract.class);
        for (Class<?> eventType : subscription.getEventTypes()) {
            EventDelegateSet<?, ?> delegateSet = endpoint.getDelegateSetByEventType(eventType);
            if (delegateSet == null) {
                continue;
            }
            for (Object subSymbol : symbols) {
                long fromTime = 0L;
                Object eventSymbol;
                List<?> delegates;
                if (subSymbol instanceof TimeSeriesSubscriptionSymbol) {
                    TimeSeriesSubscriptionSymbol timeSeriesSymbol = (TimeSeriesSubscriptionSymbol)subSymbol;
                    fromTime = timeSeriesSymbol.getFromTime();
                    eventSymbol = delegateSet.convertSymbol(timeSeriesSymbol.getEventSymbol());
                    delegates = delegateSet.getTimeSeriesDelegatesByEventSymbol(eventSymbol);
                } else if (subSymbol instanceof IndexedEventSubscriptionSymbol) {
                    IndexedEventSubscriptionSymbol indexedSymbol = (IndexedEventSubscriptionSymbol)subSymbol;
                    eventSymbol = delegateSet.convertSymbol(indexedSymbol.getEventSymbol());
                    delegates = delegateSet.getSubDelegatesBySubscriptionSymbol(eventSymbol, indexedSymbol.getSource().id());
                } else {
                    eventSymbol = delegateSet.convertSymbol(subSymbol);
                    delegates = delegateSet.getSubDelegatesBySubscriptionSymbol(eventSymbol, -1);
                }
                for (Object element : delegates) {
                    EventDelegate delegate = (EventDelegate)element;
                    QDContract contract = delegate.getContract();
                    // Lazily allocate the buffer for this contract in the mode matching isAddSub.
                    RecordBuffer buffer = buffersByContract.computeIfAbsent(contract, key ->
                        RecordBuffer.getInstance((isAddSub ? RecordMode.addedSubscriptionFor(key) : RecordMode.SUBSCRIPTION).withAttachment()));
                    String qdSymbol = delegate.getQDSymbolByEventSymbol(eventSymbol);
                    RecordCursor cursor = buffer.add(delegate.getRecord(), endpoint.encode(qdSymbol), qdSymbol);
                    if (isAddSub && fromTime != 0L) {
                        cursor.setTime(delegate.getQDTimeByEventTime(fromTime));
                    }
                    cursor.setAttachment(new SymbolDelegate(eventSymbol, delegate));
                }
            }
        }
        return buffersByContract;
    }

    /**
     * Returns the endpoint this feed was created for.
     *
     * @return the owning endpoint
     */
    public DXEndpointImpl getDXEndpoint() {
        return endpoint;
    }

    /**
     * Tells whether a positive aggregation period is configured.
     *
     * @return {@code true} when the aggregation period is greater than zero milliseconds
     */
    public boolean hasAggregationPeriod() {
        return aggregationPeriodMillis > 0L;
    }

    /**
     * Returns the configured aggregation period.
     *
     * @return the aggregation period in milliseconds; {@code 0} when the
     *         {@code dxfeed.aggregationPeriod} property was not set
     */
    public long getAggregationPeriodMillis() {
        return aggregationPeriodMillis;
    }

    /**
     * Rebuilds the QD subscription of every event processor after a filter update.
     *
     * <p>For each processor in a snapshot of the current set, and while holding the monitor of
     * its subscription, the decorated symbols are converted to per-contract buffers, each
     * buffer is set as the subscription of the corresponding agent, and then released.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     *
     * @param updatedFilter the updated filter; not read by this method
     */
    private void updateSubscriptionsOnFilterUpdate(QDFilter updatedFilter) {
        for (EventProcessor processor : eventProcessors.toArray(new EventProcessor[0])) {
            DXFeedSubscription subscription = processor.subscription;
            synchronized (subscription) {
                Set<?> decoratedSymbols = subscription.getDecoratedSymbols();
                EnumMap<QDContract, RecordBuffer> buffersByContract = toSubscription(subscription, decoratedSymbols, true);
                buffersByContract.forEach((contract, buffer) -> {
                    processor.getOrCreateAgent(contract).setSubscription(buffer);
                    buffer.release();
                });
            }
        }
    }

    /**
     * Schedules the handler's {@code promiseDone} callback on the endpoint executor.
     * Does nothing when {@code handler} is {@code null}.
     *
     * @param promise the promise passed to the handler
     * @param handler the handler to notify; may be {@code null}
     */
    private <E> void executePromiseHandler(Promise<E> promise, PromiseHandler<? super E> handler) {
        if (handler == null) {
            return;
        }
        endpoint.getOrCreateExecutor().execute(() -> handler.promiseDone(promise));
    }

    /**
     * Registers a history promise completer with this feed.
     *
     * <p>Under the endpoint lock: if the endpoint is closed the completer's promise is
     * cancelled, otherwise the completer is added to the closeables. Only in the latter case
     * is the completer installed, after the lock is released, as its agent's record listener.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method (likely the synchronized block's implicit handler; confirm
     * against the original source).
     *
     * @param completer the completer to register
     * @return the same completer
     */
    private <E extends IndexedEvent<?>> HistoryPromiseCompleter<E> register(HistoryPromiseCompleter<E> completer) {
        boolean registered;
        synchronized (endpoint.getLock()) {
            registered = !endpoint.isClosed();
            if (registered) {
                closeables.add(completer);
            } else {
                completer.promise.cancel();
            }
        }
        if (registered) {
            completer.agent.setRecordListener(completer);
        }
        return completer;
    }

    /**
     * Indexed list of events whose element index is taken from {@link IndexedEvent#getIndex()}.
     */
    private static class ResultList<E extends IndexedEvent<?>>
    extends AbstractIndexedList<E> {
        private ResultList() {
        }

        /** Returns the index of the given event as reported by the event itself. */
        @Override
        protected long getIndex(E event) {
            return event.getIndex();
        }
    }

    private static interface Closeable {
        public void close();
    }

    /**
     * Record processor that owns a ticker agent and completes {@link LastEventPromise}
     * instances attached to its subscription when their records arrive.
     */
    private class LastEventsProcessor extends RecordProcessor {
        final QDTicker ticker;
        final QDAgent tickerAgent;

        /**
         * Creates the processor on the endpoint executor and builds its ticker agent with the
         * given filter and the last-event attachment strategy.
         */
        LastEventsProcessor(QDTicker ticker, QDFilter filter) {
            super(DXFeedImpl.this.endpoint.getOrCreateExecutor());
            this.ticker = ticker;
            this.tickerAgent = ticker.agentBuilder()
                .withFilter(filter)
                .withAttachmentStrategy(LAST_EVENT_ATTACHMENT_STRATEGY)
                .build();
        }

        /** Starts processing records from the ticker agent. */
        void start() {
            startProcessing(tickerAgent);
        }

        /** Stops processing and closes the ticker agent. */
        void close() {
            stopProcessing();
            tickerAgent.close();
        }

        /**
         * Adds the given subscription to the ticker agent unless the endpoint is closed.
         *
         * @return {@code true} when the subscription was added, {@code false} when the
         *         endpoint is closed
         */
        boolean addSubscription(RecordBuffer sub) {
            if (!DXFeedImpl.this.endpoint.isClosed()) {
                tickerAgent.addSubscription(sub);
                return true;
            }
            return false;
        }

        /**
         * Dispatches every record of the source to the attached promises.
         *
         * <p>Unless the calling thread already has a remove batch for this processor, a new
         * one is installed in {@code LOCAL_REMOVE_BATCH} for the duration of the call so that
         * cancellations triggered during processing are collected and applied once at the end.
         *
         * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
         * behaviour change" for this method; confirm against the original source.
         */
        @Override
        protected synchronized void process(RecordSource source) {
            LocalRemoveBatch outerBatch = LOCAL_REMOVE_BATCH.get();
            boolean ownsBatch = outerBatch == null || outerBatch.lastEventsProcessor != this;
            LocalRemoveBatch activeBatch = outerBatch;
            if (ownsBatch) {
                activeBatch = new LocalRemoveBatch(this);
                LOCAL_REMOVE_BATCH.set(activeBatch);
            }
            try {
                for (RecordCursor cursor = source.next(); cursor != null; cursor = source.next()) {
                    LAST_EVENT_ATTACHMENT_STRATEGY.processEach(cursor, this);
                }
            } finally {
                if (ownsBatch) {
                    // Restore the previous thread-local value before flushing the removals.
                    LOCAL_REMOVE_BATCH.set(outerBatch);
                    activeBatch.completeRemoveSubBatch();
                }
            }
        }

        /** Completes the promise with an event created from the cursor, unless it is already done. */
        <E extends EventType<?>> void processEvent(RecordCursor cursor, LastEventPromise<E> promise) {
            if (!promise.isDone()) {
                promise.complete(promise.delegate.createEvent(promise.symbol, cursor));
            }
        }

        /**
         * Removes the promise's subscription from the ticker agent: queued in the calling
         * thread's remove batch when one is active for this processor, otherwise removed
         * immediately with a temporary buffer.
         */
        <E extends EventType<?>> void cancel(LastEventPromise<E> a) {
            LocalRemoveBatch threadBatch = LOCAL_REMOVE_BATCH.get();
            if (threadBatch == null || threadBatch.lastEventsProcessor != this) {
                RecordBuffer removal = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION.withAttachment());
                removal.add(a.delegate.getRecord(), a.cipher, a.qdSymbol).setAttachment(a);
                tickerAgent.removeSubscription(removal);
                removal.release();
            } else {
                threadBatch.removeSub.add(a.delegate.getRecord(), a.cipher, a.qdSymbol).setAttachment(a);
            }
        }
    }

    /**
     * Collects subscription removals for one {@link LastEventsProcessor} so they can be applied
     * to its ticker agent in a single call.
     */
    private static class LocalRemoveBatch {
        RecordBuffer removeSub = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION.withAttachment());
        LastEventsProcessor lastEventsProcessor;

        LocalRemoveBatch(LastEventsProcessor lastEventsProcessor) {
            this.lastEventsProcessor = lastEventsProcessor;
        }

        /**
         * Applies the collected removals, if any, to the processor's ticker agent and then
         * releases the buffer.
         */
        void completeRemoveSubBatch() {
            boolean hasRemovals = !removeSub.isEmpty();
            if (hasRemovals) {
                lastEventsProcessor.tickerAgent.removeSubscription(removeSub);
            }
            removeSub.release();
        }
    }

    /**
     * Per-thread scratch state for last-event requests: a reusable cursor owner for reading
     * ticker data and a lazily allocated buffer collecting promises to subscribe in one batch.
     */
    private static class LocalAddBatch {
        final RecordCursor.Owner owner = RecordCursor.allocateOwner();
        RecordBuffer addSub;

        private LocalAddBatch() {
        }

        /** Starts a new batch by forgetting any previous buffer reference. */
        void subscribeStartBatch() {
            this.addSub = null;
        }

        /** Adds the promise's record to the batch, attaching the promise to the record. */
        <E extends EventType<?>> void subscribeAddBatch(LastEventPromise<E> a) {
            if (this.addSub == null) {
                this.addSub = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION.withAttachment());
            }
            this.addSub.add(a.delegate.getRecord(), a.cipher, a.qdSymbol).setAttachment(a);
        }

        /**
         * Submits the collected batch to the processor and marks every attached promise as
         * subscribed.
         *
         * <p>The buffer is detached from this thread-local object first and released in all
         * cases, so a refused subscription or a failing promise callback no longer leaves the
         * buffer and its promise attachments reachable from the thread-local batch.
         *
         * @param processor the processor receiving the subscription
         * @return {@code true} when the batch was empty or accepted; {@code false} when the
         *         processor refused it
         */
        boolean completeAddSubBatch(LastEventsProcessor processor) {
            RecordBuffer batch = this.addSub;
            if (batch == null) {
                return true;
            }
            this.addSub = null;
            try {
                if (!processor.addSubscription(batch)) {
                    return false;
                }
                batch.rewind();
                RecordCursor cur;
                while ((cur = batch.next()) != null) {
                    LastEventPromise<?> a = (LastEventPromise<?>) cur.getAttachment();
                    a.subscribed();
                }
                return true;
            } finally {
                batch.release();
            }
        }
    }

    /**
     * Attachment strategy for the last-events ticker agent: every attached
     * {@link LastEventPromise} is forwarded to the {@link LastEventsProcessor} passed as context.
     */
    private static class LastEventAttachmentStrategy
    extends ArrayListAttachmentStrategy<LastEventPromise<?>, LastEventsProcessor> {
        private LastEventAttachmentStrategy() {
        }

        /** Hands the record and its attached promise to {@code ctx.processEvent}. */
        @Override
        public void process(RecordCursor cursor, LastEventPromise attachment, LastEventsProcessor ctx) {
            ctx.processEvent(cursor, attachment);
        }
    }

    /**
     * Promise for a single event, carrying the symbol, delegate, cipher and QD symbol that
     * identify what was requested.
     *
     * @param <E> type of the promised event
     */
    private class LastEventPromise<E extends EventType<?>>
    extends Promise<E> {
        final Object symbol;
        final EventDelegate<E> delegate;
        final int cipher;
        final String qdSymbol;
        // Set once by subscribed(); read by handleDone().
        private volatile boolean subscribed;

        LastEventPromise(Object symbol, EventDelegate<E> delegate, int cipher, String qdSymbol) {
            this.symbol = symbol;
            this.delegate = delegate;
            this.cipher = cipher;
            this.qdSymbol = qdSymbol;
        }

        /**
         * Marks this promise as subscribed. If the promise is already done at that point,
         * it is immediately passed to {@code lastEventsProcessor.cancel}.
         */
        void subscribed() {
            // The flag is written before isDone() is read.
            // NOTE(review): together with the check in handleDone() this looks designed so that
            // cancel is invoked whichever of "subscribed" and "done" happens first -- confirm.
            this.subscribed = true;
            if (this.isDone()) {
                DXFeedImpl.this.lastEventsProcessor.cancel(this);
            }
        }

        /**
         * Passes this promise to {@code lastEventsProcessor.cancel} when it has been marked
         * subscribed, then hands the handler to {@code executePromiseHandler}.
         */
        @Override
        protected void handleDone(PromiseHandler<? super E> handler) {
            if (this.subscribed) {
                DXFeedImpl.this.lastEventsProcessor.cancel(this);
            }
            DXFeedImpl.this.executePromiseHandler(this, handler);
        }
    }

    /**
     * Attachment strategy that dispatches records to an {@link EventProcessor} and keeps a
     * per-attachment usage counter in {@link SymbolDelegate#count}.
     */
    private static class EventProcessorAttachmentStrategy
    extends ArrayListAttachmentStrategy<SymbolDelegate, EventProcessor<?, ?>> {
        private EventProcessorAttachmentStrategy() {
        }

        /**
         * Routes the record to the processor: records with an attachment go to
         * {@code processEvent} with the attachment's symbol and delegate, records without
         * one go to {@code processWildcardEvent}.
         */
        @Override
        public void process(RecordCursor cursor, SymbolDelegate attachment, EventProcessor ctx) {
            if (attachment != null) {
                ctx.processEvent(cursor, attachment.symbol, attachment.delegate);
                return;
            }
            ctx.processWildcardEvent(cursor);
        }

        /** Bumps the attachment's counter by one and always reports {@code true}. */
        @Override
        protected boolean incrementCombines(SymbolDelegate attachment) {
            attachment.count += 1;
            return true;
        }

        /** Lowers the attachment's counter by one and reports whether it is still positive. */
        @Override
        protected boolean decrementAndNotEmpty(SymbolDelegate attachment) {
            attachment.count -= 1;
            return attachment.count > 0;
        }
    }

    /**
     * Pairs a subscription symbol with the delegate that creates events for it.
     * Equality and hashing use the symbol and the delegate only; {@link #count} is
     * mutable bookkeeping and takes no part in either.
     */
    private static final class SymbolDelegate {
        final Object symbol;
        final EventDelegate<?> delegate;
        // Starts at one; adjusted by EventProcessorAttachmentStrategy.
        int count = 1;

        SymbolDelegate(Object symbol, EventDelegate<?> delegate) {
            this.symbol = symbol;
            this.delegate = delegate;
        }

        /** Two instances are equal when both their symbols and their delegates are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o instanceof SymbolDelegate) {
                SymbolDelegate other = (SymbolDelegate) o;
                return symbol.equals(other.symbol) && delegate.equals(other.delegate);
            }
            return false;
        }

        /** Combines the symbol hash (weighted by 31) with the delegate hash. */
        @Override
        public int hashCode() {
            int hash = symbol.hashCode();
            hash = 31 * hash + delegate.hashCode();
            return hash;
        }
    }

    /**
     * Retrieves records from the per-contract agents of one subscription and converts them
     * into events for that subscription.
     *
     * <p>It is a {@link RecordListener} so that agents (and, when an aggregation period is
     * configured, their snapshot providers) can report available records, and a
     * {@link Runnable} so that it can be submitted to an executor as the data task.
     *
     * <p>Layout of the {@link #state} word as used by this class:
     * <ul>
     * <li>bit {@code i} for {@code 0 <= i < N_CONTRACTS}: {@code agents[i]} reported records;</li>
     * <li>bit {@code i + N_CONTRACTS}: the snapshot provider of {@code agents[i]} reported records;</li>
     * <li>{@code 0x40000000}: the data task is scheduled;</li>
     * <li>{@code Integer.MIN_VALUE}: the snapshot task is scheduled.</li>
     * </ul>
     *
     * @param <T> symbol type of the events
     * @param <E> event type
     */
    private class EventProcessor<T, E extends EventType<T>>
    implements RecordListener,
    Runnable {
        // Sized with the same bound that every loop over this array uses.
        final QDAgent[] agents = new QDAgent[N_CONTRACTS];
        final AtomicInteger state = new AtomicInteger();
        final DXFeedSubscription<E> subscription;
        // Non-null only when the feed has an aggregation period.
        final Runnable snapshotTask;
        final AtomicReference<CountDownLatch> terminationLatch = new AtomicReference<>();
        // Non-null only while process(RecordSource) is running.
        List<E> events;

        EventProcessor(DXFeedSubscription<E> subscription) {
            this.subscription = subscription;
            this.snapshotTask = DXFeedImpl.this.hasAggregationPeriod() ? () -> this.executeTask(true) : null;
        }

        /** Returns the "task scheduled" state flag for the snapshot or the data task. */
        private int scheduledFlag(boolean snapshot) {
            return snapshot ? Integer.MIN_VALUE : 0x40000000;
        }

        /**
         * Atomically sets the bits of {@code mask} in the state word.
         *
         * @return {@code true} if this call set them; {@code false} if any of them was
         *         already set (the state is left unchanged in that case)
         */
        private boolean setState(int mask) {
            while (true) {
                int cur = this.state.get();
                if ((cur & mask) != 0) {
                    return false;
                }
                if (this.state.compareAndSet(cur, cur | mask)) {
                    return true;
                }
            }
        }

        /** Atomically clears the bits of {@code mask}; does nothing if none of them is set. */
        private void clearState(int mask) {
            int cur;
            do {
                cur = this.state.get();
            } while ((cur & mask) != 0 && !this.state.compareAndSet(cur, cur & ~mask));
        }

        /**
         * Submits the snapshot or the data task for execution.
         *
         * <p>The subscription's executor is used when present, otherwise the endpoint's.
         * The snapshot task is always executed directly. The data task is delayed by the
         * aggregation period when that period is non-zero and the executor is a
         * {@link ScheduledExecutorService}; otherwise it is executed directly.
         */
        private void rescheduleTask(boolean snapshot) {
            Executor executor = this.subscription.getExecutor();
            if (executor == null) {
                executor = DXFeedImpl.this.endpoint.getOrCreateExecutor();
            }
            long aggregationPeriodMillis = DXFeedImpl.this.getAggregationPeriodMillis();
            if (snapshot) {
                executor.execute(this.snapshotTask);
            } else if (aggregationPeriodMillis == 0L || !(executor instanceof ScheduledExecutorService)) {
                executor.execute(this);
            } else {
                ((ScheduledExecutorService)executor).schedule(this, aggregationPeriodMillis, TimeUnit.MILLISECONDS);
            }
        }

        /** Schedules the task unless its "scheduled" flag is already set. */
        private void scheduleTaskIfNeeded(boolean snapshot) {
            if (this.setState(this.scheduledFlag(snapshot))) {
                this.rescheduleTask(snapshot);
            }
        }

        /** Runs the data (non-snapshot) task. */
        @Override
        public void run() {
            this.executeTask(false);
        }

        /**
         * Retrieves and processes one buffer of available records, then either reschedules
         * the task or clears its "scheduled" flag.
         *
         * <p>The bookkeeping runs in a {@code finally} block, so it is performed exactly once
         * whether the retrieval/processing step completes normally or throws.
         *
         * @param snapshot {@code true} for the snapshot task, {@code false} for the data task
         */
        synchronized void executeTask(boolean snapshot) {
            // Remains true when the try body throws, so a failed run is rescheduled.
            boolean rescheduleTask = true;
            int availableMask = snapshot ? STATE_AVAILABLE_SNAPSHOT_MASK : STATE_AVAILABLE_DATA_MASK;
            try {
                int available = this.state.get();
                if ((available & availableMask) != 0) {
                    RecordBuffer buf = RecordBuffer.getInstance(DXFeedImpl.this.retrieveMode);
                    buf.setCapacityLimited(true);
                    this.retrieveImpl(buf, available, snapshot);
                    if (!buf.isEmpty()) {
                        this.process(buf);
                    }
                    buf.release();
                }
                rescheduleTask = (this.state.get() & availableMask) != 0;
            }
            finally {
                if (rescheduleTask) {
                    this.rescheduleTask(snapshot);
                } else {
                    this.clearState(this.scheduledFlag(snapshot));
                    // An availability bit may have been set between the check above and the
                    // clearing of the scheduled flag; re-check so that it is not lost.
                    if ((this.state.get() & availableMask) != 0) {
                        this.scheduleTaskIfNeeded(snapshot);
                    }
                }
                if (!this.hasMoreDataToProcess()) {
                    this.signalNoMoreDataToProcess();
                }
            }
        }

        /**
         * Retrieves records into {@code buf} from every agent (or snapshot provider) whose
         * availability bit is set in {@code available}, clearing each bit before retrieving.
         * Stops at the first provider that reports more data, after setting its bit again.
         */
        private void retrieveImpl(RecordBuffer buf, int available, boolean snapshot) {
            int offset = snapshot ? N_CONTRACTS : 0;
            for (int i = 0; i < N_CONTRACTS; ++i) {
                int mask = 1 << i + offset;
                if ((available & mask) == 0) {
                    continue;
                }
                this.clearState(mask);
                // Stays true when retrieve throws, so the bit is restored in that case too.
                boolean more = true;
                try {
                    RecordProvider provider = snapshot ? this.agents[i].getSnapshotProvider() : this.agents[i];
                    more = provider.retrieve(buf);
                }
                finally {
                    if (more) {
                        this.setState(mask);
                        // NOTE(review): returning from finally discards any exception thrown
                        // by retrieve(); kept as-is -- confirm this is intended.
                        return;
                    }
                }
            }
        }

        /**
         * Marks the reporting provider as having records and schedules the matching task.
         * Agents are matched first; snapshot providers are matched only when a snapshot
         * task exists. Unknown providers are ignored.
         */
        @Override
        public void recordsAvailable(RecordProvider provider) {
            int i;
            for (i = 0; i < N_CONTRACTS; ++i) {
                if (provider != this.agents[i]) {
                    continue;
                }
                if (this.setState(1 << i)) {
                    this.scheduleTaskIfNeeded(false);
                }
                return;
            }
            if (this.snapshotTask != null) {
                for (i = 0; i < N_CONTRACTS; ++i) {
                    QDAgent agent = this.agents[i];
                    if (agent == null || provider != agent.getSnapshotProvider()) {
                        continue;
                    }
                    if (this.setState(1 << i + N_CONTRACTS)) {
                        this.scheduleTaskIfNeeded(true);
                    }
                    return;
                }
            }
        }

        /**
         * Returns the agent for the contract, building and registering it on first use.
         * A new agent gets the feed's current filter, this processor as record listener
         * (also on its snapshot provider when a snapshot task exists), and the
         * {@code BLOCK} overflow strategy when the endpoint role is {@code STREAM_FEED}.
         */
        QDAgent getOrCreateAgent(QDContract contract) {
            QDAgent agent = this.agents[contract.ordinal()];
            if (agent != null) {
                return agent;
            }
            agent = DXFeedImpl.this.eventProcessorAgentBuilders[contract.ordinal()].withFilter(DXFeedImpl.this.filter.getUpdatedFilter()).build();
            if (DXFeedImpl.this.endpoint.getRole() == DXEndpoint.Role.STREAM_FEED) {
                agent.setBufferOverflowStrategy(QDAgent.BufferOverflowStrategy.BLOCK);
            }
            this.agents[contract.ordinal()] = agent;
            agent.setRecordListener(this);
            if (this.snapshotTask != null) {
                agent.getSnapshotProvider().setRecordListener(this);
            }
            return agent;
        }

        /** Closes every agent that has been created. */
        void closeAgents() {
            for (int i = 0; i < N_CONTRACTS; ++i) {
                QDAgent agent = this.agents[i];
                if (agent == null) {
                    continue;
                }
                agent.close();
            }
        }

        /**
         * Closes every created agent via {@code closeAndExamineDataBySubscription(buf)} and
         * passes the buffer through {@code clearDataInBuffer} after each one.
         */
        void closeAgentsAndExamineDataBySubscription(RecordBuffer buf) {
            for (int i = 0; i < N_CONTRACTS; ++i) {
                QDAgent agent = this.agents[i];
                if (agent == null) {
                    continue;
                }
                agent.closeAndExamineDataBySubscription(buf);
                DXFeedImpl.clearDataInBuffer(buf, CONTRACTS[i] == QDContract.HISTORY);
            }
        }

        /**
         * Closes the agents of this processor.
         *
         * @param clear when {@code true}, the data examined while closing is collected,
         *              pending data processing is awaited, and the collected records are
         *              then processed; when {@code false}, the agents are simply closed
         */
        void close(boolean clear) {
            if (clear) {
                RecordBuffer buf = RecordBuffer.getInstance();
                this.closeAgentsAndExamineDataBySubscription(buf);
                try {
                    this.awaitTermination();
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and continue with processing.
                    Thread.currentThread().interrupt();
                }
                this.process(buf);
                buf.release();
            } else {
                this.closeAgents();
            }
        }

        /** Tells whether data is reported as available or the data task is scheduled. */
        private boolean hasMoreDataToProcess() {
            return (this.state.get() & (STATE_AVAILABLE_DATA_MASK | this.scheduledFlag(false))) != 0;
        }

        /** Counts down the termination latch if one has been installed. */
        private void signalNoMoreDataToProcess() {
            if (TRACE_LOG) {
                QDLog.log.trace("signalNoMoreDataToProcess on " + this);
            }
            CountDownLatch latch = this.terminationLatch.get();
            if (latch != null) {
                latch.countDown();
            }
        }

        /**
         * Blocks until there is no more data to process.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        private void awaitTermination() throws InterruptedException {
            if (!this.hasMoreDataToProcess()) {
                if (TRACE_LOG) {
                    QDLog.log.trace("awaitTermination on " + this + " -- no more data to process");
                }
                return;
            }
            CountDownLatch latch = this.terminationLatch.get();
            if (latch == null) {
                this.terminationLatch.compareAndSet(null, new CountDownLatch(1));
                latch = this.terminationLatch.get();
            }
            // Re-check after the latch is installed: processing may have finished in between,
            // in which case nobody would count the latch down.
            if (this.hasMoreDataToProcess()) {
                if (TRACE_LOG) {
                    QDLog.log.trace("awaitTermination on " + this + " -- await");
                }
                latch.await();
            } else if (TRACE_LOG) {
                QDLog.log.trace("awaitTermination on " + this + " -- no more data to process");
            }
        }

        /**
         * Converts every record of the source into events and, when at least one event was
         * produced, passes them to {@code DXFeedImpl.processEvents} for this subscription.
         * The {@link #events} list exists only for the duration of this call.
         */
        protected void process(RecordSource source) {
            this.events = new ArrayList<>();
            try {
                RecordCursor cursor;
                while ((cursor = source.next()) != null) {
                    EVENT_PROCESSOR_ATTACHMENT_STRATEGY.processEach(cursor, this);
                }
                if (this.events.isEmpty()) {
                    return;
                }
                DXFeedImpl.processEvents(this.subscription, this.events);
            }
            finally {
                this.events = null;
            }
        }

        /**
         * Handles a record that has no attachment: for every STREAM-contract delegate of
         * the record whose event type the subscription contains, one event is created
         * from the cursor and collected.
         */
        void processWildcardEvent(RecordCursor cursor) {
            List<EventDelegate<?>> delegates = DXFeedImpl.this.endpoint.getDelegateListByContractAndRecord(QDContract.STREAM, cursor.getRecord());
            if (delegates == null) {
                return;
            }
            for (EventDelegate<?> delegate : delegates) {
                if (!this.subscription.containsEventType(delegate.getEventType())) {
                    continue;
                }
                this.events.add(delegate.createEvent(cursor));
            }
        }

        /** Creates an event for the symbol from the cursor and collects it. */
        void processEvent(RecordCursor cursor, T symbol, EventDelegate<E> delegate) {
            this.events.add(delegate.createEvent(symbol, cursor));
        }
    }

    /**
     * Listens to changes of one {@link DXFeedSubscription} and mirrors them onto the agents
     * of the subscription's {@link EventProcessor}.
     *
     * <p>Two listeners are equal when they refer to the same subscription instance and
     * belong to the same feed instance.
     *
     * @param <E> event type of the subscription
     */
    private class SubscriptionChangeListener<E extends EventType<?>>
    implements ObservableSubscriptionChangeListener {
        private final DXFeedSubscription<E> subscription;
        private final boolean clearOnClose;

        SubscriptionChangeListener(DXFeedSubscription<E> subscription, boolean clearOnClose) {
            this.subscription = subscription;
            this.clearOnClose = clearOnClose;
        }

        /**
         * Converts the added symbols into per-contract subscription buffers and adds them to
         * the corresponding agents of the (possibly newly created) event processor.
         */
        @Override
        public void symbolsAdded(Set<?> symbols) {
            EnumMap<QDContract, RecordBuffer> sub = DXFeedImpl.this.toSubscription(this.subscription, symbols, true);
            if (sub.isEmpty()) {
                return;
            }
            EventProcessor<?, ?> processor = DXFeedImpl.this.getOrCreateEventProcessor(this.subscription);
            this.applyToAgents(sub, processor, true);
        }

        /**
         * Converts the removed symbols into per-contract subscription buffers and removes
         * them from the corresponding agents of the existing event processor. When the
         * subscription has no symbols left, it is handled as a closed subscription instead.
         */
        @Override
        public void symbolsRemoved(Set<?> symbols) {
            if (this.subscription.getSymbols().isEmpty()) {
                this.subscriptionClosed();
                return;
            }
            EnumMap<QDContract, RecordBuffer> sub = DXFeedImpl.this.toSubscription(this.subscription, symbols, false);
            if (sub.isEmpty()) {
                return;
            }
            EventProcessor<?, ?> processor = (EventProcessor<?, ?>)DXFeedImpl.this.eventProcessors.getByKey(this.subscription);
            this.applyToAgents(sub, processor, false);
        }

        /**
         * Applies each per-contract buffer to the processor's agent for that contract and
         * releases the buffer afterwards. Buffers are released even when there is no
         * processor to apply them to.
         *
         * @param sub       per-contract subscription buffers
         * @param processor target processor, may be {@code null}
         * @param add       {@code true} to add the subscription, {@code false} to remove it
         */
        private void applyToAgents(EnumMap<QDContract, RecordBuffer> sub, EventProcessor<?, ?> processor, boolean add) {
            for (QDContract contract : sub.keySet()) {
                RecordBuffer buffer = sub.get(contract);
                if (processor != null) {
                    QDAgent agent = processor.getOrCreateAgent(contract);
                    if (add) {
                        agent.addSubscription(buffer);
                    } else {
                        agent.removeSubscription(buffer);
                    }
                }
                buffer.release();
            }
        }

        /** Closes the event processor of the subscription, honouring {@code clearOnClose}. */
        @Override
        public void subscriptionClosed() {
            DXFeedImpl.this.closeEventProcessor(this.subscription, this.clearOnClose);
        }

        /** Returns the feed instance this listener belongs to. */
        public DXFeedImpl feed() {
            return DXFeedImpl.this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SubscriptionChangeListener)) {
                return false;
            }
            SubscriptionChangeListener<?> that = (SubscriptionChangeListener<?>)o;
            // Identity comparison on purpose: same subscription object, same feed object.
            return this.subscription == that.subscription && this.feed() == that.feed();
        }

        @Override
        public int hashCode() {
            return this.subscription.hashCode() ^ this.feed().hashCode();
        }
    }

    /**
     * Promise for a list of indexed events that is fulfilled by a
     * {@link HistoryPromiseCompleter}.
     *
     * @param <E> type of the promised events
     */
    private class HistoryPromise<E extends IndexedEvent<?>>
    extends Promise<List<E>> {
        private final HistoryPromiseCompleter<E> completer;

        HistoryPromise(HistoryPromiseCompleter<E> completer) {
            this.completer = completer;
        }

        /**
         * Closes the completer's agent, removes the completer from the feed's closeables,
         * and then hands the handler to {@code executePromiseHandler}.
         */
        @Override
        protected void handleDone(PromiseHandler<? super List<E>> handler) {
            this.completer.agent.close();
            DXFeedImpl.this.removeCloseable(this.completer);
            DXFeedImpl.this.executePromiseHandler(this, handler);
        }
    }

    /**
     * Record sink and listener that collects events for one symbol within a time range
     * from an agent and completes the associated {@link HistoryPromise} with them.
     *
     * @param <E> type of the collected events
     */
    private class HistoryPromiseCompleter<E extends IndexedEvent<?>>
    extends AbstractRecordSink
    implements RecordListener,
    Closeable {
        final HistoryPromise<E> promise;
        final QDAgent agent;
        private final Object symbol;
        private final long fromQDTime;
        private final long toQDTime;
        private final EventDelegate<E> delegate;
        // May be null until the first in-range record arrives; see getOrCreateResult().
        ResultList<E> result;
        // Reflects the TX_PENDING flag of the most recently appended record.
        boolean txPending;
        // Set once a record at or before fromQDTime, or one carrying SNAPSHOT_SNIP, was seen.
        boolean complete;

        private HistoryPromiseCompleter(QDAgent agent, Object symbol, long fromQDTime, long toQDTime, EventDelegate<E> delegate, ResultList<E> result) {
            this.promise = new HistoryPromise(this);
            this.agent = agent;
            this.symbol = symbol;
            this.toQDTime = toQDTime;
            this.delegate = delegate;
            this.result = result;
            this.fromQDTime = fromQDTime;
        }

        /** Cancels the promise. */
        @Override
        public void close() {
            this.promise.cancel();
        }

        /**
         * Retrieves the available records from the agent into this sink and completes the
         * promise once the data is marked complete and the last record was not TX_PENDING.
         */
        @Override
        public void recordsAvailable(RecordProvider provider) {
            this.agent.retrieve(this);
            if (this.complete && !this.txPending) {
                this.promise.complete(this.getOrCreateResult());
            }
        }

        /**
         * Consumes one record: updates the pending-transaction flag, applies the record to
         * the result when its time lies within {@code [fromQDTime, toQDTime]}, and marks
         * the data complete when the record's time is at or before {@code fromQDTime} or
         * the record carries SNAPSHOT_SNIP.
         */
        @Override
        public void append(RecordCursor cursor) {
            long time = cursor.getTime();
            int eventFlags = cursor.getEventFlags();
            // Overwritten on every record, not accumulated.
            this.txPending = EventFlag.TX_PENDING.in(eventFlags);
            if (time >= this.fromQDTime && time <= this.toQDTime) {
                boolean remove = EventFlag.REMOVE_EVENT.in(eventFlags);
                IndexedEvent event = (IndexedEvent)this.delegate.createEvent(this.symbol, cursor);
                // The event handed to the result list carries no event flags.
                event.setEventFlags(0);
                this.getOrCreateResult().updateImpl(event, remove);
            }
            if (time <= this.fromQDTime || EventFlag.SNAPSHOT_SNIP.in(eventFlags)) {
                this.complete = true;
            }
        }

        /** Returns the result list, creating an empty one if none exists yet. */
        private AbstractIndexedList<E> getOrCreateResult() {
            if (this.result == null) {
                this.result = new ResultList();
            }
            return this.result;
        }
    }

    /**
     * Record sink that gathers events for one symbol starting from a given time.
     *
     * <p>When a snapshot is required, the result list is created only after a record at or
     * before {@code fromQDTime}, or one flagged SNAPSHOT_SNIP, has been seen; until then
     * {@link #result} stays {@code null} and no events are stored.
     *
     * @param <E> type of the gathered events
     */
    private static class HistoryFetchResult<E extends IndexedEvent<?>>
    extends AbstractRecordSink {
        private final Object symbol;
        private final long fromQDTime;
        private final EventDelegate<E> delegate;
        ResultList<E> result;
        // Latched to true by the first stored-range record that carries TX_PENDING.
        boolean txPending;

        HistoryFetchResult(Object symbol, long fromQDTime, EventDelegate<E> delegate, boolean needSnapshot) {
            this.symbol = symbol;
            this.fromQDTime = fromQDTime;
            this.delegate = delegate;
            if (!needSnapshot) {
                this.result = new ResultList<>();
            }
        }

        /**
         * Consumes one record. Records older than {@code fromQDTime} can only trigger
         * creation of the result list; newer ones are stored (unless flagged REMOVE_EVENT
         * or no result list exists yet) and may latch the TX_PENDING indicator.
         */
        @Override
        public void append(RecordCursor cursor) {
            long recordTime = cursor.getTime();
            int flags = cursor.getEventFlags();
            if (this.result == null) {
                if (recordTime <= this.fromQDTime || EventFlag.SNAPSHOT_SNIP.in(flags)) {
                    this.result = new ResultList<>();
                }
            }
            if (recordTime >= this.fromQDTime) {
                if (this.result != null && !EventFlag.REMOVE_EVENT.in(flags)) {
                    IndexedEvent event = (IndexedEvent)this.delegate.createEvent(this.symbol, cursor);
                    event.setEventFlags(0);
                    this.result.updateImpl(event, false);
                }
                if (EventFlag.TX_PENDING.in(flags)) {
                    this.txPending = true;
                }
            }
        }
    }
}

