栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

windows 下 oracle goldengate 到 kafka 复制单机测试配置

大数据系统 更新时间:发布时间: 百科书网 趣学号
复制拓扑

1. 软件版本

操作系统:  window 7
数据库:   Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

Oracle GoldenGate 12.3.0.1.5 for Oracle on Windows 64 bit.zip

Oracle GoldenGate for Big Data 12.3.2.1.1 for Microsoft Windows x64.zip

JDK8

kafka_2.13-2.8.0.tgz

2. 安装及配置

2.1配置环境变量

 

 goldengate 需要一些oracle 的库

 Path 后追加

;%JAVA_HOME%jrebinserver;%ORACLE_HOME%bin;F:DATAbase11gogg12big;

注意分隔符 ; F:DATAbase11gogg12big 是我安装 ogg big data 的目录

%JAVA_HOME%jrebinserver 是 GoldenGate for Big Data 启动java 虚拟机的环境配置

2.2 解压 Oracle GoldenGate 12.3.0.1.5 for Oracle on Windows 64 bit.zip 后安装

 2.3 安装 GoldenGate for Big Data

解压 Oracle GoldenGate for Big Data 12.3.2.1.1 for Microsoft Windows x64.zip 到 F:DATAbase11gogg12big 目录即可

2.4 安装 kafka

解压 kafka 到 F:DATAbase11gkafka_2.13-2.8.0

修改F:DATAbase11gkafka_2.13-2.8.0configzookeeper.properties 数据目录

dataDir=F:/DATAbase/11g/kafka_2.13-2.8.0/data/zookeeper

修改 F:DATAbase11gkafka_2.13-2.8.0configserver.properties 日志目录

log.dirs=F:/DATAbase/11g/kafka_2.13-2.8.0/data/kafka-logs

3. 配置goldengate 3.1 数据库设置

SELECt force_logging,
       supplemental_log_data_min min,
       supplemental_log_data_pk pk,
       supplemental_log_data_ui ui,
       supplemental_log_data_fk fk,
       supplemental_log_data_all allc
FROM v$database;

其中 数据库必须是归档模式而且 supplemental_log_data_min 必须为Y

ALTER DATAbase ADD SUPPLEMENTAL LOG DATA;
ALTER SYSTEM SWITCH LOGFILE;

3.2 给数据库用户 stock 设置权限

grant dba to stock

3.3  配置源端 

运行 F:DATAbase11gogg12ggsci.exe 

-- 登录数据库

DBLOGIN USERID stock, PASSWORD 123456

ADD SCHEMATRANDATA stock ALLCOLS

--添加一个复制表
ADD TRANDATA stock.stock
-- 再添加一个表
ADD TRANDATA stock.test

--编辑参数

EDIT PARAMS MGR

--------------------------

PORT 7809
AUTOSTART Extract *
LAGINFOMINUTES 5
LAGCRITICALMINUTES 5

-----------------------------


1.编辑 extract exkafka 的参数,其中 exttrail dirdatt1 指定了本地输出目录

edit param exkafka
----------------------------------------
EXTRACT exkafka
dynamicresolution
USERID stock, PASSWORD 123456
exttrail dirdat/t1
TABLE stock.stock;
--------------------------------------------


2. 添加extract 进程 ETTND001
-- delete  extract exkafka
add extract exkafka,tranlog,begin now

3. 添加trail文件的定义与extract进程绑定 

--将本地的 exttrail dirdatt1  输出目录的定义与extract进程绑定 
--意思是 EXTRACT ETTND001 导出的文件放到  exttrail dirdatt1 目录下
-- DELETE exttrail dirdat/t1 EXTRACT exkafka


ADD exttrail dirdat/t1, EXTRACT exkafka

4. 设置pump

4.1   pump进程本质上来说也是一个extract,只不过他的作用仅仅是把trail文件传递到目标端,配置过程和extract进程类似,只是逻辑上称之为pump进程

edit param pukafka
-------------------------------------------
extract pukafka
passthru
dynamicresolution
USERID stock, PASSWORD 123456
rmthost localhost mgrport 17809
rmttrail dirdat/t1
TABLE stock.stock;
----------------------------------------

4.2 分别将本地trail文件和目标端的trail文件绑定到extract进程:
-- exttrailsource dirdatt1 指定了 第三部指定的内容
-- delete extract pukafka,exttrailsource dirdat/t1
--同 add extract exkafka,tranlog,begin now 一样效果
     add extract pukafka,exttrailsource dirdat/t1


-- delete  rmttrail dirdat/t1 extract pukafka
4.3
--ADD RMTTRAIL dirdat/t1 EXTRACT EPTND001, MEGABYTES 600
add rmttrail dirdat/t1 extract pukafka  , MEGABYTES 600
 

3.4 配置目标端

运行 F:DATAbase11gogg12bigggsci.exe

EDIT PARAMS MGR

--------------------------------------
PORT 17809
AUTOSTART REPLICAT *
LAGINFOMINUTES 5
LAGCRITICALMINUTES 5

------------------------------------------------
edit  param  ./GLOBALS
CHECKPOINTTABLE stock.checkpoint


--添加进程

-- delete REPLICAT RFDLD001, EXTTRAIL ./dirdat/t1, CHECKPOINTTABLE stock.checkpoint
ADD REPLICAT RFDLD001 , EXTTRAIL ./dirdat/t1, CHECKPOINTTABLE stock.checkpoint

--编辑参数
edit param RFDLD001
--------------------------------------------------
REPLICAT RFDLD001
TARGETDB LIBFILE ggjava.dll SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP stock.*, TARGET stock.*;
--------------------------------------------------

--kafka.props 文件内容

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.SchemaTopicName=stockSchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op


goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka,notice windows classpath split is ;
#gg.classpath=dirprm/;f:/DATAbase/11g/kafka_2.11-1.1.0/libs/*
gg.classpath=dirprm/;f:/DATAbase/11g/kafka_2.13-2.8.0/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 

custom_kafka_producer.properties  配置如下

bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=10000

最终的参数文件列表如下

 4. 启动测试 4.1源端启动 

start mgr

4.2目标端启动 

start mgr

start *

 4.3 启动kafka  

在 F:DATAbase11gkafka_2.13-2.8.0binwindows 目录下 cmd中执行

start zookeeper-server-start.bat ....configzookeeper.properties
start kafka-server-start.bat ....configserver.properties

执行消费端

F:DATAbase11gkafka_2.13-2.8.0binwindows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic TEST --from-beginning
{"table":"STOCK.TEST","op_type":"I","op_ts":"2021-09-28 16:00:20.000000","current_ts":"2021-09-28T16:00:27.109000","pos"

4.4 在数据库端插入数据测试


 

对应于 kafka.proprs

 其中 

gg.handler.kafkahandler.topicMappingTemplate=${tableName} 会为每张复制的表生产一个队列

gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic   是元数据队列

 查看kafka 中的队列

 其中 STOCK和TEST 是 复制的两个表数据队列

stockSchemaTopic  是数据库元数据变更队列

kafka-topics.bat --list --zookeeper localhost:2181
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic stockTopic --from-beginning

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/279936.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号