python - 如何使用pykafka consumer进行数据处理并保存?
怪我咯
怪我咯 2017-04-18 10:34:31
0
1
1300

使用本地kafka bin/kafka-console-producer.sh --broker-list kafkaIP:port --topic topicName
创建命令行生产数据,然后打开python

from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.x.x:9092")
topic = client.topics['wr_test']
consumer = topic.get_balanced_consumer(consumer_group='test-consumer-group',auto_commit_enable=True,zookeeper_connect='192.168.x.x:2121')

然后自己编写了简单的一套处理函数,从外部引用。将数据处理后存入elasticsearch 或者 数据库
比如
for msg in consumer:

if msg is not None:
    外部引入的处理函数(msg.value)
    

在python命令行
for msg in consumer:

print msg.offset, msg.value

这时候使用生产者敲入一些数据,在消费端就会就会立即打印出来
但是写成py文件之后,每次运行只会处理最近的生产的一次内容,在生产者中再进行输入一些内容,py文件就不会再进行数据处理了。
所以向问下如何编写能运行后能一直对消费者数据进行处理的函数?要注意哪些地方?

另外,get_balanced_consumer的方法,是连接zookeeper消费
使用topic.get_simple_consumer是直接消费kafka,使用这种方式就提示No handler for...的错误

还有一个疑问,就是实际生产环境日志产生量很快,应该如何编写一个多线程处理方法?

怪我咯
怪我咯

走同样的路,发现不同的人生

répondre à tous(1)
左手右手慢动作

J'ai vu une solution alternative sur le blog de quelqu'un d'autre
http://www.cnblogs.com/castle...
Lisez msg.value du consommateur dans une liste, puis lisez les données de la liste pour le traitement des données. Une fois le processus terminé, affichez les données obtenues dans la liste. Utilisez également try: ... except :... continue

Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal