Home > Backend Development > PHP Tutorial > 数据处理中提升性能的方法-引入并发但是避免同步

数据处理中提升性能的方法-引入并发但是避免同步

WBOY
Release: 2016-06-20 12:35:59
Original
879 people have browsed it

背景

只要存在数据库,就会有后台批量处理数据的需求,比如数据表备份、定期清理、数据替换、数据迁移,对于批量处理来说,往往会涉及大量的查询、过滤、归类、聚合计算,在批量脚本中直接查询数据库往往性能太低,甚至会因为一个大型的SQL导致数据库锁表出现线上事故,因此一般采用先导出到文件,在文件上计算然后再导入,比如:

1、使用mysql -e “select * from table” > output.txt的方式,执行SQL,将结果导出到文件中;

2、针对文件,使用各种方式进行聚合、过滤、替换等计算,最后产出成需要使用的格式;

3、发布产出的文件,或者使用load data命令导入到数据库;

由于只是一次性的批量查询数据库导出到文件,然后针对文件进行计算,而不是每次都查询数据库,大量节省了网络的IO耗费,从而提升处理的速度。

然而得到了导出的文件之后,如果文件过大,或者计算逻辑复杂比如大量的调用了耗费CPU的正则匹配、聚合计算,那么单线程的处理会耗费大量的时间,这时候就可以引入并发处理,使得机器的CPU、内存、IO、网络等资源全部充分利用起来,大幅度降低处理时间。

引入多线程,拆分输入文件为多个,每个小文件启动一个处理线程

HADOOP的MAP-REDUCE的做法,是先将文件split成小分片文件,然后针对每个分片做计算,最后将每个分片的结果聚合在一起,然而由于HADOOP的调度、集群稳定性等各种原因,对于MB大小级别的文件处理,会发现速度非常慢,有时候甚至比单机单线程处理速度还慢,将单机单线程改成多线程,往往会发现令人惊讶的效果提升。

直观的做法,是使用主线程读取输入的单个大文件,然后将读取的结果分配给子线程处理,然后主线程做整合,这种方式因为多线程共用了单个文件的IO,需要加入对文件的同步机制,最后会发现性能瓶颈在这单个文件的读取同步之上。

可以将大文件分片成小文件,然后每个文件分配给单个线程单独处理,避免线程间的资源同步,每个线程会享用单独的CPU核、内存单元、文件句柄,处理速度能达到最快。

使用这种方式,可以用以下的步骤进行:

1、使用SHELL,将输入文件拆分成预定线程数目的份数,存放到一个目录中;

2、以输入文件的目录路径作为参数,编程语言JAVA/PYTHON读取该目录的所有文件,对于每个文件启动一个处理线程,进行处理;

3、SHELL将输出目录的所有文件,使用cat file* > output_file的方式,得到最终的计算结果

Shell

将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件

function run multitask(){ # 开启多个异步线程 SPLITS COUNT=20 # 输入文件总数 sourcefile linescount=

cat ${input_file} | wc -l
Copy after login
Copy after login
# 计算出拆分的文件个数 split filelines count=$(( $sourcefile linescount / $SPLITS COUNT )) # 进行文件拆分 split -l $splitfile linescount -a 3 -d ${input file} ${input

dir}/inputFile_

# 执行JAVA程序$JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"# 合并文件cat ${output_dir}/* > ${output_file}
Copy after login

}

run multitask

## 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件#function run_multi_task(){ # 开启多个异步线程 SPLITS_COUNT=20 # 输入文件总数 source_file_lines_count=`cat ${input_file} | wc -l` # 计算出拆分的文件个数 split_file_lines_count=$(( $source_file_lines_count / $SPLITS_COUNT )) # 进行文件拆分 split -l $split_file_lines_count -a 3 -d ${input_file} ${input_dir}/inputFile_  # 执行JAVA程序 $JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"  # 合并文件 cat ${output_dir}/* > ${output_file}} run_multi_task
Copy after login

这里注意,拆分文件的时候,不能使用split按照大小进行拆分,因为会把输入文件中的行截断;

对应的JAVA程序,则是通过读取文件夹中文件列表的方法,每个文件单独启动一个线程:

Java

public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                File inputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (File inputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterface backRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            Thread singleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Thread thread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
Copy after login
public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                FileinputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (FileinputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterfacebackRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            ThreadsingleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Threadthread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
Copy after login

通过这种方式,将大文件拆分成小文件,启动多个线程,每个线程处理一个小文件,最终将每个小文件的结果聚合,就得到了最终产出,性能上却大幅提升。

若有依赖的资源,可以按线程先复制、拆分、克隆,防止依赖的资源成为性能瓶颈

在上面的代码中,BackRzInterface是每个线程启动时要使用的Runnable对象,可以看到用的是每次new的方式创建:

// 创建RunnableBackRzInterface backRzInterface = new BackRzInterface();

这样每个处理线程依赖的BackRzInterface都是独立的,对这个Runnable代码的使用不会存在同步问题。

如果多线程处理中需要使用外部资源,最好想办法使得每个线程单独使用自己独占的资源,相互之间若不会存在冲突,可以实现最大化并发处理。

其他一些例子,比如:

  • 多线程用到了字典文件,那么方法是首先将字典文件复制多份,每个线程使用自己独占的字典,避免并发同步访问字典;
  • 多线程若需要统一ID发号,可以提前计算出每个输入文件的行数,然后依次生成第一个线程需要的ID范围、第二个线程需要的ID范围,这些不同的ID范围也可以分别生成不同的文件,这样每个线程会使用各自独立的ID资源,避免了多个线程单时刻访问单个ID发号服务,使得发号成为性能瓶颈的可能;
  • 多线程如果依赖相同的Service,如果可以每次new对象就每次new,如果Bean都是在Spring中管理,则将Service加上@Scope(“prototype”),或者将对象每次clone一下得到一个新对象,保证最终每个线程使用自己独占的对象。
  • 尽量使用函数式编程的思想,每个函数都不要产生副作用,不要修改入参,结果只能通过return返回,避免增加代码同步冲突的可能;

通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。

避免同步的终极方法:使用多进程进行实现资源隔离

如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?

相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。

对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:

Shell

要拆分的文件数,也就是要启动的多进程数

SPLITS_COUNT=20

input splitsdir="${input dir}splits" output splitsdir="${output dir}splits"

输入文件行数

source filelines_count=

cat ${input_file} | wc -l
Copy after login
Copy after login

每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)

split filelines count=$(( $sourcefile linescount / ${SPLITS_COUNT} ))

执行拆分,注意这里使用-l进行行级别拆分更好

split -l $split filelines count -a 3 -d ${inputfile} ${input splitsdir}/inputfile_

process idx=1 for fname in $(ls ${inputsplits dir}); do inputfpath=${input splitsdir}/$fname ouput fpath=${outputsplits dir}/$fname # 后台执行所有进程 php "/php/main.php" "${inputfpath}" "${ouput fpath}" & (( processidx++ )) done

等待所有后台进程执行结束

wait

合并文件

cat $output splitsdir/* > ${output_file}

# 要拆分的文件数,也就是要启动的多进程数SPLITS_COUNT=20 input_splits_dir="${input_dir}_splits"output_splits_dir="${output_dir}_splits"# 输入文件行数source_file_lines_count=`cat ${input_file} | wc -l`# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)split_file_lines_count=$(( $source_file_lines_count / ${SPLITS_COUNT} ))# 执行拆分,注意这里使用-l进行行级别拆分更好split -l $split_file_lines_count -a 3 -d ${input_file} ${input_splits_dir}/inputfile_ process_idx=1for fname in $(ls ${input_splits_dir}); do input_fpath=${input_splits_dir}/$fname ouput_fpath=${output_splits_dir}/$fname # 后台执行所有进程 php "/php/main.php" "${input_fpath}" "${ouput_fpath}" & (( process_idx++ )) done # 等待所有后台进程执行结束wait # 合并文件cat $output_splits_dir/* > ${output_file}
Copy after login

上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。

总结

对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。

source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template