sqoop client java api将mysql的数据导到hdfs
mysqlsqoophdfshadoop
package com.hadoop.recommend; import org.apache.sqoop.client.SqoopClient;import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MFromConfig;import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MLinkConfig; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.validation.Status; public class MysqlToHDFS { public static void main(String[] args) { sqoopTransfer(); } public static void sqoopTransfer() { //初始化 String url = "http://master:12000/sqoop/"; SqoopClient client = new SqoopClient(url); //创建一个源链接 JDBC long fromConnectorId = 2; MLink fromLink = client.createLink(fromConnectorId); fromLink.setName("JDBC connector"); fromLink.setCreationUser("hadoop"); MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig(); fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://master:3306/hive"); fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver"); fromLinkConfig.getStringInput("linkConfig.username").setValue("root"); fromLinkConfig.getStringInput("linkConfig.password").setValue(""); Status fromStatus = client.saveLink(fromLink); if(fromStatus.canProceed()) { System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId()); } else { System.out.println("创建JDBC Link失败"); } //创建一个目的地链接HDFS long toConnectorId = 1; MLink toLink = client.createLink(toConnectorId); toLink.setName("HDFS connector"); toLink.setCreationUser("hadoop"); MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig(); toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://master:9000/"); Status toStatus = client.saveLink(toLink); if(toStatus.canProceed()) { System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId()); } else { System.out.println("创建HDFS Link失败"); } //创建一个任务 long fromLinkId = fromLink.getPersistenceId(); long toLinkId = toLink.getPersistenceId(); MJob job = client.createJob(fromLinkId, toLinkId); job.setName("MySQL to HDFS job"); job.setCreationUser("hadoop"); //设置源链接任务配置信息 MFromConfig fromJobConfig = job.getFromJobConfig(); fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop"); fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop"); fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id"); MToConfig toJobConfig = job.getToJobConfig(); toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/hdfs/recommend"); MDriverConfig driverConfig = job.getDriverConfig(); driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3"); Status status = client.saveJob(job); if(status.canProceed()) { System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId()); } else { System.out.println("JOB创建失败。"); } //启动任务 long jobId = job.getPersistenceId(); MSubmission submission = client.startJob(jobId); System.out.println("JOB提交状态为 : " + submission.getStatus()); while(submission.getStatus().isRunning() && submission.getProgress() != -1) { System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100)); //三秒报告一次进度 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("JOB执行结束... ..."); System.out.println("Hadoop任务ID为 :" + submission.getExternalId()); Counters counters = submission.getCounters(); if(counters != null) { System.out.println("计数器:"); for(CounterGroup group : counters) { System.out.print("\t"); System.out.println(group.getName()); for(Counter counter : group) { System.out.print("\t\t"); System.out.print(counter.getName()); System.out.print(": "); System.out.println(counter.getValue()); } } } if(submission.getExceptionInfo() != null) { System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo()); } System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕"); } }
报了这个错失咋回事??

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题











全表扫描在MySQL中可能比使用索引更快,具体情况包括:1)数据量较小时;2)查询返回大量数据时;3)索引列不具备高选择性时;4)复杂查询时。通过分析查询计划、优化索引、避免过度索引和定期维护表,可以在实际应用中做出最优选择。

InnoDB的全文搜索功能非常强大,能够显着提高数据库查询效率和处理大量文本数据的能力。 1)InnoDB通过倒排索引实现全文搜索,支持基本和高级搜索查询。 2)使用MATCH和AGAINST关键字进行搜索,支持布尔模式和短语搜索。 3)优化方法包括使用分词技术、定期重建索引和调整缓存大小,以提升性能和准确性。

是的,可以在 Windows 7 上安装 MySQL,虽然微软已停止支持 Windows 7,但 MySQL 仍兼容它。不过,安装过程中需要注意以下几点:下载适用于 Windows 的 MySQL 安装程序。选择合适的 MySQL 版本(社区版或企业版)。安装过程中选择适当的安装目录和字符集。设置 root 用户密码,并妥善保管。连接数据库进行测试。注意 Windows 7 上的兼容性问题和安全性问题,建议升级到受支持的操作系统。

聚集索引和非聚集索引的区别在于:1.聚集索引将数据行存储在索引结构中,适合按主键查询和范围查询。2.非聚集索引存储索引键值和数据行的指针,适用于非主键列查询。

MySQL是一个开源的关系型数据库管理系统。1)创建数据库和表:使用CREATEDATABASE和CREATETABLE命令。2)基本操作:INSERT、UPDATE、DELETE和SELECT。3)高级操作:JOIN、子查询和事务处理。4)调试技巧:检查语法、数据类型和权限。5)优化建议:使用索引、避免SELECT*和使用事务。

MySQL 数据库中,用户和数据库的关系通过权限和表定义。用户拥有用户名和密码,用于访问数据库。权限通过 GRANT 命令授予,而表由 CREATE TABLE 命令创建。要建立用户和数据库之间的关系,需创建数据库、创建用户,然后授予权限。

MySQL支持四种索引类型:B-Tree、Hash、Full-text和Spatial。1.B-Tree索引适用于等值查找、范围查询和排序。2.Hash索引适用于等值查找,但不支持范围查询和排序。3.Full-text索引用于全文搜索,适合处理大量文本数据。4.Spatial索引用于地理空间数据查询,适用于GIS应用。

MySQL 和 MariaDB 可以共存,但需要谨慎配置。关键在于为每个数据库分配不同的端口号和数据目录,并调整内存分配和缓存大小等参数。连接池、应用程序配置和版本差异也需要考虑,需要仔细测试和规划以避免陷阱。在资源有限的情况下,同时运行两个数据库可能会导致性能问题。
