mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

PHPz
풀어 주다: 2023-06-02 15:19:21
앞으로
1838명이 탐색했습니다.

Append, Overwrite, ErrorIfExists, Ignore 지원 외에도 업데이트 작업도 지원합니다

1. 먼저 배경을 이해하세요

spark는 도킹 데이터 소스의 작업 모드를 지원하는 열거형 클래스를 제공합니다

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

Pass Viewing 소스 코드를 보면 Spark가 업데이트 작업을 지원하지 않는 것이 분명합니다

2. SparkSQL 지원 업데이트를 만드는 방법

핵심 지식 포인트는 다음과 같습니다.

sparkSQL에서 일반적으로 mysql에 데이터를 쓸 때:

대략적인 API는 다음과 같습니다. :

dataframe.write
          .format("sql.execution.customDatasource.jdbc")
          .option("jdbc.driver", "com.mysql.jdbc.Driver")
          .option("jdbc.url", "jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false")
          .option("jdbc.db", "test")
          .save()
로그인 후 복사

그런 다음 맨 아래 수준에서 Spark는 JDBC 방언 JdbcDialect를 사용하여 삽입하려는 데이터를 다음과 같이 변환합니다.

insert into student (columns_1 , columns_2 , ...) values (? , ? , ....)
로그인 후 복사

그런 다음 방언을 통해 구문 분석된 sql 문은 prepareStatement의 excuteBatch()를 통해 mysql에 제출됩니다.

그러면 위의 SQL 문은 명백합니다. 코드를 완전히 삽입하고 다음과 유사한 업데이트 작업이 없습니다.

UPDATE table_name SET field1=new-value1, field2=new-value2
로그인 후 복사

그러나 mysql은 다음과 같은 SQL 문을 독점적으로 지원합니다.

INSERT INTO student (columns_1,columns_2)VALUES ('第一个字段值','第二个字段值') ON DUPLICATE KEY UPDATE columns_1 = '呵呵哒',columns_2 = '哈哈哒';
로그인 후 복사

대략적인 의미 즉, 데이터가 없으면 삽입합니다.

그래서 우리는 JdbcDialect

INSERT INTO 表名称 (columns_1,columns_2)VALUES ('第一个字段值','第二个字段值') ON DUPLICATE KEY UPDATE columns_1 = '呵呵哒',columns_2 = '哈哈哒';
로그인 후 복사

3와 내부적으로 연결될 때 Spark SQL이 이러한 SQL 문을 생성할 수 있도록 하는 데 중점을 둡니다. 소스 코드를 변환하려면 전반적인 코드 설계 및 실행 프로세스를 이해해야 합니다.

우선:

dataframe.write
로그인 후 복사

write 메서드를 호출하면 클래스가 반환됩니다: DataFrameWriter

주로 DataFrameWriter는 SparkSQL이 연결할 클래스를 전달하는 항목이기 때문입니다. 외부 데이터 소스에 다음 내용이 등록됩니다. 정보 전달

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

그런 다음 save() 작업을 시작한 후 데이터 쓰기를 시작합니다.

다음으로 save() 소스 코드를 살펴보세요.

위 소스 코드에서 가장 중요한 것은 DataSource 인스턴스를 등록한 다음 DataSource의 write 메소드를 사용하여 데이터를 쓰는 것입니다mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

DataSource를 인스턴스화할 때:

def save(): Unit = {
    assertNotBucketed("save")
    val dataSource = DataSource(
      df.sparkSession,
      className = source,//自定义数据源的包路径
      partitionColumns = partitioningColumns.getOrElse(Nil),//分区字段
      bucketSpec = getBucketSpec,//分桶(用于hive)
      options = extraOptions.toMap)//传入的注册信息
    //mode:插入数据方式SaveMode , df:要插入的数据
    dataSource.write(mode, df)
  }
로그인 후 복사

그 다음에는 dataSource.write(mode, df) 전체 논리는 다음과 같습니다.

providingClass.newInstance() 일치에 따라 모드를 만든 다음 일치하는 곳마다 코드를 실행합니다.

그런 다음 ProvideClass가 무엇인지 살펴보세요. mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

패키지 path.DefaultSource를 가져온 후 프로그램은 다음을 입력합니다. mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

그런 다음 데이터베이스가 쓰기 대상으로 사용되는 경우 dataSource.createRelation으로 이동하고 소스 코드를 직접 따릅니다. mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

는 분명히 특성이므로 해당 특성이 구현된 곳으로 프로그램이 이동합니다. mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

이를 구현합니다. 특이점은 다음과 같습니다. path.DefaultSource, 그런 다음 여기에서 데이터 삽입 및 업데이트 지원 작업을 구현합니다. code

코드 흐름에 따르면 mysql에 데이터를 쓰는 SparkSQL의 최종 작업은 Enter: package path.DefaultSource class

즉, 이 클래스는 Spark의 일반 삽입 작업(SaveMode)과 업데이트 작업을 모두 지원해야 합니다.

sparksql이 업데이트 작업을 지원하는 경우 가장 중요한 것은 판단을 내리는 것입니다. 예를 들면 다음과 같습니다.

if(isUpdate){
    sql语句:INSERT INTO student (columns_1,columns_2)VALUES ('第一个字段值','第二个字段值') ON DUPLICATE KEY UPDATE columns_1 = '呵呵哒',columns_2 = '哈哈哒';
}else{
    insert into student (columns_1 , columns_2 , ...) values (? , ? , ....)
}
로그인 후 복사

그러나 Spark 프로덕션 sql 문의 소스 코드에는 다음과 같이 작성되어 있습니다.

최종적으로 하나를 생성하는 판단 논리가 없습니다.

INSERT INTO TABLE (字段1 , 字段2....) VALUES (? , ? ...)
로그인 후 복사

첫 번째 작업은 만드는 방법입니다. 현재 코드는 다음을 지원합니다. ON DUPLICATE KEY UPDATEmysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

대담한 디자인을 만들 수 있습니다. 즉, 다음과 같은 판단을 내릴 수 있습니다. insertStatement 메소드

def insertStatement(conn: Connection, savemode:CustomSaveMode , table: String, rddSchema: StructType, dialect: JdbcDialect)
      : PreparedStatement = {
    val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    if(savemode == CustomSaveMode.update){
        //TODO 如果是update,就组装成ON DUPLICATE KEY UPDATE的模式处理
        s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $duplicateSetting"
    }esle{
        val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
        conn.prepareStatement(sql)
    }
    
  }
로그인 후 복사

이런 식으로 사용자가 전달한 savemode 모드에서 검증을 수행하고 업데이트 작업이면 해당 SQL 문을 반환합니다!

위의 논리에 따라 코드는 다음과 같이 작성됩니다.

이러한 방식으로 해당 SQL 문을 얻습니다.

그러나 이 SQL 문만으로는 충분하지 않습니다. 왜냐하면 jdbc의 prepareStatement 작업이 스파크에서 실행됩니다. 여기에는 커서가 포함됩니다. mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

즉, jdbc가 이 SQL을 탐색할 때 소스 코드는 다음을 수행합니다.

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

看下makeSetter:

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

所谓有坑就是:

insert into table (字段1 , 字段2, 字段3) values (? , ? , ?)
로그인 후 복사

那么当前在源码中返回的数组长度应该是3:

val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
        .map(makeSetter(conn, dialect, _)).toArray
로그인 후 복사

但是如果我们此时支持了update操作,既:

insert into table (字段1 , 字段2, 字段3) values (? , ? , ?) ON DUPLICATE KEY UPDATE 字段1 = ?,字段2 = ?,字段3=?;
로그인 후 복사

那么很明显,上面的sql语句提供了6个? , 但在规定字段长度的时候只有3

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

这样的话,后面的update操作就无法执行,程序报错!

所以我们需要有一个 识别机制,既:

if(isupdate){
    val numFields = rddSchema.fields.length * 2
}else{
    val numFields = rddSchema.fields.length
}
로그인 후 복사

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

row[1,2,3] setter(0,1) //index of setter , index of row setter(1,2) setter(2,3) setter(3,1) setter(4,2) setter(5,3)

所以在prepareStatment中的占位符应该是row的两倍,而且应该是类似这样的一个逻辑

因此,代码改造前样子:

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법

改造后的样子:

try {
      if (supportsTransactions) {
        conn.setAutoCommit(false) // Everything in the same db transaction.
        conn.setTransactionIsolation(finalIsolationLevel)
      }
//      val stmt = insertStatement(conn, table, rddSchema, dialect)
      //此处采用最新自己的sql语句,封装成prepareStatement
      val stmt = conn.prepareStatement(sqlStmt)
      println(sqlStmt)
      /**
        * 在mysql中有这样的操作:
        * INSERT INTO user_admin_t (_id,password) VALUES ('1','第一次插入的密码')
        * INSERT INTO user_admin_t (_id,password)VALUES ('1','第一次插入的密码') ON DUPLICATE KEY UPDATE _id = 'UpId',password = 'upPassword';
        * 如果是下面的ON DUPLICATE KEY操作,那么在prepareStatement中的游标会扩增一倍
        * 并且如果没有update操作,那么他的游标是从0开始计数的
        * 如果是update操作,要算上之前的insert操作
        * */
        //makeSetter也要适配update操作,即游标问题

      val isUpdate = saveMode == CustomSaveMode.Update
      val setters: Array[JDBCValueSetter] = isUpdate match {
        case true =>
          val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
            .map(makeSetter(conn, dialect, _)).toArray
          Array.fill(2)(setters).flatten
        case _ =>
          rddSchema.fields.map(_.dataType)
      val numFieldsLength = rddSchema.fields.length
      val numFields = isUpdate match{
        case true => numFieldsLength *2
        case _ => numFieldsLength
      val cursorBegin = numFields / 2
      try {
        var rowCount = 0
        while (iterator.hasNext) {
          val row = iterator.next()
          var i = 0
          while (i < numFields) {
            if(isUpdate){
              //需要判断当前游标是否走到了ON DUPLICATE KEY UPDATE
              i < cursorBegin match{
                  //说明还没走到update阶段
                case true =>
                  //row.isNullAt 判空,则设置空值
                  if (row.isNullAt(i)) {
                    stmt.setNull(i + 1, nullTypes(i))
                  } else {
                    setters(i).apply(stmt, row, i, 0)
                  }
                  //说明走到了update阶段
                case false =>
                  if (row.isNullAt(i - cursorBegin)) {
                    //pos - offset
                    stmt.setNull(i + 1, nullTypes(i - cursorBegin))
                    setters(i).apply(stmt, row, i, cursorBegin)
              }
            }else{
              if (row.isNullAt(i)) {
                stmt.setNull(i + 1, nullTypes(i))
              } else {
                setters(i).apply(stmt, row, i ,0)
            }
            //滚动游标
            i = i + 1
          }
          stmt.addBatch()
          rowCount += 1
          if (rowCount % batchSize == 0) {
            stmt.executeBatch()
            rowCount = 0
        }
        if (rowCount > 0) {
          stmt.executeBatch()
      } finally {
        stmt.close()
        conn.commit()
      committed = true
      Iterator.empty
    } catch {
      case e: SQLException =>
        val cause = e.getNextException
        if (cause != null && e.getCause != cause) {
          if (e.getCause == null) {
            e.initCause(cause)
          } else {
            e.addSuppressed(cause)
        throw e
    } finally {
      if (!committed) {
        // The stage must fail.  We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) {
          conn.rollback()
        conn.close()
      } else {
        // The stage must succeed.  We cannot propagate any exception close() might throw.
        try {
          conn.close()
        } catch {
          case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
로그인 후 복사
// A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
  // `PreparedStatement`. The last argument `Int` means the index for the value to be set
  // in the SQL statement and also used for the value in `Row`.
  //PreparedStatement, Row, position , cursor
  private type JDBCValueSetter = (PreparedStatement, Row, Int , Int) => Unit

  private def makeSetter(
      conn: Connection,
      dialect: JdbcDialect,
      dataType: DataType): JDBCValueSetter = dataType match {
    case IntegerType =>
      (stmt: PreparedStatement, row: Row, pos: Int,cursor:Int) =>
        stmt.setInt(pos + 1, row.getInt(pos - cursor))
    case LongType =>
        stmt.setLong(pos + 1, row.getLong(pos - cursor))
    case DoubleType =>
        stmt.setDouble(pos + 1, row.getDouble(pos - cursor))
    case FloatType =>
        stmt.setFloat(pos + 1, row.getFloat(pos - cursor))
    case ShortType =>
        stmt.setInt(pos + 1, row.getShort(pos - cursor))
    case ByteType =>
        stmt.setInt(pos + 1, row.getByte(pos - cursor))
    case BooleanType =>
        stmt.setBoolean(pos + 1, row.getBoolean(pos - cursor))
    case StringType =>
//        println(row.getString(pos))
        stmt.setString(pos + 1, row.getString(pos - cursor))
    case BinaryType =>
        stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - cursor))
    case TimestampType =>
        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - cursor))
    case DateType =>
        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - cursor))
    case t: DecimalType =>
        stmt.setBigDecimal(pos + 1, row.getDecimal(pos - cursor))
    case ArrayType(et, _) =>
      // remove type length parameters from end of type name
      val typeName = getJdbcType(et, dialect).databaseTypeDefinition
        .toLowerCase.split("\\(")(0)
        val array = conn.createArrayOf(
          typeName,
          row.getSeq[AnyRef](pos - cursor).toArray)
        stmt.setArray(pos + 1, array)
    case _ =>
      (_: PreparedStatement, _: Row, pos: Int,cursor:Int) =>
        throw new IllegalArgumentException(
          s"Can&#39;t translate non-null value for field $pos")
  }
로그인 후 복사

위 내용은 mysql을 작성할 때 Spark SQL 지원 업데이트 작업을 만드는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿