While it's generally preferable to request threads to exit gracefully, there are situations where abrupt termination is necessary. This article explores methods to terminate threads even when they may not be designed for it.
The recommended approach is to use a stop flag that threads periodically check to determine if they should exit. This allows threads to release resources and perform cleanup before ending.
import threading class StoppableThread(threading.Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._stop_event = threading.Event() def stop(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set()
In rare cases, it may be necessary to forcefully terminate a thread. This can be achieved using the _async_raise function:
def _async_raise(tid, exctype): if not inspect.isclass(exctype): raise TypeError("Only types can be raised (not instances)") res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None) raise SystemError("PyThreadState_SetAsyncExc failed") class ThreadWithExc(threading.Thread): def raise_exc(self, exctype): _async_raise(self._get_my_tid(), exctype)
Note that forceful termination can leave resources in an unstable state. Only use this option when graceful termination is not available or when the thread is actively blocking the program.
The above is the detailed content of How Can I Gracefully and Forcefully Terminate a Running Thread in Python?. For more information, please follow other related articles on the PHP Chinese website!