新版api mapreduce reduce结果写入mysql_MySQL
Jun 01, 2016 pm 01:12 PMimport java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * 将mapreduce的结果数据写入mysql中 * * @author asheng */public class WriteDataToMysql { /** * 重写DBWritable * * @author asheng TblsWritable需要向mysql中写入数据 */ public static class TblsWritable implements Writable, DBWritable { String tbl_name; String tbl_type; public TblsWritable() { } public TblsWritable(String tbl_name, String tab_type) { this.tbl_name = tbl_name; this.tbl_type = tab_type; } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.tbl_name); statement.setString(2, this.tbl_type); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.tbl_name = resultSet.getString(1); this.tbl_type = resultSet.getString(2); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.tbl_name); out.writeUTF(this.tbl_type); } @Override public void readFields(DataInput in) throws IOException { this.tbl_name = in.readUTF(); this.tbl_type = in.readUTF(); } public String toString() { return new String(this.tbl_name + " " + this.tbl_type); } } public 
static class ConnMysqlMapper extends Mapper<longwritable text intwritable> // TblsRecord是自定义的类型,也就是上面重写的DBWritable类 { enum Counter { LINESKIP, } private final static IntWritable one = new IntWritable(1); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String line = value.toString(); String[] strings = line.split("/t"); String initTime = strings[1]; String devType = strings[4]; if (initTime.length() == 19) { SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); Date date = sdf.parse(initTime); context.write(new Text(initTime.substring(0, 10)),one); } else { // System.err.println(initTime); context.getCounter(Counter.LINESKIP).increment(1); } // } catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); return; } catch (ParseException e) { context.getCounter(Counter.LINESKIP).increment(1); return; } } } public static class ConnMysqlReducer extends Reducer<text text tblswritable> { public void reduce(Text key, Iterable<text> values, Context context) throws IOException, InterruptedException { int count = 0; for (Iterator<text> itr = values.iterator(); itr.hasNext(); itr .next()) { count++; } context.write( new TblsWritable(key.toString(), String.valueOf(count)), null); } } public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456"); Job job = new Job(conf, "test mysql connection"); job.setJarByClass(WriteDataToMysql.class); job.setMapperClass(ConnMysqlMapper.class); job.setReducerClass(ConnMysqlReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(DBOutputFormat.class); 
FileInputFormat.addInputPath(job, new Path(args[0])); DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total"); System.exit(job.waitForCompletion(true) ? 0 : 1); }}</text></text></text></longwritable>
Copy after login
之所以写入 MySQL,是因为我们平时处理的 log 文件虽然是 TB 级的,处理结果却很小;把结果写入关系数据库后,查询和使用都非常便利。
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot Article
Repo: How To Revive Teammates
3 weeks ago
By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
1 week ago
By 尊渡假赌尊渡假赌尊渡假赌
How Long Does It Take To Beat Split Fiction?
3 weeks ago
By DDD
Hello Kitty Island Adventure: How To Get Giant Seeds
3 weeks ago
By 尊渡假赌尊渡假赌尊渡假赌

Hot tools Tags

Hot Article
Repo: How To Revive Teammates
3 weeks ago
By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
1 week ago
By 尊渡假赌尊渡假赌尊渡假赌
How Long Does It Take To Beat Split Fiction?
3 weeks ago
By DDD
Hello Kitty Island Adventure: How To Get Giant Seeds
3 weeks ago
By 尊渡假赌尊渡假赌尊渡假赌

Hot Article Tags

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

Reduce the use of MySQL memory in Docker

How do you alter a table in MySQL using the ALTER TABLE statement?

How to solve the problem of mysql cannot open shared library

What is SQLite? Comprehensive overview

Run MySQL in Linux (with/without podman container with phpmyadmin)

Running multiple MySQL versions on MacOS: A step-by-step guide

What are some popular MySQL GUI tools (e.g., MySQL Workbench, phpMyAdmin)?

How do I secure MySQL against common vulnerabilities (SQL injection, brute-force attacks)?
