python - 如何利用多进程来解决循环嵌套速度不行的问题?
代言
代言 2017-06-12 09:22:04
0
6
2266

有一个循环里面套循环的模式,
在内循环的循环体内要同时用到大循环和小循环的变量。

我这里是简化成了一个简单的模型,
这种模式如果函数复杂的话速度超级慢,
想问一下如何使用多进程的办法来解决速度问题?

我的思路是,只对小循环采用多进程,
在大循环的循环体内写多进程的代码,
但是一直失败,
求大神给出正确的代码。

拜谢!

import random as r
list1=list(range(100))
i=0
reslist=[]
while i<2000:#大循环
    alist=[]#三个列表变量,每次循环开始时清空
    blist=[]
    clist=[]
    for each in list1:#小循环
        x=r.randint(i+30,i+60)+each#涉及到大、小循环变量的几个函数,这里用random示意
        y=r.randint(i+60,i+120)+each
        z=r.randint(i+60,i+180)+each
        
        res=2.5*x-y-z
        reslist.append(res)#对函数结果进行操作
        if res>=50:
            alist.append(each)
        if -50<res<50:
            blist.append(each)
        if res<=-50:
            clist.append(each)
            
    for each in alist:#在大循环中对小循环中得出的结果进行进一步其他操作
        print(each)
    for each in blist:
        print(each)
    for each in clist:
        print(each)
    
    i+=1
代言
代言

全部回复(6)
学习ing

首先,并行计算需要各个并行运算的子程序间没有相互因果关系。
小循环内,res与x,y,z,与alist,blist,clist,都是因果关系密切的,很难拆分并行计算。
题主贴上来的虽然不是原始代码,不知道原始代码里大循环间有没有因果关系,不过从示意代码来看,
把大循环拆分为N个线程(用不到进程吧)应该是可以的,每个线程计算2000/N次。
例如,分为8个线程,线程1计算i=0到249,线程2计算i=250到499,依次类推。。。
这里N的大小,可以根据CPU的核数来定,如果N超过CPU的核数,就没有太大意义了,反而有可能会降低效率。

洪涛

中间应该用elif吧,最后面for的缩进好像也有问题

为情所困

可以在大循环这里开多进程,比如大循环2000次,如CPU的核数是4,则开4个进程,每个进程负责运行500个

小循环结束后,可以开子线程去执行下面的这些后续操作,大循环继续往前处理

for each in alist:#在大循环中对小循环中得出的结果进行进一步其他操作
    print(each)
for each in blist:
    print(each)
for each in clist:
    print(each)
phpcn_u1582

可以将小循环用子进程去处理 不过这样 你需要两个大循环。一个循环处理小循环 ,等处理完这个循环在来个大循环处理后面的事情

像这样

import random as r


def cumput(i, list1):
    alist = []
    blist = []
    clist = []
    reslist = []
    for each in list1:  # 小循环
        x = r.randint(i + 30, i + 60) + each  # 涉及到大、小循环变量的几个函数,这里用random示意
        y = r.randint(i + 60, i + 120) + each
        z = r.randint(i + 60, i + 180) + each

        res = 2.5 * x - y - z
        reslist.append(res)  # 对函数结果进行操作
        if res >= 50:
            alist.append(each)
        if -50 < res < 50:
            blist.append(each)
        if res <= -50:
            clist.append(each)
    return alist, blist, clist, reslist


if __name__ == '__main__':
    multiprocessing.freeze_support()
    list1 = list(range(100))
    i = 0
    pool = multiprocessing.Pool(2)
    res = {}
    while i < 2000:  # 大循环
        res[i]=pool.apply_async(cumput, (i, list1,))
        i += 1
    pool.close()
    pool.join()
    for i in res:
        for each in res[i].get()[0]:  # 在大循环中对小循环中得出的结果进行进一步其他操作
            print(each)
        for each in res[i].get()[1]:
            print(each)
        for each in res[i].get()[2]:
            print(each)
typecho

如果小循环中执行的函数比较耗时的话可以考虑生产者-消费者模型


import random
from threading import Thread
from Queue import Queue

resqueue = Queue()
aqueue = Queue()
bqueue = Queue()
cqueue = Queue()

def producer():
    list1=list(range(100))
    
    for _ in range(2000):
        for each in list1:
            x=r.randint(i+30,i+60)+each
            y=r.randint(i+60,i+120)+each
            z=r.randint(i+60,i+180)+each
            
            res=2.5*x-y-z
            resqueue.put(res)
            
            if res>=50:
                aqueue.put(each)
            if -50<res<50:
                bqueue.put(each)
            if res<=-50:
                cqueue.put(each)

def consumer_a():
    while True:
        try:
            data = aqueue.get(timeout=5)
        except Queue.Empty:
            return
        else:
            # 耗时操作
            deal_data(data)
            aqueue.task_done()
            
def consumer_b():
    while True:
        try:
            data = bqueue.get(timeout=5)
        except Queue.Empty:
            return
        else:
            # 耗时操作
            deal_data(data)
            bqueue.task_done()
            
 def consumer_c():
    while True:
        try:
            data = cqueue.get(timeout=5)
        except Queue.Empty:
            return
        else:
            # 耗时操作
            deal_data(data)
            cqueue.task_done()

 def consumer_res():
    while True:
        try:
            data = resqueue.get(timeout=5)
        except Queue.Empty:
            return
        else:
            # 耗时操作
            deal_data(data)
            resqueue.task_done()
            
if __name__ == "__main__":
    t1 = Thread(target=producer)
    t2 = Thread(target=consumer_a)
    ...
    
    t1.start()
    t2.start()
                       
刘奇

题主是不是应该先设计好进程的输入与输出,多进程做并行计算的话进程之间的通信是最重要的,据我了解的应该是MPI,比如多层循环,应该是先分发部分数据到每个进程,每个进程做计算后再返回数据整合点,然后合并结果输出。

还有一个比较重要的点是估算每个进程的执行时间,毕竟有进程间的通信的话等待时间也会导致效率下降。

@一代键客 所说,你的嵌套不太符合并行计算的输入规则,可以看看这个例子

http://blog.csdn.net/zouxy09/...

之前测试过文中的例子,没啥问题,你沿着这些做的话应该是可以搞出来的

热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板