当前位置:   article > 正文

第4.3章:StarRocks数据导出--Spark Connector_starrocks-spark-connector

starrocks-spark-connector

通过Spark Connector,我们可以使用Spark来快速读取StarRocks中存储的数据。Spark Connector在使用上和咱们之前介绍的Flink Connector Source类似,二者都具备并行读取StarRocksBE节点数据的能力,相对于jdbc的方式大大提高了数据读取效率

Spark Connector目前只能用来读取数据,Sink部分需要咱们自己基于Stream Load实现。此外,Spark Connector版本更新较慢,所以相对更推荐使用Flink Connector来进行数据全链路的处理工作。

Spark Connector目前也已开源,git地址为:

GitHub - StarRocks/spark-starrocks-connectorhttps://github.com/StarRocks/spark-starrocks-connector

官方编译好的Jar包下载地址为:

demo/SparkDemo/src/main/resources at master · StarRocks/demo · GitHubhttps://github.com/StarRocks/demo/tree/master/SparkDemo/src/main/resources

Spark Connector官方介绍材料见:

Spark StarRocks Connector @ Spark_connector @ StarRocks Docshttps://docs.starrocks.com/zh-cn/main/unloading/Spark_connector

咱们这里只使用Spark Shell演示一个简单的例子,使用spark connector获取StarRocks的数据并导出到MySQL中。集群相关组件的信息如下:

节点IP

部署服务

端口

版本

说明

192.168.110.23

[starrocks]

FE

8030

2.1.0

用户名密码均为root

BE

9060

Broker

8000

名称为:hdfs_broker

192.168.110.98

[standalone]

Spark

7077

spark-3.1.3-bin-hadoop3.2

starrocks-spark3_2.12-1.0.0.jar

mysql-connector-java-5.1.49.jar

MySQL

3306

5.7.37

用户名密码均为root

备注:

1)启动spark前需将starrocks-spark3_2.12-1.0.0.jar和mysql-connector-java-5.1.49.jar都拷贝至$SPARK_HOME/jars/目录中。

2)spark connector或flink connector在读取StarRocks数据时有一次重定向操作,会将请求由FE的8030端口重定向至BE的9060端口,这里需注意对应端口的开放。

3)若使用前文4.0的“一键部署包”在Docker中部署StarRocks,则无法在除宿主机外的其他机器上使用spark connector或flink connector source,因为重定向操作返回给程序的是容器内的IP,在其他机器上无法直接访问。

1、StarRocks准备

登录StarRocks:

[root@starrocks ~]# mysql -h192.168.110.23 -P9030 -uroot -proot

……

StarRocks创建表:

mysql> use starrocks;

mysql> CREATE TABLE users (

user_id BIGINT NOT NULL,

NAME string NOT NULL,

email string NULL,

address string NULL,

age TINYINT NULL )

PRIMARY KEY ( user_id )

DISTRIBUTED BY HASH ( user_id )

BUCKETS 4 PROPERTIES ( "replication_num" = "1" );

向StarRocks中插入演示数据:

INSERT INTO `starrocks`.`users`(`user_id`, `name`, `email`, `address`, `age`) VALUES (6, 'test6', 'test@163.com', 'address6', 14),(2, 'test2', 'test@163.com', 'address2', 6),(8, 'test8', 'test@163.com', 'address8', 6),(4, 'test4', 'test@163.com', 'address4', 0),(0, 'test0', 'test@163.com', 'address0', 26);

2、MySQL准备

在MySQL创建对应的库和表:

mysql> create database starrocks_export;

mysql> use starrocks_export;

mysql> CREATE TABLE `users` (

`user_id` int(11) NOT NULL,

`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`age` int(11) NULL DEFAULT NULL,

PRIMARY KEY (`user_id`) USING BTREE

);

3、Spark作业

进入spark-shell:

[root@standalone spark-3.1.3-bin-hadoop3.2]# ./bin/spark-shell

3.1获取StarRocks数据

官网文档提供有SQL、DataFrame和RDD语法,以DataFrame为例:

val starrocksSparkDF = spark.read.format("starrocks")

  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")

  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT")

  .option("user", "$YOUR_STARROCKS_USERNAME")

  .option("password", "$YOUR_STARROCKS_PASSWORD")

  .load()

根据集群信息,改写后执行:

scala> val starrocksSparkDF = spark.read.format("starrocks").option("starrocks.table.identifier", "starrocks.users").option("starrocks.fenodes", "192.168.110.23:8030").option("user", "root").option("password", "root").load()

在交互窗口中,已经可以查到StarRocks表中的数据:

scala> starrocksSparkDF.show();

+----------+---------+------------------+------------+-----+

|user_id  | name  |   email           | address |age  |

+----------+---------+------------------+------------+-----+

|       2     | test2   |test@163.com|address2|  6   |

|       8     | test8   |test@163.com|address8|  6   |

|       6     | test6   |test@163.com|address6| 14  |

|       4     | test4   |test@163.com|address4|  0   |

|       0     | test0   |test@163.com|address0| 26  |

+----------+---------+------------------+------------+-----+

3.2数据写入MySQL

1)导入需要使用的依赖包:

scala> import org.apache.spark.sql._

scala> import java.util.Properties

2)创建MySQL配置文件信息:

scala> val prop = new Properties();

scala> prop.setProperty("user", "root");

scala> prop.setProperty("password", "root");

scala> prop.setProperty("driver", "com.mysql.jdbc.Driver");

3)将数据写入到MySQL中:

scala> starrocksSparkDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.110.98:3306/starrocks_export","users",prop);

4)在MySQL数据库中查看数据是否写入:

mysql> select * from users;

+---------------+----------+----------------------+----------------+---------+

| user_id       |name    | email                  | address       | age     |

+---------------+----------+----------------------+----------------+---------+

|       0           | test0    | test@163.com   | address0     |   26     |

|       2           | test2    | test@163.com   | address2     |    6      |

|       4           | test4    | test@163.com   | address4     |    0      |

|       6           | test6    | test@163.com   | address6     |   14     |

|       8           | test8    | test@163.com   | address8     |    6      |

+---------------+----------+----------------------+----------------+---------+

5)退出spark-shell:

scala> :quit

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/381296
推荐阅读
相关标签
  

闽ICP备14008679号