
订单数据表order.txt (分隔符是一个tab 这里未做修改)
id pid amount
1001 01 1
1002 02 2
1003 03 3
1001 01 1
1002 02 2
1003 03 3
商品信息表pd.txt
pid pname
01 小米
02 华为
03 格力
最终数据:将商品信息表中的pname根据pid合并到订单数据表中
id pname amount
1001 小米 1
1001 小米 1
1002 华为 2
1002 华为 2
1003 格力 3
1003 格力 3
package com.art.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderPdBean implements Writable {
// 1. 属性
private String orderId;
private String pid;
private int amount;
private String pname;
private String tname;
// 2. 反序列化的时候一定要空参构造函数 cmd+N 语法糖super();调用父类的构造函数
public OrderPdBean() {
super();
}
// 有参构造函数
public OrderPdBean(String orderId, String pid, int amount, String pname, String tname) {
super();
this.orderId = orderId;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.tname = tname;
}
// 3. 写序列化方法 接口一定要重写的方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(tname);
}
// 4. 重写读序列化方法 顺序一定要和写序列化方法一致(队列)
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.tname = in.readUTF();
}
// 5.get+set方法 cmd+n
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getTname() {
return tname;
}
public void setTname(String tname) {
this.tname = tname;
}
// 6. toString cmd+n
@Override
public String toString() {
return "OrderPdBean{" +
"orderId='" + orderId + ''' +
", pid='" + pid + ''' +
", amount=" + amount +
", pname='" + pname + ''' +
", tname='" + tname + ''' +
'}';
}
}
ReduceJoinMapper.java
package com.art.mapreduce.reducejoin; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; import static java.lang.System.exit; public class ReduceJoinMapper extends MapperReduceJoinReducer.java{ // 2.0 属性 主要为了写出去的k和v String tname; OrderPdBean OPBean = new OrderPdBean(); Text k = new Text(); // 2.1 通过setup方法获取文件名 @Override protected void setup(Context context) throws IOException, InterruptedException { // 要debug一下每个context里存了写什么 地址 // super.setup(context); FileSplit fileSplit = (FileSplit) context.getInputSplit();// 注意:1.用的是getInputSplit 2.类型从InputSplit改成了FileSplit tname = fileSplit.getPath().getName(); } // 2.2 通过map方法标记每行数据 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 2.2.1 获取一行数据 String line = value.toString(); String[] fields; if (tname.contains("order")) { // 这里是写死的 可考虑优化 fields = line.split("t"); k.set(fields[1]); OPBean.setOrderId(fields[0]); OPBean.setPid(fields[1]); OPBean.setAmount(Integer.parseInt(fields[2])); OPBean.setPname(""); OPBean.setTname("order"); } else if (tname.contains("pd")) { fields = line.split("t"); k.set(fields[0]); OPBean.setOrderId(""); OPBean.setPid(fields[0]); OPBean.setAmount(0); OPBean.setPname(fields[1]); OPBean.setTname("pd"); } else { System.out.println("既不是order文件也不是pd文件"); exit(1); } context.write(k, OPBean); } }
package com.art.mapreduce.reducejoin; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import static java.lang.System.exit; public class ReduceJoinReducer extends ReducerReduceJoinDriver.java{ // 3.1 属性 // 3.1.1 存放所有的order对象list ArrayList orderBeans = new ArrayList (); // 3.1.2 存放pd对象(观察数据:相同的key传入一个reduce,此时pd只有一条数据,维表) OrderPdBean pdBean = new OrderPdBean(); // 3.1.3 由于for里面的orderBean是引用而非对象,不能起到真正的copy 所以要new一个tempBean来存储写入 OrderPdBean tempBean = new OrderPdBean(); // 3.2 重写reduce方法 @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); for (OrderPdBean orderPdBean : values) { // 3.2.1 订单表的放进数组内存 (优化思考:数据倾斜或内存满,考虑map端做combine) if ("order".equals(orderPdBean.getTname())) { try { BeanUtils.copyProperties(tempBean, orderPdBean); orderBeans.add(tempBean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } // 3.2.2 产品表的放进产品bean } else if ("pd".equals(orderPdBean.getTname())) { try { BeanUtils.copyProperties(pdBean, orderPdBean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } else { System.out.println("非打标order或者pd的数据"); exit(1); } } // 3.2.3 遍历orderBeans列表 将pname替换为pdBean的pname for (OrderPdBean orderBean : orderBeans) { try { BeanUtils.copyProperty(orderBean, "pname", pdBean.getPname()); // 3.2.4 数据写出去 context.write(orderBean, NullWritable.get()); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } }
package com.art.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/Users/art/Documents/#Java_original_script/hadoop_mr_0000/mr_reducejoin_0000/src/main/java/com/art/mapreduce/reducejoin/datas/inputs", "/Users/21/Documents/#Java_original_script/hadoop_mr_0000/mr_reducejoin_0000/src/main/java/com/art/mapreduce/reducejoin/datas/outputs"};
// 1. 创建一个job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 设置map类 map写出kv类
job.setMapperClass(ReduceJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderPdBean.class);
// 3. 设置reduce类 reduce写出kv类
job.setReducerClass(ReduceJoinReducer.class);
job.setOutputKeyClass(OrderPdBean.class);
job.setOutputValueClass(NullWritable.class);
// 4. 指定本程序jar包所在的路径
job.setJarByClass(ReduceJoinDriver.class);
// 5. 设置读写文件路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6. 将job中的配置相关参数,以及job所用的java类所在jar包提交到yarn上运行
boolean result = job.waitForCompletion(true);
System.exit((result) ? 0 : 1);
}
}
log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%npom.xml
(四)性能优化 在map端 combineorg.apache.logging.log4j log4j-core 2.8.2 org.apache.hadoop hadoop-common 2.7.2 org.apache.hadoop hadoop-client 2.7.2 org.apache.hadoop hadoop-hdfs 2.7.2
略
done 未发布