
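Background
StarRocks offers several load methods for different data sources. This article describes the ways to import historical MySQL data into StarRocks and the scenarios each one suits.
Overview: three methods are generally recommended for importing MySQL data: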
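DataX: suited to batch import of offline data; recommended when a large number of historical tables need to be imported.
MySQL external table: can be used to import offline data.
CDC tool (flink-cdc): suited to real-time synchronization of MySQL data.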
Examples
The following sections show how to use each of the three methods, with a demo for each.
DataX import
Download and install
# Download and unpack DataX
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zvxf datax.tar.gz
# Using the latest writer plugin is recommended. Download: https://github.com/StarRocks/DataX/releases
tar -zvxf starrockswriter.tar.gz   # or doriswriter.tar.gz
# Move starrockswriter into the DataX plugin directory
mv starrockswriter datax/plugin/writer/
Create the StarRocks table
CREATE TABLE dataxtest(
name1 tinyint(4) NULL COMMENT "",
name2 tinyint(4) NULL COMMENT ""
) ENGINE=OLAP
duplicate KEY(name1)
COMMENT "OLAP"
DISTRIBUTED BY HASH(name1) BUCKETS 3
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
Create the MySQL table
CREATE TABLE dataxtest(
NAME1 tinyint(4) NULL COMMENT "",
NAME2 tinyint(4) NULL COMMENT ""
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='order table';
Insert data into MySQL
INSERT INTO dataxtest VALUES (1,1),(2,2),(3,3),(4,4);
Create a user in StarRocks:
CREATE USER datax@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'datax'@'%';
Edit the job:
vim job/mysql.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["name1", "name2"],
            "connection": [
              {
                "table": ["dataxtest"],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/canal"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "username": "datax",
            "password": "123456",
            "database": "qcy",
            "table": "dataxtest",
            "column": ["name1", "name2"],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://127.0.0.1:8012/",
            "loadUrl": ["127.0.0.1:8011"],
            "loadProps": {}
          }
        }
      }
    ]
  }
}
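When many historical tables have to be imported, writing one job file per table by hand becomes tedious. The following is a minimal sketch that generates job files with the same layout as the job/mysql.json shown here. The helper names build_job and write_jobs are hypothetical, and the connection values are simply copied from the demo; adjust them for a real environment.

```python
import json
from pathlib import Path


def build_job(table, columns, channel=1):
    """Return a DataX job dict shaped like job/mysql.json in the demo."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "username": "root",
            "password": "123456",
            "column": list(columns),
            "connection": [
                {
                    "table": [table],
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/canal"],
                }
            ],
        },
    }
    writer = {
        "name": "doriswriter",
        "parameter": {
            "username": "datax",
            "password": "123456",
            "database": "qcy",
            "table": table,
            "column": list(columns),
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://127.0.0.1:8012/",
            "loadUrl": ["127.0.0.1:8011"],
            "loadProps": {},
        },
    }
    return {
        "job": {
            "setting": {
                "speed": {"channel": channel},
                "errorLimit": {"record": 0, "percentage": 0},
            },
            "content": [{"reader": reader, "writer": writer}],
        }
    }


def write_jobs(tables, out_dir="job"):
    """Write one <table>.json job file per entry of {table: columns}."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for table, columns in tables.items():
        path = out / f"{table}.json"
        path.write_text(json.dumps(build_job(table, columns), indent=2))
        paths.append(path)
    return paths
```

For example, write_jobs({"dataxtest": ["name1", "name2"]}) writes job/dataxtest.json, which can then be passed to bin/datax.py in the same way as job/mysql.json.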
Run the job
python bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job/mysql.json
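To run several job files one after another, the same command can be assembled in a script. This is a minimal sketch, assuming it is started from the DataX installation directory; datax_command and run_jobs are hypothetical helpers, and the flags are the ones used in the command above.

```python
import subprocess


def datax_command(job_file, heap="6G", loglevel="debug"):
    """Build the argument list for bin/datax.py with the flags from the demo."""
    return [
        "python",
        "bin/datax.py",
        f"--jvm=-Xms{heap} -Xmx{heap}",
        f"--loglevel={loglevel}",
        job_file,
    ]


def run_jobs(job_files):
    """Run each job in order and stop at the first one that fails."""
    for job_file in job_files:
        # check=True raises CalledProcessError on a non-zero exit code.
        subprocess.run(datax_command(job_file), check=True)
```

Passing the arguments as a list avoids shell quoting problems with the space inside the --jvm value.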
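The write succeeds; query the table to check the data.
MySQL external table import
Create a MySQL external table, then use INSERT INTO to load the data into StarRocks.
Create the tables (the StarRocks target table first, then the MySQL external table):
CREATE TABLE `lineorder` (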
`lo_orderdate` int(11) NOT NULL COMMENT "",
`lo_orderkey` int(11) NOT NULL COMMENT "",
`lo_linenumber` tinyint(4) NOT NULL COMMENT "",
`lo_custkey` int(11) NOT NULL COMMENT "",
`lo_partkey` int(11) NOT NULL COMMENT "",
`lo_suppkey` int(11) NOT NULL COMMENT "",
`lo_orderpriority` varchar(16) NOT NULL COMMENT "",
`lo_shippriority` tinyint(4) NOT NULL COMMENT "",
`lo_quantity` tinyint(4) NOT NULL COMMENT "",
`lo_extendedprice` int(11) NOT NULL COMMENT "",
`lo_ordtotalprice` int(11) NOT NULL COMMENT "",
`lo_discount` tinyint(4) NOT NULL COMMENT "",
`lo_revenue` int(11) NOT NULL COMMENT "",
`lo_supplycost` int(11) NOT NULL COMMENT "",
`lo_tax` tinyint(4) NOT NULL COMMENT "",
`lo_commitdate` int(11) NOT NULL COMMENT "",
`lo_shipmode` varchar(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`lo_orderdate`, `lo_orderkey`)
COMMENT "OLAP"
PARTITION BY RANGE(`lo_orderdate`)
(PARTITION p1 VALUES [("-2147483648"), ("19930101")),
PARTITION p2 VALUES [("19930101"), ("19940101")),
PARTITION p3 VALUES [("19940101"), ("19950101")),
PARTITION p4 VALUES [("19950101"), ("19960101")),
PARTITION p5 VALUES [("19960101"), ("19970101")),
PARTITION p6 VALUES [("19970101"), ("19980101")),
PARTITION p7 VALUES [("19980101"), ("19990101")))
DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "groupc1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
MySQL external table:
CREATE EXTERNAL TABLE lineorder_external_table
(
`lo_orderdate` int(11) NOT NULL COMMENT "",
`lo_orderkey` int(11) NOT NULL COMMENT "",
`lo_linenumber` tinyint(4) NOT NULL COMMENT "",
`lo_custkey` int(11) NOT NULL COMMENT "",
`lo_partkey` int(11) NOT NULL COMMENT "",
`lo_suppkey` int(11) NOT NULL COMMENT "",
`lo_orderpriority` varchar(16) NOT NULL COMMENT "",
`lo_shippriority` tinyint(4) NOT NULL COMMENT "",
`lo_quantity` tinyint(4) NOT NULL COMMENT "",
`lo_extendedprice` int(11) NOT NULL COMMENT "",
`lo_ordtotalprice` int(11) NOT NULL COMMENT "",
`lo_discount` tinyint(4) NOT NULL COMMENT "",
`lo_revenue` int(11) NOT NULL COMMENT "",
`lo_supplycost` int(11) NOT NULL COMMENT "",
`lo_tax` tinyint(4) NOT NULL COMMENT "",
`lo_commitdate` int(11) NOT NULL COMMENT "",
`lo_shipmode` varchar(11) NOT NULL COMMENT ""
)
ENGINE=mysql
PROPERTIES
(
"host" = "xxx.xxx.xxx.xxx",
"port" = "9030",
"user" = "qcy",
"password" = "123456",
"database" = "ssb_100",
"table" = "lineorder"
);
Importing 100 rows: the import succeeds.
Importing 600 million rows: the import fails with an execute timeout error.
Raise the timeout with set query_timeout=3000 and retry:
The import succeeds.
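The INSERT statement itself is not shown in this section. The sketch below illustrates what it might look like, together with one possible alternative to a very large timeout: splitting the load into one INSERT per lo_orderdate range so that each statement runs for a shorter time. The range boundaries mirror the partitions of the lineorder table; batched_inserts is a hypothetical helper, and batching is an assumption of this sketch, not the approach used in the test described here.

```python
def batched_inserts(boundaries, target="lineorder",
                    source="lineorder_external_table",
                    column="lo_orderdate", timeout=3000):
    """Return SQL statements that load `source` into `target` range by range.

    `boundaries` is an ascending list of integers; each adjacent pair
    [low, high) becomes one INSERT INTO ... SELECT statement.
    """
    statements = [f"SET query_timeout = {timeout};"]
    for low, high in zip(boundaries, boundaries[1:]):
        statements.append(
            f"INSERT INTO {target} SELECT * FROM {source} "
            f"WHERE {column} >= {low} AND {column} < {high};"
        )
    return statements


if __name__ == "__main__":
    # Yearly boundaries taken from partitions p2..p7 of lineorder.
    years = [19930101, 19940101, 19950101, 19960101,
             19970101, 19980101, 19990101]
    for stmt in batched_inserts(years):
        print(stmt)
```

Rows with lo_orderdate before 19930101 (partition p1) are not covered by these boundaries and would need an extra lower bound. SELECT * relies on the external table declaring its columns in the same order as the target table, which is the case for the two definitions in this section.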
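CDC tool synchronization
Overview: this section describes the practical use of flink-cdc to synchronize MySQL data into StarRocks and how to resolve some common problems. It does not go into the underlying principles.
Synchronizing data with flink-cdc and the primary key model
[db] must be changed to the MySQL connection information.
be_num must be set to the number of nodes in the StarRocks cluster (this helps choose a more reasonable bucket count).
[table-rule.1] is a matching rule. It uses regular expressions to match database and table names when generating the CREATE TABLE SQL, and multiple rules can be configured. Only regular expression matching is supported; listing several tables separated by commas is not.
flink.starrocks.* holds the StarRocks cluster settings; see Flink for reference.
Note: check that the IP address, port, database name, table name, and regular expressions are written correctly. In addition, if Flink runs with a parallelism greater than one, the flink-cdc mechanism requires checkpointing to be enabled for synchronization to work; without checkpointing, only a parallelism of one can be used. To enable checkpointing, see: Failure retry and job restart.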
[db]
host = xxx.xxx.xxx.xxx
port = 3306
user = root
password =
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# file to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://xxx.xxx.xxx.xxx:9030
flink.starrocks.load-url= xxx.xxx.xxx.xxx:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
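Run starrocks-migrate-tool; the generated SQL files are written to the result directory:
$ ./starrocks-migrate-tool
$ ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql
Create the tables in StarRocks:
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.1.sql
Create the Flink tables and start the synchronization job:
bin/sql-client.sh -f flink-create.1.sql
Once this command has run, the synchronization job keeps running.
With Flink versions earlier than 1.13, the script may not run directly and the statements have to be submitted one by one.
Note: remember to enable the MySQL binlog.
Check the job status:
bin/flink list
If the job has problems, check the logs, or adjust the memory and slot settings in the system configuration under conf.
Failure retry and job restart
A simple checkpoints and savepoints configuration looks like this: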
# unit: ms
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
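Multiple rules can be configured, each with its own database pattern, table pattern, and Flink sink settings:
[table-rule.1]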
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://xxx.xxx.xxx.xxx:9030
flink.starrocks.load-url= xxx.xxx.xxx.xxx:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
[table-rule.2]
# pattern to match databases for setting properties
database = ^database2.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://xxx.xxx.xxx.xxx:9030
flink.starrocks.load-url= xxx.xxx.xxx.xxx:8030
flink.starrocks.username=root
flink.starrocks.password=
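# If no suitable separator can be chosen for the data, consider the JSON format instead, at some cost in performance. To use it, replace the flink.starrocks.sink.properties.column_separator and flink.starrocks.sink.properties.row_delimiter parameters with the following: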
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json
[table-rule.3]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://xxx.xxx.xxx.xxx:9030
flink.starrocks.load-url= xxx.xxx.xxx.xxx:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000
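This automatically generates a many-to-one import relationship. The table name generated by default in StarRocks is course__auto_shard; it can also be changed in the generated configuration files.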
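Because the rules only accept regular expressions, it can help to test the patterns against real names before generating the SQL. The sketch below checks the two patterns from [table-rule.3] with Python's re module. The database and table names are hypothetical examples, and the sketch assumes that the tool's regex engine treats these simple patterns the same way Python does.

```python
import re

DATABASE_RULE = re.compile(r"^edu_db_[0-9]*$")
TABLE_RULE = re.compile(r"^course_[0-9]*$")


def matches_rule(database, table):
    """Return True if both names match the [table-rule.3] patterns."""
    return bool(DATABASE_RULE.match(database)) and bool(TABLE_RULE.match(table))


if __name__ == "__main__":
    # Hypothetical shard names, used only to exercise the patterns.
    candidates = [
        ("edu_db_1", "course_1"),
        ("edu_db_2", "course_2"),
        ("edu_db_1", "student_1"),
        ("edu_db_", "course_"),  # [0-9]* also accepts zero digits
        ("other_db", "course_1"),
    ]
    for database, table in candidates:
        print(database, table, matches_rule(database, table))
```

Note that [0-9]* also matches names with no trailing digits at all; using [0-9]+ in the rule would exclude them if that is not wanted.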
'sink.properties.column_separator' = '\x01'
'sink.properties.row_delimiter' = '\x02'
To enable the MySQL binlog, edit /etc/my.cnf:
# Enable the binlog
log-bin=/var/lib/mysql/mysql-bin
#log_bin=ON
## Base file name of the binlog
#log_bin_basename=/var/lib/mysql/mysql-bin
## Index file of the binlog, which tracks all binlog files
#log_bin_index=/var/lib/mysql/mysql-bin.index
# Set the server ID
server-id=1
binlog_format = row
Restart mysqld, then run SHOW VARIABLES LIKE 'log_bin'; to confirm that the binlog is enabled.
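Before restarting mysqld, the edited file can be checked for the three settings used in this section. The following is a minimal sketch with a deliberately simple parser; parse_options and binlog_problems are hypothetical helpers, and the check only mirrors the configuration shown here, not every valid MySQL setup.

```python
def parse_options(text):
    """Parse key=value lines from my.cnf text, skipping comments and sections."""
    options = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";", "[")):
            continue
        key, _, value = line.partition("=")
        # MySQL accepts '-' and '_' interchangeably in option names.
        options[key.strip().replace("-", "_")] = value.strip()
    return options


def binlog_problems(text):
    """Return a list of missing binlog settings; the list is empty if none."""
    options = parse_options(text)
    problems = []
    if "log_bin" not in options:
        problems.append("log-bin is not set")
    if "server_id" not in options:
        problems.append("server-id is not set")
    if options.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format is not row")
    return problems
```

Calling binlog_problems(open("/etc/my.cnf").read()) returns an empty list when all three settings are present. The SHOW VARIABLES check after the restart remains the authoritative confirmation.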