package com.corundumstudio.socketio.store;

import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import io.netty.util.internal.PlatformDependent;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/* loaded from: classes.dex */
/**
 * {@link PubSubStore} implementation that exchanges messages over Hazelcast topics.
 *
 * <p>Each {@link PubSubType} is mapped to the topic named by its {@code toString()}.
 * Outgoing messages are stamped with this store's node id, and incoming messages
 * carrying the same node id are dropped before they reach the local listener.
 * The registration ids returned by Hazelcast are remembered per topic name so that
 * {@link #unsubscribe(PubSubType)} can remove the listeners again.
 */
public class HazelcastPubSubStore implements PubSubStore {
    /** Hazelcast instance used for publishing. */
    private final HazelcastInstance hazelcastPub;
    /** Hazelcast instance used for adding and removing topic listeners. */
    private final HazelcastInstance hazelcastSub;
    /** Topic name to the listener registration ids added for that topic. */
    private final ConcurrentMap<String, Queue<String>> map = PlatformDependent.newConcurrentHashMap();
    /** Id stamped onto published messages and used to filter out this node's own messages. */
    private final Long nodeId;

    /**
     * Creates a store on top of the given Hazelcast instances.
     *
     * @param hazelcastInstance  instance used by {@link #publish}
     * @param hazelcastInstance2 instance used by {@link #subscribe} and {@link #unsubscribe}
     * @param l                  node id; messages carrying this id are not delivered locally
     */
    public HazelcastPubSubStore(HazelcastInstance hazelcastInstance, HazelcastInstance hazelcastInstance2, Long l) {
        this.hazelcastPub = hazelcastInstance;
        this.hazelcastSub = hazelcastInstance2;
        this.nodeId = l;
    }

    /**
     * Stamps the message with this store's node id and publishes it on the topic
     * named after {@code pubSubType}.
     *
     * @param pubSubType    selects the topic (by {@code toString()})
     * @param pubSubMessage message to publish; its node id is overwritten
     */
    @Override
    public void publish(PubSubType pubSubType, PubSubMessage pubSubMessage) {
        pubSubMessage.setNodeId(this.nodeId);
        this.hazelcastPub.getTopic(pubSubType.toString()).publish(pubSubMessage);
    }

    /**
     * Does nothing: no listeners are removed and no Hazelcast instance is touched here.
     * Listeners are only removed through {@link #unsubscribe(PubSubType)}.
     */
    @Override
    public void shutdown() {
    }

    /**
     * Registers a listener on the topic named after {@code pubSubType} and records the
     * resulting registration id for later removal.
     *
     * @param pubSubType     selects the topic (by {@code toString()})
     * @param pubSubListener receives every message whose node id differs from this store's
     * @param cls            message class; not used by this implementation
     * @param <T>            concrete message type delivered to the listener
     */
    @Override
    public <T extends PubSubMessage> void subscribe(PubSubType pubSubType, final PubSubListener<T> pubSubListener, Class<T> cls) {
        String topicName = pubSubType.toString();
        // Bind the topic to T explicitly so the listener's type argument matches the topic's.
        ITopic<T> topic = this.hazelcastSub.getTopic(topicName);
        String registrationId = topic.addMessageListener(new MessageListener<T>() {
            @Override
            public void onMessage(Message<T> message) {
                T payload = message.getMessageObject();
                // Skip messages that were published through this same store.
                if (HazelcastPubSubStore.this.nodeId.equals(payload.getNodeId())) {
                    return;
                }
                pubSubListener.onMessage(payload);
            }
        });

        Queue<String> registrationIds = this.map.get(topicName);
        if (registrationIds == null) {
            // putIfAbsent keeps whichever queue was stored first if another thread raced us.
            Queue<String> fresh = new ConcurrentLinkedQueue<String>();
            Queue<String> existing = this.map.putIfAbsent(topicName, fresh);
            registrationIds = existing != null ? existing : fresh;
        }
        registrationIds.add(registrationId);
    }

    /**
     * Removes every listener previously registered for {@code pubSubType} through
     * {@link #subscribe}. Calling this for a type with no recorded subscriptions is a no-op.
     *
     * @param pubSubType selects the topic (by {@code toString()})
     */
    @Override
    public void unsubscribe(PubSubType pubSubType) {
        String topicName = pubSubType.toString();
        Queue<String> registrationIds = this.map.remove(topicName);
        if (registrationIds == null) {
            // Nothing was subscribed (or it was already unsubscribed); avoid a NullPointerException.
            return;
        }
        ITopic<Object> topic = this.hazelcastSub.getTopic(topicName);
        for (String registrationId : registrationIds) {
            topic.removeMessageListener(registrationId);
        }
    }
}
