/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp;

import com.devexperts.auth.AuthSession;
import com.devexperts.connector.proto.Configurable;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDAgent;
import com.devexperts.qd.QDCollector;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.QDHistory;
import com.devexperts.qd.QDLog;
import com.devexperts.qd.QDStream;
import com.devexperts.qd.QDTicker;
import com.devexperts.qd.SubscriptionConsumer;
import com.devexperts.qd.SubscriptionFilter;
import com.devexperts.qd.SubscriptionIterator;
import com.devexperts.qd.kit.CompositeFilters;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.AgentAdapterChannels;
import com.devexperts.qd.qtp.AgentChannel;
import com.devexperts.qd.qtp.ChannelShaper;
import com.devexperts.qd.qtp.DynamicChannelShaper;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageDescriptor;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.MessageVisitor;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.QDEndpoint;
import com.devexperts.qd.qtp.auth.BasicChannelShaperFactory;
import com.devexperts.qd.qtp.auth.ChannelShapersFactory;
import com.devexperts.qd.spi.QDFilterContext;
import com.devexperts.qd.spi.QDFilterFactory;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.qd.util.LegacyAdapter;
import com.devexperts.services.Services;
import com.devexperts.util.LogUtil;
import com.devexperts.util.LoggedThreadPoolExecutor;
import com.devexperts.util.TimePeriod;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;

public class AgentAdapter
extends MessageAdapter {
    /** All contract values, captured once from {@link QDContract#values()}. */
    private static final QDContract[] QD_CONTRACTS = QDContract.values();
    /** Number of contracts; sizes the per-contract {@link #peerFilter} array. */
    private static final int N_CONTRACTS = QD_CONTRACTS.length;
    /** Shaper factories loaded through {@link Services}; consulted first by {@link #reinitConfiguration}. */
    private static final Iterable<ChannelShapersFactory> CHANNEL_SHAPERS_FACTORIES = Services.createServices(ChannelShapersFactory.class, null);
    /** Data scheme returned by {@link #getScheme()}. */
    private final DataScheme scheme;
    /** Local filter; handed to the dynamic shapers built by the collector-based constructor and shown in {@link #toString()}. */
    private final QDFilter filter;
    /** Factory that created this adapter; stays {@code null} when the adapter is constructed directly. */
    private Factory factory;
    /** Filter announced by the remote peer for each contract, indexed by {@link QDContract#ordinal()}; filled by {@link #processDescribeProtocol}. */
    final QDFilter[] peerFilter = new QDFilter[N_CONTRACTS];
    /** Channel shapers; {@code null} until {@link #initialize} has run. */
    private ChannelShaper[] shapers;
    /** Channels, parallel to {@link #shapers}; a slot stays {@code null} until its channel is first needed. */
    private AgentChannel[] channels;

    /**
     * Creates an adapter over the given collectors and initializes it immediately
     * with one dynamic shaper per non-null collector.
     *
     * @param endpoint endpoint passed to the superclass constructor
     * @param ticker ticker collector, or {@code null} to omit it
     * @param stream stream collector, or {@code null} to omit it
     * @param history history collector, or {@code null} to omit it
     * @param filter subscription filter, converted with {@link QDFilter#fromFilter}
     * @param stats statistics node passed to the superclass constructor
     */
    public AgentAdapter(QDEndpoint endpoint, QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter, QDStats stats) {
        super(endpoint, stats);
        this.scheme = AgentAdapter.getCommonScheme(ticker, stream, history);
        this.filter = QDFilter.fromFilter(filter, this.scheme);
        // Shaper order is fixed: ticker, then stream, then history; absent collectors are skipped.
        QDCollector[] collectors = {ticker, stream, history};
        ArrayList<DynamicChannelShaper> dynamicShapers = new ArrayList<DynamicChannelShaper>();
        for (QDCollector collector : collectors) {
            if (collector != null) {
                dynamicShapers.add(this.newDynamicShaper(collector));
            }
        }
        ChannelShaper[] shaperArray = dynamicShapers.toArray(new ChannelShaper[dynamicShapers.size()]);
        this.initialize(shaperArray);
    }

    /**
     * Creates an adapter over the given collectors without an endpoint.
     * Delegates to the six-argument constructor with a {@code null} endpoint.
     */
    public AgentAdapter(QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter, QDStats stats) {
        this(null, ticker, stream, history, filter, stats);
    }

    /**
     * Creates an adapter for the given scheme with no endpoint and the {@link QDFilter#ANYTHING} filter.
     * No shapers are installed by this constructor; they are set later by {@link #initialize}.
     *
     * @throws NullPointerException if {@code scheme} is {@code null}
     */
    public AgentAdapter(DataScheme scheme, QDStats stats) {
        this(null, scheme, QDFilter.ANYTHING, stats);
    }

    /**
     * Stores the scheme and filter without installing any shapers or channels.
     * Used by the {@code (scheme, stats)} constructor and by {@link Factory#createAdapter}.
     *
     * @throws NullPointerException if {@code scheme} is {@code null}
     */
    private AgentAdapter(QDEndpoint endpoint, DataScheme scheme, QDFilter filter, QDStats stats) {
        super(endpoint, stats);
        // The scheme is mandatory; the filter is stored as given, without a null check.
        if (scheme == null) {
            throw new NullPointerException();
        }
        this.scheme = scheme;
        this.filter = filter;
    }

    /**
     * Builds a dynamic shaper for the collector's contract, using this adapter's filter,
     * and attaches the collector to it.
     */
    private DynamicChannelShaper newDynamicShaper(QDCollector collector) {
        QDContract contract = collector.getContract();
        DynamicChannelShaper result = new DynamicChannelShaper(contract, null, this.filter);
        result.setCollector(collector);
        return result;
    }

    /**
     * Installs the channel shapers for this adapter. May be called only once.
     * The array is copied, and an equally sized array of channel slots is allocated.
     *
     * @param shapers shapers to install
     * @return this adapter
     * @throws IllegalArgumentException if shapers have already been installed
     */
    public synchronized AgentAdapter initialize(ChannelShaper ... shapers) {
        boolean alreadyInitialized = this.shapers != null;
        if (alreadyInitialized) {
            throw new IllegalArgumentException("Already initialized");
        }
        // Defensive copy so later changes to the caller's array do not affect this adapter.
        ChannelShaper[] ownCopy = shapers.clone();
        this.shapers = ownCopy;
        this.channels = new AgentChannel[ownCopy.length];
        return this;
    }

    /**
     * Creates an agent by building the result of {@link #createAgentBuilder}.
     */
    protected QDAgent createAgent(QDCollector collector, SubscriptionFilter filter, String keyProperties) {
        QDAgent.Builder builder = this.createAgentBuilder(collector, filter, keyProperties);
        return builder.build();
    }

    /**
     * Prepares an agent builder on the collector, configured with the converted filter,
     * the key properties and this adapter's remote option set.
     */
    protected QDAgent.Builder createAgentBuilder(QDCollector collector, SubscriptionFilter filter, String keyProperties) {
        return collector.agentBuilder()
            .withFilter(QDFilter.fromFilter(filter, this.scheme))
            .withKeyProperties(keyProperties)
            .withOptSet(this.getRemoteOptSet());
    }

    /**
     * Returns the superclass description, followed by the filter in square brackets
     * unless the filter is {@link QDFilter#ANYTHING}.
     */
    @Override
    public String toString() {
        String base = super.toString();
        String suffix = "";
        if (this.filter != QDFilter.ANYTHING) {
            suffix = "[" + this.filter + "]";
        }
        return base + suffix;
    }

    /** Returns the data scheme this adapter was constructed with. */
    @Override
    public DataScheme getScheme() {
        return this.scheme;
    }

    /**
     * Asks the collectors behind the created channels for a symbol string matching the
     * given characters, and returns the first non-null answer.
     *
     * @param chars character buffer holding the symbol
     * @param offset index of the first symbol character in {@code chars}
     * @param length number of symbol characters
     * @return the first non-null result of a collector's {@code getSymbol}, or {@code null} if
     *         no collector returns one or the adapter has no channels yet
     */
    @Override
    public String getSymbol(char[] chars, int offset, int length) {
        // Channels stay null until initialize() is called (e.g. for adapters produced by a Factory);
        // treat that like "no collector has an answer" instead of failing with a NullPointerException.
        AgentChannel[] currentChannels = this.channels;
        if (currentChannels == null) {
            return null;
        }
        QDCollector prevCollector = null;
        for (AgentChannel channel : currentChannels) {
            if (channel == null) {
                continue;
            }
            QDCollector collector = channel.shaper.getCollector();
            // Skip channels without a collector and avoid asking the same collector twice in a row.
            if (collector == null || collector == prevCollector) {
                continue;
            }
            String result = collector.getSymbol(chars, offset, length);
            if (result != null) {
                return result;
            }
            prevCollector = collector;
        }
        return null;
    }

    /**
     * Does nothing in this class.
     * NOTE(review): presumably a notification hook for shaper configuration changes — confirm against callers.
     */
    public void updateChannel(ChannelShaper shaper) {
    }

    /** Records the factory that created this adapter; called from {@link Factory#createAdapter}. */
    private void setAgentFactory(Factory factory) {
        this.factory = factory;
    }

    /** Returns the factory that created this adapter, or {@code null} if none was recorded. */
    Factory getAgentFactory() {
        return this.factory;
    }

    /**
     * Always returns {@code true}.
     * NOTE(review): the exact meaning of "mixed subscription" is defined by the superclass contract — confirm there.
     */
    @Override
    public boolean supportsMixedSubscription() {
        return true;
    }

    /**
     * Routes an incoming subscription message to every channel whose shaper has the
     * message's contract, creating channels on first use.
     * If the adapter is not alive, or no shaper matches the contract, the message is
     * reported as ignored. In all cases the iterator is finally handed to
     * {@link SubscriptionConsumer#VOID}.
     *
     * @throws IllegalArgumentException if {@code message} is not a subscription message
     */
    @Override
    protected void processSubscription(SubscriptionIterator iterator, MessageType message) {
        if (!message.isSubscription()) {
            throw new IllegalArgumentException(message.toString());
        }
        if (this.isAlive()) {
            boolean delivered = false;
            QDContract contract = message.getContract();
            RecordSource source = LegacyAdapter.of(iterator);
            long startPosition = source.getPosition();
            for (int index = 0; index < this.shapers.length; ++index) {
                if (this.shapers[index].getContract() == contract) {
                    delivered = true;
                    AgentChannel channel = this.getOrCreateChannelAt(index);
                    // Rewind so that every matching channel reads the subscription from the start.
                    source.setPosition(startPosition);
                    channel.processSubscription(message, source);
                }
            }
            LegacyAdapter.release(iterator, source);
            if (!delivered) {
                this.reportIgnoredMessage("Contract is not supported", message);
            }
        } else {
            this.reportIgnoredMessage("Adapter is " + this.getStatus(), message);
        }
        SubscriptionConsumer.VOID.processSubscription(iterator);
    }

    /**
     * Returns the channel in slot {@code i}, creating it on first use, binding it to
     * its shaper and storing it in the slot.
     */
    private AgentChannel getOrCreateChannelAt(int i) {
        AgentChannel existing = this.channels[i];
        if (existing != null) {
            return existing;
        }
        ChannelShaper shaper = this.shapers[i];
        AgentChannel created = new AgentChannel(this, shaper);
        shaper.bind(created);
        this.channels[i] = created;
        return created;
    }

    /**
     * Closes every channel that has been created. Asserts that the caller holds this adapter's monitor.
     */
    @Override
    protected void closeImpl() {
        assert (Thread.holdsLock(this));
        if (this.channels == null) {
            return;
        }
        for (AgentChannel channel : this.channels) {
            if (channel != null) {
                channel.close();
            }
        }
    }

    /**
     * Lets the superclass retrieve its messages, then attempts the describe-protocol
     * message and, unconditionally, the data messages.
     *
     * @return {@code true} if the mask left after the describe-protocol step is non-zero
     *         or {@link #retrieveDataMessages} returned {@code true}
     */
    @Override
    public boolean retrieveMessages(MessageVisitor visitor) {
        super.retrieveMessages(visitor);
        long remainingMask = this.retrieveDescribeProtocolMessage(visitor, this.retrieveMask());
        this.addMask(remainingMask);
        // Data retrieval always runs, whatever the mask state is.
        boolean dataResult = this.retrieveDataMessages(visitor);
        return remainingMask != 0L || dataResult;
    }

    /**
     * Adds to the descriptor, for every contract that has at least one shaper with a
     * filter other than {@link QDFilter#NOTHING}: the data message as sent, and the
     * add/remove subscription messages as received.
     * The combined (OR-ed, stable) filter of each contract is advertised through the
     * {@code "filter"} property. If all contracts share a single filter string it is set once
     * on the descriptor (unless it equals the string form of {@link QDFilter#ANYTHING});
     * otherwise every non-empty filter string is set on its add-subscription message descriptor.
     *
     * @param desc protocol descriptor to fill
     */
    @Override
    public void prepareProtocolDescriptor(ProtocolDescriptor desc) {
        super.prepareProtocolDescriptor(desc);
        HashMap<MessageDescriptor, String> messageTypeFilters = new HashMap<>();
        for (QDContract contract : QD_CONTRACTS) {
            QDFilter combinedFilter = QDFilter.NOTHING;
            for (ChannelShaper shaper : this.shapers) {
                if (shaper.getContract() != contract) {
                    continue;
                }
                combinedFilter = CompositeFilters.makeOr(combinedFilter, shaper.getSubscriptionFilter().toStableFilter());
            }
            if (combinedFilter == QDFilter.NOTHING) {
                // No shaper accepts anything for this contract, so the contract is not advertised.
                continue;
            }
            MessageDescriptor addSubscriptionMessage = desc.newMessageDescriptor(MessageType.forAddSubscription(contract));
            desc.addSend(desc.newMessageDescriptor(MessageType.forData(contract)));
            desc.addReceive(addSubscriptionMessage);
            desc.addReceive(desc.newMessageDescriptor(MessageType.forRemoveSubscription(contract)));
            messageTypeFilters.put(addSubscriptionMessage, combinedFilter.toString());
        }
        // Typed set instead of a raw HashSet, so no unchecked casts are needed below.
        HashSet<String> distinctFilters = new HashSet<>(messageTypeFilters.values());
        if (distinctFilters.size() == 1) {
            String filter = distinctFilters.iterator().next();
            if (!filter.equals(QDFilter.ANYTHING.toString())) {
                desc.setProperty("filter", filter);
            }
        } else {
            messageTypeFilters.entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .forEach(entry -> entry.getKey().setProperty("filter", entry.getValue()));
        }
    }

    /**
     * Reads the peer's protocol descriptor and records, per contract, the filter the
     * peer attached to its data message descriptor.
     * Does nothing beyond the superclass processing when channels have not been set up yet.
     * A missing filter property maps to {@link QDFilter#ANYTHING}; a filter string that cannot
     * be parsed is logged as a warning and also treated as {@link QDFilter#ANYTHING}.
     */
    @Override
    public void processDescribeProtocol(ProtocolDescriptor desc, boolean logDescriptor) {
        super.processDescribeProtocol(desc, logDescriptor);
        if (this.channels == null) {
            return;
        }
        QDFilterFactory filterFactory = CompositeFilters.getFactory(this.scheme);
        // Cache of already parsed filter strings; the null key stands for "no filter property".
        HashMap<String, QDFilter> parsedBySpec = new HashMap<String, QDFilter>();
        parsedBySpec.put(null, QDFilter.ANYTHING);
        for (QDContract contract : QD_CONTRACTS) {
            MessageDescriptor dataDescriptor = desc.getReceive(MessageType.forData(contract));
            if (dataDescriptor != null) {
                String spec = dataDescriptor.getProperty("filter");
                QDFilter parsed;
                if (parsedBySpec.containsKey(spec)) {
                    parsed = parsedBySpec.get(spec);
                } else {
                    try {
                        parsed = filterFactory.createFilter(spec, QDFilterContext.REMOTE_FILTER);
                    }
                    catch (IllegalArgumentException e) {
                        QDLog.log.warn("Cannot parse filter '" + LogUtil.hideCredentials(spec) + "' from " + LogUtil.hideCredentials(this.getRemoteHostAddress()), e);
                        parsed = QDFilter.ANYTHING;
                    }
                    parsedBySpec.put(spec, parsed);
                }
                this.peerFilter[contract.ordinal()] = parsed;
            }
        }
    }

    /**
     * Checks whether the peer described by {@code desc} can use at least one contract of this adapter.
     *
     * @return {@code true} if shapers are not installed yet, or if for some contract that has a
     *         shaper the descriptor can send add-subscription and can receive data; {@code false} otherwise
     */
    @Override
    public boolean isProtocolDescriptorCompatible(ProtocolDescriptor desc) {
        if (this.shapers == null) {
            return true;
        }
        for (QDContract contract : QD_CONTRACTS) {
            boolean served = false;
            for (ChannelShaper shaper : this.shapers) {
                served |= shaper.getContract() == contract;
            }
            if (served && desc.canSend(MessageType.forAddSubscription(contract)) && desc.canReceive(MessageType.forData(contract))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Picks channels by weight-scaled quota and asks them to retrieve snapshot or data
     * into the visitor, making at most one attempt per channel slot.
     *
     * @param visitor visitor that receives the retrieved messages
     * @return {@code false} if there are no channels or no channel has anything for now;
     *         {@code true} as soon as a channel's retrieval returns {@code true}, or after
     *         all attempts have been used.
     *         NOTE(review): {@code true} presumably means "more remains to be retrieved" — confirm
     *         against the superclass contract.
     */
    protected boolean retrieveDataMessages(MessageVisitor visitor) {
        if (this.channels == null) {
            return false;
        }
        // Bound the number of attempts by the number of channel slots.
        int iterations = this.channels.length;
        while (--iterations >= 0) {
            long currentTime = System.currentTimeMillis();
            double minDistance = Double.POSITIVE_INFINITY;
            AgentChannel dueChannel = null;
            // Choose among the channels that have something for now: the first one with a full
            // quota wins outright, otherwise the one with the smallest (1 - quota) / weight.
            for (AgentChannel channel : this.channels) {
                if (channel == null || !channel.hasSnapshotOrDataForNow(currentTime)) continue;
                if (channel.quota >= 1.0) {
                    minDistance = 0.0;
                    dueChannel = channel;
                    break;
                }
                double distance = (1.0 - channel.quota) / (double)channel.shaper.getWeight();
                if (!(distance < minDistance)) continue;
                minDistance = distance;
                dueChannel = channel;
            }
            if (dueChannel == null) {
                return false;
            }
            if (minDistance > 0.0) {
                // Advance every channel's quota by the chosen distance scaled by its weight, capped at 1.0.
                for (AgentChannel channel : this.channels) {
                    if (channel == null) continue;
                    channel.quota += minDistance * (double)channel.shaper.getWeight();
                    if (!(channel.quota >= 1.0)) continue;
                    channel.quota = 1.0;
                }
            }
            // The chosen channel spends its quota before retrieving.
            dueChannel.quota = 0.0;
            if (!dueChannel.retrieveSnapshotOrData(visitor, currentTime)) continue;
            return true;
        }
        return true;
    }

    /**
     * (Re)initializes the shapers of this adapter.
     * With a session, the registered {@link ChannelShapersFactory} services are asked in turn
     * and the first non-null result is used; {@link BasicChannelShaperFactory#INSTANCE} is the
     * fallback. Without a session nothing happens if an auth realm is configured; otherwise,
     * if channels are not set up yet, the shapers come from the factory's channel configuration.
     *
     * @param session authenticated session, or {@code null}
     */
    @Override
    public void reinitConfiguration(AuthSession session) {
        if (session != null) {
            ChannelShaper[] sessionShapers = null;
            if (CHANNEL_SHAPERS_FACTORIES != null) {
                for (ChannelShapersFactory shapersFactory : CHANNEL_SHAPERS_FACTORIES) {
                    sessionShapers = shapersFactory.createChannelShapers(this, session);
                    if (sessionShapers != null) {
                        break;
                    }
                }
            }
            if (sessionShapers == null) {
                sessionShapers = BasicChannelShaperFactory.INSTANCE.createChannelShapers(this, session);
            }
            this.initialize(sessionShapers);
            return;
        }
        if (!this.hasAuthRealm() && this.channels == null) {
            this.initialize(this.factory.getAgentAdapterChannels().getNewShapers());
        }
    }

    /**
     * Returns the minimum of the superclass's next retrieve time and the next retrieve
     * times of all created channels.
     */
    @Override
    public long nextRetrieveTime(long currentTime) {
        long earliest = super.nextRetrieveTime(currentTime);
        if (this.channels == null) {
            return earliest;
        }
        for (AgentChannel channel : this.channels) {
            if (channel != null) {
                earliest = Math.min(earliest, channel.nextRetrieveTime(currentTime));
            }
        }
        return earliest;
    }

    /**
     * Passes the provider to {@code visitor.visitData} for the given message type and returns its result.
     * Declared {@code protected}, so a subclass can override how data reaches the visitor.
     */
    protected boolean visitData(MessageVisitor visitor, RecordProvider provider, MessageType message) {
        return visitor.visitData(provider, message);
    }

    public static class Factory
    extends MessageAdapter.AbstractFactory {
        /** Default aggregation period for all channels; starts at zero. */
        private TimePeriod aggregationPeriod = TimePeriod.valueOf(0L);
        /** Parsed channels configuration; may be {@code null} until set or first requested. */
        private AgentAdapterChannels channels;
        /** Size of the subscription thread pool; zero means no pool is created. */
        private int subscriptionThreads;
        /** Subscription executor, either set explicitly or created on demand from {@link #subscriptionThreads}. */
        private Executor subscriptionExecutor;

        /** Creates a factory over the given collectors and filter; all arguments are passed to the superclass. */
        public Factory(QDTicker ticker, QDStream stream, QDHistory history, SubscriptionFilter filter) {
            super(ticker, stream, history, filter);
        }

        /** Creates a factory for the given endpoint and filter; both arguments are passed to the superclass. */
        public Factory(QDEndpoint endpoint, SubscriptionFilter filter) {
            super(endpoint, filter);
        }

        /** Creates a factory over a ticker only, with no stream, history or filter. */
        public Factory(QDTicker ticker) {
            this(ticker, null, null, null);
        }

        /** Creates a factory over a stream only, with no ticker, history or filter. */
        public Factory(QDStream stream) {
            this(null, stream, null, null);
        }

        /** Creates a factory over a history only, with no ticker, stream or filter. */
        public Factory(QDHistory history) {
            this(null, null, history, null);
        }

        /**
         * Creates a factory over a single collector of any kind. The collector is placed into the
         * ticker, stream or history position according to its runtime type (the other positions
         * are {@code null}), and the channels configuration is set from an empty string.
         */
        public Factory(QDCollector collector) {
            this(collector instanceof QDTicker ? (QDTicker)collector : null, collector instanceof QDStream ? (QDStream)collector : null, collector instanceof QDHistory ? (QDHistory)collector : null, null);
            this.channels = new AgentAdapterChannels("", this);
        }

        /** Returns the default aggregation period for all channels. */
        public synchronized TimePeriod getAggregationPeriod() {
            return this.aggregationPeriod;
        }

        /**
         * Sets the default aggregation period and rebuilds the channels configuration.
         * Does nothing when the new period equals the current one.
         *
         * @param aggregationPeriod new period; must not be negative
         * @throws IllegalArgumentException if the period's time is negative
         */
        @Configurable(description="default aggregation period for all channels")
        public synchronized void setAggregationPeriod(TimePeriod aggregationPeriod) {
            boolean unchanged = aggregationPeriod.equals(this.aggregationPeriod);
            if (unchanged) {
                return;
            }
            long periodTime = aggregationPeriod.getTime();
            if (periodTime < 0L) {
                throw new IllegalArgumentException("cannot be negative");
            }
            this.aggregationPeriod = aggregationPeriod;
            this.rebuildChannels();
        }

        /** Returns the channels configuration as a string, or an empty string when none is set. */
        public synchronized String getChannels() {
            if (this.channels == null) {
                return "";
            }
            return this.channels.toString();
        }

        /**
         * Replaces the channels configuration with one built from the given string.
         *
         * @param channels channels configuration string
         * @throws NullPointerException if {@code channels} is {@code null}
         */
        @Configurable(description="channels configuration string")
        public synchronized void setChannels(String channels) {
            if (channels == null) {
                throw new NullPointerException();
            }
            AgentAdapterChannels parsed = new AgentAdapterChannels(channels, this);
            this.channels = parsed;
        }

        /** Returns the configured size of the subscription thread pool. */
        public synchronized int getSubscriptionThreads() {
            return this.subscriptionThreads;
        }

        /**
         * Sets the size of the subscription thread pool and rebuilds the channels configuration.
         * Does nothing when the value is unchanged.
         *
         * @param subscriptionThreads new pool size; must not be negative
         * @throws IllegalArgumentException if the value is negative
         */
        @Configurable(description="size of the pool for subscription-handling threads\n(zero default - does not use separate thread pool for subscription)")
        public synchronized void setSubscriptionThreads(int subscriptionThreads) {
            boolean unchanged = subscriptionThreads == this.subscriptionThreads;
            if (unchanged) {
                return;
            }
            if (subscriptionThreads < 0) {
                throw new IllegalArgumentException("cannot be negative");
            }
            this.subscriptionThreads = subscriptionThreads;
            this.rebuildChannels();
        }

        /** Returns the current subscription executor, or {@code null} if none has been set or created. */
        public synchronized Executor getSubscriptionExecutor() {
            return this.subscriptionExecutor;
        }

        /**
         * Sets an explicit subscription executor and rebuilds the channels configuration.
         * Unlike the other setters, this one rebuilds even when the value is unchanged.
         */
        @Configurable(description="explicit subscription executor")
        public synchronized void setSubscriptionExecutor(Executor subscriptionExecutor) {
            this.subscriptionExecutor = subscriptionExecutor;
            this.rebuildChannels();
        }

        /**
         * Returns the subscription executor, creating and remembering a logged thread pool
         * when none exists yet and a positive number of subscription threads is configured.
         *
         * @return the executor, or {@code null} if none is set and no threads are configured
         */
        synchronized Executor getOrCreateSubscriptionExecutor() {
            if (this.subscriptionExecutor == null && this.subscriptionThreads > 0) {
                this.subscriptionExecutor = new LoggedThreadPoolExecutor(this.subscriptionThreads, this + "-Subscription", QDLog.log);
            }
            return this.subscriptionExecutor;
        }

        /**
         * Returns the channels configuration, creating one from an empty string
         * on first use. The check and creation run under this factory's monitor.
         */
        @Nonnull
        AgentAdapterChannels getAgentAdapterChannels() {
            synchronized (this) {
                if (this.channels == null) {
                    this.channels = new AgentAdapterChannels("", this);
                }
                return this.channels;
            }
        }

        /**
         * Re-creates the channels configuration from its own string form so that it is
         * built against the factory's current settings. Does nothing when no configuration exists.
         */
        private void rebuildChannels() {
            if (this.channels == null) {
                return;
            }
            String currentSpec = this.channels.toString();
            this.channels = new AgentAdapterChannels(currentSpec, this);
        }

        /**
         * Creates an agent adapter for this factory's endpoint, the common scheme of its
         * collectors and its filter, and records this factory in the adapter.
         * The returned adapter has no shapers installed yet.
         */
        @Override
        public MessageAdapter createAdapter(QDStats stats) {
            AgentAdapter created = new AgentAdapter(
                this.endpoint,
                MessageAdapter.getCommonScheme(this.ticker, this.stream, this.history),
                this.getFilter(),
                stats);
            created.setAgentFactory(this);
            return created;
        }
    }
}

