Design Pattern - MAP Side Join
Use a map-side join when one of your tables is small enough to fit in memory; doing the join in the mapper reduces the overhead of sorting and shuffling data.
Prerequisites:
- The data should be partitioned and sorted in a particular way.
- Each input dataset should be divided into the same number of partitions.
- All inputs must be sorted by the same key.
- All the records for a particular key must reside in the same partition.
Dataset to be used
File Name - u.item
u.item -- Information about the items (movies); this is a pipe-separated ('|')
list of
movie id | movie title | release date | video release date |
IMDb URL | unknown | Action | Adventure | Animation |
Children's | Comedy | Crime | Documentary | Drama | Fantasy |
Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
Thriller | War | Western |
The last 19 fields are the genres, a 1 indicates the movie
is of that genre, a 0 indicates it is not; movies can be in
several genres at once.
The movie ids are the ones used in the u.data data set.
Example -
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
Filename - u.data
u.data -- The full u data set, 100000 ratings by 943 users on 1682 items.
Each user has rated at least 20 movies. Users and items are
numbered consecutively from 1. The data is randomly
ordered. This is a tab separated list of
user id | item id | rating | timestamp.
The time stamps are unix seconds since 1/1/1970 UTC
Example -
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
MapReduce Program :
package MovieLens;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MovieLens extends Configured implements Tool {
/** Configuration held by this tool; returned by getConf() and replaced by setConf(). */
Configuration conf = new Configuration();

/**
 * Returns the configuration currently held by this tool.
 *
 * @return the current configuration
 */
@Override
public Configuration getConf() {
    return this.conf;
}

/**
 * Replaces the configuration held by this tool.
 *
 * @param arg0 the configuration to use from now on
 */
@Override
public void setConf(Configuration arg0) {
    conf = arg0;
}
/***************************************************************
* Mapper: RatingMapper
***************************************************************/
/**
 * Map-side join mapper: looks up the movie title for each rating record in
 * a table loaded from the cached {@code u.item} file and emits
 * (movie title, rating).
 *
 * <p>Rating lines are split on tab; field 1 is parsed as the item id and
 * field 2 as the rating. Item lines are split on {@code |}; field 0 is the
 * item id and field 1 the title. Ratings whose item id has no title are
 * emitted under the key {@code "Movie Not Found"}. Malformed lines in either
 * file are counted under {@code MALFORMED_RECORD} and skipped instead of
 * failing the task.
 */
public static class RatingMapper extends
        Mapper<LongWritable, Text, Text, DoubleWritable> {

    /** Name of the cache file that holds the item (movie) table. */
    private static final String ITEM_FILE_NAME = "u.item";

    /**
     * Item id to movie title lookup table, filled in setup(). Kept per
     * mapper instance so that several mapper instances living in one JVM
     * never mutate a shared map.
     */
    private final HashMap<Integer, String> itemMap = new HashMap<Integer, String>();

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR, MALFORMED_RECORD
    }

    /**
     * Loads the item table from every cache file named {@code u.item}.
     *
     * @param context task context used to read the cache file list and to
     *                update counters
     * @throws IOException if closing the item file fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null) {
            // No cache file was registered for the job: every lookup will
            // miss, so make that visible in the counters.
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
            return;
        }
        for (URI cacheFile : cacheFiles) {
            Path cachePath = new Path(cacheFile);
            if (ITEM_FILE_NAME.equals(cachePath.getName().trim())) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadItemHashMap(cachePath, context);
            }
        }
    }

    /**
     * Reads the item file line by line and stores id to title pairs.
     *
     * <p>A missing or unreadable file is recorded in the counters and the
     * job continues with whatever was loaded so far (best effort).
     *
     * @param filePath location of the item file
     * @param context  task context used to update counters
     * @throws IOException if closing the reader fails
     */
    private void loadItemHashMap(Path filePath, Context context)
            throws IOException {
        BufferedReader reader = null;
        try {
            // NOTE(review): the path string is opened with java.io.FileReader,
            // i.e. as a local filesystem path. Confirm this resolves on the
            // target cluster.
            reader = new BufferedReader(new FileReader(filePath.toString()));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\\|");
                if (fields.length < 2) {
                    context.getCounter(MYCOUNTER.MALFORMED_RECORD).increment(1);
                    continue;
                }
                String id = fields[0].trim();
                String title = fields[1].trim();
                // Compare by content; "!=" on strings compares references.
                if (id.isEmpty() || title.isEmpty()) {
                    continue;
                }
                try {
                    itemMap.put(Integer.valueOf(id), title);
                } catch (NumberFormatException e) {
                    context.getCounter(MYCOUNTER.MALFORMED_RECORD).increment(1);
                }
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
    }

    /**
     * Emits (movie title, rating) for one rating line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one tab-separated rating line
     * @param context task context the joined record is written to
     * @throws IOException if writing the output record fails
     * @throws InterruptedException if writing the output record is interrupted
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() == 0) {
            return;
        }
        String[] fields = line.split("\t");
        if (fields.length < 3) {
            context.getCounter(MYCOUNTER.MALFORMED_RECORD).increment(1);
            return;
        }
        int itemId;
        int rating;
        try {
            itemId = Integer.parseInt(fields[1].trim());
            rating = Integer.parseInt(fields[2].trim());
        } catch (NumberFormatException e) {
            context.getCounter(MYCOUNTER.MALFORMED_RECORD).increment(1);
            return;
        }
        String title = itemMap.get(Integer.valueOf(itemId));
        if (title == null || title.isEmpty()) {
            title = "Movie Not Found";
        }
        context.write(new Text(title), new DoubleWritable(rating));
    }
}
/***************************************************************
* Mapper: ItemMapper
***************************************************************/
/**
 * Mapper that emits the field at index 12 of each non-empty,
 * comma-separated input line as the output key, with no value.
 *
 * <p>The driver method {@code run} in this file does not register this
 * mapper.
 */
public static class ItemMapper extends
        Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Writes (field 12, null) for one input line; empty lines are ignored.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one comma-separated input line
     * @param context task context the record is written to
     * @throws IOException if writing the output record fails
     * @throws InterruptedException if writing the output record is interrupted
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();
        if (line.length() == 0) {
            return;
        }
        final String[] columns = line.split(",");
        final Text outKey = new Text(columns[12]);
        context.write(outKey, NullWritable.get());
    }
}
/***************************************************************
* Driver function
*****************************************************************/
/**
 * Configures and runs the "Movie Lens" job: RatingMapper joins the ratings
 * input against the cached item file and AverageRatingReducer averages the
 * ratings per movie title.
 *
 * @param args {@code args[0]} is the item file added to the job's cache
 *             files, {@code args[1]} is the ratings input path and
 *             {@code args[2]} is the output path
 * @return 0 if the job completed successfully, 1 if it failed, 2 if too few
 *         arguments were supplied
 * @throws Exception if the job cannot be set up, submitted or monitored
 */
@Override
public int run(String[] args) throws Exception {
    if (args == null || args.length < 3) {
        System.err.println(
                "Usage: MovieLens <u.item path> <u.data path> <output path>");
        return 2;
    }

    Job job = Job.getInstance(getConf(), "Movie Lens");
    job.setJarByClass(MovieLens.class);

    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, RatingMapper.class);
    job.addCacheFile(new Path(args[0]).toUri());

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    job.setReducerClass(AverageRatingReducer.class);
    job.setOutputKeyClass(Text.class);
    // The reducer emits DoubleWritable values, so declare that type here.
    job.setOutputValueClass(DoubleWritable.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    // Return the status instead of calling System.exit() here, and let
    // exceptions propagate so a failed job is never reported as exit code 0.
    return job.waitForCompletion(true) ? 0 : 1;
}
/****************************************
* Reducer: AverageRatingReducer
* **************************************/
/**
 * Computes the arithmetic mean of all ratings received for one key
 * (movie title) and emits (title, mean rating).
 */
public static class AverageRatingReducer extends
        Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    /**
     * Averages the ratings of a single key.
     *
     * @param key     movie title
     * @param values  all ratings emitted for that title
     * @param context task context the (title, average) pair is written to
     * @throws IOException if writing the output record fails
     * @throws InterruptedException if writing the output record is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values,
            Context context) throws IOException, InterruptedException {
        // The accumulators must be local: one reducer instance handles many
        // keys, so instance-level totals would carry over from key to key
        // and turn every result into a running average.
        double ratingSum = 0d;
        long ratingCount = 0L;
        for (DoubleWritable value : values) {
            ratingSum += value.get();
            ratingCount++;
        }
        context.write(key, new DoubleWritable(ratingSum / ratingCount));
    }
}
/**
 * Command-line entry point: removes a stale output directory on the local
 * filesystem if possible, runs the tool through ToolRunner and exits with
 * its status.
 *
 * @param args {@code <u.item path> <u.data path> <output path>}
 * @throws Exception if the tool fails with an exception
 */
public static void main(String[] args) throws Exception {
    if (args.length < 3) {
        System.err.println(
                "Usage: MovieLens <u.item path> <u.data path> <output path>");
        System.exit(2);
    }

    // Best-effort cleanup. java.io.File only sees the local filesystem and
    // File.delete() only removes an empty directory, so report a failure
    // instead of ignoring it.
    File directory = new File(args[2]);
    if (directory.exists() && !directory.delete()) {
        System.err.println("Could not delete existing output directory: "
                + directory);
    }

    int res = ToolRunner.run(new Configuration(), new MovieLens(), args);
    System.exit(res);
}
}
Output will be like this :
Til There Was You (1997) 2.3333333333333335
1-900 (1994) 2.4285714285714284
101 Dalmatians (1996) 2.8536585365853657
12 Angry Men (1957) 3.6048387096774195
187 (1997) 3.5224913494809686
No comments:
Post a Comment