Dear Sreekanth,

Hope you are doing well.

Here is an example of an Images Duplicates Finder job.

Pre-processing job – generating the sequence file:

The target is a sequence file that has the filename as the key and the file's binary content, stored as a BytesWritable, as the value. The input is a file that lists all the image files as HDFS filenames. For example:


So our map / reduce job will include only a map step, which reads one file at a time and writes it to a sequence file by using SequenceFileOutputFormat. It uses a FileSystem object to open the HDFS file and an FSDataInputStream to read from it. The resulting byte array is written to the context as a BytesWritable. Since the output format of the job is the SequenceFileOutputFormat class, the output of the map is written to a sequence file.
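To make the read step concrete, here is a minimal sketch of draining a stream into a byte array. It uses only java.io so that it runs without a cluster. The class name StreamToBytesSketch and the sample bytes are made up for illustration. In the real mapper the stream would come from FileSystem.open(new Path(...)), and the result would be wrapped in a BytesWritable before being written to the context.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper class, not part of the original job.
final class StreamToBytesSketch {

    // Reads every remaining byte of the stream into one array.
    // FSDataInputStream extends java.io.InputStream, so the same loop
    // applies to a stream opened from HDFS.
    static byte[] readAll(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[64 * 1024];
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Stand-in for image data; a real job would read from HDFS instead.
        byte[] fakeImage = {(byte) 0xFF, (byte) 0xD8, 0x00, 0x10, 0x4A};
        byte[] data = readAll(new ByteArrayInputStream(fakeImage));
        System.out.println("read " + data.length + " bytes");
    }
}
```

Reading the whole file into memory is only reasonable for files that fit comfortably in the mapper's heap, which is usually the case for individual images.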

This is demonstrated in the class that implements this pre-processing job.

Images Duplicates Finder job:

Now we have a sequence file with the binary data of all the files, and it is time for the actual job that will filter the duplicates. We will use the MD5 algorithm to generate a key that is, for practical purposes, unique to each image's content, and compare these keys in order to find the duplicates. Our map / reduce job will include:

A mapper that reads the binary image data of every image file and creates an MD5 representation of each file. It passes this data to the reducer, where the key is the MD5 string and the value is the filename. Thus, all identical images are grouped together by the Hadoop framework.

// Needs java.util.Arrays, java.security.MessageDigest and
// java.security.NoSuchAlgorithmException in addition to the Hadoop imports.
public void map(Text key, BytesWritable value, Context context)
    throws IOException, InterruptedException {
  // Get the md5 for this specific file. getBytes() returns the backing
  // buffer, which is only valid up to getLength(), so trim it first.
  String md5Str;
  try {
    md5Str = calculateMd5(Arrays.copyOf(value.getBytes(), value.getLength()));
  } catch (NoSuchAlgorithmException e) {
    e.printStackTrace();
    context.setStatus("Internal error - can't find the algorithm for calculating the md5");
    return;
  }
  Text md5Text = new Text(md5Str);

  // Put the file in the map where the md5 is the key, so duplicates
  // will be grouped together for the reduce function.
  context.write(md5Text, key);
}

static String calculateMd5(byte[] imageData) throws NoSuchAlgorithmException {
  // Get the md5 for this specific data.
  MessageDigest md = MessageDigest.getInstance("MD5");
  md.update(imageData);
  byte[] hash = md.digest();

  // Convert the byte array to a hex string.
  StringBuilder hexString = new StringBuilder();
  for (int i = 0; i < hash.length; i++) {
    hexString.append(Integer.toString((hash[i] & 0xff) + 0x100, 16).substring(1));
  }
  return hexString.toString();
}
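One detail worth checking in the mapper is how many bytes get hashed. BytesWritable.getBytes() returns the backing buffer, and only the first getLength() bytes of it are valid data, so hashing the whole buffer can give different hashes for identical images. The sketch below illustrates the effect with a plain byte array standing in for that buffer. The class name Md5LengthSketch and the sample content are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper class, not part of the original job.
final class Md5LengthSketch {

    // Hex MD5 of the first `length` bytes of `buffer`.
    static String md5Hex(byte[] buffer, int length) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(buffer, 0, length);
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] content = "abc".getBytes(StandardCharsets.US_ASCII);
        // Stand-in for a buffer whose capacity exceeds the valid length.
        byte[] padded = Arrays.copyOf(content, 8);

        // Hash of the valid bytes only.
        System.out.println(md5Hex(padded, content.length));
        // Hash that also covers the trailing padding; it differs.
        System.out.println(md5Hex(padded, padded.length));
    }
}
```

In the job itself, the equivalent would be to pass value.getLength() along with value.getBytes(), or to copy the valid range before hashing.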

A very simple reducer that takes only the first filename for each MD5 hash. That way there is only a single filename for each identical image, and all the duplicates are filtered out. The output is a map file where the key is the filename and the value is the MD5 hash.

public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  // The key here is the md5 hash, while the values are all the image files
  // that are associated with it. For each md5 value we need to take only
  // one file (the first).
  Text imageFilePath = null;
  for (Text filePath : values) {
    imageFilePath = filePath;
    break; // only the first one
  }
  // In the result file the key will again be the image file path.
  context.write(imageFilePath, key);
}
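
If it helps to see the whole idea end to end without a cluster, here is a rough local simulation using plain Java collections. It is only a sketch: the class name DuplicateFilterSketch, the filenames, and the byte contents are made up, and a LinkedHashMap stands in for the grouping that Hadoop does between map and reduce.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local simulation, not part of the original job.
final class DuplicateFilterSketch {

    static String md5Hex(byte[] data) {
        try {
            byte[] hash = MessageDigest.getInstance("MD5").digest(data);
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Returns filename -> md5, with one entry per distinct file content.
    static Map<String, String> uniqueImages(Map<String, byte[]> files) {
        // "Map" and "shuffle": emit (md5, filename) and group by md5.
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> file : files.entrySet()) {
            grouped.computeIfAbsent(md5Hex(file.getValue()), k -> new ArrayList<>())
                   .add(file.getKey());
        }
        // "Reduce": keep only the first filename of every group.
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : grouped.entrySet()) {
            result.put(group.getValue().get(0), group.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        files.put("a.jpg", new byte[] {1, 2, 3}); // made-up sample data
        files.put("b.jpg", new byte[] {9, 9});
        files.put("c.jpg", new byte[] {1, 2, 3}); // same content as a.jpg
        uniqueImages(files).forEach((name, md5) -> System.out.println(name + "\t" + md5));
    }
}
```

Note that in this sketch "first" means first in insertion order. In a real job the order of the values that reach the reducer is not guaranteed, so which of the duplicate filenames survives may vary between runs.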

All of this is demonstrated in the class that implements the duplicates remover job.

Please let me know if you have any further issues regarding this.