


Bagaimana untuk melaksanakan penyegerakan data berbilang benang dan kumpulan besar dalam Java
Latar Belakang
Baru-baru ini saya menemui fungsi yang mempunyai 3 juta+ data dalam masa dua bulan, dan ia masih terkumpul selepas itu Kerana semua data telah disimpan dalam jadual mysql dari awal, ia kini perlu dipaparkan pada halaman dan perlu dikaitkan Data dalam jadual lain, dan syarat pertanyaan pada halaman keperluan produk mempunyai sebanyak 20 syarat Pada akhirnya, fungsi ini tersekat sehingga pada dasarnya mustahil untuk mencari data.
Akhir sekali, kami merancang untuk menyimpan data kedua-dua jadual ini dalam MongoDB pada masa yang sama untuk meningkatkan kecekapan pertanyaan.
Apabila menyegerak pada mulanya, satu utas digunakan untuk menyegerakkan data kedua-dua jadual ini dalam mod paging Hasilnya... hanya 300,000 data disegerakkan dalam satu malam, yang sangat perlahan! ! !
Akhirnya, selepas beberapa pengubahsuaian, 3 juta+ data berjaya disegerakkan dalam masa 2 jam.
Berikut adalah logik utama.
Sila tetapkan bilangan utas mengikut prestasi pelayan anda sendiri.
Idea
Mula-mula ketahui jumlah keputusan dalam keputusan yang ditetapkan melalui kiraan, tetapkan bilangan pertanyaan halaman untuk setiap utas, dapatkan bilangan utas melalui jumlah nombor dan nombor tunggal , dan tukar Subskrip had melaksanakan pertanyaan kelompok.
Pelaksanaan kod
package com.github.admin.controller.loans; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.github.admin.model.entity.CaseCheckCallRecord; import com.github.admin.model.entity.duyan.DuyanCallRecordDetail; import com.github.admin.model.entity.loans.CaseCallRemarkRecord; import com.github.admin.service.duyan.DuyanCallRecordDetailService; import com.github.admin.service.loans.CaseCallRemarkRecordService; import com.github.common.constant.MongodbConstant; import com.github.common.util.DingDingMsgSendUtils; import com.github.common.util.ListUtils; import com.github.common.util.Response; import com.github.common.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /** * 多线程同步历史数据 * @author songfayuan * @date 2019-09-26 15:38 */ @Slf4j @RestController @RequestMapping("/demo") public class SynchronizeHistoricalDataController implements DisposableBean { private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController"); //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 @Value("${spring.profiles.active}") private String profile; @Autowired private DuyanCallRecordDetailService duyanCallRecordDetailService; @Autowired private MongoTemplate mongoTemplate; @Autowired private CaseCallRemarkRecordService caseCallRemarkRecordService; /** * 多线程同步通话记录历史数据 * @param params * @return * @throws Exception */ @GetMapping("/syncHistoryData") public Response syncHistoryData(Map<String, Object> params) throws Exception { executor.execute(new Runnable() { @Override public void run() { try { logicHandler(params); } catch (Exception e) { log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e); DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e); } } }); return Response.success("请求成功"); } /** * 处理数据逻辑 * @param params * @throws Exception */ private void logicHandler(Map<String, Object> params) throws Exception { /******返回结果:多线程处理完的最终数据******/ List<DuyanCallRecordDetail> result = new ArrayList<>(); /******查询数据库总的数据条数******/ int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>() .eq("is_delete", 0) .eq("platform_type", 1)); DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。"); // int count = 2620266; /******限制每次查询的条数******/ int num = 1000; /******计算需要查询的次数******/ int times = count / num; if (count % num != 0) { times = times + 1; } /******每个线程开始查询的行数******/ int offset = 0; /******添加任务******/ List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>(); for (int i = 0; i < times; i++) { Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num); tasks.add(qfe); offset = offset + num; } /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/ List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10); for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) { if (CollectionUtils.isNotEmpty(callableList)) { // executor.execute(new Runnable() { // @Override // public void run() { // log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName()); // // log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName()); // } // }); try { List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList); /******处理线程返回结果******/ if (futures != null && futures.size() > 0) { for (Future<List<DuyanCallRecordDetail>> future : futures) { List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get(); if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){ executor.execute(new Runnable() { @Override public void run() { /******异步存储******/ log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName()); saveMongoDB(duyanCallRecordDetailList); log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName()); } }); } //result.addAll(future.get()); } } } catch (Exception e) { log.warn("任务拆分执行异常,errMsg = {}", e); DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e); } } } } /** * 数据存储MongoDB * @param duyanCallRecordDetailList */ private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) { for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) { /******重复数据不同步MongoDB******/ org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query(); query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid())); List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD); if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) { log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid()); continue; } /******关联填写的记录******/ CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>() .eq("is_delete", 0) .eq("call_uuid", duyanCallRecordDetail.getCallUuid())); CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord(); BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord); //补充 caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId()); if (caseCallRemarkRecord != null) { //补充 caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName()); } log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString()); this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD); } } @Override public void destroy() throws Exception { executor.shutdown(); } } class ThredQuery implements Callable<List<DuyanCallRecordDetail>> { /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/ private DuyanCallRecordDetailService myService; /******查询条件 根据条件来定义该类的属性******/ private Map<String, Object> params; /******分页index******/ private int offset; /******数量******/ private int num; public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) { this.myService = myService; this.params = params; this.offset = offset; this.num = num; } @Override public List<DuyanCallRecordDetail> call() throws Exception { /******通过service查询得到对应结果******/ List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>() .eq("is_delete", 0) .eq("platform_type", 1) .last("limit "+offset+", "+num)); return duyanCallRecordDetailList; } }
Alat ListUtils
package com.github.common.util; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.io.*; import java.util.ArrayList; import java.util.List; /** * 描述:List工具类 * @author songfayuan * 2018年7月22日下午2:23:22 */ @Slf4j public class ListUtils { /** * 描述:list集合深拷贝 * @param src * @return * @throws IOException * @throws ClassNotFoundException * @author songfayuan * 2018年7月22日下午2:35:23 */ public static <T> List<T> deepCopy(List<T> src) { try { ByteArrayOutputStream byteout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(byteout); out.writeObject(src); ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray()); ObjectInputStream in = new ObjectInputStream(bytein); @SuppressWarnings("unchecked") List<T> dest = (List<T>) in.readObject(); return dest; } catch (ClassNotFoundException e) { e.printStackTrace(); return null; } catch (IOException e) { e.printStackTrace(); return null; } } /** * 描述:对象深拷贝 * @param src * @return * @throws IOException * @throws ClassNotFoundException * @author songfayuan * 2018年12月14日 */ public static <T> T objDeepCopy(T src) { try { ByteArrayOutputStream byteout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(byteout); out.writeObject(src); ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray()); ObjectInputStream in = new ObjectInputStream(bytein); @SuppressWarnings("unchecked") T dest = (T) in.readObject(); return dest; } catch (ClassNotFoundException e) { log.error("errMsg = {}", e); return null; } catch (IOException e) { log.error("errMsg = {}", e); return null; } } /** * 将一个list均分成n个list,主要通过偏移量来实现的 * @author songfayuan * 2018年12月14日 */ public static <T> List<List<T>> averageAssign(List<T> source, int n) { List<List<T>> result = new ArrayList<List<T>>(); int remaider = source.size() % n; //(先计算出余数) int number = source.size() / n; //然后是商 int offset = 0;//偏移量 for (int i = 0; i < n; i++) { List<T> value = null; if (remaider > 0) { value = source.subList(i * number + offset, (i + 1) * number + offset + 1); remaider--; offset++; } else { value = source.subList(i * number + offset, (i + 1) * number + offset); } result.add(value); } return result; } /** * List按指定长度分割 * @param list the list to return consecutive sublists of (需要分隔的list) * @param size the desired size of each sublist (the last may be smaller) (分隔的长度) * @author songfayuan * @date 2019-07-07 21:37 */ public static <T> List<List<T>> partition(List<T> list, int size){ return Lists.partition(list, size); // 使用guava } /** * 测试 * @param args */ public static void main(String[] args) { List<Integer> bigList = new ArrayList<>(); for (int i = 0; i < 101; i++){ bigList.add(i); } log.info("bigList长度为:{}", bigList.size()); log.info("bigList为:{}", bigList); List<List<Integer>> smallists = partition(bigList, 20); log.info("smallists长度为:{}", smallists.size()); for (List<Integer> smallist : smallists) { log.info("拆分结果:{},长度为:{}", smallist, smallist.size()); } } }
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan penyegerakan data berbilang benang dan kumpulan besar dalam Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



Panduan Nombor Sempurna di Jawa. Di sini kita membincangkan Definisi, Bagaimana untuk menyemak nombor Perfect dalam Java?, contoh dengan pelaksanaan kod.

Panduan untuk Weka di Jawa. Di sini kita membincangkan Pengenalan, cara menggunakan weka java, jenis platform, dan kelebihan dengan contoh.

Panduan untuk Nombor Smith di Jawa. Di sini kita membincangkan Definisi, Bagaimana untuk menyemak nombor smith di Jawa? contoh dengan pelaksanaan kod.

Dalam artikel ini, kami telah menyimpan Soalan Temuduga Spring Java yang paling banyak ditanya dengan jawapan terperinci mereka. Supaya anda boleh memecahkan temuduga.

Java 8 memperkenalkan API Stream, menyediakan cara yang kuat dan ekspresif untuk memproses koleksi data. Walau bagaimanapun, soalan biasa apabila menggunakan aliran adalah: bagaimana untuk memecahkan atau kembali dari operasi foreach? Gelung tradisional membolehkan gangguan awal atau pulangan, tetapi kaedah Foreach Stream tidak menyokong secara langsung kaedah ini. Artikel ini akan menerangkan sebab -sebab dan meneroka kaedah alternatif untuk melaksanakan penamatan pramatang dalam sistem pemprosesan aliran. Bacaan Lanjut: Penambahbaikan API Java Stream Memahami aliran aliran Kaedah Foreach adalah operasi terminal yang melakukan satu operasi pada setiap elemen dalam aliran. Niat reka bentuknya adalah

Panduan untuk TimeStamp to Date di Java. Di sini kita juga membincangkan pengenalan dan cara menukar cap waktu kepada tarikh dalam java bersama-sama dengan contoh.

Kapsul adalah angka geometri tiga dimensi, terdiri daripada silinder dan hemisfera di kedua-dua hujungnya. Jumlah kapsul boleh dikira dengan menambahkan isipadu silinder dan jumlah hemisfera di kedua -dua hujungnya. Tutorial ini akan membincangkan cara mengira jumlah kapsul yang diberikan dalam Java menggunakan kaedah yang berbeza. Formula volum kapsul Formula untuk jumlah kapsul adalah seperti berikut: Kelantangan kapsul = isipadu isipadu silinder Dua jumlah hemisfera dalam, R: Radius hemisfera. H: Ketinggian silinder (tidak termasuk hemisfera). Contoh 1 masukkan Jejari = 5 unit Ketinggian = 10 unit Output Jilid = 1570.8 Unit padu menjelaskan Kirakan kelantangan menggunakan formula: Kelantangan = π × r2 × h (4

Spring Boot memudahkan penciptaan aplikasi Java yang mantap, berskala, dan siap pengeluaran, merevolusi pembangunan Java. Pendekatan "Konvensyen Lebih Konfigurasi", yang wujud pada ekosistem musim bunga, meminimumkan persediaan manual, Allo
