将Hadoop RPC框架应用于多节点任务调度
在hadoop中,主从节点之间保持着心跳通信,用于传输节点状态信息、任务调度信息以及节点动作信息等等。 hdfs的namenode与datanode,mapreduce的jobtracker与tasktracker,hbase的hmaster与 regionserver之间的通信,都是基于hadoop RPC。Hadoop RPC是hadoop
在hadoop中,主从节点之间保持着心跳通信,用于传输节点状态信息、任务调度信息以及节点动作信息等等。 hdfs的namenode与datanode,mapreduce的jobtracker与tasktracker,hbase的hmaster与 regionserver之间的通信,都是基于hadoop RPC。Hadoop RPC是hadoop里非常基础的通信框架。hadoop 2.0以前hadoop RPC的数据序列化是通过实现自己定义的Writable接口实现,而从hadoop 2.0开始,数据的序列化工作交给了ProtocolBuffer去做。关于Hadoop RPC的实现原理已经有很多文章进行了详细的介绍(源码级强力分析hadoop的RPC机制,Hadoop基于Protocol Buffer的RPC实现代码分析-Server端,带有HA功能的Hadoop Client端RPC实现原理与代码分析),这里就不在赘述了。下面就直接引入问题和方案吧。
问题
工作中经常需要在定时任务系统上写一些定时任务,随着业务规模的增长和扩大,需要定时处理的任务越来越多,任务之间的执行间隔越来越小,某一时间段内(比如0点、整点或半点)执行的任务会越来越密集,只在一台机器上执行这些任务的话,会出现较大的风险:
- 任务并发度较高时,单机的系统资源将成为瓶颈
- 如果一个任务的运行占用了整个机器的大部分资源,比如sql查询耗费巨大内存和CPU资源,将直接影响其他任务的运行
- 任务失败后,如果仍然在同一台节点自动重新执行,失败率较高
- 机器宕机后,必须第一时间重启机器或重新部署定时任务系统,所有任务都不能按时执行
- 等等
方案
可想而知的是,可以通过将定时任务系统进行分布式改造,使用多个节点执行任务,将任务分发到不同节点上进行处理,并且完善失败重试机制,从而提高系统稳定性,实现任务系统的高可靠。
既然是在多个节点之间分发任务,肯定得有个任务的管理者(主节点),在我们现有的系统中,也就是一套可以部署定时任务的web系统,任务代码更新后,部署好这套web系统,即可通过web页面设置定时任务并且进行调度(在单个节点上执行)。执行任务的节点(子节点)有多个以后,如何分发任务到子节点呢,我们可以把任务的信息封装成一个bean,通过RPC发布给子节点,子节点通过这个任务bean获得任务信息,并在指定的时刻执行任务。同时,子节点可以通过与主节点的心跳通信将节点状态和执行任务的情况告诉主节点。这样其实就与hadoop mapreduce分发任务有点相似了,呵呵,这里主节点与子节点之间的通信,我们就可以通过Hadoop RPC框架来实现了,不同的是,我们分发的任务是定时任务,发布任务时需要将任务的定时信息一并发给子节点。
实现
单点的定时任务系统是基于Quartz的,在分布式环境下,可以继续基于Quartz进行改造,任务的定时信息可以通过Quartz中的JobDetail和Trigger对象来描述并封装,加上任务执行的入口类信息,再通过RPC由主节点发给子节点。子节点收到封装好的任务信息对象后,再构造JobDetail和Trigger,设置好启动时间后,通过入口类启动任务。下面是一个简单的demo。
以下是一个简单的定时任务信息描述对象CronJobInfo,包括JobDetailInfo和TriggerInfo两个属性:
/** * 定时任务信息,包括任务信息和触发器信息 */ public class CronJobInfo implements Writable { private JobDetailInfo jobDetailInfo = new JobDetailInfo(); private TriggerInfo triggerInfo = new TriggerInfo(); @Override public void readFields(DataInput in) throws IOException { jobDetailInfo.readFields(in); triggerInfo.readFields(in); } @Override public void write(DataOutput out) throws IOException { jobDetailInfo.write(out); triggerInfo.write(out); } // getters and setters... }
任务信息JobDetailInfo,由主节点构造,子节点解析构造JobDetail对象:
public class JobDetailInfo implements Writable { private String name; // 任务名称 private String group = Scheduler.DEFAULT_GROUP; // 任务组 private String description; // 任务描述 private Class jobClass; // 任务的启动类 private JobDataMap jobDataMap; // 任务所需的参数,用来给作业提供数据支持的数据结构 private boolean volatility = false; // 重启应用之后是否删除任务的相关信息, private boolean durability = false; // 任务完成之后是否依然保留到数据库 private boolean shouldRecover = false; // 应用重启之后时候忽略过期任务 @Override public void readFields(DataInput in) throws IOException { name = WritableUtils.readString(in); group = WritableUtils.readString(in); description = WritableUtils.readString(in); String className = WritableUtils.readString(in); if (className != null) { try { jobClass = Class.forName(new String(className)); } catch (ClassNotFoundException e) { e.printStackTrace(); } } int dataMapSize = WritableUtils.readVInt(in); while (dataMapSize-- > 0) { String key = WritableUtils.readString(in); String value = WritableUtils.readString(in); jobDataMap.put(key, value); } volatility = in.readBoolean(); durability = in.readBoolean(); shouldRecover = in.readBoolean(); } @Override public void write(DataOutput out) throws IOException { WritableUtils.writeString(out, name); WritableUtils.writeString(out, group); WritableUtils.writeString(out, description); WritableUtils.writeString(out, jobClass.getName()); if (jobDataMap == null) WritableUtils.writeVInt(out, 0); else { WritableUtils.writeVInt(out, jobDataMap.size()); for (Object k : jobDataMap.keySet()) { WritableUtils.writeString(out, k.toString()); WritableUtils.writeString(out, jobDataMap.get(k).toString()); } } out.writeBoolean(volatility); out.writeBoolean(durability); out.writeBoolean(shouldRecover); } //getters and setters //..... }
任务触发器信息TriggerInfo ,由主节点构造,子节点解析构造Trigger对象:
public class TriggerInfo implements Writable { private String name; // trigger名称 private String group = Scheduler.DEFAULT_GROUP; // triger组名称 private String description; // trigger描述 private Date startTime; // 启动时间 private Date endTime; // 结束时间 private long repeatInterval; // 重试时间间隔 private int repeatCount; //重试次数 @Override public void readFields(DataInput in) throws IOException { name = WritableUtils.readString(in); group = WritableUtils.readString(in); description = WritableUtils.readString(in); long start = in.readLong(); startTime = start==0 ? null : new Date(start); long end = in.readLong(); endTime = end==0 ? null : new Date(end); repeatInterval = in.readLong(); repeatCount = in.readInt(); } @Override public void write(DataOutput out) throws IOException { WritableUtils.writeString(out, name); WritableUtils.writeString(out, group); WritableUtils.writeString(out, description); out.writeLong(startTime == null ? 0 : startTime.getTime()); out.writeLong(endTime == null ? 0 : endTime.getTime()); out.writeLong(repeatInterval); out.writeInt(repeatCount); } //getters and setters //..... }
主从节点通信的协议:
public interface TaskProtocol extends VersionedProtocol { public CronJobInfo hearbeat(); }
在这个demo中,主节点启动后,启动RPC server线程,等待客户端(子节点)的连接,当客户端调用heartbeat方法时,主节点将会生成一个任务信息返回给客户端:
public class TaskScheduler implements TaskProtocol { private Logger logger = Logger.getLogger(getClass()); private Server server; public TaskScheduler() { try { server = RPC.getServer(this, "192.168.1.101", 8888, new Configuration()); server.start(); server.join(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public long getProtocolVersion(String arg0, long arg1) throws IOException { return 1; } @Override public CronJobInfo generateCronJob() { // 1、创建JobDetial对象 JobDetailInfo detail = new JobDetailInfo(); // 设置工作项 detail.setJobClass(DemoTask.class); detail.setName("MyJob_1"); detail.setGroup("JobGroup_1"); // 2、创建Trigger对象 TriggerInfo trigger = new TriggerInfo(); trigger.setName("Trigger_1"); trigger.setGroup("Trigger_Group_1"); trigger.setStartTime(new Date()); // 设置重复停止时间,并销毁该Trigger对象 Calendar c = Calendar.getInstance(); c.setTimeInMillis(System.currentTimeMillis() + 1000 * 1L); trigger.setEndTime(c.getTime()); // 设置重复间隔时间 trigger.setRepeatInterval(1000 * 1L); // 设置重复执行次数 trigger.setRepeatCount(3); CronJobInfo info = new CronJobInfo(); info.setJobDetailInfo(detail); info.setTriggerInfo(trigger); return info; } public static void main(String[] args) { TaskScheduler ts = new TaskScheduler(); } }
demo任务类,打印信息:
public class DemoTask implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(this + ": executing task @" + new Date()); } }
子节点demo,启动后连接主节点,远程调用generateCronJob方法,获得一个任务描述信息,并启动定时任务
public class TaskRunner { private Logger logger = Logger.getLogger(getClass()); private TaskProtocol proxy; public TaskRunner() { InetSocketAddress addr = new InetSocketAddress("localhost", 8888); try { proxy = (TaskProtocol) RPC.waitForProxy(TaskProtocol.class, 1, addr, new Configuration()); } catch (IOException e) { e.printStackTrace(); } } public void close() { RPC.stopProxy(proxy); } /** * 从server获取一个定时任务 */ public void getCronJob() { CronJobInfo info = proxy.generateCronJob(); JobDetail jobDetail = getJobDetail(info.getJobDetailInfo()); SimpleTrigger trigger = getTrigger(info.getTriggerInfo()); // 创建Scheduler对象,并配置JobDetail和Trigger对象 SchedulerFactory sf = new StdSchedulerFactory(); Scheduler scheduler = null; try { scheduler = sf.getScheduler(); scheduler.scheduleJob(jobDetail, trigger); // 执行启动操作 scheduler.start(); } catch (SchedulerException e) { e.printStackTrace(); } } /** * @param jobDetailInfo * @return */ private JobDetail getJobDetail(JobDetailInfo info) { JobDetail detail = new JobDetail(); detail.setName(info.getName()); detail.setGroup(info.getGroup()); detail.setDescription(info.getDescription()); detail.setJobClass(info.getJobClass()); detail.setJobDataMap(info.getJobDataMap()); detail.setRequestsRecovery(info.isShouldRecover()); detail.setDurability(info.isDurability()); detail.setVolatility(info.isVolatility()); logger.info("client get jobdetail:" + detail); return detail; } /** * @param triggerInfo * @return */ private SimpleTrigger getTrigger(TriggerInfo info) { SimpleTrigger trigger = new SimpleTrigger(); trigger.setName(info.getName()); trigger.setGroup(info.getGroup()); trigger.setDescription(info.getDescription()); trigger.setStartTime(info.getStartTime()); trigger.setEndTime(info.getEndTime()); trigger.setRepeatInterval(info.getRepeatInterval()); trigger.setRepeatCount(info.getRepeatCount()); logger.info("client get trigger:" + trigger); return trigger; } public static void main(String[] args) { TaskRunner t = new TaskRunner(); t.getCronJob(); t.close(); } }
先启动TaskScheduler,再启动TaskRunner,结果如下:
- TaskScheduler:
2013-01-20 15:42:21,661 [Socket Reader #1 for port 8888] INFO? [org.apache.hadoop.ipc.Server] – Starting Socket Reader #1 for port 8888
2013-01-20 15:42:21,662 [main] INFO? [org.apache.hadoop.ipc.metrics.RpcMetrics] – Initializing RPC Metrics with hostName=TaskScheduler, port=8888
2013-01-20 15:42:21,706 [main] INFO? [org.apache.hadoop.ipc.metrics.RpcDetailedMetrics] – Initializing RPC Metrics with hostName=TaskScheduler, port=8888
2013-01-20 15:42:21,710 [IPC Server listener on 8888] INFO? [org.apache.hadoop.ipc.Server] – IPC Server listener on 8888: starting
2013-01-20 15:42:21,711 [IPC Server Responder] INFO? [org.apache.hadoop.ipc.Server] – IPC Server Responder: starting
2013-01-20 15:42:21,711 [IPC Server handler 0 on 8888] INFO? [org.apache.hadoop.ipc.Server] – IPC Server handler 0 on 8888: starting
2013-01-20 15:42:24,084 [IPC Server handler 0 on 8888] INFO? [org.mh.rpc.task.TaskScheduler] – generate a task: org.mh.rpc.task.JobDetailInfo@1f26605
- TaskRunner:
2013-01-20 15:42:26,323 [main] INFO? [org.mh.rpc.task.TaskRunner] – client get jobdetail:JobDetail ‘JobGroup_1.MyJob_1′:? jobClass: ‘org.mh.rpc.quartz.GetSumTask isStateful: false isVolatile: false isDurable: false requestsRecovers: false
2013-01-20 15:42:26,329 [main] INFO? [org.mh.rpc.task.TaskRunner] – client get trigger:Trigger ‘Trigger_Group_1.Trigger_1′:? triggerClass: ‘org.quartz.SimpleTrigger isVolatile: false calendar: ‘null’ misfireInstruction: 0 nextFireTime: null
2013-01-20 15:42:26,382 [main] INFO? [org.quartz.simpl.SimpleThreadPool] – Job execution threads will use class loader of thread: main
2013-01-20 15:42:26,411 [main] INFO? [org.quartz.core.SchedulerSignalerImpl] – Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2013-01-20 15:42:26,411 [main] INFO? [org.quartz.core.QuartzScheduler] – Quartz Scheduler v.1.6.5 created.
2013-01-20 15:42:26,413 [main] INFO? [org.quartz.simpl.RAMJobStore] – RAMJobStore initialized.
2013-01-20 15:42:26,413 [main] INFO? [org.quartz.impl.StdSchedulerFactory] – Quartz scheduler ‘DefaultQuartzScheduler’ initialized from default resource file in Quartz package: ‘quartz.properties’
2013-01-20 15:42:26,413 [main] INFO? [org.quartz.impl.StdSchedulerFactory] – Quartz scheduler version: 1.6.5
2013-01-20 15:42:26,415 [main] INFO? [org.quartz.core.QuartzScheduler] – Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
org.mh.rpc.quartz.DemoTask@1b66b06: executing task @Sun Jan 20 15:42:26 CST 2013
上面是一个简单的demo,演示了如何通过RPC将任务调度给节点去执行,对于Quartz来说,任务的形式可以千变万化,关键就看怎么去使用了,分发到多个节点上执行的话,就还需要对任务的信息做更多的封装了。
(本文已被阅读2次)


原文地址:将Hadoop RPC框架应用于多节点任务调度, 感谢原作者分享。

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

评估Java框架商业支持的性价比涉及以下步骤:确定所需的保障级别和服务水平协议(SLA)保证。研究支持团队的经验和专业知识。考虑附加服务,如升级、故障排除和性能优化。权衡商业支持成本与风险缓解和提高效率。

轻量级PHP框架通过小体积和低资源消耗提升应用程序性能。其特点包括:体积小,启动快,内存占用低提升响应速度和吞吐量,降低资源消耗实战案例:SlimFramework创建RESTAPI,仅500KB,高响应性、高吞吐量

编写清晰全面的文档对于Golang框架至关重要。最佳实践包括:遵循既定文档风格,例如Google的Go编码风格指南。使用清晰的组织结构,包括标题、子标题和列表,并提供导航。提供全面准确的信息,包括入门指南、API参考和概念。使用代码示例说明概念和使用方法。保持文档更新,跟踪更改并记录新功能。提供支持和社区资源,例如GitHub问题和论坛。创建实际案例,如API文档。

根据应用场景选择最佳Go框架:考虑应用类型、语言特性、性能需求、生态系统。常见Go框架:Gin(Web应用)、Echo(Web服务)、Fiber(高吞吐量)、gorm(ORM)、fasthttp(速度)。实战案例:构建RESTAPI(Fiber),与数据库交互(gorm)。选择框架:性能关键选fasthttp,灵活Web应用选Gin/Echo,数据库交互选gorm。

PHP框架的学习曲线取决于语言熟练度、框架复杂性、文档质量和社区支持。与Python框架相比,PHP框架的学习曲线更高,而与Ruby框架相比,则较低。与Java框架相比,PHP框架的学习曲线中等,但入门时间较短。

根据基准测试,对于小型、高性能应用程序,Quarkus(快速启动、低内存)或Micronaut(TechEmpower优异)是理想选择。SpringBoot适用于大型、全栈应用程序,但启动时间和内存占用稍慢。

在Go框架开发中,常见的挑战及其解决方案是:错误处理:利用errors包进行管理,并使用中间件集中处理错误。身份验证和授权:集成第三方库并创建自定义中间件来检查凭据。并发处理:利用goroutine、互斥锁和通道来控制资源访问。单元测试:使用gotest包,模拟和存根进行隔离,并使用代码覆盖率工具确保充分性。部署和监控:使用Docker容器打包部署,设置数据备份,通过日志记录和监控工具跟踪性能和错误。

选择Go框架时,关键性能指标(KPI)包括:响应时间、吞吐量、并发能力和资源使用。通过基准测试和比较框架的KPI,开发人员可以根据应用程序需求进行明智的选择,考虑预期负载、性能关键部分和资源限制。
