最近專案在準備搞SASS化,SASS化有一個特點就是多租用戶,且每個租用戶之間的資料都要隔離,對於資料庫的隔離方案常見的有資料庫隔離,表隔離,字段隔離,目前我只用到表隔離和字段隔離(資料庫隔離的原理也差不多)。對於欄位隔離比較簡單,就是查詢條件不同而已,例如像下面的SQL查詢:
SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0
但是為了嚴謹,需求上需要在執行SQL之前檢查對應的表是否帶上tenant_id
的查詢欄位。
對於表隔離就麻煩了一些,他需要做到在運行的時候根據對應的租戶ID來處理某個資料表,舉個例子,假如有下面這樣的一條SQL查詢:
SELECT * FROM t_demo WHERE is_del=0
在遇到租用戶A時,SQL查詢將變成:
SELECT * FROM t_demo_a WHERE is_del=0
在遇到租用戶B時,SQL查詢將變成:
SELECT * FROM t_demo_b WHERE is_del=0
如果商家數量固定時,一般在程式碼裡寫if-else
來判斷就可以了,但是常見的SASS化應用程式的商家是會一直新增的,那麼對於這個SQL邏輯就會變成這樣:
def sql_handle(tenant_id: str): table_name: str = f"t_demo_{tenant_id}" sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
但是這有幾個問題,對於ORM來說,一開始只建立一個t_demo
對應的表物件就可以了,現在卻要根據多個商家建立多個表對象,這是不切實際的,其次如果是裸寫SQL,一般會使用IDE的檢查,而對於這樣的SQL:
sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
IDE是沒辦法進行檢查的,當然還有一個最為嚴重的問題,就是當前的專案已經非常龐大了,如果每個相關表的呼叫都進行適配更改的話,那工程量就非常龐大了,所以最好的方案就是在引擎庫得到使用者傳過來的SQL語句後且還沒送到MySQL
伺服器之前自動的根據商家ID更改SQL, 而要達到這樣的效果,就必須侵入到我們使用的MySQL
的引擎庫,修改裡面的方法來相容我們的需求。
不管是使用
dbutils
還是sqlalchemy
,都可以指定一個引擎庫,目前常用的引擎庫是pymysql
,所以下文都將以pymysql
為例進行闡述。
由於必須侵入到我們使用的引擎庫,所以我們應該先判斷我們需要修改引擎庫的哪個方法,在經過源碼閱讀後,我判定只要更改pymysql.cursors.Cursor
的mogrify
方法:
def mogrify(self, query, args=None): """ Returns the exact string that is sent to the database by calling the execute() method. This method follows the extension to the DB API 2.0 followed by Psycopg. """ conn = self._get_db() if args is not None: query = query % self._escape_args(args, conn) return query
這個方法的作用就是把使用者傳過來的SQL和參數整合,產生一個最終的SQL,剛好符合我們的需求,於是可以透過繼承的思路來創建一個新的屬於我們自己的Cursor
類別:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: # 在此可以编写处理还合成的SQL逻辑 mogrify_sql: str = super().mogrify(query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql class DictCursor(pymysql.cursors.DictCursorMixin, Cursor): """A cursor which returns results as a dictionary""" # 直接修改Cursor类的`mogrify`方法并不会影响到`DictCursor`类,所以我们也要创建一个新的`Cursor`类。
創建好了Cursor
類別後,就需要考慮如何在pymysql
中應用我們自訂的Cursor
類別了,一般的Mysql
連線庫都支援我們傳入自訂的Cursor
類,例如pymysql
:
import pymysql.cursors # Connect to the database connection = pymysql.connect( host='localhost', user='user', password='passwd', database='db', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )
我們可以透過cursorclass
來指定我們的Cursor
類別,如果使用的函式庫不支援或其它原因則需要使用猴子補丁的方法,具體的使用方法見Python探針完成呼叫庫的資料擷取。
現在我們已經搞定了在何處修改SQL的問題了,接下來就要思考如何在mogrify
方法取得到商家ID以及那些表格要進行替換,一般我們在進行一段程式碼呼叫時,有兩種傳參數的方法, 一種是傳數組類型的參數:
with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))
一種是傳字典類型的參數:
with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})
目前大多數的專案都存在這兩種類型的編寫習慣,而引擎庫在執行execute
時會經過處理後才把參數sql
#和args
傳給了mogrify
,如果我們是使用字典類型的參數,那麼可以在裡面嵌入我們需要的參數,並在mogrify
裡面提取出來,但是使用了數組類型的參數或是ORM庫的話就比較難傳遞參數給mogrify
方法了,這時可以透過context
隱式的把參數傳給mogrify
方法,具體的分析和原理可見:python如何使用contextvars模組原始碼分析。
context
的使用方法很簡單, 首先是建立一個context
封裝的類別:
from contextvars import ContextVar, Token from typing import Any, Dict, Optional, Set context: ContextVar[Dict[str, Any]] = ContextVar("context", default={}) class Context(object): """基础的context调用,支持Type Hints检查""" tenant_id: str replace_table_set: Set[str] def __getattr__(self, key: str) -> Any: value: Any = context.get().get(key) return value def __setattr__(self, key: str, value: Any) -> None: context.get()[key] = value class WithContext(Context): """简单的处理reset token逻辑,和context管理,只用在业务代码""" def __init__(self) -> None: self._token: Optional[Token] = None def __enter__(self) -> "WithContext": self._token = context.set({}) return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self._token: context.reset(self._token) self._token = None
接下來在業務程式碼中,透過context傳入目前業務對應的參數:
with WithContext as context: context.tenant_id = "xxx" context.replace_table_set = {"t_demo"} with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))
然後在mogrify
中透過呼叫context
即可獲得對應的參數了:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str] = context.replace_table_set # 在此可以编写处理还合成的SQL逻辑 mogrify_sql: str = super().mogrify(query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql
現在,萬事俱備,只剩下修改SQL的邏輯,之前在做別的專案的時候,建的表都是十分的規範,它們是以t_xxx
的格式給表命名,這樣一來替換表名十分方便,只要進行兩次替換就可以兼容大多數情況了,代碼如下:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str] = context.replace_table_set # 简单示例,实际上正则的效率会更好 for replace_table in replace_table_set: if replace_table in query: # 替换表名 query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ") # 替换查询条件中带有表名的 query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.") mogrify_sql: str = super().mogrify(query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql
但是现在项目的SQL规范并不是很好,有些表名还是MySQL
的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL
,并返回一些数据使我们可以比较方便的知道SQL
中哪些是表名,哪些是查询字段了。
目前在Python中有一个比较知名的SQL
解析库--sqlparse,它可以通过解析引擎把SQL解析成一个Python对象
,之后我们就可以通过一些语法来判断哪些是SQL
关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:
SELECT * FROM t_demo SELECT * FROM t_demo as demo SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx
如果我们要通过sqlparse
来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库--sql_metadata,这个库是基于sqlparse
和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL
中有哪些表名,查询字段是什么了。
目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:
SELECT * FROM `case`
但在经过sql_metadata
解析后得到的表名是case
而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。
接下来就可以通过sql_metadata
的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):
from typing import Dict, Set, Tuple, Union import pymysql import sql_metadata class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id # 生成一个解析完成的SQL对象 sql_parse: sql_metadata.Parser = sql_metadata.Parser(query) # 新加的一个属性,这里存下需要校验查询条件的表名 check_flag = False where_table_set: Set[str] = context.where_table_set # 该方法会获取到SQL对应的table,返回的是一个table的数组 for table_name in sql_parse.tables: if table_name in where_table_set: if sql_parse.columns_dict: # 该方法会返回SQL对应的字段,其中分为select, join, where等,这里只用到了where for where_column in sql_parse.columns_dict.get("where", []): # 如果连表,里面存的是类似于t_demo.tenant_id,所以要兼容这一个情况 if "tenant_id" in where_column.lower().split("."): check_flag = True break if not check_flag: # 检查不通过就抛错 raise RuntimeError() # 更换表名的逻辑 replace_table_set: Set[str] = context.replace_table_set new_query: str = query for table_name in sql_parse.tables: if table_name in replace_table_set: new_query = "" # tokens存放着解析完的数据,比如SELECT * FROM t_demo解析后是 # [SELECT, *, FROM, t_demo]四个token for token in sql_parse.tokens: # 判断token是否是表名 if token.is_potential_table_name: # 提取规范的表名 parse_table_name: str = token.stringified_token.strip() if parse_table_name in replace_table_set: new_table_name: str = f" {parse_table_name}_{tenant_id}" # next_token代表SQL的下一个字段 if token.next_token.normalized != "AS": # 如果当前表没有设置别名 # 通过AS把替换前的表名设置为新表名的别名,这样一来后面的表名即使没进行更改,也是能读到对应商户ID的表 new_table_name += f" AS {parse_table_name}" query += new_table_name continue # 通过stringified_token获取的数据会自动带空格,比如`FROM`得到的会是` FROM`,这样拼接的时候就不用考虑是否加空格了 new_query += token.stringified_token mogrify_sql: str = super().mogrify(new_query, args) # 在此可以编写处理合成后的SQL逻辑 return mogrify_sql
这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL
查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。 比如在使用的过程中可以发现,我们的SQL
转换和检查都是在父类的Cursor.mogrify
之前进行的,这就意味着不管我们代码逻辑里cursor.execute
传的参数是什么,对于同一个代码逻辑来说,传过来的query
值是保持不变的,比如下面的代码:
def get_user_info(uid: str) -> Dict[str, Any]: with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid}) return cursor.fetchone() or {}
这段代码中传到Cursor.mogrify
的query永远为SELECT * FROM t_user WHERE uid=%(uid)s,有变化的只是args中uid的不同。 有了这样的一个前提条件,那么我们就可以把query
的校验结果和转换结果缓存下来,减少每次都需要解析SQL
再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL
执行,那么直接用Python
的dict
来存放就可以了,如果项目中执行的SQL
很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU
来缓存。
以上是如何在Python中根據運行時修改業務SQL程式碼?的詳細內容。更多資訊請關注PHP中文網其他相關文章!