from ccnet.sync_client import SyncClient
import Queue
class ClientPool(object):
"""ccnet client pool."""
def __init__(self, conf_dir, pool_size=5):
"""
:param conf_dir: the ccnet configuration directory
:param pool_size:
"""
self.conf_dir = conf_dir
self.pool_size = pool_size
self._pool = Queue.Queue(pool_size)
def _create_client(self):
client = SyncClient(self.conf_dir)
client.req_ids = {}
client.connect_daemon()
return client
def get_client(self):
try:
client = self._pool.get(False)
except:
client = self._create_client()
return client
def return_client(self, client):
try:
self._pool.put(client, False)
except Queue.Full:
pass
class SyncClient(Client):
'''sync mode client'''
def __init__(self, config_dir):
Client.__init__(self, config_dir)
self._req_id = _REQ_ID_START
self.mq_req_id = -1
def disconnect_daemon(self):
if self.is_connected():
try:
self._connfd.close()
except:
pass
def read_response(self):
packet = read_packet(self._connfd)
if packet.header.ptype != CCNET_MSG_RESPONSE:
raise RuntimeError('Invalid Response')
code, code_msg, content = parse_response(packet.body)
return Response(code, code_msg, content)
def send_cmd(self, cmd):
req_id = self.get_request_id()
self.send_request(req_id, 'receive-cmd')
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
cmd += '\000'
self.send_update(req_id, '200', '', cmd)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '')
def prepare_recv_message(self, msg_type):
request = 'mq-server %s' % msg_type
req_id = self.get_request_id()
self.send_request(req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def receive_message(self):
resp = self.read_response()
# the message from ccnet daemon has the trailing null byte included
msg = message_from_string(resp.content[:-1])
return msg
def prepare_send_message(self):
request = 'mq-server'
mq_req_id = self.get_request_id()
self.send_request(mq_req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
self.mq_req_id = mq_req_id
def send_message(self, msg_type, content):
if self.mq_req_id == -1:
self.prepare_send_message()
msg = gen_inner_message_string(self.peer_id, msg_type, content)
self.send_update(self.mq_req_id, "300", '', msg)
resp = self.read_response()
if resp.code != '200':
self.mq_req_id = -1
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def register_service_sync(self, service, group):
'''Mainly used by a program to register a dummy service to ensure only
single instance of that program is running
'''
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd)
Queue
J'utilise rarement cette structure de données, mais j'ai vérifié la documentation dePython
:Le document indique qu'il s'agit d'une file d'attente thread-safe, elle peut donc être utilisée dans un environnement de course avec plusieurs producteurs et plusieurs consommateurs, ce qui est courant dans la programmation multithread.
C'est ce que j'aime
Queue
Parlons maintenant du code dans la question. Il ressort clairement du nom que ce code est un客户端的池
Lorsque vous avez besoin d'obtenir un client, appelez la méthodeget_client()
et elle renverra unSyncClient
Le. Le client vous est remis. Lorsque vous avez terminé, n'oubliez pas d'appeler :return_client(client)
Échangez-le sinon, une grande vague de clients sera générée ici. sera problématique.C’est à peu près tout, je ne sais pas si celui qui pose la question comprend.
La file d'attente est une file d'attente thread-safe. Quant au code donné, @yylucifer l'a expliqué plus clairement. Il s'agit en fait d'un pool de clients réutilisable. Cette utilisation est très courante. blockconnectionpool one est fondamentalement la même idée.
redis-py