Hadoop 中利用 地图reduce 读写 mysql 数据
Hadoop 中利用 mapreduce 读写 mysql 数据 问题导读 1.hadoop mapreduce的通过哪两个类可以读取数据源? 2.如果没有mysql驱动包,一般会是什么问题? 3.如何添加包? 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后
Hadoop 中利用 mapreduce 读写 mysql 数据问题导读
1.hadoop mapreduce的通过哪两个类可以读取数据源?
2.如果没有mysql驱动包,一般会是什么问题?
3.如何添加包?
有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方。
好了言归正传,简单的说说背景、原理以及需要注意的地方:
1、为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。
2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。
至少在我的 0.20.203 中的 org.apache.hadoop.mapreduce.lib 下是没见到 db 包,所以本文也是以老版的 API 来为例说明的。
3、运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。
(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /hdfsPath/
? ?? ? b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java- 5.1.0-bin.jar”), conf);
(3)虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。、
4、测试数据:
- CREATE TABLE `t` (
- `id` int DEFAULT NULL,
- `name` varchar(10) DEFAULT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- CREATE TABLE `t2` (
- `id` int DEFAULT NULL,
- `name` varchar(10) DEFAULT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- insert into t values (1,"june"),(2,"decli"),(3,"hello"),
- ? ? ? ? (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),
- ? ? ? ? (8,"decli"),(9,"hello"),(10,"june"),
- ? ? ? ? (11,"june"),(12,"decli"),(13,"hello");
5、代码:
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.util.Iterator;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.lib.IdentityReducer;
- import org.apache.hadoop.mapred.lib.db.DBConfiguration;
- import org.apache.hadoop.mapred.lib.db.DBInputFormat;
- import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
- import org.apache.hadoop.mapred.lib.db.DBWritable;
- /**
- * Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
- * ? ? ? ? ? ? ? ? ? ? ? ???实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
- * date: 2013-7-29 上午2:34:04
- * @author june
- */
- public class Mysql2Mr {
- ? ? ? ? // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
- ? ? ? ? // CREATE TABLE studentinfo (
- ? ? ? ? // id INTEGER NOT NULL PRIMARY KEY,
- ? ? ? ? // name VARCHAR(32) NOT NULL);
- ? ? ? ? public static class StudentinfoRecord implements Writable, DBWritable {
- ? ? ? ? ? ? ? ? int id;
- ? ? ? ? ? ? ? ? String name;
- ? ? ? ? ? ? ? ? public StudentinfoRecord() {
- ? ? ? ? ? ? ? ? }
- ? ? ? ? ? ? ? ? public void readFields(DataInput in) throws IOException {
- ? ? ? ? ? ? ? ? ? ? ? ? this.id = in.readInt();
- ? ? ? ? ? ? ? ? ? ? ? ? this.name = Text.readString(in);
- ? ? ? ? ? ? ? ? }
- ? ? ? ? ? ? ? ? public String toString() {
- ? ? ? ? ? ? ? ? ? ? ? ? return new String(this.id + " " + this.name);
- ? ? ? ? ? ? ? ? }
- ? ? ? ? ? ? ? ? @Override
- ? ? ? ? ? ? ? ? public void write(PreparedStatement stmt) throws SQLException {
- ? ? ? ? ? ? ? ? ? ? ? ? stmt.setInt(1, this.id);
- ? ? ? ? ? ? ? ? ? ? ? ? stmt.setString(2, this.name);
- ? ? ? ? ? ? ? ? }
- ? ? ? ? ? ? ? ? @Override
- ? ? ? ? ? ? ? ? public void readFields(ResultSet result) throws SQLException {
- ? ? ? ? ? ? ? ? ? ? ? ? this.id = result.getInt(1);
- ? ? ? ? ? ? ? ? ? ? ? ? this.name = result.getString(2);
- ? ? ? ? ? ? ? ? }
- ? ? ? ? ? ? ? ? @Override
- ? ? ? ? ? ? ? ? public void write(DataOutput out) throws IOException {
- ? ? ? ? ? ? ? ? ? ? ? ? out.writeInt(this.id);
- ? ? ? ? ? ? ? ? ? ? ? ? Text.writeString(out, this.name);
- ? ? ? ? ? ? ? ? }
- ? ? ? ? }
- ? ? ? ? // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
- ? ? ? ? // Caused by: java.lang.NoSuchMethodException: DBInputMapper.
() - ? ? ? ? // http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
- ? ? ? ? // 网上脑残式的转帖,没见到一个写对的。。。
- ? ? ? ? public static class DBInputMapper extends MapReduceBase implements
- ? ? ? ? ? ? ? ? ? ? ? ? Mapper
{ - ? ? ? ? ? ? ? ? public void map(LongWritable key, StudentinfoRecord value,
- ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector
collector, Reporter reporter) throws IOException { - ? ? ? ? ? ? ? ? ? ? ? ? collector.collect(new LongWritable(value.id), new Text(value.toString()));
- ? ? ? ? ? ? ? ? }
- ? ? ? ? }
- ? ? ? ? public static class MyReducer extends MapReduceBase implements
- ? ? ? ? ? ? ? ? ? ? ? ? Reducer
{ - ? ? ? ? ? ? ? ? @Override
- ? ? ? ? ? ? ? ? public void reduce(LongWritable key, Iterator
values, - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector
output, Reporter reporter) throws IOException { - ? ? ? ? ? ? ? ? ? ? ? ? String[] splits = values.next().toString().split(" ");
- ? ? ? ? ? ? ? ? ? ? ? ? StudentinfoRecord r = new StudentinfoRecord();
- ? ? ? ? ? ? ? ? ? ? ? ? r.id = Integer.parseInt(splits[0]);
- ? ? ? ? ? ? ? ? ? ? ? ? r.name = splits[1];
- ? ? ? ? ? ? ? ? ? ? ? ? output.collect(r, new Text(r.name));
- ? ? ? ? ? ? ? ? }
- ? ? ? ? }
- ? ? ? ? public static void main(String[] args) throws IOException {
- ? ? ? ? ? ? ? ? JobConf conf = new JobConf(Mysql2Mr.class);
- ? ? ? ? ? ? ? ? DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.0.8-bin.jar"), conf);
- ? ? ? ? ? ? ? ? conf.setMapOutputKeyClass(LongWritable.class);
- ? ? ? ? ? ? ? ? conf.setMapOutputValueClass(Text.class);
- ? ? ? ? ? ? ? ? conf.setOutputKeyClass(LongWritable.class);
- ? ? ? ? ? ? ? ? conf.setOutputValueClass(Text.class);
- ? ? ? ? ? ? ? ? conf.setOutputFormat(DBOutputFormat.class);
- ? ? ? ? ? ? ? ? conf.setInputFormat(DBInputFormat.class);
- ? ? ? ? ? ? ? ? // // mysql to hdfs
- ? ? ? ? ? ? ? ? // conf.setReducerClass(IdentityReducer.class);
- ? ? ? ? ? ? ? ? // Path outPath = new Path("/tmp/1");
- ? ? ? ? ? ? ? ? // FileSystem.get(conf).delete(outPath, true);
- ? ? ? ? ? ? ? ? // FileOutputFormat.setOutputPath(conf, outPath);
- ? ? ? ? ? ? ? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.101:3306/test",
- ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "root", "root");
- ? ? ? ? ? ? ? ? String[] fields = { "id", "name" };
- ? ? ? ? ? ? ? ? // 从 t 表读数据
- ? ? ? ? ? ? ? ? DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
- ? ? ? ? ? ? ? ? // mapreduce 将数据输出到 t2 表
- ? ? ? ? ? ? ? ? DBOutputFormat.setOutput(conf, "t2", "id", "name");
- ? ? ? ? ? ? ? ? // conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
- ? ? ? ? ? ? ? ? conf.setMapperClass(DBInputMapper.class);
- ? ? ? ? ? ? ? ? conf.setReducerClass(MyReducer.class);
- ? ? ? ? ? ? ? ? JobClient.runJob(conf);
- ? ? ? ? }
- }
6、结果:
执行两次后,你可以看到mysql结果:
- mysql> select * from t2;
- +------+-------+
- | id? ?| name??|
- +------+-------+
- |? ? 1 | june??|
- |? ? 2 | decli |
- |? ? 3 | hello |
- |? ? 4 | june??|
- |? ? 5 | decli |
- |? ? 6 | hello |
- |? ? 7 | june??|
- |? ? 8 | decli |
- |? ? 9 | hello |
- |? ?10 | june??|
- |? ?11 | june??|
- |? ?12 | decli |
- |? ?13 | hello |
- |? ? 1 | june??|
- |? ? 2 | decli |
- |? ? 3 | hello |
- |? ? 4 | june??|
- |? ? 5 | decli |
- |? ? 6 | hello |
- |? ? 7 | june??|
- |? ? 8 | decli |
- |? ? 9 | hello |
- |? ?10 | june??|
- |? ?11 | june??|
- |? ?12 | decli |
- |? ?13 | hello |
- +------+-------+
- 26 rows in set (0.00 sec)
- mysql>
7、日志:
- 13/07/29 02:33:03 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Creating mysql-connector-java-5.0.8-bin.jar in /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp-work--8372797484204470322 with rwxr-xr-x
- 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
- 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
- 13/07/29 02:33:03 INFO mapred.JobClient: Running job: job_local_0001
- 13/07/29 02:33:03 INFO mapred.MapTask: numReduceTasks: 1
- 13/07/29 02:33:03 INFO mapred.MapTask: io.sort.mb = 100
- 13/07/29 02:33:03 INFO mapred.MapTask: data buffer = 79691776/99614720
- 13/07/29 02:33:03 INFO mapred.MapTask: record buffer = 262144/327680
- 13/07/29 02:33:03 INFO mapred.MapTask: Starting flush of map output
- 13/07/29 02:33:03 INFO mapred.MapTask: Finished spill 0
- 13/07/29 02:33:03 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
- 13/07/29 02:33:04 INFO mapred.JobClient:??map 0% reduce 0%
- 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
- 13/07/29 02:33:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
- 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
- 13/07/29 02:33:06 INFO mapred.Merger: Merging 1 sorted segments
- 13/07/29 02:33:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 235 bytes
- 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
- 13/07/29 02:33:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
- 13/07/29 02:33:07 INFO mapred.JobClient:??map 100% reduce 0%
- 13/07/29 02:33:09 INFO mapred.LocalJobRunner: reduce > reduce
- 13/07/29 02:33:09 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
- 13/07/29 02:33:09 WARN mapred.FileOutputCommitter: Output path is null in cleanup
- 13/07/29 02:33:10 INFO mapred.JobClient:??map 100% reduce 100%
- 13/07/29 02:33:10 INFO mapred.JobClient: Job complete: job_local_0001
- 13/07/29 02:33:10 INFO mapred.JobClient: Counters: 18
- 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Input Format Counters?
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Read=0
- 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Output Format Counters?
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Written=0
- 13/07/29 02:33:10 INFO mapred.JobClient:? ?FileSystemCounters
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_READ=1211691
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???HDFS_BYTES_READ=1081704
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_WRITTEN=2392844
- 13/07/29 02:33:10 INFO mapred.JobClient:? ?Map-Reduce Framework
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output materialized bytes=239
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input records=13
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce shuffle bytes=0
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Spilled Records=26
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output bytes=207
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input bytes=13
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???SPLIT_RAW_BYTES=75
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine input records=0
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input records=13
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input groups=13
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine output records=0
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce output records=13
- 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output records=13
Mysql中数据:
- mysql> select * from lxw_tbls;
- +---------------------+----------------+
- | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
- +---------------------+----------------+
- | lxw_test_table? ?? ?| EXTERNAL_TABLE |
- | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
- | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
- | tt? ?? ?? ?? ?? ?? ?| MANAGED_TABLE??|
- | tab_partition? ?? ? | MANAGED_TABLE??|
- | lxw_hbase_table_1? ?| MANAGED_TABLE??|
- | lxw_hbase_user_info | MANAGED_TABLE??|
- | t? ?? ?? ?? ?? ?? ? | EXTERNAL_TABLE |
- | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
- +---------------------+----------------+
- 9 rows in set (0.01 sec)
- mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;
- +---------------------+----------------+
- | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
- +---------------------+----------------+
- | lxw_hbase_table_1? ?| MANAGED_TABLE??|
- | lxw_hbase_user_info | MANAGED_TABLE??|
- | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
- | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
- | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
- | lxw_test_table? ?? ?| EXTERNAL_TABLE |
- +---------------------+----------------+
- 6 rows in set (0.00 sec)
MapReduce程序代码,ConnMysql.java:
- package com.lxw.study;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.net.URI;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.util.Iterator;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
- import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
- import org.apache.hadoop.mapreduce.lib.db.DBWritable;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class ConnMysql {
- ? ?? ???
- ? ?? ???private static Configuration conf = new Configuration();
- ? ?? ???
- ? ?? ???static {
- ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
- ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
- ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
- ? ?? ?? ?? ?? ? conf.set("mapred.job.tracker", "10.133.103.21:50021");
- ? ?? ???}
- ? ?? ???
- ? ?? ???public static class TblsRecord implements Writable, DBWritable {
- ? ?? ?? ?? ?? ? String tbl_name;
- ? ?? ?? ?? ?? ? String tbl_type;
- ? ?? ?? ?? ?? ? public TblsRecord() {
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ? @Override
- ? ?? ?? ?? ?? ? public void write(PreparedStatement statement) throws SQLException {
- ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
- ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(1, this.tbl_name);
- ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(2, this.tbl_type);
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ? @Override
- ? ?? ?? ?? ?? ? public void readFields(ResultSet resultSet) throws SQLException {
- ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
- ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = resultSet.getString(1);
- ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = resultSet.getString(2);
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ? @Override
- ? ?? ?? ?? ?? ? public void write(DataOutput out) throws IOException {
- ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
- ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_name);
- ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_type);
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ? @Override
- ? ?? ?? ?? ?? ? public void readFields(DataInput in) throws IOException {
- ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
- ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = Text.readString(in);
- ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = Text.readString(in);
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ? public String toString() {
- ? ?? ?? ?? ?? ?? ?? ?? ?return new String(this.tbl_name + " " + this.tbl_type);
- ? ?? ?? ?? ?? ? }
- ? ?? ???}
- ? ?? ???public static class ConnMysqlMapper extends Mapper
{ - ? ?? ?? ?? ?? ? public void map(LongWritable key,TblsRecord values,Context context)?
- ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
- ? ?? ?? ?? ?? ?? ?? ?? ?context.write(new Text(values.tbl_name), new Text(values.tbl_type));
- ? ?? ?? ?? ?? ? }
- ? ?? ???}
- ? ?? ???
- ? ?? ???public static class ConnMysqlReducer extends Reducer
{ - ? ?? ?? ?? ?? ? public void reduce(Text key,Iterable
values,Context context)? - ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
- ? ?? ?? ?? ?? ?? ?? ?? ?for(Iterator
itr = values.iterator();itr.hasNext();) { - ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???context.write(key, itr.next());
- ? ?? ?? ?? ?? ?? ?? ?? ?}
- ? ?? ?? ?? ?? ? }
- ? ?? ???}
- ? ?? ???
- ? ?? ???public static void main(String[] args) throws Exception {
- ? ?? ?? ?? ?? ? Path output = new Path("/user/lxw/output/");
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
- ? ?? ?? ?? ?? ? if (fs.exists(output)) {
- ? ?? ?? ?? ?? ?? ?? ?? ?fs.delete(output);
- ? ?? ?? ?? ?? ? }
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? //mysql的jdbc驱动
- ? ?? ?? ?? ?? ? DistributedCache.addFileToClassPath(new Path(??
- ? ?? ?? ?? ?? ?? ?? ?? ???"hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);??
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",??
- ? ?? ?? ?? ?? ?? ?? ?? ???"jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive");??
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? Job job = new Job(conf,"test mysql connection");
- ? ?? ?? ?? ?? ? job.setJarByClass(ConnMysql.class);
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? job.setMapperClass(ConnMysqlMapper.class);
- ? ?? ?? ?? ?? ? job.setReducerClass(ConnMysqlReducer.class);
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? job.setOutputKeyClass(Text.class);
- ? ?? ?? ?? ?? ? job.setOutputValueClass(Text.class);
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? job.setInputFormatClass(DBInputFormat.class);
- ? ?? ?? ?? ?? ? FileOutputFormat.setOutputPath(job, output);
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? //列名
- ? ?? ?? ?? ?? ? String[] fields = { "TBL_NAME", "TBL_TYPE" };?
- ? ?? ?? ?? ?? ? //六个参数分别为:
- ? ?? ?? ?? ?? ? //1.Job;2.Class extends DBWritable>
- ? ?? ?? ?? ?? ? //3.表名;4.where条件
- ? ?? ?? ?? ?? ? //5.order by语句;6.列名
- ? ?? ?? ?? ?? ? DBInputFormat.setInput(job, TblsRecord.class,
- ? ?? ?? ?? ?? ?? ?? ?"lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields);??
- ? ?? ?? ?? ?? ??
- ? ?? ?? ?? ?? ? System.exit(job.waitForCompletion(true) ? 0 : 1);
- ? ?? ???}
- ? ?? ???
- }
运行结果:
- [lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000
- lxw_hbase_table_1? ?? ? MANAGED_TABLE
- lxw_hbase_user_info? ???MANAGED_TABLE
- lxw_jobid? ?? ? MANAGED_TABLE
- lxw_t? ?MANAGED_TABLE
- lxw_t1??MANAGED_TABLE
- lxw_test_table??EXTERNAL_TABLE
http://www.aboutyun.com/forum.php?highlight=MapReduce+MySQL&mod=viewthread&tid=7405

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック

PHP で MySQL データベースをバックアップおよび復元するには、次の手順を実行します。 データベースをバックアップします。 mysqldump コマンドを使用して、データベースを SQL ファイルにダンプします。データベースの復元: mysql コマンドを使用して、SQL ファイルからデータベースを復元します。

MySQL クエリのパフォーマンスは、検索時間を線形の複雑さから対数の複雑さまで短縮するインデックスを構築することで最適化できます。 PreparedStatement を使用して SQL インジェクションを防止し、クエリのパフォーマンスを向上させます。クエリ結果を制限し、サーバーによって処理されるデータ量を削減します。適切な結合タイプの使用、インデックスの作成、サブクエリの使用の検討など、結合クエリを最適化します。クエリを分析してボトルネックを特定し、キャッシュを使用してデータベースの負荷を軽減し、オーバーヘッドを最小限に抑えます。

MySQLテーブルにデータを挿入するにはどうすればよいですか?データベースに接続する: mysqli を使用してデータベースへの接続を確立します。 SQL クエリを準備します。挿入する列と値を指定する INSERT ステートメントを作成します。クエリの実行: query() メソッドを使用して挿入クエリを実行します。成功すると、確認メッセージが出力されます。

PHP で MySQL ストアド プロシージャを使用するには: PDO または MySQLi 拡張機能を使用して、MySQL データベースに接続します。ストアド プロシージャを呼び出すステートメントを準備します。ストアド プロシージャを実行します。結果セットを処理します (ストアド プロシージャが結果を返す場合)。データベース接続を閉じます。

PHP を使用して MySQL テーブルを作成するには、次の手順が必要です。 データベースに接続します。データベースが存在しない場合は作成します。データベースを選択します。テーブルを作成します。クエリを実行します。接続を閉じます。

MySQL 8.4 (2024 年時点の最新の LTS リリース) で導入された主な変更の 1 つは、「MySQL Native Password」プラグインがデフォルトで有効ではなくなったことです。さらに、MySQL 9.0 ではこのプラグインが完全に削除されています。 この変更は PHP および他のアプリに影響します

サービス開始から 1 年後、Google マップは新しい機能をリリースしました。地図上に目的地までのルートを設定すると、移動ルートがまとめられます。旅行が始まると、携帯電話のロック画面からルート案内を「参照」できます。 Google マップを使用すると、到着予定時刻とルートを確認できます。旅行中は、ロック画面でナビゲーション情報を表示できます。また、携帯電話のロックを解除すると、Google マップにアクセスせずにナビゲーション情報を表示できます。携帯電話のロックを解除すると、Google マップにアクセスせずにナビゲーション情報を表示できます。携帯電話のロックを解除すると、Google マップにアクセスせずにナビゲーション情報を表示できます。 携帯電話のロックを解除すると、Google マップにアクセスせずにナビゲーション情報を表示できます。 Google マップにアクセスせずにナビゲーション情報を表示できます。

2025年、グローバルデジタル仮想通貨取引プラットフォームは、トランザクションのボリューム、セキュリティ、ユーザーエクスペリエンスなどの指標に基づいて、2025年に世界のトップ10のデジタル通貨取引プラットフォームを激しく競争しています。 OKXは、強力な技術的強さとグローバルな運用戦略で最初にランクされており、Binanceは高流動性と低料金に密接に続きます。 Gate.io、Coinbase、Krakenなどのプラットフォームは、それぞれの利点がある最前線にいます。このリストには、Huobi、Kucoin、Bitfinex、Crypto.com、Geminiなどの取引プラットフォームがそれぞれ独自の特徴がありますが、投資は注意する必要があります。プラットフォームを選択するには、セキュリティ、流動性、料金、ユーザーエクスペリエンス、通貨選択、規制コンプライアンスなどの要因を考慮し、合理的に投資する必要があります
