Home > Database > Mysql Tutorial > [置顶] python脚本sqlite3模块的应用

[置顶] python脚本sqlite3模块的应用

WBOY
Release: 2016-06-07 14:50:35
Original
1422 people have browsed it

#!/usr/bin/python # -*- coding:utf-8 -*- import sqlite3 import os class SQLTest: '''sqlite数据库接口''' def __init__(self,path='',verbose=False): self.verbose = verbose self.path = path if os.path.isfile(path): self.conn = sqlite3.connect(p

#!/usr/bin/python
# -*- coding:utf-8 -*-
import sqlite3
import os


class SQLTest:
    '''sqlite数据库接口'''
    def __init__(self,path='',verbose=False):
        self.verbose = verbose
        self.path = path
        if os.path.isfile(path):
            self.conn = sqlite3.connect(path)
            if self.verbose:
                print('硬盘上面:[{}].format(path)')
        else:
            self.conn = sqlite3.connect(':memory:')
            if self.verbose:
                print('内存上面:[:memory:]')
    
    def setverbose(self,b):
        self.verbose = b
        
    def createtable(self,sql):
        '''创建数据库表'''
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            self.conn.commit()
            if self.verbose:
                print('创建数据库表成功!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
    
    def querytable(self):
        sql = 'SELECT name FROM sqlite_master WHERE type="table" ORDER BY name'
        cu = self.conn.cursor()
        cu.execute(sql)
        return cu.fetchall()
    
    def renametable(self,table,newtable):
        if table is not None and talbe !='':
            sql = 'ALTER TABLE %s RENAME TO "%s" ' % (table,newtable)
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
    
    def insert(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                if self.verbose:
                    print('插入数据库表成功!')
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
            
    def fetchall(self,sql):
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            return cu.fetchall()
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def fetchone(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                cu.execute(sql,(data,))
                return cu.fetchall()
            else:
                print('The [{}] is None!'.format(data))
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def updata(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def rowcount(self,table):
        sql = 'select count (*) from "%s"' % table
        cu = self.conn.cursor()
        r = cu.execute(sql)
        return (r.fetchone()[0])
    
    def delete(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
                    
    def droptable(self,table):
        if table is not None and table != '':
            sql = 'DROP TABLE IF EXISTS ' + table
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def close_all(self,cu):
        try:
            if cu is not None:
                cu.close()
        finally:
            if cu is not None:
                cu.close()
                
# function
def drop_table_test(sql,table):
    '''删除数据库表测试'''
    print('删除数据库表测试 ...')
    db.droptable(table)


def create_table_test(sql):
    '''创建数据库表测试'''
    print('创建数据库表测试 ...')
    create_table_sql = '''CREATE TABLE IF NOT EXISTS 'table1'(
                        'id' integer(32) NOT NULL,
                        'name' nvarchar(128) NOT NULL,
                        PRIMARY KEY('id')
                        )'''
    sql.createtable(create_table_sql)


def insert_test(sql):
    '''插入数据测试 ...'''
    print('插入数据测试 ...')
    insert_sql = 'INSERT INTO table1 values(?,?)'
    data = [(1,'aaa'),(2,'bbb')]
    sql.insert(insert_sql,data)


def fetchall_test(sql):
    '''查询所有数据'''
    print('查询所有数据 ...')
    fetchall_sql = 'SELECT * FROM table1'
    r = sql.fetchall(fetchall_sql)
    for e in range(len(r)):
        print(r[e])


def fetchone_test(sql):
    '''查询一条数据'''
    print('查询一条数据 ...')
    fetchall_sql = 'SELECT * FROM table1 WHERE id = ?'
    data = 1
    r = sql.fetchone(fetchall_sql,data)
    for e in range(len(r)):
        print(r[e])


def query_test(sql):
    '''有几个表'''
    print('有几个表 ...')
    print(sql.query_table(db))
    
def update_test(sql):
    '''更新数据'''
    print('更新数据 ...')
    update_sql = 'UPDATE table1 SET name = ? WHERE id = ?'
    data = [('TestA',1),('TestB',2)]
    sql.updata(update_sql,data)


def delete_test(db):
    '''删除数据'''
    print('删除数据 ...')
    delete_sql = 'DELETE FROM table1 WHERE name = ? and id = ?'
    data = [('TestA',1)]
    sql.delete(delete_sql,data)
    
# self test
if __name__ == '__main__':  
    sql = SQLTest(verbose=True)
    #创建数据库表
    create_table_test(sql)
    #插入数据
    insert_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #查询一条数据
    fetchone_test(sql)
    #更新数据
    update_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #删除一条数据
    delete_test(sql)
    print(sql.rowcount('table1'))
    #查询多条数据
    fetchall_test(sql)

    

#==================================================================================

测试结果

    
    
    
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template