目录
hadoop之steaming介绍
使用原生java语言实现Map-reduce程序
借助hadoop streaming使用python语言实现Map-reduce程序
hadoop之实现集合join的需求
实现join的注意点和易踩坑总结
hadoop实现join操作的思路
hadoop使用python实现join的map和reduce代码
更多需要注意的地方
首页 数据库 mysql教程 Hadoop之使用python实现数据集合间join操作

Hadoop之使用python实现数据集合间join操作

Jun 07, 2016 pm 04:30 PM
hadoop join python 使用 实现 数据 集合

hadoop之steaming介绍 hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明: 使用原生java语言实现Map-reduce程序 hadoop准备好数据

hadoop之steaming介绍

hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:

使用原生java语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理后,输出O1
  3. hadoop将O1打散、排序,然后传给不同的reduce机器
  4. 每个reduce机器将传来的数据传给reduce程序
  5. reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
  3. python的map程序将数据处理后,将结果传回给java的map程序
  4. java的map程序将数据输出为O1
  5. hadoop将O1打散、排序,然后传给不同的reduce机器
  6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
  7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
  8. java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。

hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息:

学号sno 姓名name
01 name1
02 name2
03 name3
04 name4

数据事例2:-学生成绩:

学号sno 课程号courseno 成绩grade
01 01 80
01 02 90
02 01 82
02 02 95

期待的最终输出:

学号sno 姓名name 课程courseno 成绩grade
01 name1 01 80
01 name1 02 90
02 name2 01 82
02 name2 02 95

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

  1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
  2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
  3. 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。

hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

# -*- coding: utf-8 -*-
#Mapper.py
#来自疯狂的蚂蚁www.crazyant.net
import os
import sys
#mapper脚本
def mapper():
	#获取当前正在处理的文件的名字,这里我们有两个输入文件
	#所以要加以区分
	filepath = os.environ["map_input_file"]
	filename = os.path.split(filepath)[-1]
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		#以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
		if filename == 'data_info':
			name = fields[1]
			#下面的数字'0'就是为数据源1加上的统一标记
			print '\t'.join((sno,'0',name))
		elif filename == 'data_grade':
			courseno = fields[1]
			grade = fields[2]
			#下面的数字'1'就是为数据源1加上的统一标记
			print '\t'.join((sno,'1',courseno,grade))
if __name__=='__main__':
	mapper()
登录后复制

reducer的代码:

# -*- coding: utf-8 -*-
#reducer.py
#来自疯狂的蚂蚁www.crazyant.net
import sys
def reducer():
	#为了记录和上一个记录的区别,用lastsno记录上个sno
	lastsno = ""
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		'''
		处理思路:
		遇见当前key与上一条key不同并且label=0,就记录下来name值,
		当前key与上一条key相同并且label==1,则将本条数据的courseno、
		grade联通上一条记录的name一起输出成最终结果
		'''
		if sno != lastsno:
			name=""
			#这里没有判断label==1的情况,
			#因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
			if fields[1]=="0":
				name=fields[2]
		elif sno==lastno:
			#这里没有判断label==0的情况,
			#因为sno==lastno并且label==0表示该条key没有数据源2的数据
			if fields[2]=="1":
				courseno=fields[2]
				grade=fields[3]
				if name:
					print '\t'.join((lastsno,name,courseno,grade))
		lastsno = sno
if __name__=='__main__':
	reducer()
登录后复制

使用shell脚本启动hadoop程序的方法:

#先删除输出目录
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#来自疯狂的蚂蚁www.crazyant.net
#注意,下面配置中的环境值每个人机器不一样
~/hadoop-client/hadoop/bin/hadoop streaming \
	-D mapred.map.tasks=10 \
	-D mapred.reduce.tasks=5 \
	-D mapred.job.map.capacity=10 \
	-D mapred.job.reduce.capacity=5 \
	-D mapred.job.name="join--sno_name-sno_courseno_grade" \
	-D num.key.fields.for.partition=1 \
	-D stream.num.map.output.key.fields=2 \
	-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
	-input "/hdfs/jointest/input/*" \
	-output "/hdfs/jointest/output" \
	-mapper "python26/bin/python26.sh mapper.py" \
	-reducer "python26/bin/python26.sh reducer.py" \
	-file "mapper.py" \
	-file "reducer.py" \
	-cacheArchive "/share/python26.tar.gz#python26"
#看看运行成功没,若输出0则表示成功了
echo $?
登录后复制

可以自己手工构造输入输出数据进行测试,本程序是验证过的。

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型 影响的范围
key字段数目 1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

3、map和reduce脚本中key的获取

4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

原文链接 转载须注明!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

C语言 sum 的作用是什么? C语言 sum 的作用是什么? Apr 03, 2025 pm 02:21 PM

C语言中没有内置求和函数,需自行编写。可通过遍历数组并累加元素实现求和:循环版本:使用for循环和数组长度计算求和。指针版本:使用指针指向数组元素,通过自增指针遍历高效求和。动态分配数组版本:动态分配数组并自行管理内存,确保释放已分配内存以防止内存泄漏。

谁得到更多的Python或JavaScript? 谁得到更多的Python或JavaScript? Apr 04, 2025 am 12:09 AM

Python和JavaScript开发者的薪资没有绝对的高低,具体取决于技能和行业需求。1.Python在数据科学和机器学习领域可能薪资更高。2.JavaScript在前端和全栈开发中需求大,薪资也可观。3.影响因素包括经验、地理位置、公司规模和特定技能。

distinctIdistinguish有关系吗 distinctIdistinguish有关系吗 Apr 03, 2025 pm 10:30 PM

distinct 和 distinguish 虽都与区分有关,但用法不同:distinct(形容词)描述事物本身的独特性,用于强调事物之间的差异;distinguish(动词)表示区分行为或能力,用于描述辨别过程。在编程中,distinct 常用于表示集合中元素的唯一性,如去重操作;distinguish 则体现在算法或函数的设计中,如区分奇数和偶数。优化时,distinct 操作应选择合适的算法和数据结构,而 distinguish 操作应优化区分逻辑效率,并注意编写清晰可读的代码。

H5页面制作是否需要持续维护 H5页面制作是否需要持续维护 Apr 05, 2025 pm 11:27 PM

H5页面需要持续维护,这是因为代码漏洞、浏览器兼容性、性能优化、安全更新和用户体验提升等因素。有效维护的方法包括建立完善的测试体系、使用版本控制工具、定期监控页面性能、收集用户反馈和制定维护计划。

如何理解 C 语言中的 !x? 如何理解 C 语言中的 !x? Apr 03, 2025 pm 02:33 PM

!x 的理解!x 是 C 语言中的逻辑非运算符,对 x 的值进行布尔取反,即真变假,假变真。但要注意,C 语言中真假由数值而非布尔类型表示,非零视为真,只有 0 才视为假。因此,!x 对负数的处理与正数相同,都视为真。

C语言中 sum 是什么意思? C语言中 sum 是什么意思? Apr 03, 2025 pm 02:36 PM

C语言中没有内置的sum函数用于求和,但可以通过以下方法实现:使用循环逐个累加元素;使用指针逐个访问并累加元素;对于大数据量,考虑并行计算。

如何获取58同城工作页面上的实时申请和浏览人数数据? 如何获取58同城工作页面上的实时申请和浏览人数数据? Apr 05, 2025 am 08:06 AM

如何在爬虫时获取58同城工作页面的动态数据?在使用爬虫工具爬取58同城的某个工作页面时,可能会遇到这样�...

爱心代码复制粘贴 爱心代码免费复制粘贴手机 爱心代码复制粘贴 爱心代码免费复制粘贴手机 Apr 04, 2025 am 06:48 AM

复制粘贴代码并非不可行,但需谨慎对待。代码中环境、库、版本等依赖项可能与当前项目不匹配,导致错误或不可预料的结果。务必确保上下文一致,包括文件路径、依赖库和 Python 版本。此外,复制粘贴特定库的代码时,可能需要安装该库及其依赖项。常见的错误包括路径错误、版本冲突和代码风格不一致。性能优化需根据代码原用途和约束重新设计或重构。理解并调试复制的代码至关重要,切勿盲目复制粘贴。

See all articles