Der Test besteht hier hauptsächlich darin, die Skripte aus DataWorks abzurufen und lokal zu speichern. Die Skripte bestehen aus zwei Teilen. Jede Abfrage liefert höchstens 100 Einträge; beim Abrufen der Skripte müssen wir daher seitenweise (über mehrere Seiten) abfragen.
<properties>
    <java.version>1.8</java.version>
    <!-- MaxCompute SDK version -->
    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
    <!-- MaxCompute JDBC driver version -->
    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
    <!-- DataWorks SDK version -->
    <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
    <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- MaxCompute SDK -->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>${max-compute-sdk.version}</version>
    </dependency>
    <!-- MaxCompute JDBC driver -->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-jdbc</artifactId>
        <version>${max-compute-jdbc.version}</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <!-- DataWorks needs both the Aliyun core SDK and the DataWorks SDK itself -->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>${aliyun-java-sdk.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>${dataworks-sdk.version}</version>
    </dependency>
</dependencies>
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 区域 eg. cn-shanghai */ private String region; /** * 访问keyId */ private String aliyunAccessId; /** * 密钥 */ private String aliyunAccessKey; /** * 访问端点 就是API的URL前缀 */ private String endPoint; /** * 数据库类型 如odps */ private String datasourceType; /** * 所属项目 */ private String project; /** * 项目环境 dev prod */ private String projectEnv; }
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 区域 eg. cn-shanghai */ private String region; /** * 访问keyId */ private String aliyunAccessId; /** * 密钥 */ private String aliyunAccessKey; /** * 访问端点 就是API的URL前缀 */ private String endPoint; /** * 数据库类型 如odps */ private String datasourceType; /** * 所属项目 */ private String project; /** * 项目环境 dev prod */ private String projectEnv; }
/** Endpoint template used for the MaxCompute JDBC connection; {@code %s} is replaced by the region. */
private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";

/** Default ODPS endpoint (per the original author, the same value can also be found in the Odps class). */
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";

/** DataWorks connection parameters. */
private final DataWorksOpenApiConnParam connParam;

/**
 * JDBC-based MaxCompute helper; DataWorks can be used to connect to MaxCompute when the engine is
 * MaxCompute. {@code null} when the SDK variant is selected.
 */
private final MaxComputeJdbcUtil maxComputeJdbcUtil;

/** SDK-based MaxCompute helper; {@code null} when the JDBC variant is selected. */
private final MaxComputeSdkUtil maxComputeSdkUtil;

/** {@code true} when MaxCompute is accessed through the SDK helper, {@code false} for JDBC. */
private final boolean odpsSdk;

/** OpenAPI client. */
private final IAcsClient client;

/**
 * Creates the utility, building the OpenAPI client and exactly one MaxCompute helper.
 *
 * @param connParam DataWorks connection parameters
 * @param odpsSdk   {@code true} to build the SDK helper, {@code false} to build the JDBC helper
 * @throws BizException with {@code DATA_WORKS_ENGINE_SUPPORT_ERR} if the datasource type is not {@code odps}
 */
public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
    this.connParam = connParam;
    this.client = buildClient();
    this.odpsSdk = odpsSdk;
    if (odpsSdk) {
        this.maxComputeJdbcUtil = null;
        this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
    } else {
        this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
        this.maxComputeSdkUtil = null;
    }
}

/**
 * Builds the SDK-based MaxCompute helper from the DataWorks connection parameters,
 * using the default ODPS endpoint.
 *
 * @return a new SDK helper
 * @throws BizException if the datasource type is not {@code odps}
 */
private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
    // Fail before assembling anything when the engine is unsupported.
    requireOdpsDatasource();

    final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
    // Credentials
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    // Endpoint
    param.setMaxComputeEndpoint(defaultEndpoint);
    // Project name depends on the project environment
    param.setProjectName(resolveMaxComputeProjectName());
    return new MaxComputeSdkUtil(param);
}

/**
 * Builds the JDBC-based MaxCompute helper from the DataWorks connection parameters,
 * using the region-specific endpoint.
 *
 * @return a new JDBC helper
 * @throws BizException if the datasource type is not {@code odps}
 */
private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
    // Fail before assembling anything when the engine is unsupported.
    requireOdpsDatasource();

    final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    // Credentials
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    // Endpoint
    param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
    // Project name depends on the project environment
    param.setProjectName(resolveMaxComputeProjectName());
    return new MaxComputeJdbcUtil(param);
}

/** Rejects every datasource type except {@code odps}, the only engine handled at the moment. */
private void requireOdpsDatasource() {
    if (!"odps".equals(connParam.getDatasourceType())) {
        throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
    }
}

/** Returns the MaxCompute project name matching the configured DataWorks project environment. */
private String resolveMaxComputeProjectName() {
    final String projectEnv = connParam.getProjectEnv();
    if ("dev".equals(projectEnv)) {
        // Development: the MaxCompute project is the DataWorks project name plus "_dev".
        return String.join("_", connParam.getProject(), projectEnv);
    }
    // Production: DataWorks and MaxCompute share the same project name.
    return connParam.getProject();
}
/** * 根据文件夹路径分页查询该路径下的文件(脚本) * @param pageSize 每页查询多少数据 * @param folderPath 文件所在目录 * @param userType 文件所属功能模块 可不传 * @param fileTypes 设置文件代码类型 逗号分割 可不传 */ public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException { pageSize = setPageSize(pageSize); // 创建请求 final ListFilesRequest request = new ListFilesRequest(); // 设置分页参数 request.setPageNumber(1); request.setPageSize(pageSize); // 设置上级文件夹 request.setFileFolderPath(folderPath); // 设置区域和项目名称 request.setSysRegionId(connParam.getRegion()); request.setProjectIdentifier(connParam.getProject()); // 设置文件所属功能模块 if (!ObjectUtils.isEmpty(userType)){ request.setUseType(userType); } // 设置文件代码类型 if (!ObjectUtils.isEmpty(fileTypes)){ request.setFileTypes(fileTypes); } // 发起请求 ListFilesResponse res = client.getAcsResponse(request); // 获取分页总数 final Integer totalCount = res.getData().getTotalCount(); // 返回结果 final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles(); // 计算能分几页 long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1; // 只有1页 直接返回 if (pages <= 1){ callBack.handle(resultList); return; } // 第一页执行回调 callBack.handle(resultList); // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取 for (int i = 2; i <= pages; i++) { //第1页 request.setPageNumber(i); //每页大小 request.setPageSize(pageSize); // 发起请求 res = client.getAcsResponse(request); final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles(); if (!ObjectUtils.isEmpty(tableEntityList)){ // 执行回调函数 callBack.handle(tableEntityList); } } }
test_001-Skript
test_002-Skript
test_005-Skript
======================================= Tabellenname: test_abc_info, der Inhalt ist wie folgt:
CREATE TABLE IF NOT EXISTS test_abc_info
(
    test_abc1 STRING COMMENT 'Feld 1',
    test_abc2 STRING COMMENT 'Feld 2',
    test_abc3 STRING COMMENT 'Feld 3',
    test_abc4 STRING COMMENT 'Feld 4'
)
PARTITIONED BY (p_date STRING COMMENT 'Datendatum')
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'
Das Obige ist der detaillierte Inhalt von „Wie SpringBoot DataWorks integriert“. Weitere Informationen finden Sie in anderen verwandten Artikeln auf der chinesischen PHP-Website!