


Tutorial zum Schreiben eines einfachen Chat-Programms in Python
Implementierungsideen
x01 Einrichtung des Servers
Verwenden Sie zunächst auf der Serverseite den Socket, um Nachrichten jedes Mal zu akzeptieren Wenn eine Nachricht akzeptiert wird, öffnet die Socket-Anfrage einen neuen Thread, um die Verteilung und Annahme von Nachrichten zu verwalten. Gleichzeitig gibt es einen Handler zum Verwalten aller Threads, wodurch die Verarbeitung verschiedener Funktionen des Chatrooms realisiert wird. 🎜🎜## 🎜🎜#x02 Die Einrichtung des Clients
Die Einrichtung des Clients ist viel einfacher als die des Servers. Die Funktion des Clients besteht nur darin, Nachrichten zu senden und zu empfangen. und um bestimmte Nachrichten nach bestimmten Regeln einzugeben, um die Verwendung unterschiedlicher Funktionen zu erreichen, müssen Sie auf der Clientseite nur zwei Threads verwenden, einen für den Empfang von Nachrichten und einen für das Senden von Nachrichten.
Warum nicht einen verwenden? Denn wenn Sie nur einen verwenden, wird derjenige, der die Nachricht empfängt, vor dem Senden blockiert Wenn diese beiden Funktionen an einer Stelle implementiert sind, führt dies dazu, dass Nachrichten nicht kontinuierlich gesendet oder empfangen werden können 🎜## 🎜🎜 #import json
import threading
from socket import *
from time import ctime
class PyChattingServer:
__socket = socket(AF_INET, SOCK_STREAM, 0)
__address = ('', 12231)
__buf = 1024
def __init__(self):
self.__socket.bind(self.__address)
self.__socket.listen(20)
self.__msg_handler = ChattingHandler()
def start_session(self):
print('等待客户连接...\r\n')
try:
while True:
cs, caddr = self.__socket.accept()
# 利用handler来管理线程,实现线程之间的socket的相互通信
self.__msg_handler.start_thread(cs, caddr)
except socket.error:
pass
class ChattingThread(threading.Thread):
__buf = 1024
def __init__(self, cs, caddr, msg_handler):
super(ChattingThread, self).__init__()
self.__cs = cs
self.__caddr = caddr
self.__msg_handler = msg_handler
# 使用多线程管理会话
def run(self):
try:
print('...连接来自于:', self.__caddr)
data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n'
self.__cs.sendall(bytes(data, 'utf-8'))
while True:
data = self.__cs.recv(self.__buf).decode('utf-8')
if not data:
break
self.__msg_handler.handle_msg(data, self.__cs)
print(data)
except socket.error as e:
print(e.args)
pass
finally:
self.__msg_handler.close_conn(self.__cs)
self.__cs.close()
class ChattingHandler:
__help_str = "[ SYSTEM ]\r\n" \
"输入/ls,即可获得所有登陆用户信息\r\n" \
"输入/h,即可获得帮助\r\n" \
"输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
"输入/i,即可屏蔽群聊信息\r\n" \
"再次输入/i,即可取消屏蔽\r\n" \
"所有首字符为/的信息都不会发送出去"
__buf = 1024
__socket_list = []
__user_name_to_socket = {}
__socket_to_user_name = {}
__user_name_to_broadcast_state = {}
def start_thread(self, cs, caddr):
self.__socket_list.append(cs)
chat_thread = ChattingThread(cs, caddr, self)
chat_thread.start()
def close_conn(self, cs):
if cs not in self.__socket_list:
return
# 去除socket的记录
nickname = "SOMEONE"
if cs in self.__socket_list:
self.__socket_list.remove(cs)
# 去除socket与username之间的映射关系
if cs in self.__socket_to_user_name:
nickname = self.__socket_to_user_name[cs]
self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
self.__socket_to_user_name.pop(cs)
self.__user_name_to_broadcast_state.pop(nickname)
nickname += " "
# 广播某玩家退出聊天室
self.broadcast_system_msg(nickname + "离开了PY_CHATTING")
# 管理用户输入的信息
def handle_msg(self, msg, cs):
js = json.loads(msg)
if js['type'] == "login":
if js['msg'] not in self.__user_name_to_socket:
if ' ' in js['msg']:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号不能够带有空格'
}), cs)
else:
self.__user_name_to_socket[js['msg']] = cs
self.__socket_to_user_name[cs] = js['msg']
self.__user_name_to_broadcast_state[js['msg']] = True
self.send_to(json.dumps({
'type': 'login',
'success': True,
'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
}), cs)
# 广播其他人,他已经进入聊天室
self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")
else:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号已存在'
}), cs)
# 若玩家处于屏蔽模式,则无法发送群聊消息
elif js['type'] == "broadcast":
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
self.broadcast(js['msg'], cs)
else:
self.send_to(json.dumps({
'type': 'broadcast',
'msg': '屏蔽模式下无法发送群聊信息'
}), cs)
elif js['type'] == "ls":
self.send_to(json.dumps({
'type': 'ls',
'msg': self.get_all_login_user_info()
}), cs)
elif js['type'] == "help":
self.send_to(json.dumps({
'type': 'help',
'msg': self.__help_str
}), cs)
elif js['type'] == "sendto":
self.single_chatting(cs, js['nickname'], js['msg'])
elif js['type'] == "ignore":
self.exchange_ignore_state(cs)
def exchange_ignore_state(self, cs):
if cs in self.__socket_to_user_name:
state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
if state:
state = False
else:
state = True
self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
msg = "通常模式"
else:
msg = "屏蔽模式"
self.send_to(json.dumps({
'type': 'ignore',
'success': True,
'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
}), cs)
else:
self.send_to({
'type': 'ignore',
'success': False,
'msg': '切换失败'
}, cs)
def single_chatting(self, cs, nickname, msg):
if nickname in self.__user_name_to_socket:
msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
ctime(), self.__socket_to_user_name[cs], nickname, msg)
self.send_to_list(json.dumps({
'type': 'single',
'msg': msg
}), self.__user_name_to_socket[nickname], cs)
else:
self.send_to(json.dumps({
'type': 'single',
'msg': '该用户不存在'
}), cs)
print(nickname)
def send_to_list(self, msg, *cs):
for i in range(len(cs)):
self.send_to(msg, cs[i])
def get_all_login_user_info(self):
login_list = "[ SYSTEM ] ALIVE USER : \r\n"
for key in self.__socket_to_user_name:
login_list += self.__socket_to_user_name[key] + ",\r\n"
return login_list
def send_to(self, msg, cs):
if cs not in self.__socket_list:
self.__socket_list.append(cs)
cs.sendall(bytes(msg, 'utf-8'))
def broadcast_system_msg(self, msg):
data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
js = json.dumps({
'type': 'system_msg',
'msg': data
})
# 屏蔽了群聊的玩家也可以获得系统的群发信息
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def broadcast(self, msg, cs):
data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
js = json.dumps({
'type': 'broadcast',
'msg': data
})
# 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name \
and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def main():
server = PyChattingServer()
server.start_session()
main()
Nach dem Login kopieren
import json import threading from socket import * from time import ctime class PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = ('', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print('等待客户连接...\r\n') try: while True: cs, caddr = self.__socket.accept() # 利用handler来管理线程,实现线程之间的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: pass class ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多线程管理会话 def run(self): try: print('...连接来自于:', self.__caddr) data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n' self.__cs.sendall(bytes(data, 'utf-8')) while True: data = self.__cs.recv(self.__buf).decode('utf-8') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close() class ChattingHandler: __help_str = "[ SYSTEM ]\r\n" \ "输入/ls,即可获得所有登陆用户信息\r\n" \ "输入/h,即可获得帮助\r\n" \ "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \ "输入/i,即可屏蔽群聊信息\r\n" \ "再次输入/i,即可取消屏蔽\r\n" \ "所有首字符为/的信息都不会发送出去" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的记录 nickname = "SOMEONE" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket与username之间的映射关系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += " " # 广播某玩家退出聊天室 self.broadcast_system_msg(nickname + "离开了PY_CHATTING") # 管理用户输入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js['type'] == "login": if js['msg'] not in self.__user_name_to_socket: if ' ' in js['msg']: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号不能够带有空格' }), cs) else: self.__user_name_to_socket[js['msg']] = cs self.__socket_to_user_name[cs] = js['msg'] self.__user_name_to_broadcast_state[js['msg']] = True self.send_to(json.dumps({ 'type': 'login', 'success': True, 'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)' }), cs) # 广播其他人,他已经进入聊天室 self.broadcast_system_msg(js['msg'] + "已经进入了聊天室") else: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号已存在' }), cs) # 若玩家处于屏蔽模式,则无法发送群聊消息 elif js['type'] == "broadcast": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js['msg'], cs) else: self.send_to(json.dumps({ 'type': 'broadcast', 'msg': '屏蔽模式下无法发送群聊信息' }), cs) elif js['type'] == "ls": self.send_to(json.dumps({ 'type': 'ls', 'msg': self.get_all_login_user_info() }), cs) elif js['type'] == "help": self.send_to(json.dumps({ 'type': 'help', 'msg': self.__help_str }), cs) elif js['type'] == "sendto": self.single_chatting(cs, js['nickname'], js['msg']) elif js['type'] == "ignore": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = "通常模式" else: msg = "屏蔽模式" self.send_to(json.dumps({ 'type': 'ignore', 'success': True, 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg) }), cs) else: self.send_to({ 'type': 'ignore', 'success': False, 'msg': '切换失败' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ 'type': 'single', 'msg': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ 'type': 'single', 'msg': '该用户不存在' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = "[ SYSTEM ] ALIVE USER : \r\n" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + ",\r\n" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, 'utf-8')) def broadcast_system_msg(self, msg): data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg) js = json.dumps({ 'type': 'system_msg', 'msg': data }) # 屏蔽了群聊的玩家也可以获得系统的群发信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def broadcast(self, msg, cs): data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ 'type': 'broadcast', 'msg': data }) # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def main(): server = PyChattingServer() server.start_session() main()
Das obige ist der detaillierte Inhalt vonTutorial zum Schreiben eines einfachen Chat-Programms in Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

AI Hentai Generator
Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen



Sie können grundlegende Programmierkonzepte und Fähigkeiten von Python innerhalb von 2 Stunden lernen. 1. Lernen Sie Variablen und Datentypen, 2. Master Control Flow (bedingte Anweisungen und Schleifen), 3.. Verstehen Sie die Definition und Verwendung von Funktionen, 4. Beginnen Sie schnell mit der Python -Programmierung durch einfache Beispiele und Code -Snippets.

Python wird in den Bereichen Webentwicklung, Datenwissenschaft, maschinelles Lernen, Automatisierung und Skripten häufig verwendet. 1) In der Webentwicklung vereinfachen Django und Flask Frameworks den Entwicklungsprozess. 2) In den Bereichen Datenwissenschaft und maschinelles Lernen bieten Numpy-, Pandas-, Scikit-Learn- und TensorFlow-Bibliotheken eine starke Unterstützung. 3) In Bezug auf Automatisierung und Skript ist Python für Aufgaben wie automatisiertes Test und Systemmanagement geeignet.

Es ist unmöglich, das MongoDB -Passwort direkt über Navicat anzuzeigen, da es als Hash -Werte gespeichert ist. So rufen Sie verlorene Passwörter ab: 1. Passwörter zurücksetzen; 2. Überprüfen Sie die Konfigurationsdateien (können Hash -Werte enthalten). 3. Überprüfen Sie Codes (May Hardcode -Passwörter).

Als Datenprofi müssen Sie große Datenmengen aus verschiedenen Quellen verarbeiten. Dies kann Herausforderungen für das Datenmanagement und die Analyse darstellen. Glücklicherweise können zwei AWS -Dienste helfen: AWS -Kleber und Amazon Athena.

Zu den Schritten zum Starten eines Redis -Servers gehören: Installieren von Redis gemäß dem Betriebssystem. Starten Sie den Redis-Dienst über Redis-Server (Linux/macOS) oder redis-server.exe (Windows). Verwenden Sie den Befehl redis-cli ping (linux/macOS) oder redis-cli.exe ping (Windows), um den Dienststatus zu überprüfen. Verwenden Sie einen Redis-Client wie Redis-Cli, Python oder Node.js, um auf den Server zuzugreifen.

Um eine Warteschlange aus Redis zu lesen, müssen Sie den Warteschlangenname erhalten, die Elemente mit dem Befehl LPOP lesen und die leere Warteschlange verarbeiten. Die spezifischen Schritte sind wie folgt: Holen Sie sich den Warteschlangenname: Nennen Sie ihn mit dem Präfix von "Warteschlange:" wie "Warteschlangen: My-Queue". Verwenden Sie den Befehl LPOP: Wischen Sie das Element aus dem Kopf der Warteschlange aus und geben Sie seinen Wert zurück, z. B. die LPOP-Warteschlange: my-queue. Verarbeitung leerer Warteschlangen: Wenn die Warteschlange leer ist, gibt LPOP NIL zurück, und Sie können überprüfen, ob die Warteschlange existiert, bevor Sie das Element lesen.

FRAGE: Wie kann man die Redis -Server -Version anzeigen? Verwenden Sie das Befehlszeilen-Tool-REDIS-CLI-Verssion, um die Version des angeschlossenen Servers anzuzeigen. Verwenden Sie den Befehl "Info Server", um die interne Version des Servers anzuzeigen, und muss Informationen analysieren und zurückgeben. Überprüfen Sie in einer Cluster -Umgebung die Versionskonsistenz jedes Knotens und können automatisch mit Skripten überprüft werden. Verwenden Sie Skripte, um die Anzeigeversionen zu automatisieren, z. B. eine Verbindung mit Python -Skripten und Druckversionsinformationen.

Die Kennwortsicherheit von Navicat beruht auf der Kombination aus symmetrischer Verschlüsselung, Kennwortstärke und Sicherheitsmaßnahmen. Zu den spezifischen Maßnahmen gehören: Verwenden von SSL -Verbindungen (vorausgesetzt, dass der Datenbankserver das Zertifikat unterstützt und korrekt konfiguriert), die Navicat regelmäßig Aktualisierung unter Verwendung von sichereren Methoden (z. B. SSH -Tunneln), die Einschränkung von Zugriffsrechten und vor allem niemals Kennwörter aufzeichnen.
