The total order sorting pattern is concerned with the order of the data from record to record.
Sorting is easy in sequential programming. Sorting in MapReduce, or more generally in parallel, is not easy. This is because the typical “divide and conquer” approach is a bit harder to apply here.
Each individual reducer will sort its data by key, but unfortunately, this sorting is not global across all data. What we want to do here is a total order sorting where, if you concatenate the output files, the records are sorted. If we just concatenate the output of a simple MapReduce job, segments of the data will be sorted, but the whole set will not be.
Sorted data has a number of useful properties. Sorted by time, it can provide a timeline view on the data. Finding things in a sorted data set can be done with binary search instead of linear search. In the case of MapReduce, we know the upper and lower boundaries of each file by looking at the last and first records, respectively. This can be useful for finding records, as well, and is one of the primary characteristics of HBase. Some databases can bulk load data faster if the data is sorted on the primary key or index column.
There are countless more reasons to have sorted data from an application standpoint or follow-on system standpoint. However, having data sorted for use in MapReduce serves little purpose, so hopefully this expensive operation only has to be done sparingly.
The main requirement here is pretty obvious: your sort key has to be comparable so the data can be ordered.
Total order sorting may be one of the more complicated patterns you’ll see. The reason is that you first have to determine a set of partitions, divided by ranges of values, that will produce equal-sized subsets of data. These ranges will determine which reducer will sort which range of data. Then something similar to the partitioning pattern is run: a custom partitioner is used to partition data by the sort key. The lowest range of data goes to the first reducer, the next range goes to the second reducer, and so on.
This pattern has two phases: an analyze phase that determines the ranges, and the order phase that actually sorts the data. The analyze phase is optional in some ways. You need to run it only once if the distribution of your data does not change quickly over time, because the value ranges it produces will continue to perform well. Also, in some cases, you may be able to guess the partitions yourself, especially if the data is evenly distributed. For example, if you are sorting comments by user ID, and you have a million users, you can assume that with a thousand reducers, each range will cover a thousand users. This is because comments should be spread out evenly across user IDs, and since you know the total number of users, you can divide that number by the number of reducers you want to use.
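For an evenly distributed key such as the user ID example, the guessed boundaries are simply evenly spaced values. The following is a plain-Java sketch, not Hadoop code; the names `GuessedPartitions` and `evenSplitPoints` are hypothetical, and it assumes user IDs run contiguously from 1 to 1,000,000.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: guesses partition boundaries for an evenly
// distributed numeric key instead of sampling the data.
class GuessedPartitions {
    // Returns numReducers - 1 split points that divide [minId, maxId]
    // into numReducers ranges of (nearly) equal width.
    static List<Long> evenSplitPoints(long minId, long maxId, int numReducers) {
        long span = maxId - minId + 1; // number of distinct IDs
        List<Long> splits = new ArrayList<>();
        for (int i = 1; i < numReducers; i++) {
            // First ID belonging to range i
            splits.add(minId + span * i / numReducers);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Long> splits = evenSplitPoints(1, 1_000_000, 1_000);
        System.out.println(splits.size());   // 999
        System.out.println(splits.get(0));   // 1001: range 0 is IDs 1..1000
        System.out.println(splits.get(998)); // 999001
    }
}
```

Each range then covers 1,000 user IDs, matching the division described above.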
The analyze phase is a random sampling of the data. The partitions are then based on that random sample. The principle is that partitions that evenly split the random sample should also evenly split the larger data set. The structure of the analyze step is as follows:
The mapper does a simple random sampling. For each record it selects, it outputs the sort key as its output key so that the data will show up sorted at the reducer. We don’t care at all about the actual record, so we’ll just use a null value to save on space.
Ahead of time, determine the number of records in the total data set and figure out what percentage of records you’ll need to analyze to make a reasonable sample. For example, if you plan on running the order with a thousand reducers, sampling about a hundred thousand records should give nice, even partitions. Assuming you have a billion records, divide 100,000 by 1,000,000,000. This gives 0.0001, meaning .01% of the records should be run through the analyze phase.
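The sampling arithmetic, together with the per-record coin flip a sampling mapper might perform, can be sketched in plain Java. This is a minimal sketch assuming simple Bernoulli sampling (each record kept independently with the computed probability); `SamplingRate`, `samplingFraction`, and `sample` are hypothetical names, and the fixed seed only makes runs repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper illustrating the analyze-phase sampling rate.
class SamplingRate {
    // Fraction of records to keep so that roughly targetSamples survive.
    static double samplingFraction(long targetSamples, long totalRecords) {
        return (double) targetSamples / totalRecords;
    }

    // Keeps each key independently with probability `fraction`
    // (Bernoulli sampling, as a sampling mapper could do per record).
    static List<String> sample(List<String> keys, double fraction, long seed) {
        Random random = new Random(seed);
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (random.nextDouble() < fraction) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        double fraction = samplingFraction(100_000, 1_000_000_000L);
        System.out.println(fraction); // 1.0E-4, i.e. 0.01% of the records
    }
}
```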
Only one reducer will be used here. This will collect the sort keys together into a sorted list (they come in sorted, so that will be easy). Then, when all of them have been collected, the list of keys will be sliced into the data range boundaries.
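Slicing the sorted sample into range boundaries amounts to picking keys at evenly spaced positions. A minimal sketch, assuming the sample already arrives sorted and holds at least as many keys as there are reducers (otherwise boundaries repeat and some reducers receive nothing); `SliceSample` and `boundaries` are hypothetical names, and Hadoop's `InputSampler` has its own implementation of this step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: what the single analyze reducer does with its keys.
class SliceSample {
    // Picks numReducers - 1 boundary keys at evenly spaced positions in the
    // sorted sample, so each range holds about the same number of samples.
    static List<String> boundaries(List<String> sortedSample, int numReducers) {
        List<String> splits = new ArrayList<>();
        int n = sortedSample.size();
        for (int i = 1; i < numReducers; i++) {
            splits.add(sortedSample.get((int) ((long) n * i / numReducers)));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
        System.out.println(boundaries(sample, 4)); // [c, e, g]
    }
}
```

With boundaries `c`, `e`, and `g`, the four ranges are keys below `c`, `[c, e)`, `[e, g)`, and `g` or above, each holding two of the eight sampled keys.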
The order phase is a relatively straightforward application of MapReduce that uses a custom partitioner. The structure of the order step is as follows:
The mapper extracts the sort key in the same way as the analyze step. However, this time the record itself is stored as the value instead of being ignored.
A custom partitioner is used that loads up the partition
file. In Hadoop, you can use the TotalOrderPartitioner, which is built
specifically for this purpose. It takes the data ranges from the
partition file produced in the previous step and decides which
reducer to send the data to.
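The lookup such a partitioner performs can be illustrated with a binary search over the sorted boundary keys. This is a hypothetical stand-in (`RangePartitioner` is not a Hadoop class) that assumes distinct split points held in memory as strings; it is not the `TotalOrderPartitioner` source, only the same idea: a key goes to the reducer whose range contains it.

```java
import java.util.Arrays;

// Hypothetical range partitioner, for illustration only.
class RangePartitioner {
    private final String[] splitPoints; // sorted, distinct, numReducers - 1 keys

    RangePartitioner(String[] splitPoints) {
        this.splitPoints = splitPoints;
    }

    // Reducer index = number of split points <= key.
    // Keys below the first split point go to reducer 0.
    int getPartition(String key) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // Exact match at index i: the key starts range i + 1.
        // No match: binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        RangePartitioner p = new RangePartitioner(new String[] {"c", "e", "g"});
        System.out.println(p.getPartition("a")); // 0
        System.out.println(p.getPartition("c")); // 1
        System.out.println(p.getPartition("f")); // 2
        System.out.println(p.getPartition("z")); // 3
    }
}
```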
The reducer’s job here is simple. The shuffle and sort take
care of the heavy lifting. The reduce function simply takes the
values that have come in and outputs them. The number of reducers
needs to be equal to the number of partitions for the TotalOrderPartitioner to work
properly.
Note that the number of ranges in the intermediate partition file needs to be equal to the number of reducers in the order step; with Hadoop’s TotalOrderPartitioner, that means the file holds one fewer boundary key than there are reducers. If you decide to change the number of reducers and you’ve been reusing the same file, you’ll need to rebuild it.
If you want to have a primary sort key and a secondary sort
key, concatenate the keys, delimited by something. For example, if
you want to sort by last name first, and city second, use a key that
looks like Smith^Baltimore.
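A quick plain-Java check of the concatenated-key idea, using Java's natural `String` ordering; `CompositeKey` and `key` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds a primary^secondary sort key.
class CompositeKey {
    // "last^city": string order sorts by last name first, then by city.
    static String key(String lastName, String city) {
        return lastName + "^" + city;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add(key("Smith", "Baltimore"));
        keys.add(key("Jones", "Austin"));
        keys.add(key("Smith", "Albany"));
        Collections.sort(keys);
        System.out.println(keys); // [Jones^Austin, Smith^Albany, Smith^Baltimore]
    }
}
```

One caveat: the delimiter should sort before any character that can follow it inside the first field. `^` (code point 94) sorts after the uppercase letters, so `Mac^Boston` would sort after `MacDonald^Austin` even though Mac comes before MacDonald.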
Using Text for nearly everything in Hadoop is very natural
since that’s the format in which data is coming in. Be careful when
sorting on numerical data, though! The string "10000" is less than "9" if they are compared as strings, which
is not what we want. Either pad the numbers with zeros or use a
numerical data type.
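The string-comparison pitfall and the zero-padding fix, in plain Java. A minimal sketch assuming non-negative values that fit in the chosen width (a negative number formatted this way starts with "-" and breaks the ordering); `ZeroPadding` and `pad` are hypothetical names. In Hadoop, a numeric Writable key such as `LongWritable` avoids the issue altogether.

```java
// Hypothetical helper: fixed-width, zero-padded numeric keys.
class ZeroPadding {
    // Left-pads a non-negative number with zeros to `width` digits so that
    // string order matches numeric order.
    static String pad(long n, int width) {
        return String.format("%0" + width + "d", n);
    }

    public static void main(String[] args) {
        // Unpadded: "10000" sorts before "9" because '1' < '9'
        System.out.println("10000".compareTo("9") < 0);                // true
        // Padded: "0000010000" correctly sorts after "0000000009"
        System.out.println(pad(10000, 10).compareTo(pad(9, 10)) > 0);  // true
        System.out.println(pad(9, 10));                                // 0000000009
    }
}
```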
The output files will contain sorted data, and the output file
names will be sorted such that the data is in a total sorting. In
Hadoop, you’ll be able to issue hadoop fs
-cat output/part-r-* and retrieve the data in a sorted
manner.
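Why concatenating the part files yields a total order can be seen in a small in-memory simulation of both phases. This is a sketch under simplifying assumptions, not Hadoop code: the input is a shuffled list of distinct integers, the first `sampleSize` records stand in for a random sample, and sorting each bucket plays the role of the shuffle's sort. `TotalOrderSimulation` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical in-memory model of the analyze and order phases.
class TotalOrderSimulation {
    // Reducer index = number of split points <= key.
    static int partition(List<Integer> splits, int key) {
        int pos = Collections.binarySearch(splits, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    // Returns the concatenation of all reducer outputs, in reducer order.
    static List<Integer> totalOrderSort(List<Integer> data, int sampleSize, int numReducers) {
        // Analyze phase: the first sampleSize records of the (already
        // shuffled) input stand in for a random sample.
        List<Integer> sample = new ArrayList<>(data.subList(0, sampleSize));
        Collections.sort(sample);
        List<Integer> splits = new ArrayList<>();
        for (int i = 1; i < numReducers; i++) {
            splits.add(sample.get(sampleSize * i / numReducers));
        }

        // Order phase: route every record to the reducer owning its range.
        List<List<Integer>> reducers = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            reducers.add(new ArrayList<>());
        }
        for (int x : data) {
            reducers.get(partition(splits, x)).add(x);
        }

        // Each reducer's input is sorted (the shuffle's job); then the
        // part files are concatenated in reducer order.
        List<Integer> output = new ArrayList<>();
        for (List<Integer> part : reducers) {
            Collections.sort(part);
            output.addAll(part);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add(i);
        }
        Collections.shuffle(data, new Random(42));
        List<Integer> result = totalOrderSort(data, 100, 4);
        List<Integer> expected = new ArrayList<>(data);
        Collections.sort(expected);
        System.out.println(result.equals(expected)); // true
    }
}
```

Because every reducer owns a contiguous key range and the ranges are numbered in key order, the concatenated output is globally sorted regardless of how evenly the sample split the data; a good sample only affects how balanced the reducers are.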
This operation is expensive because you effectively have to load and parse the data twice: first to build the partition ranges, and then to actually sort the data.
The job that builds the partitions is straightforward and efficient since it has only one reducer and sends a minimal amount of data over the network. The output file is small, so writing it out is trivial. Also, you may only have to run this now and then, which will amortize the cost of building it over time.
The order step of the job has performance characteristics similar to the other data organization patterns, because it has to move all of the data over the network and write all of the data back out. Therefore, you should use a relatively large number of reducers.
The user data in our StackOverflow data set is in the order of account creation. Instead, we’d like to have the data ordered by the last time each user visited the site.
For this example, we have a special driver that runs both the analyze and order steps. Accordingly, there are two MapReduce jobs, one for analyze and one for order.
Let’s break the driver down into two sections: building the partition list via sampling, then performing the sort.
The first section parses the input command line arguments and
creates input and output variables from them. It creates paths
to the partition list and the staging directory. The partition list
is used by the TotalOrderPartitioner to send each
key/value pair to the reducer responsible for its key range. The staging directory is used
to store intermediate output between the two jobs. There is nothing
too special about the first job configuration. The main thing to note
is that the first job is a map-only job that uses a SequenceFileOutputFormat.
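```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);
    Path partitionFile = new Path(args[1] + "_partitions.lst");
    Path outputStage = new Path(args[1] + "_staging");
    Path outputOrder = new Path(args[1]);

    // Configure job to prepare for sampling
    Job sampleJob = new Job(conf, "TotalOrderSortingStage");
    sampleJob.setJarByClass(TotalOrderSorting.class);

    // Use the mapper implementation with zero reduce tasks
    sampleJob.setMapperClass(LastAccessDateMapper.class);
    sampleJob.setNumReduceTasks(0);

    sampleJob.setOutputKeyClass(Text.class);
    sampleJob.setOutputValueClass(Text.class);

    TextInputFormat.setInputPaths(sampleJob, inputPath);

    // Set the output format to a sequence file
    sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);

    // Submit the job and get completion code.
    int code = sampleJob.waitForCompletion(true) ? 0 : 1;
    ...
```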
The second job uses the identity mapper and our reducer implementation. The input is the output from the first job, so we’ll use the identity mapper to output the key/value pairs as they were stored by the first job. The job is configured to use 10 reducers, but any reasonable number can be used. Next, the partition file is configured, even though we have not created it yet.
The next important line uses the InputSampler
utility. This sampler writes the partition file by reading through
the configured input directory of the job. Using the RandomSampler,
it takes a configurable number of samples of the previous job’s
output. This can be an expensive operation, as the entire output is
read using this constructor. Another constructor of RandomSampler allows you to set the maximum number
of input splits that will be sampled. Limiting the splits reduces execution
time, but you might not get as good a distribution.
After the partition file is written, the job is executed. The partition file and staging directory are then deleted, as they are no longer needed for this example.
If your data distribution is unlikely to change, it would be worthwhile to keep this partition file around. It can then be used over and over again for this job in the future as new data arrives on the system.
```java
    ...
    if (code == 0) {
        Job orderJob = new Job(conf, "TotalOrderSorting");
        orderJob.setJarByClass(TotalOrderSorting.class);

        // Here, use the identity mapper to output the key/value pairs in
        // the SequenceFile
        orderJob.setMapperClass(Mapper.class);
        orderJob.setReducerClass(ValueReducer.class);

        // Set the number of reduce tasks to an appropriate number for the
        // amount of data being sorted
        orderJob.setNumReduceTasks(10);

        // Use Hadoop's TotalOrderPartitioner class
        orderJob.setPartitionerClass(TotalOrderPartitioner.class);

        // Set the partition file
        TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
                partitionFile);

        // Map output is (last access date, record); the reducer emits
        // (record, NullWritable), so declare both sets of types
        orderJob.setMapOutputKeyClass(Text.class);
        orderJob.setMapOutputValueClass(Text.class);
        orderJob.setOutputKeyClass(Text.class);
        orderJob.setOutputValueClass(NullWritable.class);

        // Set the input to the previous job's output
        orderJob.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(orderJob, outputStage);

        // Set the output path to the command line parameter
        TextOutputFormat.setOutputPath(orderJob, outputOrder);

        // Set the separator to an empty string
        orderJob.getConfiguration().set(
                "mapred.textoutputformat.separator", "");

        // Use the InputSampler to go through the output of the previous
        // job, sample it, and create the partition file
        InputSampler.writePartitionFile(orderJob,
                new InputSampler.RandomSampler<Text, Text>(.001, 10000));

        // Submit the job
        code = orderJob.waitForCompletion(true) ? 0 : 2;
    }

    // Clean up the partition file and the staging directory
    FileSystem.get(new Configuration()).delete(partitionFile, false);
    FileSystem.get(new Configuration()).delete(outputStage, true);

    System.exit(code);
}
```
This mapper simply pulls the last access date for each user
and sets it as the sort key for the record. The input value is
output along with it. These key/value pairs, per our job
configuration, are written to a SequenceFile that
is used to create the partition list for the TotalOrderPartitioner. There is no reducer
for this job.
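```java
public static class LastAccessDateMapper extends
        Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // MRDPUtils.transformXmlToMap is a helper defined elsewhere in this
        // example code; it parses one XML row into attribute/value pairs
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        // Skip lines that lack the attribute (e.g., XML header or root tags)
        String lastAccessDate = parsed.get("LastAccessDate");
        if (lastAccessDate == null) {
            return;
        }

        // The sort key is the last access date; the whole record is the value
        outkey.set(lastAccessDate);
        context.write(outkey, value);
    }
}
```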
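Sorting on LastAccessDate as Text works only because of how the dates are written. Assuming the data set stores them as fixed-layout ISO 8601 timestamps such as `2011-09-06T22:04:40.787` (an assumption about the data, not something the code above checks), text order and chronological order agree, which a quick plain-Java check can confirm. `TimestampOrder` is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical check: does string order match time order for these dates?
class TimestampOrder {
    // True if sorting the strings as text gives the same order as sorting
    // them by their parsed LocalDateTime values.
    static boolean textOrderMatchesTimeOrder(List<String> stamps) {
        List<String> byText = new ArrayList<>(stamps);
        byText.sort(Comparator.naturalOrder());
        List<String> byTime = new ArrayList<>(stamps);
        byTime.sort(Comparator.comparing(LocalDateTime::parse));
        return byText.equals(byTime);
    }

    public static void main(String[] args) {
        // Assumed LastAccessDate layout: yyyy-MM-ddTHH:mm:ss.SSS
        List<String> stamps = Arrays.asList(
                "2011-09-06T22:04:40.787",
                "2009-12-31T23:59:59.999",
                "2011-01-15T08:00:00.000");
        System.out.println(textOrderMatchesTimeOrder(stamps)); // true
    }
}
```

If the dates were stored in a different layout (for example, month first), the Text key would need to be normalized in the mapper before sorting.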
This job simply uses the identity mapper to take each input key/value pair and output them. No special configuration or implementation is needed.
Because the TotalOrderPartitioner and the framework’s shuffle and sort took care of all the
ordering, all the reducer needs to do is output the values with a
NullWritable object. This will produce a part file for this reducer that is
sorted by last access date. The partitioner ensures that the
concatenation of all these part files (in order) produces a totally
ordered data set.
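```java
public static class ValueReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive in sorted order; write each full record unchanged
        for (Text t : values) {
            context.write(t, NullWritable.get());
        }
    }
}
```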