package kakfa_clj.core;

import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import clojure.lang.PersistentVector;
import clojure.lang.RT;
import clojure.lang.Symbol;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:kakfa_clj/core/Consumer.class */
/**
 * Java facade over the Clojure namespace {@code kafka-clj.consumer.node}.
 *
 * <p>Every operation is forwarded to a Clojure var of that namespace, with the
 * opaque "node" object returned by {@code create-node!} passed as first argument.
 * Instances are obtained through {@link #connect}.
 *
 * <p>The consumer can be iterated: the iterator reports more elements for as long
 * as {@link #close()} has not been called, and each step delegates to
 * {@link #readMsg()}.
 */
public class Consumer implements Iterable<Message>, Closeable {
    /** Namespace holding all the consumer-node functions this class delegates to. */
    private static final String NODE_NS = "kafka-clj.consumer.node";

    /** Opaque value returned by {@code create-node!}; handed back to every node function. */
    private final Object connector;

    /** Flipped to {@code true} by the first {@link #close()} call. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private Consumer(Object obj) {
        this.connector = obj;
    }

    /**
     * Creates a consumer by invoking {@code create-node!}.
     *
     * <p>The configuration passed to Clojure is {@code kafkaConf.getConf()} merged
     * (via {@code clojure.core/merge}) with a map holding {@code :bootstrap-brokers}
     * and {@code :redis-conf}; entries of the latter map win on key collisions, as
     * per {@code merge} semantics.
     *
     * @param kafkaConf     base configuration
     * @param brokerConfArr brokers converted with {@code Producer.toKeywordListMap}
     * @param redisConf     redis settings, converted with {@code toMap()}
     * @param strArr        values passed to {@code create-node!} as a vector
     *                      (presumably the initial topic names; verify against the
     *                      Clojure function)
     * @return a consumer wrapping the created node
     */
    public static Consumer connect(KafkaConf kafkaConf, BrokerConf[] brokerConfArr, RedisConf redisConf, String... strArr) {
        Object overrides = PersistentArrayMap.createAsIfByAssoc(new Object[]{
            Keyword.intern("bootstrap-brokers"), Producer.toKeywordListMap(brokerConfArr),
            Keyword.intern("redis-conf"), redisConf.toMap()
        });
        Object conf = RT.var("clojure.core", "merge").invoke(kafkaConf.getConf(), overrides);
        Object topics = PersistentVector.create(Arrays.asList(strArr));
        return new Consumer(RT.var(NODE_NS, "create-node!").invoke(conf, topics));
    }

    /**
     * Reads one message by calling {@code read-msg!} with the node and {@code j}.
     *
     * @param j second argument of {@code read-msg!} (presumably a timeout in
     *          milliseconds; confirm against the Clojure function)
     * @return the result of {@code read-msg!} wrapped in a {@link Message}
     */
    public Message readMsg(long j) {
        return new Message((Map) RT.var(NODE_NS, "read-msg!").invoke(this.connector, Long.valueOf(j)));
    }

    /**
     * Reads one message by calling the single-argument arity of {@code read-msg!}.
     *
     * @return the result of {@code read-msg!} wrapped in a {@link Message}
     */
    public Message readMsg() {
        return new Message((Map) RT.var(NODE_NS, "read-msg!").invoke(this.connector));
    }

    /**
     * Forwards the given names, as a vector, to {@code add-topics!}.
     *
     * @param strArr names to pass on
     */
    public void addTopics(String... strArr) {
        RT.var(NODE_NS, "add-topics!").invoke(this.connector, PersistentVector.create(Arrays.asList(strArr)));
    }

    /**
     * Forwards the given names, as a vector, to {@code remove-topics!}.
     *
     * @param strArr names to pass on
     */
    public void removeTopics(String... strArr) {
        RT.var(NODE_NS, "remove-topics!").invoke(this.connector, PersistentVector.create(Arrays.asList(strArr)));
    }

    /**
     * Returns an iterator whose {@code hasNext()} is {@code true} until this
     * consumer is closed and whose {@code next()} delegates to {@link #readMsg()}.
     *
     * <p>NOTE(review): {@code next()} does not re-check the closed flag, so a call
     * made after {@link #close()} still reaches {@code read-msg!}.
     */
    @Override // java.lang.Iterable
    public Iterator<Message> iterator() {
        return new Iterator<Message>() { // from class: kakfa_clj.core.Consumer.1
            @Override // java.util.Iterator
            public boolean hasNext() {
                return !Consumer.this.closed.get();
            }

            @Override // java.util.Iterator
            public Message next() {
                return Consumer.this.readMsg();
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Marks this consumer as closed and invokes {@code shutdown-node!}.
     *
     * <p>Only the first call performs the shutdown; later calls are no-ops, so the
     * node is never shut down twice.
     */
    @Override // java.io.Closeable
    public void close() {
        // compareAndSet lets exactly one caller win, even when close() races across threads.
        if (this.closed.compareAndSet(false, true)) {
            RT.var(NODE_NS, "shutdown-node!").invoke(this.connector);
        }
    }

    /**
     * Usage example only; never called. Shows connecting, reading a single
     * message, and iterating until the consumer is closed.
     */
    private void usageConsumer() throws Exception {
        Consumer connect = connect(new KafkaConf(), new BrokerConf[]{new BrokerConf("192.168.4.40", 9092)}, new RedisConf("192.168.4.10", 6379, "test-group"), "my-topic");
        try {
            Message readMsg = connect.readMsg();
            readMsg.getTopic();
            readMsg.getPartition();
            readMsg.getOffset();
            readMsg.getBytes();
            for (Message message : connect) {
                System.out.println(message);
            }
        } finally {
            connect.close();
        }
    }

    static {
        // Load the Clojure namespace once so the vars looked up above are bound.
        RT.var("clojure.core", "require").invoke(Symbol.create(NODE_NS));
    }
}
