1. Introduction
2. Design
2.1 Outline
2.2 TableInputFormat
2.3 Task context
3. Compiling MapReduce support
3.1 Prerequisites
3.2 Building Hadoop
3.3 Building MapReduce support
4. Running example MapReduce task
4.1 Sample table creation
4.2 Configure MapReduce job
4.3 Run the job
5. Configuration options reference
Introduction
A word about mapreduce
The reader is assumed to have prior knowledge of the MapReduce algorithm. The details can be found at http://labs.google.com/papers/mapreduce-osdi04.pdf
Filesystem support for MapReduce and Hypertable
MapReduce (MR) is currently supported only when Hypertable runs on the Hadoop DFS.
Design
Outline
The MapReduce connector consists of two separate parts:
- Java classes implementing required interfaces (the requirements stem from Hadoop's MR architecture)
- C++ Pipes interface
MapReduce can currently operate on single tables only. It achieves that in several steps:
- Creating TableInputFormat class
- Passing task context to the C++ Pipes
TableInputFormat
The TableInputFormat class implements the InputFormat interface, which describes the data source. It does so by keeping the table name, the tablet ranges, and their corresponding network locations, i.e. the IP addresses of the servers hosting them. This information is used to construct TableSplit objects, each describing a particular tablet, which are passed along with the task context information to the map() function.
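To make the relationship between tablet ranges and splits concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Hadoop or Hypertable APIs. The class names TabletRangeSketch, TableSplitSketch and SplitPlannerSketch, their fields, and the convention that an empty string marks an open end of a row range are assumptions made for illustration only; they are not the connector's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one tablet: its row range and hosting server.
final class TabletRangeSketch {
    final String startRow;  // assumed convention: "" means start of table
    final String endRow;    // assumed convention: "" means end of table
    final String location;  // IP address of the server hosting the tablet

    TabletRangeSketch(String startRow, String endRow, String location) {
        this.startRow = startRow;
        this.endRow = endRow;
        this.location = location;
    }
}

// Hypothetical stand-in for a TableSplit: one unit of work for one map task.
final class TableSplitSketch {
    final String tableName;
    final String startRow;
    final String endRow;
    final String[] locations;  // servers that host the data of this split

    TableSplitSketch(String tableName, String startRow, String endRow,
                     String[] locations) {
        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
        this.locations = locations;
    }
}

final class SplitPlannerSketch {
    // Produces exactly one split per tablet, carrying the tablet's location.
    static List<TableSplitSketch> planSplits(String tableName,
                                             List<TabletRangeSketch> tablets) {
        List<TableSplitSketch> splits = new ArrayList<TableSplitSketch>();
        for (TabletRangeSketch t : tablets) {
            splits.add(new TableSplitSketch(tableName, t.startRow, t.endRow,
                                            new String[] { t.location }));
        }
        return splits;
    }
}
```

Keeping the server address with each split is what would allow a scheduler to place a map task close to the tablet it reads.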
Context
Hadoop's mapreduce can operate in 3 different ways:
- utilizing native Java code
- using streaming plugin supporting text only data
- using Pipes API
The Pipes API is currently used because it allows ideas to be prototyped and tested quickly; it is by no means a final solution (for more information, see the Future Work section).
The Pipes API receives the task context and an associated TableSplit, created by the Java counterpart of the connector, and starts running. It can accept input either from a pipe (hence the name), which works conceptually the same way as the Unix "|" (pipe) operator, or through a custom RecordReader. The connector currently uses a custom C++ RecordReader so that the information contained in the tables can be retrieved. Both approaches require approach-specific configuration.
The map function receives one cell at a time, in consecutive order, so it is up to the person implementing the map function to account for that and to handle row boundaries. The columns passed to the map function can be set in the job configuration XML file (please refer to the Configuration Options section).
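The row-boundary handling mentioned above can be sketched as follows. This is a plain Java illustration of the buffering logic only; the real map function is written against the C++ Pipes API, which is not used here. CellSketch and RowGrouperSketch are hypothetical names, and the sketch assumes that all cells of one row arrive consecutively.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical representation of a single cell handed to the map function.
final class CellSketch {
    final String row;
    final String column;
    final String value;

    CellSketch(String row, String column, String value) {
        this.row = row;
        this.column = column;
        this.value = value;
    }
}

// Buffers consecutive cells and closes a row whenever the row key changes.
final class RowGrouperSketch {
    private String currentRow = null;
    private final List<CellSketch> buffer = new ArrayList<CellSketch>();
    private final List<List<CellSketch>> completed =
        new ArrayList<List<CellSketch>>();

    // Call once per incoming cell.
    void accept(CellSketch cell) {
        if (currentRow != null && !currentRow.equals(cell.row)) {
            flush();  // the previous row is complete
        }
        currentRow = cell.row;
        buffer.add(cell);
    }

    // Call when the input is exhausted so the last row is not lost.
    void flush() {
        if (!buffer.isEmpty()) {
            completed.add(new ArrayList<CellSketch>(buffer));
            buffer.clear();
        }
        currentRow = null;
    }

    List<List<CellSketch>> completedRows() {
        return completed;
    }
}
```

Note that the last row only becomes complete once flush() is called, because no later cell arrives to signal the boundary.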
Compilation
Prerequisites
To compile MapReduce support for Hypertable, you have to install a Java JDK and fulfill the usual build requirements for Hypertable.
Building
Building Hadoop
The first step is to prepare the Hadoop pipes and utils libraries. You can do this as follows (note: HADOOP_SRC_DIR is the path to Hadoop's source code directory):
$ cd HADOOP_SRC_DIR/src/c++/utils
$ sh configure
$ make && make install # no need for root, this is a local install only
When compiling the hadoopUtils library, make sure you add "-fPIC" to the compile flags in the makefile itself. This works around a bug in Hadoop; the pipes library has PIC enabled by default.
WARNING: running make with CXXFLAGS="-fPIC" won't work, because the makefile overwrites the compile flags rather than appending to them.
$ cd HADOOP_SRC_DIR/src/c++/pipes
$ sh configure
$ make && make install
Once these libraries are compiled, you have to add their path to the dynamic linker configuration and refresh the linker cache. For example, on Ubuntu you have to do the following as root:
# echo "$HADOOP_SRC_DIR/src/c++/install/lib" > /etc/ld.so.conf.d/hadoop.conf
# ldconfig
Apply the following patch to the Hadoop Pipes source code:
--- src/java/org/apache/hadoop/mapred/pipes/Submitter.java.orig 2008-05-15 09:20:16.000000000 +0200
+++ src/java/org/apache/hadoop/mapred/pipes/Submitter.java 2008-07-23 13:01:58.000000000 +0200
@@ -364,6 +364,14 @@
}
if (results.hasOption("-jar")) {
conf.setJar((String) results.getValue("-jar"));
+ // if they gave us a jar file, include it into the class path
+ String jarFile = conf.getJar();
+ if (jarFile != null) {
+ ClassLoader loader =
+ new URLClassLoader(new URL[]{ FileSystem.getLocal(conf).
+ pathToFile(new Path(jarFile)).toURL()});
+ conf.setClassLoader(loader);
+ }
}
if (results.hasOption("-inputformat")) {
setIsJavaRecordReader(conf, true);
@@ -406,14 +414,6 @@
conf.set(keyValSplit[0], keyValSplit[1]);
}
}
- // if they gave us a jar file, include it into the class path
- String jarFile = conf.getJar();
- if (jarFile != null) {
- ClassLoader loader =
- new URLClassLoader(new URL[]{ FileSystem.getLocal(conf).
- pathToFile(new Path(jarFile)).toURL()});
- conf.setClassLoader(loader);
- }
submitJob(conf);
} catch (OptionException oe) {
cli.printUsage();
and then compile Hadoop by issuing
$ cd HADOOP_SRC_DIR
$ ant compile && ant jar
This compiles Hadoop with the corrected code, which loads custom classes early enough for the custom InputFormat implementation to be found. Replace the hadoop-core-*.jar archive of your Hadoop installation with the newly built build/hadoop-core-*-dev.jar so that the patched code is used.
Add the following property to your hadoop-site.xml or hadoop-default.xml (depending on your needs):
<property>
  <name>hypertable.config.path</name>
  <value>/absolute/path/to/your/hypertable/config</value>
</property>
Start Hadoop as usual.
Building Hypertable
To build Hypertable with MapReduce support enabled, you need to know the location of your Hadoop source directory (let's call it HADOOP_SRC_DIR) and the location of Hadoop's pipes and utils libraries.
Proceed as follows:
$ cd HYPERTABLE_SRC_DIR
$ ant jar # do not forget this step!
$ cmake -DBUILD_SHARED_LIBS=ON -DHADOOP_INCLUDE_PATH=$HADOOP_SRC_DIR/src/c++/install/include/hadoop/ -DHADOOP_LIB_PATH=$HADOOP_SRC_DIR/src/c++/install/lib/ .
$ make && sudo make install
NOTE: If the JNI headers are not found, run cmake with the following additional flags (if you don't run Linux, set JAVA_INCLUDE_PATH2 to the same place as JAVA_INCLUDE_PATH):
-DJAVA_INCLUDE_PATH=$JAVA_HOME/include -DJAVA_INCLUDE_PATH2=$JAVA_HOME/include/linux
After compilation, copy contrib/cc/MapReduce/libMapReduceJNI.so to Hadoop's lib/native/<your OS>-<your architecture>/ directory (this step will be automated in the near future).
Once you have done this, you can proceed to the next section.
Running MapReduce
Start Hypertable as usual before proceeding.
Table Creation
Create a table and populate it with some data, or select an existing table for this purpose. Take note of the table layout, which you can check by issuing
hypertable> show create table TableName;
For the sake of the example, let's assume the table is called MyTable and that it has a column family called Name, which is the one read by the job configuration in the next section.
Configure MapReduce job
Put the HYPERTABLE_SRC_DIR/contrib/cc/MapReduce/test/mapredjob file into HDFS by typing
hadoop dfs -mkdir /mapreduce/
hadoop dfs -mkdir /mapreduce/applications
hadoop dfs -put contrib/cc/MapReduce/test/mapredjob /mapreduce/applications/MyJob
Create an XML file called MyJob.xml and paste the following data into it:
<?xml version="1.0"?>
<configuration>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>2</value>
  </property>
  <property>
    <name>hadoop.pipes.executable</name>
    <value>/mapreduce/applications/MyJob#MyJob</value>
  </property>
  <property>
    <name>hadoop.pipes.java.recordreader</name>
    <value>false</value>
  </property>
  <property>
    <name>hadoop.pipes.java.mapper</name>
    <value>false</value>
  </property>
  <property>
    <name>hadoop.pipes.java.reducer</name>
    <value>false</value>
  </property>
  <property>
    <name>keep.failed.task.files</name>
    <value>true</value>
  </property>
  <property>
    <name>hypertable.table.name</name>
    <value>MyTable</value>
  </property>
  <property>
    <name>hypertable.table.columns.all</name>
    <value>false</value>
  </property>
  <property>
    <name>hypertable.table.columns</name>
    <value>Name</value>
  </property>
</configuration>
The above configuration file will tell the MapReduce framework to open the table called "MyTable" and read all cells from it which are in the Name column family. This information will in turn be passed to the Map part of the framework. It also tells the MapReduce framework where to find the binary which will perform all of this work.
Run the job
Once you have uploaded the mapredjob file to HDFS and prepared the XML file, you are ready to run the first sample MapReduce task. This task counts the occurrences of all cells in the column family and writes the result to standard output, which ends up in the report file. (For a Hypertable sink you have to write your own table writer, as it is not possible to write a general one.)
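As a rough illustration of what such a counting task amounts to, here is a minimal in-memory sketch in plain Java. It collapses the map step (emit a count of 1 per cell) and the reduce step (sum the counts per key) into one loop. The class name CellCountSketch is hypothetical, and keying the count by cell value is an assumption; the text above does not say which key the sample task uses.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CellCountSketch {
    // Counts how often each cell value occurs.
    // Assumption: the key is the cell value; the real sample may differ.
    static Map<String, Integer> count(List<String> cellValues) {
        Map<String, Integer> totals = new TreeMap<String, Integer>();
        for (String value : cellValues) {
            // "map": this cell contributes 1; "reduce": add it to the total
            Integer old = totals.get(value);
            totals.put(value, old == null ? 1 : old + 1);
        }
        return totals;
    }
}
```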
To run the job, type
hadoop pipes -conf MyJob.xml \
  -program /mapreduce/applications/MyJob \
  -input MyTable \
  -inputformat org.hypertable.mapreduce.TableInputFormat \
  -output <output file if any> \
  -jobconf hadoop.pipes.java.recordreader=false,hadoop.pipes.java.recordwriter=false \
  -jar <path to hypertable-0.9.0.7.jar>
This should launch the example MapReduce task.
Configuration Options
Below is a list of the configuration options available in Hypertable MapReduce job configuration files:
- hypertable.table.name - sets the name of the source table
- hypertable.table.columns - sets the column families to fetch from the table, given as a comma-separated list of column names
- hypertable.table.columns.all - boolean flag accepting "true" or "false", telling whether the source table should feed every available column to the map() function
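The options above could be read from a job configuration file roughly as follows. This is a minimal sketch using only the JDK's built-in XML parser; JobConfigSketch and its methods are hypothetical names, and the way the connector itself interprets these options is not shown in this document, so the trimming of names and the exact "true" comparison are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class JobConfigSketch {
    // Reads every <property><name>..</name><value>..</value></property> pair.
    static Map<String, String> parse(String xml) {
        Map<String, String> props = new HashMap<String, String>();
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(
                    xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element p = (Element) nodes.item(i);
                String name = p.getElementsByTagName("name")
                    .item(0).getTextContent().trim();
                String value = p.getElementsByTagName("value")
                    .item(0).getTextContent().trim();
                props.put(name, value);
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot parse job config", e);
        }
        return props;
    }

    // True when hypertable.table.columns.all is set to "true".
    static boolean fetchAll(Map<String, String> props) {
        return "true".equals(props.get("hypertable.table.columns.all"));
    }

    // Splits the comma-separated hypertable.table.columns value.
    static List<String> selectedColumns(Map<String, String> props) {
        List<String> columns = new ArrayList<String>();
        String raw = props.get("hypertable.table.columns");
        if (raw != null) {
            for (String c : raw.split(",")) {
                if (!c.trim().isEmpty()) {
                    columns.add(c.trim());
                }
            }
        }
        return columns;
    }
}
```

With the MyJob.xml file from the previous section, fetchAll would return false and selectedColumns would return a list containing only Name.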
Future Work
These are the things that still need to be done to provide fully featured Hypertable support:
- create TableOutputFormat class in Java
- create C++ RecordWriter
- integrate with thrift broker once it is ready
- fully automate Hypertable+Hadoop setup (esp. JNI library)
- add the hadoop fix to the hypertable source tree
