SSS Mapreduce Programming Guide - Tutorial
Updated Feb 27, 2013

How to write a basic Mapreduce application

This chapter explains how to write a basic Mapreduce application, using WordCount as an example. WordCount is an application that counts the number of occurrences of every word in a text.

In SSS Mapreduce, the following three applications form one set:

  1. Putting input data
  2. Processing in parallel
  3. Getting the result

How to create each application is explained below.

Putting input data

SSS Mapreduce processes data on storage servers and writes its results back to storage servers. Therefore, the input data must first be put on the storage servers.

The following program writes the file specified by a command-line argument to the storage servers. This section explains how to put input data into storage, using this program as an example.

src/base/org/sss/mapreduce/example/TextPutter.java:

package org.sss.mapreduce.example;

import java.io.BufferedReader;
import java.io.FileReader;

import org.sss.client.DataPutter;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class TextPutter {
  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    String input = others[0];

    DataPutter<PackableInt, PackableString> putter =
      DataPutter.create(client, PackableInt.class, PackableString.class);
    GroupID gid = putter.getOutputGroupID();
    try {
      BufferedReader reader = new BufferedReader(new FileReader(input));
      try {
        int n = 1;
        String line;

        while ((line = reader.readLine()) != null) {
          putter.append(new PackableInt(n), new PackableString(line));
          n++;
        }
      }
      finally {
        reader.close();
      }
    }
    finally {    
      putter.close();
    }
    System.out.println("output data deployed - " + gid);
  }
}

The SssClient class is the entry point through which an application program works with SSS Mapreduce. Therefore, the program first needs to create an SssClient.

  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

The constructor of SssClient takes the command-line arguments passed to the main method as its parameter. SssClient interprets strings starting with "---sss" among the command-line arguments as options for SSS Mapreduce.

To access the remaining command-line arguments, use SssClient#getArgs, which returns the command-line arguments with the SSS Mapreduce options removed.

    String[] others = client.getArgs();

TextPutter uses the first argument as the input text file.

    String input = others[0];
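
For example, suppose the program is invoked with one SSS option followed by a file name (a minimal sketch; the option name below is hypothetical and only illustrates how getArgs strips SSS options):

    // Hypothetical: args = { "---sss-example-option", "input.txt" }
    // (the option name is made up for illustration).
    SssClient client = new SssClient(args); // the "---sss..." option is recognized here
    String[] others = client.getArgs();     // others = { "input.txt" }
    String input = others[0];               // the input file name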

DataPutter is used to write tuples to the storage servers.

DataPutter takes the key and value types as type arguments. Use the DataPutter.create method, rather than a constructor, to create a new instance.

    DataPutter<PackableInt, PackableString> putter =
      DataPutter.create(client, PackableInt.class, PackableString.class);

The above code specifies PackableInt as the key type and PackableString as the value type.

Next, the application gets the "GroupID" that identifies the output.

    GroupID gid = putter.getOutputGroupID();

"GroupID" is identifier to show TupleGroup. Laiter, it is used when the application refers the written data later. DataPutter#append method is available to write a tuple to storage servers actually.

The following code reads the input one line at a time and appends the line number and the line content, wrapped in PackableInt and PackableString respectively.

    try {
      BufferedReader reader = new BufferedReader(new FileReader(input));
      try {
        int n = 1;
        String line;

        while ((line = reader.readLine()) != null) {
          putter.append(new PackableInt(n), new PackableString(line));
          n++;
        }
      }
      finally {
        reader.close();
      }
    }

After writing, it closes the DataPutter using the DataPutter#close method.

    finally {    
      putter.close();
    }

Finally, the GroupID of the output is printed.

    System.out.println("output data deployed - " + gid);

This GroupID is used by the next application, which processes the data in parallel.

Processing in parallel

This section explains how to perform parallel computation with the SSS Mapreduce computational model. First, the application that counts the number of words is shown below.

src/base/org/sss/mapreduce/example/WordCount.java:

package org.sss.mapreduce.example;

import java.util.StringTokenizer;

import org.sss.client.JobEngine;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.Mapper;
import org.sss.mapreduce.Output;
import org.sss.mapreduce.Reducer;
import org.sss.mapreduce.SssException;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class WordCount {
  public static class WordCountMapper extends Mapper {
    public void map(Context context,
        PackableInt key, PackableString value,
        Output<PackableString, PackableInt> output) throws Exception {
      StringTokenizer it = new StringTokenizer(value.get());
      while (it.hasMoreTokens()) {
        output.write(new PackableString(it.nextToken()), new PackableInt(1));
      }
    }
  }

  public static class WordCountReducer extends Reducer {
    public void reduce(Context context,
        PackableString key, Iterable<PackableInt> values,
        Output<PackableString, PackableInt> output) throws Exception {
      int sum = 0;
      for (PackableInt v: values) {
        sum += v.get();
      }
      output.write(key, new PackableInt(sum));
    } 
  }

  public static void main(String[] args) throws SssException {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    GroupID input = GroupID.createFromString(others[0]);

    JobEngine engine = new JobEngine(client);
    try {
      GroupID mapOutput    = GroupID.createRandom(engine);
      GroupID reduceOutput = GroupID.createRandom(engine);

      engine.getJobBuilder("mapper", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput, WordCountReducer.class).build();

      engine.getJobBuilder("reducer", WordCountReducer.class)
        .addInput(mapOutput).addOutput(reduceOutput).build();

      engine.exec();

      System.out.println("output data deployed - " + reduceOutput);
    } finally {
      engine.dispose();
    }
  }
}

SSS Mapreduce executes calculations using the MapReduce computational model. Therefore, you define one class implementing the processing executed in the Map phase and another class implementing the processing executed in the Reduce phase.

The following code is the class implementing the processing executed in the Map phase. Such a class is called a "Mapper".

  public static class WordCountMapper extends Mapper {
    public void map(Context context,
        PackableInt key, PackableString value,
        Output<PackableString, PackableInt> output) throws Exception {
      StringTokenizer it = new StringTokenizer(value.get());
      while (it.hasMoreTokens()) {
        output.write(new PackableString(it.nextToken()), new PackableInt(1));
      }
    }
  }

"Mapper" must fulfill the following conditions.

  • It extends the class named "Mapper" provided by SSS Mapreduce.
  • It has exactly one method named "map".

SSS Mapreduce derives the input and output types from the signature of the "map" method. SSS Mapreduce requires the following arguments for the "map" method:

  1. Mapper#Context
  2. input key type
  3. input value type
  4. Output<output key type, output value type>

In the above code, WordCountMapper gets the string from the "value" argument of type PackableString and splits it into words. It then passes each word, together with "1" as its count, to the next phase using the Output#write method.
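
To illustrate how the input and output types follow from the signature, here is a hypothetical Mapper written in the same style (a sketch, not part of the WordCount example) that keys each input line by its length in characters:

  public static class LineLengthMapper extends Mapper {
    public void map(Context context,
        PackableInt key, PackableString value,
        Output<PackableInt, PackableString> output) throws Exception {
      // SSS Mapreduce derives the output key type (PackableInt) and
      // value type (PackableString) from this signature.
      output.write(new PackableInt(value.get().length()), value);
    }
  }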

The following code is the class implementing the processing executed in the Reduce phase. Such a class is called a "Reducer".

  public static class WordCountReducer extends Reducer {
    public void reduce(Context context,
        PackableString key, Iterable<PackableInt> values,
        Output<PackableString, PackableInt> output) throws Exception {
      int sum = 0;
      for (PackableInt v: values) {
        sum += v.get();
      }
      output.write(key, new PackableInt(sum));
    } 
  }

"Reducer" must fulfill the following conditions.

  • It extends the class named "Reducer" provided by SSS Mapreduce.
  • It has exactly one method named "reduce".

SSS Mapreduce derives the input and output types from the signature of the "reduce" method. SSS Mapreduce requires the following arguments for the "reduce" method:

  1. Reducer#Context
  2. input key type
  3. Iterable<input value type>
  4. Output<output key type, output value type>

At each call, a Reducer receives all the tuples in the Mapper output that share the same key. The above code passes the key unchanged, together with the sum of all its values, to the next phase using the Output#write method.
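
For comparison, a hypothetical Reducer in the same style (a sketch, not part of the WordCount example) could keep the maximum value for each key instead of the sum:

  public static class MaxValueReducer extends Reducer {
    public void reduce(Context context,
        PackableString key, Iterable<PackableInt> values,
        Output<PackableString, PackableInt> output) throws Exception {
      // Keep only the largest value among the tuples sharing this key.
      int max = Integer.MIN_VALUE;
      for (PackableInt v : values) {
        max = Math.max(max, v.get());
      }
      output.write(key, new PackableInt(max));
    }
  }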

Next, the main method is explained. First, the main method creates an SssClient instance, just like TextPutter.

  public static void main(String[] args) throws SssException {
    SssClient client = new SssClient(args);

Next, it gets the input GroupID from the command-line arguments.

    String[] others = client.getArgs();
    GroupID input = GroupID.createFromString(others[0]);

GroupID.createFromString creates a GroupID from its string representation.

JobEngine is used to invoke the parallel computation.

First, create a new JobEngine instance.

    JobEngine engine = new JobEngine(client);
    try {

Next, create new TupleGroups for the Mapper output and the Reducer output.

      GroupID mapOutput    = GroupID.createRandom(engine);
      GroupID reduceOutput = GroupID.createRandom(engine);

The GroupID.createRandom method creates a new TupleGroup.

Next, specify the conditions for executing the Mapper.

      engine.getJobBuilder("mapper", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput, WordCountReducer.class).build();

SSS Mapreduce manages Mappers and Reducers in units called "Jobs". The above code therefore creates the Job that invokes WordCountMapper. A Job is created with the Job.Builder obtained from JobEngine#getJobBuilder. JobEngine#getJobBuilder requires the job name and the class object of the Mapper or Reducer. Use an intelligible job name, because the name appears in the log file generated after the job completes.

Next, specify the input using Job.Builder#addInput and the output using Job.Builder#addOutput. These methods return the Job.Builder itself, so the calls can be chained.

The above code also passes WordCountReducer.class to Job.Builder#addOutput. This specifies a "Combiner". A "Combiner" is a class that aggregates a subset of a Job's output. With WordCountReducer, the result is the same whether all target tuples are aggregated at once, or a subset is aggregated first and the partial results are aggregated again. In such cases, processing can be accelerated by aggregating the data in chunks that fit in memory, which reduces the amount of data written to the storage servers. In the above code, by specifying WordCountReducer as the "Combiner", SSS Mapreduce aggregates the output of WordCountMapper in fixed-size chunks before writing it to the storage servers.
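
The difference lies only in the addOutput call. The following sketch (the job names are hypothetical; engine, input, and mapOutput are as defined above) contrasts the two variants:

      // Without a combiner: every (word, 1) tuple produced by the Mapper
      // is written to the storage servers as-is.
      engine.getJobBuilder("mapper-plain", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput).build();

      // With WordCountReducer as a combiner: the Mapper output is
      // pre-aggregated in memory-sized chunks before being written.
      engine.getJobBuilder("mapper-combined", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput, WordCountReducer.class).build();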

After specifying the input and output conditions, create the Job with the Job.Builder#build method. The Job is registered automatically, so you do not need to store it in a variable unless you use it afterwards.

The following code specifies the conditions for executing the Reducer. In SSS Mapreduce, a Reducer job is created in exactly the same way as a Mapper job.

      engine.getJobBuilder("reducer", WordCountReducer.class)
        .addInput(mapOutput).addOutput(reduceOutput).build();
      engine.exec();

The JobEngine#exec method executes the parallel computation under the specified conditions. This method does not return until the calculation has completed.

JobEngine decides the execution order of the jobs from their input/output relations. That is, when the output of a job A is the input of another job B, SSS Mapreduce guarantees that job A executes before job B. In WordCount, mapOutput, the output of "mapper", is the input of "reducer", so "mapper" executes before "reducer".
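
For instance, the order in which jobs are built does not matter. The following sketch (FirstMapper and SecondReducer are hypothetical class names; engine and input are as above) builds the downstream job first, yet it still runs second:

      GroupID intermediate = GroupID.createRandom(engine);
      GroupID finalOutput  = GroupID.createRandom(engine);

      // "stage2" is built first, but JobEngine still runs "stage1" before
      // it, because stage2's input is stage1's output.
      engine.getJobBuilder("stage2", SecondReducer.class)
        .addInput(intermediate).addOutput(finalOutput).build();
      engine.getJobBuilder("stage1", FirstMapper.class)
        .addInput(input).addOutput(intermediate).build();

      engine.exec(); // "stage1" completes before "stage2" starts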

The following code prints the GroupID of the Reducer output.

      System.out.println("output data deployed - " + reduceOutput);

This GroupID is used in the next phase to get the result.

Finally, the server-side resources used by the JobEngine are released using JobEngine#dispose.

    } finally {
      engine.dispose();
    }

Getting the result

This section explains how to get the result of the parallel processing. The following program gets the results of WordCount and writes them to standard output.

src/base/org/sss/mapreduce/example/WordCountResultPrinter.java:

package org.sss.mapreduce.example;

import org.sss.client.DataGetter;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.Tuple;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class WordCountResultPrinter {
  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    if (others.length < 1) {
      System.err.println("input group id is not specified.");
      System.exit(1);
    }

    GroupID table = GroupID.createFromString(others[0]);
    DataGetter<PackableString, PackableInt> dg = 
      DataGetter.create(client, PackableString.class, PackableInt.class, table);
    try {
      for (Tuple<PackableString, PackableInt> kv: dg) {
        System.out.format("%s -> %d\n", kv.getKey().get(), kv.getValue().get());
      }
    }
    catch (Throwable e) {
      System.err.println("caught an IOException while reading RawKeyValue.");
      e.printStackTrace();
    } finally {
      // don't forget this.
      dg.close();
    }
  }
}

First, the main method creates an SssClient instance, just like TextPutter.

  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();

DataGetter is used to read tuples from the storage servers. DataGetter takes the key and value types as type arguments. The DataGetter.create method requires the SssClient instance, the class objects of the key and value types, and the identifier of the TupleGroup to be read.

WordCountResultPrinter uses the first command-line argument as the TupleGroup to be read.

    GroupID table = GroupID.createFromString(others[0]);
    DataGetter<PackableString, PackableInt> dg = 
      DataGetter.create(client, PackableString.class, PackableInt.class, table);

DataGetter implements Iterable<Tuple<key type, value type>>, so you can access each tuple with a for-each loop. The Tuple class simply represents a tuple and stores only a key and a value.

    try {
      for (Tuple<PackableString, PackableInt> kv: dg) {
        System.out.format("%s -> %d\n", kv.getKey().get(), kv.getValue().get());
      }
    }

This code simply writes each retrieved tuple to standard output.

Finally, the resources used by the DataGetter are released using DataGetter#close.

    } finally {
      // don't forget this.
      dg.close();
    }

How to build the application

To compile an SSS Mapreduce application, the class path must contain the following jar files:

  • ${MAPREDUCE_HOME}/mapreduce.jar
  • jar files under ${MAPREDUCE_HOME}/lib
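
For example, compiling the tutorial sources might look like this (a sketch; the directory layout and the jar name mapreduce-example.jar are assumptions based on this tutorial):

$ mkdir -p classes
$ javac -cp "${MAPREDUCE_HOME}/mapreduce.jar:${MAPREDUCE_HOME}/lib/*" \
    -d classes src/base/org/sss/mapreduce/example/*.java
$ jar cf mapreduce-example.jar -C classes .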

How to execute

The following script is used to execute an SSS Mapreduce application:

  • ${MAPREDUCE_HOME}/bin/run.sh

The usage of this script is shown below.

run.sh JAR_FILES CLASS_NAME ARGS...

The meanings of the arguments are shown below.

  • JAR_FILES
    The jar files required to execute the application. When two or more jar files are specified, they are separated by ":".
  • CLASS_NAME
    The name of the class to be invoked.
  • ARGS...
    The arguments passed to the main method.

An example that executes WordCount is shown below. It assumes that the compiled WordCount classes are contained in mapreduce-example.jar.

$ cat HelloWorld.txt 
Hi, greeting from SSS client.
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.TextPutter HelloWorld.txt                                   
output data deployed - 19c0a55ea-9ed1-42e8-a820-b1c5b797b21c
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.WordCount  19c0a55ea-9ed1-42e8-a820-b1c5b797b21c
output data deployed - 171982401-426f-4ddc-aa59-606f26fff9de
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.WordCountResultPrinter 171982401-426f-4ddc-aa59-606f26fff9de
SSS -> 1
client. -> 1
Hi, -> 1
greeting -> 1
from -> 1
$