/*
 * Decompiled with CFR 0.152.
 */
package de.willuhn.jameica.messaging;

import de.willuhn.jameica.messaging.Message;
import de.willuhn.jameica.messaging.MessageConsumer;
import de.willuhn.jameica.messaging.MessagingQueue;
import de.willuhn.jameica.messaging.StatusBarMessage;
import de.willuhn.jameica.system.Application;
import de.willuhn.jameica.system.OperationCanceledException;
import de.willuhn.logging.Level;
import de.willuhn.logging.Logger;
import de.willuhn.util.ApplicationException;
import de.willuhn.util.Queue;
import java.util.LinkedList;
import java.util.List;

public final class NamedQueue
implements MessagingQueue {
    private static final int MAX_MESSAGES = 1000;
    private static Worker worker = null;
    private List<MessageConsumer> consumers = new LinkedList<MessageConsumer>();
    private Queue messages = new Queue(1000);
    private String name = null;

    NamedQueue(String name) {
        this.name = name;
        Logger.debug((String)("creating message queue " + this.name));
        if (worker == null) {
            Logger.debug((String)"starting messaging worker thread");
            worker = new Worker();
            worker.start();
        }
        worker.register(this);
    }

    public String getName() {
        return this.name;
    }

    @Override
    public int getQueueSize() {
        if (this.messages == null) {
            return 0;
        }
        return this.messages.size();
    }

    @Override
    public void registerMessageConsumer(MessageConsumer consumer) {
        if (consumer == null) {
            return;
        }
        Logger.debug((String)("queue " + this.name + ": registering message consumer " + consumer.getClass().getName()));
        this.consumers.add(consumer);
    }

    @Override
    public void unRegisterMessageConsumer(MessageConsumer consumer) {
        if (this.consumers == null) {
            Logger.debug((String)"queue already shut down, skip unregistering");
            return;
        }
        if (consumer == null) {
            return;
        }
        Logger.debug((String)("queue " + this.name + ": unregistering message consumer " + consumer.getClass().getName()));
        this.consumers.remove(consumer);
    }

    @Override
    public void queueMessage(Message message) {
        Logger.warn((String)"queueing not supported - delivering without queueing");
        this.sendMessage(message);
    }

    @Override
    public synchronized void close() {
        this.flush();
        worker.unregister(this);
        this.consumers = null;
        this.messages = null;
    }

    @Override
    public void flush() {
        try {
            worker.wakeup();
            while (this.messages != null && this.messages.size() > 0) {
                Thread.sleep(5L);
            }
        }
        catch (Exception e) {
            Logger.error((String)"unable to flush queue", (Throwable)e);
        }
    }

    @Override
    public void sendMessage(Message message) {
        if (message == null) {
            return;
        }
        if (this.consumers.size() == 0) {
            Logger.debug((String)"no message consumers defined, ignoring message");
            return;
        }
        try {
            this.messages.push((Object)message);
            worker.wakeup();
        }
        catch (Queue.QueueFullException e) {
            Logger.error((String)("unable to send message " + message.toString() + " - queue " + this.name + " full"));
        }
    }

    @Override
    public void sendSyncMessage(Message message) {
        if (message == null) {
            return;
        }
        if (this.consumers.size() == 0) {
            Logger.debug((String)"no message consumers defined, ignoring message");
            return;
        }
        worker.send(this.consumers, message);
    }

    private static class Worker
    extends Thread {
        private Object lock = new Object();
        private List<NamedQueue> queues = new LinkedList<NamedQueue>();
        private boolean quit = false;

        private Worker() {
            super("Jameica Messaging Worker Thread");
        }

        private void register(NamedQueue queue) {
            this.queues.add(queue);
        }

        private void unregister(NamedQueue queue) {
            this.wakeup();
            Logger.debug((String)("closing queue: " + queue.getName()));
            if (this.queues.contains(queue) && this.queues.size() == 1) {
                this.quit = true;
            }
            this.queues.remove(queue);
            if (this.quit) {
                try {
                    Logger.debug((String)"shutting down messaging factory");
                    this.wakeup();
                    this.join(5000L);
                }
                catch (Exception e) {
                    Logger.error((String)"error while waiting for worker shutdown", (Throwable)e);
                }
                Logger.debug((String)"messaging factory shut down");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void wakeup() {
            Object object = this.lock;
            synchronized (object) {
                this.lock.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void send(List<MessageConsumer> consumers, Message msg) {
            if (this.quit) {
                Logger.warn((String)"shutdown in progress, no more messages accepted");
                return;
            }
            Logger.debug((String)("sending message " + msg.toString()));
            MessageConsumer consumer = null;
            List<MessageConsumer> list = consumers;
            synchronized (list) {
                for (int i = 0; i < consumers.size(); ++i) {
                    boolean send;
                    consumer = consumers.get(i);
                    Class[] expected = consumer.getExpectedMessageTypes();
                    boolean bl = send = expected == null;
                    if (expected != null) {
                        for (int j = 0; j < expected.length; ++j) {
                            if (!expected[j].isInstance(msg)) continue;
                            send = true;
                            break;
                        }
                    }
                    try {
                        if (!send) continue;
                        consumer.handleMessage(msg);
                        continue;
                    }
                    catch (ApplicationException ae) {
                        Application.getMessagingFactory().sendSyncMessage(new StatusBarMessage(ae.getMessage(), 1));
                        continue;
                    }
                    catch (OperationCanceledException oce) {
                        Logger.debug((String)("consumer " + consumer.getClass().getName() + " cancelled message " + msg));
                        continue;
                    }
                    catch (Throwable t) {
                        Logger.error((String)("consumer " + consumer.getClass().getName() + " produced an error (" + t.getClass().getName() + ": " + t + ") while consuming message " + msg));
                        Logger.write((Level)Level.INFO, (String)"error while processing message", (Throwable)t);
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!this.quit) {
                for (int i = 0; i < this.queues.size() && !this.quit; ++i) {
                    try {
                        NamedQueue queue = this.queues.get(i);
                        while (queue.messages != null && queue.messages.size() > 0) {
                            this.send(queue.consumers, (Message)queue.messages.pop());
                        }
                        continue;
                    }
                    catch (Exception e) {
                        Logger.write((Level)Level.DEBUG, (String)"error while processing queue", (Throwable)e);
                        return;
                    }
                }
                try {
                    Object object = this.lock;
                    synchronized (object) {
                        this.lock.wait(60000L);
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }
    }
}

