博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
map侧连接
阅读量:6879 次
发布时间:2019-06-26

本文共 3631 字,大约阅读时间需要 12 分钟。

两个数据集中一个非常小,可以让小数据集存入缓存。在作业开始这些文件会被复制到运行task的节点上。 一开始,它的setup方法会检索缓存文件。

与reduce侧连接不同,Map侧连接需要等待参与连接的数据集满足如下条件:

1.除了连接键外,所有的输入都必须按照连接键排序。 输入的各种数据集必须有相同的分区数。 所有具有相同键的记录需要放在同一分区中。 当Map任务对其他Mapreduce作业的结果进行处理时(Cleanup时),Map侧的连接条件都自动满足 CompositeInputFormat类用于执行Map侧的连接,而输入和连接类型的配置可以通过属性指定。

2.如果其中一个数据集足够小,旁路的分布式通道可以用在Map侧的连接中。

实例:

输入:   

   123(工厂)                       a(地址表):

Beijing Red Star,1                      1,Beijing

Shenzhen Thunder,3                       2,Guangzhou

Guangzhou Honda,2                     3,Shenzhen

Beijing Rising,1                        4,xian   

Guangzhou Development Bank,2

Tencent,3

Back of Beijing,1

 

思路:在map端中的cache载入地址表,在map阶段的setup()中,定义HashMap(),将字符串分割,放入HashMap中,然后在map阶段,利用hashmap。get(),得到对应的地址。

代码:

package mapreduce01;

 

import java.io.IOException;

import java.net.URI;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.LineReader;

 

 

public class Mapduan {

 

static String INPUT_PATH = "hdfs://master:9000/qq/123";

static String OUTPUT_PATH="hdfs://master:9000/output";

static class MyMapper extends Mapper<Object,Object,Text,Text>{

Text output_key = new Text();

Text output_value = new Text();

Map<String,String> addMap = new HashMap<String,String>();   //image  yingshe

protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{

URI uri=context.getCacheFiles()[0];

Path path = new Path(uri);

FileSystem fs = path.getFileSystem(context.getConfiguration());

LineReader lineReader = new LineReader(fs.open(path));

Text line=new Text();

while(lineReader.readLine(line)>0){

String tokens[] = line.toString().split(",");

if(tokens!=null && tokens.length==2)

     addMap.put(tokens[0], tokens[1]);                

}

}

protected void map(Object key,Object value,Context context) throws IOException,InterruptedException{

String[] tokens = value.toString().split(",");

if(tokens!=null&&tokens.length==2){

output_key.set(tokens[0]);

String addrName = addMap.get(tokens[1].toString());

output_value.set(addrName);

context.write(output_key,output_value);

}

}

}

static class MyReduce extends Reducer<Text,Text,Text,Text> {

Text  output_key=new Text();

Text  output_value=new Text();

protected void reduce(Text key, Iterable<Text> values,Context context)  throws IOException,InterruptedException{

context.write(key,values.iterator().next());

}

}

public static void main(String[] args) throws Exception{

Path outputpath = new Path(OUTPUT_PATH);

Path cacheFile = new Path("hdfs://master:9000/qq/a");

Configuration conf = new Configuration();

FileSystem fs = outputpath.getFileSystem(conf);

if(fs.exists(outputpath)){

fs.delete(outputpath,true);

}

 Job  job=Job.getInstance(conf);

 FileInputFormat.setInputPaths(job,INPUT_PATH);

 FileOutputFormat.setOutputPath(job, outputpath);

 URI uri =cacheFile.toUri();

 job.setCacheFiles(new URI[]{uri});  //set cache address

 job.setMapperClass(MyMapper.class);

 job.setReducerClass(MyReduce.class);   

 job.setOutputKeyClass(Text.class);

 job.setOutputValueClass(Text.class);

 job.waitForCompletion(true);

}

}

 

实验结果:

Back of Beijing Beijing

Beijing Red Star Beijing

Beijing Rising Beijing

Guangzhou Development Bank Guangzhou

Guangzhou Honda Guangzhou

Shenzhen Thunder Shenzhen

Tencent Shenzhen

转载于:https://www.cnblogs.com/luminous1/p/8386916.html

你可能感兴趣的文章
PHP语言拓展json模块
查看>>
spring 配置文件applicationContext.xml命名空间及标签解析
查看>>
我的友情链接
查看>>
回到顶部代码(兼容IE6)
查看>>
web.xml文件的作用
查看>>
iOS开发篇——OC延展类目协议介绍
查看>>
桌面客户端
查看>>
exchange online 用户许可证迁移常见问题
查看>>
ELK调优
查看>>
mysql性能优化2
查看>>
【Java】Java 实现导出excel表 POI
查看>>
如何对待用户需求的几点思考
查看>>
POJ 3686 The Windy's 最小费用最大流
查看>>
RH124-13 软件包安装与升级
查看>>
我的友情链接
查看>>
1.python入门到精通
查看>>
通过vue-cli来学习修改Webpack多环境配置和发布问题
查看>>
Exchange Server 2013 高可用部署系列(四)邮箱服务器高可用——数据库可用性组(DAG)...
查看>>
和尚挑水的故事给我们带来的思想
查看>>
Zookeeper工作原理
查看>>