赞
踩
将老版本或者低版本StarRocks表中的数据,迁移到新版本StarRocks表中,为了使用新版本中的各种新功能,故需要迁移数据,这种模式比较通用,对其他组件依赖比较少,方便快捷使用,推荐使用这种方式。
迁移前后:该例子为了演示,表结构一模一样的哦,并亲测有效,强烈建议使用
该模式会用到组件(flink-connector-starrocks)进行读取写入即可。pom如下
flink版本:1.14.x
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.starrocks</groupId>
- <artifactId>flink-connector-starrocks</artifactId>
- <version>1.2.3_flink-1.14_2.11</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-guava</artifactId>
- <version>30.1.1-jre-15.0</version>
- </dependency>
- <!-- flink Sql -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.11</artifactId>
- <version>${flink.version}</version>
- <!-- <scope>provided</scope>-->
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>

- public class StarRocksSourceOptionsUtils {
-
- // mysql -hxxx -P 9030 -uroot -pxxx -Dtest;
- public static String SR_SOURCE_CONNECTOR="starrocks";
- public static String SR_SOURCE_SCAN_URL="xxx:8030,xxx:8030,xxx:8030";
- public static String SR_SOURCE_JDBC_URL="jdbc:mysql://xxx:9030";
- public static String SR_SOURCE_USERNAME="root";
- public static String SR_SOURCE_PASSWORD="xxx";
-
- /**
- * StarRocks Source
- * @return
- */
- public static StarRocksSourceOptions createStarRocksSourceOptions(String db,String tableName){
- StarRocksSourceOptions.Builder builder = StarRocksSourceOptions.builder()
- .withProperty("connector", SR_SOURCE_CONNECTOR)
- .withProperty("scan-url", SR_SOURCE_SCAN_URL)
- .withProperty("jdbc-url", SR_SOURCE_JDBC_URL)
- .withProperty("username", SR_SOURCE_USERNAME)
- .withProperty("password", SR_SOURCE_PASSWORD)
- //BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)。104857600:100M
- .withProperty("scan.params.mem-limit-byte","10737418240")
- //数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务。
- .withProperty("scan.params.query-timeout-s","2592000")
- //Flink 连接器连接 StarRocks 集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错。
- .withProperty("scan.connect.timeout-ms","2592000")
- //数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于 5。
- .withProperty("scan.params.keep-alive-min","8")
- //数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错。
- .withProperty("scan.max-retries","100")
- .withProperty("table-name",tableName)
- .withProperty("database-name",db);
- return builder.build();
- }
- }

- public class StarRocksSinkOptionsUtils {
-
- //mysql -hxxx -P 9030 -uroot -pxxx -Dtest
- public static String starRocksJDBC="jdbc:mysql://xxx:9030,xxx:9030,xxx:9030";
- public static String starRocksLOADUrl="xxx:8030;xxx:8030;xxx:8030";
- public static String starRocksUsername="root";
- public static String starRocksPassword="xxx";
- public static Integer maxBytes = 1024 * 1024 * 64;
-
- public static StarRocksSinkOptions createStarRocksSink(String db, String tableName) {
- StarRocksSinkOptions.Builder sROptBuilder = StarRocksSinkOptions.builder()
- .withProperty(StarRocksSinkOptions.JDBC_URL.key(), starRocksJDBC)
- .withProperty(StarRocksSinkOptions.LOAD_URL.key(), starRocksLOADUrl)
- .withProperty(StarRocksSinkOptions.USERNAME.key(), starRocksUsername)
- .withProperty(StarRocksSinkOptions.PASSWORD.key(), starRocksPassword)
- .withProperty(StarRocksSinkOptions.DATABASE_NAME.key(), db)
- .withProperty("sink.properties.format", "json")
- .withProperty("sink.properties.strip_outer_array", "true")
- // 刷新条数
- .withProperty(StarRocksSinkOptions.SINK_BATCH_MAX_ROWS.key(), "64000")
- // 刷新数据大小
- .withProperty(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE.key(), maxBytes.toString())
- .withProperty(StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL.key(), "10000")
- .withProperty(StarRocksSinkOptions.SINK_MAX_RETRIES.key(), "5")
- .withProperty(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT.key(), "5000")
- .withProperty(StarRocksSinkOptions.TABLE_NAME.key(), tableName);
- return sROptBuilder.build();
- }
- }

俩工具类写好后,基本的查询及写入的工具都好了,但是里面还有很多的细节哦
- import xxx.StarRocksSinkOptionsUtils;
- import xxx.StarRocksSourceOptionsUtils;
- import com.alibaba.fastjson.JSON;
- import com.starrocks.connector.flink.StarRocksSink;
- import com.starrocks.connector.flink.StarRocksSource;
- import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
- import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.table.api.DataTypes;
- import org.apache.flink.table.api.TableSchema;
- import org.apache.flink.table.data.RowData;
- import org.apache.flink.util.Collector;
-
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;

- String db="test";
- String tableName="users";
- StarRocksSourceOptions options=StarRocksSinkOptionsUtils.createStarRocksSourceOptions(db,tableName);
- TableSchema tableSchema = getTableSchema();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<RowData> rowDataDataStreamSource = env
- .addSource(StarRocksSource.source(tableSchema, options))
- //.setParallelism(1);
getTableSchema:
- private static TableSchema getTableSchema() {
- TableSchema tableSchema = TableSchema.builder()
- .field("id", DataTypes.BIGINT())
- .field("addr", DataTypes.STRING())
- .field("dates", DataTypes.TIMESTAMP())
- .field("money", DataTypes.DECIMAL(18,4))
- .field("create_time", DataTypes.TIMESTAMP())
- .build();
- return tableSchema;
- }
// Sink that receives one JSON string per row for the target table.
SinkFunction<String> starRocksSink = StarRocksSink.sink(getStarRocksSinkOptions(db, tableName));

// Convert every scanned row into a JSON object string keyed by column name.
SingleOutputStreamOperator<String> jsonRows = rowDataDataStreamSource.process(
        new ProcessFunction<RowData, String>() {
            @Override
            public void processElement(RowData row, Context ctx, Collector<String> out) throws Exception {
                HashMap<String, String> columns = new HashMap<>();
                extractedMapping(row, columns);
                out.collect(JSON.toJSONString(columns));
            }
        });

// The same label is used as the operator's display name and as its uid.
String sinkOperatorId = tableName + "- starRocksSink";
jsonRows.addSink(starRocksSink).name(sinkOperatorId).uid(sinkOperatorId);

env.execute("StarRocks flink source");

extractedMapping:
- private static void extractedMapping(RowData value, HashMap<String, String> map) {
- map.put("id",new String(String.valueOf(value.getLong(0)).getBytes(), StandardCharsets.UTF_8));
- map.put("addr",new String(String.valueOf(value.getString(1)).getBytes(), StandardCharsets.UTF_8));
- map.put("dates",new String(String.valueOf(value.getTimestamp(2,5).toString().replace("T"," ")).getBytes(), StandardCharsets.UTF_8));
- map.put("money",new String(String.valueOf(value.getDecimal(3,18,4)).getBytes(), StandardCharsets.UTF_8));
- map.put("create_time",new String(String.valueOf(value.getTimestamp(4,5).toString().replace("T"," ")).getBytes(), StandardCharsets.UTF_8));
-
- }
getStarRocksSinkOptions:
- private static StarRocksSinkOptions getStarRocksSinkOptions(String db,String tableName) {
- StarRocksSinkOptions starRocksSinkOptions = StarRocksSinkOptionsUtils.createStarRocksSink(db,tableName);
- return starRocksSinkOptions;
- }
这种模式下,是走flink的batch模式,步骤如下:
1、打包上传jar包到flink机器
2、启动任务
3、运行完后自动退出
迁移的source表和sink表,在不同的集群中结构一致,并校验迁移前后数据;
1、校验总数是否一致,是否有丢数据情况
2、校验字段数据值是否一致,各种类型都要校验一遍,防止把你坑得吐血,然后重来一遍,哈哈!特别是一个大几百T的表,让你血吐好几盆!(建议先用小表测试完后,再进行大表数据迁移)
1、老旧版本的StarRocks在scan过程中,老是报内存满了,该怎么处理?
2、source设置几个并行度合适?
3、数据读取失败时的最大重试次数几次合适?
4、数据读取任务的保活时间设置多长合适?
5、连接器连接 StarRocks 集群的时间上限多大合适?TimeOut问题?
6、BE 节点中单个查询的内存上限多大合适?
7、数据读取任务的超时时间多大合适?
8、大几百T的表如何同步?
1、新版本的StarRocks在写入过程中,老是报内存满了,该怎么处理?
2、sink设置几个并行度合适?
3、多久刷一次写入数据比较合适,不然内存满了,任务死给你看!(BE只能重启,好像没有其他的办法-来自官网解释)
1、NULL值如何处理?
2、日期格式如何处理?
3、为什么schema和映射的时候要对应顺序?
4、还有好多问题了,如果你也遇到了,联系我!
如你也遇到问题了,微信号如下(加v之前,标注说明StarRocks来意关键字样,防止欺骗我的感情):
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。