Example analysis of multiple scheduled tasks executed in a single thread in Python development

黄舟
Release: 2017-07-27 16:03:23
Original
1846 people have browsed it

Single-threaded multi-timing tasks

1. Initial version:

Idea: timer , to put it bluntly, it means delaying the execution of the specified program. Currently, it is not practical to reconstruct the timer in python by yourself, and the ability is not reached, so the system timer must be used for delay operation, but we can change the rules; All programs that need to perform scheduled operations are added to a specific list, take out the program with the shortest scheduled time in the list, bind threading.Timer (time, callback), wait for the time to time out, trigger a custom callback, and execute the program just removed from the list ; Then update the time, take out the program with the shortest time in the list again, continue to bind threading.Timer, and continue the iterative loop; when a new scheduled task is added to the list, cancel the current threading.Timer binding , update the time in the list, take out the shortest time again, and bind threading.Timer...

Code:


import threading
import time

class Timer():
    '''单线程下的定时器'''

    def __init__(self):
        self.queues = []
        self.timer = None
        self.last_time = time.time()

    def start(self):
        item = self.get()
        if item:
            self.timer = threading.Timer(item[0],self.execute)
            self.timer.start()

    def add(self,item):
        print('add',item)
        self.flush_time()
        self.queues.append(item)
        self.queues.sort(key=lambda x:x[0])

        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.start()

    def get(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues[0]
        return item

    def pop(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues.pop(0)
        return item

    def flush_time(self):
        curr_time = time.time()
        for i in self.queues:
            i[0] = i[0] - (curr_time - self.last_time)
        self.last_time = curr_time

    def execute(self):
        # if self.timer:
        #     self.timer.cancel()
        #     self.timer = None
        item = self.pop()
        self.flush_time()
        if item:
            callback = item[1]
            args = item[0]
            callback(args)
        self.start()
Copy after login

Execution and output:


##

if __name__ == '__main__':    # 检测线程数
    def func():        while True:            print(threading.active_count())
            time.sleep(1)
    
    f1 = threading.Thread(target=func)
    f1.start()    
    import logging
    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")    def func1(*args):
        logging.info('func1 %s'%args)        # time.sleep(5)
    
    def func2(*args):
        logging.info('func2 %s' % args)        # time.sleep(5)
    def func3(*args):
        logging.info('func3 %s' % args)        # time.sleep(5)
    
    def func4(*args):
        logging.info('func4 %s' % args)        # time.sleep(5)
    
    def func5(*args):
        logging.info('func5 %s' % args)        # time.sleep(5)
    
    
    # 测试
    t1 = Timer()
    logging.info('start')
    t1.add([5,func1])
    time.sleep(0.5)
    t1.add([4,func2])
    time.sleep(0.5)
    t1.add([3,func3])
    time.sleep(0.5)
    t1.add([2,func4])
    time.sleep(0.5)
    t1.add([1,func5])
    time.sleep(5)
    t1.add([1,func1])
    t1.add([2,func2])
    t1.add([3,func3])
    t1.add([4,func4])
    t1.add([5,func5])    
    # 输出
    # 2
    # 07/27/2017 10:36:47 [Thursday] start
    # add [5, <function func1 at 0x000000D79FC77E18>]
    # add [4, <function func2 at 0x000000D79FCA8488>]
    # 3
    # add [3, <function func3 at 0x000000D79FCA8510>]
    # add [2, <function func4 at 0x000000D79FCA8598>]
    # 3
    # add [1, <function func5 at 0x000000D79FCA8620>]
    # 3
    # 07/27/2017 10:36:50 [Thursday] func5 1
    # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459
    # 3
    # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105
    # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766
    # 3
    # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516
    # 2
    # 2
    # add [1, <function func1 at 0x000000D79FC77E18>]
    # add [2, <function func2 at 0x000000D79FCA8488>]
    # add [3, <function func3 at 0x000000D79FCA8510>]
    # add [4, <function func4 at 0x000000D79FCA8598>]
    # add [5, <function func5 at 0x000000D79FCA8620>]
    # 3
    # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396
    # 3
    # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355
    # 3
    # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854
    # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195
    # 3
    # 3
    # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816
Copy after login

Execution code

Note: Check the code output. All timers are executed in sequence according to the calibrated time. It is perfect. Everything looks beautiful. It just looks like, hahaha, when you put func After time.sleep(5) is enabled, the number of threads increases slowly; the reason is that the last timer callback is still being executed, and the next timer has been started, and then another thread is added. Alas, Failure

2, revised version

Idea: Use the generator-consumer model, The threading.Condition condition variable is used; a Timer is forced to be enabled forever!

Code:


import time
import threading
import logging

class NewTimer(threading.Thread):
    &#39;&#39;&#39;单线程下的定时器&#39;&#39;&#39;
    def __init__(self):
        super().__init__()
        self.queues = []
        self.timer = None
        self.cond = threading.Condition()

    def run(self):
        while True:
            # print(&#39;NewTimer&#39;,self.queues)
            self.cond.acquire()
            item = self.get()
            callback = None
            if not item:
                logging.info(&#39;NewTimer wait&#39;)
                self.cond.wait()
            elif item[0] <= time.time():
                new_item = self.pop()
                callback = new_item[1]
            else:
                logging.info(&#39;NewTimer start sys timer and wait&#39;)
                self.timer = threading.Timer(item[0]-time.time(),self.execute)
                self.timer.start()
                self.cond.wait()
            self.cond.release()

            if callback:
                callback(item[0])

    def add(self, item):
        # print(&#39;add&#39;, item)
        self.cond.acquire()
        item[0] = item[0] + time.time()
        self.queues.append(item)
        self.queues.sort(key=lambda x: x[0])
        logging.info(&#39;NewTimer add notify&#39;)
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.cond.notify()
        self.cond.release()

    def pop(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues.pop(0)
        return item

    def get(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues[0]
        return item

    def execute(self):
        logging.info(&#39;NewTimer execute notify&#39;)
        self.cond.acquire()
        self.cond.notify()
        self.cond.release()
Copy after login

Execution and output:

##

if __name__ == &#39;__main__&#39;:    def func():        while True:            print(threading.active_count())
            time.sleep(1)

    f1 = threading.Thread(target=func)
    f1.start()
    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")

    newtimer = NewTimer()
    newtimer.start()    def func1(*args):
        logging.info(&#39;func1 %s&#39;%args)
        time.sleep(5)    def func2(*args):
        logging.info(&#39;func2 %s&#39; % args)
        time.sleep(5)    def func3(*args):
        logging.info(&#39;func3 %s&#39; % args)
        time.sleep(5)    def func4(*args):
        logging.info(&#39;func4 %s&#39; % args)
        time.sleep(5)    def func5(*args):
        logging.info(&#39;func5 %s&#39; % args)
        time.sleep(5)

    newtimer.add([5,func1])
    newtimer.add([4,func2])
    newtimer.add([3,func3])
    newtimer.add([2,func4])
    newtimer.add([1,func5])
    time.sleep(1)
    newtimer.add([1,func1])
    newtimer.add([2,func2])
    newtimer.add([3,func3])
    newtimer.add([4,func4])
    newtimer.add([5,func5])# 输出# 2# 07/27/2017 11:26:19 [Thursday] NewTimer wait# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait# 07/27/2017 11:26:20 [Thursday] NewTimer execute notify# 4# 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 3# 3# 3# 3# 3# 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007# 3# 3# 3# 3# 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279# 3# 3# 3# 3# 3# 3# 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007# 3# 3# 3# 3# 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:10 [Thursday] NewTimer wait
Copy after login

Output

Note: The number of test threads will not increase in any way this time, and multi-timer task requirements can be achieved at the same time; Disadvantages: Two threads are used, and a single thread is not used for implementation, and the second time accuracy problem , you need to wait for the last scheduled program to complete before the program can continue to run

The above is the detailed content of Example analysis of multiple scheduled tasks executed in a single thread in Python development. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template