Total Order Sorting

Pattern Description

The total order sorting pattern is concerned with the order of the data from record to record.

Intent

You want to sort your data in parallel on a sort key.

Motivation

Sorting is easy in sequential programming. Sorting in MapReduce, or more generally in parallel, is not easy. This is because the typical “divide and conquer” approach is a bit harder to apply here.

Each individual reducer will sort its data by key, but unfortunately, this sorting is not global across all data. What we want to do here is a total order sorting where, if you concatenate the output files, the records are sorted. If we just concatenate the output of a simple MapReduce job, segments of the data will be sorted, but the whole set will not be.

Sorted data has a number of useful properties. Sorted by time, it can provide a timeline view on the data. Finding things in a sorted data set can be done with binary search instead of linear search. In the case of MapReduce, we know the upper and lower boundaries of each file by looking at the last and first records, respectively. This can be useful for finding records, as well, and is one of the primary characteristics of HBase. Some databases can bulk load data faster if the data is sorted on the primary key or index column.

There are countless more reasons to have sorted data from an application standpoint or follow-on system standpoint. However, having data sorted for use in MapReduce serves little purpose, so hopefully this expensive operation only has to be done sparingly.

Applicability

The main requirement here is pretty obvious: your sort key has to be comparable so the data can be ordered.

Structure

Total order sorting may be one of the more complicated patterns you'll see. The reason for this is that you first have to determine a set of partitions, divided by ranges of values, that will produce equal-sized subsets of data. These ranges determine which reducer will sort which range of data. Then something similar to the partitioning pattern is run: a custom partitioner is used to partition data by the sort key. The lowest range of data goes to the first reducer, the next range goes to the second reducer, and so on.

This pattern has two phases: an analyze phase that determines the ranges, and the order phase that actually sorts the data. The analyze phase is optional in some ways. You need to run it only once if the distribution of your data does not change quickly over time, because the value ranges it produces will continue to perform well. Also, in some cases, you may be able to guess the partitions yourself, especially if the data is evenly distributed. For example, if you are sorting comments by user ID, and you have a million users, you can assume that with a thousand reducers, each range is going to have a range of a thousand users. This is because comments by user ID should be spread out evenly and since you know the number of total users, you can divide that number by the number of reducers you want to use.
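
If you do guess the partitions yourself, the arithmetic is simple. The snippet below is a minimal sketch of that calculation for the hypothetical user ID scenario above; the variable names and figures are illustrative only and are not part of the example job later in this chapter.

// Back-of-the-envelope partition boundaries for the user ID example.
// One million users and one thousand reducers are the hypothetical
// figures from the text above.
int totalUsers = 1000000;
int numReducers = 1000;
int usersPerRange = totalUsers / numReducers; // 1,000 user IDs per reducer

// With numReducers reducers we need numReducers - 1 boundary keys:
// reducer 0 gets IDs [0, 1000), reducer 1 gets [1000, 2000), and so on.
int[] boundaries = new int[numReducers - 1];
for (int i = 0; i < boundaries.length; i++) {
    boundaries[i] = (i + 1) * usersPerRange;
}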

The analyze phase is a random sampling of the data. The partitions are then based on that random sample. The principle is that partitions that evenly split the random sample should also split the larger data set evenly. The structure of the analyze step is as follows:

  • The mapper does a simple random sampling. For each record it selects, it outputs the sort key as its output key so that the data will show up sorted at the reducer. We don't care at all about the actual record, so we'll just use a null value to save on space. (A rough sketch of this mapper and the analyze reducer follows this list.)

  • Ahead of time, determine the number of records in the total data set and figure out what percentage of records you'll need to analyze to make a reasonable sample. For example, if you plan on running the order with a thousand reducers, sampling about a hundred thousand records should give nice, even partitions. Assuming you have a billion records, divide 100,000 by 1,000,000,000. This gives 0.0001, meaning 0.01% of the records should be run through the analyze phase.

  • Only one reducer will be used here. This will collect the sort keys together into a sorted list (they come in sorted, so that will be easy). Then, when all of them have been collected, the list of keys will be sliced into the data range boundaries.
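
To make the analyze step more concrete, here is a rough sketch of what a hand-rolled version might look like. Note that the example later in this chapter skips a separate analyze job entirely and uses Hadoop's InputSampler instead; the class names, the parseSortKey helper, the sample rate, and the hard-coded number of partitions below are assumptions made for illustration only.

// Sketch of the analyze mapper: keep roughly 0.01% of the records and
// output only the sort key, with a null value to save space.
public static class AnalyzeMapper extends
        Mapper<Object, Text, Text, NullWritable> {

    private static final double SAMPLE_RATE = .0001;
    private Random rand = new Random();
    private Text outkey = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        if (rand.nextDouble() < SAMPLE_RATE) {
            // parseSortKey is a hypothetical helper that extracts the sort key
            outkey.set(parseSortKey(value));
            context.write(outkey, NullWritable.get());
        }
    }
}

// Sketch of the single analyze reducer: collect the sampled keys, which
// arrive already sorted, then slice them into partition boundary keys.
// Assumes the sample is much larger than the number of partitions.
public static class AnalyzeReducer extends
        Reducer<Text, NullWritable, Text, NullWritable> {

    private static final int NUM_PARTITIONS = 1000;
    private List<Text> sampledKeys = new ArrayList<Text>();

    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable ignored : values) {
            sampledKeys.add(new Text(key));
        }
    }

    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit NUM_PARTITIONS - 1 boundary keys, evenly spaced through the sample
        int step = sampledKeys.size() / NUM_PARTITIONS;
        for (int i = 1; i < NUM_PARTITIONS; ++i) {
            context.write(sampledKeys.get(i * step), NullWritable.get());
        }
    }
}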

The order phase is a relatively straightforward application of MapReduce that uses a custom partitioner. The structure of the order step is as follows:

  • The mapper extracts the sort key in the same way as the analyze step. However, this time the record itself is stored as the value instead of being ignored.

  • A custom partitioner is used that loads up the partition file. In Hadoop, you can use the TotalOrderPartitioner, which is built specifically for this purpose. It takes the data ranges from the partition file produced in the previous step and decides which reducer to send the data to.

  • The reducer’s job here is simple. The shuffle and sort take care of the heavy lifting. The reduce function simply takes the values that have come in and outputs them. The number of reducers needs to be equal to the number of partitions for the TotalOrderPartitioner to work properly.

Caution

Note that the number of ranges in the intermediate partition needs to be equal to the number of reducers in the order step. If you decide to change the number of reducers and you’ve been reusing the same file, you’ll need to rebuild it.

Tip

If you want to have a primary sort key and a secondary sort key, concatenate the keys, delimited by something. For example, if you want to sort by last name first, and city second, use a key that looks like Smith^Baltimore.
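
For instance, a mapper building that composite key might do something like the following; lastName and city are hypothetical fields pulled from the input record.

// Primary sort on last name, secondary sort on city, delimited by '^'
outkey.set(lastName + "^" + city); // e.g. "Smith^Baltimore"
context.write(outkey, value);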

Caution

Using Text for nearly everything in Hadoop is very natural since that's the format in which data is coming in. Be careful when sorting on numerical data, though! The string "10000" is less than "9" if they are compared as strings, which is not what we want. Either pad the numbers with zeros or use a numerical data type.
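
If you do stay with Text keys, zero-padding can be done with String.format; the width of 10 below is an arbitrary choice that just needs to be at least as wide as the largest expected value.

// Pad numeric keys so they compare correctly as strings:
// "0000000009" now sorts before "0000010000", as intended
outkey.set(String.format("%010d", numericValue));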

Consequences

The output files will contain sorted data, and the output file names will be sorted such that concatenating them in order yields a totally sorted data set. In Hadoop, you'll be able to issue hadoop fs -cat output/part-r-* and retrieve the data in a sorted manner.

Resemblances

SQL

Ordering in SQL is pretty easy!

SELECT * FROM data ORDER BY col1;
Pig

Ordering in Pig is syntactically pretty easy, but it’s a very expensive operation. Behind the scenes, it will run a multi-stage MapReduce job to first find the partitions, and then perform the actual sort.

c = ORDER b BY col1;

Performance analysis

This operation is expensive because you effectively have to load and parse the data twice: first to build the partition ranges, and then to actually sort the data.

The job that builds the partitions is straightforward and efficient since it has only one reducer and sends a minimal amount of data over the network. The output file is small, so writing it out is trivial. Also, you may only have to run this now and then, which will amortize the cost of building it over time.

The order step of the job has performance characteristics similar to the other data organization patterns, because it has to move all of the data over the network and write all of the data back out. Therefore, you should use a relatively large number of reducers.

Total Order Sorting Examples

Sort users by last visit

The user data in our StackOverflow data set is in the order of the account's creation. Instead, we'd like to have the data ordered by the last time the user visited the site.

For this example, we have a special driver that runs both the analyze and order steps. Also, there are two MapReduce jobs, one for the analyze step and one for the order step.

Driver code

Let’s break the driver down into two sections: building the partition list via sampling, then performing the sort.

The first section parses the input command line arguments and creates input and output variables from them. It creates Path objects for the partition list and the staging directory. The partition list is used by the TotalOrderPartitioner to make sure the key/value pairs are sorted properly. The staging directory is used to store intermediate output between the two jobs. There is nothing too special about the first job configuration. The main thing to note is that the first job is a map-only job that uses a SequenceFileOutputFormat.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);
    Path partitionFile = new Path(args[1] + "_partitions.lst");
    Path outputStage = new Path(args[1] + "_staging");
    Path outputOrder = new Path(args[1]);

    // Configure job to prepare for sampling
    Job sampleJob = new Job(conf, "TotalOrderSortingStage");
    sampleJob.setJarByClass(TotalOrderSorting.class);

    // Use the mapper implementation with zero reduce tasks
    sampleJob.setMapperClass(LastAccessDateMapper.class);
    sampleJob.setNumReduceTasks(0);

    sampleJob.setOutputKeyClass(Text.class);
    sampleJob.setOutputValueClass(Text.class);

    TextInputFormat.setInputPaths(sampleJob, inputPath);

    // Set the output format to a sequence file
    sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);

    // Submit the job and get completion code.
    int code = sampleJob.waitForCompletion(true) ? 0 : 1;

    ...

The second job uses the identity mapper and our reducer implementation. The input is the output of the first job, so the identity mapper simply passes the key/value pairs through as they were stored. The job is configured to use 10 reducers, but any reasonable number can be used. Next, the partition file is configured, even though we have not created it yet.

The next important line uses the InputSampler utility. This sampler writes the partition file by reading through the configured input directory of the job. Using the RandomSampler, it takes a configurable number of samples of the previous job's output. This can be an expensive operation, as the entire output is read using this constructor. Another constructor of RandomSampler allows you to set a maximum number of input splits that will be sampled. This will decrease execution time, but you might not get as good a distribution.

After the partition file is written, the job is executed. The partition file and staging directory are then deleted, as they are no longer needed for this example.

Tip

If your data distribution is unlikely to change, it would be worthwhile to keep this partition file around. It can then be used over and over again for this job in the future as new data arrives on the system.

    ...

    if (code == 0) {
        Job orderJob = new Job(conf, "TotalOrderSortingStage");
        orderJob.setJarByClass(TotalOrderSorting.class);

        // Here, use the identity mapper to output the key/value pairs in
        // the SequenceFile
        orderJob.setMapperClass(Mapper.class);
        orderJob.setReducerClass(ValueReducer.class);

        // Set the number of reduce tasks to an appropriate number for the
        // amount of data being sorted
        orderJob.setNumReduceTasks(10);

        // Use Hadoop's TotalOrderPartitioner class
        orderJob.setPartitionerClass(TotalOrderPartitioner.class);

        // Set the partition file
        TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
                partitionFile);

        orderJob.setOutputKeyClass(Text.class);
        orderJob.setOutputValueClass(Text.class);

        // Set the input to the previous job's output
        orderJob.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(orderJob, outputStage);

        // Set the output path to the command line parameter
        TextOutputFormat.setOutputPath(orderJob, outputOrder);

        // Set the separator to an empty string
        orderJob.getConfiguration().set(
                "mapred.textoutputformat.separator", "");

        // Use the InputSampler to go through the output of the previous
        // job, sample it, and create the partition file
        InputSampler.writePartitionFile(orderJob,
                new InputSampler.RandomSampler(.001, 10000));

        // Submit the job
        code = orderJob.waitForCompletion(true) ? 0 : 2;
    }

    // Clean up the partition file and the staging directory
    FileSystem.get(new Configuration()).delete(partitionFile, false);
    FileSystem.get(new Configuration()).delete(outputStage, true);

    System.exit(code);
}

Analyze mapper code

This mapper simply pulls the last access date for each user and sets it as the sort key for the record. The input value is output along with it. These key/value pairs, per our job configuration, are written to a SequenceFile that is used to create the partition list for the TotalOrderPartitioner. There is no reducer for this job.

public static class LastAccessDateMapper extends
        Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        outkey.set(parsed.get("LastAccessDate"));
        context.write(outkey, value);
    }
}

Order mapper code

This job simply uses the identity mapper to take each input key/value pair and output it as is. No special configuration or implementation is needed.
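
For reference, setting Mapper.class in the driver is equivalent to writing an explicit pass-through mapper like the sketch below. There is no need to actually write this class; it is shown only to make the identity behavior concrete.

// Equivalent of Hadoop's default (identity) Mapper: each key/value pair
// read from the previous job's SequenceFile is written back out unchanged.
public static class IdentityOrderMapper extends
        Mapper<Text, Text, Text, Text> {

    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}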

Order reducer code

Because the TotalOrderPartitioner and the framework's shuffle and sort have already put every record in the right place, all the reducer needs to do is output the values with a NullWritable object. This will produce a part file for this reducer that is sorted by last access date. The partitioner ensures that the concatenation of all these part files (in order) produces a totally ordered data set.

public static class ValueReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text t : values) {
            context.write(t, NullWritable.get());
        }
    }
}