业务:需要从一个数据库查询百万级数据,在java程序中插入到另一个oracle数据库中
代码:
private final int persize = 1000;
/**
* 推送数据-流程
* @param tableCode 表名
* @param startTime 开始时间
* @param endTime 结束时间
* @return
*/
public boolean pushFrData(String username,String tableCode,String tableName,String startTime,String endTime){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
System.out.println("导入数据到名录库!");
boolean boo = false;
//表名集合
String [] str = tableCode.split(",");
String [] names = tableName.split(",");
startTime = startTime==""?"2000-01-01":startTime;
endTime = endTime==""?getCurrentDate():endTime;
//System.out.println("service 时间 "+startTime+" > "+endTime);
String start_Time = "to_date('"+startTime+"','%Y-%m-%d')";
String end_Time = "to_date('"+endTime+"','%Y-%m-%d')";
System.out.println("选择推送 "+str.length+" 张表");
//遍历表名集合
for(int i = 0;i<str.length;i++){
System.out.println("所选数据表: "+str[i]);
//字段集合
String [] arr = fillService.getIdenCode(str[i]);
String iden_code = "";
//遍历字段
for(int j = 0;j<arr.length;j++){
iden_code += ""+arr[j]+",";
}
//表字段
iden_code = iden_code.substring(0,iden_code.length()-1);
//System.out.println(str[i]+"总共 "+arr.length+" 个字段!");
//得到推送数据集合
String sql = "select count(*) from "+str[i] +" where s_ext_timestamp >= "+start_Time+" and s_ext_timestamp < "+end_Time;
System.out.println(sql);
int table_size = Integer.valueOf(frDao.query(sql).get(0).toString());
//System.out.println(table_size/persize);
int times = table_size%persize==0?table_size/persize:table_size/persize+1;
for(int t = 1; t <= times;t++){
int start = (t-1) * persize;
List<Object[]> lists = getData(str[i], iden_code,startTime,endTime,start);
//推送数据
System.out.println("准备导出第 "+t+" 批数据");
push(lists,str[i],iden_code);
System.out.println("已导出第 "+t+" 批数据");
}
try{
if(table_size > 0){
addLog(username,str[i].toString(),names[i].toString(),table_size,"1",sdf.parse(startTime),sdf.parse(endTime));
}
}catch (ParseException e){
System.out.println("日期格式转换异常");
}
}
return boo;
}
/**
* 推送数据
* @param lists 数据集
* @param table 表名
* @param iden_code 字段集
*/
private int push(List<Object []> lists,String table,String iden_code){
boolean boo = false;
int count = 0;
//遍历数据结果集
if(lists.size()>0){
for(int k = 0;k < lists.size();k++){
Object [] obj = lists.get(k);
String val = ""; //将数据转换成String类型
for(Object s:obj){
//val += "'"+s.toString()+"',";
if(s != null){
val += "'"+s.toString()+"'|";
}else{
val += "''|";
}
}
val = val.substring(0,val.length()-1);
String etpsid = "";
String [] iden = iden_code.split(",");
String [] value = val.split("\\|");
String val2 = "";//格式化数据(日期)
if(iden.length == value.length){
//格式化sql语句的时间
for(int i = 0;i<iden.length;i++){
//判断字段是否是date类型
if(getDateType(table, iden[i])){
//格式化字符串 防止出现datetime类型 1900-01-01 00:00:00.0的情况
if(value[i].length() > 4){
String vv = value[i].substring(value[i].length()-3,value[i].length());
if(vv.contains(".")){
val2 += "to_date("+value[i].substring(0,value[i].length()-3)+"','YYYY-MM-DD HH24:MI:SS'),";
}else{
val2 += "to_date("+value[i]+",'YYYY-MM-DD HH24:MI:SS'),";
}
}else{
val2 += "'',";
}
}else{
val2 += value[i]+",";
}
if("ETPSID".equals(iden[i])){
etpsid = value[i];
}
}
val2 = val2.substring(0,val2.length()-1);
}else{
/*System.out.println(iden.length+" : "+value.length);
for(int j = 0 ;j< value.length;j++){
System.out.println(value[j]);
}*/
System.out.println("推送数据和字段不一致");
}
String mlk_table = getMlkTable(table);
String mlk_code = getMlkCode(iden_code,table);
//插入数据sql
//String sql = "insert into "+table+"("+iden_code+") values ("+val2+")";
//生成流水号
String uuid = UUID.randomUUID().toString();
uuid = uuid.replace("-","");
val2 += ",'"+uuid+"'";
String sql = "insert into "+mlk_table+"("+mlk_code+") values ("+val2+")";
if(etpsid != ""){
//System.out.println(etpsid);
//工商增量数据按日依"企业唯一标识"为关键字,更新、追加至名录库表中
String s_sql = "select * from "+mlk_table+" where 企业唯一标识 = "+etpsid;
//System.out.println(s_sql);
int s = mlDao.query(s_sql).size();
if (s > 0){
String update_sql = "";
String [] update_code = mlk_code.split(",");
for(int j = 0;j < iden_code.split(",").length;j++){
//判断字段是否是date类型
if(getDateType(table, iden[j])){
//格式化字符串 防止出现datetime类型 1900-01-01 00:00:00.0的情况
if(value[j].length() > 4){
String vv = value[j].substring(value[j].length() - 3, value[j].length());
if(vv.contains(".")){
update_sql += update_code[j]+" = "+ "to_date("+value[j].substring(0, value[j].length() - 3)+"','YYYY-MM-DD HH24:MI:SS'),";
}else{
update_sql += update_code[j]+" = "+ "to_date("+value[j]+",'YYYY-MM-DD HH24:MI:SS'),";
}
}else{
update_sql += update_code[j]+" = "+ "'',";
}
}else{
update_sql += update_code[j]+" = "+ value[j]+",";
}
//update_sql += update_code[j]+" = "+update_val[j]+",";
}
update_sql += "流水号 = '"+uuid+"'";
update_sql = "update "+mlk_table+" set "+update_sql+" where 企业唯一标识 = "+etpsid;
//System.out.println("/////////// /\n"+update_sql+"\n");
sql = update_sql;
System.out.println("更新 "+mlk_table+" 标识 "+etpsid);
}else{
System.out.println("插入 "+mlk_table+" 标识 "+etpsid);
}
}
//System.out.println(sql);
try{
//循环执行sql
mlDao.execute(sql);
count++;
System.out.println(table+" 推送第 "+count+" 条");
}catch (Exception e){
System.out.println("sql执行异常!");
}
}
System.out.println(table+" 表共插入 "+count+" 条数据!");
}else{
System.out.println(table+" 表共插入 "+count+" 条数据!");
System.out.println("导入数据为空!");
}
return count;
}
Si vous souhaitez le lire, vous pouvez le lire via plusieurs fils de discussion. Si vous souhaitez l'insérer, veuillez consulter le lien suivant :
https://segmentfault.com/sear...
https://segmentfault.com/sear...
1. Vous pouvez utiliser le traitement par lots pour enregistrer des données dans la base de données, par exemple en traitant 100 enregistrements insérés à la fois
.2 Au lieu de traiter toutes les données en même temps, vous pouvez diviser 1 W de données en deux threads. pour le traitement, afin que vous puissiez utiliser pleinement le processeur, vos collègues ne provoqueront pas de gros blocages
Il est recommandé d'utiliser le mode de traitement par lots JDBC et de rechercher les mots-clés suivants :
addBatch(String query)
executeBatch()
Il est recommandé de soumettre environ 1 000 transactions par lot.
Méthode du paresseux :
Établissez un lien distribué dans la base de données Oracle cible et insérez-le directement dans la table cible, sélectionnez l'instruction SQL