Maison > développement back-end > Tutoriel Python > Analyse d'un exemple d'architecture distribuée maître-esclave personnalisée Python

Analyse d'un exemple d'architecture distribuée maître-esclave personnalisée Python

高洛峰
Libérer: 2017-02-22 16:22:06
original
1481 Les gens l'ont consulté

Cet article présente principalement l'architecture distribuée maître-esclave personnalisée de Python et analyse la structure, les principes et les techniques spécifiques d'implémentation de code de l'architecture distribuée maître-esclave sous forme d'exemples. Les amis dans le besoin peuvent s'y référer

<.>L'exemple de cet article décrit l'architecture distribuée maître-esclave personnalisée de Python. Partagez-le avec tout le monde pour votre référence, les détails sont les suivants :

Environnement : Win7 x64, Python 2.7, APScheduler 2.1.2.

Le schéma schématique est le suivant :

Analyse dun exemple darchitecture distribuée maître-esclave personnalisée Python

Partie code :

( 1), Nœud central :

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import SocketServer, socket, Queue
CenterIP = &#39;127.0.0.1&#39;  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心节点用于发送网络消息的socket
TaskQueue = Queue.Queue() #任务队列
#获取任务队列
def GetTaskQueue():
  for i in range(1, 11):
    TaskQueue.put(str(i))
#CenterServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print(data)
    if data.startswith(&#39;wait&#39;):
      vec = data.split(&#39;:&#39;)
      if len(vec) != 3:
        print(&#39;Error: len(vec) != 3&#39;)
      else:
        nodeIP = vec[1]
        nodeListenPort = vec[2]
        nodeID = nodeIP + &#39;:&#39; + nodeListenPort
        if not TaskQueue.empty():
          task = TaskQueue.get()
          print(&#39;send task &#39; + task + &#39; to &#39; + nodeID)
          CenterClient.sendto(&#39;task:&#39; + task, (nodeIP, int(nodeListenPort)))
        else:
          print(&#39;TaskQueue is empty!&#39;)
GetTaskQueue() #获取任务队列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print(&#39;Listen port &#39; + str(CenterListenPort) + &#39; ...&#39;)
CenterServer.serve_forever()
Copier après la connexion

(2), nœud de tâche :

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = &#39;127.0.0.1&#39;  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
NodeIP = socket.gethostbyname(socket.gethostname())  #任务节点自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobSendNetMsg():
  msg = &#39;&#39;
  if NodeServer.TaskState == &#39;wait&#39;:
    msg = &#39;wait:&#39; + NodeIP + &#39;:&#39; + str(NodeListenPort)
  elif NodeServer.TaskState == &#39;exec&#39;:
    msg = &#39;exec:&#39; + NodeIP + &#39;:&#39; + str(NodeListenPort)
  print(msg)
  NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并启动定时任务
def InitTimer():
  sched = Scheduler()
  sched.add_interval_job(jobSendNetMsg, seconds=1)
  sched.start()
#执行任务
def ExecTask(task):
  print(&#39;ExecTask &#39; + task + &#39; ...&#39;)
  time.sleep(2)
  print(&#39;ExecTask &#39; + task + &#39; over&#39;)
#NodeServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print(&#39;recv data: &#39; + data)
    if data.startswith(&#39;task&#39;):
      vec = data.split(&#39;:&#39;)
      if len(vec) != 2:
        print(&#39;Error: len(vec) != 2&#39;)
      else:
        task = vec[1]
        self.server.TaskState = &#39;exec&#39;
        ExecTask(task)
        self.server.TaskState = &#39;wait&#39;
InitTimer()
NodeServer = SocketServer.UDPServer((&#39;&#39;, 0), MyUDPHandler)
NodeServer.TaskState = &#39;wait&#39; #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print(&#39;NodeListenPort:&#39; + str(NodeListenPort))
NodeServer.serve_forever()
Copier après la connexion


Pour plus d'articles liés à l'analyse d'exemples d'architecture distribuée maître-esclave personnalisée Python, veuillez faire attention au site Web PHP chinois !

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal