怎么使用SpringBoot定时任务实现数据同步
前言
业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。
方案一:通过轮询接口的方式执行 pullData() 方法实现数据同步
该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。
方案二:通过轮询接口的方式执行 pullDataNew() 方法实现数据同步
该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。该方法的优点,减少对数据库的频繁操作,提升性能。缺点:无发现明显缺点。
package com.hxtx.spacedata.task; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.api.client.util.Lists; import com.hxtx.spacedata.common.domain.ResponseDTO; import com.hxtx.spacedata.config.SpringContextUtil; import com.hxtx.spacedata.controller.file.FilesMinioController; import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity; import com.hxtx.spacedata.service.entityconfig.EntityPointService; import com.hxtx.spacedata.util.HttpProxyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /** * 中台设备数据 定时任务执行 * * @author Tarzan Liu * @version 1.0.0 * @description * @date 2020/12/07 */ @Component @Slf4j public class EntityPointTask { @Autowired private EntityPointService entityPointService; @Value("${middleGround.server.host}") private String host; @Value("${middleGround.server.port}") private String port; private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class); /** * 设备定义点数据拉取 * * @author tarzan Liu * @date 2020/12/2 */ @Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次 public void pullDataTaskByCorn() { String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list"); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray array = jsonObject.getJSONArray("data"); if (array != null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { JSONObject obj = array.getJSONObject(i); String systemId = obj.getString("id"); pullDataNew(systemId); } } } } @Transactional(rollbackFor = Throwable.class) public ResponseDTO<String> pullData(String code) { List<EntityPointEntity> list = Lists.newArrayList(); String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray array = jsonObject.getJSONArray("data"); if (array != null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { JSONObject obj = array.getJSONObject(i); String pointId = obj.getString("pointId"); String name = obj.getString("name"); list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build()); } List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue)); if (CollectionUtils.isNotEmpty(existList)) { Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue)); list.forEach(e -> { String value = existMap.get(e.getPointId()); if (value != null) { e.setValue(value); } }); } entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code)); entityPointService.saveBatch(list); } } return ResponseDTO.succ(); } @Transactional(rollbackFor = Throwable.class) public ResponseDTO<String> pullDataNew(String code) { String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray data = jsonObject.getJSONArray("data"); List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class); if (CollectionUtils.isNotEmpty(list)) { list.forEach(e -> e.setCode(code)); List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code)); if (CollectionUtils.isNotEmpty(existList)) { //存在map Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName)); //传输map Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName)); //增量 List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(increment)) { entityPointService.saveBatch(increment); } //减量 List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(decrement)) { entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList())); } //变量 List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(variable)) { variable.forEach(e -> { e.setName(dataMap.get(e.getPointId())); }); entityPointService.updateBatchById(variable); } } else { entityPointService.saveBatch(list); } } } return ResponseDTO.succ(); } }
数据库对应实体类
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Date; @Builder @NoArgsConstructor @AllArgsConstructor @Data @TableName(value = "t_entity_point") public class EntityPointEntity implements Serializable { private static final long serialVersionUID = 2181036545424452651L; /** * 定义点id */ @TableId(value = "id", type = IdType.ASSIGN_ID) private Long id; /** * 定义点id */ private String pointId; /** * 名称 */ private String name; /** * 绘制数据 */ private String value; /** * 编码 */ private String code; /** * 创建时间 */ private Date createTime; }
HTTP请求代理工具类
import lombok.extern.slf4j.Slf4j; import org.apache.http.Consts; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; import javax.net.ssl.SSLContext; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URL; import java.net.URLConnection; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * HTTP请求代理类 * * @author tarzan Liu * @description 发送Get Post请求 */ @Slf4j public class HttpProxyUtil { /** * 使用URLConnection进行GET请求 * * @param api_url * @return */ public static String sendGet(String api_url) { return sendGet(api_url, "", "utf-8"); } /** * 使用URLConnection进行GET请求 * * @param api_url * @param param * @return */ public static String sendGet(String api_url, String param) { return sendGet(api_url, param, "utf-8"); } /** * 使用URLConnection进行GET请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空 * @param charset 字符集 * @return */ public static String sendGet(String api_url, String param, String charset) { StringBuffer buffer = new StringBuffer(); try { // 判断有无参数,若是拼接好的url,就不必再拼接了 if (param != null && !"".equals(param)) { api_url = api_url + "?" + param; } log.info("请求的路径是:" + api_url); URL realUrl = new URL(api_url); // 打开联接 URLConnection conn = realUrl.openConnection(); // 设置通用的请求属性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)"); conn.setConnectTimeout(12000); //设置连接主机超时(单位:毫秒) conn.setReadTimeout(12000); // 设置从主机读取数据超时(单位:毫秒) conn.connect(); // 建立实际的联接 // 定义 BufferedReader输入流来读取URL的相应 try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) { String line; while ((line = in.readLine()) != null) { // buffer.append("\n"+line); buffer.append(line); } } } catch (Exception e) { log.error("发送GET请求出现异常! " + e.getMessage()); return null; } // log.info("响应返回数据:" + buffer.toString()); return buffer.toString(); } /** * 使用URLConnection进行POST请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空 * @return */ public static String sendPost(String api_url, String param) { return sendPost(api_url, param, "utf-8"); } /** * 使用URLConnection进行POST请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空 * @param charset 字符集 * @return */ public static String sendPost(String api_url, String param, String charset) { StringBuffer buffer = new StringBuffer(); try { log.info("请求的路径是:" + api_url + ",参数是:" + param); URL realUrl = new URL(api_url); // 打开联接 URLConnection conn = realUrl.openConnection(); // 设置通用的请求属性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)"); conn.setConnectTimeout(12000); //设置连接主机超时(单位:毫秒) conn.setReadTimeout(12000); // 设置从主机读取数据超时(单位:毫秒) // 发送POST请求必须设置如下两行 conn.setDoOutput(true); conn.setDoInput(true); // 获取URLConnection对象对应的输出流 try (PrintWriter out = new PrintWriter(conn.getOutputStream())) { out.print(param); // 发送请求参数 out.flush();// flush输出流的缓冲 } // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别 try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) { String line; while ((line = in.readLine()) != null) { // buffer.append("\n"+line); buffer.append(line); } } } catch (Exception e) { log.error("发送POST请求出现异常! " + e.getMessage()); e.printStackTrace(); } log.info("响应返回数据:" + buffer.toString()); return buffer.toString(); } public static CloseableHttpClient createSSLClientDefault() throws Exception { SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build(); SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext); return HttpClients.custom().setSSLSocketFactory(sslSf).build(); } // 加载证书 private static class AllTrustStrategy implements TrustStrategy { public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { return true; } } /** * 支持https请求 * * @param url * @param param * @return * @throws Exception */ public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception { CloseableHttpClient httpClient = createSSLClientDefault(); HttpPost httpPost = null; CloseableHttpResponse response = null; String result = ""; try { // 发起HTTP的POST请求 httpPost = new HttpPost(url); List<NameValuePair> paramList = new ArrayList<NameValuePair>(); for (String key : param.keySet()) { paramList.add(new BasicNameValuePair(key, param.get(key))); } log.info("http请求地址:" + url + ",参数:" + paramList.toString()); // UTF8+URL编码 httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8)); httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build()); response = httpClient.execute(httpPost); HttpEntity entity = response.getEntity(); int statusCode = response.getStatusLine().getStatusCode(); if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200 } result = EntityUtils.toString(entity); log.info("状态码:" + statusCode + ",响应信息:" + result); } finally { if (response != null) { response.close(); } if (httpPost != null) { httpPost.releaseConnection(); } httpClient.close(); } return result; } }
以上是怎么使用SpringBoot定时任务实现数据同步的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Jasypt介绍Jasypt是一个java库,它允许开发员以最少的努力为他/她的项目添加基本的加密功能,并且不需要对加密工作原理有深入的了解用于单向和双向加密的高安全性、基于标准的加密技术。加密密码,文本,数字,二进制文件...适合集成到基于Spring的应用程序中,开放API,用于任何JCE提供程序...添加如下依赖:com.github.ulisesbocchiojasypt-spring-boot-starter2.1.1Jasypt好处保护我们的系统安全,即使代码泄露,也可以保证数据源的

一、Redis实现分布式锁原理为什么需要分布式锁在聊分布式锁之前,有必要先解释一下,为什么需要分布式锁。与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。想要实现分

使用场景1、下单成功,30分钟未支付。支付超时,自动取消订单2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评3、下单成功,商家5分钟未接单,订单取消4、配送超时,推送短信提醒……对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job今天我们采

springboot读取文件,打成jar包后访问不到最新开发出现一种情况,springboot打成jar包后读取不到文件,原因是打包之后,文件的虚拟路径是无效的,只能通过流去读取。文件在resources下publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

在Springboot+Mybatis-plus不使用SQL语句进行多表添加操作我所遇到的问题准备工作在测试环境下模拟思维分解一下:创建出一个带有参数的BrandDTO对象模拟对后台传递参数我所遇到的问题我们都知道,在我们使用Mybatis-plus中进行多表操作是极其困难的,如果你不使用Mybatis-plus-join这一类的工具,你只能去配置对应的Mapper.xml文件,配置又臭又长的ResultMap,然后再去写对应的sql语句,这种方法虽然看上去很麻烦,但具有很高的灵活性,可以让我们

SpringBoot和SpringMVC都是Java开发中常用的框架,但它们之间有一些明显的差异。本文将探究这两个框架的特点和用途,并对它们的差异进行比较。首先,我们来了解一下SpringBoot。SpringBoot是由Pivotal团队开发的,它旨在简化基于Spring框架的应用程序的创建和部署。它提供了一种快速、轻量级的方式来构建独立的、可执行

1、自定义RedisTemplate1.1、RedisAPI默认序列化机制基于API的Redis缓存实现是使用RedisTemplate模板进行数据缓存操作的,这里打开RedisTemplate类,查看该类的源码信息publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations,BeanClassLoaderAware{//声明了key、value的各种序列化方式,初始值为空@NullableprivateRedisSe

在项目中,很多时候需要用到一些配置信息,这些信息在测试环境和生产环境下可能会有不同的配置,后面根据实际业务情况有可能还需要再做修改。我们不能将这些配置在代码中写死,最好是写到配置文件中,比如可以把这些信息写到application.yml文件中。那么,怎么在代码里获取或者使用这个地址呢?有2个方法。方法一:我们可以通过@Value注解的${key}即可获取配置文件(application.yml)中和key对应的value值,这个方法适用于微服务比较少的情形方法二:在实际项目中,遇到业务繁琐,逻
