Hive: a table has 200 files, how do you turn them into 10?
Read the raw pay log from ODS, keep only lines that parse as valid JSON, project the needed fields, and write the result as a bucketed Hive table:

import org.apache.spark.sql.SaveMode
import com.alibaba.fastjson.JSONObject
import sparkSession.implicits._

ssc.textFile("/user/ods/coursepay.log")
  // keep only lines that parse as valid JSON objects
  .filter(item => {
    val obj = ParseJsonData.getJsonData(item)
    obj.isInstanceOf[JSONObject]
  })
  .mapPartitions(partitions => {
    partitions.map(item => {
      val jsonObject = ParseJsonData.getJsonData(item)
      val orderid = jsonObject.getString("orderid")
      val paymoney = jsonObject.getString("paymoney")
      val discount = jsonObject.getString("discount")
      val createtime = jsonObject.getString("createtime") // fixed typo: was "createitme"
      val dt = jsonObject.getString("dt")
      val dn = jsonObject.getString("dn")
      (orderid, discount, paymoney, createtime, dt, dn)
    })
  })
  .toDF("orderid", "discount", "paymoney", "createtime", "dt", "dn")
  .write.partitionBy("dt", "dn")   // Hive partition columns
  .bucketBy(10, "orderid")         // 10 buckets per partition
  .sortBy("orderid")
  .mode(SaveMode.Append)
  .saveAsTable("dwd.dwd_course_pay_cluster")
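ParseJsonData is the post's own helper and is not shown. A minimal sketch, assuming it wraps fastjson and returns null for malformed lines (so the isInstanceOf[JSONObject] filter above drops them):

import com.alibaba.fastjson.{JSON, JSONObject}

// Hypothetical reconstruction; the actual helper is not shown in the post.
object ParseJsonData {
  def getJsonData(data: String): JSONObject = {
    try {
      JSON.parseObject(data)       // fastjson: parse one log line
    } catch {
      case _: Exception => null    // malformed line -> null, filtered upstream
    }
  }
}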
.toDF("orderid", "discount", "paymoney", "createtime", "dt", "dn").
coalesce(1).
write.partitionBy("dt", "dn").
bucketBy(10, "orderid").sortBy("orderid").
mode(SaveMode.Append).saveAsTable("dwd.dwd_course_pay_cluster")
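If the table already exists with 200 small files, another option is to compact it with Spark: read it back, coalesce to 10 partitions, and write it out again. A minimal sketch, assuming an unpartitioned copy is acceptable; the target table name is illustrative, since Spark cannot overwrite a table it is reading from in the same job:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("CompactHiveTable")
  .enableHiveSupport()
  .getOrCreate()

spark.table("dwd.dwd_course_pay_cluster")
  .coalesce(10)                    // 10 tasks -> 10 output files
  .write
  .mode(SaveMode.Overwrite)
  .saveAsTable("dwd.dwd_course_pay_compact")   // illustrative target name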