赞
踩
<dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.9.0</version> </dependency> <!--导入kudu的客户端工具--> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>${kudu.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
public class TestKudu { //声明全局变量 KuduClient 后期通过它来操作 kudu 表 private KuduClient kuduClient; //指定 kuduMaster 地址 private String kuduMaster; //指定表名 private String tableName; @Before public void init(){ //初始化操作 kuduMaster="node1:7051,node2:7051,node3:7051"; //指定表名 tableName="student"; KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster); kuduClientBuilder.defaultSocketReadTimeoutMs(10000); kuduClient=kuduClientBuilder.build(); }
/** \* 创建表 */ @Test public void createTable() throws KuduException { //1、创建一个client //KuduClient client = new KuduClientBuilder(KUDU_MASTER).build(); //判断表是否存在,不存在就构建 if(!kuduClient.tableExists(tableName)){ //构建创建表的 schema 信息-----就是表的字段和类型 ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).nullable(false).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.INT32).build()); Schema schema = new Schema(columnSchemas); //指定创建表的相关属性 CreateTableOptions options = new CreateTableOptions(); ArrayList<String> partitionList = new ArrayList<String>(); //指定 kudu 表的分区字段是什么 partitionList.add("id"); // 按照 id.hashcode % 分区数 = 分区号 options.addHashPartitions(partitionList,6); //4、指定分区方式为hash分区、6个分区,一个副本 options.setNumReplicas(1); kuduClient.createTable(tableName,schema,options); } }
/** * 向表加载数据 */ @Test public void insertTable() throws KuduException { //向表加载数据需要一个 kuduSession 对象 KuduSession kuduSession = kuduClient.newSession(); kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); //需要使用 kuduTable 来构建 Operation 的子类实例对象 KuduTable kuduTable = kuduClient.openTable(tableName); for(int i=1;i<=10;i++){ Insert insert = kuduTable.newInsert(); PartialRow row = insert.getRow(); row.addInt("id",i); row.addString("name","zhangsan-"+i); row.addInt("age",20+i); row.addInt("sex",i%2); kuduSession.apply(insert);//最后实现执行数据的加载操作 } session.close(); client.close(); }
/* 查询表的数据结果*/ @Test public void queryData() throws KuduException { //构建一个查询的扫描器 KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName)); ArrayList<String> columnsList = new ArrayList<String>(); columnsList.add("id"); columnsList.add("name"); columnsList.add("age"); columnsList.add("sex"); kuduScannerBuilder.setProjectedColumnNames(columnsList); //返回结果集 KuduScanner kuduScanner = kuduScannerBuilder.build(); //遍历 while (kuduScanner.hasMoreRows()){ RowResultIterator rowResults = kuduScanner.nextRows(); while (rowResults.hasNext()){ RowResult row = rowResults.next(); int id = row.getInt("id"); String name = row.getString("name"); int age = row.getInt("age"); int sex = row.getInt("sex"); System.out.println("id="+id+" name="+name+" age="+age+" sex="+sex); } } //7、关闭client client.close(); }
/** * 修改表的数据 */ @Test public void updateData() throws KuduException { //修改表的数据需要一个 kuduSession 对象 KuduSession kuduSession = kuduClient.newSession(); kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); //需要使用 kuduTable 来构建 Operation 的子类实例对象 KuduTable kuduTable = kuduClient.openTable(tableName); //Update update = kuduTable.newUpdate(); Upsert upsert = kuduTable.newUpsert(); //如果 id 存在就表示修改,不存在就新增 PartialRow row = upsert.getRow(); row.addInt("id",100); row.addString("name","zhangsan-100"); row.addInt("age",100); row.addInt("sex",0); kuduSession.apply(upsert);//最后实现执行数据的修改操作 kuduSession.flush(); kuduSession.close(); client.close(); }
/* 删除数据 */ @Test public void deleteData() throws KuduException { //删除表的数据需要一个 kuduSession 对象 KuduSession kuduSession = kuduClient.newSession(); kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); //需要使用 kuduTable 来构建 Operation 的子类实例对象 KuduTable kuduTable = kuduClient.openTable(tableName); Delete delete = kuduTable.newDelete(); PartialRow row = delete.getRow(); row.addInt("id",100); kuduSession.apply(delete);//最后实现执行数据的删除操作 kuduSession.flush(); kuduSession.close(); } /* 删除表 */ @Test public void dropTable() throws KuduException { if(kuduClient.tableExists(tableName)){ kuduClient.deleteTable(tableName); } }
/** * 测试分区: * 多级分区 * Multilevel Partition * 混合使用 hash 分区和 range 分区 * * 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免 tablet 无限增长问题, * hash 分区和 range 分区结合,可以极大的提升 kudu 的性能 */ @Test public void testMultilevelPartition() throws KuduException { //设置表的 schema LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); columnSchemas.add(newColumn("WorkId", Type.INT32,false)); columnSchemas.add(newColumn("Name", Type.STRING,false)); columnSchemas.add(newColumn("Gender", Type.STRING,false)); columnSchemas.add(newColumn("Photo", Type.STRING,false)); //创建 schema Schema schema = new Schema(columnSchemas); //创建表时提供的所有选项 CreateTableOptions tableOptions = new CreateTableOptions(); //设置副本数 tableOptions.setNumReplicas(1); //设置范围分区的规则 LinkedList<String> parcols = new LinkedList<String>(); parcols.add("CompanyId"); //hash 分区 tableOptions.addHashPartitions(parcols,5); //range 分区 int count=0; for(int i=0;i<10;i++){ PartialRow lower = schema.newPartialRow(); lower.addInt("CompanyId",count); count+=10; PartialRow upper = schema.newPartialRow(); upper.addInt("CompanyId",count); tableOptions.addRangePartition(lower,upper); } try { kuduClient.createTable("cat",schema,tableOptions); } catch (KuduException e) { e.printStackTrace(); } kuduClient.close(); }
KuduContext。这是可在 Spark 应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu Java 客户端进行交互。 KuduContext 提供执行DDL 操作所需的方法,与本机 Kudu RDD 的接口,对数据执行更新/插入/删除,将数据类型从 Kudu 转换为 Spark 等。
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>1.9.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.9.0-cdh6.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 --> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark2_2.11</artifactId> <version>1.9.0-cdh6.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies>
定义 kudu 的表需要分成 5 个步骤:
1:提供表名
2:提供 schema
3:提供主键
4:定义重要选项;例如:定义分区的 schema
5:调用 create Table api
object SparkKuduTest { def main(args: Array[String]): Unit = { //构建 sparkConf 对象 val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]") //构建 SparkSession 对象 val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //获取 sparkContext 对象 val sc: SparkContext = sparkSession.sparkContext sc.setLogLevel("warn") //构建 KuduContext 对象 val kuduContext = new KuduContext("node1:7051,node2:7051,node3:7051",sc) //1.创建表操作 createTable(kuduContext) /** * 创建表 * @param kuduContext * @return */ private def createTable(kuduContext: KuduContext) = { //1.1 定义表名 val tableName = "spark_kudu" //1.2 定义表的 schema val schema = StructType( StructField("userId", StringType, false) :: StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("sex", StringType, false) :: Nil) //1.3 定义表的主键 val primaryKey = Seq("userId") //1.4 定义分区的 schema val options = new CreateTableOptions //设置分区 options.setRangePartitionColumns(List("userId").asJava) //设置副本 options.setNumReplicas(1) //1.5 创建表 if(!kuduContext.tableExists(tableName)){ kuduContext.createTable(tableName, schema, primaryKey, options) } } }
定义表时要注意的是 Kudu 表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方法。这是因为在这里,我们调用了 Kudu Java 客户端本身,它需要 Java 对象(即 java.util.List)而不是 Scala 的List 对象;(要使“asJava”方法可用,请记住导入 JavaConverters 库。)
创建表后,通过将浏览器指向 http// master 主机名:8051/tables 来查看Kudu 主 UI 可以找到创建的表,通过单击表 ID,能够看到表模式和分区信息。
Kudu 支持许多 DML 类型的操作,其中一些操作包含在 Spark on Kudu 集成.
包括:
INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在 Spark 中使用它。使用 INSERT 是有风险的,因为 Spark 任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT 将不允许插入行(导致失败)。
相反,我们鼓励使用下面描述的 INSERT_IGNORE。
INSERT-IGNORE - 将 DataFrame 的行插入 Kudu 表。如果表存在,则忽略插入动作。
DELETE - 从 Kudu 表中删除 DataFrame 中的行
UPSERT - 如果存在,则在 Kudu 表中更新 DataFrame 中的行,否则执行插入操作。
UPDATE - 更新 dataframe 中的行
先创建一张表,然后把数据插入到表中。
case class People(id:Int,name:String,age:Int) object DataFrameKudu { def main(args: Array[String]): Unit = { //构建 SparkConf 对象 val sparkConf: SparkConf = new SparkConf().setAppName("DataFrameKudu").setMaster("local[2]") //构建 SparkSession 对象 val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //获取 SparkContext 对象 val sc: SparkContext = sparkSession.sparkContext sc.setLogLevel("warn") //指定 kudu 的 master 地址 val kuduMaster="node1:7051,node2:7051,node3:7051" //构建 KuduContext 对象 val kuduContext = new KuduContext(kuduMaster,sc) //定义表名 val tableName="people" //1、创建表 createTable(kuduContext, tableName) //2、插入数据到表中 insertData2table(sparkSession,sc, kuduContext, tableName) } /** * 创建表 * @param kuduContext * @param tableName */ private def createTable(kuduContext: KuduContext, tableName: String): Unit = { //定义表的 schema val schema = StructType( StructField("id", IntegerType, false) :: StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: Nil ) //定义表的主键 val tablePrimaryKey = List("id") //定义表的选项配置 val options = new CreateTableOptions options.setRangePartitionColumns(List("id").asJava) options.setNumReplicas(1) //创建表 if (!kuduContext.tableExists(tableName)) { kuduContext.createTable(tableName, schema, tablePrimaryKey, options) } } /** * 插入数据到表中 * @param sparkSession * @param sc * @param kuduContext * @param tableName */ private def insertData2table(sparkSession:SparkSession,sc: SparkContext, kuduContext: KuduContext, tableName: String): Unit = { //准备数据 val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40)) val peopleRDD: RDD[People] = sc.parallelize(data) import sparkSession.implicits._ val peopleDF: DataFrame = peopleRDD.toDF kuduContext.insertRows(peopleDF, tableName) } }
/** * 删除表的数据 * @param sparkSession * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def deleteData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //定义一个 map 集合,封装 kudu 的相关信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) import sparkSession.implicits._ val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40)) val dataFrame: DataFrame = sc.parallelize(data).toDF dataFrame.createTempView("temp") //获取年龄大于 30 的所有用户 id val result: DataFrame = sparkSession.sql("select id from temp where age >30") //删除对应的数据,这里必须要是主键字段 kuduContext.deleteRows(result, tableName) }
/** * 更新数据--添加数据 * * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def UpsertData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //更新表中的数据 //定义一个 map 集合,封装 kudu 的相关信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) import sparkSession.implicits._ val data = List(People(1, "zhangsan", 50), People(5, "tom", 30)) val dataFrame: DataFrame = sc.parallelize(data).toDF //如果存在就是更新,否则就是插入 kuduContext.upsertRows(dataFrame, tableName) }
/** * 更新数据 * @param sparkSession * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def updateData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //定义一个 map 集合,封装 kudu 的相关信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) import sparkSession.implicits._ val data = List(People(1, "zhangsan", 60), People(6, "tom", 30)) val dataFrame: DataFrame = sc.parallelize(data).toDF //如果存在就是更新,否则就是报错 kuduContext.updateRows(dataFrame, tableName) }
虽然我们可以通过上面显示的 KuduContext 执行大量操作,但我们还可以直接从默认数据源本身调用读/写 API。要设置读取,我们需要为 Kudu 表指定选项,命名我们要读取的表以及为表提供服务的 Kudu 集群的 Kudu 主服务器列表。
/**
* 使用 DataFrameApi 读取 kudu 表中的数据
* @param sparkSession
* @param kuduMaster
* @param tableName
*/
private def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
//定义 map 集合,封装 kudu 的 master 地址和要读取的表名
val options = Map(
"kudu.master" -> kuduMaster,
"kudu.table" -> tableName
)
sparkSession.read.options(options).kudu.show()
}
在通过 DataFrame API 编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。
/** * DataFrame api 写数据到 kudu 表 * @param sparkSession * @param sc * @param kuduMaster * @param tableName */ private def dataFrame2kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = { //定义 map 集合,封装 kudu 的 master 地址和要读取的表名 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) val data = List(People(7, "jim", 30), People(8, "xiaoming", 40)) import sparkSession.implicits._ val dataFrame: DataFrame = sc.parallelize(data).toDF //把 dataFrame 结果写入到 kudu 表中 ,目前只支持 append 追加 dataFrame.write.options(options).mode("append").kudu //查看结果 //导包 import org.apache.kudu.spark.kudu._ //加载表的数据,导包调用 kudu 方法,转换为 dataFrame,最后在使用 show 方法显示结果 sparkSession.read.options(options).kudu.show() }
可以选择使用 Spark SQL 直接使用 INSERT 语句写入 Kudu 表;与’append’类似,INSERT 语句实际上将默认使用 UPSERT 语义处理.
/** * 使用 sparksql 操作 kudu 表 * @param sparkSession * @param sc * @param kuduMaster * @param tableName */ private def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = { //定义 map 集合,封装 kudu 的 master 地址和表名 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) val data = List(People(10, "小张", 30), People(11, "小王", 40)) import sparkSession.implicits._ val dataFrame: DataFrame = sc.parallelize(data).toDF //把 dataFrame 注册成一张表 dataFrame.createTempView("temp1") //获取 kudu 表中的数据,然后注册成一张表 sparkSession.read.options(options).kudu.createTempView("temp2") //使用 sparkSQL 的 insert 操作插入数据 sparkSession.sql("insert into table temp2 select * from temp1") sparkSession.sql("select * from temp2 where age >30").show() }
Spark 与 Kudu 的集成同时提供了 kudu RDD.
//使用 kuduContext 对象调用 kuduRDD 方法,需要 sparkContext 对象,表名,想要的字段名称
val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sc,tableName,Seq("name","age"))
//操作该 rdd 打印输出
val result: RDD[(String, Int)] = kuduRDD.map {
case Row(name: String, age: Int) => (name, age)
}
result.foreach(println)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。