Mapreduce读取hbase汇总到RDBMS
前言 Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。 HBase作为源的MapReduce读取示例 package hbase;import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import jav
前言
Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。
HBase作为源的MapReduce读取示例
<code>package hbase; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; public class ExampleHbaseToMysqlMapreduce { public static void main(String[] args) throws Exception { //hbase配置 Configuration config = HBaseConfiguration.create(); String tableName = "flws"; Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("5768014")); scan.setStopRow(Bytes.toBytes("5768888")); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH")); scan.setCaching(500); scan.setCacheBlocks(false); //JOB定义 Job job = new Job(config, "ExampleHbaseMapreduce"); job.setJarByClass(ExampleHbaseToMysqlMapreduce.class); //设置map读取hbase方法 TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, Text.class,Text.class, job); //reduce设置 job.setReducerClass(MyReducer.class); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(5); boolean b = job.waitForCompletion(true); if (!b) { throw new Exception("error with job!"); } } public static class MyMapper extends TableMapper<text text> { public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { context.write( new Text(row.get()), new Text(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("AH")))); } } public static class MyReducer extends TableReducer<text text immutablebyteswritable> { private Connection conn = null; @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } @Override protected void setup(Context context) throws IOException, InterruptedException { String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull"; try { Class.forName(driver); } catch (ClassNotFoundException e) { e.printStackTrace(); } try { conn = DriverManager.getConnection(url, "root", "root"); } catch (SQLException e) { e.printStackTrace(); } super.setup(context); } public void reduce(Text key, Iterable<text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (Text text : values) { sb.append(text.toString()); } try { Statement st = conn.createStatement(); st.executeUpdate("insert into test_mapreduce (id,ah) values (" + Integer.valueOf(key.toString()) + ",'" + sb.toString() + "')"); } catch (SQLException e) { e.printStackTrace(); } } } } </text></text></text></code>
原文地址:Mapreduce读取hbase汇总到RDBMS, 感谢原作者分享。

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

如何使用pandas正确读取txt文件,需要具体代码示例Pandas是一个广泛使用的Python数据分析库,它可以用于处理各种各样的数据类型,包括CSV文件、Excel文件、SQL数据库等。同时,它也可以用于读取文本文件,例如txt文件。但是,在读取txt文件时,我们有时会遇到一些问题,例如编码问题、分隔符问题等。本文将介绍如何使用pandas正确读取txt

使用pandas读取txt文件的实用技巧,需要具体代码示例在数据分析和数据处理中,txt文件是一种常见的数据格式。使用pandas读取txt文件可以快速、方便地进行数据处理。本文将介绍几种实用的技巧,以帮助你更好的使用pandas读取txt文件,并配以具体的代码示例。读取带有分隔符的txt文件使用pandas读取带有分隔符的txt文件时,可以使用read_c

Java中使用OpenCSV读取和写入CSV文件的示例CSV(Comma-SeparatedValues)指的是以逗号分隔的数值,是一种常见的数据存储格式。在Java中,OpenCSV是一个常用的工具库,用于读取和写入CSV文件。本文将介绍如何使用OpenCSV来实现读取和写入CSV文件的示例。引入OpenCSV库首先,需要引入OpenCSV库到

Pandas读取网页数据的实用方法,需要具体代码示例在数据分析和处理过程中,我们经常需要从网页中获取数据。而Pandas作为一种强大的数据处理工具,提供了方便的方法来读取和处理网页数据。本文将介绍几种常用的Pandas读取网页数据的实用方法,并附上具体的代码示例。方法一:使用read_html()函数Pandas的read_html()函数可以直接从网页中读

PHP读取Excel文件方法及常见问题解答Excel是一种非常普遍的电子表格文件格式,很多业务和数据都存放在Excel文件中。在开发过程中,如果需要将Excel文件中的数据导入系统中,就需要使用PHP读取Excel文件。本文将介绍PHP读取Excel文件的方法及常见问题解答。一、PHP读取Excel文件方法1.使用PHPExcel类库PHPExcel是一个P

快速入门:Pandas读取JSON文件的方法,需要具体代码示例引言:在数据分析和数据科学领域,Pandas是一个重要的Python库之一。它提供了丰富的功能和灵活的数据结构,能够方便地对各种数据进行处理和分析。在实际应用中,我们经常会遇到需要读取JSON文件的情况。本文将介绍如何使用Pandas来读取JSON文件,并附上具体的代码示例。一、Pandas的安装

Golang如何读取二进制文件?二进制文件是以二进制形式存储的文件,其中包含了计算机能够识别和处理的数据。在Golang中,我们可以使用一些方法来读取二进制文件,并将其解析成我们想要的数据格式。下面将介绍如何在Golang中读取二进制文件,并给出具体的代码示例。首先,我们需要使用os包中的Open函数打开一个二进制文件,这将返回一个文件对象。然后,我们可以使

PHP文件处理入门:读取与写入的步骤指引在Web开发中,文件处理是一项常见的任务,无论是读取用户上传的文件,还是将结果写入文件供后续使用,理解如何在PHP中进行文件处理都是至关重要的。本文将提供一个简单的指引,介绍PHP中文件的读取和写入的基本步骤,并附上代码示例供参考。文件读取在PHP中,可以使用fopen()函数打开一个文件,返回一个文件资源(file
