Design Pattern - Reduce-Side Join
Use a reduce-side join when you are joining two or more datasets that are all large — too big for any one of them to be cached in memory on the map side.
Dataset to be used
File Name - customerDetails.txt
Format: Name,CustomerId
Example -
Aaron Hawkins,296334
Aaron Smayling,814503
Adam Bellavance,960803
Adam Hart,157942
Adam Shillingsburg,713629
Adrian Barton,525624
Adrian Hane,434995
Adrian Shami,813495
Aaron Smayling,814503
Adam Bellavance,960803
Adam Hart,157942
Adam Shillingsburg,713629
Adrian Barton,525624
Adrian Hane,434995
Adrian Shami,813495
Filename - customerTransaction.txt
Format: comma-separated transaction fields, with the customer name as the 12th field (index 11)
Example -
1,3,13/10/2010,Low,6,261.54,0.04,Regular Air,-213.25,38.94,35,Muhammed MacIntyre,Nunavut,Nunavut,Small Business,Office Supplies,Storage & Organization,"Eldon Base for stackable storage shelf, platinum",Large Box,0.8,20/10/2010
49,293,01/10/2012,High,49,10123.02,0.07,Delivery Truck,457.81,208.16,68.02,Barry French,Nunavut,Nunavut,Consumer,Office Supplies,Appliances,"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",Jumbo Drum,0.58,02/10/2012
50,293,01/10/2012,High,27,244.57,0.01,Regular Air,46.71,8.69,2.99,Barry French,Nunavut,Nunavut,Consumer,Office Supplies,Binders and Binder Accessories,"Cardinal Slant-D® Ring Binder, Heavy Gauge Vinyl",Small Box,0.39,03/10/2012
80,483,10/07/2011,High,30,4965.7595,0.08,Regular Air,1198.97,195.99,3.99,Clay Rozendal,Nunavut,Nunavut,Corporate,Technology,Telephones and Communication,R380,Small Box,0.58,12/07/2011
3866,27559,30/10/2011,High,38,465.9,0.05,Regular Air,79.34,12.28,4.86,Aaron Hawkins,Nova Scotia,Atlantic,Home Office,Office Supplies,Paper,Xerox 1933,Small Box,0.38,31/10/2011
49,293,01/10/2012,High,49,10123.02,0.07,Delivery Truck,457.81,208.16,68.02,Barry French,Nunavut,Nunavut,Consumer,Office Supplies,Appliances,"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",Jumbo Drum,0.58,02/10/2012
50,293,01/10/2012,High,27,244.57,0.01,Regular Air,46.71,8.69,2.99,Barry French,Nunavut,Nunavut,Consumer,Office Supplies,Binders and Binder Accessories,"Cardinal Slant-D® Ring Binder, Heavy Gauge Vinyl",Small Box,0.39,03/10/2012
80,483,10/07/2011,High,30,4965.7595,0.08,Regular Air,1198.97,195.99,3.99,Clay Rozendal,Nunavut,Nunavut,Corporate,Technology,Telephones and Communication,R380,Small Box,0.58,12/07/2011
3866,27559,30/10/2011,High,38,465.9,0.05,Regular Air,79.34,12.28,4.86,Aaron Hawkins,Nova Scotia,Atlantic,Home Office,Office Supplies,Paper,Xerox 1933,Small Box,0.38,31/10/2011
Here we attach a marker ("CD~" or "TD~") to the output value of each mapper, so that in the reducer we can identify which mapper emitted each value.
MapReduce Program :
package MapReduce;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Reduce-side join of two large datasets keyed on customer name:
 * customer details (name, customerId) and customer transactions (CSV with
 * the customer name as the 12th field). Each mapper tags its output value
 * with a marker ("CD~" / "TD~") so the reducer can tell the streams apart;
 * the reducer emits each customer's total sales amount.
 *
 * Usage: CustomerTransactions &lt;transactionsInput&gt; &lt;customerDetailsInput&gt; &lt;outputDir&gt;
 * The output directory must not already exist (standard Hadoop behavior).
 */
public class CustomerTransactions extends Configured implements Tool {

    /** Tag for values emitted by the customer-details mapper. */
    private static final String CUSTOMER_MARKER = "CD";
    /** Tag for values emitted by the transactions mapper. */
    private static final String TRANSACTION_MARKER = "TD";
    /** 0-based index of the customer name in a transaction record. */
    private static final int NAME_FIELD = 11;
    /** 0-based index of the sales amount in a transaction record. */
    private static final int SALES_FIELD = 5;

    /**
     * Configures and submits the join job.
     *
     * @param args [0] transactions path, [1] customer-details path, [2] output path
     * @return 0 on success, 1 on job failure
     */
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), CustomerTransactions.class);
        job.setJobName("Customer Transactions");
        // One mapper per input file; both emit (customerName, tagged record).
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustomerTransactionsMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CustomerDetailsMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // Report the job's real outcome instead of swallowing failures and
        // unconditionally returning 0 (the original printed the stack trace
        // and still exited successfully).
        return JobClient.runJob(job).isSuccessful() ? 0 : 1;
    }

    /**
     * Maps one customerDetails.txt line (Name,CustomerId) to
     * (name, "CD~" + full record).
     */
    public static class CustomerDetailsMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] fields = line.split(",");
            // Skip blank or malformed lines rather than emitting an empty key.
            if (fields.length == 0 || fields[0].trim().isEmpty()) {
                return;
            }
            // Trim the join key so stray whitespace cannot break the join.
            output.collect(new Text(fields[0].trim()), new Text(CUSTOMER_MARKER + "~" + line));
        }
    }

    /**
     * Maps one customerTransaction.txt line to (customerName, "TD~" + full record).
     * NOTE(review): a naive comma split mis-parses quoted fields that contain
     * commas; in this dataset the quoted fields come after the name field, so
     * index 11 is still correct — confirm if the schema changes.
     */
    public static class CustomerTransactionsMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] fields = line.split(",");
            // Guard against short/malformed records before indexing the name.
            if (fields.length <= NAME_FIELD) {
                return;
            }
            output.collect(new Text(fields[NAME_FIELD].trim()), new Text(TRANSACTION_MARKER + "~" + line));
        }
    }

    /**
     * Joins the tagged streams for one customer and emits
     * (customerName, sum of sales amounts).
     */
    public static class SumReducer extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable> output,
                Reporter reporter) throws IOException {
            // Per-call locals: the original kept these as instance fields,
            // leaking the customer name and running sum across keys because
            // a Reducer instance is reused for many keys.
            String customerName = null;
            double sumOfTransactionValues = 0.0;
            while (values.hasNext()) {
                String currValue = values.next().toString();
                // Limit 2 so a '~' inside the record body is not split away.
                String[] tagged = currValue.split("~", 2);
                if (tagged.length < 2) {
                    continue; // untagged / malformed value
                }
                if (CUSTOMER_MARKER.equals(tagged[0])) {
                    // Details record: the name is the first CSV field.
                    // (The original's else-branch wrongly used the "CD" marker
                    // itself as the name.)
                    customerName = tagged[1].split(",")[0].trim();
                } else if (TRANSACTION_MARKER.equals(tagged[0])) {
                    String[] transaction = tagged[1].split(",");
                    if (transaction.length > SALES_FIELD) {
                        try {
                            // Accumulate ONLY here: the original added the
                            // previous transaction value once per iteration,
                            // double-counting whenever a CD record was
                            // interleaved with TD records.
                            sumOfTransactionValues += Double.parseDouble(transaction[SALES_FIELD].trim());
                        } catch (NumberFormatException e) {
                            // Best-effort: record the bad row and keep going.
                            reporter.setStatus("Unparseable sales amount in: " + tagged[1]);
                        }
                    }
                }
            }
            // Fall back to the join key (which IS the customer name) when no
            // details record matched this key.
            output.collect(new Text(customerName != null ? customerName : key.toString()),
                    new DoubleWritable(sumOfTransactionValues));
        }
    }

    /**
     * Entry point. Validates argument count, then delegates to ToolRunner
     * (which parses generic Hadoop options before calling run()).
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: CustomerTransactions <transactions> <customerDetails> <output>");
            System.exit(2);
        }
        int res = ToolRunner.run(new Configuration(), new CustomerTransactions(), args);
        System.exit(res);
    }
}
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// NOTE(review): this listing is a verbatim duplicate of the class shown
// earlier in this document (it even lacks the `package` line); only one
// copy of a public class can exist per compilation unit — keep a single
// listing.
/**
 * Reduce-side join of customer details with customer transactions, keyed
 * on the customer name. Each mapper tags its value with "CD~" or "TD~" so
 * the reducer can distinguish the two input streams.
 */
public class CustomerTransactions extends Configured implements Tool {
/**
 * Configures and submits the job.
 * args[0] = transactions path, args[1] = customer-details path,
 * args[2] = output path.
 * NOTE(review): always returns 0, even when the job fails — the catch
 * block swallows the exception after printing it.
 */
public int run(String[] args) throws Exception {
JobConf job = new JobConf(getConf(), CustomerTransactions.class);
job.setJobName("Customer Transactions");
// One mapper per input path; both emit (customerName, tagged record).
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustomerTransactionsMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CustomerDetailsMapper.class);
job.setReducerClass(SumReducer.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
try {
JobClient.runJob(job);
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
/** Maps a customerDetails.txt line (Name,CustomerId) to (name, "CD~" + record). */
public static class CustomerDetailsMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// taking one line/record at a time and parsing them into key
// value
// pairs
String line = value.toString();
String splitarray[] = line.split(",");
// sending the key value pair out of mapper; the name (field 0) is the join key
output.collect(new Text(splitarray[0]), new Text("CD~" + line.toString()));
}
}
/**
 * Maps a transaction line to (customerName, "TD~" + record); the name is
 * the 12th CSV field (index 11).
 * NOTE(review): a naive comma split mis-parses quoted fields containing
 * commas; in this dataset those come after index 11, so the key is still
 * correct — verify if the schema changes.
 */
public static class CustomerTransactionsMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// taking one line/record at a time and parsing them into key
// value
// pairs
String line = value.toString();
String splitarray[] = line.split(",");
// sending the key value pair out of mapper
output.collect(new Text(splitarray[11]), new Text("TD~" + line.toString()));
}
}
/**
 * Joins the tagged streams per customer and sums the sales amount
 * (transaction field index 5).
 * NOTE(review): the fields below are instance state on a reused Reducer,
 * so the name and running sum leak across keys — they should be locals
 * inside reduce().
 */
public static class SumReducer extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable> {
String customerName;
Double transactionValue = 0.0, sumOfTransactionValues = 0.0;
String transactions[];
String details[];
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable> output,
Reporter reporter) throws IOException {
while (values.hasNext()) {
String currValue = values.next().toString();
// NOTE(review): split without a limit corrupts records containing '~'.
String splittedValue[] = currValue.split("~");
if (splittedValue[0].equals("CD")) {
if (splittedValue[1].contains(","))
{
details = splittedValue[1].trim().toString().split(",");
customerName = details[0].toString().trim();
}
else
{
// NOTE(review): bug — this assigns the marker "CD" itself as the
// customer name; it should read splittedValue[1].
customerName = splittedValue[0].toString().trim();
}
} else if (splittedValue[0].equals("TD")) {
transactions = splittedValue[1].trim().toString().split(",");
transactionValue = Double.parseDouble(transactions[5].toString());
}
// NOTE(review): bug — this add runs on EVERY iteration, so a CD record
// interleaved with TD records re-adds the previous transaction value
// (double counting). It belongs inside the "TD" branch.
try {
sumOfTransactionValues += transactionValue;
} catch (Exception e) {
System.out.println(splittedValue[1].toString());
}
}
try {
// sumOfTransactionValues is initialized to 0.0, so the null checks on
// it can never fail; only customerName may be null (unmatched key).
if (customerName != null && sumOfTransactionValues != null) {
output.collect(new Text(customerName), new DoubleWritable(sumOfTransactionValues));
}
else if (customerName == null && sumOfTransactionValues != null) {
output.collect(new Text("Customer Name"), new DoubleWritable(sumOfTransactionValues));
}
} catch (Exception e) {
System.out.println(values.toString());
}
}
}
/**
 * Entry point; delegates to ToolRunner so generic Hadoop options are parsed.
 * NOTE(review): the java.io.File check below inspects the LOCAL filesystem,
 * not HDFS, and its delete() call is commented out — it is dead code.
 */
public static void main(String[] args) throws Exception {
File directory = new File(args[2].toString());
if (directory.exists()) {
//directory.delete();
}
int res = ToolRunner.run(new Configuration(), new CustomerTransactions(), args);
System.exit(res);
}
}