package org.mongodb.bolt;

import backtype.storm.tuple.Tuple;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.mongodb.WriteConcern;
import java.io.Serializable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.mongodb.StormMongoObjectGrabber;
import org.mongodb.UpdateQueryCreator;

/* loaded from: input_file:org/mongodb/bolt/MongoBoltTask.class */
abstract class MongoBoltTask implements Runnable, Serializable {
    private static final long serialVersionUID = -6501658936124868951L;
    static Logger LOG = Logger.getLogger(MongoBoltTask.class);
    private AtomicBoolean running = new AtomicBoolean(true);
    protected LinkedBlockingQueue<Tuple> queue;
    protected Mongo mongo;
    protected DB db;
    protected UpdateQueryCreator updateQueryCreator;
    protected StormMongoObjectGrabber mapper;
    protected WriteConcern writeConcern;
    protected DBCollection collection;

    public void stopThread() {
        this.running.set(false);
    }

    public MongoBoltTask(LinkedBlockingQueue<Tuple> linkedBlockingQueue, Mongo mongo, DB db, DBCollection dBCollection, StormMongoObjectGrabber stormMongoObjectGrabber, WriteConcern writeConcern) {
        this.queue = linkedBlockingQueue;
        this.mongo = mongo;
        this.db = db;
        this.collection = dBCollection;
        this.mapper = stormMongoObjectGrabber;
        this.writeConcern = writeConcern;
    }

    public MongoBoltTask(LinkedBlockingQueue<Tuple> linkedBlockingQueue, Mongo mongo, DB db, DBCollection dBCollection, UpdateQueryCreator updateQueryCreator, StormMongoObjectGrabber stormMongoObjectGrabber, WriteConcern writeConcern) {
        this.queue = linkedBlockingQueue;
        this.mongo = mongo;
        this.db = db;
        this.collection = dBCollection;
        this.updateQueryCreator = updateQueryCreator;
        this.mapper = stormMongoObjectGrabber;
        this.writeConcern = writeConcern;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running.get()) {
            try {
                Tuple poll = this.queue.poll();
                if (poll != null) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Insert document");
                    }
                    execute(poll);
                } else {
                    Thread.sleep(50L);
                }
            } catch (Exception e) {
                if (this.running.get()) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public abstract void execute(Tuple tuple);
}
