java - 如何提高大量的字符串比较速度
PHP中文网
PHP中文网 2017-04-18 09:59:45
0
17
1439
PHP中文网
PHP中文网

认证高级PHP讲师

Antworte allen(17)
PHPzhong

写一个思路

candidates = [
    'GGGAGCAGGCAAGGACTCTG',
    'GCTCGGGCTTGTCCACAGGA',
    '...',
    # 被你看出来啦,这些其实人类基因的片段
]

bg_db = [
    'CTGCTGACGGGTGACACCCA',
    'AGGAACTGGTGCTTGATGGC',
    '...',
    # 这个更多,有十亿左右
]

因为你的数据其实是很有特点的,这里可以进行精简。
因为所有的字符串都是20个字符长度,且都由ATCG四个字符组成。那么可以把它们变换为整数来进行比较。
二进制表现形式如下

A  ==>  00
T  ==>  01
C  ==>  10
G  ==>  11

因为一个字符串长度固定,每个字符可以由2个比特位表示,所以每个字符串可以表示为一个40位的整数。可以表示为32+8的形式,也可以直接使用64位整形。建议使用C语言来做。

再来说说比较。
因为要找到每一个candidate在bg_db中与之差异小于等于4的所有记录,所以只要两个整数一做^按位异或操作,结果中二进制中1不超过8个,且这不超过8个1最多只能分为4个组的才有可能是符合要求的(00^11=11,10^01=11)。
把结果的40个比特位分作20个组,那么就是说最多只有4个组为b01 b10 b11这三个值,其余的全部为b00
那么比较算法就很好写了。
可以对每个字节(四个组)获取其中有几个组是为三个非零值的,来简介获取整体的比较结果。
因为每个字节只有256种可能的值,而符合条件的值只有3^4=81,所以可以先将结果存储起来,然后进行获取。
这里给出一个函数,来获取结果中有几个是非零组。

/*****************下面table中值的生成******//**
  int i;
  for( i=0;i<256;++i){
    int t =0;
    t += (i&0x01 || i&0x02)?1:0;
    t += (i&0x04 || i&0x08)?1:0;
    t += (i&0x10 || i&0x20)?1:0;
    t += (i&0x40 || i&0x80)?1:0;
    printf("%d,",t);
    if(i%10 ==9){putchar('\n');}
  }
********************************************//

int table[] = {
0,1,1,1,1,2,2,2,1,2,
2,2,1,2,2,2,1,2,2,2,
2,3,3,3,2,3,3,3,2,3,
3,3,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,1,2,2,2,2,3,
3,3,2,3,3,3,2,3,3,3,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,2,3,3,3,3,4,4,4,
3,4,4,4,3,4,4,4,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,2,3,
3,3,3,4,4,4,3,4,4,4,
3,4,4,4,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4
};

int getCount(uint64_t cmpresult)
{
    uint8_t* pb = &cmpresult;    // 这里假设是小段模式,且之前比较结果是存在低40位
    return table[pb[0]]+table[pb[1]]+table[pb[2]]+table[pb[3]]+table[pb[4]];
}
阿神

首先,你的时间估算完全不对,这种大规模的数据量处理,好歹跑个几万条,持续十秒以上的时间,才能拿来做乘法算总时间,只算一条的话,这个时间几乎都是初始化进程的开销,而非关键的IO、CPU开销

以下正文

ACTG四种可能性相当于2bit,用一个字符表示一个基因位太过浪费了,一个字符8bit,可以放4个基因位

即使不用任何算法,只是把你的20个基因写成二进制形式,也能节省5倍时间

另外,循环20次,CPU的指令数是20*n条,n估计至少有3,但对于二进制来说,做比较的异或运算直接是cpu指令,指令数是1

巴扎黑

算法不是很了解 但是就经验来说 复杂的算法反而耗时更久 不如这种简单粗暴来的迅速

可以考虑下多线程和集群来处理数据

对了 还有汉明距离貌似可以算这个

伊谢尔伦

同样没有使用算法,暴力解法,用c写的

在我的机器上(CPU: Core 2 Duo E7500, RAM: 4G, OS: Fedora 19),测试结果

candidates    bg.db        cost
10000    1000000    318758165微秒
500      1000000    14950302微秒 

如果换成题主的24核CPU,怎么也得有20倍的性能提升,然后再加上48台机器一起运算,5000W次运算为15s, 时间为
10000000 * 1000000000 / 500 / 1000000 * 15 / 20 / 48 / 3600 / 24 = 3.616898 天

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>

#define START_CC(flag)  \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)   \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)  \
    double cost_##flag = 0.0L; \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec);    \
    printf(#flag" cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder
{
    char code[GENEORDER_CODE_LENGTH];
}GeneOrder, *GeneOrderPtr;

typedef struct GOArray
{
    size_t          capacity;
    size_t          length;
    GeneOrderPtr    data;
}GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity    = capacity;
    goa.length      = 0;
    goa.data        = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const *file, GOArray *goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code,
                buff,
                GENEORDER_CODE_LENGTH * sizeof(char)
            );
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
            goarray->length ++;
        } else {
            fclose(fp);
            return true;
        }
    }

    fclose(fp);
    return true;
}

int main(int argc, char* argv[])
{
    (void)argc;

    GOArray condgo  = createGOAarray(10000);
    GOArray bggo    = createGOAarray(1000000);

    printf("loading ...\n");

    START_CC(loading);
    if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
        destroyGOArray(&condgo);
        destroyGOArray(&bggo);
        return -1;
    }
    STOP_CC(loading);
    PRINT_CC(loading);


    int count = 0;

    START_CC(compare);
    for (size_t i = 0;i < 500;i ++) {
        const GeneOrderPtr gop = condgo.data + i;
        for (size_t j = 0;j < bggo.length;j ++) {
            const GeneOrderPtr inner_gop = bggo.data + j;
            int inner_count = 0;

            for (size_t k = 0;k < 20;k ++) {
                if (gop->code[k] != inner_gop->code[k]) {
                    if (++inner_count > 4) {
                        break;
                    }
                }
            }

            if (inner_count <= 4) {
            #ifdef DBGPRINT
                printf("%d %s - %s\n", i, gop->code, inner_gop->code);
            #endif
                count++;
            }
        }
    }
    STOP_CC(compare);
    PRINT_CC(compare);

    printf("result = %d\n", count);

    destroyGOArray(&condgo);
    destroyGOArray(&bggo);

    return 0;
}

编译参数&运行

gcc -Wall -Wextra -o ccs main.c -std=c99 -Os && ./ccs candidate.list bg.db

如果改成多线程的话速度会更快一些,在我的机器改为2个线程简单使用500条candidates测试,速度可以提升到9040257微秒,线程增加到4个性能提升就不是很大了,但是较新的CPU都具有超线程技术,速度估计会更好一些。。。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <pthread.h>

#define START_CC(flag)              \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)               \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)                                                                                  \
    double cost_##flag = 0.0L;                                                                          \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec);                            \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
    printf(#flag " cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder {
    char code[GENEORDER_CODE_LENGTH];
} GeneOrder, *GeneOrderPtr;

typedef struct GOArray {
    size_t capacity;
    size_t length;
    GeneOrderPtr data;
} GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity = capacity;
    goa.length = 0;
    goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const* file, GOArray* goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code, buff,
                GENEORDER_CODE_LENGTH * sizeof(char));
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
            goarray->length++;
        }
        else {
            fclose(fp);
            return true;
        }
    }

    fclose(fp);
    return true;
}

typedef struct ProcessST {
    GOArray* pcond;
    GOArray* pbg;
    size_t beg;
    size_t end; // [beg, end)
} ProcessST;

void* processThread(void* parg)
{
    ProcessST* ppst = (ProcessST*)parg;
    GOArray* pcond = ppst->pcond;
    GOArray* pbg = ppst->pbg;
    int count = 0;

    for (size_t i = ppst->beg; i < ppst->end; i++) {
        const GeneOrderPtr gop = pcond->data + i;
        for (size_t j = 0; j < pbg->length; j++) {
            const GeneOrderPtr inner_gop = pbg->data + j;
            int inner_count = 0;

            for (size_t k = 0; k < 20; k++) {
                if (gop->code[k] != inner_gop->code[k]) {
                    if (++inner_count > 4) {
                        break;
                    }
                }
            }

            if (inner_count <= 4) {
#ifdef DBGPRINT
                printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
                count++;
            }
        }
    }

    return (void*)count;
}

int main(int argc, char* argv[])
{
    (void)argc;

    GOArray condgo = createGOAarray(10000);
    GOArray bggo = createGOAarray(1000000);

    printf("loading ...\n");

    START_CC(loading);
    if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
        destroyGOArray(&condgo);
        destroyGOArray(&bggo);
        return -1;
    }
    STOP_CC(loading);
    PRINT_CC(loading);

    size_t range[] = { 0, 250, 500 };
    pthread_t thr[2] = { 0 };
    ProcessST pst[2];

    START_CC(compare);

    for (size_t i = 0; i < 2; i++) {
        pst[i].pcond = &condgo;
        pst[i].pbg = &bggo;
        pst[i].beg = range[i];
        pst[i].end = range[i + 1];
        pthread_create(&thr[i], NULL, processThread, &pst[i]);
    }

    int count = 0;
    int* ret = NULL;

    for (size_t i = 0; i < 2; i++) {
        pthread_join(thr[i], (void**)&ret);
        count += (int)ret;
    }

    STOP_CC(compare);
    PRINT_CC(compare);

    printf("result = %d\n", count);

    destroyGOArray(&condgo);
    destroyGOArray(&bggo);

    return 0;
}

编译测试

gcc -Wall -Wextra -o ccs main.c -std=c99 -O3 -lpthread && ./ccs candidate.list bg.db
巴扎黑

抱歉,今天看到还有人回复。
仔细看了一下问题,发现我以前以为只是匹配。
所以我提出用ac自动机。

但是题主是为了找到序列的差异。
这就是找两者的编辑距离。
wiki:编辑距离
wiki:来文斯坦距离

以前刷OJ的时候是使用DP(动态规划)来找一个字符串转换成另外一个字符串的最少编辑次数。

for(i:0->len)
    for(j:0->len)
        str1[i]==str2[j] ? cost=0 : cost=1
        dp[i,j]=min(
            dp[i-1, j  ] + 1,     // 刪除
            dp[i  , j-1] + 1,     // 插入
            dp[i-1, j-1] + cost   // 替換
        )

比如 :

str1    "abcdef"
str2    "acddff"

str2 转换为 str1

插入b 算一次
删除d 算一次
修改f 算一次

对于题主的ATCG基因序列来说,是不是只需要找到修改的就行了。
然而像这种
ATCGATCG
TCGATCGA
这样应该怎么算。

如果仅仅是找到修改的话,直接比较 str1[i] 和 str2[i] 就可以了。

for(i:0->len)
if(str1[i]!=str2[i] )++count;

受到@rockford 的启发。
我们可以对 原始数据 进行预处理。

candidates 中的串
GGGAGCAGGCAAGGACTCTG
A5 T2 C4 G9

进行处理之后的额外数据 A5 T2 C4 G9

bg_db 中的串
CTGCTGACGGGTGACACCCA
A4 T3 C7 G6

进行处理之后的额外数据 A4 T3 C7 G6

A5 -> A4 记作 -1
T2 -> T3 记作 +1
C4 -> C7 记作 +3
G9 -> G6 记作 -3

很明显 A 如果修改只能变成 TCG。
同理,我们只需要统计所有的+ 或者所有的 -
就可以知道他们的至少有多少不同之处。
大于4的都可以不进行比较。

通过先对比预处理的额外数据,然后再通过单次的比较算法来 进行比对。
(星期六加班-ing,下班后写一下)

Peter_Zhu

你单个的任务是确定的,需要的是把这些任务下发给 worker 去做,对于这样的计算都不是同步单进程进行的。
其实就相当于你有[a,b] 和 [c, d] 要对比,你的任务是

  1. [a, c]

  2. [a, d]

  3. [b, c]

  4. [b, d]

如果你是同步串行你需要的时间就是 4 * 单个时间
如果是你 4 个 cpu 或者 4 个 机器并行, 你的时间差不多是单个时间

所以对于像基因组这样的计算基本上都是用大型机器多核并行的任务来完成,基本上参考的原理都是 google MapReduce 这篇论文的原理

黄舟

算法我不行,但是,像你这样的大量数据,一台电脑对比肯定是不行的,像你这样数据CPU密集型任务,同意其他人说的使用集群或者多进程的方式来计算,也就是我们用map-reduce的模型去计算
map就是映射,你先将你每个candidates一个一个映射到bg_db形成类似这样的数据结构(candidates,bg_db)
做成队列然后交给不同的服务器,每个服务器用多进程去计算,只能这样了,但是你这个数据量太大了,想办法把你的任务分配好,并行计算吧,

Ty80

可以尝试用字典树来保存所有的字符串。然后在查询的时候就可以用在字典树上遍历的方法。
在树上遍历的时候,可以维护一个当前节点的序列,这个序列里保存着当前遍历到的节点和对应节点mismatch的数量。
在遍历下一个节点的时候,要把当前序列里所有的节点都尝试向下,并形成新的节点序列。
好处是可以把很多个串的当前位放在一起进行比较,可以节约一些时间。由于每个位置选择不多,mismatch也不大,所有应该不会出现当前节点序列膨胀过大的情况。(这是猜想… 没太认真验证过…)

def match(candidate, root):
  nset = [[],[]]
  currentId = 0
  nset[currentId].append((root, 0))
  for ch in candidate:
    nextId = 1 - currentId
    for item in nset[currentId]:
      node, mismatch = item
      for nx in node.child:
        if nx.key == ch:
          nset[nextId].append((nx, mismatch))
        else:
          if mismatch:
            nset[nextId].append((nx, mismatch - 1))
    nset[currentId].clear()
    currentId = 1 - currentId
  return nset[currentId]

上面的代码就是一个大概的意思。如果用C++写的话会再快很多。
整个过程都可以用集群做分布式计算。

Ty80

目测题主没有多台机器供他计算,
我有一个朴素的思路,计算每个串的字母序数和(A:0,T:1,C:2,G:3),先计算两个字符串的序数和的差值,最大不能超过12,四个A变成四个G,差值小于12的再进行处理,
只是一个大概的想法,具体的权值可以另外设置,理论上可以快很多。
另外有一个算法是计算字符串编辑距离(将一个字符串修改为另一个字符串的最少编辑次数增、删、改)的,我一下子想不起来,你可以自行查一下。

巴扎黑

我用blast blastn-short

:)

Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage