/*
 * Decompiled with CFR 0.152.
 */
package org.nodex.java.addons.old.stomp;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.nodex.java.addons.old.stomp.Frame;
import org.nodex.java.addons.old.stomp.FrameHandler;
import org.nodex.java.addons.old.stomp.Parser;
import org.nodex.java.addons.old.stomp.StompMsgCallback;
import org.nodex.java.core.buffer.Buffer;
import org.nodex.java.core.net.NetSocket;

public class StompConnection {
    public static final String CORRELATION_ID_HEADER = "correlation-id";
    public static final String REPLY_TO_HEADER = "reply-to";
    private final NetSocket socket;
    private FrameHandler errorHandler;
    private Runnable connectHandler;
    protected boolean connected;
    private Map<String, StompMsgCallback> subscriptions = new HashMap<String, StompMsgCallback>();
    private Map<String, Runnable> waitingReceipts = new ConcurrentHashMap<String, Runnable>();
    private Map<String, StompMsgCallback> callbacks = new ConcurrentHashMap<String, StompMsgCallback>();
    private volatile String responseQueue;

    protected StompConnection(NetSocket socket) {
        this.socket = socket;
        socket.dataHandler(new Parser(new FrameHandler(){

            @Override
            public void onFrame(Frame frame) {
                StompConnection.this.handleFrame(frame);
            }
        }));
    }

    public void error(FrameHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public void close() {
        this.socket.close();
    }

    public void send(String dest, Buffer body) {
        this.send(dest, new HashMap<String, String>(4), body, null);
    }

    public void send(String dest, Map<String, String> headers, Buffer body) {
        this.send(dest, headers, body, null);
    }

    public void send(String dest, Buffer body, Runnable completeCallback) {
        this.send(dest, new HashMap<String, String>(4), body, completeCallback);
    }

    public void send(String dest, Map<String, String> headers, Buffer body, Runnable completeCallback) {
        Frame frame = new Frame("SEND", headers, body);
        frame.headers.put("destination", dest);
        this.addReceipt(frame, completeCallback);
        this.write(frame);
    }

    private synchronized void setupResponseHandler() {
        if (this.responseQueue == null) {
            String queueName = UUID.randomUUID().toString();
            this.subscribe(queueName, new StompMsgCallback(){

                @Override
                public void onMessage(Map<String, String> headers, Buffer body) {
                    String cid = headers.get(StompConnection.CORRELATION_ID_HEADER);
                    if (cid == null) {
                        System.err.println("No correlation id");
                    } else {
                        StompMsgCallback cb = (StompMsgCallback)StompConnection.this.callbacks.remove(cid);
                        if (cb == null) {
                            System.err.println("No STOMP callback for correlation id");
                        } else {
                            cb.onMessage(headers, body);
                        }
                    }
                }
            });
            this.responseQueue = queueName;
        }
    }

    public synchronized void subscribe(String dest, StompMsgCallback messageCallback) {
        this.subscribe(dest, messageCallback, null);
    }

    public synchronized void subscribe(String dest, StompMsgCallback messageCallback, Runnable completeCallback) {
        if (this.subscriptions.containsKey(dest)) {
            throw new IllegalArgumentException("Already subscribed to " + dest);
        }
        this.subscriptions.put(dest, messageCallback);
        Frame frame = Frame.subscribeFrame(dest);
        this.addReceipt(frame, completeCallback);
        this.write(frame);
    }

    public synchronized void unsubscribe(String dest) {
        this.unsubscribe(dest, null);
    }

    public synchronized void unsubscribe(String dest, Runnable completeCallback) {
        this.subscriptions.remove(dest);
        Frame frame = Frame.unsubscribeFrame(dest);
        this.addReceipt(frame, completeCallback);
        this.write(frame);
    }

    public void write(Frame frame) {
        this.socket.write(frame.toBuffer());
    }

    protected void connect(String username, String password, Runnable connectHandler) {
        this.connectHandler = connectHandler;
        this.write(Frame.connectFrame(username, password));
    }

    private synchronized void handleMessage(Frame msg) {
        String dest = msg.headers.get("destination");
        StompMsgCallback sub = this.subscriptions.get(dest);
        sub.onMessage(msg.headers, msg.body);
    }

    private void addReceipt(Frame frame, Runnable callback) {
        if (callback != null) {
            String receipt = UUID.randomUUID().toString();
            frame.headers.put("receipt", receipt);
            this.waitingReceipts.put(receipt, callback);
        }
    }

    protected void handleFrame(Frame frame) {
        if (!this.connected) {
            if (!"CONNECTED".equals(frame.command)) {
                throw new IllegalStateException("Expected CONNECTED frame, got: " + frame.command);
            }
            this.connected = true;
            this.connectHandler.run();
        } else if ("MESSAGE".equals(frame.command)) {
            this.handleMessage(frame);
        } else if ("RECEIPT".equals(frame.command)) {
            String receipt = frame.headers.get("receipt-id");
            Runnable callback = this.waitingReceipts.get(receipt);
            callback.run();
        } else if ("ERROR".equals(frame.command) && this.errorHandler != null) {
            this.errorHandler.onFrame(frame);
        }
    }
}

