Home > Database > Mysql Tutorial > kettle api 执行转换

kettle api 执行转换

WBOY
Release: 2016-06-07 15:10:51
Original
1795 people have browsed it

import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.util.Date; import be.ibridge.kettle.core.Const; import be.ibridge.kettle.core.LogWriter; import be.ibridge.kettle.core.NotePadMeta; import b


import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Date;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

/**
 *
 *

Title:
 * 本文描述了以下操作:

1)           建立一个新的转换(transformation)

2)           把转换(transformation)存储为XML文件

3)           生成需要在目标表运行的SQL语句

4)           执行转换(transformation)

5)           删除目标表,可以使测试程序可以反复执行(这一点可根据需要修改)。


 *

Description: TODO 类的功能描述


 *

Copyright: Copyright (c) 2003



 * @author 洪亮
 * @version 1.0
 *

------------------------------------------------------------


 *

修改历史


 *

  序号    日期       时间        修 改 人    修 改 原 因


 *

   1    2006-9-20   下午05:59:06     洪亮       创建 


 *
 */

public class TransBuilderME
{
    public static final String[] databasesXML = {
        "" +
        "" +
            "target" +
            "192.168.169.220" +
            "ORACLE" +
            "Native" +
            "NMSDB" +
            "1521" +
            "UCP" +
            "UCP" +
          "
",
         
          "" +
          "" +
              "source" +
              "192.168.169.220" +
              "ORACLE" +
              "Native" +
              "NMSDB" +
              "1521" +
              "UCP" +
              "UCP" +
            "

    };

    /**
     * Creates a new Transformation using input parameters such as the tablename to read from.
     * @param transformationName The name of the transformation
     * @param sourceDatabaseName The name of the database to read from
     * @param sourceTableName The name of the table to read from
     * @param sourceFields The field names we want to read from the source table
     * @param targetDatabaseName The name of the target database
     * @param targetTableName The name of the target table we want to write to
     * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
     * @return A new transformation
     * @throws KettleException In the rare case something goes wrong
     */
    public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
    {
        LogWriter log = LogWriter.getInstance();
       
        try
        {
            //
            // Create a new transformation...
            //传输元信息
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);//传输名称
           
            // Add the database connections
            for (int i=0;i            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);//数据库元信息
                transMeta.addDatabase(databaseMeta);//传输元  中加入数据库元信息
            }
           
            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);//查找源数据库元信息
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);//查找目标数据库元信息

           
            //
            // Add a note
            //
            String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);//注释信息
            transMeta.addNote(ni);

            //
            // create the source step...
            //
            String fromstepname = "read from [" + sourceTableName + "]";//from步骤名称
            TableInputMeta tii = new TableInputMeta();//表输入元数据信息
            tii.setDatabaseMeta(sourceDBInfo);//为表输入 指定 数据库
            String selectSQL = "SELECT "+Const.CR;//拼接查询sql语句
            for (int i=0;i            {
                if (i>0) selectSQL+=", "; else selectSQL+="  ";
                selectSQL+=sourceFields[i]+Const.CR;
            }
            selectSQL+="FROM "+sourceTableName;
            tii.setSQL(selectSQL);//设置查询sql语句

            StepLoader steploader = StepLoader.getInstance();//???

            String fromstepid = steploader.getStepPluginID(tii);
            //步骤元数据信息
            StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
            //传输中 添加步骤
            transMeta.addStep(fromstep);
            //
            // add logic to rename fields
            // Use metadata logic in SelectValues, use SelectValueInfo...
            //选择字段(重命名)
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i             {
             //设置源字段和目标字段
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }

            String selstepname = "Rename field names";
            //获取步骤插件ID
            String selstepid = steploader.getStepPluginID(svi);
            //创建步骤元数据信息
            StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            //添加步骤
            transMeta.addStep(selstep);

            //传输连接元数据信息(连接from和select)
            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);//添加到传输元对象
            fromstep = selstep;//然后设置from步骤为select步骤

            //
            // Create the target step...
            //
            //
            // Add the TableOutputMeta step...
            //设置目标步骤名称
            String tostepname = "write to [" + targetTableName + "]";
            //表输出元对象
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);//设置数据库
            toi.setTablename(targetTableName);//设置表名
            toi.setCommitSize(3000);//设置批量提交数
            toi.setTruncateTable(true);//是否清除原有数据

            //获取步骤ID
            String tostepid = steploader.getStepPluginID(toi);
            //创建to步骤
            StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);//添加步骤

            //
            // Add a hop between the two steps...
            //
            //创建连接 from--to
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);

            // OK, if we're still here: overwrite the current transformation...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }

    /**
     * 1) create a new transformation
     * 2) save the transformation as XML file
     * 3) generate the SQL for the target table
     * 4) Execute the transformation
     * 5) drop the target table to make this program repeatable
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        long start = new Date().getTime();
        // Init the logging...
        LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
       
        // Load the Kettle steps & plugins
        StepLoader stloader = StepLoader.getInstance();
        if (!stloader.read())
        {
            log.logError("TransBuilder",  "Error loading Kettle steps & plugins... stopping now!");
            return;
        }
       
        // The parameters we want, optionally this can be
        String fileName = "./NewTrans.xml";
        String transformationName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "emp_collect";
        String sourceFields[] = {
          "empno",      
          "ename",      
          "job",        
          "mgr",        
          "comm",       
          "sal",        
          "deptno",     
          "birthday"   
 
            };

        String targetDatabaseName = "target";
        String targetTableName = "emp_kettle01";
        String targetFields[] = {
          "empno01",      
          "ename01",      
          "job01",        
          "mgr01",        
          "comm",       
          "sal",        
          "deptno",     
          "birthday"
            };

       
        // Generate the transformation.
        //创建转换元对象
        TransMeta transMeta = TransBuilderME.buildCopyTable(
                transformationName,
                sourceDatabaseName,
                sourceTableName,
                sourceFields,
                targetDatabaseName,
                targetTableName,
                targetFields
                );
//        transMeta = new  TransMeta();
        // Save it as a file:
        //传输元对象 中获得XML,并输出
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos.write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("Saved transformation to file: "+fileName);

        // OK, What's the SQL we need to execute to generate the target table?
        //获得sql语句,创建表语句
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table:
        //创建表
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();//连接数据库
        targetDatabase.execStatements(sql);//执行sql
       
        // Now execute the transformation...
        //执行传输任务
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();//等待执行完毕
       
        // For testing/repeatability, we drop the target table again
//        targetDatabase.execStatement("drop table "+targetTableName);
        targetDatabase.disconnect();//断开数据库连接
       
        long end = new Date().getTime();
        System.out.println("运行时间:" + (end - start) / 1000 + "秒");
        long min = (end - start) / 1000 / 60;
        long second = (end - start) / 1000 % 60;
        System.out.println("运行时间:" + min + "分钟" + second + "秒");
    }


}
 

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template