package org.mongodb.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.mongodb.BasicDBObject;
import com.mongodb.WriteConcern;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.mongodb.StormMongoObjectGrabber;

/* loaded from: input_file:org/mongodb/bolt/MongoInsertBolt.class */
public class MongoInsertBolt extends MongoBoltBase {
    static Logger LOG = Logger.getLogger(MongoInsertBolt.class);
    private LinkedBlockingQueue<Tuple> queue;
    private MongoBoltTask task;
    private Thread writeThread;
    private boolean inThread;

    public MongoInsertBolt(String str, String str2, StormMongoObjectGrabber stormMongoObjectGrabber, WriteConcern writeConcern) {
        super(str, str2, stormMongoObjectGrabber, writeConcern);
        this.queue = new LinkedBlockingQueue<>(10000);
    }

    public MongoInsertBolt(String str, String str2, StormMongoObjectGrabber stormMongoObjectGrabber, WriteConcern writeConcern, boolean z) {
        super(str, str2, stormMongoObjectGrabber, writeConcern);
        this.queue = new LinkedBlockingQueue<>(10000);
        this.inThread = z;
    }

    @Override // org.mongodb.bolt.MongoBoltBase
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        if (this.inThread) {
            this.task = new MongoBoltTask(this.queue, this.mongo, this.db, this.collection, this.mapper, this.writeConcern) { // from class: org.mongodb.bolt.MongoInsertBolt.1
                @Override // org.mongodb.bolt.MongoBoltTask
                public void execute(Tuple tuple) {
                    this.collection.insert(this.mapper.map(new BasicDBObject(), tuple), this.writeConcern);
                }
            };
            this.writeThread = new Thread(this.task);
            this.writeThread.start();
        }
    }

    @Override // org.mongodb.bolt.MongoBoltBase
    public void execute(Tuple tuple) {
        if (this.inThread) {
            this.queue.add(tuple);
        } else {
            try {
                this.collection.insert(this.mapper.map(new BasicDBObject(), tuple), this.writeConcern);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        afterExecuteTuple(tuple);
    }

    @Override // org.mongodb.bolt.MongoBoltBase
    public void afterExecuteTuple(Tuple tuple) {
    }

    @Override // org.mongodb.bolt.MongoBoltBase
    public void cleanup() {
        if (this.inThread) {
            this.task.stopThread();
        }
        this.mongo.close();
    }

    @Override // org.mongodb.bolt.MongoBoltBase
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
