Im vorherigen Artikel habe ich eine basedao.py zur Kapselung von DBUtils und pymysql vorgestellt. In den letzten Tagen habe ich meine Gedanken neu geordnet und basedao.py optimiert. Bisher werden allerdings noch nicht viele Methoden unterstützt; weitere Optimierungen folgen in Zukunft.
Hauptfunktionen:
1. Ein einzelnes Objekt abfragen:
Erforderliche Parameter: Tabellenname, Filterbedingungen
2. Mehrere Objekte abfragen:
Erforderliche Parameter: Tabellenname, Filterbedingung
3. Abfrage nach Primärschlüssel:
Erforderliche Parameter: Tabellenname, Wert
4. Paging-Abfrage:
Erforderliche Parameter: Tabellenname, Seitenzahl, Anzahl der Datensätze pro Seite, Filterbedingungen
Der spezifische Code lautet wie folgt:
import json
import os
import sys
import time

import pymysql
from DBUtils import PooledDB


class BaseDao(object):
    """Simple base class for read-only queries against one MySQL database.

    The instance opens a single connection and cursor on construction and
    remembers the most recently used table name, so later calls may omit
    ``tableName``.

    Filter dictionaries map a key to a value.  The key is either a plain
    column name (equality test) or a column name with one of these prefixes:

    ``_in_``    column IN (values)      value: "1,2,3" or a list/tuple/set
    ``_nein_``  column NOT IN (values)  value: "1,2,3" or a list/tuple/set
    ``_like_``  column LIKE '%value%'
    ``_ne_``    column != value
    ``_lt_``    column <  value
    ``_le_``    column <= value
    ``_gt_``    column >  value
    ``_ge_``    column >= value

    The special key ``_limit_`` holds an ``(offset, row_count)`` tuple.

    All filter values are sent to the driver as query parameters, so they
    must NOT be pre-quoted by the caller (pass ``"a,b"``, not ``"'a','b'"``).
    """

    __config = {}            # database connection settings
    __conn = None            # database connection
    __cursor = None          # database cursor
    __database = None        # name of the schema being queried
    __tableName = None       # most recently used table name
    __fields = []            # column names of the most recently used table
    __primaryKey_dict = {}   # table name -> primary key column (rebound per instance)

    # (key prefix, SQL operator) pairs, checked in this order.
    __FILTER_OPS = (
        ("_in_", "IN"),
        ("_nein_", "NOT IN"),
        ("_like_", "like"),
        ("_ne_", "!="),
        ("_lt_", "<"),
        ("_le_", "<="),
        ("_gt_", ">"),
        ("_ge_", ">="),
    )

    def __init__(self, creator=pymysql, host="localhost", user=None,
                 password="", database=None, port=3306, charset="utf8"):
        """Open the connection and load the primary keys of ``database``.

        Raises:
            Exception: if host, user, password, database or port is None.
        """
        required = (
            ("host", host),
            ("user", user),
            ("password", password),
            ("database", database),
            ("port", port),
        )
        for name, value in required:
            if value is None:
                raise Exception("Parameter [%s] is None." % name)

        self.__config = {
            "creator": creator,
            "charset": charset,
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        # Rebind the mutable containers per instance; otherwise every
        # BaseDao would write into the same class-level dict.
        self.__tableName = None
        self.__fields = []
        self.__primaryKey_dict = {}

        # NOTE(review): PooledDB here is the module, so this call resolves to
        # whatever ``connect`` that module exposes (presumably the SteadyDB
        # connect helper, i.e. one hardened connection rather than a pool).
        # Behaviour is kept as in the original - confirm against the DBUtils
        # version in use.
        self.__conn = PooledDB.connect(**self.__config)
        self.__cursor = self.__conn.cursor()
        self.__database = self.__config["database"]
        self.__init_primaryKey()
        print(get_time(), "数据库连接初始化成功。")

    def __del__(self):
        """Close the cursor and the connection when the object is collected."""
        if self.__cursor:
            self.__cursor.close()
            self.__cursor = None
            print(get_time(), "游标关闭")
        if self.__conn:
            self.__conn.close()
            self.__conn = None
            print(get_time(), "连接关闭")

    def select_one(self, tableName=None, filters=None):
        """Return the first matching row.

        @tableName  table name (optional once a table has been used)
        @filters    filter dict, see the class docstring
        @return     dict mapping column name to value, or None if no row matches
        """
        self.__check_params(tableName)
        self.__execute_query(filters)
        return self.__parse_result(self.__cursor.fetchone())

    def select_pk(self, tableName=None, primaryKey=None):
        """Return the row whose primary key equals ``primaryKey``.

        @tableName   table name (optional once a table has been used)
        @primaryKey  primary key value
        @return      dict mapping column name to value, or None if not found

        Raises:
            Exception: if no primary key is known for the table.
        """
        self.__check_params(tableName)
        # Look up by the remembered name so that tableName=None works too.
        # NOTE: only one key column is stored per table, so composite
        # primary keys are not fully supported.
        if self.__tableName not in self.__primaryKey_dict:
            raise Exception(
                "Table [%s] has no known primary key." % self.__tableName)
        pk_column = str(self.__primaryKey_dict[self.__tableName])
        self.__execute_query({pk_column: primaryKey})
        return self.__parse_result(self.__cursor.fetchone())

    def select_all(self, tableName=None, filters=None):
        """Return every matching row.

        @tableName  table name (optional once a table has been used)
        @filters    filter dict, see the class docstring
        @return     list of dicts mapping column name to value
        """
        self.__check_params(tableName)
        self.__execute_query(filters)
        return self.__parse_results(self.__cursor.fetchall())

    def count(self, tableName=None):
        """Return the total number of rows in the table (filters not applied)."""
        self.__check_params(tableName)
        # The table name cannot be sent as a query parameter, so make sure it
        # names a real table of this schema before interpolating it.
        self.__query_fields(self.__tableName, self.__database)
        sql = "SELECT count(*) FROM %s" % (self.__tableName)
        self.__cursor.execute(sql)
        result = self.__cursor.fetchone()
        return result[0]

    def select_page(self, tableName=None, pageNum=1, limit=10, filters=None):
        """Return one page of matching rows.

        @tableName  table name (optional once a table has been used)
        @pageNum    1-based page number; out-of-range values are clamped
        @limit      rows per page, must be >= 1
        @filters    filter dict, see the class docstring; it is not modified,
                    and a ``_limit_`` entry in it is replaced by the page window
        @return     list of dicts mapping column name to value

        Raises:
            ValueError: if ``limit`` is smaller than 1.
        """
        self.__check_params(tableName)
        if limit < 1:
            raise ValueError("Parameter [limit] must be >= 1.")

        # NOTE: the page count is based on the unfiltered row count, so with
        # restrictive filters a late page may legitimately come back empty.
        totalCount = self.count()
        # Ceiling division, but never fewer than one page so that an empty
        # table does not yield a negative offset.
        totalPage = max(1, (totalCount + limit - 1) // limit)
        if pageNum > totalPage:
            print("最大页数为%d" % totalPage)
            pageNum = totalPage
        elif pageNum < 1:
            print("页数不能小于1")
            pageNum = 1
        beginindex = (pageNum - 1) * limit

        # Work on a copy: the caller's dict (or a shared default) must not
        # keep a stale "_limit_" entry from a previous call.
        page_filters = self.__copy_filters(filters)
        page_filters["_limit_"] = (beginindex, limit)
        self.__execute_query(page_filters)
        return self.__parse_results(self.__cursor.fetchall())

    def __execute_query(self, filters):
        """Build the SELECT for the current table and run it on the cursor."""
        sql, params = self.__query_util(filters)
        # Pass None when there are no parameters so the driver skips formatting.
        self.__cursor.execute(sql, params or None)

    def __parse_result(self, result):
        """Turn one row tuple into a dict keyed by column name (None stays None)."""
        if result is None:
            return None
        return dict(zip(self.__fields, result))

    def __parse_results(self, results):
        """Turn a sequence of row tuples into a list of dicts."""
        return [self.__parse_result(row) for row in results]

    def __init_primaryKey(self):
        """Load the primary key column of every table in the configured schema."""
        sql = """SELECT TABLE_NAME, COLUMN_NAME
                 FROM Information_schema.columns
                 WHERE COLUMN_KEY='PRI' AND TABLE_SCHEMA=%s"""
        self.__cursor.execute(sql, (self.__database,))
        for row in self.__cursor.fetchall():
            self.__primaryKey_dict[row[0]] = row[1]

    def __query_fields(self, tableName=None, database=None):
        """Load the column names of ``tableName`` into ``__fields``.

        Raises:
            Exception: if the schema has no such table (no columns found).
        """
        sql = """SELECT column_name
                 FROM Information_schema.columns
                 WHERE table_Name = %s AND TABLE_SCHEMA = %s
                 ORDER BY ORDINAL_POSITION"""
        self.__cursor.execute(sql, (tableName, database))
        columns = [row[0] for row in self.__cursor.fetchall()]
        if not columns:
            raise Exception(
                "Table [%s] not found in database [%s]." % (tableName, database))
        self.__fields = columns

    def __copy_filters(self, filters):
        """Validate ``filters`` and return a private copy (None -> empty dict)."""
        if filters is None:
            return {}
        if not isinstance(filters, dict):
            raise Exception("Parameter [filters] must be dict type. ")
        return dict(filters)

    def __check_column(self, column):
        """Return ``column`` if it belongs to the current table, else raise."""
        if column not in self.__fields:
            raise Exception(
                "Unknown column [%s] for table [%s]." % (column, self.__tableName))
        return column

    def __split_values(self, value):
        """Normalise the value of an IN / NOT IN filter to a list of items."""
        if isinstance(value, str):
            return [item.strip() for item in value.split(",") if item.strip()]
        if isinstance(value, (list, tuple, set)):
            return list(value)
        return [value]

    def __build_condition(self, key, value, params):
        """Return the ``AND ...`` fragment for one filter entry.

        Placeholders are used for every value; the values themselves are
        appended to ``params``.  Returns "" for keys that are neither a
        prefixed filter nor a column of the current table.
        """
        for prefix, operator in self.__FILTER_OPS:
            if not key.startswith(prefix):
                continue
            # Strip exactly the matched prefix ("_nein_" is six characters).
            column = self.__check_column(key[len(prefix):])
            if operator in ("IN", "NOT IN"):
                items = self.__split_values(value)
                if not items:
                    # "IN ()" is invalid SQL: an empty IN matches nothing,
                    # an empty NOT IN excludes nothing.
                    return "AND 1=0 " if operator == "IN" else ""
                params.extend(items)
                marks = ",".join(["%s"] * len(items))
                return "AND %s %s (%s) " % (column, operator, marks)
            if operator == "like":
                params.append("%" + str(value) + "%")
                return "AND %s like %%s " % column
            if operator == "!=" and value is None:
                return "AND %s IS NOT NULL " % column
            params.append(value)
            return "AND %s %s %%s " % (column, operator)

        if key in self.__fields:
            if value is None:
                return "AND %s IS NULL " % key
            params.append(value)
            return "AND %s = %%s " % key
        return ""

    def __query_util(self, filters=None):
        """Assemble the SELECT statement for the current table.

        @filters  filter dict, see the class docstring
        @return   (sql, params): SQL text with %s placeholders and the list
                  of values to bind to them
        """
        # Refresh the column list; this also verifies the table exists.
        self.__query_fields(self.__tableName, self.__database)

        remaining = self.__copy_filters(filters)
        page = remaining.pop("_limit_", None)

        params = []
        conditions = "".join(
            self.__build_condition(key, value, params)
            for key, value in remaining.items()
        )
        sql = ("SELECT " + ", ".join(self.__fields)
               + " FROM " + self.__tableName
               + " WHERE 1=1 " + conditions)
        if page:
            beginindex, limit = page
            sql += "LIMIT %d,%d" % (int(beginindex), int(limit))

        print(get_time(), sql, params)
        return sql, params

    def __check_params(self, tableName):
        """Remember ``tableName``; fall back to the previously used table.

        Raises:
            Exception: if no table name was given now or earlier.
        """
        if tableName:
            self.__tableName = tableName
        else:
            if self.__tableName is None:
                raise Exception("Parameter [tableName] is None.")


def get_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


if __name__ == "__main__":
    config = {
        # "creator": pymysql,
        # "host" : "127.0.0.1",
        "user": "root",
        "password": "root",
        "database": "test",
        # "port" : 3306,
        # "charset" : 'utf8'
    }
    base = BaseDao(**config)
    ########################################################################
    # user = base.select_one("user")
    # print(user)
    ########################################################################
    # users = base.select_all("user")
    # print(users)
    ########################################################################
    # filter1 = {
    #     "sex":0,
    #     "_in_id":"1,2,3,4,5",
    #     "_like_name":"zhang",
    #     "_ne_name":"wangwu"
    # }
    # user_filters = base.select_all(tableName="user", filters=filter1)
    # print(user_filters)
    ########################################################################
    # menu = base.select_one(tableName="menu")
    # print(menu)
    ########################################################################
    # user_pk = base.select_pk("user", 2)
    # print(user_pk)
    ########################################################################
    # filter2 = {
    #     "_in_id":"1,2,3,4",
    #     "_like_name":"test"
    # }
    # user_limit = base.select_page("user", 2, 10, filter2)  # not implemented yet
    # print(user_limit)
    ########################################################################
Der Code enthält mehrere konkrete Beispiele als Referenz.
Das Obige ist der detaillierte Inhalt von „Python kapselt DBUtils und Pymysql-Instanzen“. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der chinesischen PHP-Website!