/*
 * Decompiled with CFR 0.152.
 */
package com.corundumstudio.socketio.store;

import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.util.internal.PlatformDependent;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;

public class RedissonPubSubStore
implements PubSubStore {
    private final RedissonClient redissonPub;
    private final RedissonClient redissonSub;
    private final Long nodeId;
    private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap();

    public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) {
        this.redissonPub = redissonPub;
        this.redissonSub = redissonSub;
        this.nodeId = nodeId;
    }

    @Override
    public void publish(PubSubType type, PubSubMessage msg) {
        msg.setNodeId(this.nodeId);
        this.redissonPub.getTopic(type.toString()).publish((Object)msg);
    }

    @Override
    public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
        Queue oldList;
        String name = type.toString();
        RTopic topic = this.redissonSub.getTopic(name);
        int regId = topic.addListener(PubSubMessage.class, (MessageListener)new MessageListener<PubSubMessage>(){
            final /* synthetic */ RedissonPubSubStore this$0;
            {
                this.this$0 = this$0;
            }

            public void onMessage(CharSequence channel, PubSubMessage msg) {
                if (!this.this$0.nodeId.equals(msg.getNodeId())) {
                    listener.onMessage(msg);
                }
            }
        });
        Queue<Integer> list = (ConcurrentLinkedQueue<Integer>)this.map.get(name);
        if (list == null && (oldList = (Queue)this.map.putIfAbsent(name, list = new ConcurrentLinkedQueue<Integer>())) != null) {
            list = oldList;
        }
        list.add(regId);
    }

    @Override
    public void unsubscribe(PubSubType type) {
        String name = type.toString();
        Queue regIds = (Queue)this.map.remove(name);
        RTopic topic = this.redissonSub.getTopic(name);
        for (Integer id : regIds) {
            topic.removeListener(new Integer[]{id});
        }
    }

    @Override
    public void shutdown() {
    }
}

