
Scaling up Cassandra and Mahout with Hadoop

By Sean Owen

14 Aug 2011
Category: Technical Articles

Introduction 

My last article, "Recommending (from) Cassandra", introduced the possibility of learning from all of that data that you've been squirreling away in your Apache Cassandra instance. Using another Apache library, Mahout, you saw how to create a rough-and-ready recommender engine on top of a Cassandra keyspace. Using this pattern, you could learn associations from just about any type of entity to any other entity - not just real people and physical items.

The resulting recommender engine was simple, and it even computed and updated its results in real time. That came at a price: it relied heavily on caching Cassandra data in memory, because the machine learning algorithms at work are exceptionally data-intensive. Keeping that data in memory to maintain performance meant the recommender would eventually run out of Java heap space as the data grew. Past about 100 million associations, a normally sized server would not have enough memory for a large enough cache.

And in big data, that is not cool. It's not fashionable to have any hard limit on input size.

MapReduce to the Rescue

Fortunately, the previous article explored only one part of Mahout, and a part that is not designed for arbitrarily large scale. Most of Mahout's algorithms, by contrast, are written for larger scale. They are also written in an entirely different way: within the distributed computing paradigm MapReduce, as implemented by Apache Hadoop.

MapReduce is a popular way of structuring distributed computations, but it is quite a different world from a simple Java program. The single biggest difference is that, within the MapReduce paradigm, you can generally access only a small slice of all the input data at any one time. Algorithms and processes must be structured to compute the final output by piecing together the results of processing pieces of the input independently.

This makes things harder, but it also means that frameworks like Hadoop can run such processes on small pieces of the total input at a time. You no longer need one huge machine to fit the entire input in memory at once, which removes the scale limit described above. As a big bonus, Hadoop can run these pieces of computation on hundreds or thousands of machines simultaneously, and finish in a fraction of the time that even a massively powerful single computer would need.
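To make the idea of processing slices independently a little more concrete, here is a plain-Java sketch (no Hadoop involved) that computes each item's average rating the MapReduce way. Each split of "user,item,rating" lines is processed on its own into partial (sum, count) pairs, and a final step merges the partials by item ID. The SplitAverages class and its methods are hypothetical, for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the MapReduce pattern; not Hadoop or Mahout code.
final class SplitAverages {

  // Partial result for one item: a running sum and count. Unlike averages,
  // these can be merged correctly in any order.
  static final class Partial {
    final double sum;
    final long count;

    Partial(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }

    Partial merge(Partial other) {
      return new Partial(sum + other.sum, count + other.count);
    }
  }

  // "Map" side: process one split of "user,item,rating" lines independently,
  // producing partial sums keyed by item ID.
  static Map<Long, Partial> processSplit(List<String> split) {
    Map<Long, Partial> partials = new HashMap<>();
    for (String line : split) {
      String[] fields = line.split(",");
      long itemID = Long.parseLong(fields[1]);
      double rating = Double.parseDouble(fields[2]);
      partials.merge(itemID, new Partial(rating, 1), Partial::merge);
    }
    return partials;
  }

  // "Reduce" side: merge the partials from every split, then divide.
  static Map<Long, Double> combine(List<Map<Long, Partial>> perSplit) {
    Map<Long, Partial> totals = new HashMap<>();
    for (Map<Long, Partial> partials : perSplit) {
      partials.forEach((item, p) -> totals.merge(item, p, Partial::merge));
    }
    Map<Long, Double> averages = new HashMap<>();
    totals.forEach((item, p) -> averages.put(item, p.sum / p.count));
    return averages;
  }
}
```

Neither split ever sees the other's lines. The result is still correct because (sum, count) pairs, unlike averages, can be combined in any order, and that is exactly the kind of restructuring MapReduce asks of an algorithm.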

And, fortunately, Hadoop can take its input from Cassandra instances out of the box. You can plug Cassandra data into the Mahout / Hadoop combination to allow for recommender engine computations on much larger sets of data.

Preparing to Distribute

Fair warning: this is going to be significantly more complex than setting up the non-distributed recommender was. It requires some knowledge of Hadoop and some hacking of Mahout code. Non-developers can stop here, secure in the knowledge that machine learning, Cassandra and large input can go hand in hand.

The first thing you will need is a fresh copy of Mahout, including its source code. For that you will need a client for Subversion, the version control system Mahout uses. I strongly recommend a free IDE such as Eclipse or IntelliJ Community Edition, which already support Subversion. Use one of them to check out the latest code from Mahout version control at http://svn.apache.org/repos/asf/mahout/trunk.

Next you will need to be able to build Mahout, which uses Apache Maven for builds. Fortunately, both IDEs mentioned above can automatically understand and import the whole project if you simply point them at the directory where you checked out Mahout's "trunk" directory.

You should now be able to browse and build Mahout with your IDE.

Good news: you don't have to install or set up Hadoop itself in order to experiment here. Mahout can use a simple, local, single-node instance of Hadoop. In production you would, of course, set up a Hadoop cluster of many machines and use that instead.

Populating Cassandra 

For input, you will again use the GroupLens data described in the previous article. However, you will process less data here, because you are going to compute recommendations for every user in the data set at once, using a Hadoop cluster of one node: your computer. So that the run finishes in minutes rather than hours, use the 100,000-rating data set instead.

Download this smaller data set and locate the u.data file inside. It needs a small translation to CSV format before it can be loaded into Cassandra, since the file is delimited by tabs instead of commas. Use a text editor to replace tabs with commas, or run the Unix command tr '\t' ',' < u.data > u.csv on the command line. Refer to the code snippet in the previous article for code that loads the data into Cassandra. Once the data is loaded, you can proceed.
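If you would rather do the tab-to-comma conversion in Java than with tr, it takes only a few lines. This is a minimal sketch, assuming u.data sits in the current directory and that its fields (user ID, item ID, rating and timestamp on each line) are separated by single tabs; the TabToCsv class name is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Converts tab-delimited GroupLens u.data lines to comma-delimited CSV,
// roughly equivalent to: tr '\t' ',' < u.data > u.csv
final class TabToCsv {

  // Convert a single line: every tab becomes a comma.
  static String toCsvLine(String line) {
    return line.replace('\t', ',');
  }

  // Reads the whole file, converts each line, and writes the result.
  // Files.write ends each line with the platform's line separator.
  static void convert(Path in, Path out) throws IOException {
    List<String> lines = Files.readAllLines(in, StandardCharsets.UTF_8);
    List<String> csv = lines.stream().map(TabToCsv::toCsvLine).collect(Collectors.toList());
    Files.write(out, csv, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    convert(Paths.get("u.data"), Paths.get("u.csv"));
  }
}
```

Reading everything into memory is fine for a 100,000-line file; for much larger inputs you would stream lines instead.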

Wiring Mahout for Cassandra Access 

Any algorithm or process of real complexity can't be implemented as a single MapReduce job. Instead, it requires a series of chained, related jobs, each of which must be configured and launched on Hadoop. So how do you run jobs on Hadoop?

Fortunately, Mahout provides "Job" classes: runnable Java programs that configure the right MapReduce jobs and send the work to a Hadoop cluster for processing. Mahout has many MapReduce-based algorithms; here you are interested in an implementation of an item-based recommender, found in package org.apache.mahout.cf.taste.hadoop.item. It contains a great deal of code implementing the algorithm, but the only class you need to pay attention to is the one that manages the work on Hadoop: RecommenderJob.
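In RecommenderJob, each job in the chain is wrapped in a numbered phase guarded by a call to shouldRunNextPhase(parsedArgs, currentPhase). The plain-Java sketch below imitates that kind of gating, assuming an inclusive window of start and end phase numbers; the PhaseRunner class and its fields are hypothetical and are not Mahout's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of running a chain of jobs as numbered phases,
// skipping any phase outside an inclusive [startPhase, endPhase] window.
final class PhaseRunner {

  private final int startPhase;
  private final int endPhase;
  private final AtomicInteger currentPhase = new AtomicInteger();

  PhaseRunner(int startPhase, int endPhase) {
    this.startPhase = startPhase;
    this.endPhase = endPhase;
  }

  // Claims the next phase number and reports whether that phase should run.
  boolean shouldRunNextPhase() {
    int phase = currentPhase.getAndIncrement();
    return phase >= startPhase && phase <= endPhase;
  }

  // Walks the chain in order. In a real job, each phase would configure and
  // submit one MapReduce job that reads the previous phase's output.
  List<String> run(List<String> phaseNames) {
    List<String> ran = new ArrayList<>();
    for (String name : phaseNames) {
      if (shouldRunNextPhase()) {
        ran.add(name); // stand-in for job.waitForCompletion(true)
      }
    }
    return ran;
  }
}
```

Skipping early phases is only safe because each phase writes its output to a known location that the next phase reads, so a later phase can pick up where an earlier run left off.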

This class is ready to run. However, it presumes that data will be read from a location on an HDFS (Hadoop Distributed File System) cluster. You want it to read from Cassandra instead, which takes a few tweaks. For brevity, the changes described here are quick hacks to get it working, not a proper integration.

First, open the file core/pom.xml under the Mahout project root. In the section of <dependency> tags, add a new one:

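    <dependency>
      <groupId>org.apache.cassandra</groupId>
      <artifactId>cassandra-all</artifactId>
      <!-- Add a <version> element matching your Cassandra install
           if no parent POM already manages this artifact's version. -->
    </dependency>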

This temporarily gives the core module access to the Cassandra libraries. Now open RecommenderJob and find where the first MapReduce job is configured and invoked; at the time of writing, it's around line 157. Change the first two stanzas as follows:

    AtomicInteger currentPhase = new AtomicInteger();

    // Empty start and finish bounds select every column of each row.
    ByteBuffer noValue = ByteBuffer.wrap(new byte[0]);

    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      // Phase 1: map item IDs to int indices, reading the "users" column family.
      Job itemIDIndex = prepareJob(
        inputPath, itemIDIndexPath, ColumnFamilyInputFormat.class,
        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
        SequenceFileOutputFormat.class);
      SlicePredicate slice = new SlicePredicate();
      slice.setSlice_range(new SliceRange(noValue, noValue, false, Integer.MAX_VALUE));
      Configuration conf = itemIDIndex.getConfiguration();
      // Local Cassandra node on the default Thrift port; keyspace "recommender".
      ConfigHelper.setInitialAddress(conf, "localhost");
      ConfigHelper.setRpcPort(conf, "9160");
      ConfigHelper.setInputSlicePredicate(conf, slice);
      ConfigHelper.setInputColumnFamily(conf, "recommender", "users");
      itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
      itemIDIndex.waitForCompletion(true);
    }

    int numberOfUsers = 0;
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      // Phase 2: build one preference vector per user from the same column family.
      Job toUserVector = prepareJob(
        inputPath, userVectorPath, ColumnFamilyInputFormat.class,
        ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
        ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
        SequenceFileOutputFormat.class);
      SlicePredicate slice = new SlicePredicate();
      slice.setSlice_range(new SliceRange(noValue, noValue, false, Integer.MAX_VALUE));
      Configuration conf = toUserVector.getConfiguration();
      ConfigHelper.setInitialAddress(conf, "localhost");
      ConfigHelper.setRpcPort(conf, "9160");
      ConfigHelper.setInputSlicePredicate(conf, slice);
      ConfigHelper.setInputColumnFamily(conf, "recommender", "users");
      conf.setBoolean(BOOLEAN_DATA, booleanData);
      conf.setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
      toUserVector.waitForCompletion(true);
      numberOfUsers = (int) toUserVector.getCounters().findCounter(ToUserVectorReducer.Counters.USERS).getValue();
    }

You will need to add imports for new classes referenced in this code; the IDE should help you do so.

This is almost the only code that needs to change; only the first phases need to know they are reading from Cassandra. However, a record read from Cassandra looks different from a line read from a text file: it arrives as a row key plus a sorted map of columns. So you also need to change the two Mapper classes that read the input. Find ItemIDIndexMapper and change its code to:

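    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
    import org.apache.mahout.math.VarIntWritable;
    import org.apache.mahout.math.VarLongWritable;

    // Input: one Cassandra row per user; the column names are the IDs of rated items.
    public final class ItemIDIndexMapper extends
        Mapper<ByteBuffer,SortedMap<ByteBuffer,IColumn>,VarIntWritable,VarLongWritable> {

      @Override
      protected void map(ByteBuffer key,
                         SortedMap<ByteBuffer,IColumn> value,
                         Context context) throws IOException, InterruptedException {
        for (ByteBuffer itemIDBytes : value.keySet()) {
          // Assumes each column name holds an item ID as an 8-byte long.
          long itemID = itemIDBytes.asLongBuffer().get();
          int index = TasteHadoopUtils.idToIndex(itemID);
          context.write(new VarIntWritable(index), new VarLongWritable(itemID));
        }
      }
    }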
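The mapper above assumes each column name holds an item ID as raw bytes: an 8-byte long in Java's default big-endian order, as produced by ByteBuffer.putLong. The next mapper decodes ratings the same way, as 4-byte floats. If your loader stored IDs or ratings differently (as strings, say), this decoding has to change. The sketch below round-trips values with plain java.nio under that assumption; the ColumnBytes helper is hypothetical.

```java
import java.nio.ByteBuffer;

// Round-trips the byte encodings the Cassandra-reading mappers assume:
// 8-byte big-endian longs for IDs and 4-byte big-endian floats for ratings.
final class ColumnBytes {

  static ByteBuffer encodeLong(long value) {
    ByteBuffer buffer = ByteBuffer.allocate(8);
    buffer.putLong(value);
    buffer.flip(); // make the written bytes readable from position 0
    return buffer;
  }

  static ByteBuffer encodeFloat(float value) {
    ByteBuffer buffer = ByteBuffer.allocate(4);
    buffer.putFloat(value);
    buffer.flip();
    return buffer;
  }

  // Same decoding as in the mappers: read through a view that starts at the
  // buffer's current position, leaving the original buffer's position unchanged.
  static long decodeLong(ByteBuffer bytes) {
    return bytes.asLongBuffer().get();
  }

  static float decodeFloat(ByteBuffer bytes) {
    return bytes.asFloatBuffer().get();
  }
}
```

Because the views leave the source buffer untouched, decoding the same column name twice gives the same answer, which matters when several pieces of code inspect one row.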

Then, find and change class ToEntityPrefsMapper to:

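    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.math.VarLongWritable;

    public abstract class ToEntityPrefsMapper extends
        Mapper<ByteBuffer,SortedMap<ByteBuffer,IColumn>,VarLongWritable,VarLongWritable> {

      public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";

      // Kept so that subclasses still compile; this hack ignores the itemKey flag.
      ToEntityPrefsMapper(boolean itemKey) {
      }

      @Override
      public void map(ByteBuffer key,
                      SortedMap<ByteBuffer,IColumn> value,
                      Context context) throws IOException, InterruptedException {
        // Row key is the user ID; each column maps an item ID to a rating.
        long userID = key.asLongBuffer().get();
        for (Map.Entry<ByteBuffer,IColumn> entry : value.entrySet()) {
          long itemID = entry.getKey().asLongBuffer().get();
          // Assumes the rating was stored as a 4-byte float.
          float prefValue = entry.getValue().value().asFloatBuffer().get();
          // Key by user ID so the reducer receives each user's (item, rating) pairs.
          context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
        }
      }
    }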
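The key this mapper emits decides what gets grouped together. Hadoop delivers all values that share a key to a single reduce call, so keying by user ID hands ToUserVectorReducer every (item, rating) pair for one user, which is the vector this phase exists to build. The plain-Java sketch below simulates the map and shuffle steps for the layout assumed here (one row per user, one column per rated item); UserVectorSketch and Emitted are hypothetical stand-ins for the Mahout and Cassandra types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java simulation of the map and shuffle steps of the user-vector phase.
final class UserVectorSketch {

  // One map output record: key = user ID, value = (item ID, rating),
  // mirroring the VarLongWritable key and EntityPrefWritable value.
  static final class Emitted {
    final long userID;
    final long itemID;
    final float rating;

    Emitted(long userID, long itemID, float rating) {
      this.userID = userID;
      this.itemID = itemID;
      this.rating = rating;
    }
  }

  // Map step for one row: row key = user, columns = item ID -> rating.
  static List<Emitted> map(long userID, SortedMap<Long, Float> columns) {
    List<Emitted> out = new ArrayList<>();
    for (Map.Entry<Long, Float> column : columns.entrySet()) {
      out.add(new Emitted(userID, column.getKey(), column.getValue()));
    }
    return out;
  }

  // Shuffle step: group every record by its key, as Hadoop does before the
  // reducer runs, so each user ID ends up with that user's item -> rating map.
  static SortedMap<Long, SortedMap<Long, Float>> groupByUser(List<Emitted> records) {
    SortedMap<Long, SortedMap<Long, Float>> vectors = new TreeMap<>();
    for (Emitted record : records) {
      vectors.computeIfAbsent(record.userID, id -> new TreeMap<>())
          .put(record.itemID, record.rating);
    }
    return vectors;
  }
}
```

Had the map step keyed records by item ID instead, the same grouping would produce one vector per item listing the users who rated it, and every later phase would quietly treat items as users.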

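Finally, rebuild Mahout. Some test code will fail to compile against the changed mappers. You don't need the tests here, so simply delete any line that does not compile and rebuild, or skip compiling the tests altogether by building with Maven's -Dmaven.test.skip=true option.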

Running Recommendations from Cassandra

At last, you are ready to run recommendations. Simply run RecommenderJob from within your IDE. It needs two command-line arguments: an input location and an output directory on your local file system. Because the code above hardcodes the keyspace and column family, the input value is not actually used to find the data; users here just names the column family being read. I suggest writing output under /tmp, but you can use any directory you like. Specify these arguments:

    -Dmapred.input.dir=users -Dmapred.output.dir=file:///tmp/out

In addition, tell the IDE to use /tmp as the program's working directory, and give the JVM plenty of memory with the additional argument -Xmx1g. Run the program!

You will see a flurry of output from Hadoop as the local cluster computes all item-item similarities and then the recommendations. The command-line arguments you specified send the result to /tmp/out on your local machine. Inside it you will find a file called part-r-00000. The name follows Hadoop's convention of one part-r-NNNNN output file per reducer; with much more input and many reducers, you would find many large files here.
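Since larger runs spread their output over several part-r-NNNNN files, code that consumes the results should read all of them rather than assume there is just one. Here is a minimal sketch using java.nio, assuming the output directory is on the local file system as configured above; the PartFiles class is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Reads every reducer output file (part-r-*) in a local Hadoop output directory.
final class PartFiles {

  // The glob matches only part-r-* names, so hidden checksum files and other
  // files in the directory are skipped.
  static List<Path> listPartFiles(Path outputDir) throws IOException {
    List<Path> parts = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "part-r-*")) {
      for (Path part : stream) {
        parts.add(part);
      }
    }
    Collections.sort(parts); // part-r-00000, part-r-00001, ... in order
    return parts;
  }

  // Concatenates the lines of all part files, in part-number order.
  static List<String> readAllLines(Path outputDir) throws IOException {
    List<String> lines = new ArrayList<>();
    for (Path part : listPartFiles(outputDir)) {
      lines.addAll(Files.readAllLines(part, StandardCharsets.UTF_8));
    }
    return lines;
  }
}
```

For the run above, PartFiles.readAllLines(Paths.get("/tmp/out")) would simply return the lines of part-r-00000.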

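For this Mahout job and this input, however, there is just the one simple text file. Open it and you will find one line of recommendations per user; the 100,000-rating data set has 943 users. The lines look like the following, although the exact items and scores you see may differ: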

    1       [943:5.0,942:5.0,940:5.0,939:5.0,938:5.0,937:5.0,936:5.0,935:5.0,934:5.0,933:5.0]
    2       [554:4.5263157,6:4.5116277,608:4.4883723,741:4.4666667,189:4.428571,90:4.361244,500:4.357143,72:4.3522725,456:4.351456,198:4.3469386]
    3       [833:5.0,599:5.0,552:5.0,548:5.0,545:5.0,541:5.0,540:5.0,537:5.0,843:5.0,534:5.0]

The first value on each line is a user ID; the rest is a list of recommended item IDs, each followed by an estimated rating for that item. The estimates are on the same 1-to-5 scale as the input movie ratings, and because each line lists only the top-scoring items for that user, they cluster near the top of that scale, at or near 5.0.
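To use these results elsewhere, each line can be parsed back into a user ID and a ranked list of (item ID, estimated rating) pairs. This sketch assumes exactly the format shown above: a user ID, whitespace (a tab in Hadoop's text output), then a bracketed, comma-separated list of itemID:estimate entries. The RecommendationLine class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Parses one line of RecommenderJob text output,
// e.g. "2\t[554:4.5263157,6:4.5116277]".
final class RecommendationLine {

  // One recommended item and its estimated rating.
  static final class Recommended {
    final long itemID;
    final float estimate;

    Recommended(long itemID, float estimate) {
      this.itemID = itemID;
      this.estimate = estimate;
    }
  }

  final long userID;
  final List<Recommended> items; // in the order written, best first

  private RecommendationLine(long userID, List<Recommended> items) {
    this.userID = userID;
    this.items = items;
  }

  static RecommendationLine parse(String line) {
    // Split the key from the value on the first run of whitespace.
    String[] keyAndValue = line.trim().split("\\s+", 2);
    long userID = Long.parseLong(keyAndValue[0]);
    String list = keyAndValue[1].trim();
    // Drop the surrounding [ and ].
    String body = list.substring(1, list.length() - 1);
    List<Recommended> items = new ArrayList<>();
    if (!body.isEmpty()) {
      for (String entry : body.split(",")) {
        int colon = entry.indexOf(':');
        long itemID = Long.parseLong(entry.substring(0, colon));
        float estimate = Float.parseFloat(entry.substring(colon + 1));
        items.add(new Recommended(itemID, estimate));
      }
    }
    return new RecommendationLine(userID, items);
  }
}
```

Splitting on any whitespace rather than only a tab keeps the parser working on copies of the output whose tabs have been turned into spaces.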

 

More Data, More Algorithms

Savor the result for a moment. You processed only 100,000 data points on one machine, but processing a trillion data points on a cluster of 1,000 machines is not really any different. It takes some setup and preparation, but machine learning at scale on your Cassandra data is well within reach.

Mahout itself generally reads and writes text files and Hadoop's SequenceFile format, but similar surgery can be performed on its other algorithms to make them read from Cassandra. That gives you access to clustering, classification and more, at scale, in Mahout. Cassandra can also be the destination for output or even intermediate results, though this article is already too long to show that as well. And all of this works more out of the box when reading from and writing to HDFS: you don't have to use Cassandra to use Mahout.
