> 데이터 베이스 > Oracle > Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

WBOY
풀어 주다: 2022-01-18 17:59:09
앞으로
3826명이 탐색했습니다.

이 기사에서는 Oracle의 실시간 데이터 캡처 및 성능 조정에 대해 설명하고 평가판 과정에서 몇 가지 주요 세부 정보를 공유합니다. 모든 사람에게 도움이 되기를 바랍니다.

Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

Flink CDC는 내장 Debezium 구성 요소를 도입하여 Oracle에 대한 지원을 추가하는 최신 버전 2.1을 2021년 11월 15일에 출시했습니다. 저자는 시험 사용을 위해 즉시 이 버전을 다운로드했으며 Oracle의 실시간 데이터 캡처 및 성능 튜닝을 성공적으로 구현했습니다. 이제 시험 과정에서 몇 가지 주요 세부 사항을 공유하겠습니다.

평가판 환경:

Oracle: 11.2.0.4.0(RAC 배포)

Flink: 1.13.1

Hadoop: 3.2.1

Yarn에서 Flink를 통해 배포 및 사용

1. 연결할 수 없습니다. 데이터베이스에

공식 문서에 따르면 Flink SQL CLI에 다음 명령문을 입력합니다.

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );
로그인 후 복사

그런 다음 TEST에서 select *를 통해 관찰해 보니 정상적으로 Oracle에 연결할 수 없는 것을 확인합니다.

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor
로그인 후 복사

오류 메시지로 판단하면 Flink CDC가 연결 정보에 제공된 MY_SERVICE_NAME(오라클 서비스 이름)을 SID로 잘못 착각했기 때문일 수 있습니다. 그래서 Flink CDC와 관련된 Oracle Connector의 소스코드를 읽어보니 com.ververica.cdc.connectors.oracle.OracleValidator에서 Oracle 연결을 위한 코드가 다음과 같은 것을 발견했습니다.

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}
로그인 후 복사

에서 볼 수 있듯이 위의 Flink CDC 현재 버전에서는 SID와 서비스 이름의 연결 방법에 차이가 없지만 SID의 연결 방법은 코드에 직접 작성됩니다(즉, 포트와 db 이름은 ":"으로 구분됩니다). .

Oracle 8i부터 Oracle은 데이터베이스의 클러스터(RAC) 배포를 지원하기 위해 서비스 이름 개념을 도입했습니다. 서비스 이름은 데이터베이스의 다양한 SID 인스턴스에 대한 연결을 통합하기 위해 데이터베이스의 논리적 개념으로 사용될 수 있습니다. 이를 바탕으로 다음 두 가지 방법을 고려할 수 있습니다.

Flink CDC의 테이블 생성 문에서 데이터베이스 이름을 서비스 이름으로 바꾸고 SID 중 하나로 바꿉니다. 이 방법은 연결 문제를 해결할 수 있지만 주류 Oracle 클러스터 배포의 실제 시나리오에 적용할 수는 없습니다.

소스 코드를 수정하세요. 구체적으로 말하면, 새 프로젝트에서 com.ververica.cdc.connectors.oracle.OracleValidator 메소드를 다시 작성하여 Service Name의 연결 메소드로 변경할 수 있습니다(즉, "/"를 사용하여 포트와 db 이름을 구분함). :

"jdbc :oracle:thin:@" + 호스트 이름 + ":" + 포트 + "/" + dbname, userName, userpwd);

저자는 액세스를 유지하면서 데이터베이스에 대한 정상적인 연결을 달성하기 위해 두 번째 방법을 사용합니다. 이름 속성의 Oracle 서비스 사용량에 적용됩니다.

2. Oracle 테이블을 찾을 수 없습니다

위의 단계를 수행하고 TEST에서 *를 선택하여 다시 관찰하면 다음과 같이 오류가 보고됩니다.

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
로그인 후 복사

오류 로그에 언급된 테이블이 MY_SERVICE_NAME .MY_SCHEMA.test인 것을 확인했는데 데이터베이스 이름과 스키마 이름은 대문자인데 테이블 이름은 소문자인 이유는 무엇인가요?

이 오류는 io.debezium 패키지에서 보고된다는 점에 유의하세요. 패키지의 소스 코드를 분석하면(Flink CDC의 pom.xml 파일에서 확인할 수 있으며 현재 debezium 1.5.4 버전이 사용됨) io.debezium.relational에 .Tables에 다음과 같은 코드가 있다는 것을 알 수 있습니다:

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
로그인 후 복사

Debezium 개발자들은 "대소문자 구분 안 함"을 "테이블 이름을 소문자로 변환해야 합니다"로 일률적으로 정의한 것을 볼 수 있습니다. Debezium에서 지원하는 PostgreSQL, Mysql 등의 경우에도 마찬가지입니다. 그러나 Oracle 데이터베이스의 경우 "대소문자 구분 안 함"은 내부 메타정보를 저장할 때 테이블 이름을 대문자로 변환해야 함을 의미합니다. 소문자 테이블 이름을 읽으려고 할 때 오류가 발생했습니다.

Debezium은 최신 안정 버전 1.7.1과 최신 개발 버전 1.8.0까지 이 문제를 해결하지 않았으므로 다음 두 가지 방법을 통해 이 문제를 우회할 수 있습니다.

Oracle "대소문자를 구분하지 않음"을 사용해야 하는 경우 "민감한" 기능을 사용하면 소스 코드를 직접 수정하고 위의 toLowercase를 toUppercase로 변경할 수 있습니다. (이것도 작성자가 선택한 방법입니다.)

소스 코드 수정을 원하지 않고 Oracle의 " 대소문자를 구분하지 않음" 기능을 사용하려면 다음 예와 같이 Add 'debezium.database.tablename.case.insensitive'='false'에서 create 문을 사용할 수 있습니다.

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
'schema-name'='MY_SCHEMA',
'table-name'='TEST',
'debezium.database.tablename.case.insensitive'='false' );
로그인 후 복사

이 방법의 단점은 Oracle의 " 대소문자를 구분하지 않는" 기능은 'table-name'에 사용해야 합니다. 대문자 테이블 이름을 명시적으로 지정합니다.

database.tablename.case.insensitive 매개변수의 경우 Debezium은 현재 Oracle 11g에 대해 기본적으로 true로 설정하고 다른 Oracle 버전에서는 기본적으로 false로 설정한다는 점에 유의해야 합니다. 따라서 독자가 Oracle 11g 버전을 사용하지 않는 경우 이 매개변수를 수정할 필요는 없지만 여전히 대문자 테이블 이름을 명시적으로 지정해야 합니다.

3. 데이터 지연이 큼데이터 지연이 큼, 때로는 데이터 변경 사항을 캡처하는 데 3~5분이 소요됩니다. 이 문제에 대한 명확한 해결책은 Flink CDC FAQ에 나와 있습니다. create 문에 다음 두 가지 구성 항목을 추가하세요.

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
로그인 후 복사

那么为什么要这样做呢?我们依然可以通过分析源码和日志,结合 Oracle Logminer 的工作原理来加深对工具的理解。

对 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法进行。为节约篇幅,本文不列出实际的源码,仅提炼出关键过程绘于下面的流程图,有兴趣的读者可以对照该流程图,结合实际源码进行分析:

Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

采用 redo_log_catalog 的方式,可以监控数据表的 DDL 信息,且由于 archive logs 被永久保存到磁盘上,可以在数据库宕机后依然正常获取到宕机前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。

根据笔者实际测试情况,如果 debezium.log.mining.strategy 为默认配置 redo_log_catalog,则不仅需要多执行第 ① 步操作 (该操作耗时约为半分钟到 1 分钟之间),在第 ④ 步,根据 archived logs 的数据量,耗时也会在 1 分钟到 4 分钟之间浮动;在第 ⑤ 步,实际查询 V$LOGMNR_CONTENTS 视图也常常需要十几秒才能完成。

此外,由于 archive logs 在实际系统中增长速度较快,因此在实际使用中,常会配合进行定期删除或转储过期日志的操作。由于上述第 ④ 步的耗时较长,笔者观察到在第 ④ 步执行的过程中,在一定概率下会发生第 ② 步加入的a rchive logs 已过期而被删除转储的情况,于是在第 ⑤ 步查询的时候,会由于找不到第 ② 步加入的日志,而报下面的错误:

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status
로그인 후 복사

一般来说,Flink CDC 所需要监控的表,特别是对于业务系统有重大意义的表,一般不会进行 DDL 操作,仅需要捕捉 DML 操作即可,且对于数据库宕机等极特殊情况,也可使用在数据库恢复后进行全量数据更新的方式保障数据的一致性。因而,online_catalog 的方式足以满足我们的需要。

另外,无论使用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步实际需要的日志不同步的问题,因此,加入 'debezium.log.mining.continuous.mine'='true' 参数,将实时搜集日志的工作交给 Oracle 自动完成,即可规避这一问题。

笔者按照这两个参数配置后,数据延迟一般可以从数分钟降至 5 秒钟左右。

四、调节参数继续降低数据延迟

上述流程图的第 ③ 步和第 ⑦ 步,提到了根据配置项来确定 LogMiner 监控时序范围,以及确定休眠时间。下面对该过程进行进一步分析,并对单个表的进一步调优给出一般性的方法论。

通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 方法,可了解到 debezium 对监控时序范围和休眠时间的调节原理。为便于读者理解,将该方法用流程图说明如下:

Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

从上述的流程图中可以看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长能够尽可能和数据库自身 SCN 增加的步长一致。由此可见:

log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的表现有关,和单个表的数据变化情况无关;

log.mining.batch.size.default 不仅仅是监控时序范围的起始值,还是监控时序范围变化的阈值。所以如果要实现更灵活的监控时序范围调整,可考虑适当减小该参数;

由于每一次确定监控时序范围时,都会根据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠时间更灵活的调整,可考虑适当增大 log.mining.sleep.time.increment.ms;

log.mining.batch.size.max 不能过小,否则会有监控时序范围永远无法追上数据库当前 SCN 的风险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}
로그인 후 복사

如果当前的监控时序范围达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提示。在实际应用中,观察 Flink CDC 产生的 log 是否包含该提示,便可得知 log.mining.batch.size.max 的值是否合理。

五、Debezium Oracle Connector 的隐藏参数

사실 위에서 우리는 두 개의 숨겨진 매개변수인 debezium.database.tablename.case.insensitive(두 번째 섹션 참조)와 debezium.log.mining.continuous.mine(세 번째 섹션 참조)에 대해 배웠습니다. Debezium의 공식 문서에는 실제로 설명되어 있지 않지만 실제로 사용할 수 있습니다. 소스 코드를 분석하면 이제 Debezium Oracle Connector의 모든 숨겨진 매개 변수가 제공되며 해당 설명은 다음과 같습니다.

Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달

저자는 위에서 사용한 두 매개 변수 외에도 log.mining.history가 있다고 믿습니다. .recorder.class 매개변수에도 주목할 가치가 있습니다. 이 매개변수는 현재 빈 클래스인 io.debezium.connector.oracle.logminer.NeverHistoryRecorder로 기본 설정되어 있으므로 Flink CDC 동작을 분석할 때 io.debezium.connector.oracle.logminer.HistoryRecorder 인터페이스를 구현하는 클래스를 사용자 정의합니다. , 소스 코드를 수정하지 않고도 Flink CDC 동작에 대한 개인화된 모니터링을 실현할 수 있습니다.

추천 튜토리얼: "Oracle Tutorial"

위 내용은 Flink CDC 시리즈(마이닝 및 튜닝 사례)에서 Oracle 데이터의 실시간 추출에 대한 가장 체계적인 숙달의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:juejin.im
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿