package org.xerial.silk.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.xerial.core.XerialException;
import org.xerial.silk.SilkEvent;
import org.xerial.silk.SilkLinePushParser;
import org.xerial.silk.impl.SilkLineLexer;
import org.xerial.util.log.Logger;
import org.xerial.util.opt.Argument;
import org.xerial.util.opt.Option;
import org.xerial.util.opt.OptionParser;

/* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser.class */
public class SilkDistributedParser {
    /** Shared logger for the driver and its nested mapper/reducer classes; never reassigned, hence final. */
    private static final Logger _logger = Logger.getLogger(SilkDistributedParser.class);

    /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$BlockComparator.class */
    public static class BlockComparator implements RawComparator<LinePos> {
        public int compare(byte[] bArr, int i, int i2, byte[] bArr2, int i3, int i4) {
            return WritableComparator.compareBytes(bArr, i, 4, bArr2, i3, 4);
        }

        public int compare(LinePos linePos, LinePos linePos2) {
            return linePos.block - linePos2.block;
        }
    }

    /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$BlockPartitioner.class */
    public static class BlockPartitioner extends Partitioner<LinePos, Text> {
        public int getPartition(LinePos linePos, Text text, int i) {
            return (linePos.block * 127) % i;
        }
    }

    /**
     * Command-line options holder. {@code main} hands this class to {@code OptionParser},
     * which fills the annotated fields, and then passes the instance to {@code execute}.
     */
    public static class Config {

        // When set, main() prints the usage text instead of running the job.
        @Option(symbol = "h", longName = "help", description = "display help message")
        boolean displayHelp = false;

        // Input path handed to FileInputFormat; execute() logs an error and returns if it is null.
        // NOTE(review): presumably the first positional command-line argument -- confirm against @Argument.
        @Argument
        String inputFile = null;

        // Output path for the job; execute() defaults it to inputFile + ".out" when left null.
        @Option(symbol = "o", description = "output file name")
        String outputFile = null;

        // Passed as the argument of Job.waitForCompletion in execute().
        @Option(symbol = "v", description = "display verbose messages")
        boolean displayVerboseMessage = false;
    }

    /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$LinePos.class */
    public static class LinePos implements WritableComparable<LinePos> {
        public int block;
        public int offset;

        /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$LinePos$Comparator.class */
        public static class Comparator extends WritableComparator {
            public Comparator() {
                super(LinePos.class);
            }

            public int compare(byte[] bArr, int i, int i2, byte[] bArr2, int i3, int i4) {
                return compareBytes(bArr, i, i2, bArr2, i3, i4);
            }
        }

        public LinePos() {
        }

        public LinePos(int i, int i2) {
            this.block = i;
            this.offset = i2;
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.block = dataInput.readInt();
            this.offset = dataInput.readInt();
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(this.block);
            dataOutput.writeInt(this.offset);
        }

        public int hashCode() {
            return (this.block * 157) + this.offset;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof LinePos)) {
                return false;
            }
            LinePos linePos = (LinePos) LinePos.class.cast(obj);
            return (this.block == linePos.block) & (this.offset == linePos.offset);
        }

        public int compareTo(LinePos linePos) {
            int i = this.block - linePos.block;
            return i == 0 ? i : this.offset - linePos.offset;
        }

        public String toString() {
            return String.format("%d(%d)", Integer.valueOf(this.block), Integer.valueOf(this.offset));
        }

        static {
            WritableComparator.define(LinePos.class, new Comparator());
        }
    }

    /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$SilkBlockMapper.class */
    public static class SilkBlockMapper extends Mapper<LongWritable, Iterator<String>, LinePos, Text> {
        final int blockSize = 1024;
        private final SilkLineLexer lexer = new SilkLineLexer();

        protected void map(LongWritable longWritable, Iterator<String> it, Mapper<LongWritable, Iterator<String>, LinePos, Text>.Context context) throws IOException, InterruptedException {
            int i = 0;
            while (it.hasNext()) {
                try {
                    String next = it.next();
                    SilkDistributedParser._logger.info(String.format("map: (%s, %s)", longWritable.toString(), next));
                    SilkEvent parseLine = SilkLinePushParser.parseLine(this.lexer, next);
                    context.progress();
                    context.write(new LinePos((int) (longWritable.get() / 1024), i), new Text(parseLine.getType().toString()));
                    i++;
                } catch (XerialException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (Iterator<String>) obj2, (Mapper<LongWritable, Iterator<String>, LinePos, Text>.Context) context);
        }
    }

    /* loaded from: input_file:org/xerial/silk/hadoop/SilkDistributedParser$SilkBlockReducer.class */
    public static class SilkBlockReducer extends Reducer<LinePos, Text, IntWritable, Text> {
        protected void reduce(LinePos linePos, Iterable<Text> iterable, Reducer<LinePos, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
            SilkDistributedParser._logger.info("reducer is invoked");
            Iterator<Text> it = iterable.iterator();
            while (it.hasNext()) {
                SilkDistributedParser._logger.info(String.format("reduce: (%s, %s)", linePos.toString(), it.next().toString()));
            }
            context.write(new IntWritable(linePos.block), new Text(""));
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((LinePos) obj, (Iterable<Text>) iterable, (Reducer<LinePos, Text, IntWritable, Text>.Context) context);
        }
    }

    /**
     * Configures and runs the "Silk distributed parser" MapReduce job and waits for it to
     * finish.
     *
     * <p>If no input file is given, an error is logged and the method returns without running
     * anything. If no output file is given, it defaults to {@code inputFile + ".out"} (the
     * default is stored back into {@code config}). An existing output path is deleted
     * recursively before the job starts.
     *
     * @param config parsed command-line options
     * @throws IOException if the job completes unsuccessfully
     * @throws Exception   any failure raised while setting up or running the job
     */
    public void execute(Config config) throws Exception {
        if (config.inputFile == null) {
            _logger.error("no input file is given");
            return;
        }
        if (config.outputFile == null) {
            config.outputFile = config.inputFile + ".out";
        }
        final Path inputPath = new Path(config.inputFile);
        final Path outputPath = new Path(config.outputFile);

        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(configuration);
        // Remove output left behind by a previous run before the job is submitted.
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = new Job(configuration, "Silk distributed parser");
        job.setJarByClass(SilkDistributedParser.class);
        job.setInputFormatClass(SilkBlockInputFormat.class);
        job.setMapperClass(SilkBlockMapper.class);
        job.setMapOutputKeyClass(LinePos.class);
        job.setMapOutputValueClass(Text.class);
        // Partition and group by block so one reduce call sees every line of a block.
        job.setPartitionerClass(BlockPartitioner.class);
        job.setGroupingComparatorClass(BlockComparator.class);
        job.setReducerClass(SilkBlockReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // waitForCompletion reports failure through its return value; surface it to the caller
        // instead of silently discarding it.
        if (!job.waitForCompletion(config.displayVerboseMessage)) {
            throw new IOException("Silk distributed parser job failed for input: " + config.inputFile);
        }
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, then either prints
     * the usage text (when help was requested) or runs the job. Any exception is logged
     * through the class logger rather than propagated.
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        final OptionParser parser = new OptionParser(Config.class);
        try {
            parser.parse(strArr);
            final Config options = (Config) parser.getOptionHolder();
            if (!options.displayHelp) {
                new SilkDistributedParser().execute(options);
                return;
            }
            parser.printUsage();
        } catch (Exception e) {
            _logger.error(e);
        }
    }
}
