SaveAsBinaryFile with Spark (Part 2)

Content of this article

In the previous article, I described how to write binary data out as-is using Spark. It turns out there is a simpler way than exporting with mapPartitionsWithIndex. The previous method could not write to HDFS; this one can, and I think it is the "correct" way. Please treat the previous article as an example of how to use mapPartitionsWithIndex.

Java code

Once again, the code is written in Java.

SaveBinary.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());

        // Build a sample RDD of byte arrays
        ArrayList<byte[]> bytes = new ArrayList<>();
        for (int i=0; i<10; i++) {
            byte[] b = new byte[10];
            for (int j=0; j<10; j++) {
                b[j] = (byte)(i*10+j);
            }
            bytes.add(b);
        }
        JavaRDD<byte[]> rdd = sc.parallelize(bytes, 2);
        /* Contents of the RDD
            rdd[0] =  0,  1,  2,  3,  4,  5,  6,  7,  8,  9
            rdd[1] = 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
            ...
        */
        // Convert each byte[] to a BytesWritable key and pair it with a NullWritable value to get a JavaPairRDD
        rdd.mapToPair(x -> new Tuple2<>(new BytesWritable(x), NullWritable.get()))
           // Specify the custom BytesOutputFormat and call saveAsNewAPIHadoopFile
           .saveAsNewAPIHadoopFile("./out", BytesWritable.class, NullWritable.class, BytesOutputFormat.class);
    }
}

BytesOutputFormat.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesOutputFormat extends SequenceFileOutputFormat<BytesWritable,NullWritable> {
    @Override
    public RecordWriter<BytesWritable,NullWritable> getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // BytesOutputFormat only creates and returns a BytesRecordWriter
        BytesRecordWriter writer = new BytesRecordWriter(job);
        return writer;
    }
}

BytesRecordWriter.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesRecordWriter extends RecordWriter<BytesWritable,NullWritable> {
    private boolean saveToHdfs_ = true;
    private OutputStream os_;
    public BytesRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // The save destination specified in SaveBinary.java is stored under this key
        String outDir = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
        String taskId = job.getTaskAttemptID().getTaskID().toString();
        // Whether the destination is HDFS or NFS, only the class behind os_ changes
        if (saveToHdfs_) {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            os_ = hdfs.create(new Path(outDir + "/part-" + taskId.substring(taskId.length()-6)));
        } else {
            os_ = new FileOutputStream(outDir + "/part-" + taskId.substring(taskId.length()-6));
        }
    }
    @Override
    public void write(BytesWritable key, NullWritable value) throws IOException {
        os_.write(key.getBytes(), 0, key.getLength());
    }

    @Override
    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
        os_.close();
        os_ = null;
    }
}
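
The part file names in the output come from the last six characters of the task ID. The following is a small sketch of that one expression in isolation; the class name PartName and the sample ID are hypothetical, chosen to follow Hadoop's usual task_..._000001 layout.

```java
public class PartName {
    // Mirrors the expression in BytesRecordWriter: keep the last six characters of the task ID.
    static String partName(String taskId) {
        return "part-" + taskId.substring(taskId.length() - 6);
    }

    public static void main(String[] args) {
        // Hypothetical task ID; only its six-digit suffix matters here
        System.out.println(partName("task_202001010000_0001_r_000001")); // prints part-000001
    }
}
```

Because the suffix is zero-padded, the file names sort in partition order.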

Commentary

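JavaPairRDD has methods for saving through Hadoop, so I save with saveAsNewAPIHadoopFile. (There are also variants without "NewAPI" in the name. Those take an OutputFormat from the older org.apache.hadoop.mapred package, while the NewAPI variants take one from org.apache.hadoop.mapreduce. I think you should use the new one.)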

Because saveAsNewAPIHadoopFile lets you specify the OutputFormat, all that remains is to write a RecordWriter for the format you want to save and an OutputFormat that returns it.

This part may be confusing if you have never implemented a Hadoop OutputFormat, but the code above shows what is going on: it simply creates an OutputStream, and the write method is called once for each element.
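
The pattern described here (open one stream, then write each element's bytes) can be tried without a cluster. The following is a minimal sketch, not the Hadoop API: RawWriter and writeRecords are hypothetical stand-ins for BytesRecordWriter, and a ByteArrayOutputStream takes the place of the HDFS stream. It also shows why write passes key.getLength(): the array returned by BytesWritable.getBytes() can be longer than the valid data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class RawWriterSketch {
    // Hypothetical stand-in for BytesRecordWriter: one stream, one write per element.
    static final class RawWriter implements AutoCloseable {
        private final OutputStream os;

        RawWriter(OutputStream os) {
            this.os = os;
        }

        // Write only the first `length` bytes; the rest of the buffer is padding.
        void write(byte[] buffer, int length) throws IOException {
            os.write(buffer, 0, length);
        }

        @Override
        public void close() throws IOException {
            os.close();
        }
    }

    // Sends `records` records of `recordSize` bytes through the writer and returns the raw output.
    static byte[] writeRecords(int records, int recordSize) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (RawWriter writer = new RawWriter(sink)) {
            for (int i = 0; i < records; i++) {
                // Oversized buffer to mimic a backing array with trailing padding
                byte[] buffer = new byte[recordSize + 6];
                for (int j = 0; j < recordSize; j++) {
                    buffer[j] = (byte) (i * recordSize + j);
                }
                writer.write(buffer, recordSize);
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] out = writeRecords(5, 10);
        System.out.println(out.length + " bytes written"); // prints 50 bytes written
    }
}
```

The records land back to back with no headers or separators, which is the property the custom RecordWriter is meant to provide.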

Output result

For reference, here is the output. I deliberately set the number of partitions to 2 for this run, so there are two output files.

out/part-000000


00 01 02 03 04 05 06 07  08 09 0A 0B 0C 0D 0E 0F
10 11 12 13 14 15 16 17  18 19 1A 1B 1C 1D 1E 1F
20 21 22 23 24 25 26 27  28 29 2A 2B 2C 2D 2E 2F
30 31

out/part-000001


32 33 34 35 36 37 38 39  3A 3B 3C 3D 3E 3F 40 41
42 43 44 45 46 47 48 49  4A 4B 4C 4D 4E 4F 50 51
52 53 54 55 56 57 58 59  5A 5B 5C 5D 5E 5F 60 61
62 63

You can see that the data was written without any extra bytes. As an aside, the data continues from part-000000 into part-000001, so if you cat these files together you get the same file that a single-partition run would produce.
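
The concatenation can also be done in Java. This is a minimal sketch for a local output directory only; the class name ConcatParts and the temporary paths are hypothetical, and main builds two small part files shaped like the ones in this article rather than reading real Spark output.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatParts {
    // Concatenates every part-* file in outDir, in file-name order, into target.
    static void concat(Path outDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outDir)) {
            parts = files
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .sorted() // zero-padded names sort in partition order
                    .collect(Collectors.toList());
        }
        try (OutputStream os = Files.newOutputStream(target)) {
            for (Path part : parts) {
                Files.copy(part, os);
            }
        }
    }

    // Returns `count` consecutive byte values starting at `first`.
    static byte[] range(int first, int count) {
        byte[] b = new byte[count];
        for (int i = 0; i < count; i++) {
            b[i] = (byte) (first + i);
        }
        return b;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the two 50-byte part files from this article in a temporary directory
        Path dir = Files.createTempDirectory("out");
        Files.write(dir.resolve("part-000000"), range(0x00, 50));
        Files.write(dir.resolve("part-000001"), range(0x32, 50));

        Path merged = Files.createTempFile("merged", ".bin");
        concat(dir, merged);
        System.out.println(Files.size(merged) + " bytes merged"); // prints 100 bytes merged
    }
}
```

For output on HDFS, the hadoop fs -getmerge command serves the same purpose.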
