Introduction
Jaql has been designed to flexibly read and write data from a variety of data stores and formats. Input and output are handled through read/write expressions: hdfsRead and hdfsWrite access HDFS files, hbaseRead and hbaseWrite access HBase tables, and localRead and localWrite access locally stored files. HDFS files and HBase tables can be partitioned and processed with map/reduce, so they are examples of Jaql's Hadoop IO. Raw HDFS or standard filesystem files, where raw bytes are expected, are examples of Jaql's stream-oriented IO. We first go over a few examples, then discuss Hadoop IO, stream IO, and how to customize them, and finally describe how to add your own data stores.
Reading and Writing JSON Data
Using Jaql, JSON data can be stored and retrieved from a variety of data sources including ordinary files. Jaql queries can take a collection as input and generate a new collection as output, where a collection corresponds to a JSON array. An example using an ordinary file to store a collection is as follows:
```
// Example 1. Write to a file named 'hey.dat'.
localWrite('hey.dat', [{text: 'Hello World'}]);

// Read it back...
localRead('hey.dat');
```
Here, a single object with the 'Hello World' string is being written to a file called 'hey.dat' in the current directory. It is also possible to read and write JSON data to Hadoop HDFS files and HBase tables. We have made integration with Hadoop a priority by having all data types implement the WritableComparable interface. By integrating Jaql with HDFS and HBase, we are able to store JSON data and process it in parallel using Hadoop's map/reduce framework.
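For reference, the Hadoop serialization contract that Jaql's types satisfy looks approximately like the sketch below (simplified from org.apache.hadoop.io): a WritableComparable can be written to and read back from a binary stream and compared, which is what the map/reduce framework needs in order to serialize, shuffle, and sort values.

```
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Simplified view of Hadoop's serialization contract (org.apache.hadoop.io).
// Because Jaql's data types implement WritableComparable, the map/reduce
// framework can serialize, deserialize, and order them like any other
// key or value type.
interface Writable {
  void write(DataOutput out) throws IOException;     // serialize the value
  void readFields(DataInput in) throws IOException;  // deserialize the value
}

interface WritableComparable<T> extends Writable, Comparable<T> {
  // no methods of its own: serialization plus ordering
}
```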
Examples using Hadoop are provided below. Our second example writes to an HDFS SequenceFile. A SequenceFile is a collection of key-value pairs. Jaql's hdfsWrite() function only writes data into the value field, leaving the key field empty. In example 2, the input data is represented as a literal, but in general the input can be an expression that is the result of a Jaql query.
```
// Example 2. Write to a Hadoop SequenceFile named 'orders.dat'.
hdfsWrite('orders.dat', [
  {order: 1, cust: 'c1',
   items: [ {item: 1, qty: 2}, {item: 3, qty: 6}, {item: 5, qty: 10} ]},
  {order: 2, cust: 'c2',
   items: [ {item: 2, qty: 1}, {item: 5, qty: 2}, {item: 7, qty: 3} ]},
  {order: 3, cust: 'c1',
   items: [ {item: 1, qty: 2}, {item: 7, qty: 14}, {item: 5, qty: 10} ]}
]);

// Read it back...
hdfsRead('orders.dat');
```
Our third example writes to an HBase table. An HBase table is a collection of records, with each record containing a primary key and a set of column name-value pairs. Column names in HBase are of type Text, while column values are simply byte arrays. HBase uses a two-part naming scheme for columns of the form 'column family:column'. In Jaql, we use a sharp '#' instead of a colon ':' to separate column families from columns, since the colon is already used as a separator in JSON. If a column family is not specified, a special 'default' column family is used, as in the following example.
```
// Example 3. Write to an HBase table named 'webcrawl'.
hbaseWrite('webcrawl', [
  {key: "www.cnn.com", page: '...', rank: 0.9,
   inlinks: [ {link: 'www.news.com',    anchor: 'newsite'},
              {link: 'www.jscript.com', anchor: 'look here'} ]},
  {key: "www.json.org", page: '...', rank: 0.8}
]);

// Read it back...
hbaseRead('webcrawl');
```
In hbaseWrite(), all objects in the input are written to a single HBase table. Each top-level JSON object is stored as an HBase record with the specified key, and each name:value pair in the object is stored as a separate column-value pair in the record. Values are serialized as byte arrays. Note that only the outermost name:value pairs of a top-level object are stored as separate columns in HBase; nested arrays and objects are serialized within those values. In example 3, two HBase records are written. Each record stores the content, rank, and in-links (if any) of a web page.
Hadoop IO
In Hadoop's map-reduce, data is accessed using InputFormat and OutputFormat. Classes that implement these interfaces provide enough information to map-reduce so that the data can be partitioned and processed in parallel.
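For orientation, the old-API Hadoop InputFormat contract looks roughly like the sketch below: getSplits describes how to partition the input, and getRecordReader produces the key/value records of one partition; OutputFormat plays the analogous role on the write side. This is a simplified rendering for illustration, not Jaql code.

```
import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Rough shape of org.apache.hadoop.mapred.InputFormat (old API).
// getSplits tells map/reduce how to partition the input for parallel
// processing; getRecordReader yields the (key, value) records of one split.
public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException;
}
```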
Jaql's I/O framework allows any Input(Output)Format to be plugged in. However, Input(Output)Formats work with Writables, while Jaql expects Items. The framework therefore makes it easy to convert Writables to and from Items.
Default Input/Output Format
In order to make the discussion more concrete, let's look under the hood of the hdfsRead('books.jqlb') expression. First, hdfsRead is a macro expression that makes use of a more generic expression called read. The following is the actual read expression that is invoked when using hdfsRead:
read('hdfs', 'books.jqlb');
The first argument to the read expression is the name associated with a type of data store. Just as names are associated with function implementations in the function registry, names are associated with data store types in the storage registry. The second argument to read is the path of a file stored in HDFS. For the 'hdfs' data store type, the registry entry specifies default Input(Output)Formats; the defaults for Jaql are SequenceFileInputFormat and SequenceFileOutputFormat.
Text-based Input/Output Format
Suppose that the HDFS file is a text file (i.e., newline-delimited records). The InputFormat to use in this case is TextInputFormat, and Jaql's default can be overridden as follows:
read('hdfs', 'books.jql', {format: 'org.apache.hadoop.mapred.TextInputFormat'});
The additional argument to read is a record that specifies which class implements the InputFormat. More generally, this record can specify any options that are specific to a given data store type. In the example above, a text file's records are Text values, not Jaql Items (i.e., binary JSON), so they must be converted. This is done by implementing a converter and passing it to read as an option. An example converter is implemented as follows:
```
public class FromJSONTxtConverter extends HadoopRecordToItem {
  ...
  @Override
  protected WritableToItem createValConverter() {
    return new WritableToItem() {
      public void convert(Writable src, Item tgt) {
        // expect src to be of type Text,
        // use a JSON parser to parse it,
        // and set tgt to the parsed result
      }
      ...
    };
  }
  ...
}
```
The FromJSONTxtConverter takes a Writable value as input and sets the Item to the parsed result. The following shows how to use it in read:
read('hdfs', 'books.jql', {format : 'org.apache.hadoop.mapred.TextInputFormat',
converter : 'com.acme.extensions.data.FromJSONTxtConverter'});
The read expression uses TextInputFormat to read the file at the given path in HDFS. For each record retrieved from the file, it uses FromJSONTxtConverter to convert the Text value to an Item.
Using Custom File Formats
While extensible, the read expression is cumbersome to specify in this manner. There are several ways to hide the details; the simplest is to define a function:
```
$myRead = fn($path) read('hdfs', $path,
                         {format    : 'org.apache.hadoop.mapred.TextInputFormat',
                          converter : 'com.acme.extensions.data.FromJSONTxtConverter'});

$myRead('books.jql');
```
Another option is to register a new data store type. This is done through a storage registry that maps a name to records that specify options for input and output. This is how Jaql keeps track of defaults for each data store. For example, the registry entry for 'hdfs' files is the following:
{type: 'hdfs',
inoptions : {adapter : 'com.ibm.impliance.jaql.DefaultHadoopInputAdapter',
format : 'org.apache.hadoop.mapred.SequenceFileInputFormat',
configurator : 'com.ibm.impliance.jaql.FileInputConfigurator'},
outoptions: {adapter : 'com.ibm.impliance.jaql.DefaultHadoopOutputAdapter',
format : 'org.apache.hadoop.mapred.SequenceFileOutputFormat',
configurator : 'com.ibm.impliance.jaql.FileOutputConfigurator'}};
A data store is named by its type (here, 'hdfs'), which read uses to find the associated options. There are two sets of options, one for input and one for output. The default file format is a SequenceFile whose records are (key, value) pairs of types WritableComparable and Writable, respectively. By default, Jaql ignores the key. Since a converter is not specified, Jaql assumes that the value's type is an Item.
The 'hdfs' registry entry includes additional options. The first is an Adapter. This is the glue that brings together all other options for a data source; it encapsulates how to access the data and how to convert it to Items (if needed). Thus, it produces an Item iterator for Jaql from any data store. For Hadoop data, the adapter to use is DefaultHadoopInput(Output)Adapter. The Hadoop adapter allows any existing Input(Output)Format to be swapped in, along with any converter (as shown in the earlier example). Another adapter in Jaql is the StreamAdapter, which provides direct access to byte-stream data. Access to other data stores is possible by implementing the Adapter interface.
Using Custom File Formats in Map-Reduce
If an Input(Output)Format can be specified for a given data store, Hadoop's map-reduce can use it as an input (output). Accordingly, the Hadoop adapter informs the Jaql compiler that Hadoop's map-reduce can be used when appropriate. However, Input(Output)Formats may require specific configuration prior to submitting a map-reduce job. In Jaql, this is exposed through the "configurator" option. For example, an Input(Output)Format requires that a file path be specified before the job is submitted. The com.ibm.impliance.jaql.FileInputConfigurator does exactly this: the Adapter passes 'books.jql' and all options to the configurator, which then configures the job appropriately. For many HDFS files, com.ibm.impliance.jaql.FileInputConfigurator is sufficient, but it can be overridden if needed.
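To make the configurator's role concrete, here is a minimal sketch of what a file-input configurator must accomplish before the job is submitted. The JobConfigurator interface and its configure signature below are assumptions made for illustration (Jaql's actual configurator interface is not shown in this document); the FileInputFormat.setInputPaths call is standard old-API Hadoop.

```
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

// Illustrative sketch only: JobConfigurator and configure() are assumed names,
// not Jaql's actual API. The point is that the adapter hands the location
// (e.g., 'books.jql') and options to the configurator, which applies them to
// the JobConf before the map-reduce job is submitted.
interface JobConfigurator {
  void configure(String location, JobConf conf);
}

class SketchFileInputConfigurator implements JobConfigurator {
  public void configure(String location, JobConf conf) {
    // Register the file as the job's input path.
    FileInputFormat.setInputPaths(conf, new Path(location));
  }
}
```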
Returning to the example, the new data store type is registered as follows:
registerAdapter({type : 'myHDFSFile',
inoptions: {adapter : 'com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter',
format : 'org.apache.hadoop.mapred.TextInputFormat',
converter : 'com.acme.extensions.data.FromJSONTxtConverter',
configurator : 'com.ibm.jaql.io.hadoop.FileInputConfigurator'}});
The Hadoop adapter and configurator are specified, along with the TextInputFormat and the custom converter. The newly registered data store is used as follows:
read('myHDFSFile', 'books.jql');
If a new data store type is very common, it may be convenient to define a Jaql function that hides some of the details:
```
$readMyFile = fn($name) read('myHDFSFile', $name);

$readMyFile('books.jql');
```
In addition to HDFS files, Jaql supports HBase as a data source. This is supported by the same Hadoop adapter, but parameterized by Input(Output)Formats and configurators that are specific to HBase. For HBase, the 'path' represents a table name. Columns and column families can be specified as additional options.
Finally, in order to do something more interesting with these examples, consider a query that will be rewritten to map-reduce. The simplest example is a for-loop over a read expression. Jaql translates such a query into the following map-reduce expression:
```
$q = for( $i in read('myHDFSFile', 'example.jql') )
       [ {key: $i.publisher, ($i.title): $i.year} ];

hbaseWrite('mytable', $q);

// translates to:
mapReduce({
  input : {type: 'myHDFSFile', location: 'example.jql'},
  output: {type: 'hbase',      location: 'mytable'},
  map   : fn($i) [ [null, {$i.key, $i.abc, $i.xyz}] ]
});
```
In this example, an HBase table is loaded, in parallel, with a projection over a JSON text-formatted file. The mapReduce() function specifies its input and output using records; each record specifies the data store type, a location, and possibly additional options.
Example: Delimited File Formats
The previous examples showed how JSON binary data and JSON text data are processed using Jaql and Hadoop's Input/Output format interfaces. However, Jaql does not require JSON-formatted data. In this example, we show how a delimited file can be processed using Jaql. As in the previous examples, an appropriate InputFormat is needed, along with a converter that transforms an InputFormat record into JSON. Let's start with the following data, stored in a file named delimited.dat:
1,2,3
foo,bar,baz
a longer example of a string,followed by another followed by a number,42
First the file is loaded into HDFS:
hdfsShell("-copyFromLocal src/test/com/ibm/jaql/delimited.dat delimited.dat");
Next, an appropriate reader is defined by parameterizing hdfsRead:
$dRead = fn($data)
hdfsRead($data, {format: "org.apache.hadoop.mapred.TextInputFormat",
converter: "com.acme.extensions.data.FromDelimitConverter"});
The format is TextInputFormat, as we've seen before, and its converter is FromDelimitConverter, whose core method is as follows:
```
public class FromDelimitConverter extends HadoopRecordToItem {
  ...
  @Override
  protected WritableToItem createValConverter() {
    return new WritableToItem() {
      public void convert(Writable src, Item tgt) {
        if (src == null || tgt == null) return;
        Text t = null;
        if (src instanceof Text) {
          t = (Text) src;
        } else {
          throw new RuntimeException("tried to convert from: " + src);
        }

        String[] vals = new String(t.getBytes()).split(delimitter);
        try {
          if (header == null) {
            setArray(vals, (FixedJArray) tgt.get());              // 1
          } else {
            setRecord(vals, header, (MemoryJRecord) tgt.get());   // 2
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      private void setRecord(String[] vals, JArray names, MemoryJRecord tgt) throws Exception {
        int n = (int) names.count();
        if (n != vals.length)
          throw new RuntimeException("values and header disagree in length: " + vals.length + "," + n);
        for (int i = 0; i < n; i++) {
          JString name = (JString) names.nth(i).get();
          ((JString) tgt.getRequired(name.toString()).getNonNull()).set(vals[i]);
        }
      }

      private void setArray(String[] vals, FixedJArray tgt) {
        tgt.clear();
        int n = vals.length;
        for (int i = 0; i < n; i++) {
          tgt.add(new JString(vals[i])); // FIXME: memory
        }
      }

      public Item createTarget() {
        if (header == null) {
          return new Item(new FixedJArray());
        } else {
          int n = (int) header.count();
          MemoryJRecord r = new MemoryJRecord(n);
          try {
            for (int i = 0; i < n; i++) {
              r.add((JString) header.nth(i).getNonNull(), new JString());
            }
          } catch (Exception e) { throw new RuntimeException(e); }
          return new Item(r);
        }
      }
    };
  }
  ...
}
```
FromDelimitConverter actually does a bit more than convert a delimited file into JSON: it also permits an optional header to be specified that is used to name the fields present in the data file. Like any other read expression, dRead returns a JSON array where each array element corresponds to a line of the input file. If no header is specified, each line is converted into a JSON array (marker 1 in the converter above), possibly varying in arity from one line to another. If a header is specified, each line is converted into a JSON record whose field names are given by the header (marker 2). dRead can be used to read delimited.dat as follows:
jaql> $dRead("delimited.dat");
[
[
"1", "2", "3"
],
[
"foo", "bar", "baz"
],
[
"a longer example of a string","followed by another followed by a number","42"
]
]
To produce an array of records, the record field names can be specified using an array of names. When FromDelimitConverter is parameterized by such an array, it produces records instead of arrays. The following, slightly different read expression can be used:
$dhRead = fn($data, $header)
hdfsRead($data, {format: "org.apache.hadoop.mapred.TextInputFormat", converter: "com.acme.extensions.data.FromDelimitConverter",
header: $header});
Using dhRead, the data file delimited.dat can be read as follows:
$dhRead("delimited.dat", ["a", "b", "c"]);
[
{
"a": "1", "b": "2", "c": "3"
},
{
"a": "foo", "b": "bar", "c": "baz"
},
{
"a": "a longer example of a string",
"b": "followed by another followed by a number",
"c": "42"
}
]
Alternatively, the schema could have been stored in a header file, delimited.hdr:
a,b,c
and loaded into HDFS:
hdfsShell("-copyFromLocal build/test/com/ibm/jaql/delimited.hdr delimited.hdr");
Note that dRead can be used to convert the header into an array:
jaql> $dRead("delimited.hdr");
[
[ "a", "b", "c" ]
]
This in turn can be used to read delimited.dat using delimited.hdr:
$dhRead("delimited.dat", $dRead("delimited.hdr")[0]);
[
{
"a": "1", "b": "2", "c": "3"
},
{
"a": "foo", "b": "bar", "c": "baz"
},
{
"a": "a longer example of a string",
"b": "followed by another followed by a number",
"c": "42"
}
]
Stream IO
The Hadoop-based IO is useful when processing large data sets. However, we expect that reading from an InputStream or writing to an OutputStream will also be needed when manipulating small data sets. For this purpose, we provide an additional type of adapter: the StreamAdapter. StreamAdapters open an input or output stream given a URI. For example, the localRead, localWrite, and httpGet expressions are based on StreamAdapters. Just as Hadoop adapters allow for conversions between Writables and Items, stream adapters provide for converting between bytes and Items.
For example, consider accessing a local file that is formatted as JSON text. The only class to implement is a converter that can borrow from the previous example:
```
public class FromJSONTxtConverter implements StreamToItem {
  ...
  public void setInputStream(InputStream in) {
    // set the input stream
  }

  public boolean read(Item v) throws IOException {
    // parse the input stream to get the next v
  }
  ...
}
```
The new data source is registered and tested as follows:
localRead('books.jql', {format: 'com.acme.extensions.data.FromJSONTxtConverter'});
Finally, to work with external data sources, we recently added a httpGet() function that can retrieve JSON data from a URL. Below are two examples of httpGet() that get data from Freebase and Yahoo! Traffic (the latter requires you to supply an application id).
```
// Get albums recorded by "The Police" using Freebase.
$artist = "The Police";
$freebase = httpGet('http://www.freebase.com/api/service/mqlread',
                    { queries: serialize(
                        { myquery: {
                            query: [ { type: "/music/artist",
                                       name: $artist,
                                       album: [] } ]
                        }}
                    )})[0];

$freebase.myquery.result[**].album;

// result...
[ "Outlandos d\'Amour",
  "Reggatta de Blanc",
  "Zenyatta Mondatta",
  "Ghost in the Machine",
  "Synchronicity",
  "Every Breath You Take: The Singles",
  "Greatest Hits",
  "Message in a Box: The Complete Recordings (disc 1)",
  "Message in a Box: The Complete Recordings (disc 2)",
  "Message in a Box: The Complete Recordings (disc 3)",
  "Message in a Box: The Complete Recordings (disc 4)",
  "Live! (disc 1: Orpheum WBCN/Boston Broadcast)",
  "Live! (disc 2: Atlanta/Synchronicity Concert)",
  "Every Breath You Take: The Classics",
  "Their Greatest Hits",
  "Can\'t Stand Losing You",
  "Roxanne \'97 (Puff Daddy remix)",
  "Roxanne \'97" ];

// Get traffic incidents from Yahoo!.
$appid = "YahooDemo"; // Set to your Yahoo! application ID
$trafficData = httpGet('http://local.yahooapis.com/MapsService/V1/trafficData',
                       { appid: $appid,
                         street: "701 First Street",
                         city: "Sunnyvale",
                         state: "CA",
                         output: "json" })[0];

$trafficData.ResultSet.Result[*].Title;

// result...
[ "Road construction, on US-101 NB at FAIROAKS AVE TONBTO NB MATHILDA",
  "Road construction, on CA-85 SB at MOFFETT BLVD",
  "Road construction, on CA-237 EB at MATHILDA AVE TOEBTO FAIR OAKS AVE",
  "Road construction, on CA-237 WB at CROSSMAN AVE",
  "Road construction, on I-880 at GATEWAY BLVD" ];
```
Extending IO
Adapters can be extended in order to access data stores that are not well suited to the Hadoop or Stream adapters. An example is access to relational databases, e.g., through a JDBC driver. The following lists the Adapter interface:
```
public interface StorableInputAdapter {
  void initializeFrom(Item args);
  void open() throws IOException;
  ItemReader getItemReader() throws IOException;
  void close() throws IOException;
}
```
The initializeFrom method binds in arguments that are passed from the expression (e.g., 'path' from myRead('path')). The open method sets up access to the data store, whereas close releases its resources. Finally, the ItemReader consumes data from the data store and produces Items as input to Jaql.
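To illustrate the relational-database case mentioned above, the sketch below shows only the JDBC side of a hypothetical adapter; the Jaql-specific wiring (initializeFrom, wrapping the rows in an ItemReader, and building Items) is indicated in comments because those interfaces are only outlined here. The connection URL, table, and column names are hypothetical.

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch of the data-access half of a hypothetical JDBC-backed input adapter.
// Everything Jaql-specific (ItemReader, Item construction) appears in comments
// only; the JDBC calls below are standard java.sql API.
public class JdbcInputSketch {
  public static void main(String[] args) throws Exception {
    // initializeFrom(...): the expression's options would supply the URL and query.
    String url = "jdbc:postgresql://localhost/bookstore";       // hypothetical
    String query = "SELECT publisher, title, year FROM books";  // hypothetical

    // open(): acquire the connection and run the query.
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(query)) {

      // getItemReader(): each row would be converted into a JSON record
      // (an Item with fields publisher, title, year) and handed to Jaql.
      while (rs.next()) {
        String publisher = rs.getString("publisher");
        String title = rs.getString("title");
        int year = rs.getInt("year");
        // ... build {publisher: ..., title: ..., year: ...} as an Item here ...
        System.out.println(publisher + ", " + title + ", " + year);
      }
    }
    // close(): the try-with-resources block releases the JDBC resources.
  }
}
```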