Spark Odds and Ends (4): Complete Examples of Operating Kudu from Java (Including SparkSQL)


As mentioned in the previous post, columnar stores such as Kudu allow data to be written in update mode.
Below are demos of operating Kudu from Java. Java demos for the Kudu client can be found on GitHub, but there are none for Spark, and Cloudera's documentation only covers Scala. This post collects complete examples of operating Kudu from Java, intended as an introductory reference. (The painful part is the Java implementation of querying Kudu through SparkSQL: there is no official example, and Google does not help much either.)

1) POM dependencies

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.5.0-cdh5.13.1</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client-tools -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>1.5.0-cdh5.13.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>

Since this post uses the Cloudera build of Kudu, also add the Cloudera repository:

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

2) Feature list:
Create a table with KuduClient;
Insert data with KuduClient;
Update data with KuduClient;
Query data with KuduClient;
Drop a table with KuduClient;
Query data with SparkSQL;
Check whether a table exists with the Spark KuduContext.

PS: For querying with SparkSQL, Cloudera's documentation only gives a Scala version, and a concrete Java version is hard to find even with Google. Reading the source shows that format() actually takes a package path: as long as the specified package contains a DefaultSource class implementing the SparkSQL data source interfaces, SparkSQL can load it. The org.apache.kudu.spark.kudu package contains such a DefaultSource class, which is why SparkSQL recognizes it.
By extension, any other library can be accessed the same way, and you can expose your own store to SparkSQL by shipping such a class (a hedged sketch follows the class dump below).

package org.apache.kudu.spark.kudu

@org.apache.yetus.audience.InterfaceStability.Unstable
class DefaultSource() extends scala.AnyRef
    with org.apache.spark.sql.sources.RelationProvider
    with org.apache.spark.sql.sources.CreatableRelationProvider
    with org.apache.spark.sql.sources.SchemaRelationProvider {
  val TABLE_KEY : java.lang.String = { /* compiled code */ }
  val KUDU_MASTER : java.lang.String = { /* compiled code */ }
  val OPERATION : java.lang.String = { /* compiled code */ }
  val FAULT_TOLERANT_SCANNER : java.lang.String = { /* compiled code */ }
  val SCAN_LOCALITY : java.lang.String = { /* compiled code */ }
  def defaultMasterAddrs : scala.Predef.String = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String]) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, mode : org.apache.spark.sql.SaveMode, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String], data : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String], schema : org.apache.spark.sql.types.StructType) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
}
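
To make the point concrete, here is a minimal hedged sketch of reading from a custom data source. The package com.example.mysource and its option name are made up for illustration; the call pattern only works if that package actually ships a DefaultSource implementing RelationProvider:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CustomSourceDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("custom-source-demo")
                .master("local[*]")
                .getOrCreate();
        // Spark resolves format("<package>") by loading <package>.DefaultSource,
        // so any library shipping such a class is readable the same way as Kudu.
        // "com.example.mysource" is a hypothetical package used for illustration.
        Dataset<Row> ds = spark.read()
                .format("com.example.mysource")
                .option("someOption", "someValue") // options flow into DefaultSource.createRelation
                .load();
        ds.show();
    }
}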

3) Code examples:

import org.junit.Ignore;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @ClassName: KuduUtil
 * @Description: Sample code for operating Kudu
 * @author jason.li
 * @date 2018-01-11 15:45:06
 */
@Ignore
public class KuduUtil {
    private static final String KUDU_MASTER = "10.1.0.20:7051";
    private static String tableName = "TestKudu";

    @Test
    public void kuduCreateTableTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            List<ColumnSchema> columns = new ArrayList<>(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING)
                    .key(true)
                    .build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
                    .build());
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");
            Schema schema = new Schema(columns);
            client.createTable(tableName, schema,
                    new CreateTableOptions().setRangePartitionColumns(rangeKeys));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void kuduSaveTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            System.out.println("-------start--------" + System.currentTimeMillis());
            for (int i = 30000; i < 31000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString(0, i + "");
                row.addString(1, "aaa");
                OperationResponse operationResponse = session.apply(insert);
            }
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
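
    // Hedged sketch, not in the original post: session.apply() in the default
    // AUTO_FLUSH_SYNC mode does one round trip per operation, which is slow for
    // bulk loads. Switching the session to MANUAL_FLUSH and flushing in batches
    // is usually much faster; table and columns reuse the schema created above.
    @Test
    public void kuduBatchSaveTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(1000); // buffer must hold a full batch
            for (int i = 0; i < 1000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString("key", i + "");
                row.addString("value", "aaa");
                session.apply(insert); // buffered client-side, not sent yet
                if ((i + 1) % 500 == 0) {
                    session.flush(); // ship the buffered operations in one batch
                }
            }
            session.flush(); // flush any remainder
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }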

    @Test
    public void kuduUpdateTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Update update = table.newUpdate();
            PartialRow row = update.getRow();
            row.addString("key", 4 + "");
            row.addString("value", "value " + 10);
            OperationResponse operationResponse = session.apply(update);
            System.out.print(operationResponse.getRowError());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
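
    // Hedged sketch, not in the original post: the opening of this post mentions
    // writing in "update mode". Kudu also offers an upsert operation
    // (insert-or-update in one call), which avoids choosing between newInsert()
    // and newUpdate() when the key may or may not already exist.
    @Test
    public void kuduUpsertTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addString("key", "4");          // inserted if absent, updated if present
            row.addString("value", "value 11");
            OperationResponse response = session.apply(upsert);
            System.out.println(response.getRowError());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }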

    @Test
    public void kuduSearchTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
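
    // Hedged sketch, not in the original post: the scan above reads every row.
    // Kudu can push filters down to the server with KuduPredicate, which is far
    // cheaper than filtering client-side. Column names reuse the schema above.
    @Test
    public void kuduPredicateScanTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
                    table.getSchema().getColumn("key"),
                    KuduPredicate.ComparisonOp.EQUAL, "30000");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .addPredicate(predicate) // evaluated server-side
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString("value"));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }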

    @Test
    public void kuduDelTabletest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        // format() points at the package containing Kudu's DefaultSource
        Dataset<Row> ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .schema(schema)
                .option("kudu.master", "10.1.0.20:7051")
                .option("kudu.table", "TestKudu")
                .load();
        ds.createOrReplaceTempView("abc"); // registerTempTable is deprecated in Spark 2.x
        sparkSession.sql("select * from abc").show();
    }

    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051", sparkSession.sparkContext());
        System.out.println(tableName + " exists = " + context.tableExists(tableName));
    }
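
    // Hedged sketch, not in the original post: KuduContext can also write a
    // DataFrame back to Kudu. upsertRows/insertRows are part of the kudu-spark2
    // API; the table and two-column schema reuse the ones created above.
    @Test
    public void writeByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051", sparkSession.sparkContext());
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, false),
                DataTypes.createStructField("value", DataTypes.StringType, true)));
        List<Row> rows = Arrays.asList(RowFactory.create("90000", "bbb"));
        Dataset<Row> df = sparkSession.createDataFrame(rows, schema);
        context.upsertRows(df, tableName); // insert-or-update each DataFrame row
    }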

    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");
        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        return SparkSession.builder().sparkContext(sparkContext).getOrCreate();
    }

}
