背景
在PHP中使用Memcache或者Redis时,我们一般都会对Memcache和Redis封装一下,单独完成写一个Cache类,作为Memcache活着Redis的代理,且一般为单例模式。在业务代码中,使用Cache类时,操作的基本的示例代码如下
// cache 的 key $key = 'this is key'; $expire = 60;// 超时时间 // cache 的实例 $cache = Wk_Cache::instance(); $data = $cache->fetch($key); // 判断data if(empty($data)){ // 如果为空,调用db方法 $db = new Wk_DB(); $data = $db->getXXX(); $cache->store($key, $data, $expire); } // 处理$data相关数据 return $data;
基本流程为
第一步,先组装查询key,到Cache查询Value,如果存在,继续处理,进入第三步;如果不存在,进入第二步
第二步,根据请求,到DB中,查询相关数据,如果数据存在,把数据放到Cache中
第三步,处理cache或者db中返回的数据
问题
上述流程基本上会出现在每次调用Cache的部分,先cache查询,没有的话调用DB或者第三方接口,获取数据,再次存入Cache,继续数据处理。多次调用,既是一种问题,应该把这种查询方式封装到更底层的方法内。而不是每次重复这样的逻辑,除了封装的问题外,还有其他问题,我们统一列举下
第一:从设计角度来说 重复代码,需要更底层逻辑封装。
第二:key的组装,麻烦繁琐,实际情况,可能会把各种参数组装进去,维护的时候,不敢修改。
第三:设置的expire超时时间,会分散在各处逻辑代码中,最终很难统计Cache缓存时间的情况。
第四:由于要把cache->store方法放到调用db之后执行,如果db后,还有其他逻辑处理,有可能会忘掉把数据放入cache,导致数据。
第五:在高并发系统中,cache失效那一刻,会有大量请求直接穿透到后方,导致DB或者第三方接口压力陡升,响应变慢,进一步影响系统稳定性,这一现象为“Dogpile”。
以上问题中,最简单的为2,3种,对于expire超时时间分散的问题,我们可以通过统一配置文件来解决,比如我们可以创建这样的一个配置文件。
“test"=>array( // namespace,方便分组 "keys"=> array( “good”=>array( // 定义的key,此key非最终入cache的key,入key需要和params组装成唯一的key "timeout"=>600, // 此处定义超时时间 "params"=>array("epid"=>1,"num"=>1), // 通过这种方法,描述需要传递参数,用于组装最终入cache的key "desc"=>"描述" ), "top_test"=>array( // 定义的key,此key非最终入cache的key,入key需要和params组装成唯一的key "timeout"=>60, // 此处定义超时时间 "ttl"=>10, // 自动触发时间 "params"=>array('site_id'=>1,'boutique'=>1,'offset'=>1,'rows'=> 1,'uid'=>1,'tag_id'=>1,'type'=>1), // 通过这种方法,描述需要传递参数,用于组装最终入cache的key "desc"=>"描述", "author"=>"ugg", ), ) )
如上所示,通过一个算法,我们可以把site_top_feeds和params组装成唯一的入库key,组装后的key,大概是这样site_top_feeds_site_id=12&boutique=1&offset=0&rows=20&uid=&tag_id=0&type=2通过这种方式,我们避免工人自己组装key,从而杜绝第二种问题,同时在这个配置文件中,我们也设置了timeout,这样调用store时,我们可以直接从配置文件中读取,从而避免第三个问题。经过如上修改后,我们的cache方法,也做了适当的调整,调用示例如下。
$siteid = 121; $seminal = 1; $tag_id = 12; $tag_id = 22; $data = fetch(‘site_top_feeds’,array('site_id'=>$siteid,'boutique'=>$seminal, 'offset'=>"0", 'rows' => "20", 'uid' =>null,’tag_id’=>$tag_id,’type'=>$type),'feed'); if(empty($data)){ // db相关操作 $db = new Wk_DB(); $data = $db->getTopFeeds($site_id,$seminal,0,20,null,$tag_id,$type); // $data数据其他处理逻辑 这里 …… $cache->store(‘site_top_feeds’,$data,array(‘site_id'=>$siteid,'boutique'=>$seminal, 'offset'=>"0", 'rows' => "20", 'uid' =>null,’tag_id’=>$tag_id,’type'=>$type),'feed'); }
通过以上方案,我没看到,timeout超时时间没有了,key的组装也没有了,对于外层调用是透明的了。但是我们通过配置文件可以知道site_top_feeds的timeout是多少,通过封装的算法,知道组装的key是什么样的。
这种方式,并没有解决第一和第四的问题,封装性;要想完成封装性,第一件事情要做的就是回调函数,作为校本语言,PHP并没有完善的函数指针概念,当然要想执行一个函数其实也不需要指针。PHP支持回调函数的方法有两种call_user_func,call_user_func_array。
但是,我做了两个例子,发现上述方法,执行效率比原生方法,相差很多
native:0.0097959041595459s call_user_func:0.028249025344849s call_user_func_array:0.046605110168457s
例子代码如下:
$s = microtime(true); for($i=0; $i< 10000 ; ++$i){ $a = new a(); $data = $a->aaa($array, $array, $array); $data = a::bbb($array, $array, $array); } $e = microtime(true); echo "native:".($e-$s)."s\n"; $s = microtime(true); for($i=0; $i< 10000 ; ++$i){ $a = new a(); $data = call_user_func(array($a,'aaa'),$array,$array,$array); $data = call_user_func(array('a','bbb'),$array,$array,$array); } $e = microtime(true); echo "call_user_func:".($e-$s)."s\n"; $s = microtime(true); for($i=0; $i< 10000 ; ++$i){ $a = new a(); $data = call_user_func_array(array($a,'aaa'),array(&$array,&$array,&$array)); $data = call_user_func_array(array('a','bbb'),array(&$array,&$array,&$array)); } $e = microtime(true); echo “call_user_func_array:".($e-$s)."s\n";
在PHP中,知道一个对象和方法,其实调用方法很简单,比如上面的例子
$a = new a(); $data = $a->aaa($array, $array, $array); $obj = $a; $func = ‘aaa’; $params = array($array,$array,$array); $obj->$func($params[0],$params[1],$params[2]); // 通过这种方式可以直接执行
这种方式的执行性能怎么样,经过我们对比测试发现
native:0.0092940330505371s call_user_func:0.028635025024414s call_user_func_array:0.048038959503174s my_callback:0.11308288574219s
switch(count($params)){ case 0: $result = $obj->{$func}();break; case 1: $result = $obj->{$func}($params[0]);break; case 2: $result = $obj->{$func}($params[0],$params[1]);break; case 3: $result = $obj->{$func}($params[0],$params[1],$params[2]);break; case 4: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3]);break; case 5: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4]);break; case 6: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5]);break; case 7: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5],$params[6]);break; default: $result = call_user_func_array(array($obj, $func), $params); break; }
完成以上准备工作后,就可以使用回调机制了,再次调用的业务代码
…. // 相关变量赋值 $db = new Wk_DB(); $callback['obj'] = $db; $callback['func'] = 'getTopFeeds'; $callback['params'] = array('site_id'=>$siteid,'boutique'=>$seminal, 'offset'=>"0", 'rows' => "20", 'uid' =>null,'tag_id'=>$tag_id,'type'=>$type); $top_feed_list = $cache->smart_fetch('site_top_feeds',$callback,'feed');
… // 变量初始化 $key = “this is key”; $expiration = 600; $recalculate_at = 100; $lock_length = 20; $data = $cache->fetch($key); $ttl = $cache->redis->ttl($key); if($recalculate_at>=$ttl&&$r->setnx("lock:".$key,true)){ $r->expire(“lock:”.$key, $lock_length); $db = new Wk_DB(); $data = $db->getXXX(); $cache->store($key, $expiration, $value); }
解决方案
好了,关键核心代码如下
1:function回调部分代码
public static function callback($callback){ // 安全检查 if(!isset($callback['obj']) || !isset($callback['func']) || !isset($callback['params']) || !is_array($callback['params'])){ throw new Exception("CallBack Array Error"); } // 利用反射,判断对象和函数是否存在 $obj = $callback['obj']; $func = $callback['func']; $params = $callback['params']; // 方法判断 $method = new ReflectionMethod($obj,$func); if(!$method){ throw new Exception("CallBack Obj Not Find func"); } // 方法属性判断 if (!($method->isPublic() || $method->isStatic())) { throw new Exception("CallBack Obj func Error"); } // 参数个数判断(不进行逐项检测) $paramsNum = $method->getNumberOfParameters(); if($paramsNum < count($params)){ throw new Exception("CallBack Obj Params Error"); } // 6个参数以内,逐个调用,超过6个,直接调用call_user_func_array $result = false; // 判断静态类方法 if(!is_object($obj) && $method->isStatic()){ switch(count($params)){ case 0: $result = $obj::{$func}();break; case 1: $result = $obj::{$func}($params[0]);break; case 2: $result = $obj::{$func}($params[0],$params[1]);break; case 3: $result = $obj::{$func}($params[0],$params[1],$params[2]);break; case 4: $result = $obj::{$func}($params[0],$params[1],$params[2],$params[3]);break; case 5: $result = $obj::{$func}($params[0],$params[1],$params[2],$params[3],$params[4]);break; case 6: $result = $obj::{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5]);break; case 7: $result = $obj::{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5],$params[6]);break; default: $result = call_user_func_array(array($obj, $func), $params); break; } }else{ switch(count($params)){ case 0: $result = $obj->{$func}();break; case 1: $result = $obj->{$func}($params[0]);break; case 2: $result = $obj->{$func}($params[0],$params[1]);break; case 3: $result = $obj->{$func}($params[0],$params[1],$params[2]);break; case 4: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3]);break; case 5: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4]);break; case 6: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5]);break; case 7: $result = $obj->{$func}($params[0],$params[1],$params[2],$params[3],$params[4],$params[5],$params[6]);break; default: $result = call_user_func_array(array($obj, $func), $params); break; } }
public function smart_fetch($key,$callback,$namespace="wk") { key = $prefix.$key.$suffix; $result = $this->_redis->get($key); $bttl = false; // ttl状态判断(注意冷启动) if(!empty($ttl)){ // 获得过期时间 $rttl = $this->_redis->ttl($key); if($rttl > 0 && $ttl >= $rttl && $this->_redis->setnx("lock".$key,true)){ // 设置超时时间(超时时间3秒) $this->_redis->expire("lock".$key,3); $bttl = true; } } // 如何返回值不存在,调用回调函数,获取数值,并保持数据库 if($bttl || !$result || (isset($CONFIG['FLUSH']) && !empty($CONFIG['FLUSH']))){ // 重新调整参数 $callbackparams = array(); foreach($params as $k=>$value){ $callbackparams[] = $value; } $callback['params'] = $callbackparams; $result = Wk_Common::callback($callback); $expire = $key_config["timeout"]; // 存储数据 $status = $this->_redis->setex($key, $expire, $result); $result=$this->_redis->get($key); } // 删除锁 if($bttl){ $this->_redis->delete("lock".$key); } return $result; }
至此,我们使用脚本语言特性,通过user_call_func_array方法补齐所有函数回调机制,从而实现对Cache的封装,通过配置文件定义组装key的规则和每个key的超时时间,再通过Redis的ttl和setnx特性,保证只有一个进程执行DB操作,从而很好避免dogpile问题,实现cache自动触发,保证cache持续存在数据,并且有效减少DB的访问次数,提高性能。