Home > Backend Development > Python Tutorial > Python daemon (multi-threaded development)

Python daemon (multi-threaded development)

高洛峰
Release: 2016-10-18 09:38:45
Original
Viewed 1267 times

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

#!/usr/bin/python

import sys,time,json,logging

import Queue, threading, datetime

from lib.base.daemon import Daemon

from lib.queue.httpsqs.HttpsqsClient import HttpsqsClient

from lib.db.DbMongodb import DbMongodb

# Root logger: everything from DEBUG upwards is written to myapp.log, which is
# opened in 'w' mode and therefore truncated each time this module is loaded.
logging.basicConfig(
    filename='myapp.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
)

# Module-level objects shared by the worker thread classes defined below.
queue = Queue.Queue()  # hand-off buffer between the fetcher and writer threads
httpsqs = HttpsqsClient('192.168.0.218', '1218', 'httpsqs.com')
db = DbMongodb('192.168.0.119', 'testdb')

          

class ThreadGetHttpSqs(threading.Thread):
    """Worker thread that copies messages from the HTTPSQS client into a queue.

    Attributes:
        httpsqs: client object polled for messages (the module-level
            ``httpsqs``).
        queue: queue that receives every fetched message (the module-level
            ``queue``).
        idle_sleep: seconds to pause when the client returned ``None`` or the
            fetch raised; class-level default, may be overridden per instance.
    """

    # Pause used both when nothing was returned and after a failed fetch.
    idle_sleep = 3

    def __init__(self):
        threading.Thread.__init__(self)
        self.httpsqs = httpsqs
        self.queue = queue

    def run(self):
        """Fetch 'logtest' messages forever and put each one on the queue."""
        while True:
            try:
                data = self.httpsqs.get('logtest')
            except Exception:
                # An uncaught error would end this thread for good; record it
                # and retry after the usual pause instead.
                logging.exception('get:id %s , fetch failed', self.name)
                time.sleep(self.idle_sleep)
                continue

            if data is None:
                # Nothing available right now: wait before polling again.
                time.sleep(self.idle_sleep)
                continue

            self.queue.put(data)
            logging.info('get:id %s , tablename %s' % (self.name, data))

              

              

              

class ThreadInsertDB(threading.Thread):
    """Worker thread that stores queued JSON messages through ``db.save``.

    Attributes:
        queue: queue the messages are taken from (the module-level ``queue``).
        db: object whose ``save(tablename, data)`` is called for each message
            (the module-level ``db``).
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = queue
        self.db = db

    def run(self):
        """Consume the queue forever, saving one message per iteration.

        Every message is parsed as JSON and must provide a ``table`` key and a
        ``data`` key. A message that cannot be parsed or saved is logged and
        dropped so that one bad item does not end the thread.
        """
        while True:
            chunk = self.queue.get()
            try:
                record = json.loads(chunk)
                tablename = record['table']
                self.db.save(tablename, record['data'])
                logging.info('insert:id %s , tablename %s' % (self.name, tablename))
            except Exception:
                # Thread-loop boundary: keep the worker alive on bad input or
                # a failed save, but leave a full traceback in the log.
                logging.exception('insert:id %s , dropped message %r',
                                  self.name, chunk)
            finally:
                # Always balance the get() above, even for dropped messages,
                # so a queue.join() elsewhere cannot wait forever.
                self.queue.task_done()

              

class MyDaemon(Daemon):
    """Daemon whose work is done by fetcher and writer worker threads."""

    def _run(self):
        """Start two fetcher and two writer threads, then wait on them.

        NOTE(review): presumably this is the hook the ``Daemon`` base class
        calls once the process is daemonized - confirm against
        lib.base.daemon.
        """
        workers = []
        for worker_class in (ThreadGetHttpSqs, ThreadInsertDB):
            for _ in range(2):
                worker = worker_class()
                worker.start()
                workers.append(worker)

        # The worker threads already loop forever, so this method must start
        # them only once and then simply stay alive. Timed joins are used so
        # the main thread wakes up regularly instead of blocking in a single
        # untimed join.
        for worker in workers:
            while worker.is_alive():
                worker.join(1)

              

                  

                 

if __name__ == "__main__":
    # Command-line entry point: exactly one argument selects the action.
    daemon = MyDaemon('/tmp/daemon-example.pid')

    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)

    command = sys.argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)

    # The accepted command names match the daemon's method names one-to-one.
    getattr(daemon, command)()
    sys.exit(0)

Copy after login


Statement of this Website
The content of this article was voluntarily contributed by netizens, and the copyright belongs to the original author. This site assumes no legal responsibility for it. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template