Saturday 12 November 2011

Running partitional clustering using Hadoop MapReduce



Here I will be discussing k-means clustering, which I implemented using Hadoop MapReduce.

The k-means clustering:
We need vectors to represent our data, and we need k centers to start with. These centers are vectors too; sometimes they are just a subset of the input vectors, sometimes they are random points or points of interest around which we want to cluster. Since this is a MapReduce version, we have to figure out what the keys and values are. Both are vectors: a vector can serve as a cluster center as well. So we always treat the cluster-center vectors as keys, and the input vectors as plain values (a sketch of these Writable types follows after the two lists below). The clustering itself works like this:

In the map step
 Read the cluster centers into memory from a SequenceFile.
 For each input key/value pair, iterate over all cluster centers.
 Measure the distances and remember the nearest center, i.e. the one with the lowest distance to the vector.
 Write that cluster center together with its vector to the filesystem.

In the reduce step
 Iterate over each value vector and calculate the average vector (sum the vectors component-wise and divide each component by the number of vectors received).
 This is the new center; save it into a SequenceFile.
 Check the convergence between the cluster center stored in the key object and the new center.
 If they are not equal, increment an update counter.
 Run this whole thing until nothing is updated anymore.
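
Because the cluster center acts as the key and the input vector as the value, both classes have to be Hadoop WritableComparable types (this is also what the Quick Summary at the end refers to). A minimal sketch of such a vector type, assuming a plain dense double array; the field and accessor names here are my own illustration, not necessarily the ones in my code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative sketch of a dense vector that Hadoop can serialize and compare.
public class Vector implements WritableComparable<Vector> {

    private double[] elements;

    public Vector() { }                                // needed by Hadoop for reflection
    public Vector(double[] elements) { this.elements = elements; }

    public double[] getElements() { return elements; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(elements.length);
        for (double d : elements) out.writeDouble(d);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        elements = new double[in.readInt()];
        for (int i = 0; i < elements.length; i++) elements[i] = in.readDouble();
    }

    @Override
    public int compareTo(Vector o) {                   // component-wise comparison
        for (int i = 0; i < elements.length; i++) {
            int c = Double.compare(elements[i], o.elements[i]);
            if (c != 0) return c;
        }
        return 0;
    }
}

A ClusterCenter can be built the same way: it wraps such a Vector and delegates write, readFields and compareTo to it. In the sketches that follow I assume it offers a ClusterCenter(Vector) constructor and a getCenterVector() accessor.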


LIST PROCESSING
Conceptually, MapReduce programs transform lists of input data elements into lists of output data elements. A MapReduce program will do this twice, using two different list processing idioms: map, and reduce.

MAPPING LISTS
The first phase of a MapReduce program is called mapping. A list of data elements is provided, one at a time, to a function called the Mapper, which transforms each element individually into an output data element.

REDUCING LISTS
Reducing lets you aggregate values. A reducer function receives an iterator of input values from an input list. It then combines these values, returning a single output value.
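
To make the two idioms concrete, here is the shape of the classic word-count Mapper and Reducer in the Hadoop Java API. This has nothing to do with the clustering yet; it only shows how a map function transforms single elements and how a reduce function folds a list of values for one key into a single result:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map: emit each word with a count of 1
class TokenMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            context.write(new Text(word), ONE);
        }
    }
}

// reduce: aggregate the list of counts for one word into a single sum
class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) sum += v.get();
        context.write(key, new IntWritable(sum));
    }
}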

MapReduce Data Flow
Now that we have seen the components that make up a basic MapReduce job, we can look at how everything works together at a higher level. Note that a single node can run many mapper tasks at the same time.


The distance measurement
We need a measure of the distance between two vectors, in particular between a center and a vector. I went with the Manhattan distance because, unlike the Euclidean distance, it does not require the computational overhead of a square root, and it is not too complex. Let's have a look:
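
A minimal sketch of such a measurer, assuming the Vector and ClusterCenter types sketched above; the class name DistanceMeasurer is referenced again in the mapper section, while the method name measureDistance is my assumption:

// Sketch of the distance measurement; the method name is illustrative.
public class DistanceMeasurer {

    // Manhattan distance: the sum of the absolute component-wise differences.
    // Cheaper than the Euclidean distance because no square root is needed.
    public static double measureDistance(ClusterCenter center, Vector v) {
        double sum = 0;
        double[] c = center.getCenterVector().getElements();
        double[] e = v.getElements();
        for (int i = 0; i < e.length; i++) {
            sum += Math.abs(c[i] - e[i]);
        }
        return sum;
    }
}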


InputSplits:
An InputSplit describes the unit of work that comprises a single map task in a MapReduce program. A MapReduce program applied to a data set, collectively referred to as a Job, is made up of several (possibly several hundred) tasks. Map tasks may involve reading a whole file; they often involve reading only part of a file. By default, the FileInputFormat and its descendants break a file up into 64 MB chunks (the same size as blocks in HDFS). You can control this value by setting the mapred.min.split.size parameter in hadoop-site.xml, or by overriding it in the JobConf object used to submit a particular MapReduce job.

By processing a file in chunks, we allow several map tasks to operate on a single file in parallel. If the file is very large, this can improve performance significantly through parallelism. Even more importantly, since the various blocks that make up the file may be spread across several different nodes in the cluster, tasks can be scheduled on each of these nodes; the individual blocks are thus all processed locally instead of being transferred from one node to another. Of course, while log files can be processed in this piece-wise fashion, some file formats are not amenable to chunked processing. By writing a custom InputFormat, you can control how the file is broken up (or is not broken up) into splits.

The InputFormat defines the list of tasks that make up the mapping phase; each task corresponds to a single input split. The tasks are then assigned to the nodes in the system based on where the input file chunks physically reside. An individual node may have several dozen tasks assigned to it. The node will begin working on the tasks, attempting to perform as many in parallel as it can. The on-node parallelism is controlled by the mapred.tasktracker.map.tasks.maximum parameter.
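
For example, the minimum split size could be raised to 128 MB for a single job roughly like this (the value and placement are just an illustration; the same property can instead be set once in hadoop-site.xml):

// Sketch: override the minimum split size for this job only (value in bytes).
Configuration conf = new Configuration();
conf.set("mapred.min.split.size", Long.toString(128L * 1024 * 1024));
Job job = new Job(conf);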


The Mapper
Let's assume that there is a list or a list-like SequenceFile-iterating interface called centers. It contains ClusterCenter objects that represent the current centers. The DistanceMeasurer class contains the static method we defined previously.
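
A condensed sketch of what the map method boils down to, assuming the Vector, ClusterCenter and DistanceMeasurer sketches from above, and that setup() has already filled the centers list from the SequenceFile referenced by the centroid.path configuration entry (that entry appears in the driver code further down):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: assign every input vector to its nearest cluster center.
public class KMeansMapper extends Mapper<ClusterCenter, Vector, ClusterCenter, Vector> {

    // filled in setup() from the SequenceFile pointed to by "centroid.path" (omitted here)
    private final List<ClusterCenter> centers = new ArrayList<ClusterCenter>();

    @Override
    protected void map(ClusterCenter key, Vector value, Context context)
            throws IOException, InterruptedException {
        ClusterCenter nearest = null;
        double nearestDistance = Double.MAX_VALUE;
        for (ClusterCenter c : centers) {
            double dist = DistanceMeasurer.measureDistance(c, value);
            if (nearest == null || dist < nearestDistance) {
                nearest = c;
                nearestDistance = dist;
            }
        }
        // emit the nearest center as the key and the unchanged vector as the value
        context.write(nearest, value);
    }
}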

It's just looping and measuring, always keeping a reference to the nearest center. Afterwards we write it out.


The Reducer
Once again, let's assume there is a list or a list-like SequenceFile-iterating interface called centers. Here we need it for storage reasons.
Let me explain:
The first loop only dumps the values from the iterable into a list and sums up each component of the vectors in a newly created center. Then we average it in another loop and write the new center along with every vector we held in memory the whole time. Afterwards we just check whether the center has changed; this check simply delegates to the underlying vector's compareTo. If the centers are not equal, it returns true, and in that case we increment a counter.
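
Put together, a sketch of such a reduce method could look like this. It assumes the types sketched above, a ClusterCenter(Vector) constructor, and a compareTo that delegates to the underlying vector; the Counter.CONVERGED enum name matches the driver listing in the comments at the end, while the next section uses a generic UpdateCounter.UPDATED as its example:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: recompute each center as the component-wise mean of its assigned vectors.
public class KMeansReducer extends Reducer<ClusterCenter, Vector, ClusterCenter, Vector> {

    // incremented whenever a center has moved; the driver keeps iterating while it is > 0
    public static enum Counter { CONVERGED }

    @Override
    protected void reduce(ClusterCenter key, Iterable<Vector> values, Context context)
            throws IOException, InterruptedException {
        List<Vector> vectors = new ArrayList<Vector>();
        double[] sum = null;

        // first loop: keep a copy of every vector (Hadoop reuses the value object)
        // and sum up the components
        for (Vector v : values) {
            Vector copy = new Vector(v.getElements().clone());
            vectors.add(copy);
            if (sum == null) sum = new double[copy.getElements().length];
            for (int i = 0; i < sum.length; i++) sum[i] += copy.getElements()[i];
        }

        // second loop: average the sums, giving the new center
        for (int i = 0; i < sum.length; i++) sum[i] /= vectors.size();
        ClusterCenter newCenter = new ClusterCenter(new Vector(sum));
        // (writing the new center into the next centroid SequenceFile is omitted here)

        // write the new center together with every vector we held in memory
        for (Vector v : vectors) context.write(newCenter, v);

        // if the center moved, tell the driver that another iteration is needed
        if (newCenter.compareTo(key) != 0) {
            context.getCounter(Counter.CONVERGED).increment(1);
        }
    }
}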


Controlling Hadoop MapReduce job recursion
First off, you should know that Hadoop has counters; you can see them after a job has run, or in the web interface of the JobTracker. "Famous" counters are "Map input records" and "Reduce output records". Best of all, we can set up our own counters, simply by using enums.

How to set up a counter?
The simplest approach is to just define an enum like this:

public enum UpdateCounter {
    UPDATED
}

Now you can manipulate the counter using:

context.getCounter(UpdateCounter.UPDATED).increment(1);

"context" is the context object you get in your mapper or your reducer. This line will obviously increment your update counter by 1.

How to fetch the counter?
This is as easy as setting up the enum. You submit a job like this:

Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("Graph explorer");
job.setMapperClass(DatasetImporter.class);
job.setReducerClass(ExplorationReducer.class);
// leave out the stuff with paths etc.
job.waitForCompletion(true);

You can access your counter like this:
long counter = job.getCounters()
        .findCounter(ExplorationReducer.UpdateCounter.UPDATED).getValue();


How to get the recursion running?
Now that we know how to fetch the counter, setting up the recursion is quite simple. The only thing you have to watch out for is output paths that already exist from older job runs.
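
A condensed driver sketch under these assumptions; the paths and the initial-centers file name are illustrative, and the full listing I actually used is reproduced in the comments at the end of this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class KMeansDriver {

    public static void main(String[] args) throws Exception {
        // SequenceFile holding the current centers; this path is illustrative
        Path center = new Path("files/clustering/centers/cen.seq");
        FileSystem fs = FileSystem.get(new Configuration());

        int iteration = 1;   // depth_0 is assumed to hold the initial center/vector pairs
        long updated = 1;    // pretend at least one center moved so the loop starts

        while (updated > 0) {
            Configuration conf = new Configuration();
            conf.set("centroid.path", center.toString());
            conf.set("num.iteration", iteration + "");

            Job job = new Job(conf);
            job.setJobName("KMeans Clustering " + iteration);
            job.setMapperClass(KMeansMapper.class);
            job.setReducerClass(KMeansReducer.class);
            job.setJarByClass(KMeansMapper.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(ClusterCenter.class);
            job.setOutputValueClass(Vector.class);

            Path in = new Path("files/clustering/depth_" + (iteration - 1) + "/");
            Path out = new Path("files/clustering/depth_" + iteration);
            SequenceFileInputFormat.addInputPath(job, in);
            if (fs.exists(out)) {
                fs.delete(out, true);        // clear output left over from an older run
            }
            SequenceFileOutputFormat.setOutputPath(job, out);

            job.waitForCompletion(true);

            // the reducer increments this counter for every center that moved
            updated = job.getCounters()
                         .findCounter(KMeansReducer.Counter.CONVERGED).getValue();
            iteration++;
        }
    }
}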


Quick Summary:
In my code I used a small data set, so the number of mapper tasks is 1: the input file does not exceed the 64 MB split limit (by default the input is split into 64 MB chunks, although we can very well change this limit). The recursion is controlled by counters as shown in the section above. Initially I took 2 random cluster centers (hence k = 2). Then I implemented the WritableComparable interface for the keys as well as the values (i.e. the ClusterCenter and Vector classes in the code) and implemented k-means clustering on top of MapReduce. After running the job, I found that the data set converged in 3 iterations, which is quite few because the data set was very small; hence the beauty of MapReduce could not really be appreciated. Had the data set been larger (of the order of 100 MB or more), there would have been a significant performance improvement from running the job with Hadoop MapReduce.

9 comments:

  1. Could you please send me the complete code?

  2. I could not understand one thing: how is the recursion controlled?

  3. Just concentrate on the following code segment. You will get the idea.

    SequenceFileOutputFormat.setOutputPath(job, out);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(ClusterCenter.class);
    job.setOutputValueClass(Vector.class);
    job.waitForCompletion(true);

    long counter = job.getCounters().findCounter(KMeansReducer.Counter.CONVERGED).getValue();
    iteration++;

    while (counter > 0) {
        conf = new Configuration();
        conf.set("centroid.path", center.toString());
        conf.set("num.iteration", iteration + "");
        job = new Job(conf);
        job.setJobName("KMeans Clustering " + iteration);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setJarByClass(KMeansMapper.class);
        in = new Path("files/clustering/depth_" + (iteration - 1) + "/");
        out = new Path("files/clustering/depth_" + iteration);
        SequenceFileInputFormat.addInputPath(job, in);
        if (fs.exists(out))
            fs.delete(out, true);
        SequenceFileOutputFormat.setOutputPath(job, out);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(ClusterCenter.class);
        job.setOutputValueClass(Vector.class);
        job.waitForCompletion(true);
        iteration++;
        counter = job.getCounters().findCounter(KMeansReducer.Counter.CONVERGED).getValue();
    }

  4. Very good post. Can you send me the complete code? My mail id is vmsrinivas82@yahoomail.com

    Replies
    1. Thanks.
      I will surely send you the code by tomorrow... :)

  5. Hi Ankit,

    Very nice description of k-means clustering on MapReduce. Can you please send me the complete code to wmahendra@gmail.com?

    Replies
    1. Yeah, sure. I will search for the code and get back to you by this weekend.
