/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp;

import com.devexperts.qd.DataConsumer;
import com.devexperts.qd.DataIterator;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDCollector;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.QDHistory;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.QDStream;
import com.devexperts.qd.QDTicker;
import com.devexperts.qd.SubscriptionFilter;
import com.devexperts.qd.SubscriptionProvider;
import com.devexperts.qd.kit.CompositeFilters;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.qtp.MasterMessageAdapter;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageDescriptor;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.MessageVisitor;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.QDEndpoint;
import com.devexperts.qd.spi.QDFilterContext;
import com.devexperts.qd.spi.QDFilterFactory;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.util.LogUtil;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class DistributorAdapter
extends MessageAdapter
implements QDFilter.UpdateListener {
    /** Snapshot of all contract constants; the per-contract arrays below are indexed by {@code ordinal()}. */
    private static final QDContract[] QD_CONTRACTS = QDContract.values();
    /** Number of contracts, i.e. the length of every per-contract array in this class. */
    private static final int N_CONTRACTS = QD_CONTRACTS.length;
    /** "Add subscription" message type for each contract, filled in by the static initializer. */
    private static final MessageType[] ADD_MESSAGES = new MessageType[N_CONTRACTS];
    /** "Remove subscription" message type for each contract, filled in by the static initializer. */
    private static final MessageType[] REMOVE_MESSAGES = new MessageType[N_CONTRACTS];
    /** Scheme shared by the collectors passed to the constructor. */
    private final DataScheme scheme;
    /** Local filter given at construction time, converted to a {@link QDFilter}. */
    private final QDFilter filter;
    /** Collector for each contract; an entry is {@code null} when that contract was not supplied. */
    private final QDCollector[] collectors = new QDCollector[N_CONTRACTS];
    /** Lazily created distributor for each contract; an entry stays {@code null} until first needed. */
    private final AtomicReferenceArray<QDDistributor> distributors = new AtomicReferenceArray<QDDistributor>(N_CONTRACTS);
    /** Filter announced by the remote peer for each contract; combined with {@link #filter} when a distributor is created. */
    private final QDFilter[] peerFilter = new QDFilter[N_CONTRACTS];
    /** Listener installed on distributors' record providers; forwards to {@code subscriptionAvailable}. */
    private final RecordListener subListener = new SubListener();
    /** Contract index at which the next scan for "add subscription" messages starts. */
    private int phaseAdd;
    /** Contract index at which the next scan for "remove subscription" messages starts. */
    private int phaseRemove;

    public DistributorAdapter(QDEndpoint endpoint, QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter, QDStats stats) {
        super(endpoint, stats);
        this.scheme = DistributorAdapter.getCommonScheme(ticker, stream, history);
        this.filter = QDFilter.fromFilter(filter, this.scheme);
        this.collectors[QDContract.TICKER.ordinal()] = ticker;
        this.collectors[QDContract.STREAM.ordinal()] = stream;
        this.collectors[QDContract.HISTORY.ordinal()] = history;
    }

    /**
     * Creates an adapter without an endpoint; delegates to the six-argument constructor
     * with a {@code null} endpoint.
     *
     * @param ticker collector for the ticker contract
     * @param stream collector for the stream contract
     * @param history collector for the history contract
     * @param filter local subscription filter
     * @param stats statistics node
     */
    public DistributorAdapter(QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter, QDStats stats) {
        this((QDEndpoint) null, ticker, stream, history, filter, stats);
    }

    /**
     * Returns the collector registered for the given contract.
     *
     * @param contract contract to look up
     * @return the collector stored for that contract, or {@code null} if none was stored
     */
    public QDCollector getCollector(QDContract contract) {
        int slot = contract.ordinal();
        return collectors[slot];
    }

    /**
     * Builds a distributor on the given collector.
     *
     * @param collector collector whose distributor builder is used
     * @param filter filter for the new distributor, converted with {@code QDFilter.fromFilter}
     * @param keyProperties key properties handed to the builder
     * @return the distributor produced by the builder
     */
    protected QDDistributor createDistributor(QDCollector collector, SubscriptionFilter filter, String keyProperties) {
        return collector.distributorBuilder()
            .withFilter(QDFilter.fromFilter(filter, scheme))
            .withKeyProperties(keyProperties)
            .build();
    }

    /**
     * Returns the distributor for contract index {@code i}, creating it on first use.
     *
     * <p>The existing entry is checked first without locking. Creation happens while
     * synchronized on this adapter, after the entry has been checked again under the lock.
     * The new distributor is filtered by the peer filter for this contract combined with
     * the local filter.
     *
     * @param i contract ordinal
     * @return the distributor, or {@code null} when no collector is stored for this contract
     */
    private QDDistributor getOrCreateDistributor(int i) {
        QDDistributor existing = distributors.get(i);
        if (existing != null) {
            return existing;
        }
        QDCollector collector = collectors[i];
        if (collector == null) {
            return null;
        }
        synchronized (this) {
            QDDistributor current = distributors.get(i);
            if (current == null) {
                current = createDistributor(
                    collector,
                    CompositeFilters.makeAnd(peerFilter[i], filter),
                    getStats().getFullKeyProperties());
                distributors.set(i, current);
            }
            return current;
        }
    }

    /**
     * Returns the superclass description, followed by the local filter in square brackets
     * unless that filter is {@code QDFilter.ANYTHING}.
     */
    @Override
    public String toString() {
        String base = super.toString();
        String suffix = "";
        if (filter != QDFilter.ANYTHING) {
            suffix = "[" + filter + "]";
        }
        return base + suffix;
    }

    /** Returns the data scheme determined at construction time. */
    @Override
    public DataScheme getScheme() {
        return scheme;
    }

    /**
     * Asks each stored collector in turn for the symbol matching the given characters.
     *
     * @param chars character buffer
     * @param offset start of the symbol within {@code chars}
     * @param length number of characters in the symbol
     * @return the first non-null answer from a collector, or {@code null} if none answers
     */
    @Override
    public String getSymbol(char[] chars, int offset, int length) {
        for (QDCollector collector : collectors) {
            if (collector == null) {
                continue;
            }
            String symbol = collector.getSymbol(chars, offset, length);
            if (symbol != null) {
                return symbol;
            }
        }
        return null;
    }

    /**
     * Reacts to a change of the local filter by marking this adapter for immediate restart
     * and then closing it.
     *
     * @param filter the updated filter (not used here)
     */
    @Override
    public void filterUpdated(QDFilter filter) {
        markForImmediateRestart();
        close();
    }

    /**
     * Starts the adapter: warns when the local filter is dynamic, registers this adapter as
     * an update listener on that filter, and, when the describe-protocol exchange is not
     * used, attaches the subscription listener to every available collector's distributor
     * before delegating to the superclass.
     *
     * @param master master adapter passed through to the superclass
     */
    @Override
    protected void startImpl(MasterMessageAdapter master) {
        boolean dynamicFilter = filter.isDynamic();
        if (dynamicFilter) {
            QDLog.log.warn("Using dynamic filter '" + LogUtil.hideCredentials(filter) + "' in distributor address will cause connection reset when filter changes");
        }
        filter.addUpdateListener(this);
        if (!useDescribeProtocol) {
            sendSubscriptionFromAllCollectors();
        }
        super.startImpl(master);
    }

    /**
     * Unregisters this adapter from the local filter and closes every distributor that
     * has been created so far.
     */
    @Override
    protected void closeImpl() {
        filter.removeUpdateListener(this);
        for (int slot = 0; slot < N_CONTRACTS; slot++) {
            QDDistributor distributor = distributors.get(slot);
            if (distributor != null) {
                distributor.close();
            }
        }
    }

    /**
     * Routes an incoming data message to the distributor of the message's contract.
     *
     * <p>When the adapter is not alive or the contract has no collector, the message is
     * reported as ignored. Whenever no distributor takes the data, the iterator is passed
     * to {@code DataConsumer.VOID} instead.
     *
     * @param iterator incoming data
     * @param message type of the incoming message
     * @throws IllegalArgumentException if {@code message} is not a data message
     */
    @Override
    protected void processData(DataIterator iterator, MessageType message) {
        if (!message.isData()) {
            throw new IllegalArgumentException(message.toString());
        }
        QDDistributor target = null;
        if (isAlive()) {
            QDContract contract = message.getContract();
            if (contract != null) {
                target = getOrCreateDistributor(contract.ordinal());
                if (target == null) {
                    reportIgnoredMessage("Contract is not supported", message);
                }
            }
        } else {
            reportIgnoredMessage("Adapter is " + getStatus(), message);
        }
        if (target != null) {
            target.processData(iterator);
        } else {
            // Nobody accepted the data: hand the iterator to the void consumer.
            DataConsumer.VOID.processData(iterator);
        }
    }

    /**
     * Offers pending outgoing messages to the visitor: first whatever the superclass
     * provides, then the describe-protocol message, then subscription add/remove messages.
     * Mask bits still set afterwards are put back with {@code addMask}.
     *
     * @param visitor receiver of the outgoing messages
     * @return {@code true} if any message mask bits remain set after this pass
     */
    @Override
    public boolean retrieveMessages(MessageVisitor visitor) {
        super.retrieveMessages(visitor);
        long taken = retrieveMask();
        long afterDescribe = retrieveDescribeProtocolMessage(visitor, taken);
        long remaining = retrieveAddAndRemoveMessages(visitor, afterDescribe);
        addMask(remaining);
        return remaining != 0L;
    }

    /**
     * Fills the outgoing protocol descriptor: sets the {@code "filter"} property from the
     * stable form of the local filter (unless it is {@code QDFilter.ANYTHING}), declares
     * sending add/remove subscription and receiving data for every contract that has a
     * collector, and declares receiving raw data.
     *
     * @param desc descriptor to fill
     */
    @Override
    public void prepareProtocolDescriptor(ProtocolDescriptor desc) {
        super.prepareProtocolDescriptor(desc);
        QDFilter stable = CompositeFilters.toStableFilter(filter);
        if (stable != QDFilter.ANYTHING) {
            desc.setProperty("filter", stable.toString());
        }
        for (QDContract contract : QD_CONTRACTS) {
            if (collectors[contract.ordinal()] != null) {
                desc.addSend(desc.newMessageDescriptor(MessageType.forAddSubscription(contract)));
                desc.addSend(desc.newMessageDescriptor(MessageType.forRemoveSubscription(contract)));
                desc.addReceive(desc.newMessageDescriptor(MessageType.forData(contract)));
            }
        }
        desc.addReceive(desc.newMessageDescriptor(MessageType.RAW_DATA));
    }

    /**
     * Processes the peer's protocol descriptor.
     *
     * <p>For every contract that has a collector and for which the peer can receive add
     * and/or remove subscription messages, this records the peer's {@code "filter"}
     * property (taken from the add-subscription descriptor) as the peer filter, obtains
     * the distributor, and attaches the subscription listener to the providers the peer
     * accepts; the other provider gets {@code RecordListener.VOID}.
     *
     * @param desc descriptor received from the peer
     * @param logDescriptor passed through to the superclass
     */
    @Override
    public void processDescribeProtocol(ProtocolDescriptor desc, boolean logDescriptor) {
        super.processDescribeProtocol(desc, logDescriptor);
        QDFilterFactory factory = CompositeFilters.getFactory(scheme);
        // Parsed filters keyed by their specification; a missing property (null key) maps to ANYTHING.
        HashMap<String, QDFilter> parsedFilters = new HashMap<String, QDFilter>();
        parsedFilters.put(null, QDFilter.ANYTHING);
        for (QDContract contract : QD_CONTRACTS) {
            int slot = contract.ordinal();
            if (collectors[slot] == null) {
                continue;
            }
            boolean peerTakesAdd = desc.canReceive(MessageType.forAddSubscription(contract));
            boolean peerTakesRemove = desc.canReceive(MessageType.forRemoveSubscription(contract));
            if (!(peerTakesAdd || peerTakesRemove)) {
                continue;
            }
            if (peerTakesAdd) {
                MessageDescriptor addDescriptor = desc.getReceive(MessageType.forAddSubscription(contract));
                String spec = addDescriptor.getProperty("filter");
                if (!parsedFilters.containsKey(spec)) {
                    QDFilter parsed;
                    try {
                        parsed = factory.createFilter(spec, QDFilterContext.REMOTE_FILTER);
                    } catch (IllegalArgumentException e) {
                        // An unparseable peer filter is logged and treated as "no restriction".
                        QDLog.log.warn("Cannot parse filter '" + LogUtil.hideCredentials(spec) + "' from " + LogUtil.hideCredentials(getRemoteHostAddress()) + ": " + e);
                        parsed = QDFilter.ANYTHING;
                    }
                    parsedFilters.put(spec, parsed);
                }
                peerFilter[slot] = parsedFilters.get(spec);
            }
            QDDistributor distributor = getOrCreateDistributor(slot);
            RecordListener onAdded = peerTakesAdd ? subListener : RecordListener.VOID;
            distributor.getAddedRecordProvider().setRecordListener(onAdded);
            RecordListener onRemoved = peerTakesRemove ? subListener : RecordListener.VOID;
            distributor.getRemovedRecordProvider().setRecordListener(onRemoved);
        }
    }

    /**
     * Checks whether the peer's descriptor has anything in common with this adapter.
     *
     * @param desc descriptor received from the peer
     * @return {@code true} if, for at least one contract that has a collector, the peer can
     *         receive add-subscription messages, send data for that contract, or send raw data
     */
    @Override
    public boolean isProtocolDescriptorCompatible(ProtocolDescriptor desc) {
        for (QDContract contract : QD_CONTRACTS) {
            if (collectors[contract.ordinal()] == null) {
                continue;
            }
            if (desc.canReceive(MessageType.forAddSubscription(contract))
                || desc.canSend(MessageType.forData(contract))
                || desc.canSend(MessageType.RAW_DATA))
            {
                return true;
            }
        }
        return false;
    }

    /**
     * Attaches the subscription listener to both the added and the removed record provider
     * of the distributor for every contract that has a collector, creating distributors
     * as needed.
     */
    private void sendSubscriptionFromAllCollectors() {
        for (int slot = 0; slot < N_CONTRACTS; slot++) {
            if (collectors[slot] != null) {
                QDDistributor distributor = getOrCreateDistributor(slot);
                distributor.getAddedRecordProvider().setRecordListener(subListener);
                distributor.getRemovedRecordProvider().setRecordListener(subListener);
            }
        }
    }

    /**
     * Offers pending "add subscription" messages to the visitor, visiting contracts in
     * round-robin order starting at {@code phaseAdd}, and then, if nothing was left
     * unfinished, continues with the "remove subscription" messages.
     *
     * <p>A contract's bit is cleared from the mask once the visitor reports that it has no
     * more to take for it. The scan stops early as soon as the visitor reports more pending,
     * and {@code phaseAdd} is left pointing at the contract after the last one examined.
     *
     * @param visitor receiver of the subscription messages
     * @param mask message mask to process
     * @return the mask with the bits of fully retrieved messages cleared
     */
    private long retrieveAddAndRemoveMessages(MessageVisitor visitor, long mask) {
        final int start = phaseAdd;
        int index = start;
        while (true) {
            MessageType type = ADD_MESSAGES[index];
            boolean more = false;
            if (hasMessageMask(mask, type)) {
                SubscriptionProvider added = distributors.get(index).getAddedSubscriptionProvider();
                more = visitSubscription(visitor, added, type);
                if (!more) {
                    mask = clearMessageMask(mask, type);
                }
            }
            index = (index + 1) % N_CONTRACTS;
            phaseAdd = index;
            if (more) {
                // Visitor could not take everything; leave the remaining bits for the next pass.
                return mask;
            }
            if (index == start) {
                break;
            }
        }
        return retrieveRemoveMessages(visitor, mask);
    }

    /**
     * Offers pending "remove subscription" messages to the visitor, visiting contracts in
     * round-robin order starting at {@code phaseRemove}.
     *
     * <p>A contract's bit is cleared from the mask once the visitor reports that it has no
     * more to take for it. The scan stops early as soon as the visitor reports more pending,
     * and {@code phaseRemove} is left pointing at the contract after the last one examined.
     *
     * @param visitor receiver of the subscription messages
     * @param mask message mask to process
     * @return the mask with the bits of fully retrieved messages cleared
     */
    private long retrieveRemoveMessages(MessageVisitor visitor, long mask) {
        final int start = phaseRemove;
        int index = start;
        while (true) {
            MessageType type = REMOVE_MESSAGES[index];
            boolean more = false;
            if (hasMessageMask(mask, type)) {
                SubscriptionProvider removed = distributors.get(index).getRemovedSubscriptionProvider();
                more = visitSubscription(visitor, removed, type);
                if (!more) {
                    mask = clearMessageMask(mask, type);
                }
            }
            index = (index + 1) % N_CONTRACTS;
            phaseRemove = index;
            if (more || index == start) {
                return mask;
            }
        }
    }

    /**
     * Hands one subscription provider to the visitor; overridable hook.
     *
     * @param visitor receiver of the subscription
     * @param provider source of the subscription records
     * @param message message type under which the subscription is sent
     * @return whatever {@code visitor.visitSubscription} returns
     */
    protected boolean visitSubscription(MessageVisitor visitor, SubscriptionProvider provider, MessageType message) {
        boolean result = visitor.visitSubscription(provider, message);
        return result;
    }

    /**
     * Notifies the adapter that the given provider has subscription available. The provider
     * is matched by identity against the added/removed providers of the existing
     * distributors and the corresponding add or remove message is flagged.
     *
     * @param provider provider that has subscription available
     * @throws NullPointerException if {@code provider} is {@code null}
     * @throws AssertionError if the provider belongs to none of this adapter's distributors
     */
    public void subscriptionAvailable(SubscriptionProvider provider) {
        if (provider == null) {
            throw new NullPointerException();
        }
        for (int slot = 0; slot < N_CONTRACTS; slot++) {
            QDDistributor distributor = distributors.get(slot);
            if (distributor != null) {
                if (provider == distributor.getAddedSubscriptionProvider()) {
                    subscriptionChanged(provider, ADD_MESSAGES[slot]);
                    return;
                }
                if (provider == distributor.getRemovedSubscriptionProvider()) {
                    subscriptionChanged(provider, REMOVE_MESSAGES[slot]);
                    return;
                }
            }
        }
        throw new AssertionError((Object)("Unknown subscription provider: " + provider));
    }

    /**
     * Flags the given message type as pending by adding its bit to the message mask;
     * overridable hook.
     *
     * @param provider provider whose subscription changed (not used here)
     * @param message add or remove message type to flag
     */
    protected void subscriptionChanged(SubscriptionProvider provider, MessageType message) {
        long bit = getMessageMask(message);
        addMask(bit);
    }

    static {
        for (QDContract contract : QD_CONTRACTS) {
            DistributorAdapter.ADD_MESSAGES[contract.ordinal()] = MessageType.forAddSubscription(contract);
            DistributorAdapter.REMOVE_MESSAGES[contract.ordinal()] = MessageType.forRemoveSubscription(contract);
        }
    }

    /**
     * Record listener that forwards every notification to the enclosing adapter's
     * {@code subscriptionAvailable}.
     */
    private class SubListener implements RecordListener {
        @Override
        public void recordsAvailable(RecordProvider provider) {
            DistributorAdapter.this.subscriptionAvailable(provider);
        }
    }

    /**
     * Factory of {@link DistributorAdapter} instances, built from explicit collectors or
     * from an endpoint.
     */
    public static class Factory extends MessageAdapter.AbstractFactory {
        /**
         * Creates a factory for the given collectors and filter; all arguments are passed
         * to the superclass constructor.
         */
        public Factory(QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter) {
            super(ticker, stream, history, filter);
        }

        /** Creates a factory for the given endpoint and filter; both are passed to the superclass constructor. */
        public Factory(QDEndpoint endpoint, SubscriptionFilter filter) {
            super(endpoint, filter);
        }

        /** Creates a factory with only a ticker collector and no filter. */
        public Factory(QDTicker ticker) {
            this(ticker, null, null, null);
        }

        /** Creates a factory with only a stream collector and no filter. */
        public Factory(QDStream stream) {
            this(null, stream, null, null);
        }

        /** Creates a factory with only a history collector and no filter. */
        public Factory(QDHistory history) {
            this(null, null, history, null);
        }

        /**
         * Creates a factory from a collector of any kind and no filter. The collector is
         * placed into each of the ticker, stream, and history positions whose type it
         * implements; the other positions are {@code null}.
         */
        public Factory(QDCollector collector) {
            this(
                collector instanceof QDTicker ? (QDTicker)collector : null,
                collector instanceof QDStream ? (QDStream)collector : null,
                collector instanceof QDHistory ? (QDHistory)collector : null,
                null);
        }

        /**
         * Creates a new adapter from this factory's endpoint, collectors, and filter.
         *
         * @param stats statistics node for the new adapter
         * @return a new {@link DistributorAdapter}
         */
        @Override
        public MessageAdapter createAdapter(QDStats stats) {
            return new DistributorAdapter(endpoint, ticker, stream, history, getFilter(), stats);
        }
    }
}

