from ccnet.sync_client import SyncClient
import Queue
class ClientPool(object):
"""ccnet client pool."""
def __init__(self, conf_dir, pool_size=5):
"""
:param conf_dir: the ccnet configuration directory
:param pool_size:
"""
self.conf_dir = conf_dir
self.pool_size = pool_size
self._pool = Queue.Queue(pool_size)
def _create_client(self):
client = SyncClient(self.conf_dir)
client.req_ids = {}
client.connect_daemon()
return client
def get_client(self):
try:
client = self._pool.get(False)
except:
client = self._create_client()
return client
def return_client(self, client):
try:
self._pool.put(client, False)
except Queue.Full:
pass
class SyncClient(Client):
'''sync mode client'''
def __init__(self, config_dir):
Client.__init__(self, config_dir)
self._req_id = _REQ_ID_START
self.mq_req_id = -1
def disconnect_daemon(self):
if self.is_connected():
try:
self._connfd.close()
except:
pass
def read_response(self):
packet = read_packet(self._connfd)
if packet.header.ptype != CCNET_MSG_RESPONSE:
raise RuntimeError('Invalid Response')
code, code_msg, content = parse_response(packet.body)
return Response(code, code_msg, content)
def send_cmd(self, cmd):
req_id = self.get_request_id()
self.send_request(req_id, 'receive-cmd')
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
cmd += '\000'
self.send_update(req_id, '200', '', cmd)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '')
def prepare_recv_message(self, msg_type):
request = 'mq-server %s' % msg_type
req_id = self.get_request_id()
self.send_request(req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def receive_message(self):
resp = self.read_response()
# the message from ccnet daemon has the trailing null byte included
msg = message_from_string(resp.content[:-1])
return msg
def prepare_send_message(self):
request = 'mq-server'
mq_req_id = self.get_request_id()
self.send_request(mq_req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
self.mq_req_id = mq_req_id
def send_message(self, msg_type, content):
if self.mq_req_id == -1:
self.prepare_send_message()
msg = gen_inner_message_string(self.peer_id, msg_type, content)
self.send_update(self.mq_req_id, "300", '', msg)
resp = self.read_response()
if resp.code != '200':
self.mq_req_id = -1
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def register_service_sync(self, service, group):
'''Mainly used by a program to register a dummy service to ensure only
single instance of that program is running
'''
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd)
Queue
このデータ構造はめったに使用しませんが、Python
のドキュメントを確認しました:ドキュメントには、これが スレッドセーフなキュー であるため、マルチスレッド プログラミングで一般的な、複数のプロデューサーと複数のコンシューマーによる競合環境で使用できると記載されています。
これが
Queue
についての私の理解です。次に、質問のコードについて説明します。このコードが客户端的池
であることが明らかです。クライアントを取得する必要がある場合は、get_client()
を呼び出します。 > メソッドを使用すると、SyncClient
クライアントが返されます。それが完了したら、return_client(client)
を呼び出して元に戻すことを忘れないでください。そうしないと、ここで大量のクライアントが生成されます。多数のクライアントを作成すると問題が発生する可能性があります。以上です、質問者さんが理解しているか分かりませんが。
キューはスレッドセーフなキューです。@yylucifer は、実際には、
redis-py
のプール設計をより明確に説明しています。 blockconnectionpool 1 も基本的に同じ考えです。