Hadoop comes with a useful set of Writable implementations that serve most
purposes; however, on occasion, you may need to write your own custom
implementation. With a custom Writable, you have full control over the
binary representation and the sort order. Because Writables are at the heart of the MapReduce
data path, tuning the binary representation can have a significant
effect on performance. The stock Writable implementations that come with
Hadoop are well-tuned, but for more elaborate structures, it is often
better to create a new Writable
type, rather than compose the stock types.
To demonstrate how to create a custom Writable, we shall write an implementation
that represents a pair of strings, called TextPair. The basic implementation is shown
in Example 4-7.
Example 4-7. A Writable implementation that stores a pair of Text objects
import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  // Default constructor, required so the framework can instantiate the class
  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  // Serialize each field in turn by delegating to Text
  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  // Deserialize into the existing Text instances rather than allocating new ones
  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  // Used by HashPartitioner to choose a reduce partition
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  // Tab-separated form, used by TextOutputFormat
  @Override
  public String toString() {
    return first + "\t" + second;
  }

  // Natural order: by the first string, then by the second
  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
The first part of the implementation is straightforward: there
are two Text instance variables,
first and second, and associated constructors,
getters, and setters. All Writable implementations must have a
default constructor so that the MapReduce framework can instantiate
them, then populate their fields by calling
readFields(). Writable instances are mutable
and often reused, so you should take care to avoid allocating objects
in the write() or
readFields() methods.
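As a rough, Hadoop-free illustration of that reuse pattern, the following sketch uses a hypothetical ReusableIntPair class. It has the same write()/readFields() shape as a Writable but does not implement Hadoop's interface; ints are used so that readFields() genuinely allocates nothing. A single instance is created with the no-arg constructor and refilled for every record, so any value you want to keep must be copied out before the next readFields() call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical Writable-shaped class (no Hadoop dependency) holding two ints.
final class ReusableIntPair {
    private int first;
    private int second;

    // No-arg constructor: a reader creates an empty instance, then fills it.
    ReusableIntPair() { }

    void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    int getFirst() { return first; }
    int getSecond() { return second; }

    void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    // Overwrites the existing fields in place; no objects are allocated here.
    void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    // Writes the records, then reads them back through ONE reused instance.
    // Values needed later must be copied out before the next readFields() call.
    static long sumOfProducts(int[][] records) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            ReusableIntPair pair = new ReusableIntPair();
            for (int[] r : records) {
                pair.set(r[0], r[1]);
                pair.write(out);
            }
            out.flush();

            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            long total = 0;
            for (int i = 0; i < records.length; i++) {
                pair.readFields(in);   // same object, new contents
                total += (long) pair.getFirst() * pair.getSecond();
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, writing the records {1, 2}, {3, 4}, and {5, 6} and reading them back through the one instance gives 1*2 + 3*4 + 5*6 = 44.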
TextPair’s
write() method serializes each Text object in turn to the output stream by
delegating to the Text objects
themselves. Similarly, readFields()
deserializes the bytes from the input stream by delegating to each
Text object. The DataOutput and DataInput interfaces have a
rich set of methods for serializing and deserializing Java primitives,
so, in general, you have complete control over the wire format of your
Writable object.
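To make the point about wire-format control concrete, here is a small sketch using only java.io; the WireFormatSketch record and its fields are hypothetical. Each DataOutput primitive used by DataOutputStream writes a fixed, high-byte-first layout (8 bytes for a long, 2 for a short, 1 for a boolean), so this record serializes to exactly 11 bytes with no type tags or separators, and the reading side must consume the fields in the same order they were written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record whose wire format is fixed entirely by DataOutput primitives.
final class WireFormatSketch {
    final long timestamp;  // writeLong: 8 bytes, high byte first
    final short sensorId;  // writeShort: 2 bytes, high byte first
    final boolean valid;   // writeBoolean: 1 byte (1 or 0)

    WireFormatSketch(long timestamp, short sensorId, boolean valid) {
        this.timestamp = timestamp;
        this.sensorId = sensorId;
        this.valid = valid;
    }

    // Fields go out in a fixed order with nothing in between: 11 bytes total.
    byte[] serialize() {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeLong(timestamp);
            out.writeShort(sensorId);
            out.writeBoolean(valid);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in exactly the order they were written.
    static WireFormatSketch deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            long timestamp = in.readLong();
            short sensorId = in.readShort();
            boolean valid = in.readBoolean();
            return new WireFormatSketch(timestamp, sensorId, valid);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```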
Just as you would for any value object you write in Java, you
should override the hashCode(),
equals(), and
toString() methods from java.lang.Object. The
hashCode() method is used by the HashPartitioner (the default partitioner in
MapReduce) to choose a reduce partition, so you should make sure that
you write a good hash function that mixes well to ensure reduce
partitions are of a similar size.
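HashPartitioner turns a key's hash code into a partition number by masking off the sign bit and taking the remainder modulo the number of reduce tasks. The sketch below reproduces that arithmetic in plain Java so you can check how a candidate hash function spreads keys. The class and method names are hypothetical, and String.hashCode() stands in for Text.hashCode(), so the actual partition numbers will differ from those Hadoop computes for the same strings.

```java
// Sketch of HashPartitioner's arithmetic; names are hypothetical.
final class PartitionSketch {
    // Mask off the sign bit, then take the remainder modulo the number of reducers.
    // (Math.abs would not do: Math.abs(Integer.MIN_VALUE) is still negative.)
    static int partitionFor(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Same combining formula as TextPair.hashCode(), with String.hashCode()
    // standing in for Text.hashCode() (the actual hash values differ).
    static int pairHash(String first, String second) {
        return first.hashCode() * 163 + second.hashCode();
    }

    // Counts how many keys land in each partition: a quick check of the spread.
    static int[] histogram(String[][] keys, int numReduceTasks) {
        int[] counts = new int[numReduceTasks];
        for (String[] key : keys) {
            counts[partitionFor(pairHash(key[0], key[1]), numReduceTasks)]++;
        }
        return counts;
    }
}
```

A badly mixing hash shows up as a lopsided histogram, which translates directly into reducers with very different amounts of work.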
If you ever plan to use your custom Writable with TextOutputFormat, then you must implement
its toString() method. TextOutputFormat calls
toString() on keys and values for their
output representation. For TextPair, we write the underlying Text objects as strings separated by a tab
character.
TextPair is an implementation
of WritableComparable, so it
provides an implementation of the compareTo()
method that imposes the ordering you would expect: it sorts by the
first string followed by the second. Notice that TextPair differs from TextArrayWritable from the previous section
(apart from the number of Text
objects it can store), since TextArrayWritable is only a Writable, not a WritableComparable.
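The ordering imposed by compareTo() is the usual lexicographic order on (first, second). This sketch reproduces it with a hypothetical Pair class over java.lang.String. One caveat: String.compareTo() compares UTF-16 code units, whereas Text compares UTF-8 bytes, so the two orders agree for ASCII but can differ for some non-ASCII text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PairOrderSketch {
    // Hypothetical pair of Strings with the same ordering logic as TextPair.compareTo().
    static final class Pair implements Comparable<Pair> {
        final String first;
        final String second;

        Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(Pair other) {
            int cmp = first.compareTo(other.first);                 // primary: first string
            return cmp != 0 ? cmp : second.compareTo(other.second);  // tie-break: second
        }

        @Override
        public String toString() {
            return first + "\t" + second;
        }
    }

    // Sorts the pairs by their natural order and returns their tab-separated forms.
    static List<String> sorted(Pair... pairs) {
        List<Pair> list = new ArrayList<>(Arrays.asList(pairs));
        Collections.sort(list);
        List<String> out = new ArrayList<>();
        for (Pair p : list) {
            out.add(p.toString());
        }
        return out;
    }
}
```

Sorting ("b", "a"), ("a", "z"), and ("a", "b") yields ("a", "b"), ("a", "z"), ("b", "a"): the second string only matters when the first strings tie.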
The code for TextPair in
Example 4-7 will work as it stands; however, there
is a further optimization we can make. As explained in WritableComparable and comparators, when TextPair is being used as a key in
MapReduce, it will have to be deserialized into an object for the
compareTo() method to be invoked. What if
it were possible to compare two TextPair objects just by looking at their
serialized representations?
It turns out that we can do this, since TextPair is the concatenation of two
Text objects, and the binary
representation of a Text object
is a variable-length integer containing the number of bytes in the
UTF-8 representation of the string, followed by the UTF-8 bytes
themselves. The trick is to read the initial length, so we know how
long the first Text object’s byte
representation is; then we can delegate to Text’s RawComparator, and invoke it with the
appropriate offsets for the first or second string. Example 4-8 gives the details (note that this
code is nested in the TextPair
class).
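Before reading the comparator, it may help to see the byte layout it parses. The following Hadoop-free sketch (hypothetical class and method names) reimplements, for non-negative values only, the variable-length integer scheme used by WritableUtils.writeVInt(): values up to 127 take a single byte, and larger values get a marker byte (-112 minus the number of value bytes) followed by the value, most significant byte first. It then lays out a string the way Text does, and a pair as two such encodings back to back.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class TextEncodingSketch {
    // Vint encoding for a non-negative int (all a byte length ever needs).
    static void writeVInt(ByteArrayOutputStream out, int value) {
        if (value < 0) {
            throw new IllegalArgumentException("length must be non-negative");
        }
        if (value <= 127) {
            out.write(value);            // small values are stored directly
            return;
        }
        int numBytes = 0;
        for (int tmp = value; tmp != 0; tmp >>>= 8) {
            numBytes++;
        }
        out.write(-112 - numBytes);      // marker byte: -113 means one value byte follows
        for (int i = numBytes - 1; i >= 0; i--) {
            out.write((value >>> (8 * i)) & 0xFF);
        }
    }

    // One serialized Text: vint byte count, then the UTF-8 bytes themselves.
    static byte[] serializeText(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // A serialized TextPair is the two Text encodings concatenated.
    static byte[] serializePair(String first, String second) {
        byte[] a = serializeText(first);
        byte[] b = serializeText(second);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(a, 0, a.length);
        out.write(b, 0, b.length);
        return out.toByteArray();
    }
}
```

For example, "hi" serializes to {2, 'h', 'i'}, while a 200-byte string needs a two-byte header (0x8F, then 200) before its data. Note that the length counts UTF-8 bytes, not characters: "é" has one character but a length prefix of 2.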
Example 4-8. A RawComparator for comparing TextPair byte representations
public static class Comparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public Comparator() {
    super(TextPair.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    try {
      // Length of the first Text field: vint header size plus the byte count it encodes
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      if (cmp != 0) {
        return cmp;
      }
      // First fields are equal, so compare the remaining bytes (the second Text field)
      return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                     b2, s2 + firstL2, l2 - firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }
}

static {
  // Register this as the default comparator for TextPair
  WritableComparator.define(TextPair.class, new Comparator());
}
We actually subclass WritableComparator rather than implement
RawComparator directly, since it
provides some convenience methods and default implementations. The
subtle part of this code is calculating firstL1 and firstL2, the lengths of the first Text field in each byte stream. Each is
the sum of the length of the variable-length integer itself (returned by
decodeVIntSize() on WritableUtils) and the value it
encodes (returned by readVInt()).
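The same offset arithmetic can be exercised without Hadoop. This sketch mirrors WritableUtils.decodeVIntSize(), WritableComparator.readVInt(), and the way Text's raw comparator skips each field's length header before comparing the remaining bytes as unsigned values. The class and method names are hypothetical, and bounds checking for truncated buffers is omitted.

```java
final class RawPairCompareSketch {
    // Total bytes a vint occupies, given its first byte.
    static int decodeVIntSize(byte first) {
        if (first >= -112) return 1;             // value stored in the single byte
        if (first < -120) return -119 - first;   // negative value: marker + value bytes
        return -111 - first;                     // non-negative value: marker + value bytes
    }

    // Decodes the vint starting at bytes[start].
    static int readVInt(byte[] bytes, int start) {
        byte first = bytes[start];
        int size = decodeVIntSize(first);
        if (size == 1) return first;
        long value = 0;
        for (int i = 1; i < size; i++) {
            value = (value << 8) | (bytes[start + i] & 0xFF);
        }
        return (int) (first < -120 ? ~value : value);  // negative values are stored complemented
    }

    // Unsigned lexicographic comparison; a shorter prefix sorts first.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int a = b1[s1 + i] & 0xFF;
            int b = b2[s2 + i] & 0xFF;
            if (a != b) return a - b;
        }
        return l1 - l2;
    }

    // Like Text's raw comparator: skip the vint header, compare the UTF-8 bytes.
    static int compareText(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n1 = decodeVIntSize(b1[s1]);
        int n2 = decodeVIntSize(b2[s2]);
        return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
    }

    // Same structure as TextPair.Comparator.compare().
    static int comparePairs(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int firstL1 = decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = compareText(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) return cmp;
        return compareText(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
    }
}
```

Because only the header is skipped, ("ab", "x") still sorts before ("b", "x") even though its first field is longer; comparing the length prefixes directly would get this wrong.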
The static block registers the raw comparator so that whenever
MapReduce sees the TextPair
class, it knows to use the raw comparator as its default
comparator.
As we can see with TextPair, writing raw comparators takes
some care, since you have to deal with details at the byte level. It
is worth looking at some of the implementations of Writable in the org.apache.hadoop.io package for further
ideas, if you need to write your own. The utility methods on
WritableUtils are very handy
too.
Custom comparators should also be written to be RawComparators, if possible. These are
comparators that implement a different sort order to the natural
sort order defined by the default comparator. Example 4-9 shows a comparator for TextPair, called FirstComparator, that considers only the
first string of the pair. Note that we override the
compare() method that takes objects so both
compare() methods have the same semantics.
We will make use of this comparator in Chapter 8, when we look at joins and secondary sorting in MapReduce (see Joins).
Example 4-9. A custom RawComparator for comparing the first field of TextPair byte representations
public static class FirstComparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public FirstComparator() {
    super(TextPair.class);
  }

  // Raw comparison: look only at the bytes of the first Text field
  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    try {
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }

  // Object comparison with the same semantics: first fields only
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    if (a instanceof TextPair && b instanceof TextPair) {
      return ((TextPair) a).first.compareTo(((TextPair) b).first);
    }
    return super.compare(a, b);
  }
}
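To see what comparing only the first field means in practice, this sketch (hypothetical names, plain java.util, with String[] pairs standing in for TextPair) sorts pairs by the full natural order and then starts a new group whenever a first-field-only comparator reports a difference. It is a loose illustration of why such a comparator is useful, not the MapReduce machinery itself: pairs that share a first string compare as equal under it, so they end up in one group, still ordered by their second string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FirstOnlySketch {
    // Full order, like TextPair.compareTo(): first string, then second.
    static final Comparator<String[]> NATURAL =
        Comparator.comparing((String[] p) -> p[0]).thenComparing((String[] p) -> p[1]);

    // Analogue of FirstComparator: looks at the first string only.
    static final Comparator<String[]> FIRST_ONLY = Comparator.comparing((String[] p) -> p[0]);

    // Sorts with the full order, then opens a new group whenever FIRST_ONLY
    // reports that a pair differs from the one before it.
    static List<List<String>> groupByFirst(String[][] pairs) {
        List<String[]> sorted = new ArrayList<>(Arrays.asList(pairs));
        sorted.sort(NATURAL);
        List<List<String>> groups = new ArrayList<>();
        String[] previous = null;
        for (String[] p : sorted) {
            if (previous == null || FIRST_ONLY.compare(previous, p) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(p[0] + "\t" + p[1]);
            previous = p;
        }
        return groups;
    }
}
```

For the input ("b", "1"), ("a", "2"), ("a", "1"), the result is two groups: [("a", "1"), ("a", "2")] and [("b", "1")].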