Java多執行緒批次資料匯入的方法介紹
這篇文章帶給大家的內容是關於Java多執行緒批次資料導入的方法介紹,有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。
前言:當遇到大量資料匯入時,為了提高處理的速度,可以選擇使用多執行緒來批次處理這些處理。常見的場景有:
- 大檔案匯入資料庫(這個檔案不一定是標準的
CSV
可匯入檔案或需要在記憶體中經過一定的處理) - 資料同步(從第三方介面拉取資料處理後寫入自己的資料庫)
以上的場景有一個共通性,這類資料匯入的場景簡單來說就是將資料從一個資料來源移動到另一個資料來源,而其中必定可以分成兩步驟
- 資料讀取:從資料來源讀取資料到內存
- 資料寫入:將記憶體中的資料寫入到另外一個資料來源,可能存在資料處理
而且資料讀取的速度一般會比資料寫入的速度快很多,也就是讀取快,寫入慢。
設計想法
由於場景的特徵是讀取快,寫入慢,如果是使用多執行緒處理,建議是資料寫入部分改造為多執行緒。而資料讀取可以改造成批次讀取資料。簡單來說就是兩個重點:
- 批次讀取資料
- 多執行緒寫入資料
範例
##多執行緒批次處理最簡單的方案是使用執行緒池來進行處理,下面會透過一個模擬批次讀取和寫入的服務,以及對這個服務的多執行緒寫入呼叫作為範例,展示如何多執行緒批次資料導入。 模擬服務import java.util.concurrent.atomic.AtomicLong;
/**
* 数据批量写入用的模拟服务
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可读取总数
*/
private long canReadTotal;
/**
* 写入总数
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 写入休眠时间(单位:毫秒)
*/
private final long sleepTime;
/**
* 构造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量读取数据接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 写入数据接口
*/
public void writeData() {
try {
// 休眠一定时间模拟写入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写入总数自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 获取写入的总数
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}
登入後複製
批次資料處理器import java.util.concurrent.atomic.AtomicLong; /** * 数据批量写入用的模拟服务 * * @author RJH * create at 2019-04-01 */ public class MockService { /** * 可读取总数 */ private long canReadTotal; /** * 写入总数 */ private AtomicLong writeTotal=new AtomicLong(0); /** * 写入休眠时间(单位:毫秒) */ private final long sleepTime; /** * 构造方法 * * @param canReadTotal * @param sleepTime */ public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /** * 批量读取数据接口 * * @param num * @return */ public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println("read data size:" + readNum); return readNum; } /** * 写入数据接口 */ public void writeData() { try { // 休眠一定时间模拟写入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 写入总数自增 System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /** * 获取写入的总数 * * @return */ public long getWriteTotal() { return writeTotal.get(); } }
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于线程池的多线程批量写入处理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量读取的数据量
*/
private int batch;
/**
* 线程个数
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定数目的线程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 开始处理
*/
public void startHandle() {
// 开始处理的时间
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 关闭线程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待线程池中的线程执行完
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 总耗时
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 写入总数
System.out.println("total write num:" + service.getWriteTotal());
}
}
登入後複製
測試類別import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于线程池的多线程批量写入处理器 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /** * 每次批量读取的数据量 */ private int batch; /** * 线程个数 */ private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定数目的线程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /** * 开始处理 */ public void startHandle() { // 开始处理的时间 long startTime = System.currentTimeMillis(); System.out.println("start handle time:" + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 关闭线程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待线程池中的线程执行完 } // 结束时间 long endTime = System.currentTimeMillis(); System.out.println("end handle time:" + endTime); // 总耗时 System.out.println("total handle time:" + (endTime - startTime) + "ms"); // 写入总数 System.out.println("total write num:" + service.getWriteTotal()); } }
/**
* SimpleBatchHandler的测试类
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 总数
long total=100000;
// 休眠时间
long sleepTime=100;
// 每次拉取的数量
int batch=100;
// 线程个数
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
登入後複製
執行結果/** * SimpleBatchHandler的测试类 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandlerTest { public static void main(String[] args) { // 总数 long total=100000; // 休眠时间 long sleepTime=100; // 每次拉取的数量 int batch=100; // 线程个数 int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }
start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000
登入後複製
分析 #在單執行緒情況下的執行時間應該是start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分输出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
total*sleepTime,即
10000000ms,而改造為多執行緒後執行時間為
648447ms。
Java影片教學】
以上是Java多執行緒批次資料匯入的方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

Java 8引入了Stream API,提供了一種強大且表達力豐富的處理數據集合的方式。然而,使用Stream時,一個常見問題是:如何從forEach操作中中斷或返回? 傳統循環允許提前中斷或返回,但Stream的forEach方法並不直接支持這種方式。本文將解釋原因,並探討在Stream處理系統中實現提前終止的替代方法。 延伸閱讀: Java Stream API改進 理解Stream forEach forEach方法是一個終端操作,它對Stream中的每個元素執行一個操作。它的設計意圖是處

膠囊是一種三維幾何圖形,由一個圓柱體和兩端各一個半球體組成。膠囊的體積可以通過將圓柱體的體積和兩端半球體的體積相加來計算。本教程將討論如何使用不同的方法在Java中計算給定膠囊的體積。 膠囊體積公式 膠囊體積的公式如下: 膠囊體積 = 圓柱體體積 兩個半球體體積 其中, r: 半球體的半徑。 h: 圓柱體的高度(不包括半球體)。 例子 1 輸入 半徑 = 5 單位 高度 = 10 單位 輸出 體積 = 1570.8 立方單位 解釋 使用公式計算體積: 體積 = π × r2 × h (4
