Python으로 간단한 채팅 프로그램 작성 튜토리얼
구현 아이디어
x01 서버 구축
우선, 서버 측에서 소켓은 메시지를 수락하는 데 사용됩니다. 소켓 요청이 수락될 때마다 새 스레드가 열려 메시지의 배포 및 수락을 관리합니다. 동시에 채팅방의 다양한 기능을 처리하기 위해 모든 스레드를 관리하는 핸들러가 있습니다
x02 클라이언트 설정
클라이언트 설정은 서버보다 훨씬 간단합니다. 클라이언트는 메시지를 보내고 특정 규칙에 따라 특정 문자를 입력하면 됩니다. 따라서 클라이언트 측에서는 두 개의 스레드만 사용하면 됩니다. 하나는 메시지 수신 전용입니다. other는 전송 전용입니다.
우리가 하나를 사용하지 않는 이유는 하나만 사용하면 메시지가 수신될 때 메시지를 받는 사람이 메시지를 보내기 전에 차단된 상태가 되기 때문입니다. 메시지 보내기도 마찬가지이므로 이 두 기능을 한 곳에 구현하면 계속 메시지를 주고받을 방법이 없게 됩니다
구현 방법
서버측 구현
import json import threading from socket import * from time import ctime class PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = ('', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print('等待客户连接...\r\n') try: while True: cs, caddr = self.__socket.accept() # 利用handler来管理线程,实现线程之间的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: pass class ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多线程管理会话 def run(self): try: print('...连接来自于:', self.__caddr) data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n' self.__cs.sendall(bytes(data, 'utf-8')) while True: data = self.__cs.recv(self.__buf).decode('utf-8') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close() class ChattingHandler: __help_str = "[ SYSTEM ]\r\n" \ "输入/ls,即可获得所有登陆用户信息\r\n" \ "输入/h,即可获得帮助\r\n" \ "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \ "输入/i,即可屏蔽群聊信息\r\n" \ "再次输入/i,即可取消屏蔽\r\n" \ "所有首字符为/的信息都不会发送出去" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的记录 nickname = "SOMEONE" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket与username之间的映射关系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += " " # 广播某玩家退出聊天室 self.broadcast_system_msg(nickname + "离开了PY_CHATTING") # 管理用户输入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js['type'] == "login": if js['msg'] not in self.__user_name_to_socket: if ' ' in js['msg']: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号不能够带有空格' }), cs) else: self.__user_name_to_socket[js['msg']] = cs self.__socket_to_user_name[cs] = js['msg'] self.__user_name_to_broadcast_state[js['msg']] = True self.send_to(json.dumps({ 'type': 'login', 'success': True, 'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)' }), cs) # 广播其他人,他已经进入聊天室 self.broadcast_system_msg(js['msg'] + "已经进入了聊天室") else: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号已存在' }), cs) # 若玩家处于屏蔽模式,则无法发送群聊消息 elif js['type'] == "broadcast": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js['msg'], cs) else: self.send_to(json.dumps({ 'type': 'broadcast', 'msg': '屏蔽模式下无法发送群聊信息' }), cs) elif js['type'] == "ls": self.send_to(json.dumps({ 'type': 'ls', 'msg': self.get_all_login_user_info() }), cs) elif js['type'] == "help": self.send_to(json.dumps({ 'type': 'help', 'msg': self.__help_str }), cs) elif js['type'] == "sendto": self.single_chatting(cs, js['nickname'], js['msg']) elif js['type'] == "ignore": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = "通常模式" else: msg = "屏蔽模式" self.send_to(json.dumps({ 'type': 'ignore', 'success': True, 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg) }), cs) else: self.send_to({ 'type': 'ignore', 'success': False, 'msg': '切换失败' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ 'type': 'single', 'msg': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ 'type': 'single', 'msg': '该用户不存在' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = "[ SYSTEM ] ALIVE USER : \r\n" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + ",\r\n" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, 'utf-8')) def broadcast_system_msg(self, msg): data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg) js = json.dumps({ 'type': 'system_msg', 'msg': data }) # 屏蔽了群聊的玩家也可以获得系统的群发信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def broadcast(self, msg, cs): data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ 'type': 'broadcast', 'msg': data }) # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def main(): server = PyChattingServer() server.start_session() main()
클라이언트측 구현
import json import threading from socket import * is_login = False is_broadcast = True class ClientReceiveThread(threading.Thread): __buf = 1024 def __init__(self, cs): super(ClientReceiveThread, self).__init__() self.__cs = cs def run(self): self.receive_msg() def receive_msg(self): while True: msg = self.__cs.recv(self.__buf).decode('utf-8') if not msg: break js = json.loads(msg) if js['type'] == "login": if js['success']: global is_login is_login = True print(js['msg']) elif js['type'] == "ignore": if js['success']: global is_broadcast if is_broadcast: is_broadcast = False else: is_broadcast = True print(js['msg']) else: if not is_broadcast: print("[现在处于屏蔽模式]") print(js['msg']) class ClientSendMsgThread(threading.Thread): def __init__(self, cs): super(ClientSendMsgThread, self).__init__() self.__cs = cs def run(self): self.send_msg() # 根据不同的输入格式来进行不同的聊天方式 def send_msg(self): while True: js = None msg = input() if not is_login: js = json.dumps({ 'type': 'login', 'msg': msg }) elif msg[0] == "@": data = msg.split(' ') if not data: print("请重新输入") break nickname = data[0] nickname = nickname.strip("@") if len(data) == 1: data.append(" ") js = json.dumps({ 'type': 'sendto', 'nickname': nickname, 'msg': data[1] }) elif msg == "/help": js = json.dumps({ 'type': 'help', 'msg': None }) elif msg == "/ls": js = json.dumps({ 'type': 'ls', 'msg': None }) elif msg == "/i": js = json.dumps({ 'type': 'ignore', 'msg': None }) else: if msg[0] != '/': js = json.dumps({ 'type': 'broadcast', 'msg': msg }) if js is not None: self.__cs.sendall(bytes(js, 'utf-8')) def main(): buf = 1024 # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了 address = ("127.0.0.1", 12231) cs = socket(AF_INET, SOCK_STREAM, 0) cs.connect(address) data = cs.recv(buf).decode("utf-8") if data: print(data) receive_thread = ClientReceiveThread(cs) receive_thread.start() send_thread = ClientSendMsgThread(cs) send_thread.start() while True: pass main()
위 내용은 Python으로 간단한 채팅 프로그램 작성 튜토리얼의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











MySQL에는 무료 커뮤니티 버전과 유료 엔터프라이즈 버전이 있습니다. 커뮤니티 버전은 무료로 사용 및 수정할 수 있지만 지원은 제한되어 있으며 안정성이 낮은 응용 프로그램에 적합하며 기술 기능이 강합니다. Enterprise Edition은 안정적이고 신뢰할 수있는 고성능 데이터베이스가 필요하고 지원 비용을 기꺼이 지불하는 응용 프로그램에 대한 포괄적 인 상업적 지원을 제공합니다. 버전을 선택할 때 고려 된 요소에는 응용 프로그램 중요도, 예산 책정 및 기술 기술이 포함됩니다. 완벽한 옵션은없고 가장 적합한 옵션 만 있으므로 특정 상황에 따라 신중하게 선택해야합니다.

이 기사는 MySQL 데이터베이스의 작동을 소개합니다. 먼저 MySQLworkBench 또는 명령 줄 클라이언트와 같은 MySQL 클라이언트를 설치해야합니다. 1. MySQL-Uroot-P 명령을 사용하여 서버에 연결하고 루트 계정 암호로 로그인하십시오. 2. CreateABase를 사용하여 데이터베이스를 작성하고 데이터베이스를 선택하십시오. 3. CreateTable을 사용하여 테이블을 만들고 필드 및 데이터 유형을 정의하십시오. 4. InsertInto를 사용하여 데이터를 삽입하고 데이터를 쿼리하고 업데이트를 통해 데이터를 업데이트하고 DELETE를 통해 데이터를 삭제하십시오. 이러한 단계를 마스터하고 일반적인 문제를 처리하는 법을 배우고 데이터베이스 성능을 최적화하면 MySQL을 효율적으로 사용할 수 있습니다.

MySQL 다운로드 파일은 손상되었습니다. 어떻게해야합니까? 아아, mySQL을 다운로드하면 파일 손상을 만날 수 있습니다. 요즘 정말 쉽지 않습니다! 이 기사는 모든 사람이 우회를 피할 수 있도록이 문제를 해결하는 방법에 대해 이야기합니다. 읽은 후 손상된 MySQL 설치 패키지를 복구 할 수있을뿐만 아니라 향후에 갇히지 않도록 다운로드 및 설치 프로세스에 대해 더 깊이 이해할 수 있습니다. 파일 다운로드가 손상된 이유에 대해 먼저 이야기합시다. 이에 대한 많은 이유가 있습니다. 네트워크 문제는 범인입니다. 네트워크의 다운로드 프로세스 및 불안정성의 중단으로 인해 파일 손상이 발생할 수 있습니다. 다운로드 소스 자체에도 문제가 있습니다. 서버 파일 자체가 고장 났으며 물론 다운로드하면 고장됩니다. 또한 일부 안티 바이러스 소프트웨어의 과도한 "열정적 인"스캔으로 인해 파일 손상이 발생할 수 있습니다. 진단 문제 : 파일이 실제로 손상되었는지 확인하십시오

MySQL 설치 실패의 주된 이유는 다음과 같습니다. 1. 권한 문제, 관리자로 실행하거나 Sudo 명령을 사용해야합니다. 2. 종속성이 누락되었으며 관련 개발 패키지를 설치해야합니다. 3. 포트 충돌, 포트 3306을 차지하는 프로그램을 닫거나 구성 파일을 수정해야합니다. 4. 설치 패키지가 손상되어 무결성을 다운로드하여 확인해야합니다. 5. 환경 변수가 잘못 구성되었으며 운영 체제에 따라 환경 변수를 올바르게 구성해야합니다. 이러한 문제를 해결하고 각 단계를 신중하게 확인하여 MySQL을 성공적으로 설치하십시오.

MySQL 데이터베이스 성능 최적화 안내서 리소스 집약적 응용 프로그램에서 MySQL 데이터베이스는 중요한 역할을 수행하며 대규모 트랜잭션 관리를 담당합니다. 그러나 응용 프로그램 규모가 확장됨에 따라 데이터베이스 성능 병목 현상은 종종 제약이됩니다. 이 기사는 일련의 효과적인 MySQL 성능 최적화 전략을 탐색하여 응용 프로그램이 고 부하에서 효율적이고 반응이 유지되도록합니다. 실제 사례를 결합하여 인덱싱, 쿼리 최적화, 데이터베이스 설계 및 캐싱과 같은 심층적 인 주요 기술을 설명합니다. 1. 데이터베이스 아키텍처 설계 및 최적화 된 데이터베이스 아키텍처는 MySQL 성능 최적화의 초석입니다. 몇 가지 핵심 원칙은 다음과 같습니다. 올바른 데이터 유형을 선택하고 요구 사항을 충족하는 가장 작은 데이터 유형을 선택하면 저장 공간을 절약 할 수있을뿐만 아니라 데이터 처리 속도를 향상시킬 수 있습니다.

MySQL 성능 최적화는 설치 구성, 인덱싱 및 쿼리 최적화, 모니터링 및 튜닝의 세 가지 측면에서 시작해야합니다. 1. 설치 후 innodb_buffer_pool_size 매개 변수와 같은 서버 구성에 따라 my.cnf 파일을 조정해야합니다. 2. 과도한 인덱스를 피하기 위해 적절한 색인을 작성하고 Execution 명령을 사용하여 실행 계획을 분석하는 것과 같은 쿼리 문을 최적화합니다. 3. MySQL의 자체 모니터링 도구 (showprocesslist, showstatus)를 사용하여 데이터베이스 건강을 모니터링하고 정기적으로 백업 및 데이터베이스를 구성하십시오. 이러한 단계를 지속적으로 최적화함으로써 MySQL 데이터베이스의 성능을 향상시킬 수 있습니다.

MySQL은 기본 데이터 저장 및 관리를위한 네트워크 연결없이 실행할 수 있습니다. 그러나 다른 시스템과의 상호 작용, 원격 액세스 또는 복제 및 클러스터링과 같은 고급 기능을 사용하려면 네트워크 연결이 필요합니다. 또한 보안 측정 (예 : 방화벽), 성능 최적화 (올바른 네트워크 연결 선택) 및 데이터 백업은 인터넷에 연결하는 데 중요합니다.

MySQL이 시작을 거부 했습니까? 당황하지 말고 확인합시다! 많은 친구들이 MySQL을 설치 한 후 서비스를 시작할 수 없다는 것을 알았으며 너무 불안했습니다! 걱정하지 마십시오.이 기사는 침착하게 다루고 그 뒤에있는 마스터 마인드를 찾을 수 있습니다! 그것을 읽은 후에는이 문제를 해결할뿐만 아니라 MySQL 서비스에 대한 이해와 문제 해결 문제에 대한 아이디어를 향상시키고보다 강력한 데이터베이스 관리자가 될 수 있습니다! MySQL 서비스는 시작되지 않았으며 간단한 구성 오류에서 복잡한 시스템 문제에 이르기까지 여러 가지 이유가 있습니다. 가장 일반적인 측면부터 시작하겠습니다. 기본 지식 : 서비스 시작 프로세스 MySQL 서비스 시작에 대한 간단한 설명. 간단히 말해서 운영 체제는 MySQL 관련 파일을로드 한 다음 MySQL 데몬을 시작합니다. 여기에는 구성이 포함됩니다
