/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.impl;

import com.devexperts.qd.DataRecord;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.kit.CompositeFilters;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.util.SubscriptionProcessor;
import com.devexperts.util.IndexedSet;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.impl.DXPublisherObservableSubscriptionImpl;
import com.dxfeed.api.impl.EventDelegate;
import com.dxfeed.api.impl.EventDelegateSet;
import com.dxfeed.api.osub.WildcardSymbol;
import com.dxfeed.event.EventType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class DXPublisherImpl
extends DXPublisher {
    // Endpoint this publisher belongs to; consulted for contracts, collectors, delegates and closed state.
    private final DXEndpointImpl endpoint;
    // Publishing distributors indexed by QDContract ordinal; a slot may hold null until a distributor is built for it.
    private final List<QDDistributor> publishDistributors = new ArrayList<QDDistributor>();
    // Per-event-type subscription trackers, keyed by Subscription.eventType.
    private final IndexedSet<Class<?>, Subscription<?>> subscriptionsByClass = IndexedSet.create(sub -> sub.eventType);

    /**
     * Creates a publisher bound to the given endpoint and eagerly builds one publishing
     * distributor for every contract that the endpoint reports.
     *
     * @param endpoint endpoint that supplies the contracts and their collectors
     */
    DXPublisherImpl(DXEndpointImpl endpoint) {
        this.endpoint = endpoint;
        for (QDContract contract : endpoint.getContracts()) {
            int slot = contract.ordinal();
            // Pad with nulls so the list can be indexed directly by contract ordinal.
            for (int size = publishDistributors.size(); size <= slot; size++) {
                publishDistributors.add(null);
            }
            QDDistributor distributor = endpoint.getCollector(contract).distributorBuilder().build();
            publishDistributors.set(slot, distributor);
        }
    }

    /**
     * Closes every subscription tracker that has been created by this publisher so far.
     */
    public void closeImpl() {
        subscriptionsByClass.forEach(Subscription::close);
    }

    /**
     * Publishes the given events through the distributors of this publisher's endpoint.
     *
     * <p>Does nothing when the endpoint is closed or {@code events} is empty. Otherwise the
     * whole collection is scanned once per pass. A pass settles on a single contract (the
     * first one whose delegate actually writes a record, or the one scheduled by the
     * previous pass), collects records for that contract into one {@link RecordBuffer} and
     * hands the buffer to that contract's distributor. Delegates of other contracts are
     * skipped during the pass; the first skipped contract that has not been processed yet
     * is scheduled for the next pass.
     *
     * @param events events to publish; every element is cast to {@link EventType}
     */
    @Override
    public void publishEvents(Collection<?> events) {
        if (this.endpoint.isClosed() || events.isEmpty()) {
            return;
        }
        int nextContract = 0; // bit (1 << ordinal) of the contract scheduled for the next pass, 0 if none
        int doneContracts = 0; // bit mask of the contracts handled by completed passes
        do {
            // Bit of the contract handled by this pass; on the first pass it stays 0 until a delegate writes a record.
            int thisContract = nextContract;
            nextContract = 0;
            RecordMode mode = RecordMode.FLAGGED_DATA;
            if (this.endpoint.getQDEndpoint().hasEventTimeSequence()) {
                mode = mode.withEventTimeSequence();
            }
            RecordBuffer buf = RecordBuffer.getInstance(mode);
            try {
                for (Object event : events) {
                    EventDelegateSet<?, ?> delegateSet = this.endpoint.getDelegateSetByEventType(event.getClass());
                    List<?> delegates = delegateSet.getPubDelegatesByEvent((EventType)event);
                    int delegatesSize = delegates.size();
                    for (int i = 0; i < delegatesSize; ++i) {
                        EventDelegate delegate = (EventDelegate)delegates.get(i);
                        int curContract = 1 << delegate.getContract().ordinal();
                        if (curContract != thisContract && thisContract != 0) {
                            // Different contract: remember the first one that is still pending and skip it for now.
                            if (nextContract != 0 || (doneContracts & curContract) != 0) continue;
                            nextContract = curContract;
                            continue;
                        }
                        RecordCursor cursor = delegate.putEvent((EventType)event, buf);
                        if (cursor == null) continue;
                        thisContract = curContract;
                    }
                }
                if (!buf.isEmpty()) {
                    this.getOrCreateDistributor(Integer.numberOfTrailingZeros(thisContract)).process(buf);
                }
            } finally {
                // Release the buffer even when a delegate or the distributor throws.
                buf.release();
            }
            doneContracts |= thisContract;
        } while (nextContract != 0);
    }

    /**
     * Returns the publishing distributor stored for the given contract ordinal, building
     * and storing one from the endpoint's collector when the slot is still empty.
     *
     * @param contractOrdinal ordinal of the {@link QDContract} to publish to
     * @return the distributor for that contract
     */
    private synchronized QDDistributor getOrCreateDistributor(int contractOrdinal) {
        // The constructor sizes the list only up to the highest contract it saw; grow it here
        // so that any other contract reaches the lazy-create branch instead of failing in get().
        while (this.publishDistributors.size() <= contractOrdinal) {
            this.publishDistributors.add(null);
        }
        QDDistributor distributor = this.publishDistributors.get(contractOrdinal);
        if (distributor == null) {
            distributor = this.endpoint.getCollector(QDContract.values()[contractOrdinal]).distributorBuilder().build();
            this.publishDistributors.set(contractOrdinal, distributor);
        }
        return distributor;
    }

    /**
     * Returns the observable subscription for the given event type, creating the backing
     * subscription tracker on first request. A tracker created after the endpoint has
     * already been closed is closed right away, but its observable subscription is still
     * returned.
     *
     * @param eventType event class to look up
     * @param <T> event type
     * @return the observable subscription associated with {@code eventType}
     */
    public <T> DXPublisherObservableSubscriptionImpl<T> getSubscription(Class<T> eventType) {
        // Entries are keyed by their own eventType, so the entry stored under Class<T> was created as Subscription<T>.
        @SuppressWarnings("unchecked")
        Subscription<T> subscription = (Subscription<T>) this.subscriptionsByClass.getByKey(eventType);
        if (subscription == null) {
            subscription = this.createSubscriptionImpl(eventType);
            if (this.endpoint.isClosed()) {
                subscription.close();
            }
        }
        return subscription.observableSubscription;
    }

    /**
     * Returns the subscription tracker for the given event type, creating, registering and
     * starting it when none is registered yet. The lookup is repeated under this object's
     * monitor so that at most one tracker is created per event type.
     *
     * @param eventType event class to look up
     * @param <T> event type
     * @return the existing or newly created tracker
     */
    private synchronized <T> Subscription<T> createSubscriptionImpl(Class<T> eventType) {
        // Entries are keyed by their own eventType, so the entry stored under Class<T> was created as Subscription<T>.
        @SuppressWarnings("unchecked")
        Subscription<T> subscription = (Subscription<T>) this.subscriptionsByClass.getByKey(eventType);
        if (subscription == null) {
            subscription = new Subscription<T>(eventType);
            this.subscriptionsByClass.add(subscription);
            subscription.start();
        }
        return subscription;
    }

    /**
     * Subscription processor attached to one distributor of one {@link Subscription}.
     * Added and removed subscription records are converted into symbols and applied to the
     * owning subscription's inner subscription.
     */
    private class Processor<T> extends SubscriptionProcessor {
        private final QDContract contract;
        private final QDDistributor distributor;
        private final Subscription<T> subscription;
        // Scratch set that is cleared and refilled by every processSub call.
        private final Set<Object> symbols = new IndexedSet<Object, Object>();

        private Processor(Executor executor, QDContract contract, QDDistributor distributor, Subscription<T> subscription) {
            super(executor, contract);
            this.contract = contract;
            this.distributor = distributor;
            this.subscription = subscription;
        }

        /** Starts processing subscription changes coming from the distributor. */
        public void start() {
            startProcessing(distributor);
        }

        /** Stops processing and then closes the distributor. */
        public void close() {
            stopProcessing();
            distributor.close();
        }

        @Override
        protected void processAddedSubscription(RecordSource source) {
            processSub(source, true);
        }

        @Override
        protected void processRemovedSubscription(RecordSource source) {
            processSub(source, false);
        }

        /**
         * Drains {@code source}, collects the symbols of all matching publishing delegates
         * and adds them to, or removes them from, the inner subscription.
         *
         * <p>A {@code "*"} symbol on the STREAM contract is not converted through the delegate;
         * its record is tracked in {@code wildcardRecords} instead. {@link WildcardSymbol#ALL}
         * is added to the batch when an addition finds that set empty beforehand, or when
         * a removal leaves it empty.
         *
         * @param source subscription records to process
         * @param add {@code true} to add the collected symbols, {@code false} to remove them
         */
        private void processSub(RecordSource source, boolean add) {
            symbols.clear();
            List<DataRecord> wildcardHits = null;
            for (RecordCursor cur = source.next(); cur != null; cur = source.next()) {
                DataRecord record = cur.getRecord();
                List<EventDelegate<?>> delegates = DXPublisherImpl.this.endpoint.getDelegateListByContractAndRecord(contract, record);
                if (delegates == null) {
                    continue;
                }
                for (EventDelegate<?> delegate : delegates) {
                    boolean relevant = delegate.isPub() && delegate.getEventType() == subscription.eventType;
                    if (!relevant) {
                        continue;
                    }
                    String qdSymbol = DXPublisherImpl.this.endpoint.decode(cur.getCipher(), cur.getSymbol());
                    boolean streamWildcard = qdSymbol.equals("*") && contract == QDContract.STREAM;
                    if (!streamWildcard) {
                        symbols.add(delegate.getSubscriptionSymbolByQDSymbolAndTime(qdSymbol, cur.getTime()));
                        continue;
                    }
                    if (wildcardHits == null) {
                        wildcardHits = new ArrayList<DataRecord>();
                    }
                    wildcardHits.add(record);
                }
            }

            Set<DataRecord> trackedWildcards = subscription.wildcardRecords;
            if (add) {
                if (wildcardHits != null) {
                    // Check emptiness before addAll: only the transition from empty adds ALL.
                    if (trackedWildcards.isEmpty()) {
                        symbols.add(WildcardSymbol.ALL);
                    }
                    trackedWildcards.addAll(wildcardHits);
                }
                subscription.innerSubscription.addSymbols(symbols);
                return;
            }
            if (wildcardHits != null) {
                // Check emptiness after removeAll: ALL is removed once no wildcard record remains.
                trackedWildcards.removeAll(wildcardHits);
                if (trackedWildcards.isEmpty()) {
                    symbols.add(WildcardSymbol.ALL);
                }
            }
            subscription.innerSubscription.removeSymbols(symbols);
        }
    }

    /**
     * {@link DXFeedSubscription} variant used internally by the publisher; its
     * {@code shallNotifyOnSymbolUpdate} answers {@code true} for every symbol.
     */
    private static class InnerSubscription<T> extends DXFeedSubscription<T> {

        InnerSubscription(Class<T> eventType) {
            super(eventType);
        }

        /** Always returns {@code true}, regardless of either argument. */
        @Override
        protected boolean shallNotifyOnSymbolUpdate(@Nonnull Object symbol, @Nullable Object oldSymbol) {
            return true;
        }
    }

    /**
     * Publishing-side subscription state for one event type: the inner subscription, its
     * observable wrapper, and one {@link Processor} per contract that has publishing
     * delegates for the event type.
     */
    private class Subscription<T> {
        final Class<T> eventType;
        // Records with a tracked wildcard subscription; read and updated by Processor.processSub.
        final Set<DataRecord> wildcardRecords;
        final InnerSubscription<T> innerSubscription;
        final DXPublisherObservableSubscriptionImpl<T> observableSubscription;
        final List<Processor<T>> processors = new ArrayList<Processor<T>>();

        /**
         * Builds the inner and observable subscriptions and, for every contract that has
         * publishing delegates for {@code eventType}, a distributor filtered to those
         * delegates' records together with a processor for it. Nothing is started here.
         *
         * @param eventType event class this subscription tracks
         */
        Subscription(Class<T> eventType) {
            this.eventType = eventType;
            this.wildcardRecords = new IndexedSet<DataRecord, DataRecord>();
            this.innerSubscription = new InnerSubscription<T>(eventType);
            this.observableSubscription = new DXPublisherObservableSubscriptionImpl<T>(this.innerSubscription);

            // Group the records of all publishing delegates by contract.
            Map<QDContract, Set<DataRecord>> recordsByContract = new EnumMap<QDContract, Set<DataRecord>>(QDContract.class);
            EventDelegateSet<?, ?> delegateSet = DXPublisherImpl.this.endpoint.getDelegateSetByEventType(eventType);
            if (delegateSet != null) {
                for (EventDelegate<?> delegate : delegateSet.getAllPubDelegates()) {
                    recordsByContract
                        .computeIfAbsent(delegate.getContract(), key -> new HashSet<DataRecord>())
                        .add(delegate.getRecord());
                }
            }

            // One filtered distributor and one processor per contract.
            for (Map.Entry<QDContract, Set<DataRecord>> entry : recordsByContract.entrySet()) {
                QDContract contract = entry.getKey();
                Collection<DataRecord> contractRecords = entry.getValue();
                QDDistributor distributor = DXPublisherImpl.this.endpoint.getCollector(contract).distributorBuilder()
                    .withFilter(CompositeFilters.forRecords(contractRecords))
                    .build();
                this.processors.add(new Processor<T>(DXPublisherImpl.this.endpoint.getOrCreateExecutor(), contract, distributor, this));
            }
        }

        /** Starts every processor of this subscription. */
        void start() {
            for (Processor<T> processor : this.processors) {
                processor.start();
            }
        }

        /** Closes the inner subscription first and then every processor. */
        void close() {
            this.innerSubscription.close();
            for (Processor<T> processor : this.processors) {
                processor.close();
            }
        }
    }
}

