概述
1、Oracle数据准备
(1)将Oracle.sql文件导入到Oracle
--在PLSQL中打开命令执行窗口
@E:\实时数据仓库\2、脚本文件\Oracle.sql --脚本路径
(2)查看表数据
-- Ad-hoc spot check that the imported script populated base_province.
-- The trailing semicolon is required for the statement to execute in a
-- SQL*Plus / PL/SQL Developer command window.
select * from base_province;
2、测试环境准备
(1)创建文件夹,下面要用到
E:\oracle\db_recovery_file_dest
E:\oracle\namespace
(2)以DBA身份连接到数据库
- Windows打开CMD运行SqlPlus,以DBA身份连接数据库
sqlplus sys/123456 AS SYSDBA
(3)开启日志归档
-- Switch the database into ARCHIVELOG mode. Run as SYSDBA from SQL*Plus.
-- Upper bound on the space the recovery area may use.
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
-- Directory for the recovery area (must already exist). SCOPE=SPFILE means
-- the setting only takes effect after the restart below.
ALTER SYSTEM SET db_recovery_file_dest = 'E:\oracle\db_recovery_file_dest' SCOPE=SPFILE;
-- Restart into MOUNT state so the archiving mode can be changed, then
-- reopen the database for normal use.
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
- 查看日志是否归档
-- SQL*Plus command: show the current archiving status of the database.
ARCHIVE LOG LIST;
(4)启用补充日志记录(库表)
-- Table level: record all columns of scott.emp in the redo log for every
-- change to a row.
ALTER TABLE scott.emp
    ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Database level: enable supplemental logging for the whole database.
ALTER DATABASE
    ADD SUPPLEMENTAL LOG DATA;
(5)创建表空间
-- Dedicated tablespace for the CDC user. The datafile lives in the
-- E:\oracle\namespace folder created earlier; it starts at 25M and
-- auto-extends without an upper limit. REUSE allows an existing file of
-- the same name to be overwritten.
CREATE TABLESPACE logminer_tbs
    DATAFILE 'E:\ORACLE\namespace\logminer_tbs.dbf'
    SIZE 25M REUSE
    AUTOEXTEND ON MAXSIZE UNLIMITED;
(6)创建用户及授予权限
-- Create user family with LOGMINER_TBS as its default tablespace and an
-- unlimited quota on it.
CREATE USER family IDENTIFIED BY chickenkang
    DEFAULT TABLESPACE LOGMINER_TBS
    QUOTA UNLIMITED ON LOGMINER_TBS;

-- Grant the CONNECT, RESOURCE and DBA roles to family.
-- NOTE(review): DBA is far broader than the individual grants below and is
-- probably only acceptable on a throw-away test instance; confirm before
-- using this script anywhere else.
GRANT CONNECT, RESOURCE, DBA TO family;

-- Session, catalog and table access.
GRANT CREATE SESSION TO family;
GRANT SELECT ON V_$DATABASE TO family;
GRANT FLASHBACK ANY TABLE TO family;
GRANT SELECT ANY TABLE TO family;
GRANT SELECT_CATALOG_ROLE TO family;
GRANT EXECUTE_CATALOG_ROLE TO family;
GRANT SELECT ANY TRANSACTION TO family;

-- Object creation and table maintenance.
GRANT CREATE TABLE TO family;
GRANT LOCK ANY TABLE TO family;
GRANT ALTER ANY TABLE TO family;
GRANT CREATE SEQUENCE TO family;

-- LogMiner packages (each granted once; the original script repeated the
-- DBMS_LOGMNR grant).
GRANT EXECUTE ON DBMS_LOGMNR TO family;
GRANT EXECUTE ON DBMS_LOGMNR_D TO family;

-- Redo / archive log and LogMiner dynamic views (V_$LOGMNR_CONTENTS is
-- granted once; the original script repeated it).
GRANT SELECT ON V_$LOG TO family;
GRANT SELECT ON V_$LOG_HISTORY TO family;
GRANT SELECT ON V_$LOGMNR_LOGS TO family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO family;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO family;
GRANT SELECT ON V_$LOGFILE TO family;
GRANT SELECT ON V_$ARCHIVED_LOG TO family;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO family;
(7)在family用户下创建emp表,并修改该表让其支持增量日志(补充日志)
-- Record all columns of family.emp in the redo log for every change to a
-- row. The table must already exist in the family schema.
ALTER TABLE family.emp
    ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
3、编写脚本
(1)pom依赖
<!-- Dependencies for the Flink CDC Oracle example job. -->
<!-- NOTE(review): the Flink artifacts are pinned to 1.12.0 while the CDC
     connector is 2.1.0; confirm this pairing against the connector's
     supported-Flink-version table. -->
<dependencies>
    <!-- Flink core, streaming and client APIs (Scala 2.12 builds) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <!-- Oracle CDC connector and the Oracle JDBC driver -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc10</artifactId>
        <version>19.10.0.0</version>
    </dependency>

    <!-- Hadoop client, Blink table planner and fastjson -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Build a jar-with-dependencies artifact during the package phase -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(2)代码编写
package com.hxjy;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * Minimal Flink job that reads change events from an Oracle database through
 * the Flink CDC Oracle connector and prints each event as a JSON string.
 */
public class OracleSourceExample {

    /**
     * Builds the Oracle CDC source, attaches it to a streaming environment and
     * runs the job until it is cancelled.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to start or fails while running
     */
    public static void main(String[] args) throws Exception {
        // The builder is provided by the connector's OracleSource class; this
        // example class itself has no builder() method.
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("localhost")
                .port(1521)
                .database("ORCL") // monitor the ORCL database
                .schemaList("family") // monitor the family schema
                // .tableList("family.emp") // optionally restrict capture to specific tables
                .username("family")
                .password("chickenkang")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("FlinkCDCOracle");
    }
}
(3)运行结果
最后
以上就是忐忑方盒为你收集整理的第三章 数据采集专题之FlinkCDC实时采集Oracle的全部内容,希望文章能够帮你解决第三章 数据采集专题之FlinkCDC实时采集Oracle所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复