最近的一項開發任務是同步第三方數據,而第三方數據一般有存量數據和增量數據,存量數據有100w 。在得知此需求時,進行了一定的資訊檢索和工具學習,提前獲取存量資料到目標庫,再使用kettle進行存量資料轉換;增量資料則根據業務方規定的請求時間,透過定時任務去獲取增量資料並進行資料轉換。在資料取得和轉換時,我們應該要記錄每一次的請求信息,以便於溯源和資料對帳!!!
2.1 遞歸方式
使用遞歸方式時,要求資料量少,否則會出現堆疊溢出或堆疊溢出!!!並且遞歸方式是單線程,所以會導致同步速度很慢!!!
/** * 数据同步 - 递归方式 * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。 * Data为自定义实体类,这里仅做示例!!! */ private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex,pageSize); if (CollectionUtils.isNotEmpty(datas)) { dataService.saveOrUpdateBatch(datas); log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } // 递归操作-直到数据同步完毕 fetchAndSaveDB(pageIndex + 1, pageSize); } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } } /** * 获取分页数据,Data为自定义实体类,这里仅做示例!!! */ private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception { //通过feign调用第三方接口获取数据 String data = dataFeignService.fetchAllData(pageSize, pageIndex); JSONObject jsonObject = JSONObject.parseObject(data); JSONArray datalist = jsonObject.getJSONArray("datalist"); List<Data> datas = datalist.toJavaList(Data.class); return datas; }
2.2 多線程方式
由於遞歸方式是單線程,考慮到資料的龐大,且易造成記憶體溢出,因此將遞歸更換成多線程方式,不僅避免了記憶體溢出的情況,且速度大大的提升!!!
public void synAllData() { // 定义原子变量 - 页数 AtomicInteger pageIndex = new AtomicInteger(0); // 创建线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); // 100万数据 int total = 1000000;//数据总量 int times = total / 1000; if (total % 1000!= 0) { times = times + 1; } LocalDateTime beginLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); for (int index = 1; index <= times; index++) { fixedThreadPool.submit(new Runnable() { @Override public void run() { try { multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000); } catch (Exception e) { log.error("并发获取并保存数据异常:{}", e); } } }); } LocalDateTime endLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟", endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes()); } /** * 数据同步 - 【多线程方式】 * * @throws Exception */ private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1 if (CollectionUtils.isNotEmpty(datas)) { log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } }
增量資料需要寫定時任務,可使用Scheduled註解,並需要將增量資料存放到目標庫且進行資料轉換!
以上是怎麼使用Java多執行緒實現第三方資料同步的詳細內容。更多資訊請關注PHP中文網其他相關文章!