有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。 Storm是用Java实现的,你看到的所有这本书中的 spout 和 bolt 都是用java编写的。那么有可能使用像Python、Ruby、或者JavaScript这样的语言
有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。
Storm是用Java实现的,你看到的所有这本书中的spout和bolt都是用java编写的。那么有可能使用像Python、Ruby、或者JavaScript这样的语言编写spout和bolt吗?答案是当然可以!可以使用多语言协议达到这一目的。
多语言协议是Storm实现的一种特殊的协议,它使用标准输入输出作为spout和bolt进程间的通讯通道。消息以JSON格式或纯文本格式在通道中传递。
我们看一个用非JVM语言开发spout和bolt的简单例子。在这个例子中有一个spout产生从1到10,000的数字,一个bolt过滤素数,二者都用PHP实现。
NOTE:?在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。
有一个专门为Storm实现的PHP DSL(译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。
1 2 3 4 5 6 7 | ...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "numbers-generator" , new NumberGeneratorSpout(1, 10000));
builder.setBolt( "prime-numbers-filter" , new
PrimeNumbersFilterBolt()).shuffleGrouping( "numbers-generator" );
StormTopology topology = builder.createTopology();
...
|
登录后复制
NOTE:有一种使用非JVM语言定义拓扑的方式。既然Storm拓扑是Thrift架构,而且Nimbus是一个Thrift守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。
这里没什么新鲜了。我们看一下NumbersGeneratorSpout的实现。
1 2 3 4 5 6 7 8 9 10 11 | public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
public NumberGeneratorSpout(Integer from, Integer to) {
super( "php" , "-f" , "NumberGeneratorSpout.php" , from.toString(), to.toString());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer. declare ( new Fields( "number" ));
}
public Map getComponentConfiguration() {
return null;
}
}
|
登录后复制
你可能已经注意到了,这个spout继承了ShellSpout。这是个由Storm提供的特殊的类,用来帮助你运行并控制用其它语言编写的spout。在这种情况下它告诉Storm如何执行你的PHP脚本。
NumberGeneratorSpout的PHP脚本向标准输出分发元组,并从标准输入读取确认或失败信号。
在开始实现NumberGeneratorSpout.php脚本之前,多观察一下多语言协议是如何工作的。
spout按照传递给构造器的参数从from到to顺序生成数字。
接下来看看PrimeNumbersFilterBolt。这个类实现了之前提到的壳。它告诉Storm如何执行你的PHP脚本。Storm为这一目的提供了一个特殊的叫做ShellBolt的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。
1 2 3 4 5 6 7 8 | public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
public PrimeNumbersFilterBolt() {
super( "php" , "-f" , "PrimeNumbersFilterBolt.php" );
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer. declare ( new Fields( "number" ));
}
}
|
登录后复制
在这个构造器中只是告诉Storm如何运行PHP脚本。它与下列命令等价。
1 | php -f PrimeNumbersFilterBolt.php
|
登录后复制
PrimeNumbersFilterBolt.php脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。
- 发起一次握手
- 开始循环
- 读/写元组
NOTE:有一种特殊的方式可以使用Storm的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。
下面我们来看一看上述每一步的细节,以及如何用PHP实现它。
发起握手
为了控制整个流程(开始以及结束它),Storm需要知道它执行的脚本进程号(PID)。根据多语言协议,你的进程开始时发生的第一件事就是Storm要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非JVM语言的角度理解的,这里提到的标准输入也就是PHP的标准输入)发送一段JSON数据,它包含Storm配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | {
"conf" : {
"topology.message.timeout.secs" : 3,
},
"context" : {
"task->component" : {
"1" : "example-spout" ,
"2" : "__acker" ,
"3" : "example-bolt"
},
"taskid" : 3
},
"pidDir" : "..."
}
|
登录后复制
脚本进程必须在pidDir指定的目录下以自己的进程号为名字创建一个文件,并以JSON格式把进程号写到标准输出。
举个例子,如果你收到/tmp/examplen而你的脚本进程号是123,你应该创建一个名为/tmp/example/123的空文件并向标准输出打印文本行?{“pid”: 123}n(译者注:此处原文只有一个n,译者猜测应是排版错误)和endn。这样Storm就能持续追踪进程号并在它关闭时杀死脚本进程。下面是PHP实现:
1 2 3 4 5 6 | $config = json_decode(read_msg(), true);
$heartbeatdir = $config [ 'pidDir' ];
$pid = getmypid ();
fclose( fopen ( "$heartbeatdir/$pid" , "w" ));
storm_send([ "pid" => $pid ]);
flush ();
|
登录后复制
你已经实现了一个叫做read_msg的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行JSON文本。一条消息以end\n结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | function read_msg() {
$msg = "" ;
while (true) {
$l = fgets (STDIN);
$line = substr ( $l ,0,-1);
if ( $line == "end" ) {
break ;
}
$msg = "$msg$line\n" ;
}
return substr ( $msg , 0, -1);
}
function storm_send( $json ) {
write_line(json_encode( $json ));
write_line( "end" );
}
function write_line( $line ) {
echo ( "$line\n" );
}
|
登录后复制
NOTE:flush()方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚本可能会为了等待一个来自Storm的输入而永远挂起,而Storm却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。
开始循环以及读/写元组
这是整个工作中最重要的一步。这一步的实现取决于你开发的spout和bolt。
如果是spout,你应当开始分发元组。如果是bolt,就循环读取元组,处理它们,分发它发,确认成功或失败。
下面我们就看看用来分发数字的spout。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | $from = intval ( $argv [1]);
$to = intval ( $argv [2]);
while (true) {
$msg = read_msg();
$cmd = json_decode( $msg , true);
if ( $cmd [ 'command' ]== 'next' ) {
if ( $from < $to ) {
storm_emit( array ( "$from" ));
$task_ids = read_msg();
$from ++;
} else {
sleep(1);
}
}
storm_sync();
}
|
登录后复制
从命令行获取参数from和to,并开始迭代。每次从Storm得到一条next消息,这意味着你已准备好分发下一个元组。
一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。
为了确保脚本已准备好发送下一个元组,Storm会在发送下一条之前等待sync\n文本行。调用read_msg(),读取一条命令,解析JSON。
对于bolts来说,有少许不同。
1 2 3 4 5 6 7 8 9 10 | while (true) {
$msg = read_msg();
$tuple = json_decode( $msg , true, 512, JSON_BIGINT_AS_STRING);
if (! empty ( $tuple [ "id" ])) {
if (isPrime( $tuple [ "tuple" ][0])) {
storm_emit( array ( $tuple [ "tuple" ][0]));
}
storm_ack( $tuple [ "id" ]);
}
}
|
登录后复制
循环的从标准输入读取元组。解析读取每一条JSON消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。
NOTE:在json_decode函数中使用的JSON_BIGINT_AS_STRING是为了解决一个在JAVA和PHP之间的数据转换问题。JAVA发送的一些很大的数字,在PHP中会丢失精度,这样就会导致问题。为了避开这个问题,告诉PHP把大数字当作字符串处理,并在JSON消息中输出数字时不使用双引号。PHP5.4.0或更高版本要求使用这个参数。
emit,ack,fail,以及log消息都是如下结构:
emit
1 2 3 4 | {
"command" : "emit" ,
"tuple" : [ "foo" , "bar" ]
}
|
登录后复制
其中的数组包含了你分发的元组数据。
ack
1 2 3 4 | {
"command" : "ack" ,
"id" : 123456789
}
|
登录后复制
其中的id就是你处理的元组的ID。
fail
1 2 3 4 | {
"command" : "fail" ,
"id" : 123456789
}
|
登录后复制
与ack(译者注:原文是emit从上下JSON的内容和每个方法的功能上判断此处就是ack,可能是排版错误)相同,其中id就是你处理的元组ID。
log
1 2 3 4 | {
"command" : "log" ,
"msg" : "some message to be logged by storm."
}
|
登录后复制
下面是完整的的PHP代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | <!--?php
function read_msg() {
$msg = "" ;
while (true) {
$l = fgets (STDIN);
$line = substr ( $l ,0,-1);
if ( $line == "end" ) {
break ;
}
$msg = "$msg$line\n" ;
}
return substr ( $msg , 0, -1);
}
function write_line( $line ) {
echo ( "$line\n" );
}
function storm_emit( $tuple ) {
$msg = array ( "command" =--> "emit" , "tuple" => $tuple );
storm_send( $msg );
}
function storm_send( $json ) {
write_line(json_encode( $json ));
write_line( "end" );
}
function storm_sync() {
storm_send( array ( "command" => "sync" ));
}
function storm_log( $msg ) {
$msg = array ( "command" => "log" , "msg" => $msg );
storm_send( $msg );
flush ();
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config [ 'pidDir' ];
$pid = getmypid ();
fclose( fopen ( "$heartbeatdir/$pid" , "w" ));
storm_send([ "pid" => $pid ]);
flush ();
$from = intval ( $argv [1]);
$to = intval ( $argv [2]);
while (true) {
$msg = read_msg();
$cmd = json_decode( $msg , true);
if ( $cmd [ 'command' ]== 'next' ) {
if ( $from < $to ) {
storm_emit( array ( "$from" ));
$task_ids = read_msg();
$from ++;
} else {
sleep(1);
}
}
storm_sync();
}
?>
<!--?php
function isPrime( $number ) {
if ( $number < 2) {
return false;
}
if ( $number ==2) {
return true;
}
for ( $i =2; $i <= $number -1; $i ++) {
if ( $number % $i == 0) {
return false;
}
}
return true;
}
function read_msg() {
$msg = "" ;
while (true) {
$l = fgets (STDIN);
$line = substr ( $l ,0,-1);
if ( $line == "end" ) {
break ;
}
$msg = "$msg$line\n" ;
}
return substr ( $msg , 0, -1);
}
function write_line( $line ) {
echo ( "$line\n" );
}
function storm_emit( $tuple ) {
$msg = array ( "command" =--> "emit" , "tuple" => $tuple );
storm_send( $msg );
}
function storm_send( $json ) {
write_line(json_encode( $json ));
write_line( "end" );
}
function storm_ack( $id ) {
storm_send([ "command" => "ack" , "id" => "$id" ]);
}
function storm_log( $msg ) {
$msg = array ( "command" => "log" , "msg" => "$msg" );
storm_send( $msg );
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config [ 'pidDir' ];
$pid = getmypid ();
fclose( fopen ( "$heartbeatdir/$pid" , "w" ));
storm_send([ "pid" => $pid ]);
flush ();
while (true) {
$msg = read_msg();
$tuple = json_decode( $msg , true, 512, JSON_BIGINT_AS_STRING);
if (! empty ( $tuple [ "id" ])) {
if (isPrime( $tuple [ "tuple" ][0])) {
storm_emit( array ( $tuple [ "tuple" ][0]));
}
storm_ack( $tuple [ "id" ]);
}
}
?>
|
登录后复制
NOTE:需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为multilang/resources的子目录中。这个子目录被包含在发送给工人进程的jar文件中。如果你不把脚本包含在这个目录中,Storm就不能运行它们,并抛出一个错误。
(全文完)如果您喜欢此文请点赞,分享,评论。
-
原创文章转载请注明出处:Storm入门之第7章使用非JVM语言开发
-
小额赞助本站::我要赞助