前言 HBase对MapReduce API进行了扩展,方便MapReduce任务读写HTable数据。下面是一个以HBase作为数据源、将汇总结果写入MySQL的MapReduce读取示例。
Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。
<code>package hbase; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; public class ExampleHbaseToMysqlMapreduce { public static void main(String[] args) throws Exception { //hbase配置 Configuration config = HBaseConfiguration.create(); String tableName = "flws"; Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("5768014")); scan.setStopRow(Bytes.toBytes("5768888")); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH")); scan.setCaching(500); scan.setCacheBlocks(false); //JOB定义 Job job = new Job(config, "ExampleHbaseMapreduce"); job.setJarByClass(ExampleHbaseToMysqlMapreduce.class); //设置map读取hbase方法 TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, Text.class,Text.class, job); //reduce设置 job.setReducerClass(MyReducer.class); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(5); boolean b = job.waitForCompletion(true); if (!b) { throw new Exception("error with job!"); } } public static class MyMapper extends TableMapper<text text> { public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { context.write( new Text(row.get()), new Text(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("AH")))); } } public static class MyReducer extends TableReducer<text text immutablebyteswritable> { 
private Connection conn = null; @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } @Override protected void setup(Context context) throws IOException, InterruptedException { String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull"; try { Class.forName(driver); } catch (ClassNotFoundException e) { e.printStackTrace(); } try { conn = DriverManager.getConnection(url, "root", "root"); } catch (SQLException e) { e.printStackTrace(); } super.setup(context); } public void reduce(Text key, Iterable<text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (Text text : values) { sb.append(text.toString()); } try { Statement st = conn.createStatement(); st.executeUpdate("insert into test_mapreduce (id,ah) values (" + Integer.valueOf(key.toString()) + ",'" + sb.toString() + "')"); } catch (SQLException e) { e.printStackTrace(); } } } } </text></text></text></code>
原文地址:Mapreduce读取hbase汇总到RDBMS, 感谢原作者分享。