赞
踩
通过Spark Connector,我们可以使用Spark来快速读取StarRocks中存储的数据。Spark Connector在使用上和咱们之前介绍的Flink Connector Source类似,二者都具备并行读取StarRocks的BE节点数据的能力,相对于jdbc的方式大大提高了数据读取效率。
Spark Connector目前只能用来读取数据,Sink部分需要咱们自己基于Stream Load实现。此外,Spark Connector版本更新较慢,所以相对更推荐使用Flink Connector来进行数据全链路的处理工作。
Spark Connector目前也已开源,git地址为:
GitHub - StarRocks/spark-starrocks-connectorhttps://github.com/StarRocks/spark-starrocks-connector
官方编译好的Jar包下载地址为:
Spark Connector官方介绍材料见:
咱们这里只使用Spark Shell演示一个简单的例子,使用spark connector获取StarRocks的数据并导出到MySQL中。集群相关组件的信息如下:
节点IP | 部署服务 | 端口 | 版本 | 说明 |
192.168.110.23 [starrocks] | FE | 8030 | 2.1.0 | 用户名密码均为root |
BE | 9060 | |||
Broker | 8000 | 名称为:hdfs_broker | ||
192.168.110.98 [standalone] | Spark | 7077 | spark-3.1.3-bin-hadoop3.2 starrocks-spark3_2.12-1.0.0.jar mysql-connector-java-5.1.49.jar | |
MySQL | 3306 | 5.7.37 | 用户名密码均为root |
备注:
1)启动spark前需将starrocks-spark3_2.12-1.0.0.jar和mysql-connector-java-5.1.49.jar都拷贝至$SPARK_HOME/jars/目录中。
2)spark connector或flink connector在读取StarRocks数据时有一次重定向操作,会将请求由FE的8030端口重定向至BE的9060端口,这里需注意对应端口的开放。
3)若使用前文4.0章的“一键部署包”在Docker中部署StarRocks,则无法在除宿主机外的其他机器上使用spark connector或flink connector source,因为重定向操作返回给程序的是容器内的IP,在其他机器上无法直接访问。
登录StarRocks:
[root@starrocks ~]# mysql -h192.168.110.23 -P9030 -uroot -proot
……
StarRocks创建表:
mysql> use starrocks;
mysql> CREATE TABLE users (
user_id BIGINT NOT NULL,
NAME string NOT NULL,
email string NULL,
address string NULL,
age TINYINT NULL )
PRIMARY KEY ( user_id )
DISTRIBUTED BY HASH ( user_id )
BUCKETS 4 PROPERTIES ( "replication_num" = "1" );
向StarRocks中插入演示数据:
INSERT INTO `starrocks`.`users`(`user_id`, `name`, `email`, `address`, `age`) VALUES (6, 'test6', 'test@163.com', 'address6', 14),(2, 'test2', 'test@163.com', 'address2', 6),(8, 'test8', 'test@163.com', 'address8', 6),(4, 'test4', 'test@163.com', 'address4', 0),(0, 'test0', 'test@163.com', 'address0', 26);
在MySQL创建对应的库和表:
mysql> create database starrocks_export;
mysql> use starrocks_export;
mysql> CREATE TABLE `users` (
`user_id` int(11) NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`age` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`user_id`) USING BTREE
);
进入spark-shell:
[root@standalone spark-3.1.3-bin-hadoop3.2]# ./bin/spark-shell
官网文档提供有SQL、DataFrame和RDD语法,以DataFrame为例:
val starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
.load()
根据集群信息,改写后执行:
scala> val starrocksSparkDF = spark.read.format("starrocks").option("starrocks.table.identifier", "starrocks.users").option("starrocks.fenodes", "192.168.110.23:8030").option("user", "root").option("password", "root").load()
在交互窗口中,已经可以查到StarRocks表中的数据:
scala> starrocksSparkDF.show();
+----------+---------+------------------+------------+-----+
|user_id | name | email | address |age |
+----------+---------+------------------+------------+-----+
| 2 | test2 |test@163.com|address2| 6 |
| 8 | test8 |test@163.com|address8| 6 |
| 6 | test6 |test@163.com|address6| 14 |
| 4 | test4 |test@163.com|address4| 0 |
| 0 | test0 |test@163.com|address0| 26 |
+----------+---------+------------------+------------+-----+
1)导入需要使用的依赖包:
scala> import org.apache.spark.sql._
scala> import java.util.Properties
2)创建MySQL配置文件信息:
scala> val prop = new Properties();
scala> prop.setProperty("user", "root");
scala> prop.setProperty("password", "root");
scala> prop.setProperty("driver", "com.mysql.jdbc.Driver");
3)将数据写入到MySQL中:
scala> starrocksSparkDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.110.98:3306/starrocks_export","users",prop);
4)在MySQL数据库中查看数据是否写入:
mysql> select * from users;
+---------------+----------+----------------------+----------------+---------+
| user_id |name | email | address | age |
+---------------+----------+----------------------+----------------+---------+
| 0 | test0 | test@163.com | address0 | 26 |
| 2 | test2 | test@163.com | address2 | 6 |
| 4 | test4 | test@163.com | address4 | 0 |
| 6 | test6 | test@163.com | address6 | 14 |
| 8 | test8 | test@163.com | address8 | 6 |
+---------------+----------+----------------------+----------------+---------+
5)退出spark-shell:
scala> :quit
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。