首页 数据库 mysql教程 Hadoop MapReduce 重写DBOutputFormat更新mysql数据库_MySQL

Hadoop MapReduce 重写DBOutputFormat更新mysql数据库_MySQL

Jun 01, 2016 pm 01:10 PM

 

在 http://blog.csdn.net/sunflower_cao/article/details/28266939 中写过,可以通过实现 Writable 和 DBWritable 接口,在 reduce 过程中将结果写入到 mysql 数据库里边。但是一直有一个问题:只能实现 insert,没法去更新已经存在的数据。这就导致不同的 mapreduce 程序获得的数据只能插入到不同的数据库中,在使用的时候需要建立 view 或者使用复杂的 sql 语句去查询。今天调查了下,发现可以通过重写 DBOutputFormat 来解决这个问题。

 

上代码:

TblsWritable.java

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.lib.db.DBWritable;/** * 重写DBWritable *  * @author caozw TblsWritable需要向mysql中写入数据 */public  class TblsWritable implements Writable, DBWritable {	String initTime;	String new_android_user;	String new_ios_user;	String new_total_user;	String iosUserTotal;	String androidUserTotal;	String userTotal;	public TblsWritable() {	}	public TblsWritable(String initTime, String new_android_user,			String new_ios_user, String new_total_user,			String iosUserTotal, String androidUserTotal, String userTotal) {		this.initTime = initTime;		this.new_android_user = new_android_user;		this.new_ios_user = new_ios_user;		this.new_total_user = new_total_user;		this.iosUserTotal = iosUserTotal;		this.androidUserTotal = androidUserTotal;		this.userTotal = userTotal;	}	@Override	public void write(PreparedStatement statement) throws SQLException {		statement.setString(1, this.new_android_user);		statement.setString(2, this.new_ios_user);		statement.setString(3, this.new_total_user);		statement.setString(4, this.androidUserTotal);		statement.setString(5, this.iosUserTotal);		statement.setString(6, this.userTotal);		statement.setString(7, this.initTime);	}	@Override	public void readFields(ResultSet resultSet) throws SQLException {		this.new_android_user = resultSet.getString(1);		this.new_ios_user = resultSet.getString(2);		this.new_total_user = resultSet.getString(3);		this.androidUserTotal = resultSet.getString(4);		this.iosUserTotal = resultSet.getString(5);		this.userTotal = resultSet.getString(6);		this.initTime = resultSet.getString(7);	}	@Override	public void write(DataOutput out) throws IOException {		out.writeUTF(this.new_android_user);		out.writeUTF(this.new_ios_user);		out.writeUTF(this.new_total_user);		out.writeUTF(this.androidUserTotal);		
out.writeUTF(this.iosUserTotal);		out.writeUTF(this.userTotal);		out.writeUTF(this.initTime);	}	@Override	public void readFields(DataInput in) throws IOException {		this.new_android_user = in.readUTF();		this.new_ios_user = in.readUTF();		this.new_total_user = in.readUTF();		this.androidUserTotal = in.readUTF();		this.iosUserTotal = in.readUTF();		this.userTotal = in.readUTF();		this.initTime = in.readUTF();	}	public String toString() {		return new String(this.initTime + " " + this.new_android_user + " "				+ this.new_ios_user + " " + this.new_total_user + " "				+ this.androidUserTotal + " " + this.iosUserTotal + " "				+ this.userTotal);	}}
登录后复制


WriteDataToMysql.java

import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * 将mapreduce的结果数据写入mysql中 *  * @author caozw */public class WriteDataToMysql {	public static String[] fieldNames = { "INITTIME", "NEW_ANDROID_USER",			"NEW_IOS_USER", "NEW_USER_TOTAL", "TOTAL_ANDROID_USER",			"TOTAL_IOS_USER", "TOTAL_USER" };	public static String table = "USER_INFO_STATIC";		public static class ConnMysqlMapper extends			Mapper<longwritable text intwritable> {		enum Counter {			LINESKIP,		}		private final static IntWritable one = new IntWritable(1);		private final static IntWritable zero = new IntWritable(0);		public void map(LongWritable key, Text value, Context context)				throws IOException, InterruptedException {			try {				String line = value.toString();				String[] strings = line.split("/t");				String initTime = strings[1];				String devType = strings[4];				if (initTime.length() == 19) {					SimpleDateFormat sdf = new SimpleDateFormat(							"yyyy-MM-dd HH:mm:ss");					Date date = sdf.parse(initTime);					if ("1".equals(devType)) {						context.write(new Text(initTime.substring(0, 10)), one);						context.write(new Text(initTime.substring(0, 10)), zero);					}				} else {					// System.err.println(initTime);					context.getCounter(Counter.LINESKIP).increment(1);				}				// } catch (ArrayIndexOutOfBoundsException e) {			} catch (ArrayIndexOutOfBoundsException e) {				
context.getCounter(Counter.LINESKIP).increment(1);				return;			} catch (ParseException e) {				context.getCounter(Counter.LINESKIP).increment(1);				return;			}		}	}	public static class ConnMysqlReducer extends			Reducer<text intwritable tblswritable> {		static int iosUserTotal = 0;		static int androidUserTotal = 0;		public void reduce(Text key, Iterable<intwritable> values,				Context context) throws IOException, InterruptedException {			int android = 0;			int ios = 0;			int total = 0;			for (Iterator<intwritable> itr = values.iterator(); itr.hasNext();) {				total++;				if (0 == itr.next().get()) {					android++;				} else {					ios++;				}			}			iosUserTotal += ios;			androidUserTotal += android;			/*			 * System.err.println(key.toString() + ":" + String.valueOf(android)			 * + ":" + String.valueOf(ios) + ":" + String.valueOf(total));			 */			context.write(					new TblsWritable(key.toString(), String.valueOf(android),							String.valueOf(ios), String.valueOf(total), String									.valueOf(androidUserTotal), String									.valueOf(iosUserTotal), String									.valueOf(androidUserTotal + iosUserTotal)),					null);		}	}	public static void main(String args[]) throws IOException,			InterruptedException, ClassNotFoundException {		Configuration conf = new Configuration();		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",				"jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456");		Job job = new Job(conf, "test mysql connection");		job.setJarByClass(WriteDataToMysql.class);		job.setMapperClass(ConnMysqlMapper.class);		job.setReducerClass(ConnMysqlReducer.class);		job.setOutputKeyClass(Text.class);		job.setOutputValueClass(IntWritable.class);		job.setSpeculativeExecution(false);		job.setInputFormatClass(TextInputFormat.class);		job.setOutputFormatClass(MysqlDBOutputFormat.class);		//job.setOutputFormatClass(DBOutputFormat.class);				FileInputFormat.addInputPath(job, new Path(args[0]));		// DBOutputFormat.setOutput(job, "test", "initTime", 
"new_user_total");		//DBOutputFormat.setOutput(job, table, fieldNames);		MysqlDBOutputFormat.setOutput(job, table, fieldNames);		System.exit(job.waitForCompletion(true) ? 0 : 1);	}}</intwritable></intwritable></text></longwritable>
登录后复制


MysqlDBOutputFormat.java

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */import java.io.IOException;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.SQLException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.classification.InterfaceAudience;import org.apache.hadoop.classification.InterfaceStability;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.OutputFormat;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.StringUtils;/** * A OutputFormat that sends the reduce output to a SQL table. * <p>  * {@link MysqlDBOutputFormat} accepts <key> pairs, where  * key has a type extending DBWritable. Returned {@link RecordWriter}  * writes <b>only the key</b> to the database with a batch SQL query.   
*  */@InterfaceAudience.Public@InterfaceStability.Stablepublic class MysqlDBOutputFormat<k extends dbwritable v> extends OutputFormat<k> {  private static final Log LOG = LogFactory.getLog(MysqlDBOutputFormat.class);  public void checkOutputSpecs(JobContext context)       throws IOException, InterruptedException {}  public OutputCommitter getOutputCommitter(TaskAttemptContext context)       throws IOException, InterruptedException {    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),                                   context);  }  /**   * A RecordWriter that writes the reduce output to a SQL table   */  @InterfaceStability.Evolving  public class DBRecordWriter       extends RecordWriter<k v> {    private Connection connection;    private PreparedStatement statement;    public DBRecordWriter() throws SQLException {    }    public DBRecordWriter(Connection connection        , PreparedStatement statement) throws SQLException {      this.connection = connection;      this.statement = statement;      this.connection.setAutoCommit(false);    }    public Connection getConnection() {      return connection;    }        public PreparedStatement getStatement() {      return statement;    }        /** {@inheritDoc} */    public void close(TaskAttemptContext context) throws IOException {      try {        statement.executeBatch();        connection.commit();      } catch (SQLException e) {        try {          connection.rollback();        }        catch (SQLException ex) {          LOG.warn(StringUtils.stringifyException(ex));        }        throw new IOException(e.getMessage());      } finally {        try {          statement.close();          connection.close();        }        catch (SQLException ex) {          throw new IOException(ex.getMessage());        }      }    }    /** {@inheritDoc} */    public void write(K key, V value) throws IOException {      try {        key.write(statement);        statement.addBatch();      } catch (SQLException 
e) {        e.printStackTrace();      }    }  }  /**   * Constructs the query used as the prepared statement to insert data.   *    * @param table   *          the table to insert into   * @param fieldNames   *          the fields to insert into. If field names are unknown, supply an   *          array of nulls.   */  public String constructQuery(String table, String[] fieldNames) {		if (fieldNames == null) {			throw new IllegalArgumentException(					"Field names may not be null");		}		StringBuilder query = new StringBuilder();		query.append("UPDATE ").append(table);		System.err.println("fieldNames.length:" + fieldNames.length);		if (fieldNames.length > 0) {			query.append(" SET ");			query.append(fieldNames[1] + " = ?");			query.append("," + fieldNames[2] + " = ?");			query.append("," + fieldNames[3] + " = ?");			query.append("," + fieldNames[4] + " = ?");			query.append("," + fieldNames[5] + " = ?");			query.append("," + fieldNames[6] + " = ?");			query.append(" WHERE ");			query.append(fieldNames[0] + " = ?");			System.err.println(query.toString());			return query.toString();		} else {			return null;		}  }  /** {@inheritDoc} */  public RecordWriter<k v> getRecordWriter(TaskAttemptContext context)       throws IOException {    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());    String tableName = dbConf.getOutputTableName();    String[] fieldNames = dbConf.getOutputFieldNames();        if(fieldNames == null) {      fieldNames = new String[dbConf.getOutputFieldCount()];    }        try {      Connection connection = dbConf.getConnection();      PreparedStatement statement = null;        statement = connection.prepareStatement(                    constructQuery(tableName, fieldNames));      return new DBRecordWriter(connection, statement);    } catch (Exception ex) {      throw new IOException(ex.getMessage());    }  }  /**   * Initializes the reduce-part of the job with    * the appropriate output settings   *    * @param job The job   * 
@param tableName The table to insert data into   * @param fieldNames The field names in the table.   */  public static void setOutput(Job job, String tableName,       String... fieldNames) throws IOException {    if(fieldNames.length > 0 && fieldNames[0] != null) {      DBConfiguration dbConf = setOutput(job, tableName);      dbConf.setOutputFieldNames(fieldNames);    } else {      if (fieldNames.length > 0) {        setOutput(job, tableName, fieldNames.length);      }      else {         throw new IllegalArgumentException(          "Field names must be greater than 0");      }    }  }    /**   * Initializes the reduce-part of the job    * with the appropriate output settings   *    * @param job The job   * @param tableName The table to insert data into   * @param fieldCount the number of fields in the table.   */  public static void setOutput(Job job, String tableName,       int fieldCount) throws IOException {    DBConfiguration dbConf = setOutput(job, tableName);    dbConf.setOutputFieldCount(fieldCount);  }    private static DBConfiguration setOutput(Job job,      String tableName) throws IOException {    job.setOutputFormatClass(MysqlDBOutputFormat.class);    job.setReduceSpeculativeExecution(false);    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());        dbConf.setOutputTableName(tableName);    return dbConf;  }}</k></k></k></k></key></p>
登录后复制


MysqlDBOutputFormat.java是将hadoop源码里边的DBOutputFormat.java拿过来重写了里边的constructQuery的方法,在生成sql语句的时候产生update的sql语句来实现的

DBOutputFormat.java里边的constructQuery

public String constructQuery(String table, String[] fieldNames) {
    if(fieldNames == null) {
      throw new IllegalArgumentException("Field names may not be null");
    }
    StringBuilder query = new StringBuilder();
    query.append("INSERT INTO ").append(table);
    if (fieldNames.length > 0 && fieldNames[0] != null) {
      query.append(" (");
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(",");
        }
      }
      query.append(")");
    }
    query.append(" VALUES (");
    for (int i = 0; i < fieldNames.length; i++) {
      query.append("?");
      if (i != fieldNames.length - 1) {
        query.append(",");
      }
    }
    query.append(");");
    return query.toString();
}

重写后:

public String constructQuery(String table, String[] fieldNames) {
		if (fieldNames == null) {
			throw new IllegalArgumentException(
					"Field names may not be null");
		}
		StringBuilder query = new StringBuilder();
		query.append("UPDATE ").append(table);
		System.err.println("fieldNames.length:" + fieldNames.length);
		if (fieldNames.length > 0) {
			query.append(" SET ");
			query.append(fieldNames[1] + " = ?");
			query.append("," + fieldNames[2] + " = ?");
			query.append("," + fieldNames[3] + " = ?");
			query.append("," + fieldNames[4] + " = ?");
			query.append("," + fieldNames[5] + " = ?");
			query.append("," + fieldNames[6] + " = ?");
			query.append(" WHERE ");
			query.append(fieldNames[0] + " = ?");
			System.err.println(query.toString());
			return query.toString();
		} else {
			return null;
		}
  }
登录后复制


按照 java 的思想,我这里其实可以通过 extends DBOutputFormat 来实现 update mysql 的功能,但是我试了死活不行,暂且记下,有时间再来仔细揣摩。

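继承(重写 constructQuery)方式的代码(测试未通过),有知道为什么的还请不吝赐教。一个可能的原因:从 DBOutputFormat 继承来的静态方法 setOutput 内部会调用 job.setOutputFormatClass(DBOutputFormat.class),把之前设置的子类输出格式覆盖掉,所以实际执行的仍然是 INSERT 语句。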

import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.db.DBWritable;public class MysqlOutputFormat1<k extends dbwritable> extends DBOutputFormat<k v> {	public String constructQuery(String table, String[] fieldNames) {		if (fieldNames == null) {			throw new IllegalArgumentException(					"Field names may not be null");		}		StringBuilder query = new StringBuilder();		query.append("UPDATE ").append(table);		System.err.println("fieldNames.length:"+fieldNames.length);		if (fieldNames.length > 0) {			query.append(" SET ");			query.append(fieldNames[1] + " = ?");			query.append("," + fieldNames[2] + " = ?");			query.append("," + fieldNames[3] + " = ?");			query.append("," + fieldNames[4] + " = ?");			query.append("," + fieldNames[5] + " = ?");			query.append("," + fieldNames[6] + " = ?");			query.append(" WHERE ");			query.append(fieldNames[0] + " = ?");			System.err.println(query.toString());			return query.toString();		} else {			return null;		}	}}</k></k>
登录后复制


另外,我感觉可以通过 sql 的 merge 语句实现“有则更新,没有则插入”,参考网页 http://en.wikipedia.org/wiki/Merge_%28SQL%29 。需要注意的是,MySQL 本身并不支持标准的 MERGE 语句,对应的写法是 INSERT ... ON DUPLICATE KEY UPDATE(或 REPLACE INTO)。

--http://en.wikipedia.org/wiki/Merge_%28SQL%29 MERGE INTO tablename USING table_reference ON (condition)   WHEN MATCHED THEN   UPDATE SET column1 = value1 [, column2 = value2 ...]   WHEN NOT MATCHED THEN   INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...]);
登录后复制


 

 

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

说明InnoDB全文搜索功能。 说明InnoDB全文搜索功能。 Apr 02, 2025 pm 06:09 PM

InnoDB的全文搜索功能非常强大,能够显着提高数据库查询效率和处理大量文本数据的能力。 1)InnoDB通过倒排索引实现全文搜索,支持基本和高级搜索查询。 2)使用MATCH和AGAINST关键字进行搜索,支持布尔模式和短语搜索。 3)优化方法包括使用分词技术、定期重建索引和调整缓存大小,以提升性能和准确性。

如何使用Alter Table语句在MySQL中更改表? 如何使用Alter Table语句在MySQL中更改表? Mar 19, 2025 pm 03:51 PM

本文讨论了使用MySQL的Alter Table语句修改表,包括添加/删除列,重命名表/列以及更改列数据类型。

与MySQL中使用索引相比,全表扫描何时可以更快? 与MySQL中使用索引相比,全表扫描何时可以更快? Apr 09, 2025 am 12:05 AM

全表扫描在MySQL中可能比使用索引更快,具体情况包括:1)数据量较小时;2)查询返回大量数据时;3)索引列不具备高选择性时;4)复杂查询时。通过分析查询计划、优化索引、避免过度索引和定期维护表,可以在实际应用中做出最优选择。

可以在 Windows 7 上安装 mysql 吗 可以在 Windows 7 上安装 mysql 吗 Apr 08, 2025 pm 03:21 PM

是的,可以在 Windows 7 上安装 MySQL,虽然微软已停止支持 Windows 7,但 MySQL 仍兼容它。不过,安装过程中需要注意以下几点:下载适用于 Windows 的 MySQL 安装程序。选择合适的 MySQL 版本(社区版或企业版)。安装过程中选择适当的安装目录和字符集。设置 root 用户密码,并妥善保管。连接数据库进行测试。注意 Windows 7 上的兼容性问题和安全性问题,建议升级到受支持的操作系统。

InnoDB中的聚类索引和非簇索引(次级索引)之间的差异。 InnoDB中的聚类索引和非簇索引(次级索引)之间的差异。 Apr 02, 2025 pm 06:25 PM

聚集索引和非聚集索引的区别在于:1.聚集索引将数据行存储在索引结构中,适合按主键查询和范围查询。2.非聚集索引存储索引键值和数据行的指针,适用于非主键列查询。

哪些流行的MySQL GUI工具(例如MySQL Workbench,PhpMyAdmin)是什么? 哪些流行的MySQL GUI工具(例如MySQL Workbench,PhpMyAdmin)是什么? Mar 21, 2025 pm 06:28 PM

文章讨论了流行的MySQL GUI工具,例如MySQL Workbench和PhpMyAdmin,比较了它们对初学者和高级用户的功能和适合性。[159个字符]

您如何处理MySQL中的大型数据集? 您如何处理MySQL中的大型数据集? Mar 21, 2025 pm 12:15 PM

文章讨论了处理MySQL中大型数据集的策略,包括分区,碎片,索引和查询优化。

如何使用Drop Table语句将表放入MySQL中? 如何使用Drop Table语句将表放入MySQL中? Mar 19, 2025 pm 03:52 PM

本文讨论了使用Drop Table语句在MySQL中放下表,并强调了预防措施和风险。它强调,没有备份,该动作是不可逆转的,详细介绍了恢复方法和潜在的生产环境危害。

See all articles