首页 数据库 mysql教程 Hadoop 中利用 地图reduce 读写 mysql 数据

Hadoop 中利用 地图reduce 读写 mysql 数据

Jun 07, 2016 pm 04:27 PM
hadoop mysql reduce 使用 地图 数字 读写

Hadoop 中利用 mapreduce 读写 mysql 数据 问题导读 1.hadoop mapreduce的通过哪两个类可以读取数据源? 2.如果没有mysql驱动包,一般会是什么问题? 3.如何添加包? 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后

Hadoop 中利用 mapreduce 读写 mysql 数据

问题导读
1.hadoop mapreduce的通过哪两个类可以读取数据源?
2.如果没有mysql驱动包,一般会是什么问题?
3.如何添加包?




有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方。

好了言归正传,简单的说说背景、原理以及需要注意的地方:

1、为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。

至少在我的 0.20.203 中的 org.apache.hadoop.mapreduce.lib 下是没见到 db 包,所以本文也是以老版的 API 来为例说明的。

3、运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /hdfsPath/

? ?? ? b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java- 5.1.0-bin.jar”), conf);

(3)虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。、

4、测试数据:

  1. CREATE TABLE `t` (
  2. `id` int DEFAULT NULL,
  3. `name` varchar(10) DEFAULT NULL
  4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  5. CREATE TABLE `t2` (
  6. `id` int DEFAULT NULL,
  7. `name` varchar(10) DEFAULT NULL
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  9. insert into t values (1,"june"),(2,"decli"),(3,"hello"),
  10. ? ? ? ? (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),
  11. ? ? ? ? (8,"decli"),(9,"hello"),(10,"june"),
  12. ? ? ? ? (11,"june"),(12,"decli"),(13,"hello");
复制代码


5、代码:

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import java.sql.PreparedStatement;
  5. import java.sql.ResultSet;
  6. import java.sql.SQLException;
  7. import java.util.Iterator;
  8. import org.apache.hadoop.filecache.DistributedCache;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.Writable;
  13. import org.apache.hadoop.mapred.JobClient;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.MapReduceBase;
  16. import org.apache.hadoop.mapred.Mapper;
  17. import org.apache.hadoop.mapred.OutputCollector;
  18. import org.apache.hadoop.mapred.Reducer;
  19. import org.apache.hadoop.mapred.Reporter;
  20. import org.apache.hadoop.mapred.lib.IdentityReducer;
  21. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  22. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  23. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  24. import org.apache.hadoop.mapred.lib.db.DBWritable;
  25. /**
  26. * Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
  27. * ? ? ? ? ? ? ? ? ? ? ? ???实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
  28. * date: 2013-7-29 上午2:34:04
  29. * @author june
  30. */
  31. public class Mysql2Mr {
  32. ? ? ? ? // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  33. ? ? ? ? // CREATE TABLE studentinfo (
  34. ? ? ? ? // id INTEGER NOT NULL PRIMARY KEY,
  35. ? ? ? ? // name VARCHAR(32) NOT NULL);
  36. ? ? ? ? public static class StudentinfoRecord implements Writable, DBWritable {
  37. ? ? ? ? ? ? ? ? int id;
  38. ? ? ? ? ? ? ? ? String name;
  39. ? ? ? ? ? ? ? ? public StudentinfoRecord() {
  40. ? ? ? ? ? ? ? ? }
  41. ? ? ? ? ? ? ? ? public void readFields(DataInput in) throws IOException {
  42. ? ? ? ? ? ? ? ? ? ? ? ? this.id = in.readInt();
  43. ? ? ? ? ? ? ? ? ? ? ? ? this.name = Text.readString(in);
  44. ? ? ? ? ? ? ? ? }
  45. ? ? ? ? ? ? ? ? public String toString() {
  46. ? ? ? ? ? ? ? ? ? ? ? ? return new String(this.id + " " + this.name);
  47. ? ? ? ? ? ? ? ? }
  48. ? ? ? ? ? ? ? ? @Override
  49. ? ? ? ? ? ? ? ? public void write(PreparedStatement stmt) throws SQLException {
  50. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setInt(1, this.id);
  51. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setString(2, this.name);
  52. ? ? ? ? ? ? ? ? }
  53. ? ? ? ? ? ? ? ? @Override
  54. ? ? ? ? ? ? ? ? public void readFields(ResultSet result) throws SQLException {
  55. ? ? ? ? ? ? ? ? ? ? ? ? this.id = result.getInt(1);
  56. ? ? ? ? ? ? ? ? ? ? ? ? this.name = result.getString(2);
  57. ? ? ? ? ? ? ? ? }
  58. ? ? ? ? ? ? ? ? @Override
  59. ? ? ? ? ? ? ? ? public void write(DataOutput out) throws IOException {
  60. ? ? ? ? ? ? ? ? ? ? ? ? out.writeInt(this.id);
  61. ? ? ? ? ? ? ? ? ? ? ? ? Text.writeString(out, this.name);
  62. ? ? ? ? ? ? ? ? }
  63. ? ? ? ? }
  64. ? ? ? ? // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
  65. ? ? ? ? // Caused by: java.lang.NoSuchMethodException: DBInputMapper.()
  66. ? ? ? ? // http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
  67. ? ? ? ? // 网上脑残式的转帖,没见到一个写对的。。。
  68. ? ? ? ? public static class DBInputMapper extends MapReduceBase implements
  69. ? ? ? ? ? ? ? ? ? ? ? ? Mapper {
  70. ? ? ? ? ? ? ? ? public void map(LongWritable key, StudentinfoRecord value,
  71. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector collector, Reporter reporter) throws IOException {
  72. ? ? ? ? ? ? ? ? ? ? ? ? collector.collect(new LongWritable(value.id), new Text(value.toString()));
  73. ? ? ? ? ? ? ? ? }
  74. ? ? ? ? }
  75. ? ? ? ? public static class MyReducer extends MapReduceBase implements
  76. ? ? ? ? ? ? ? ? ? ? ? ? Reducer {
  77. ? ? ? ? ? ? ? ? @Override
  78. ? ? ? ? ? ? ? ? public void reduce(LongWritable key, Iterator values,
  79. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector output, Reporter reporter) throws IOException {
  80. ? ? ? ? ? ? ? ? ? ? ? ? String[] splits = values.next().toString().split(" ");
  81. ? ? ? ? ? ? ? ? ? ? ? ? StudentinfoRecord r = new StudentinfoRecord();
  82. ? ? ? ? ? ? ? ? ? ? ? ? r.id = Integer.parseInt(splits[0]);
  83. ? ? ? ? ? ? ? ? ? ? ? ? r.name = splits[1];
  84. ? ? ? ? ? ? ? ? ? ? ? ? output.collect(r, new Text(r.name));
  85. ? ? ? ? ? ? ? ? }
  86. ? ? ? ? }
  87. ? ? ? ? public static void main(String[] args) throws IOException {
  88. ? ? ? ? ? ? ? ? JobConf conf = new JobConf(Mysql2Mr.class);
  89. ? ? ? ? ? ? ? ? DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.0.8-bin.jar"), conf);
  90. ? ? ? ? ? ? ? ? conf.setMapOutputKeyClass(LongWritable.class);
  91. ? ? ? ? ? ? ? ? conf.setMapOutputValueClass(Text.class);
  92. ? ? ? ? ? ? ? ? conf.setOutputKeyClass(LongWritable.class);
  93. ? ? ? ? ? ? ? ? conf.setOutputValueClass(Text.class);
  94. ? ? ? ? ? ? ? ? conf.setOutputFormat(DBOutputFormat.class);
  95. ? ? ? ? ? ? ? ? conf.setInputFormat(DBInputFormat.class);
  96. ? ? ? ? ? ? ? ? // // mysql to hdfs
  97. ? ? ? ? ? ? ? ? // conf.setReducerClass(IdentityReducer.class);
  98. ? ? ? ? ? ? ? ? // Path outPath = new Path("/tmp/1");
  99. ? ? ? ? ? ? ? ? // FileSystem.get(conf).delete(outPath, true);
  100. ? ? ? ? ? ? ? ? // FileOutputFormat.setOutputPath(conf, outPath);
  101. ? ? ? ? ? ? ? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.101:3306/test",
  102. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "root", "root");
  103. ? ? ? ? ? ? ? ? String[] fields = { "id", "name" };
  104. ? ? ? ? ? ? ? ? // 从 t 表读数据
  105. ? ? ? ? ? ? ? ? DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
  106. ? ? ? ? ? ? ? ? // mapreduce 将数据输出到 t2 表
  107. ? ? ? ? ? ? ? ? DBOutputFormat.setOutput(conf, "t2", "id", "name");
  108. ? ? ? ? ? ? ? ? // conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  109. ? ? ? ? ? ? ? ? conf.setMapperClass(DBInputMapper.class);
  110. ? ? ? ? ? ? ? ? conf.setReducerClass(MyReducer.class);
  111. ? ? ? ? ? ? ? ? JobClient.runJob(conf);
  112. ? ? ? ? }
  113. }
复制代码



6、结果:

执行两次后,你可以看到mysql结果:

  1. mysql> select * from t2;
  2. +------+-------+
  3. | id? ?| name??|
  4. +------+-------+
  5. |? ? 1 | june??|
  6. |? ? 2 | decli |
  7. |? ? 3 | hello |
  8. |? ? 4 | june??|
  9. |? ? 5 | decli |
  10. |? ? 6 | hello |
  11. |? ? 7 | june??|
  12. |? ? 8 | decli |
  13. |? ? 9 | hello |
  14. |? ?10 | june??|
  15. |? ?11 | june??|
  16. |? ?12 | decli |
  17. |? ?13 | hello |
  18. |? ? 1 | june??|
  19. |? ? 2 | decli |
  20. |? ? 3 | hello |
  21. |? ? 4 | june??|
  22. |? ? 5 | decli |
  23. |? ? 6 | hello |
  24. |? ? 7 | june??|
  25. |? ? 8 | decli |
  26. |? ? 9 | hello |
  27. |? ?10 | june??|
  28. |? ?11 | june??|
  29. |? ?12 | decli |
  30. |? ?13 | hello |
  31. +------+-------+
  32. 26 rows in set (0.00 sec)
  33. mysql>
复制代码


7、日志:

  1. 13/07/29 02:33:03 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Creating mysql-connector-java-5.0.8-bin.jar in /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp-work--8372797484204470322 with rwxr-xr-x
  3. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  4. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  5. 13/07/29 02:33:03 INFO mapred.JobClient: Running job: job_local_0001
  6. 13/07/29 02:33:03 INFO mapred.MapTask: numReduceTasks: 1
  7. 13/07/29 02:33:03 INFO mapred.MapTask: io.sort.mb = 100
  8. 13/07/29 02:33:03 INFO mapred.MapTask: data buffer = 79691776/99614720
  9. 13/07/29 02:33:03 INFO mapred.MapTask: record buffer = 262144/327680
  10. 13/07/29 02:33:03 INFO mapred.MapTask: Starting flush of map output
  11. 13/07/29 02:33:03 INFO mapred.MapTask: Finished spill 0
  12. 13/07/29 02:33:03 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
  13. 13/07/29 02:33:04 INFO mapred.JobClient:??map 0% reduce 0%
  14. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  15. 13/07/29 02:33:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
  16. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  17. 13/07/29 02:33:06 INFO mapred.Merger: Merging 1 sorted segments
  18. 13/07/29 02:33:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 235 bytes
  19. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  20. 13/07/29 02:33:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
  21. 13/07/29 02:33:07 INFO mapred.JobClient:??map 100% reduce 0%
  22. 13/07/29 02:33:09 INFO mapred.LocalJobRunner: reduce > reduce
  23. 13/07/29 02:33:09 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
  24. 13/07/29 02:33:09 WARN mapred.FileOutputCommitter: Output path is null in cleanup
  25. 13/07/29 02:33:10 INFO mapred.JobClient:??map 100% reduce 100%
  26. 13/07/29 02:33:10 INFO mapred.JobClient: Job complete: job_local_0001
  27. 13/07/29 02:33:10 INFO mapred.JobClient: Counters: 18
  28. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Input Format Counters?
  29. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Read=0
  30. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Output Format Counters?
  31. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Written=0
  32. 13/07/29 02:33:10 INFO mapred.JobClient:? ?FileSystemCounters
  33. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_READ=1211691
  34. 13/07/29 02:33:10 INFO mapred.JobClient:? ???HDFS_BYTES_READ=1081704
  35. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_WRITTEN=2392844
  36. 13/07/29 02:33:10 INFO mapred.JobClient:? ?Map-Reduce Framework
  37. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output materialized bytes=239
  38. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input records=13
  39. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce shuffle bytes=0
  40. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Spilled Records=26
  41. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output bytes=207
  42. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input bytes=13
  43. 13/07/29 02:33:10 INFO mapred.JobClient:? ???SPLIT_RAW_BYTES=75
  44. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine input records=0
  45. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input records=13
  46. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input groups=13
  47. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine output records=0
  48. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce output records=13
  49. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output records=13
复制代码







MapReduce直接连接Mysql获取数据






Mysql中数据:

  1. mysql> select * from lxw_tbls;
  2. +---------------------+----------------+
  3. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  4. +---------------------+----------------+
  5. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  6. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  7. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  8. | tt? ?? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  9. | tab_partition? ?? ? | MANAGED_TABLE??|
  10. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  11. | lxw_hbase_user_info | MANAGED_TABLE??|
  12. | t? ?? ?? ?? ?? ?? ? | EXTERNAL_TABLE |
  13. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  14. +---------------------+----------------+
  15. 9 rows in set (0.01 sec)
  16. mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;
  17. +---------------------+----------------+
  18. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  19. +---------------------+----------------+
  20. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  21. | lxw_hbase_user_info | MANAGED_TABLE??|
  22. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  23. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  24. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  25. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  26. +---------------------+----------------+
  27. 6 rows in set (0.00 sec)
复制代码


MapReduce程序代码,ConnMysql.java:

  1. package com.lxw.study;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.sql.PreparedStatement;
  7. import java.sql.ResultSet;
  8. import java.sql.SQLException;
  9. import java.util.Iterator;
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.filecache.DistributedCache;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.Path;
  14. import org.apache.hadoop.io.LongWritable;
  15. import org.apache.hadoop.io.Text;
  16. import org.apache.hadoop.io.Writable;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.Reducer;
  20. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
  21. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. public class ConnMysql {
  25. ? ?? ???
  26. ? ?? ???private static Configuration conf = new Configuration();
  27. ? ?? ???
  28. ? ?? ???static {
  29. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
  30. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
  31. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
  32. ? ?? ?? ?? ?? ? conf.set("mapred.job.tracker", "10.133.103.21:50021");
  33. ? ?? ???}
  34. ? ?? ???
  35. ? ?? ???public static class TblsRecord implements Writable, DBWritable {
  36. ? ?? ?? ?? ?? ? String tbl_name;
  37. ? ?? ?? ?? ?? ? String tbl_type;
  38. ? ?? ?? ?? ?? ? public TblsRecord() {
  39. ? ?? ?? ?? ?? ? }
  40. ? ?? ?? ?? ?? ? @Override
  41. ? ?? ?? ?? ?? ? public void write(PreparedStatement statement) throws SQLException {
  42. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  43. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(1, this.tbl_name);
  44. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(2, this.tbl_type);
  45. ? ?? ?? ?? ?? ? }
  46. ? ?? ?? ?? ?? ? @Override
  47. ? ?? ?? ?? ?? ? public void readFields(ResultSet resultSet) throws SQLException {
  48. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  49. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = resultSet.getString(1);
  50. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = resultSet.getString(2);
  51. ? ?? ?? ?? ?? ? }
  52. ? ?? ?? ?? ?? ? @Override
  53. ? ?? ?? ?? ?? ? public void write(DataOutput out) throws IOException {
  54. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  55. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_name);
  56. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_type);
  57. ? ?? ?? ?? ?? ? }
  58. ? ?? ?? ?? ?? ? @Override
  59. ? ?? ?? ?? ?? ? public void readFields(DataInput in) throws IOException {
  60. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  61. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = Text.readString(in);
  62. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = Text.readString(in);
  63. ? ?? ?? ?? ?? ? }
  64. ? ?? ?? ?? ?? ? public String toString() {
  65. ? ?? ?? ?? ?? ?? ?? ?? ?return new String(this.tbl_name + " " + this.tbl_type);
  66. ? ?? ?? ?? ?? ? }
  67. ? ?? ???}
  68. ? ?? ???public static class ConnMysqlMapper extends Mapper {
  69. ? ?? ?? ?? ?? ? public void map(LongWritable key,TblsRecord values,Context context)?
  70. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  71. ? ?? ?? ?? ?? ?? ?? ?? ?context.write(new Text(values.tbl_name), new Text(values.tbl_type));
  72. ? ?? ?? ?? ?? ? }
  73. ? ?? ???}
  74. ? ?? ???
  75. ? ?? ???public static class ConnMysqlReducer extends Reducer {
  76. ? ?? ?? ?? ?? ? public void reduce(Text key,Iterable values,Context context)?
  77. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  78. ? ?? ?? ?? ?? ?? ?? ?? ?for(Iterator itr = values.iterator();itr.hasNext();) {
  79. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???context.write(key, itr.next());
  80. ? ?? ?? ?? ?? ?? ?? ?? ?}
  81. ? ?? ?? ?? ?? ? }
  82. ? ?? ???}
  83. ? ?? ???
  84. ? ?? ???public static void main(String[] args) throws Exception {
  85. ? ?? ?? ?? ?? ? Path output = new Path("/user/lxw/output/");
  86. ? ?? ?? ?? ?? ??
  87. ? ?? ?? ?? ?? ? FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
  88. ? ?? ?? ?? ?? ? if (fs.exists(output)) {
  89. ? ?? ?? ?? ?? ?? ?? ?? ?fs.delete(output);
  90. ? ?? ?? ?? ?? ? }
  91. ? ?? ?? ?? ?? ??
  92. ? ?? ?? ?? ?? ? //mysql的jdbc驱动
  93. ? ?? ?? ?? ?? ? DistributedCache.addFileToClassPath(new Path(??
  94. ? ?? ?? ?? ?? ?? ?? ?? ???"hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);??
  95. ? ?? ?? ?? ?? ??
  96. ? ?? ?? ?? ?? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",??
  97. ? ?? ?? ?? ?? ?? ?? ?? ???"jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive");??
  98. ? ?? ?? ?? ?? ??
  99. ? ?? ?? ?? ?? ? Job job = new Job(conf,"test mysql connection");
  100. ? ?? ?? ?? ?? ? job.setJarByClass(ConnMysql.class);
  101. ? ?? ?? ?? ?? ??
  102. ? ?? ?? ?? ?? ? job.setMapperClass(ConnMysqlMapper.class);
  103. ? ?? ?? ?? ?? ? job.setReducerClass(ConnMysqlReducer.class);
  104. ? ?? ?? ?? ?? ??
  105. ? ?? ?? ?? ?? ? job.setOutputKeyClass(Text.class);
  106. ? ?? ?? ?? ?? ? job.setOutputValueClass(Text.class);
  107. ? ?? ?? ?? ?? ??
  108. ? ?? ?? ?? ?? ? job.setInputFormatClass(DBInputFormat.class);
  109. ? ?? ?? ?? ?? ? FileOutputFormat.setOutputPath(job, output);
  110. ? ?? ?? ?? ?? ??
  111. ? ?? ?? ?? ?? ? //列名
  112. ? ?? ?? ?? ?? ? String[] fields = { "TBL_NAME", "TBL_TYPE" };?
  113. ? ?? ?? ?? ?? ? //六个参数分别为:
  114. ? ?? ?? ?? ?? ? //1.Job;2.Class extends DBWritable>
  115. ? ?? ?? ?? ?? ? //3.表名;4.where条件
  116. ? ?? ?? ?? ?? ? //5.order by语句;6.列名
  117. ? ?? ?? ?? ?? ? DBInputFormat.setInput(job, TblsRecord.class,
  118. ? ?? ?? ?? ?? ?? ?? ?"lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields);??
  119. ? ?? ?? ?? ?? ??
  120. ? ?? ?? ?? ?? ? System.exit(job.waitForCompletion(true) ? 0 : 1);
  121. ? ?? ???}
  122. ? ?? ???
  123. }
复制代码


运行结果:

  1. [lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000
  2. lxw_hbase_table_1? ?? ? MANAGED_TABLE
  3. lxw_hbase_user_info? ???MANAGED_TABLE
  4. lxw_jobid? ?? ? MANAGED_TABLE
  5. lxw_t? ?MANAGED_TABLE
  6. lxw_t1??MANAGED_TABLE
  7. lxw_test_table??EXTERNAL_TABLE
复制代码

http://www.aboutyun.com/forum.php?highlight=MapReduce+MySQL&mod=viewthread&tid=7405

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

热门话题

Java教程
1663
14
CakePHP 教程
1420
52
Laravel 教程
1313
25
PHP教程
1266
29
C# 教程
1239
24
laravel入门实例 laravel入门实例 Apr 18, 2025 pm 12:45 PM

Laravel 是一款 PHP 框架,用于轻松构建 Web 应用程序。它提供一系列强大的功能,包括:安装: 使用 Composer 全局安装 Laravel CLI,并在项目目录中创建应用程序。路由: 在 routes/web.php 中定义 URL 和处理函数之间的关系。视图: 在 resources/views 中创建视图以呈现应用程序的界面。数据库集成: 提供与 MySQL 等数据库的开箱即用集成,并使用迁移来创建和修改表。模型和控制器: 模型表示数据库实体,控制器处理 HTTP 请求。

MySQL和PhpMyAdmin:核心功能和功能 MySQL和PhpMyAdmin:核心功能和功能 Apr 22, 2025 am 12:12 AM

MySQL和phpMyAdmin是强大的数据库管理工具。1)MySQL用于创建数据库和表、执行DML和SQL查询。2)phpMyAdmin提供直观界面进行数据库管理、表结构管理、数据操作和用户权限管理。

解决数据库连接问题:使用minii/db库的实际案例 解决数据库连接问题:使用minii/db库的实际案例 Apr 18, 2025 am 07:09 AM

在开发一个小型应用时,我遇到了一个棘手的问题:需要快速集成一个轻量级的数据库操作库。尝试了多个库后,我发现它们要么功能过多,要么兼容性不佳。最终,我找到了minii/db,这是一个基于Yii2的简化版本,完美地解决了我的问题。

MySQL与其他编程语言:一种比较 MySQL与其他编程语言:一种比较 Apr 19, 2025 am 12:22 AM

MySQL与其他编程语言相比,主要用于存储和管理数据,而其他语言如Python、Java、C 则用于逻辑处理和应用开发。 MySQL以其高性能、可扩展性和跨平台支持着称,适合数据管理需求,而其他语言在各自领域如数据分析、企业应用和系统编程中各有优势。

laravel框架安装方法 laravel框架安装方法 Apr 18, 2025 pm 12:54 PM

文章摘要:本文提供了详细分步说明,指导读者如何轻松安装 Laravel 框架。Laravel 是一个功能强大的 PHP 框架,它 упростил 和加快了 web 应用程序的开发过程。本教程涵盖了从系统要求到配置数据库和设置路由等各个方面的安装过程。通过遵循这些步骤,读者可以快速高效地为他们的 Laravel 项目打下坚实的基础。

解决MySQL模式问题:TheliaMySQLModesChecker模块的使用体验 解决MySQL模式问题:TheliaMySQLModesChecker模块的使用体验 Apr 18, 2025 am 08:42 AM

在使用Thelia开发电商网站时,我遇到了一个棘手的问题:MySQL模式设置不当,导致某些功能无法正常运行。经过一番探索,我找到了一个名为TheliaMySQLModesChecker的模块,它能够自动修复Thelia所需的MySQL模式,彻底解决了我的困扰。

MySQL:结构化数据和关系数据库 MySQL:结构化数据和关系数据库 Apr 18, 2025 am 12:22 AM

MySQL通过表结构和SQL查询高效管理结构化数据,并通过外键实现表间关系。1.创建表时定义数据格式和类型。2.使用外键建立表间关系。3.通过索引和查询优化提高性能。4.定期备份和监控数据库确保数据安全和性能优化。

MySQL:解释的关键功能和功能 MySQL:解释的关键功能和功能 Apr 18, 2025 am 12:17 AM

MySQL是一个开源的关系型数据库管理系统,广泛应用于Web开发。它的关键特性包括:1.支持多种存储引擎,如InnoDB和MyISAM,适用于不同场景;2.提供主从复制功能,利于负载均衡和数据备份;3.通过查询优化和索引使用提高查询效率。

See all articles