Semasa proses ujian keperluan projek, anda perlu menghantar beberapa permintaan kes penggunaan kepada pelayan Nginx, dan kemudian menyemak log Nginx yang sepadan untuk menentukan sama ada terdapat kandungan ciri untuk menentukan sama ada tugas berjaya dilaksanakan. Untuk meningkatkan kecekapan, proses ini perlu diautomasikan.
Python 3.6.5
Reka bentuk dan pelaksanaan kod
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @CreateTime: 2021/06/26 9:05 @Author : shouke """ import time import threading import subprocess from collections import deque def collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = "tail -0f /usr/local/openresty/nginx/logs/access.log" while task_status != "req_log_got": with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = "" outs, errs = "", "" try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print("获取nginx日志超时,正在重试") proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print("获取nginx日志超时,再次超时,停止重试") break finally: for line in outs.split(" "): flag = ""client_ip":"10.118.0.77"" # 特征 if flag in line: # 查找包含特征内容的日志 log_for_req += line if task_status == "req_finished": nginx_log_queue.append(log_for_req) task_status = "req_log_got" def run_tasks(task_list): """ 运行任务 :param task_list 任务列表 """ global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 执行任务前,让收集日志线程先做好准备 print("正在执行任务:%s" % task.get("name")) # 执行Nginx任务请求 # ... task_status = "req_finished" time_to_wait = 0.1 while task_status != "req_log_got": # 请求触发的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 获取到用例请求触发的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = "req_ready" # 解析日志 # do something here # ... else: print("存储请求日志的队列为空") # do something here # ... if __name__ == "__main__": nginx_log_queue = deque() is_tasks_compete = False # 所有任务是否执行完成 task_status = "req_ready" # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态 print("###########################任务开始###########################") tast_list = [{"name":"test_task", "other":"..."}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print("###########################任务完成###########################")
Nota:
1 bukan Satu langkah, langsung tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
? Ini kerana log Nginx tidak boleh diperolehi dengan melakukan ini
2. Dalam amalan, didapati apabila proc.communicate(timeout=2)
dilaksanakan buat kali pertama untuk mendapatkan log, ia sentiasa mustahil untuk mendapatkannya, ia akan tamat masa, dan pemerolehan kedua diperlukan, dan timeout
Jika tetapan terlalu kecil (saya telah mencuba menetapkannya kepada 1秒
dalam amalan), ia juga akan menyebabkan log Nginx tidak dapat diperoleh semasa pelaksanaan kedua.
Atas ialah kandungan terperinci Bagaimana untuk mendapatkan log Nginx yang sepadan dengan permintaan tugas dalam Python dalam masa nyata. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!