/*
 * Decompiled with CFR 0.152.
 */
package zmq;

import java.util.ArrayDeque;
import java.util.Deque;
import zmq.Address;
import zmq.Blob;
import zmq.Ctx;
import zmq.Dist;
import zmq.IOThread;
import zmq.Msg;
import zmq.Mtrie;
import zmq.Options;
import zmq.Pipe;
import zmq.SessionBase;
import zmq.SocketBase;
import zmq.ZError;

/**
 * Publisher-side socket that keeps a trie of subscription prefixes per pipe,
 * distributes outgoing messages to the pipes whose subscriptions match, and
 * queues (un)subscription notifications so they can be read back through
 * {@link #xrecv(int)}.
 *
 * <p>Subscription control messages read from a pipe are recognised by their
 * first byte: {@code 1} adds the remaining bytes as a subscription for that
 * pipe, {@code 0} removes them. Any other message is ignored.
 */
public class XPub extends SocketBase {
    /** Socket type this class assigns to itself in its constructor. */
    private static final int TYPE_XPUB = 9;
    /** Socket type that must not queue unsubscriptions; presumably ZMQ_PUB - TODO confirm against zmq.ZMQ. */
    private static final int TYPE_PUB = 1;
    /** The only option id handled by {@link #xsetsockopt(int, Object)}; it toggles {@code verbose}. */
    private static final int OPT_XPUB_VERBOSE = 40;
    /** Error code reported for rejected options; presumably EINVAL - TODO confirm against ZError. */
    private static final int ERR_INVALID = 22;
    /** Error code reported when nothing is pending; presumably EAGAIN - TODO confirm against ZError. */
    private static final int ERR_NOTHING_PENDING = 35;

    /** First byte of a control message that removes a subscription. */
    private static final byte CMD_UNSUBSCRIBE = 0;
    /** First byte of a control message that adds a subscription. */
    private static final byte CMD_SUBSCRIBE = 1;

    /** Subscription prefixes, each associated with the pipes that asked for it. */
    private final Mtrie subscriptions;
    /** Fan-out helper that tracks attached pipes and which of them match the current message. */
    private final Dist dist;
    /** When true, subscriptions that were not new to the trie are queued for the reader as well. */
    boolean verbose;
    /** True while a multi-part outgoing message is in progress. */
    private boolean more;
    /** (Un)subscription notifications waiting to be returned by {@link #xrecv(int)}. */
    private final Deque<Blob> pending;

    /** Trie callback: marks the given pipe as matching the message currently being sent. */
    private static final Mtrie.IMtrieHandler mark_as_matching = new Mtrie.IMtrieHandler() {

        public void invoke(Pipe pipe_, byte[] data, Object arg_) {
            XPub self = (XPub) arg_;
            self.dist.match(pipe_);
        }
    };

    /**
     * Trie callback used while a pipe is being removed: unless the socket type
     * is {@code TYPE_PUB}, queues a notification made of a zero byte followed
     * by the removed prefix.
     */
    private static final Mtrie.IMtrieHandler send_unsubscription = new Mtrie.IMtrieHandler() {

        public void invoke(Pipe pipe_, byte[] data_, Object arg_) {
            XPub self = (XPub) arg_;
            if (self.options.type != TYPE_PUB) {
                Blob unsub = new Blob(data_.length + 1);
                unsub.put(0, CMD_UNSUBSCRIBE);
                unsub.put(1, data_);
                self.pending.add(unsub);
            }
        }
    };

    /**
     * Creates the socket with verbose mode off, no message in progress, and
     * empty subscription, distribution and pending-notification state.
     *
     * @param parent_ forwarded to the superclass constructor
     * @param tid_ forwarded to the superclass constructor
     * @param sid_ forwarded to the superclass constructor
     */
    public XPub(Ctx parent_, int tid_, int sid_) {
        super(parent_, tid_, sid_);
        this.options.type = TYPE_XPUB;
        this.verbose = false;
        this.more = false;
        this.subscriptions = new Mtrie();
        this.dist = new Dist();
        this.pending = new ArrayDeque<Blob>();
    }

    /**
     * Registers a new pipe for distribution and immediately drains any
     * subscription messages already waiting on it.
     *
     * @param pipe_ the pipe being attached; must not be null
     * @param icanhasall_ when true the pipe is added to the trie with a null
     *        prefix (presumably meaning "subscribe to everything" - confirm
     *        against Mtrie)
     */
    protected void xattach_pipe(Pipe pipe_, boolean icanhasall_) {
        assert (pipe_ != null);
        this.dist.attach(pipe_);
        if (icanhasall_) {
            this.subscriptions.add(null, pipe_);
        }
        // Subscriptions may have arrived before the pipe was attached.
        this.xread_activated(pipe_);
    }

    /**
     * Reads every message currently available on the pipe and applies those
     * that are subscription control messages to the trie.
     *
     * <p>A processed message is queued in {@code pending} only when this
     * socket's type is {@code TYPE_XPUB} and either the trie reported the
     * change as unique, or it is a subscribe request and {@code verbose} is on.
     *
     * @param pipe_ the pipe that has messages to read
     */
    protected void xread_activated(Pipe pipe_) {
        for (Msg sub = pipe_.read(); sub != null; sub = pipe_.read()) {
            byte[] data = sub.data();
            int size = sub.size();
            if (size <= 0) {
                continue;
            }
            byte command = data[0];
            if (command != CMD_UNSUBSCRIBE && command != CMD_SUBSCRIBE) {
                // Not a subscription control message: ignore it.
                continue;
            }

            boolean subscribing = command == CMD_SUBSCRIBE;
            boolean unique = subscribing
                    ? this.subscriptions.add(data, 1, pipe_)
                    : this.subscriptions.rm(data, 1, pipe_);

            boolean notify = unique || (subscribing && this.verbose);
            if (this.options.type == TYPE_XPUB && notify) {
                this.pending.add(new Blob(sub.data()));
            }
        }
    }

    /**
     * Tells the distributor that the pipe can be written to again.
     *
     * @param pipe_ the pipe that became writable
     */
    protected void xwrite_activated(Pipe pipe_) {
        this.dist.activated(pipe_);
    }

    /**
     * Handles the verbose option; every other option is rejected.
     *
     * <p>The value must be an {@link Integer}. A null or differently typed
     * value is rejected through the same error path as an unknown option
     * instead of failing with a NullPointerException or ClassCastException.
     *
     * @param option_ option id; only {@code OPT_XPUB_VERBOSE} is accepted
     * @param optval_ option value; verbose is enabled only when it equals 1
     * @return true when the option was applied; false, with the error code
     *         set through {@code ZError.errno}, otherwise
     */
    public boolean xsetsockopt(int option_, Object optval_) {
        if (option_ != OPT_XPUB_VERBOSE || !(optval_ instanceof Integer)) {
            ZError.errno(ERR_INVALID);
            return false;
        }
        this.verbose = ((Integer) optval_).intValue() == 1;
        return true;
    }

    /**
     * Drops all subscriptions held by a terminated pipe, invoking the
     * unsubscription callback for the trie entries it reports, then removes
     * the pipe from the distributor.
     *
     * @param pipe_ the pipe that terminated
     */
    protected void xterminated(Pipe pipe_) {
        this.subscriptions.rm(pipe_, send_unsubscription, this);
        this.dist.terminated(pipe_);
    }

    /**
     * Sends one message part to the pipes whose subscriptions match.
     *
     * <p>Matching is computed only when no multi-part message is in progress,
     * so later parts go to the pipes selected by the first part. The matching
     * set is cleared after the last part has been sent.
     *
     * @param msg_ the message part to send
     * @param flags_ passed through to the distributor
     * @return false if the distributor failed to send, true otherwise
     */
    protected boolean xsend(Msg msg_, int flags_) {
        boolean msg_more = msg_.has_more();
        if (!this.more) {
            this.subscriptions.match(msg_.data(), msg_.size(), mark_as_matching, this);
        }
        if (!this.dist.send_to_matching(msg_, flags_)) {
            // Leave 'more' and the matching set untouched on failure.
            return false;
        }
        if (!msg_more) {
            this.dist.unmatch();
        }
        this.more = msg_more;
        return true;
    }

    /**
     * @return whatever the distributor reports for {@code has_out()}
     */
    protected boolean xhas_out() {
        return this.dist.has_out();
    }

    /**
     * Returns the oldest queued (un)subscription notification.
     *
     * @param flags_ not used by this implementation
     * @return a message wrapping the notification bytes, or null (with the
     *         error code set through {@code ZError.errno}) when none is queued
     */
    protected Msg xrecv(int flags_) {
        if (this.pending.isEmpty()) {
            ZError.errno(ERR_NOTHING_PENDING);
            return null;
        }
        Blob first = this.pending.pollFirst();
        return new Msg(first.data());
    }

    /**
     * @return true when at least one notification is waiting to be received
     */
    protected boolean xhas_in() {
        return !this.pending.isEmpty();
    }

    /** Session type for this socket; adds nothing to {@link SessionBase}. */
    public static class XPubSession extends SessionBase {
        /**
         * Forwards all arguments unchanged to the superclass constructor.
         */
        public XPubSession(IOThread io_thread_, boolean connect_, SocketBase socket_, Options options_, Address addr_) {
            super(io_thread_, connect_, socket_, options_, addr_);
        }
    }
}

