
感谢松鼠会大佬的再三邀请。对我来说这算是一篇命题作文,那么我的答案是什么呢?刚好我也很喜欢另外一个松鼠社区,那么就用两只松鼠来做答案吧,没错,Flink和OpenGauss就是我的答案:
手把手完成一次OpenGauss的安装
手把手构建 Flink connector opengauss
操作系统建议使用 openEuler20.03LTS ,注意不要使用sp包,可能会因为sp包依赖升级,而导致安装失败。使用下面安装脚本,我对官方脚本进行了一些修改,适合做单机实验,不再需要手动设置ip,而是直接将 eth0 的 ip 作为数据库ip使用。
#!/bin/bash
## Author: 贾军锋
## Update: dafei1288 @ 2021-09-27
## Date: 2021-04-15
## OS: openEuler 20.03 LTS [minimum hardware: 2c/4G]
## Database: openGauss 2.0.0
## Description: one-shot host preparation for openGauss — OS configuration,
##              package download, and install prerequisites.
## Tips: the host must be able to reach the public network.

## 0. Disable the virbr0 NIC [a stock openEuler VM ships a default libvirt
##    virbr0 NIC; remove it so it does not interfere with the DB install]
## virsh net-destroy default
## virsh net-list
## echo "Net device virbr0 is disabled."

## 1. Host information [adjust to your environment]
export MY_HOSTNAME=node1 ## hostname
export MY_HOSTIP=$(ifconfig eth0 | grep 'inet' | awk '{print $2}' | head -1) ## IP taken from eth0
export MY_SOFTWARE_DIRECTORY=/soft/openGauss ## package directory
export MY_XML=/soft/openGauss/clusterconfig.xml ## cluster config XML
export openGauss_Download_url=https://opengauss.obs.cn-south-1.myhuaweicloud.com/2.0.0/x86_openEuler/openGauss-2.0.0-openEuler-64bit-all.tar.gz ## openGauss package download URL

## 1. Set the hostname and maintain /etc/hosts
hostnamectl set-hostname $MY_HOSTNAME
# FIX: use double quotes so $MY_HOSTIP expands; with single quotes sed searched
# for the literal text '$MY_HOSTIP' and stale host entries were never removed.
sed -i "/$MY_HOSTIP/d" /etc/hosts
echo "$MY_HOSTIP $MY_HOSTNAME #Gauss OM IP Hosts Mapping" >> /etc/hosts
cat /etc/hosts
echo "1.Configure /etc/hosts completed."
# FIX: was `echo -e "n"` (backslash lost in transcription); intent is a blank line.
echo -e "\n"

## 2. Stop and disable the firewall
systemctl disable firewalld.service
systemctl stop firewalld.service
echo "Firewalld " `systemctl status firewalld|grep Active`
echo "2.Disable firewalld service completed."
echo -e "\n"

## 3. Disable SELinux
sed -i '/^SELINUX=/d' /etc/selinux/config
echo "SELINUX=disabled" >> /etc/selinux/config
cat /etc/selinux/config|grep "SELINUX=disabled"
echo "3.Disable SELINUX completed."
echo -e "\n"

## 4. Set the OS character encoding
echo "LANG=en_US.UTF-8" >> /etc/profile
source /etc/profile
echo $LANG
echo "4.Configure encoding completed."
echo -e "\n"

## 5. Set the OS timezone
rm -fr /etc/localtime
ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
date -R
hwclock
echo "5.Configure Timezone completed."
echo -e "\n"

## 6. Disable swap [on a 2G-RAM box, re-enable swap after the install to
##    effectively "extend" available memory]
sed -i '/swap/s/^/#/' /etc/fstab
swapoff -a
free -m
echo "6.Close swap partition completed."
echo -e "\n"

## 7. Configure SSH: disable the banner, allow root login
sed -i '/Banner/s/^/#/' /etc/ssh/sshd_config
sed -i '/PermitRootLogin/s/^/#/' /etc/ssh/sshd_config
echo -e "\n" >> /etc/ssh/sshd_config
echo "Banner none " >> /etc/ssh/sshd_config
echo "PermitRootLogin yes" >> /etc/ssh/sshd_config
cat /etc/ssh/sshd_config |grep -v ^#|grep -E 'PermitRoot|Banner'
echo "7.Configure SSH Service completed."
echo -e "\n"

## 8. Configure the YUM repo, install dependencies, switch the default python3
mkdir /etc/yum.repos.d/bak
# NOTE(review): the original line was truncated to "mv /etc/yum.repos.d" (no
# destination) — presumably it moves the existing repo files aside; confirm
# against the original script before running.
mv /etc/yum.repos.d/*.repo /etc/yum.repos.d/bak/
/**
 * JDBC row converter for openGauss.
 *
 * <p>All actual type-mapping behavior is inherited from
 * {@link AbstractJdbcRowConverter}; this subclass only specializes the
 * converter name reported to the framework.
 */
public class OpenGaussRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    /** @param rowType the logical row type this converter translates */
    public OpenGaussRowConverter(RowType rowType) {
        super(rowType);
    }

    /** Identifier under which this converter is known. */
    @Override
    public String converterName() {
        return "opengauss";
    }
}
构建 方言(Dialect)
package name.lijiaqi.dialect;
import name.lijiaqi.converter.OpenGaussRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import java.util.Optional;
/**
 * Flink JDBC dialect for openGauss.
 *
 * <p>openGauss is PostgreSQL-derived, so the SQL it accepts follows
 * PostgreSQL conventions (double-quoted identifiers, standard LIMIT clause).
 */
public class OpenGaussDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "opengauss";
    }

    /** Claims JDBC URLs of the form {@code jdbc:opengauss:...}. */
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:opengauss:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new OpenGaussRowConverter(rowType);
    }

    /**
     * FIX: the original returned {@code null}, which breaks LIMIT push-down.
     * openGauss supports the standard PostgreSQL-style LIMIT clause.
     */
    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    // Generic type restored — the article's extraction dropped <String>.
    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.opengauss.Driver");
    }

    /**
     * FIX: identifiers in the PostgreSQL family are quoted with double quotes;
     * the original used single quotes, which denote string literals and
     * produce invalid SQL whenever an identifier actually needs quoting.
     */
    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // The remaining JdbcDialect methods (validate, getUpsertStatement,
    // getRowExistsStatement, getInsertIntoStatement, getUpdateStatement,
    // getDeleteStatement, getSelectFromStatement) previously overrode the
    // interface only to delegate to the default implementation verbatim; the
    // redundant overrides are removed — behavior is unchanged.
}
注册动态表工厂(DynamicTableFactory),以及相关Sink程序
首先创建 OpenGaussSinkFunction 用于接受RowData数据输入,并将其Sink到配置的数据库中
package name.lijiaqi.table;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
public class OpenGaussDynamicTableSink implements DynamicTableSink {
private final JdbcOptions jdbcOptions;
private final EncodingFormat> encodingFormat;
private final DataType dataType;
public OpenGaussDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat> encodingFormat, DataType dataType) {
this.jdbcOptions = jdbcOptions;
this.encodingFormat = encodingFormat;
this.dataType = dataType;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
System.out.println("SinkRuntimeProvider");
System.out.println(dataType);
// SerializationSchema serializationSchema = encodingFormat.createRuntimeEncoder(context, dataType);
OpenGaussSinkFunction gbasedbtSinkFunction = new OpenGaussSinkFunction(jdbcOptions,dataType);
return SinkFunctionProvider.of(gbasedbtSinkFunction);
}
@Override
public DynamicTableSink copy() {
return new OpenGaussDynamicTableSink(jdbcOptions, encodingFormat, dataType);
}
@Override
public String asSummaryString() {
return "OpenGauss Table Sink";
}
}
构建 OpenGaussDynamicTableSink
package name.lijiaqi.table;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
public class OpenGaussDynamicTableSink implements DynamicTableSink {
private final JdbcOptions jdbcOptions;
private final EncodingFormat> encodingFormat;
private final DataType dataType;
public OpenGaussDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat> encodingFormat, DataType dataType) {
this.jdbcOptions = jdbcOptions;
this.encodingFormat = encodingFormat;
this.dataType = dataType;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
System.out.println("SinkRuntimeProvider");
System.out.println(dataType);
// SerializationSchema serializationSchema = encodingFormat.createRuntimeEncoder(context, dataType);
OpenGaussSinkFunction gbasedbtSinkFunction = new OpenGaussSinkFunction(jdbcOptions,dataType);
return SinkFunctionProvider.of(gbasedbtSinkFunction);
}
@Override
public DynamicTableSink copy() {
return new OpenGaussDynamicTableSink(jdbcOptions, encodingFormat, dataType);
}
@Override
public String asSummaryString() {
return "OpenGauss Table Sink";
}
}
构建OpenGaussDynamicTableFactory
package name.lijiaqi.table;
import name.lijiaqi.dialect.OpenGaussDialect;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import java.util.HashSet;
import java.util.Set;
public class OpenGaussDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final String IDENTIFIER = "opengauss";
private static final String DRIVER_NAME = "org.opengauss.Driver";
public static final ConfigOption URL = ConfigOptions
.key("url")
.stringType()
.noDefaultValue()
.withDescription("the jdbc database url.");
public static final ConfigOption DRIVER = ConfigOptions
.key("driver")
.stringType()
.defaultValue(DRIVER_NAME)
.withDescription("the jdbc driver.");
public static final ConfigOption TABLE_NAME = ConfigOptions
.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("the jdbc table name.");
public static final ConfigOption USERNAME = ConfigOptions
.key("username")
.stringType()
.noDefaultValue()
.withDescription("the jdbc user name.");
public static final ConfigOption PASSWORD = ConfigOptions
.key("password")
.stringType()
.noDefaultValue()
.withDescription("the jdbc password.");
// public static final ConfigOption FORMAT = ConfigOptions
// .key("format")
// .stringType()
// .noDefaultValue()
// .withDescription("the format.");
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set> requiredOptions() {
Set> requiredOptions = new HashSet<>();
requiredOptions.add(URL);
requiredOptions.add(TABLE_NAME);
requiredOptions.add(USERNAME);
requiredOptions.add(PASSWORD);
// requiredOptions.add(FORMAT);
return requiredOptions;
}
@Override
public Set> optionalOptions() {
return new HashSet<>();
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig config = helper.getOptions();
helper.validate();
JdbcOptions jdbcOptions = getJdbcOptions(config);
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new OpenGaussDynamicTableSource(jdbcOptions, physicalSchema);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// final EncodingFormat> encodingFormat = helper.discoverEncodingFormat(
// SerializationFormatFactory.class,
// FactoryUtil.FORMAT);
final ReadableConfig config = helper.getOptions();
// validate all options
helper.validate();
// get the validated options
JdbcOptions jdbcOptions = getJdbcOptions(config);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
// table sink
return new OpenGaussDynamicTableSink(jdbcOptions, null, dataType);
}
private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
final String url = readableConfig.get(URL);
final JdbcOptions.Builder builder = JdbcOptions.builder()
.setDriverName(DRIVER_NAME)
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
.setDialect(new OpenGaussDialect());
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
return builder.build();
}
}
接下来通过SPI注册动态表:创建文件 resources/META-INF/services/org.apache.flink.table.factories.Factory ,内容注册为 name.lijiaqi.table.OpenGaussDynamicTableFactory
至此,我们的Flink connector 就构建完成,接下来,我们要使用其,来完成一个真正的项目。
CDC实战下面是项目的整体架构图,我们通过flink cdc 从mysql获取变更数据,然后通过 flink sql 将数据 sink 到 opengauss里
接下来,我们看一下如何通过Flink SQL实现CDC ,只需3条SQL语句即可。
创建数据源表
// 数据源表 String sourceDDL = "CREATE TABLE mysql_binlog (\n" + " id INT NOT NULL,\n" + " name STRING,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = 'localhost',\n" + " 'port' = '3306',\n" + " 'username' = 'flinkcdc',\n" + " 'password' = '123456',\n" + " 'database-name' = 'test',\n" + " 'table-name' = 'test_cdc'\n" + ")";
创建输出表,输出到opengauss ,这里 connector设置成opengauss
String url = "jdbc:opengauss://172.27.71.161:26000/postgres"; String userName = "jacky"; String password = "123456"; String sinkTable = "t1"; // 输出目标表 String sinkDDL = "CREATE TABLE test_cdc_sink (\n" + " id INT NOT NULL,\n" + " name STRING,\n" + " description STRING,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'opengauss',\n" + " 'url' = '" + url + "',\n" + " 'username' = '" + userName + "',\n" + " 'password' = '" + password + "',\n" + " 'table-name' = '" + sinkTable + "'\n" + ")";
这里我们直接将数据汇入
String transformSQL = "insert into test_cdc_sink select * from mysql_binlog";
完整参考代码
package name.lijiaqi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * End-to-end CDC demo: reads the MySQL binlog with flink-cdc and sinks the
 * change stream into openGauss through the custom 'opengauss' connector.
 */
public class MysqlToOpenGaussMain {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table over the MySQL binlog.
        // FIX: restored the "\n" escapes throughout — the published listing
        // lost the backslashes, gluing every line onto the next token.
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  description STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'flinkcdc',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:opengauss://172.27.71.161:26000/postgres";
        String userName = "jacky";
        String password = "123456";
        String sinkTable = "t1";

        // Sink table backed by the custom openGauss connector.
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  description STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'opengauss',\n" +
                "  'url' = '" + url + "',\n" +
                "  'username' = '" + userName + "',\n" +
                "  'password' = '" + password + "',\n" +
                "  'table-name' = '" + sinkTable + "'\n" +
                ")";

        // Pipe every change record from source to sink.
        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();
        env.execute("sync-flink-cdc");
    }
}
运行结果
工程地址如下,欢迎 star、fork 以及提交 pull request
https://github.com/dafei1288/flink-connector-opengauss
- END -
历史文章导读:
如何用Flink整合hudi,构架沧湖一体化解决方案
云原生初体验:在k8s上部署springboot应用
手动实现一门图灵完备的编程语言——Brainfuck
10分钟入门响应式:Springboot整合kafka实现reactive
基于Calcite自定义SQL解析器
基于JDBC实现VPD:SQL解析篇
如何成为一个成功的首席数据官
基于Win10单机部署kubernetes应用
浅谈基于JDBC实现虚拟专用数据库(VPD)
你好,我是 +7 ,一个大数据领域的硬核原创作者。
做过后端架构、数据库中间件、数据平台&架构、产品。
专注大数据领域,数据库领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。
如果文章对您有点帮助,请关注、分享,帮助更多人~非常感谢