java - 如何提高大量的字符串比较速度
PHP中文网
PHP中文网 2017-04-18 09:59:45
0
17
1440
PHP中文网
PHP中文网

认证高级PHP讲师

全部回覆(17)
PHPzhong

寫一個思路

candidates = [
    'GGGAGCAGGCAAGGACTCTG',
    'GCTCGGGCTTGTCCACAGGA',
    '...',
    # 被你看出来啦,这些其实人类基因的片段
]

bg_db = [
    'CTGCTGACGGGTGACACCCA',
    'AGGAACTGGTGCTTGATGGC',
    '...',
    # 这个更多,有十亿左右
]

因為你的數據其實是很有特色的,這裡可以進行精簡。
因為所有的字串都是20個字元長度,而且都由ATCG四個字元組成。那麼可以把它們變換為整數來進行比較。
二進位表現形式如下

A  ==>  00
T  ==>  01
C  ==>  10
G  ==>  11

因為一個字串長度固定,每個字元可以由2個位元位表示,所以每個字串可以表示為一個40位的整数。可以表示为32+8的形式,也可以直接使用64位整形。建议使用C語言來做。

再來說說比較。
因為要找到每一個candidate在bg_db中與之差異小於等於4的所有記錄,所以只要兩個整數一做^按位異或操作,結果中二進位中1不超過8個,且這不超過8個1最多只能分為4個組的才有可能是符合要求的(00^11 =11,10^01=11)。 每一个candidate在bg_db中与之差异小于等于4的所有记录,所以只要两个整数一做^按位异或操作,结果中二进制中1不超过8个,且这不超过8个1最多只能分为4个组的才有可能是符合要求的(00^11=11,10^01=11)。
把结果的40个比特位分作20个组,那么就是说最多只有4个组为b01 b10 b11这三个值,其余的全部为b00
那么比较算法就很好写了。
可以对每个字节(四个组)获取其中有几个组是为三个非零值的,来简介获取整体的比较结果。
因为每个字节只有256种可能的值,而符合条件的值只有3^4=81把結果的40個位元位元分作20個組,那麼就是說最多只有4個組為b01 b10 b11這三個值,其餘的全部為b00那麼比較演算法就很好寫了。
可以對每個位元組(四個組)取得其中有幾個組是為三個非零值的,來簡介取得整體的比較結果。

因為每個位元組只有256種可能的值,而符合條件的值只有3^4=81種🎜,所以可以先儲存結果起來,然後進行取得。 🎜這裡給一個函數,來取得結果中有幾個是非零組。 🎜
/*****************下面table中值的生成******//**
  int i;
  for( i=0;i<256;++i){
    int t =0;
    t += (i&0x01 || i&0x02)?1:0;
    t += (i&0x04 || i&0x08)?1:0;
    t += (i&0x10 || i&0x20)?1:0;
    t += (i&0x40 || i&0x80)?1:0;
    printf("%d,",t);
    if(i%10 ==9){putchar('\n');}
  }
********************************************//

int table[] = {
0,1,1,1,1,2,2,2,1,2,
2,2,1,2,2,2,1,2,2,2,
2,3,3,3,2,3,3,3,2,3,
3,3,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,1,2,2,2,2,3,
3,3,2,3,3,3,2,3,3,3,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,2,3,3,3,3,4,4,4,
3,4,4,4,3,4,4,4,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,2,3,
3,3,3,4,4,4,3,4,4,4,
3,4,4,4,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4
};

int getCount(uint64_t cmpresult)
{
    uint8_t* pb = &cmpresult;    // 这里假设是小段模式,且之前比较结果是存在低40位
    return table[pb[0]]+table[pb[1]]+table[pb[2]]+table[pb[3]]+table[pb[4]];
}
阿神

首先,你的時間估算完全不對,這種大規模的數據量處理,好歹跑個幾萬條,持續十秒以上的時間,才能拿來做乘法算總時間,只算一條的話,這個時間幾乎都是初始化進程的開銷,而非關鍵的IO、CPU開銷

以下正文

ACTG四種可能性相當於2bit,用一個字元表示一個基因位太過浪費了,一個字元8bit,可以放4個基因位

即使不用任何演算法,只是把你的20個基因寫成二進位形式,也能節省5倍時間

另外,循環20次,CPU的指令數是20*n條,n估計至少有3,但對於二進位來說,做比較的異或運算直接是cpu指令,指令數是1

巴扎黑

演算法不是很了解 但是就經驗來說 複雜的演算法反而耗時更久 不如這種簡單粗暴來的迅速

可以考慮下多執行緒和叢集來處理資料

對了 還有漢明距離似乎可以算這個

伊谢尔伦

同樣沒有使用演算法,暴力解法,用c寫的

在我的機器上(CPU: Core 2 Duo E7500, RAM: 4G, OS: Fedora 19),測試結果

candidates    bg.db        cost
10000    1000000    318758165微秒
500      1000000    14950302微秒 

如果換成題主的24核心CPU,怎麼也得有20倍的效能提升,然後再加上48台機器一起運算,5000W次運算為15s, 時間為
10000000 * 1000000000 / 500 / 1000 / 20 / 48 / 3600 / 24 = 3.616898 天

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>

#define START_CC(flag)  \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)   \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)  \
    double cost_##flag = 0.0L; \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec);    \
    printf(#flag" cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder
{
    char code[GENEORDER_CODE_LENGTH];
}GeneOrder, *GeneOrderPtr;

typedef struct GOArray
{
    size_t          capacity;
    size_t          length;
    GeneOrderPtr    data;
}GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity    = capacity;
    goa.length      = 0;
    goa.data        = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const *file, GOArray *goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code,
                buff,
                GENEORDER_CODE_LENGTH * sizeof(char)
            );
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '
gcc -Wall -Wextra -o ccs main.c -std=c99 -Os && ./ccs candidate.list bg.db
'; goarray->length ++; } else { fclose(fp); return true; } } fclose(fp); return true; } int main(int argc, char* argv[]) { (void)argc; GOArray condgo = createGOAarray(10000); GOArray bggo = createGOAarray(1000000); printf("loading ...\n"); START_CC(loading); if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) { destroyGOArray(&condgo); destroyGOArray(&bggo); return -1; } STOP_CC(loading); PRINT_CC(loading); int count = 0; START_CC(compare); for (size_t i = 0;i < 500;i ++) { const GeneOrderPtr gop = condgo.data + i; for (size_t j = 0;j < bggo.length;j ++) { const GeneOrderPtr inner_gop = bggo.data + j; int inner_count = 0; for (size_t k = 0;k < 20;k ++) { if (gop->code[k] != inner_gop->code[k]) { if (++inner_count > 4) { break; } } } if (inner_count <= 4) { #ifdef DBGPRINT printf("%d %s - %s\n", i, gop->code, inner_gop->code); #endif count++; } } } STOP_CC(compare); PRINT_CC(compare); printf("result = %d\n", count); destroyGOArray(&condgo); destroyGOArray(&bggo); return 0; }

編譯參數&運行

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <pthread.h>

#define START_CC(flag)              \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)               \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)                                                                                  \
    double cost_##flag = 0.0L;                                                                          \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec);                            \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
    printf(#flag " cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder {
    char code[GENEORDER_CODE_LENGTH];
} GeneOrder, *GeneOrderPtr;

typedef struct GOArray {
    size_t capacity;
    size_t length;
    GeneOrderPtr data;
} GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity = capacity;
    goa.length = 0;
    goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const* file, GOArray* goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code, buff,
                GENEORDER_CODE_LENGTH * sizeof(char));
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '
gcc -Wall -Wextra -o ccs main.c -std=c99 -O3 -lpthread && ./ccs candidate.list bg.db
'; goarray->length++; } else { fclose(fp); return true; } } fclose(fp); return true; } typedef struct ProcessST { GOArray* pcond; GOArray* pbg; size_t beg; size_t end; // [beg, end) } ProcessST; void* processThread(void* parg) { ProcessST* ppst = (ProcessST*)parg; GOArray* pcond = ppst->pcond; GOArray* pbg = ppst->pbg; int count = 0; for (size_t i = ppst->beg; i < ppst->end; i++) { const GeneOrderPtr gop = pcond->data + i; for (size_t j = 0; j < pbg->length; j++) { const GeneOrderPtr inner_gop = pbg->data + j; int inner_count = 0; for (size_t k = 0; k < 20; k++) { if (gop->code[k] != inner_gop->code[k]) { if (++inner_count > 4) { break; } } } if (inner_count <= 4) { #ifdef DBGPRINT printf("%d %s - %s\n", i, gop->code, inner_gop->code); #endif count++; } } } return (void*)count; } int main(int argc, char* argv[]) { (void)argc; GOArray condgo = createGOAarray(10000); GOArray bggo = createGOAarray(1000000); printf("loading ...\n"); START_CC(loading); if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) { destroyGOArray(&condgo); destroyGOArray(&bggo); return -1; } STOP_CC(loading); PRINT_CC(loading); size_t range[] = { 0, 250, 500 }; pthread_t thr[2] = { 0 }; ProcessST pst[2]; START_CC(compare); for (size_t i = 0; i < 2; i++) { pst[i].pcond = &condgo; pst[i].pbg = &bggo; pst[i].beg = range[i]; pst[i].end = range[i + 1]; pthread_create(&thr[i], NULL, processThread, &pst[i]); } int count = 0; int* ret = NULL; for (size_t i = 0; i < 2; i++) { pthread_join(thr[i], (void**)&ret); count += (int)ret; } STOP_CC(compare); PRINT_CC(compare); printf("result = %d\n", count); destroyGOArray(&condgo); destroyGOArray(&bggo); return 0; }

如果改成多線程的話速度會更快一些,在我的機器改為2个线程简单使用500条candidates测试,速度可以提升到9040257微秒,线程增加到4個性能提升就不是很大了,但是較新的CPU都具有超線程技術,速度估計會更好一些。 。 。

rrreee

編譯測驗

rrreee
巴扎黑

抱歉,今天看到還有人回覆。
仔細看了一下問題,發現我以前以為只是匹配。
所以我提出用ac自動機。

但是題主是為了找出序列的差異。
這就是找兩者的編輯距離。
wiki:編輯距離
wiki:來文斯坦距離

以前刷OJ的時候是使用DP(動態規劃)來找一個字串轉換成另外一個字串的最少編輯次數。

for(i:0->len)
    for(j:0->len)
        str1[i]==str2[j] ? cost=0 : cost=1
        dp[i,j]=min(
            dp[i-1, j  ] + 1,     // 刪除
            dp[i  , j-1] + 1,     // 插入
            dp[i-1, j-1] + cost   // 替換
        )

比如 :

str1    "abcdef"
str2    "acddff"

str2 轉換為 str1

插入b 算一次
刪除d 算一次
修改f 算一次

對於題主的ATCG基因序列來說,是不是只要找到修改的就行了。
然而像這種
ATCGATCG
TCGATCGA
這樣該怎麼算。

如果只是找到修改的話,直接比較 str1[i] 和 str2[i] 就可以了。

for(i:0->len)
if(str1[i]!=str2[i] )++count;

受到@rockford 的啟發。
我們可以對 原始資料 進行預處理。

candidates 中的串
GGGAGCAGGCAAGGACTCTG
A5 T2 C4 G9

進行處理後的額外資料 A5 T2 C4 G9

bg_db 中的字串
CTGCTGACGGGTGACACCCA
A4 T3 C7 G6

進行處理後的額外資料 A4 T3 C7 G6

A5 -> A4 記作 -1
T2 -> T3 記作 +1
C4 -> C7 記作 +3
G9 -> G6 記作 -3

很明顯 A 如果修改只能變成 TCG。
同理,我們只需要統計所有的+ 或所有的 -
就可以知道他們的至少有多少不同之處。
大於4的都可以不進行比較。

透過先比較預處理的額外數據,然後再透過單次的比較演算法來 進行比對。
(星期六加班-ing,下班後寫)

Peter_Zhu

你單一的任務是確定的,需要的是把這些任務下發給 worker 去做,對於這樣的計算都不是同步單進程進行的。
其實等於你有[a,b] 和 [c, d] 要對比,你的任務是

  1. [a, c]

  2. [a, d]

  3. [b, c]

  4. [b, d]

如果你是同步串列你需要的時間就是 4 * 單一時間
如果是你 4 個 cpu 或 4 個 機器並行, 你的時間差不多是單一時間

所以對於像基因組這樣的計算基本上都是用大型機器多核並行的任務來完成,基本上參考的原理都是 google MapReduce 這篇論文的原理

黄舟

演算法我不行,但是,像你這樣的大量數據,一台電腦對比肯定是不行的,像你這樣數據CPU密集型任務,同意其他人說的使用集群或者多進程的方式來計算,也就是我們用map-reduce的模型去計算
map就是映射,你先將你每個candidates一個一個映射到bg_db形成類似這樣的資料結構(candidates,bg_db)
做成隊列然後交給不同的伺服器,每個伺服器用多進程去計算,只能這樣了,但是你這個資料量太大了,想辦法把你的任務分配好,並行計算吧,

Ty80

可以嘗試用字典樹來保存所有的字串。然後在查詢的時候就可以用在字典樹上遍歷的方法。
在樹上遍歷的時候,可以維護一個目前節點的序列,這個序列裡保存著目前遍歷到的節點和對應節點mismatch的數量。
在遍歷下一個節點的時候,要把目前序列裡所有的節點都嘗試向下,並形成新的節點序列。
好處是可以把很多串的當前位放在一起比較,可以節省一些時間。由於每個位置選擇不多,mismatch也不大,所有應該不會出現當前節點序列膨脹過大的情況。 (這是猜想… 沒太認真驗證過…)

def match(candidate, root):
  nset = [[],[]]
  currentId = 0
  nset[currentId].append((root, 0))
  for ch in candidate:
    nextId = 1 - currentId
    for item in nset[currentId]:
      node, mismatch = item
      for nx in node.child:
        if nx.key == ch:
          nset[nextId].append((nx, mismatch))
        else:
          if mismatch:
            nset[nextId].append((nx, mismatch - 1))
    nset[currentId].clear()
    currentId = 1 - currentId
  return nset[currentId]

上面的程式碼就是一個大概的意思。如果用C++寫的話會再快很多。
整個過程都可以用叢集做分散式計算。

Ty80

目測題主沒有多台機器供他計算,
我有一個樸素的思路,計算每個串的字母序數和(A:0,T:1,C:2,G:3),先計算兩個字串的序數和的差值,最大不能超過12,四個A變成四個G,差值小於12的再進行處理,
只是一個大概的想法,具體的權值可以另外設置,理論上可以快很多。
另外有一個演算法是計算字串編輯距離(將一個字串修改為另一個字串的最少編輯次數增、刪、改)的,我一下子想不起來,你可以自行查一下。

巴扎黑

我用blast blastn-short

:)

熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板