当前位置:   article > 正文

Delta lake with Java--数据增删改查

Delta lake with Java--数据增删改查

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java以真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于《Delta Lake Up and Running》一书的配套 GitHub 仓库:GitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

  1. package detal.lake.java;
  2. import io.delta.tables.DeltaTable;
  3. import org.apache.spark.sql.SparkSession;
  4. import java.text.SimpleDateFormat;
  5. import io.delta.tables.DeltaTable;
  6. import org.apache.spark.sql.*;
  7. import org.apache.spark.sql.types.DataTypes;
  8. import org.apache.spark.sql.types.StructField;
  9. import org.apache.spark.sql.types.StructType;
  10. import java.text.SimpleDateFormat;
  11. import java.util.ArrayList;
  12. import java.util.Date;
  13. import java.util.List;
  14. import java.util.HashMap;
  15. public class DeltaLakeCURD {
  16. //将字符串转换成java.sql.Timestamp
  17. public static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {
  18. SimpleDateFormat sf = new SimpleDateFormat(dateFormat);
  19. java.util.Date date = null;
  20. try {
  21. date = sf.parse(strDate);
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());
  26. return dateSQL;
  27. }
  28. public static void main(String[] args) {
  29. SparkSession spark = SparkSession.builder()
  30. .master("local[*]")
  31. .appName("delta_lake")
  32. .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  33. .config("spark.databricks.delta.autoCompact.enabled", "true")
  34. .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  35. .getOrCreate();
  36. SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  37. String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";
  38. String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";
  39. String tableName = "taxidb.YellowTaxis";
  40. spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");
  41. //定义表
  42. DeltaTable.createIfNotExists(spark)
  43. .tableName(tableName)
  44. .addColumn("RideId","INT")
  45. .addColumn("VendorId","INT")
  46. .addColumn("PickupTime","TIMESTAMP")
  47. .addColumn("DropTime","TIMESTAMP")
  48. .location(savePath)
  49. .execute();
  50. //加载csv数据并导入delta表
  51. var df=spark.read().format("delta").table(tableName);
  52. var schema=df.schema();
  53. System.out.println(schema.simpleString());
  54. var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);
  55. System.out.println("记录总行数:"+df_for_append.count());
  56. System.out.println("导入数据,开始时间"+ sdf.format(new Date()));
  57. df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);
  58. System.out.println("导入数据,结束时间" + sdf.format(new Date()));
  59. DeltaTable deltaTable = DeltaTable.forName(spark,tableName);
  60. //插入数据
  61. List<Row> list = new ArrayList<Row>();
  62. list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));
  63. List<StructField> structFields = new ArrayList<>();
  64. structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));
  65. structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));
  66. structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));
  67. structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));
  68. StructType structType = DataTypes.createStructType(structFields);
  69. var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframe
  70. System.out.println("插入数据,开始时间"+ sdf.format(new Date()));
  71. yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);
  72. System.out.println("插入数据,结束时间"+ sdf.format(new Date()));
  73. System.out.println("插入后数据");
  74. deltaTable.toDF().select("*").where("RideId=-1").show(false);
  75. //更新数据
  76. System.out.println("更新前数据");
  77. deltaTable.toDF().select("*").where("RideId=999994").show(false);
  78. System.out.println("更新数据,开始时间"+ sdf.format(new Date()));
  79. deltaTable.updateExpr(
  80. "RideId = 999994",
  81. new HashMap<String, String>() {{
  82. put("VendorId", "250");
  83. }}
  84. );
  85. System.out.println("更新数据,结束时间"+ sdf.format(new Date()));
  86. System.out.println("更新后数据");
  87. deltaTable.toDF().select("*").where("RideId=999994").show(false);
  88. //查询数据
  89. System.out.println("查询数据,开始时间"+ sdf.format(new Date()));
  90. var selectDf= deltaTable.toDF().select("*").where("RideId=1");
  91. selectDf.show(false);
  92. System.out.println("查询数据,结束时间" + sdf.format(new Date()));
  93. //删除数据
  94. System.out.println("删除数据,开始时间"+ sdf.format(new Date()));
  95. deltaTable.delete("RideId=1");
  96. System.out.println("删除数据,结束时间"+ sdf.format(new Date()));
  97. deltaTable.toDF().select("*").where("RideId=1").show(false);
  98. }
  99. }

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下链接。原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个转换方法实现了转换;转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/534435
推荐阅读
相关标签
  

闽ICP备14008679号