当前位置:   article > 正文

StarRocks新老版本数据迁移方案2(flink版)_starrockssink.sink方法

starrockssink.sink方法

1、flink实时进行数据迁移

将老版本或者低版本StarRocks表中的数据,迁移到新版本StarRocks表中,为了使用新版本中的各种新功能,故需要迁移数据,这种模式比较通用,对其他组件依赖比较少,方便快捷使用,推荐使用这种方式。

迁移前后:该例子为了演示,表结构一模一样的哦,并亲测有效,强烈建议使用

1.1、依赖

该模式会用到组件(flink-connector-starrocks)进行读取写入即可。pom如下

flink版本:1.14.x

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-base</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.starrocks</groupId>
  8. <artifactId>flink-connector-starrocks</artifactId>
  9. <version>1.2.3_flink-1.14_2.11</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-shaded-guava</artifactId>
  14. <version>30.1.1-jre-15.0</version>
  15. </dependency>
  16. <!-- flink Sql -->
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-table-planner_2.11</artifactId>
  20. <version>${flink.version}</version>
  21. <!-- <scope>provided</scope>-->
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.flink</groupId>
  25. <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  26. <version>${flink.version}</version>
  27. </dependency>

1.2、实现方式

1.2.1、读取数据工具类

  1. public class StarRocksSourceOptionsUtils {
  2. // mysql -hxxx -P 9030 -uroot -pxxx -Dtest;
  3. public static String SR_SOURCE_CONNECTOR="starrocks";
  4. public static String SR_SOURCE_SCAN_URL="xxx:8030,xxx:8030,xxx:8030";
  5. public static String SR_SOURCE_JDBC_URL="jdbc:mysql://xxx:9030";
  6. public static String SR_SOURCE_USERNAME="root";
  7. public static String SR_SOURCE_PASSWORD="xxx";
  8. /**
  9. * StarRocks Source
  10. * @return
  11. */
  12. public static StarRocksSourceOptions createStarRocksSourceOptions(String db,String tableName){
  13. StarRocksSourceOptions.Builder builder = StarRocksSourceOptions.builder()
  14. .withProperty("connector", SR_SOURCE_CONNECTOR)
  15. .withProperty("scan-url", SR_SOURCE_SCAN_URL)
  16. .withProperty("jdbc-url", SR_SOURCE_JDBC_URL)
  17. .withProperty("username", SR_SOURCE_USERNAME)
  18. .withProperty("password", SR_SOURCE_PASSWORD)
  19. //BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)。104857600:100M
  20. .withProperty("scan.params.mem-limit-byte","10737418240")
  21. //数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务。
  22. .withProperty("scan.params.query-timeout-s","2592000")
  23. //Flink 连接器连接 StarRocks 集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错。
  24. .withProperty("scan.connect.timeout-ms","2592000")
  25. //数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于 5
  26. .withProperty("scan.params.keep-alive-min","8")
  27. //数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错。
  28. .withProperty("scan.max-retries","100")
  29. .withProperty("table-name",tableName)
  30. .withProperty("database-name",db);
  31. return builder.build();
  32. }
  33. }

1.2.2、写入数据工具类

  1. public class StarRocksSinkOptionsUtils {
  2. //mysql -hxxx -P 9030 -uroot -pxxx -Dtest
  3. public static String starRocksJDBC="jdbc:mysql://xxx:9030,xxx:9030,xxx:9030";
  4. public static String starRocksLOADUrl="xxx:8030;xxx:8030;xxx:8030";
  5. public static String starRocksUsername="root";
  6. public static String starRocksPassword="xxx";
  7. public static Integer maxBytes = 1024 * 1024 * 64;
  8. public static StarRocksSinkOptions createStarRocksSink(String db, String tableName) {
  9. StarRocksSinkOptions.Builder sROptBuilder = StarRocksSinkOptions.builder()
  10. .withProperty(StarRocksSinkOptions.JDBC_URL.key(), starRocksJDBC)
  11. .withProperty(StarRocksSinkOptions.LOAD_URL.key(), starRocksLOADUrl)
  12. .withProperty(StarRocksSinkOptions.USERNAME.key(), starRocksUsername)
  13. .withProperty(StarRocksSinkOptions.PASSWORD.key(), starRocksPassword)
  14. .withProperty(StarRocksSinkOptions.DATABASE_NAME.key(), db)
  15. .withProperty("sink.properties.format", "json")
  16. .withProperty("sink.properties.strip_outer_array", "true")
  17. // 刷新条数
  18. .withProperty(StarRocksSinkOptions.SINK_BATCH_MAX_ROWS.key(), "64000")
  19. // 刷新数据大小
  20. .withProperty(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE.key(), maxBytes.toString())
  21. .withProperty(StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL.key(), "10000")
  22. .withProperty(StarRocksSinkOptions.SINK_MAX_RETRIES.key(), "5")
  23. .withProperty(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT.key(), "5000")
  24. .withProperty(StarRocksSinkOptions.TABLE_NAME.key(), tableName);
  25. return sROptBuilder.build();
  26. }
  27. }

俩工具类写好后,基本的查询及写入的工具都好了,但是里面还有很多的细节哦

1.3、整体流程

1.3.1、import导入

  1. import xxx.StarRocksSinkOptionsUtils;
  2. import xxx.StarRocksSourceOptionsUtils;
  3. import com.alibaba.fastjson.JSON;
  4. import com.starrocks.connector.flink.StarRocksSink;
  5. import com.starrocks.connector.flink.StarRocksSource;
  6. import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
  7. import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.ProcessFunction;
  12. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  13. import org.apache.flink.table.api.DataTypes;
  14. import org.apache.flink.table.api.TableSchema;
  15. import org.apache.flink.table.data.RowData;
  16. import org.apache.flink.util.Collector;
  17. import java.nio.charset.StandardCharsets;
  18. import java.util.HashMap;

1.3.2、创建source扫描表

  1. String db="test";
  2. String tableName="users";
  3. StarRocksSourceOptions options=StarRocksSinkOptionsUtils.createStarRocksSourceOptions(db,tableName);
  4. TableSchema tableSchema = getTableSchema();
  5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. DataStreamSource<RowData> rowDataDataStreamSource = env
  7. .addSource(StarRocksSource.source(tableSchema, options))
  8. //.setParallelism(1);

1.3.3、表schema获取

getTableSchema:
  1. private static TableSchema getTableSchema() {
  2. TableSchema tableSchema = TableSchema.builder()
  3. .field("id", DataTypes.BIGINT())
  4. .field("addr", DataTypes.STRING())
  5. .field("dates", DataTypes.TIMESTAMP())
  6. .field("money", DataTypes.DECIMAL(18,4))
  7. .field("create_time", DataTypes.TIMESTAMP())
  8. .build();
  9. return tableSchema;
  10. }

1.3.4、创建sink模块

  1. StarRocksSinkOptions starRocksSinkOptions = getStarRocksSinkOptions(db,tableName);
  2. SinkFunction<String> starRocksSink = StarRocksSink.sink(starRocksSinkOptions);
  3. SingleOutputStreamOperator<String> processOutputStream = rowDataDataStreamSource.process(new ProcessFunction<RowData, String>() {
  4. @Override
  5. public void processElement(RowData value, ProcessFunction<RowData, String>.Context ctx, Collector<String> out) throws Exception {
  6. HashMap<String, String> map = new HashMap<>();
  7. extractedMapping(value, map);
  8. String json = JSON.toJSONString(map);
  9. out.collect(json);
  10. }
  11. });
  12. processOutputStream.addSink(starRocksSink).name(tableName + "- starRocksSink").uid(tableName + "- starRocksSink");
  13. env.execute("StarRocks flink source");

1.3.5、映射字段处理方法

extractedMapping:
  1. private static void extractedMapping(RowData value, HashMap<String, String> map) {
  2. map.put("id",new String(String.valueOf(value.getLong(0)).getBytes(), StandardCharsets.UTF_8));
  3. map.put("addr",new String(String.valueOf(value.getString(1)).getBytes(), StandardCharsets.UTF_8));
  4. map.put("dates",new String(String.valueOf(value.getTimestamp(2,5).toString().replace("T"," ")).getBytes(), StandardCharsets.UTF_8));
  5. map.put("money",new String(String.valueOf(value.getDecimal(3,18,4)).getBytes(), StandardCharsets.UTF_8));
  6. map.put("create_time",new String(String.valueOf(value.getTimestamp(4,5).toString().replace("T"," ")).getBytes(), StandardCharsets.UTF_8));
  7. }

1.3.6、获取sink可选项方法

getStarRocksSinkOptions:
  1. private static StarRocksSinkOptions getStarRocksSinkOptions(String db,String tableName) {
  2. StarRocksSinkOptions starRocksSinkOptions = StarRocksSinkOptionsUtils.createStarRocksSink(db,tableName);
  3. return starRocksSinkOptions;
  4. }

1.4、flink启动脚本

这种模式下,是走flink的batch模式,步骤如下:

1、打包上传jar包到flink机器

2、启动任务

3、运行完后自动退出

1.5、数据校验

迁移的source表和sink表,在不通的集群中结构一致,并校验迁移前后数据;

1、校验总数是否一致,是否有丢数据情况

2、校验字段数据值是否一致,各种类型都要校验一遍,防止把你坑的吐血,然后重来一遍,哈哈!特别一个表大几百T的表,让你血吐好几盆!(建议先用小表测试完后,在进行大表数据迁移

1.6、注意事项

1.6.1、source端问题

1、老旧版本的StarRocks在scan过程中,老是报内存满了,该怎么处理?

2、source设置几个并行度合适?

3、数据读取失败时的最大重试次数几次合适?

4、数据读取任务的保活时间设置多长合适?

5、连接器连接 StarRocks 集群的时间上限多大合适?TimeOut问题?

6、BE 节点中单个查询的内存上限多大合适?

7、数据读取任务的超时时间多大合适?

8、大几百T的表如何同步?

1.6.2、Sink端问题

1、xin版本的StarRocks在写入过程中,老是报内存满了,该怎么处理?

2、sink设置几个并行度合适?

3、多久刷一次写入数据比较合适,不然内存满了,任务死给你看!(BE只能重启,好像没有其他的办法-来自官网解释)

1.6.3、数据类型处理

1、NULL值如何处理?

2、日期格式如何处理?

3、为什么schema和映射的时候要对应顺序?

4、还有好多问题了,如果你也遇到了,联系我!

1.7、附录

如你也遇到问题了,微信号如下(加v之前,标注说明StarRocks来意关键字样,防止欺骗我的感情):

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/993037
推荐阅读
相关标签
  

闽ICP备14008679号