Maison > Java > javaDidacticiel > le corps du texte

Comment SpringBoot intègre dataworks

PHPz
Libérer: 2023-05-14 16:01:06
avant
1276 Les gens l'ont consulté

    Notes

    Le test ici consiste principalement à appeler le script extrait de dataworks et à le stocker localement.
    Le script contient deux parties

    1. Le script odps développé (obtenu via OpenApi) 2. Le script d'instruction de table (connectez maxCompute via les informations dataworks pour obtenir l'instruction de création)

    Limite de requête de pagination openApi d'Alibaba Cloud Dataworks, la limite maximale la requête est de 100 à la fois. Lorsque nous extrayons le script, nous devons l'interroger sur plusieurs pages

    Ce projet utilise la connexion SDK/JDBC de MaxCompute, et SpringBoot exploite la connexion SDK/JDBC MaxCompute

    Implémentation de l'intégration

    L'implémentation implique principalement l'écriture de classes d'outils, qui peuvent être configuré comme SpringBean si nécessaire, il peut être utilisé après l'avoir injecté dans le conteneur

    Introduction aux dépendances

    <properties>
        <java.version>1.8</java.version>
        <!--maxCompute-sdk-版本号-->
        <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
        <!--maxCompute-jdbc-版本号-->
        <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
        <!--dataworks版本号-->
        <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
        <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--max compute sdk-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>${max-compute-sdk.version}</version>
    </dependency>
    <!--max compute jdbc-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-jdbc</artifactId>
        <version>${max-compute-jdbc.version}</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <!--dataworks需要引入aliyun-sdk和dataworks本身-->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>${aliyun-java-sdk.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>${dataworks-sdk.version}</version>
    </dependency>
    </dependencies>
    Copier après la connexion

    Demander l'écriture de la classe de paramètres

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }
    Copier après la connexion
    Copier après la connexion

    Écriture de la classe d'outils

    Préparation de base de la classe, la fonction de rappel après avoir extrait le script

    Pourquoi est la fonction de rappel nécessaire, car tous les scripts sont extraits, si les résultats de chaque pagination sont fusionnés, cela provoquera un débordement de mémoire, et l'utilisation de la fonction de rappel n'ajoute qu'une fonction de traitement pour chaque cycle

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }
    Copier après la connexion
    Copier après la connexion

    Opération d'initialisation

    Instancie principalement le informations client de l'interface openApi de Dataworks et initialise la classe d'outils pour la connexion maxCompute (y compris les méthodes JDBC et SDK)

    private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
    /**默认的odps接口地址 在Odps中也可以看到该变量*/
    private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
    /**
     * dataworks连接参数
     *
     */
    private final DataWorksOpenApiConnParam connParam;
    
    /**
     * 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话
     */
    private final MaxComputeJdbcUtil maxComputeJdbcUtil;
    
    private final MaxComputeSdkUtil maxComputeSdkUtil;
    
    private final boolean odpsSdk;
    
    
    /**
     * 客户端
     */
    private final IAcsClient client;
    
    public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
        this.connParam = connParam;
        this.client = buildClient();
        this.odpsSdk = odpsSdk;
        if (odpsSdk){
            this.maxComputeJdbcUtil = null;
            this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
        }else {
            this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
            this.maxComputeSdkUtil = null;
        }
    }
    
    private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
        final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setMaxComputeEndpoint(defaultEndpoint);
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeSdkUtil(param);
    }
    
    private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
        final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeJdbcUtil(param);
    }
    Copier après la connexion

    Appelez OpenApi pour extraire tous les scripts

    /**
     * 根据文件夹路径分页查询该路径下的文件(脚本)
     * @param pageSize 每页查询多少数据
     * @param folderPath 文件所在目录
     * @param userType 文件所属功能模块 可不传
     * @param fileTypes 设置文件代码类型 逗号分割 可不传
     */
    public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
        pageSize = setPageSize(pageSize);
        // 创建请求
        final ListFilesRequest request = new ListFilesRequest();
    
        // 设置分页参数
        request.setPageNumber(1);
        request.setPageSize(pageSize);
    
        // 设置上级文件夹
        request.setFileFolderPath(folderPath);
    
        // 设置区域和项目名称
        request.setSysRegionId(connParam.getRegion());
        request.setProjectIdentifier(connParam.getProject());
    
        // 设置文件所属功能模块
        if (!ObjectUtils.isEmpty(userType)){
            request.setUseType(userType);
        }
        // 设置文件代码类型
        if (!ObjectUtils.isEmpty(fileTypes)){
            request.setFileTypes(fileTypes);
        }
    
        // 发起请求
        ListFilesResponse res = client.getAcsResponse(request);
    
        // 获取分页总数
        final Integer totalCount = res.getData().getTotalCount();
        // 返回结果
        final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
        // 计算能分几页
        long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
        // 只有1页 直接返回
        if (pages <= 1){
            callBack.handle(resultList);
            return;
        }
    
        // 第一页执行回调
        callBack.handle(resultList);
    
        // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取
        for (int i = 2; i <= pages; i++) {
            //第1页
            request.setPageNumber(i);
            //每页大小
            request.setPageSize(pageSize);
            // 发起请求
            res = client.getAcsResponse(request);
            final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
            if (!ObjectUtils.isEmpty(tableEntityList)){
                // 执行回调函数
                callBack.handle(tableEntityList);
            }
        }
    }
    Copier après la connexion

    Connecté en interne à MaxCompute pour extraire tout le contenu des scripts DDL

    Classe d'outils DataWorks code, traité via la fonction de rappel

        /**
         * 获取所有的DDL脚本
         * @param callBack 回调处理函数
         */
        public void listAllDdl(CallBack.DdlCallBack callBack){
            if (odpsSdk){
                final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
                for (TableMetaInfo tableInfo : tableInfos) {
                    final String tableName = tableInfo.getTableName();
                    final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
                    callBack.handle(tableName, sqlCreateDesc);
                }
            }
        }
    Copier après la connexion

    Code de classe d'outil MaxCompute, selon le nom de la table Obtenez l'instruction de création de table, en prenant le SDK comme exemple, JDBC exécute directement show create table pour obtenir l'instruction de création de table

    /**
     * 根据表名获取建表语句
     * @param tableName 表名
     * @return
     */
    public String getSqlCreateDesc(String tableName) {
        final Table table = odps.tables().get(tableName);
        // 建表语句
        StringBuilder mssqlDDL = new StringBuilder();
    
        // 获取表结构
        TableSchema tableSchema = table.getSchema();
        // 获取表名表注释
        String tableComment = table.getComment();
    
        //获取列名列注释
        List<Column> columns = tableSchema.getColumns();
        /*组装成mssql的DDL*/
        // 表名
        mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
        mssqlDDL.append(tableName).append("\n");
        mssqlDDL.append(" (\n");
        //列字段
        int index = 1;
        for (Column column : columns) {
            mssqlDDL.append("  ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
            if (!ObjectUtils.isEmpty(column.getComment())) {
                mssqlDDL.append(" COMMENT &#39;").append(column.getComment()).append("&#39;");
            }
            if (index == columns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            index++;
        }
        mssqlDDL.append(" )\n");
        //获取分区
        List<Column> partitionColumns = tableSchema.getPartitionColumns();
        int partitionIndex = 1;
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append("PARTITIONED BY (");
        }
        for (Column partitionColumn : partitionColumns) {
            final String format = String.format("%s %s COMMENT &#39;%s&#39;", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
            mssqlDDL.append(format);
            if (partitionIndex == partitionColumns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            partitionIndex++;
        }
    
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append(")\n");
        }
    //        mssqlDDL.append("STORED AS ALIORC  \n");
    //        mssqlDDL.append("TBLPROPERTIES (&#39;comment&#39;=&#39;").append(tableComment).append("&#39;);");
        mssqlDDL.append(";");
        return mssqlDDL.toString();
    }
    Copier après la connexion

    Code de test

    public static void main(String[] args) throws ClientException {
        final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
        connParam.setAliyunAccessId("您的阿里云账号accessId");
        connParam.setAliyunAccessKey("您的阿里云账号accessKey");
        // dataworks所在区域
        connParam.setRegion("cn-chengdu");
        // dataworks所属项目
        connParam.setProject("dataworks所属项目");
        // dataworks所属项目环境 如果不分环境的话设置为生产即可
        connParam.setProjectEnv("dev");
        // 数据引擎类型 odps
        connParam.setDatasourceType("odps");
        // ddataworks接口地址
        connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
        final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);
    
        // 拉取所有ODPS脚本
        dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
            // 处理文件
            for (ListFilesResponse.Data.File file : files) {
                final String fileName = file.getFileName();
                System.out.println(fileName);
            }
        });
    
        // 拉取所有表的建表语句
        dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
            System.out.println("=======================================");
            System.out.println("表名:" + tableName + "内容如下:\n");
            System.out.println(tableDdlContent);
            System.out.println("=======================================");
        });
    }
    Copier après la connexion

    Résultats des tests

    script test_001
    script test_002
    script test_003
    script test_004
    script test_005
    ============================== ===========
    Nom de la table : test_abc_info Le contenu est le suivant :

    CRÉER UNE TABLE SI N'EXISTE pas test_abc_info
    (
    test_abc1 STRING COMMENT 'Champ 1',
    test_abc2 STRING COMMENT 'Champ 2',
    test_abc3 COMMENTAIRE DE CHAÎNE 'Champ 3',
    test_abc 4 COMMENTAIRE DE CHAÎNE 'champ 4',
    test_abc5 COMMENTAIRE DE CHAÎNE 'champ 5',
    test_abc6 COMMENTAIRE DE CHAÎNE 'champ 6',
    test_abc7 COMMENTAIRE DE CHAÎNE 'champ 7',
    test_abc8 COMMENTAIRE DE CHAÎNE 'field 8'
    )
    PARTITIONED BY (p_date STRING COMMENT 'data date'
    )
    ;
    === ========================== ===========
    Déconnecté de la VM cible, adresse : '127.0.0.1 : 59509', transport : 'socket'

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal