My favorites | Sign in
Project Home Downloads Wiki Issues Source
READ-ONLY: This project has been archived. For more information see this post.
Search
for
ProgrammingGuideTutorial  
SSS Mapreduce Programming Guide - Tutorial
ja, en
Updated Feb 27, 2013

基本的なMapreduceアプリケーションの作成

まず、最初に基本的なMapreduceアプリケーションの作成をWordCountを例に説明します。 WordCountはテキストの単語毎の出現回数を数えるアプリケーションです。

SSS Mapreduceでは通常以下の3つのアプリケーションをセットで作成します。

  1. 入力データの設置
  2. 並列計算処理
  3. 結果の取得

それぞれの作成方法について説明を行います。

入力データの設置

SSS Mapreduceではストレージサーバー上のデータをMapreduceアルゴリズムで処理し、結果をストレージサーバーに出力します。 そのため、まず最初に入力データをストレージサーバ上に設置する必要があります。

以下のプログラムは、コマンドライン引数で指定されたファイルをストレージサーバーに書き込むものです。 このプログラムを例に入力データをストレージサーバーに設置する方法について説明を行います。

src/base/org/sss/mapreduce/example/TextPutter.java:

package org.sss.mapreduce.example;

import java.io.BufferedReader;
import java.io.FileReader;

import org.sss.client.DataPutter;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class TextPutter {
  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    String input = others[0];

    DataPutter<PackableInt, PackableString> putter =
      DataPutter.create(client, PackableInt.class, PackableString.class);
    GroupID gid = putter.getOutputGroupID();
    try {
      BufferedReader reader = new BufferedReader(new FileReader(input));
      try {
        int n = 1;
        String line;

        while ((line = reader.readLine()) != null) {
          putter.append(new PackableInt(n), new PackableString(line));

        }
      }
      finally {
        reader.close();
      }
    }
    finally {    
      putter.close();
    }
    System.out.println("output data deployed - " + gid);
  }
}

アプリケーションプログラムがSSS Mapreduceを操作するための起点となるのがSssClientクラスです。 そのため、まずのアプリケーションプログラムではSssClientのインスタンスを作成します。

  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

SssClientのコンストラクタは引数としてmainメソッドに渡されるコマンドライン引数を要求します。 SssClientはコマンドライン引数の中で、「--sss」ではじまるものについてはSSS Mapreduceへのオプションとして解釈します。

アプリケーションプログラムでコマンドライン引数の内容を参照する場合には、 SssClient#getArgsメソッドを利用してコマンドライン引数からSSS Mapreduceへのオプションを除いたものを取得します。

    String[] others = client.getArgs();

TextPutterでは最初の引数を入力のテキストファイルとして扱います。

    String input = others[0];

アプリケーションプログラムからストレージサーバーへのデータの書き込みには、 DataPutterを利用します。

DataPutterには型変数として、キーとバリューの型を指定します。 また、インスタンスの生成にはコンストラクタではなく、DataPutter.createメソッドを利用します。

    DataPutter<PackableInt, PackableString> putter =
      DataPutter.create(client, PackableInt.class, PackableString.class);

ここではキーの型としてPackableInt、バリューの型としてPackableStringを指定しています。

次にDataPutterから書き込み先の「GroupID」を取得します。

    GroupID gid = putter.getOutputGroupID();

GroupIDはTupleGroupを指し示すための識別子です。 後ほど書き込んだデータを参照する際に利用します。

実際にTupleをストレージサーバーに書き込むにはDataPutter#appendメソッドを利用します。 以下のコードではファイルから1行ずつ読み込み、行番号と行の内容をそれぞれPackableIntとPackableStringでラップしてDataPutter#appendメソッドに渡しています。

    try {
      BufferedReader reader = new BufferedReader(new FileReader(input));
      try {
        int n = 1;
        String line;

        while ((line = reader.readLine()) != null) {
          putter.append(new PackableInt(n), new PackableString(line));

        }
      }
      finally {
        reader.close();
      }
    }

書き込みが完了したら、DataPutter#closeメソッドでDataPutterを閉じます。

    finally {    
      putter.close();
    }

最後に書き込み先のGroupIDを出力しています。

    System.out.println("output data deployed - " + gid);

この値は次の並列計算処理を行うアプリケーションで利用します。

並列計算処理

SSS MapreduceにてMapreduce計算モデルにしたがって並列計算を行う方法について説明を行います。 まず、WordCountの集計を行うプログラムの全体を以下に記します。

src/base/org/sss/mapreduce/example/WordCount.java:

package org.sss.mapreduce.example;

import java.util.StringTokenizer;

import org.sss.client.JobEngine;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.Mapper;
import org.sss.mapreduce.Output;
import org.sss.mapreduce.Reducer;
import org.sss.mapreduce.SssException;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class WordCount {
  public static class WordCountMapper extends Mapper {
    public void map(Context context,
        PackableInt key, PackableString value,
        Output<PackableString, PackableInt> output) throws Exception {
      StringTokenizer it = new StringTokenizer(value.get());
      while (it.hasMoreTokens()) {
        output.write(new PackableString(it.nextToken()), new PackableInt(1));
      }
    }
  }

  public static class WordCountReducer extends Reducer {
    public void reduce(Context context,
        PackableString key, Iterable<PackableInt> values,
        Output<PackableString, PackableInt> output) throws Exception {
      int sum = 0;
      for (PackableInt v: values) {
        sum += v.get();
      }
      output.write(key, new PackableInt(sum));
    } 
  }

  public static void main(String[] args) throws SssException {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    GroupID input = GroupID.createFromString(others[0]);

    JobEngine engine = new JobEngine(client);
    try {
      GroupID mapOutput    = GroupID.createRandom(engine);
      GroupID reduceOutput = GroupID.createRandom(engine);

      engine.getJobBuilder("mapper", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput, WordCountReducer.class).build();

      engine.getJobBuilder("reducer", WordCountReducer.class)
        .addInput(mapOutput).addOutput(reduceOutput).build();

      engine.exec();

      System.out.println("output data deployed - " + reduceOutput);
    } finally {
      engine.dispose();
    }
  }
}

SSS MapreduceではMapReduce計算モデルにしたがって計算を実行します。 そのため、Mapの際に実行する処理を実装したクラスとReducerの際に実行する処理を実装したクラスを定義します。

次のコードがMapの際に実行する処理を実装したクラスです。 Mapの際に実行する処理を実装したクラスを「Mapper」と呼びます。

  public static class WordCountMapper extends Mapper {
    public void map(Context context,
        PackableInt key, PackableString value,
        Output<PackableString, PackableInt> output) throws Exception {
      StringTokenizer it = new StringTokenizer(value.get());
      while (it.hasMoreTokens()) {
        output.write(new PackableString(it.nextToken()), new PackableInt(1));
      }
    }
  }

Mapperは以下の条件を満たす必要があります。

  • SSS MapreduceのMapperクラスを継承していること
  • ただひとつのmapメソッドを持つこと

また、入力と出力の型をmapメソッドのシグネチャから取得します。 SSS Mapreduceではmapメソッドの引数として以下のものを要求します。

  1. Mapper#Context
  2. 入力のキーの型
  3. 入力のバリューの型
  4. Output<出力のキーの型, 出力のバリューの型>

上記のコードでは、PackableStringのvalue変数から文字列を取得して単語毎に分割しています。 そして、分割した単語と出現頻度数として「1」をOutput#writeメソッドを使って次のフェーズへ渡しています。

次のコードがReduceの際に実行する処理を実装したクラスです。 Reduceの際に実行する処理を実装したクラスを「Reducer」と呼びます。

  public static class WordCountReducer extends Reducer {
    public void reduce(Context context,
        PackableString key, Iterable<PackableInt> values,
        Output<PackableString, PackableInt> output) throws Exception {
      int sum = 0;
      for (PackableInt v: values) {
        sum += v.get();
      }
      output.write(key, new PackableInt(sum));
    } 
  }

Reducerは以下の条件を満たす必要があります。

  • SSS MapreduceのReducerクラスを継承していること
  • ただひとつのreduceメソッドを持つこと

また、入力と出力の型をreduceメソッドのシグネチャから取得します。 SSS Mapreduceではreduceメソッドの引数として以下のものを要求します。

  1. Reducer#Context
  2. 入力のキーの型
  3. Iterable<入力のバリューの型>
  4. Output<出力のキーの型, 出力のバリューの型>

ReducerのvalueではMapperの出力したTupleのうち、キーが同じものについて一度の呼び出しで受け取ります。 上記のコードでは、キーはそのままに、バリューの値の総和を計算してOutput#writeメソッドを使って次のフェーズへ渡しています。

次にエントリーポイントの説明を行います。 エントリーポイントではまずTextPutterと同様にSssClientのインスタンスを作成します。

  public static void main(String[] args) throws SssException {
    SssClient client = new SssClient(args);

次にコマンドライン引数から入力のGroupIDを取得します。

    String[] others = client.getArgs();
    GroupID input = GroupID.createFromString(others[0]);

文字列からGroupIDを生成するにはGroupID.createFromStringメソッドを利用します。

SSS Mapreduceで並列計算の実行を行うにはJobEngineを利用します。 まず、インスタンスを生成します。

    JobEngine engine = new JobEngine(client);
    try {

次にMapperの出力用とReducerの出力用に新しいTupleGroupを用意します。

      GroupID mapOutput    = GroupID.createRandom(engine);
      GroupID reduceOutput = GroupID.createRandom(engine);

新しいTupleGroupの生成にはGroupID.createRandomメソッドを利用します。

JobEngineに対してMapperの実行条件を指定します。

      engine.getJobBuilder("mapper", WordCountMapper.class)
        .addInput(input).addOutput(mapOutput, WordCountReducer.class).build();

MapperやReducerではJobという単位で実行を管理するため、WordCountMapperを実行するJobを生成します。 JobはJobEngine#getJobBuilderメソッドで生成するJob.Builderを使って生成します。 JobEngine#getJobBuilderには、ジョブの名前と実行するMapper/Reducerのクラスオブジェクトを指定します。 ジョブの名前は実行完了後に生成されるログに利用されるため、わかりやすいものを指定してください。

次に、Job.Builder#addInputで入力を、Job.Builder#addOutputで出力先のGroupIDを指定します。 これらのメソッドはJob.Builder自身を返すため、連なって記述することができます。

また、Job.Builder#addOutputでWordCountReducer.classを指定しています。 これは「Combiner」の指定になります。ジョブの出力の部分集合を集約する処理を実行するクラスです。 WordCountReducerでは集約対象のTupleをすべて一度に集約した場合と、部分集合を集約した結果を更に集約した場合で同じ結果になります。 このような場合、Reducerの前段のジョブが生成する出力を直接ストレージサーバーに書き込むのではなく、メモリ上に収まる一定量毎に集約することで、ストレージサーバーへ書き込む量を減らして、処理を高速化することができます。 ここでは「Combiner」としてWordCountReducerを指定し、WordCountMapperの出力を一定量毎にWordCountReducerが集約してからmapOutputへ書き込むように指定しています。

入力/出力の条件を指定したあとにJob.Builder#build()を呼び出してJobを生成しています。 Jobは生成時に自動的にJobEngineに登録されるため、必要がなければユーザが変数などで保持する必要はありません。

JobEngineに対してReducerの実行条件を指定します。 SSS MapreduceではMapperとReducerでJobの生成方法に違いはありません。

      engine.getJobBuilder("reducer", WordCountReducer.class)
        .addInput(mapOutput).addOutput(reduceOutput).build();

JobEngine#execを呼び出すことで指示した条件で並列計算を実行します。

      engine.exec();

このメソッドは計算が完了するまで返りません。

JobEngineはジョブの実行順序を入力と出力の関係から決定します。 つまり、あるジョブAの出力が別のジョブBの入力になっている場合には、ジョブAがジョブBよりも先に実行されることが保証されます。 今回の場合は"mapper"の出力mapOutputが"reducer"の入力になっているため、"mapper"が"reducer"より先に実行されます。

Reducerの出力先を出力します。

      System.out.println("output data deployed - " + reduceOutput);

これは次の結果の出力を行う際に利用します。

最後にJobEngine#disposeメソッドを呼び、JobEngineの実行のためのサーバー側のリソースを解放します。

    } finally {
      engine.dispose();
    }

結果の取得

並列処理を行った結果を取得する方法について説明します。 以下のプログラムはWordCountの結果を取得し標準出力に書き出すものです。

src/base/org/sss/mapreduce/example/WordCountResultPrinter.java:

package org.sss.mapreduce.example;

import org.sss.client.DataGetter;
import org.sss.client.SssClient;
import org.sss.mapreduce.GroupID;
import org.sss.mapreduce.Tuple;
import org.sss.mapreduce.datatype.PackableInt;
import org.sss.mapreduce.datatype.PackableString;

public class WordCountResultPrinter {
  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();
    if (others.length < 1) {
      System.err.println("input group id is not specified.");
      System.exit(1);
    }

    GroupID table = GroupID.createFromString(others[0]);
    DataGetter<PackableString, PackableInt> dg = 
      DataGetter.create(client, PackableString.class, PackableInt.class, table);
    try {
      for (Tuple<PackableString, PackableInt> kv: dg) {
        System.out.format("%s -> %d\n", kv.getKey().get(), kv.getValue().get());
      }
    }
    catch (Throwable e) {
      System.err.println("caught an IOException while reading RawKeyValue.");
      e.printStackTrace();
    } finally {
      // don't forget this.
      dg.close();
    }
  }
}

TextPutter、 WordCountと同様にSssClientを生成します。

  public static void main(String[] args) throws Exception {
    SssClient client = new SssClient(args);

    String[] others = client.getArgs();

ストレージ上のデータを読み込むにはDataGetterクラスを利用します。 DataGetterクラスはキーとバリューの型を型変数として要求します。 また、コンストラクタではSssClientのインスタンス、キーとバリューの型のClassオブジェクト、読み込むTupleGroupの識別子を指定します。 今回は、キーとバリューの型はWordCountが出力と同じPackableIntとPackableIntを指定します。 読み込み元のTupleGroupはコマンドライン引数から取得したものを利用します。

    GroupID table = GroupID.createFromString(others[0]);
    DataGetter<PackableString, PackableInt> dg = 
      DataGetter.create(client, PackableString.class, PackableInt.class, table);

DataGetterはIterable<Tuple<キーの型, バリューの型>>を実装しているため、for文でそれぞれのTupleに順次アクセスすることができます。 Tupleクラスは単純にTupleを表現したもので、キーとバリューを格納するだけのクラスです。

    try {
      for (Tuple<PackableString, PackableInt> kv: dg) {
        System.out.format("%s -> %d\n", kv.getKey().get(), kv.getValue().get());
      }
    }

ここでは取得したTupleを単純に標準出力に書き出しています。

最後にDataGetterの利用しているリソースをDataGetter#closeメソッドで解放します。

    } finally {
      // don't forget this.
      dg.close();
    }

ビルド方法

SSS Mapreduceのアプリケーションをビルドするには以下のJarをクラスパスに含める必要があります。

  • ${MAPREDUCE_HOME}/mapreduce.jar
  • ${MAPREDUCE_HOME}/lib以下のjarファイル

実行方法

SSS Mapreduceのアプリケーションの実行は以下のスクリプトを利用します。

  • ${MAPREDUCE_HOME}/bin/run.sh

利用方法は以下の通りです。

run.sh JAR_FILES CLASS_NAME ARGS...

引数の意味は以下の通りです。

  • JAR_FILES
  • アプリケーションの実行に必要なJarファイルを指定します。複数のJarファイルを指定する場合には「:」で区切り指定します。
  • CLASS_NAME
  • 実行を開始するmainメソッドのあるクラスを指定します。
  • ARGS...
  • mainメソッドに渡される引数を指定します。

WordCountを実行する例を以下に示します。 なお、WordCountをコンパイルした結果は!mapreduce-example.jarに含まれているものとします。

$ cat HelloWorld.txt 
Hi, greeting from SSS client.
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.TextPutter HelloWorld.txt                                   
output data deployed - 19c0a55ea-9ed1-42e8-a820-b1c5b797b21c
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.WordCount  19c0a55ea-9ed1-42e8-a820-b1c5b797b21c
output data deployed - 171982401-426f-4ddc-aa59-606f26fff9de
$ ${MAPREDUCE_HOME}/bin/run.sh ./mapreduce-example.jar org.sss.mapreduce.example.WordCountResultPrinter 171982401-426f-4ddc-aa59-606f26fff9de
SSS -> 1
client. -> 1
Hi, -> 1
greeting -> 1
from -> 1
$
Powered by Google Project Hosting