本指南將教您如何使用 Python 建立 WebSocket 代理伺服器。
pip install websockets
import asyncio import websockets import json class WebSocketProxy: def init(self, source_url, symbols): self.source_url = source_url self.clients = set() self.symbols = symbols self.valid_user_key = "yourValidUserKey" # Single valid user key for authentication async def on_open(self, ws): print("Connected to source") symbols_str = ",".join(self.symbols.keys()) init_message = f"{{"userKey":"your_api_key", "symbol":"{symbols_str}"}}" await ws.send(init_message)
async def client_handler(self, websocket, path): try: # Wait for a message that should contain the authentication key auth_message = await asyncio.wait_for(websocket.recv(), timeout=10) auth_data = json.loads(auth_message) user_key = auth_data.get("userKey") if user_key == self.valid_user_key: self.clients.add(websocket) print(f"Client authenticated with key: {user_key}") try: await websocket.wait_closed() finally: self.clients.remove(websocket) else: print("Authentication failed") await websocket.close(reason="Authentication failed") except (asyncio.TimeoutError, json.JSONDecodeError, KeyError): print("Failed to authenticate") await websocket.close(reason="Failed to authenticate")
async def source_handler(self): async with websockets.connect(self.source_url) as websocket: await self.on_open(websocket) async for message in websocket: await self.broadcast(message) async def broadcast(self, message): if self.clients: await asyncio.gather(*(client.send(message) for client in self.clients))
def run(self, host="localhost", port=8765): start_server = websockets.serve(self.client_handler, host, port) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_until_complete(self.source_handler()) asyncio.get_event_loop().run_forever() if name == "main": symbols = {"EURUSD": {}, "GBPUSD": {}, "USDJPY": {}, "AUDUSD": {}, "USDCAD": {}} source_url = "ws://example.com/source" proxy = WebSocketProxy(source_url, symbols) proxy.run()
您已經成功開發了一個基於Python的WebSocket代理伺服器。此伺服器可以驗證客戶端身份,維護與指定資料來源的持久連接,並將從來源接收的訊息有效地分發給所有經過驗證的客戶端。事實證明,對於需要將資料從單一來源安全即時傳播到不同用戶群的應用程式來說,此功能非常寶貴。
徹底的伺服器測試對於確保最佳效能和可靠性至關重要。它驗證其對連接和訊息傳輸的正確處理。為了提高效率,請考慮實施負載平衡機制和自訂連線標頭。最後,建議將伺服器部署到適合生產部署的環境,例如專門為容納長期網路連線而設計的雲端服務。
另外,請查看我們網站上最初發布的教學:使用 Python 代理程式擴展外匯 WebSocket
以上是使用 Python 代理實現可擴展的外匯 WebSocket的詳細內容。更多資訊請關注PHP中文網其他相關文章!