ホームページ バックエンド開発 C#.Net チュートリアル 複数のデータベースへのビッグデータの一括挿入を実装するための C# サンプル コードの詳細な説明

複数のデータベースへのビッグデータの一括挿入を実装するための C# サンプル コードの詳細な説明

Mar 25, 2017 am 11:43 AM

この記事では主に、SqlServer、Oracle、SQLite、MySQLを含むいくつかのデータベースにビッグデータのバッチ挿入を実装するための C# を紹介します。興味のある方は詳細をご覧ください。

これまで、SqlServer がデータのバッチ挿入をサポートしていることだけを知っていましたが、Oracle、SQLite、MySQL もそれをサポートしていることはほとんど知りませんでした。しかし、今日はいくつかのバッチ挿入を投稿します。データベース向けのソリューション。

まず、IProvider にはバッチ挿入用のプラグイン サービス インターフェイス IBatcherProvider があります。このインターフェイスは前の記事で説明しました。

/// <summary> 
  /// 提供数据批量处理的方法。 
  /// </summary> 
  public interface IBatcherProvider : IProviderService 
  { 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    void Insert(DataTable dataTable, int batchSize = 10000); 
  }
ログイン後にコピー

1. SqlServer データのバッチ挿入

SqlServer のバッチ挿入は、SqlBulkCopy を使用するだけです。

/// <summary> 
  /// 为 System.Data.SqlClient 提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MsSqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          //给表名加上前后导符 
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            //循环所有列,为bulk添加映射 
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  }
ログイン後にコピー

上記ではトランザクションを使用しません。パフォーマンス トランザクションを使用する場合は、SqlBulkCopyOptions.UseInternalTransaction を設定できます。

2. Oracle データのバッチ挿入

System.Data.OracleClient はバッチ挿入をサポートしていないため、プロバイダとして Oracle.DataAccess コンポーネントのみを使用できます。

/// <summary> 
  /// Oracle.Data.Access 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class OracleAccessBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      //将一个DataTable的数据转换为数组的数组 
      var data = table.ToArray(); 
 
      //设置ArrayBindCount属性 
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); 
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }
ログイン後にコピー

上記の最も重要な手順は、DataTable を配列の配列表現、つまり object[][] に変換することです。前者の配列の上付き文字は列数であり、後者の配列は列数です。行数に応じてループします。 Columns 後者の配列をパラメータの値として使用します。つまり、パラメータの値は配列です。 insert ステートメントは、通常の insert ステートメントと何ら変わりません。

3. SQLite データのバッチ挿入

SQLite のバッチ挿入は、トランザクションを開始するだけで済みます。この具体的な原理は不明です。

public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transcation = null; 
        try 
        { 
          connection.TryOpen(); 
          transcation = connection.BeginTransaction(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
 
            var flag = new AssertFlag(); 
            dataTable.EachRow(row => 
              { 
                var first = flag.AssertTrue(); 
                ProcessCommandParameters(dataTable, command, row, first); 
                command.ExecuteNonQuery(); 
              }); 
          } 
          transcation.Commit(); 
        } 
        catch (Exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.Rollback(); 
          } 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        //首次创建参数,是为了使用缓存 
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }
ログイン後にコピー

4. MySqlデータのバッチ挿入

/// <summary> 
  /// 为 MySql.Data 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MySqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("&#39;{0}&#39;", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    /// 判断是否为字符串类别。 
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    /// 创建参数。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含&#39;号)和日期型时才添加参数 
      if ((isStrType && value.ToString().IndexOf(&#39;\&#39;&#39;) != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  }
ログイン後にコピー

MySqlのバッチ挿入は、ステートメントの値にすべての値を書き込むことです。たとえば、insert Batcher(id, name)values( 1、「1」、2、「2」、3、「3」、……10、「10」)。

5. テスト

次に、テスト ケースを作成して、バッチ挿入を使用した効果を確認します。

    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            //构造100000条数据 
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            //获取 IBatcherProvider 
            var batcher = database.Provider.GetService<IBatcherProvider>(); 
            if (batcher == null) 
            { 
              Console.WriteLine("不支持批量插入。"); 
            } 
            else 
            { 
              batcher.Insert(table); 
            } 
 
            //输出batcher表的数据量 
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); 
            Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql)); 
 
          }))); 
    }
ログイン後にコピー

次の表は、4 つのデータベースのそれぞれが 100,000 個のデータを生成するのにかかる時間を示しています

MsSql00:0 0:02.9376 Oracle00:00:01.5155959SQLite00:00:01.6275634MySql00 : 00:05.4166891
データベース

時間がかかります

300

以上が複数のデータベースへのビッグデータの一括挿入を実装するための C# サンプル コードの詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

C# を使用した Active Directory C# を使用した Active Directory Sep 03, 2024 pm 03:33 PM

C# を使用した Active Directory のガイド。ここでは、Active Directory の概要と、C# での動作方法について、構文と例とともに説明します。

C# のアクセス修飾子 C# のアクセス修飾子 Sep 03, 2024 pm 03:24 PM

C# のアクセス修飾子のガイド。 C# のアクセス修飾子の種類について、例と出力とともに説明しました。

C# の乱数ジェネレーター C# の乱数ジェネレーター Sep 03, 2024 pm 03:34 PM

C# の乱数ジェネレーターのガイド。ここでは、乱数ジェネレーターの仕組み、擬似乱数の概念、安全な数値について説明します。

C# データ グリッド ビュー C# データ グリッド ビュー Sep 03, 2024 pm 03:32 PM

C# データ グリッド ビューのガイド。ここでは、SQL データベースまたは Excel ファイルからデータ グリッド ビューをロードおよびエクスポートする方法の例について説明します。

C# 文字列リーダー C# 文字列リーダー Sep 03, 2024 pm 03:23 PM

C# StringReader のガイド。ここでは、C# StringReader の概要とその動作について、さまざまな例やコードとともに説明します。

C# のパターン C# のパターン Sep 03, 2024 pm 03:33 PM

C# のパターンのガイド。ここでは、C# のパターンの概要と上位 3 種類について、その例とコード実装とともに説明します。

C# シリアル化 C# シリアル化 Sep 03, 2024 pm 03:30 PM

C# シリアル化のガイド。ここでは、C# シリアル化オブジェクトの導入、手順、作業、例についてそれぞれ説明します。

C# 文字列ライター C# 文字列ライター Sep 03, 2024 pm 03:23 PM

C# StringWriter のガイド。ここでは、C# StringWriter クラスの概要とその動作について、さまざまな例やコードとともに説明します。

See all articles