通常、マルチプロセス アプリケーションを開発する場合、複数のプロセスが同じリソース (重要なリソース) にアクセスする状況が必然的に発生します。このとき、リソース割り当てを実現するために、グローバル ロックを追加する必要があります。 1 つのプロセスが同時にリソースにアクセスできます)。
例は次のとおりです:
mysql を使用してタスクキューを実装するとします。実装プロセスは次のとおりです。
1. 次のように、キュー タスク を保存するジョブ テーブルを MySQL に作成します。
create table jobs( id auto_increment not null primary key, message text not null, job_status not null default 0 );
。
2. 新しいデータ をジョブ テーブルに入れてキューに入れるプロデューサー プロセスがあります。
insert into jobs(message) values('msg1');
を取得します。実行される操作は次のとおりです:
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
==========================区切り線=================== = ===================
クロスプロセス ロックの実装に関しては、いくつかの主な実装方法があります。
(1)セマフォ
(2)ファイルロックfcntl
(3)ソケット(ポート番号バインド)
(4)信号
これらの方法にはそれぞれ長所と短所があります。一般的には、最初の 2 つの方法の方が一般的です。ここでは詳しく説明しません。
情報を確認したところ、mysql にはロックの実装があり、大規模な同時分散アクセスがボトルネックになる可能性があるアプリケーション シナリオに適していることがわかりました。
デモは次のように Python を使用して実装されました:
ファイル名: glock.py
#!/usr/bin/env python2.7 # # -*- coding:utf-8 -*- # # Desc : # import logging, time import MySQLdb class Glock: def __init__(self, db): self.db = db def _execute(self, sql): cursor = self.db.cursor() try: ret = None cursor.execute(sql) if cursor.rowcount != 1: logging.error("Multiple rows returned in mysql lock function.") ret = None else: ret = cursor.fetchone() cursor.close() return ret except Exception, ex: logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex)) cursor.close() return None def lock(self, lockstr, timeout): sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) ret = self._execute(sql) if ret[0] == 0: logging.debug("Another client has previously locked '%s'.", lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' was obtained successfully.", lockstr) return True else: logging.error("Error occurred!") return None def unlock(self, lockstr): sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) ret = self._execute(sql) if ret[0] == 0: logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' the lock was released.", lockstr) return True else: logging.error("The lock '%s' did not exist.", lockstr) return None #Init logging def init_logging(): sh = logging.StreamHandler() logger = logging.getLogger() logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') sh.setFormatter(formatter) logger.addHandler(sh) logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) def main(): init_logging() db = MySQLdb.connect(host='localhost', user='root', passwd='') lock_name = 'queue' l = Glock(db) ret = l.lock(lock_name, 10) if ret != True: logging.error("Can't get lock! exit!") quit() time.sleep(10) logging.info("You can do some synchronization work across processes!") ##TODO ## you can do something in here ## l.unlock(lock_name) if __name__ == "__main__": main()
このデモでは、TODO がマークされており、コンシューマがジョブ テーブルからメッセージを取得するためのロジックをここに配置できます。つまり、境界線
の上です。
2. 複数のコンシューマープロセスがあると仮定し、ジョブテーブルからキュー情報を取得します。実行される操作は次のとおりです。
このようにして、複数のプロセスが重要なリソースに同時にアクセスし、データの一貫性を確保できます。
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.