数据处理中提升性能的方法-引入并发但是避免同步

WBOY
Lepaskan: 2016-06-20 12:35:59
asal
829 orang telah melayarinya

背景

只要存在数据库,就会有后台批量处理数据的需求,比如数据表备份、定期清理、数据替换、数据迁移,对于批量处理来说,往往会涉及大量的查询、过滤、归类、聚合计算,在批量脚本中直接查询数据库往往性能太低,甚至会因为一个大型的SQL导致数据库锁表出现线上事故,因此一般采用先导出到文件,在文件上计算然后再导入,比如:

1、使用mysql -e “select * from table” > output.txt的方式,执行SQL,将结果导出到文件中;

2、针对文件,使用各种方式进行聚合、过滤、替换等计算,最后产出成需要使用的格式;

3、发布产出的文件,或者使用load data命令导入到数据库;

由于只是一次性的批量查询数据库导出到文件,然后针对文件进行计算,而不是每次都查询数据库,大量节省了网络的IO耗费,从而提升处理的速度。

然而得到了导出的文件之后,如果文件过大,或者计算逻辑复杂比如大量的调用了耗费CPU的正则匹配、聚合计算,那么单线程的处理会耗费大量的时间,这时候就可以引入并发处理,使得机器的CPU、内存、IO、网络等资源全部充分利用起来,大幅度降低处理时间。

引入多线程,拆分输入文件为多个,每个小文件启动一个处理线程

HADOOP的MAP-REDUCE的做法,是先将文件split成小分片文件,然后针对每个分片做计算,最后将每个分片的结果聚合在一起,然而由于HADOOP的调度、集群稳定性等各种原因,对于MB大小级别的文件处理,会发现速度非常慢,有时候甚至比单机单线程处理速度还慢,将单机单线程改成多线程,往往会发现令人惊讶的效果提升。

直观的做法,是使用主线程读取输入的单个大文件,然后将读取的结果分配给子线程处理,然后主线程做整合,这种方式因为多线程共用了单个文件的IO,需要加入对文件的同步机制,最后会发现性能瓶颈在这单个文件的读取同步之上。

可以将大文件分片成小文件,然后每个文件分配给单个线程单独处理,避免线程间的资源同步,每个线程会享用单独的CPU核、内存单元、文件句柄,处理速度能达到最快。

使用这种方式,可以用以下的步骤进行:

1、使用SHELL,将输入文件拆分成预定线程数目的份数,存放到一个目录中;

2、以输入文件的目录路径作为参数,编程语言JAVA/PYTHON读取该目录的所有文件,对于每个文件启动一个处理线程,进行处理;

3、SHELL将输出目录的所有文件,使用cat file* > output_file的方式,得到最终的计算结果

Shell

将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件

function run multitask(){ # 开启多个异步线程 SPLITS COUNT=20 # 输入文件总数 sourcefile linescount=

cat ${input_file} | wc -l
Salin selepas log masuk
Salin selepas log masuk
# 计算出拆分的文件个数 split filelines count=$(( $sourcefile linescount / $SPLITS COUNT )) # 进行文件拆分 split -l $splitfile linescount -a 3 -d ${input file} ${input

dir}/inputFile_

# 执行JAVA程序$JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"# 合并文件cat ${output_dir}/* > ${output_file}
Salin selepas log masuk

}

run multitask

## 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件#function run_multi_task(){ # 开启多个异步线程 SPLITS_COUNT=20 # 输入文件总数 source_file_lines_count=`cat ${input_file} | wc -l` # 计算出拆分的文件个数 split_file_lines_count=$(( $source_file_lines_count / $SPLITS_COUNT )) # 进行文件拆分 split -l $split_file_lines_count -a 3 -d ${input_file} ${input_dir}/inputFile_  # 执行JAVA程序 $JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"  # 合并文件 cat ${output_dir}/* > ${output_file}} run_multi_task
Salin selepas log masuk

这里注意,拆分文件的时候,不能使用split按照大小进行拆分,因为会把输入文件中的行截断;

对应的JAVA程序,则是通过读取文件夹中文件列表的方法,每个文件单独启动一个线程:

Java

public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                File inputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (File inputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterface backRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            Thread singleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Thread thread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
Salin selepas log masuk
public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                FileinputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (FileinputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterfacebackRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            ThreadsingleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Threadthread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
Salin selepas log masuk

通过这种方式,将大文件拆分成小文件,启动多个线程,每个线程处理一个小文件,最终将每个小文件的结果聚合,就得到了最终产出,性能上却大幅提升。

若有依赖的资源,可以按线程先复制、拆分、克隆,防止依赖的资源成为性能瓶颈

在上面的代码中,BackRzInterface是每个线程启动时要使用的Runnable对象,可以看到用的是每次new的方式创建:

// 创建RunnableBackRzInterface backRzInterface = new BackRzInterface();

这样每个处理线程依赖的BackRzInterface都是独立的,对这个Runnable代码的使用不会存在同步问题。

如果多线程处理中需要使用外部资源,最好想办法使得每个线程单独使用自己独占的资源,相互之间若不会存在冲突,可以实现最大化并发处理。

其他一些例子,比如:

  • 多线程用到了字典文件,那么方法是首先将字典文件复制多份,每个线程使用自己独占的字典,避免并发同步访问字典;
  • 多线程若需要统一ID发号,可以提前计算出每个输入文件的行数,然后依次生成第一个线程需要的ID范围、第二个线程需要的ID范围,这些不同的ID范围也可以分别生成不同的文件,这样每个线程会使用各自独立的ID资源,避免了多个线程单时刻访问单个ID发号服务,使得发号成为性能瓶颈的可能;
  • 多线程如果依赖相同的Service,如果可以每次new对象就每次new,如果Bean都是在Spring中管理,则将Service加上@Scope(“prototype”),或者将对象每次clone一下得到一个新对象,保证最终每个线程使用自己独占的对象。
  • 尽量使用函数式编程的思想,每个函数都不要产生副作用,不要修改入参,结果只能通过return返回,避免增加代码同步冲突的可能;

通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。

避免同步的终极方法:使用多进程进行实现资源隔离

如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?

相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。

对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:

Shell

要拆分的文件数,也就是要启动的多进程数

SPLITS_COUNT=20

input splitsdir="${input dir}splits" output splitsdir="${output dir}splits"

输入文件行数

source filelines_count=

cat ${input_file} | wc -l
Salin selepas log masuk
Salin selepas log masuk

每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)

split filelines count=$(( $sourcefile linescount / ${SPLITS_COUNT} ))

执行拆分,注意这里使用-l进行行级别拆分更好

split -l $split filelines count -a 3 -d ${inputfile} ${input splitsdir}/inputfile_

process idx=1 for fname in $(ls ${inputsplits dir}); do inputfpath=${input splitsdir}/$fname ouput fpath=${outputsplits dir}/$fname # 后台执行所有进程 php "/php/main.php" "${inputfpath}" "${ouput fpath}" & (( processidx++ )) done

等待所有后台进程执行结束

wait

合并文件

cat $output splitsdir/* > ${output_file}

# 要拆分的文件数,也就是要启动的多进程数SPLITS_COUNT=20 input_splits_dir="${input_dir}_splits"output_splits_dir="${output_dir}_splits"# 输入文件行数source_file_lines_count=`cat ${input_file} | wc -l`# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)split_file_lines_count=$(( $source_file_lines_count / ${SPLITS_COUNT} ))# 执行拆分,注意这里使用-l进行行级别拆分更好split -l $split_file_lines_count -a 3 -d ${input_file} ${input_splits_dir}/inputfile_ process_idx=1for fname in $(ls ${input_splits_dir}); do input_fpath=${input_splits_dir}/$fname ouput_fpath=${output_splits_dir}/$fname # 后台执行所有进程 php "/php/main.php" "${input_fpath}" "${ouput_fpath}" & (( process_idx++ )) done # 等待所有后台进程执行结束wait # 合并文件cat $output_splits_dir/* > ${output_file}
Salin selepas log masuk

上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。

总结

对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。

sumber:php.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!