hadoop_mapreduce06 - ReduceJoin case study

(1) Requirements

Order table, order.txt (fields are separated by a single tab; left as-is here)
id pid amount
1001 01 1
1002 02 2
1003 03 3
1001 01 1
1002 02 2
1003 03 3

Product table, pd.txt
pid pname
01 小米
02 华为
03 格力

Expected result: merge pname from the product table into the order table, joining on pid
id pname amount
1001 小米 1
1001 小米 1
1002 华为 2
1002 华为 2
1003 格力 3
1003 格力 3

(2) Requirement analysis
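In the map phase each record is tagged with the table it came from, using the input file name obtained in setup(). The mapper emits pid as the key, so all order records and the single matching pd record for a given pid arrive in the same reduce call. The reducer buffers the order records, reads pname from the pd record, and emits the joined rows. Since the actual join happens in the reducer, this is a reduce-side join.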

(3) Code implementation

OrderPdBean.java
package com.art.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class OrderPdBean implements Writable {
    // 1. Fields
    private String orderId;
    private String pid;
    private int amount;
    private String pname;
    private String tname;

    // 2. A no-arg constructor is required for deserialization (super() just calls the parent constructor)
    public OrderPdBean() {
        super();
    }

    // All-args constructor
    public OrderPdBean(String orderId, String pid, int amount, String pname, String tname) {
        super();
        this.orderId = orderId;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.tname = tname;
    }

    // 3. Serialization: the Writable interface requires overriding write()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(tname);
    }

    // 4. Deserialization: fields must be read in exactly the order they were written (FIFO)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.tname = in.readUTF();
    }

    // 5. Getters and setters
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getTname() {
        return tname;
    }

    public void setTname(String tname) {
        this.tname = tname;
    }

    // 6. toString
    @Override
    public String toString() {
        return "OrderPdBean{" +
                "orderId='" + orderId + ''' +
                ", pid='" + pid + ''' +
                ", amount=" + amount +
                ", pname='" + pname + ''' +
                ", tname='" + tname + ''' +
                '}';
    }
}
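Because readFields() must mirror write() exactly, a quick local round-trip check catches ordering mistakes early. A minimal sketch (OrderPdBeanRoundTrip is a hypothetical helper, not part of the source project):

package com.art.mapreduce.reducejoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OrderPdBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        OrderPdBean before = new OrderPdBean("1001", "01", 1, "", "order");

        // Serialize with the same write() the framework calls during the shuffle.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));

        // Deserialize into a fresh bean; readFields() must consume fields in write() order.
        OrderPdBean after = new OrderPdBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(after);  // should print the same field values as 'before'
    }
}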

ReduceJoinMapper.java
package com.art.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

import static java.lang.System.exit;


public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderPdBean> {
    // 2.0 Reusable objects for the output key and value
    String tname;
    OrderPdBean OPBean = new OrderPdBean();
    Text k = new Text();

    // 2.1 Get the input file name in setup()
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {  // worth debugging to inspect what the context holds
//        super.setup(context);
        FileSplit fileSplit = (FileSplit) context.getInputSplit();  // note: 1. use getInputSplit 2. cast the InputSplit to FileSplit
        tname = fileSplit.getPath().getName();
    }

    // 2.2 Tag each line with its source table in map()
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 2.2.1 Read one line
        String line = value.toString();
        String[] fields;
        if (tname.contains("order")) {    // file names are hard-coded here; could be improved
            fields = line.split("\t");
            k.set(fields[1]);
            OPBean.setOrderId(fields[0]);
            OPBean.setPid(fields[1]);
            OPBean.setAmount(Integer.parseInt(fields[2]));
            OPBean.setPname("");
            OPBean.setTname("order");
        } else if (tname.contains("pd")) {
            fields = line.split("\t");
            k.set(fields[0]);
            OPBean.setOrderId("");
            OPBean.setPid(fields[0]);
            OPBean.setAmount(0);
            OPBean.setPname(fields[1]);
            OPBean.setTname("pd");
        } else {
            System.out.println("Input file is neither order nor pd");
            exit(1);
        }

        context.write(k, OPBean);

    }
}
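Note that reusing the single OPBean instance across map() calls is safe here: context.write() serializes the key and value as soon as it is called, so mutating the bean afterwards cannot corrupt records that were already written.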

ReduceJoinReducer.java
package com.art.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import static java.lang.System.exit;


public class ReduceJoinReducer extends Reducer<Text, OrderPdBean, OrderPdBean, NullWritable> {
    // 3.1 Fields
    // 3.1.1 List holding the order-side beans for the current key
    ArrayList<OrderPdBean> orderBeans = new ArrayList<>();
    // 3.1.2 Bean holding the pd record (observe the data: per key exactly one pd row reaches the reducer; it is a dimension table)
    OrderPdBean pdBean = new OrderPdBean();

    // 3.2 Override reduce()
    @Override
    protected void reduce(Text key, Iterable<OrderPdBean> values, Context context) throws IOException, InterruptedException {
//        super.reduce(key, values, context);
        orderBeans.clear();  // the list is an instance field, so reset it for each key

        for (OrderPdBean orderPdBean : values) {
            // 3.2.1 Buffer order-side records (optimization note: under data skew or memory pressure, consider combining on the map side)
            if ("order".equals(orderPdBean.getTname())) {
                try {
                    // Hadoop reuses the value object while iterating, so copy each record into a fresh bean
                    OrderPdBean tempBean = new OrderPdBean();
                    BeanUtils.copyProperties(tempBean, orderPdBean);
                    orderBeans.add(tempBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                // 3.2.2 Copy the pd record into pdBean
            } else if ("pd".equals(orderPdBean.getTname())) {
                try {
                    BeanUtils.copyProperties(pdBean, orderPdBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("非打标order或者pd的数据");
                exit(1);
            }
        }

        // 3.2.3 Fill in pname from pdBean for every buffered order bean
        for (OrderPdBean orderBean : orderBeans) {
            try {
                BeanUtils.copyProperty(orderBean, "pname", pdBean.getPname());
                // 3.2.4 Emit the joined record
                context.write(orderBean, NullWritable.get());
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }


    }
}
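A note on the BeanUtils copies above: while iterating values, Hadoop deserializes every record into the same OrderPdBean instance and hands that one object back repeatedly. Adding the reference itself would leave orderBeans holding many pointers to a single bean, so each order record is copied into a fresh bean before it is buffered; the same reasoning applies to pdBean.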

ReduceJoinDriver.java
package com.art.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"/Users/art/Documents/#Java_original_script/hadoop_mr_0000/mr_reducejoin_0000/src/main/java/com/art/mapreduce/reducejoin/datas/inputs", "/Users/21/Documents/#Java_original_script/hadoop_mr_0000/mr_reducejoin_0000/src/main/java/com/art/mapreduce/reducejoin/datas/outputs"};

        // 1. Create a job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the mapper class and its output key/value types
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderPdBean.class);

        // 3. Set the reducer class and the final output key/value types
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(OrderPdBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 4. Set the jar containing this driver
        job.setJarByClass(ReduceJoinDriver.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job configuration and its jar to the cluster, then wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit((result) ? 0 : 1);

    }
}
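One caveat when re-running locally: FileOutputFormat requires that the output directory not yet exist, so the outputs path has to be deleted between runs or the job aborts with a FileAlreadyExistsException.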


log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
(4) Performance optimization: combine on the map side
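The source leaves this section as a pointer. The usual realization is a map-side join: when the dimension table (pd.txt) is small enough to fit in memory, ship it to every mapper through the distributed cache, build a pid-to-pname map in setup(), and join directly in map() with zero reduce tasks, so there is no shuffle and no skew. A minimal sketch under those assumptions (the class name MapJoinMapper and the cache-file wiring are illustrative, not part of the source project):

package com.art.mapreduce.reducejoin;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical map-side join: the driver would call job.addCacheFile(new URI(".../pd.txt"))
// and job.setNumReduceTasks(0); no reducer is involved at all.
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String> pdMap = new HashMap<>();
    private final Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // Load the small pd table from the distributed cache into memory.
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (FSDataInputStream in = fs.open(new Path(cacheFiles[0]));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");  // pid \t pname
                pdMap.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is an order record: id \t pid \t amount.
        String[] fields = value.toString().split("\t");
        String pname = pdMap.getOrDefault(fields[1], "NULL");
        k.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}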

