
Structured to Hierarchical

Pattern Description

The structured to hierarchical pattern creates new records from data that started in a very different structure. Because it builds entirely new records rather than reorganizing existing ones, this pattern in many ways stands alone in this chapter.

Intent

Transform your row-based data to a hierarchical format, such as JSON or XML.

Motivation

When migrating data from an RDBMS to a Hadoop system, one of the first things you should consider doing is reformatting your data into a structure more conducive to analysis. Since Hadoop doesn’t care what format your data is in, you should take advantage of hierarchical storage to avoid doing joins.

For example, our StackOverflow data contains a table about comments, a table about posts, etc. It is pretty obvious that the data is stored in a normalized SQL database. When you visit a post on StackOverflow, all the different pieces need to be coalesced into one view. This gets even more complicated when you are trying to do analytics at the level of individual posts. Imagine trying to correlate the length of a post with the length of its comments. This requires you to first do a join, an expensive operation, and then extract the data that allows you to do your real work. If instead you group the data by post so that the comments and the edit revisions are colocated with the posts (i.e., denormalizing the tables), this type of analysis is much easier and more intuitive. Keeping the data in a normalized form in this case serves little purpose.
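
To make this concrete, a denormalized post record might look something like the following; the attribute names mirror the StackOverflow data dump, but the values are invented and the element names match the output built later in this chapter’s example:

<post Id="1607" PostTypeId="1" Body="...">
    <comments PostId="1607" Text="..." />
    <comments PostId="1607" Text="..." />
</post>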

Unfortunately, data doesn’t always come grouped together, and MapReduce offers no way to insert a new record into an existing hierarchical record. When someone posts an answer to a StackOverflow question, the corresponding denormalized record can’t be updated immediately. Therefore, creating the denormalized records for MapReduce has to be done periodically in batch.

Another way to deal with a steady stream of updates is to use HBase, which is well suited to storing data in a semi-structured, hierarchical fashion. MongoDB would also be a good candidate for storing this type of data.
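
As a minimal sketch of absorbing that steady stream with the HBase client API (the table name “posts”, the column family “c”, and the helper itself are illustrative assumptions, not part of this pattern’s example code):

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CommentUpserter {
    // Store each comment under its post's row key, so new comments are
    // colocated with their post as they arrive, with no batch rebuild
    public static void putComment(Connection conn, String postId,
            String commentId, String commentXml) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf("posts"))) {
            Put put = new Put(Bytes.toBytes(postId));
            put.addColumn(Bytes.toBytes("c"), Bytes.toBytes(commentId),
                    Bytes.toBytes(commentXml));
            table.put(put);
        }
    }
}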

Applicability

The following should be true for this pattern to be appropriate:

  • You have data sources that are linked by some set of foreign keys.

  • Your data is structured and row-based.

Structure

Figure 4-1 shows the structure for this pattern. The description of each component is as follows:

  • If you wish to combine multiple data sources into a hierarchical data structure, a Hadoop class called MultipleInputs from org.apache.hadoop.mapreduce.lib.input is extremely valuable. MultipleInputs allows you to specify different input paths and different mapper classes for each input. The configuration is done in the driver. If you are loading data from only one source in this pattern, you don’t need this step.

  • The mappers load the data and parse the records into one cohesive format so that your work in the reducers is easier. The output key should reflect how you want to identify the root of each hierarchical record. For example, in our StackOverflow example, the root would be the post ID. You also need to give each piece of data some context about its source: the reducer has to be able to tell whether a given value is a post or a comment. To do this, you can simply prepend some sort of label to the output value text.

  • In general, a combiner isn’t going to help you too much here. You could hypothetically group items with the same key and send them over together, but this has no major compression gains since all you would be doing is concatenating strings, so the size of the resulting string would be the same as the inputs.

  • The reducer receives the data from all the different sources key by key. All of the data for a particular grouping is provided in a single iterator, so all that is left for you to do is build the hierarchical data structure from the list of data items. With XML or JSON, you’ll build a single object and then write it out as output. The examples in this section use XML, for which Java provides several convenient libraries for building documents. If you are using some other format, such as a custom one, you’ll just need to use the proper object-building and serialization methods.

Figure 4-1. The structure of the structured to hierarchical pattern

Consequences

The output will be in a hierarchical form, grouped by the key that you specified.

However, be aware that many formats, such as XML and JSON, have some sort of top-level root element that encompasses all of the records. If you actually need the document to be well-formed from top to bottom, it’s usually easier to add this header and footer text in a post-processing step.
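
If you do need a well-formed document, a small post-processing sketch like the following can stitch the job’s part files together between a root element (the root tag “posts” and the argument layout are illustrative assumptions):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class AddRootElement {
    // Concatenate the job's part files between a root element so the
    // final document is well-formed XML from top to bottom
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // args[0] is the job's output directory, args[1] the final file
        try (FSDataOutputStream out = fs.create(new Path(args[1]))) {
            out.writeBytes("<posts>\n");
            for (FileStatus stat : fs.listStatus(new Path(args[0]))) {
                // Skip _SUCCESS and any other non-record files
                if (stat.getPath().getName().startsWith("part-")) {
                    try (FSDataInputStream in = fs.open(stat.getPath())) {
                        IOUtils.copyBytes(in, out, conf, false);
                    }
                }
            }
            out.writeBytes("</posts>\n");
        }
    }
}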

Known uses

Pre-joining data

Data arrives in disjoint structured data sets, and for analytical purposes it would be easier to bring the data together into more complex objects. By doing this, you are setting up your data to take advantage of the NoSQL model of analysis.

Preparing data for HBase or MongoDB

HBase and MongoDB are natural ways to store this kind of data, so you can use this pattern to bring the data together in preparation for loading. Creating a new table and then executing a bulk import via MapReduce is particularly effective; the alternative, several rounds of individual inserts, is likely to be less efficient.
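
As a hedged sketch of what the reduce side of such a bulk import could look like with HBase’s MapReduce utilities (the table layout, the column family “d”, and the qualifier scheme are illustrative assumptions):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class HierarchyLoadReducer extends
        TableReducer<Text, Text, ImmutableBytesWritable> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The grouping key (e.g., the post ID) becomes the row key, so
        // all of a post's records land in the same HBase row
        Put put = new Put(Bytes.toBytes(key.toString()));

        int i = 0;
        for (Text value : values) {
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("r" + i++),
                    Bytes.toBytes(value.toString()));
        }

        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}

In the driver, TableMapReduceUtil.initTableReducerJob("posts", HierarchyLoadReducer.class, job) would wire this reducer to the table in place of a file-based output format.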

Resemblances

SQL

It’s rare that you would want to do something like this in a relational database, since storing data in this way is not conducive to analysis with SQL. However, the way you would solve a similar problem in an RDBMS is to join the data and then perform analysis on the result.
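
For instance, the post-length/comment-length analysis from the motivation section might be expressed as a join query along these lines (the column names follow the StackOverflow schema, but the query itself is illustrative):

SELECT p.Id,
       LENGTH(p.Body) AS post_length,
       AVG(LENGTH(c.Text)) AS avg_comment_length
FROM posts p
LEFT OUTER JOIN comments c ON c.PostId = p.Id
GROUP BY p.Id, LENGTH(p.Body);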

Pig

Pig has reasonable support for hierarchical data structures. You can have hierarchical bags and tuples, which make it easy to represent hierarchical structures and lists of objects in a single record. The COGROUP method in Pig does a great job of bringing data together while preserving the original structure. However, using the predefined keywords to do any sort of real analysis on a complex record is more challenging out of the box. For this, a user-defined function is the right way to go. Basically, you would use Pig to build and group the records, then a UDF to make sense of the data.

-- Load the comments and posts, which are stored with different delimiters
data_a = LOAD '/data/comments/' USING PigStorage('|');
data_b = LOAD '/data/posts/' USING PigStorage(',');

-- Cogroup the two relations on the post ID, preserving their structure
grouped = COGROUP data_a BY $2, data_b BY $1;

-- Hand each grouped record to a UDF that does the real analysis
analyzed = FOREACH grouped GENERATE udfs.analyze(group, $1, $2);

...

Performance analysis

There are two performance concerns you need to pay attention to when using this pattern. First, be aware of how much data is being sent to the reducers from the mappers; second, be aware of the memory footprint of the object that the reducer builds.

Since records with the same grouping key can be scattered anywhere in the data set, pretty much all of the data is going to move across the network. For this reason, you will need to pay particular attention to having an adequate number of reducers. The same strategies apply here as in other patterns that shuffle everything over the network.

The next major concern is the possibility of hot spots in the data that could result in an obscenely large record. With large data sets, it is conceivable that a particular output record is going to have a lot of data associated with it. Imagine that for some reason a post on StackOverflow has a million comments associated with it. That would be extremely rare and unlikely, but not in the realm of the impossible. If you are building some sort of XML object, all of those comments at one point might be stored in memory before writing the object out. This can cause you to blow out the heap of the Java Virtual Machine, which obviously should be avoided.

Another problem with hot spots is a skew in how much data each reducer is handling. This is going to be a similar problem in just about any MapReduce job. In many cases the skew can be ignored, but if it really matters you can write a custom partitioner to split the data up more evenly.
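
A minimal sketch of such a partitioner follows (the class name and the hot-key list are hypothetical; note that every record for a given key still goes to exactly one reducer, so the hierarchy can still be assembled):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class SkewAwarePartitioner extends Partitioner<Text, Text> {

    // Hypothetical post IDs known (say, from a prior profiling job) to
    // carry an outsized share of the records
    private static final List<String> HOT_KEYS = Arrays.asList("4", "9");

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        if (numPartitions <= HOT_KEYS.size()) {
            // Too few reducers to reserve any; fall back to plain hashing
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }

        // Give each hot key a reducer of its own...
        int hot = HOT_KEYS.indexOf(key.toString());
        if (hot >= 0) {
            return hot;
        }

        // ...and hash every other key across the remaining reducers
        return HOT_KEYS.size() + (key.hashCode() & Integer.MAX_VALUE)
                % (numPartitions - HOT_KEYS.size());
    }
}

It would be enabled in the driver with job.setPartitionerClass(SkewAwarePartitioner.class), alongside job.setNumReduceTasks(...) to provide enough reducers for the shuffle-everything traffic this pattern generates.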

Structured to Hierarchical Examples

Post/comment building on StackOverflow

In this example, we will take the posts and comments of the StackOverflow data and group them together. The hierarchy will look something like this:

Posts
    Post
      Comment
      Comment
    Post
      Comment
      Comment
      Comment

The following descriptions of each code section explain the solution to the problem.

Problem: Given a list of posts and comments, create a structured XML hierarchy to nest comments with their related post.

Driver code

We don’t usually describe the code for the driver, but in this case we are doing something exotic with MultipleInputs. All we do differently is use the MultipleInputs class to add the comments path and the posts path with their respective mappers. The paths for the posts and comments data are provided via the command line, and the program retrieves them from the args array.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "PostCommentHierarchy");
    job.setJarByClass(PostCommentBuildingDriver.class);

    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, PostMapper.class);

    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, CommentMapper.class);

    job.setReducerClass(PostCommentHierarchyReducer.class);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 2);
}

Mapper code

In this case, there are two mapper classes, one for comments and one for posts. In both, we extract the post ID to use it as the output key. We output the input value prepended with a character (“P” for a post or “C” for a comment) so we know which data set the record came from during the reduce phase.

public static class PostMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        // The foreign join key is the post ID
        outkey.set(parsed.get("Id"));

        // Flag this record for the reducer and then output
        outvalue.set("P" + value.toString());
        context.write(outkey, outvalue);
    }
}

public static class CommentMapper extends Mapper<Object, Text, Text, Text> {
    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        // The foreign join key is the post ID
        outkey.set(parsed.get("PostId"));

        // Flag this record for the reducer and then output
        outvalue.set("C" + value.toString());
        context.write(outkey, outvalue);
    }
}

Reducer code

The reducer builds the hierarchical XML object. It iterates over all the values, using the flag we prepended in the mappers to tell the post record apart from the comment records; the flag is stripped off as the post is stored or a comment is added to the list. Then, if the post is not null, an XML record is constructed with the post as the parent and the comments as its children.

The implementation of nestElements follows the reduce method. We chose to use an XML library to build the final record, but please feel free to use whatever means you deem necessary.

public static class PostCommentHierarchyReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    private ArrayList<String> comments = new ArrayList<String>();
    private DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
    private String post = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Reset variables
        post = null;
        comments.clear();

        // For each input value
        for (Text t : values) {
            // If this is the post record, store it, minus the flag
            if (t.charAt(0) == 'P') {
                post = t.toString().substring(1, t.toString().length())
                        .trim();
            } else {
                // Else, it is a comment record. Add it to the list, minus
                // the flag
                comments.add(t.toString()
                        .substring(1, t.toString().length()).trim());
            }
        }
        // If there are no comments, the comments list will simply be empty.

        // If post is not null, combine post with its comments.
        if (post != null) {
            // nest the comments underneath the post element
            String postWithCommentChildren = nestElements(post, comments);

            // write out the XML
            context.write(new Text(postWithCommentChildren),
                    NullWritable.get());
        }
    }
    ...

The nestElements method takes the post and the list of comments and creates a new string of XML to output. It uses a DocumentBuilder and some additional helper methods to copy the Element objects into new ones, along with their attributes. This copying occurs in order to rename the element tags from row to either post or comments. The final Document is then transformed into an XML string.

    private String nestElements(String post, List<String> comments) {
        try {
            // Create the new document to build the XML
            DocumentBuilder bldr = dbf.newDocumentBuilder();
            Document doc = bldr.newDocument();

            // Copy parent node to document
            Element postEl = getXmlElementFromString(post);
            Element toAddPostEl = doc.createElement("post");

            // Copy the attributes of the original post element to the new one
            copyAttributesToElement(postEl.getAttributes(), toAddPostEl);

            // For each comment, copy it to the "post" node
            for (String commentXml : comments) {
                Element commentEl = getXmlElementFromString(commentXml);
                Element toAddCommentEl = doc.createElement("comments");

                // Copy the attributes of the original comment element to
                // the new one
                copyAttributesToElement(commentEl.getAttributes(),
                        toAddCommentEl);

                // Add the copied comment to the post element
                toAddPostEl.appendChild(toAddCommentEl);
            }

            // Add the post element to the document
            doc.appendChild(toAddPostEl);

            // Transform the document into a String of XML and return
            return transformDocumentToString(doc);
        } catch (ParserConfigurationException e) {
            // Wrap the checked exception so reduce need not declare it
            throw new RuntimeException(e);
        }
    }

    private Element getXmlElementFromString(String xml) {
        try {
            // Create a new document builder
            DocumentBuilder bldr = dbf.newDocumentBuilder();

            return bldr.parse(new InputSource(new StringReader(xml)))
                    .getDocumentElement();
        } catch (ParserConfigurationException | SAXException
                | IOException e) {
            // Wrap the checked exceptions so callers need not declare them
            throw new RuntimeException(e);
        }
    }

    private void copyAttributesToElement(NamedNodeMap attributes,
            Element element) {

        // For each attribute, copy it to the element
        for (int i = 0; i < attributes.getLength(); ++i) {
            Attr toCopy = (Attr) attributes.item(i);
            element.setAttribute(toCopy.getName(), toCopy.getValue());
        }
    }

    private String transformDocumentToString(Document doc) {
        try {
            TransformerFactory tf = TransformerFactory.newInstance();
            Transformer transformer = tf.newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION,
                    "yes");
            StringWriter writer = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(
                    writer));
            // Replace all new line characters with an empty string to have
            // one record per line.
            return writer.getBuffer().toString().replaceAll("\n|\r", "");
        } catch (TransformerException e) {
            // Wrap the checked exception so callers need not declare it
            throw new RuntimeException(e);
        }
    }
}

Question/answer building on StackOverflow

This is a continuation of the previous example, using the previous analytic’s output as the input to this one. Now that we have the comments associated with the posts, we are going to associate the post answers with the post questions. This needs to be done because posts comprise both questions and answers, differentiated only by their PostTypeId. We’ll group them together using the Id of questions and the ParentId of answers.

The main difference between the two applications of this pattern is that in this one we are dealing only with one data set. Effectively, we are using a self-join here to correlate the different records from the same data set.

The following descriptions of each code section explain the solution to the problem.

Problem: Given the output of the previous example, perform a self-join operation to create a question, answer, and comment hierarchy.

Mapper code

The first thing the mapper code does is determine whether the record is a question or an answer, because the behavior for each will be different. For a question, we will extract Id as the key and label it as a question. For an answer, we will extract ParentId as the key and label it as an answer.

public class QuestionAnswerBuildingDriver {

    public static class PostCommentMapper extends
            Mapper<Object, Text, Text, Text> {

        private DocumentBuilderFactory dbf = DocumentBuilderFactory
                .newInstance();
        private Text outkey = new Text();
        private Text outvalue = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // Parse the post/comment XML hierarchy into an Element
            Element post = getXmlElementFromString(value.toString());

            int postType = Integer.parseInt(post.getAttribute("PostTypeId"));

            // If postType is 1, it is a question
            if (postType == 1) {
                outkey.set(post.getAttribute("Id"));
                outvalue.set("Q" + value.toString());
            } else {
                // Else, it is an answer
                outkey.set(post.getAttribute("ParentId"));
                outvalue.set("A" + value.toString());
            }

            context.write(outkey, outvalue);
        }

        private Element getXmlElementFromString(String xml) {
            // same as in the previous example's reducer code
        }
    }

Reducer code

The reducer code is very similar to that in the previous example. It iterates through the input values, grabs the question and the answers, and is sure to remove the flags. It then nests the answers inside the question in the same fashion as the previous example. The only difference is that the tags are “question” instead of “post” and “answer” instead of “comment.” The helper functions are omitted here for brevity; they can be viewed in the previous example.

public static class QuestionAnswerReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    private ArrayList<String> answers = new ArrayList<String>();
    private DocumentBuilderFactory dbf = DocumentBuilderFactory
            .newInstance();
    private String question = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Reset variables
        question = null;
        answers.clear();

        // For each input value
        for (Text t : values) {
            // If this is the question record, store it, minus the flag
            if (t.charAt(0) == 'Q') {
                question = t.toString().substring(1, t.toString().length())
                        .trim();
            } else {
                // Else, it is an answer record. Add it to the list, minus
                // the flag
                answers.add(t.toString()
                        .substring(1, t.toString().length()).trim());
            }
        }

        // If the question is not null
        if (question != null) {
            // nest the answers underneath the question element
            String questionWithAnswerChildren = nestElements(question, answers);

            // write out the XML
            context.write(new Text(questionWithAnswerChildren),
                    NullWritable.get());
        }
    }

    ... // omitted helper functions
}