package sparkSql
import org.apache.spark.SparkConf
import java.util
import java.util.Properties
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import redis.clients.jedis.{HostAndPort, Jedis, JedisCluster}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object sparkReadHdfs{
def main(args: Array[String]): Unit = {
// These are the parameters that would be passed into the SQL condition when building the query
// val cond = args(0).split("")
// val startTime = cond(0)
// val endTime = cond(1)
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("sparkReadHdfs")
.set("spark.default.parallelism","240")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.shuffle.consolidateFiles", "true")
.set("spark.shuffle.memoryFraction", "0.2")
.set("spark.shuffle.file.buffer", "32kb")
//.set("spark.debug.maxToStringFields","1000")
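// Note: spark.shuffle.consolidateFiles and spark.shuffle.memoryFraction are legacy settings from
// the pre-1.6 / hash-shuffle era; on the Spark 2.x runtime implied by SparkSession they are most
// likely ignored and could be dropped without changing behavior.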
val spark = SparkSession.builder().config(conf).getOrCreate()
/**
* Read the parquet file from HDFS and convert it to an RDD,
* then rdd => df
*/
// Note: this code is only a quick personal test. When I originally used Spark Streaming to read data
// from Kafka and save it to HDFS as parquet, I forgot to specify a schema, so I specify one explicitly
// here when reading it back with Spark SQL.
val rr = spark.read.parquet("hdfs://cdh02:8020/home/tangweihao/data_dir/2019-09-23/part-00000-198479b0-483d-4c6c-af6d-6a7dcda039cd-c000.snappy.parquet").rdd.repartition(1000)
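// Parquet always stores a schema with the data, so spark.read.parquet normally yields a usable
// DataFrame directly; the RDD[Row] detour below is only needed because these records were written
// as a single pipe-delimited string column. If the columns had been written out properly, a plain
// read would suffice (a sketch):
// val dfDirect = spark.read.parquet("hdfs://cdh02:8020/home/tangweihao/data_dir/2019-09-23/")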
import spark.implicits._
val ss = rr.map(line => {
val arr = line.toString().split("\\|", -1) // escape the pipe for the regex split; -1 keeps trailing empty fields
val requestIdAcJnl = arr(0)
val nodeId = arr(1)
val startDate = arr(2)
val startTime = arr(3)
val startMSec = arr(4)
val logInTuSkShuKnKISHcd = arr(5)
val logInTuSkKuZbTNcd = arr(6)
val logInTuSkKuZno = arr(7)
val logInTuskLoginId = arr(8)
val knrShShuKnKiShCd = arr(9)
val knrShShZkBtn = arr(10)
val knrShLogInId = arr(11)
val chkBn = arr(12)
val ipAdrs = arr(13)
val identityNo = arr(14)
val sentServer = arr(15)
val receivedServer = arr(16)
val host = arr(17)
val requestContentType = arr(18)
val requestContentLength = arr(19)
val requestParameters = arr(20)
val requestUri = arr(21)
val migRcd = arr(22)
val screenId = arr(23)
val requestQueryval = arr(24)
val requestSessionId = arr(25)
val requestHttpMethod = arr(26)
val requestCookie = arr(27)
val responseContentLength = arr(28)
val responseCookie = arr(29)
val responseStatus = arr(30)
val responseHttpHeader = arr(31)
val responseRedirectLocation = arr(32)
val plShrKbn1 = arr(33)
val blShrSts1 = arr(34)
val msgCode11 = arr(35)
val msgCode12 = arr(36)
val plShrKbn2 = arr(37)
val blShrSts2 = arr(38)
val msgCode21 = arr(39)
val msgCode22 = arr(40)
val returnStatusKbn = arr(41)
val endDate = arr(42)
val endTime = arr(43)
val endMSec = arr(44)
val performance = arr(45)
val starSendFlg = arr(46)
val registration = arr(47)
val registerDate = arr(48)
val createTs = arr(49)
val updateTs = arr(50)
val requestId = arr(51)
val device = arr(52)
Row(requestIdAcJnl,nodeId,startDate,startTime,startMSec,logInTuSkShuKnKISHcd,logInTuSkKuZbTNcd,logInTuSkKuZno,logInTuskLoginId,knrShShuKnKiShCd,knrShShZkBtn,knrShLogInId,chkBn,ipAdrs,identityNo,sentServer,receivedServer,host,requestContentType,requestContentLength,requestParameters,requestUri,migRcd,screenId,requestQueryval,requestSessionId,requestHttpMethod,requestCookie,responseContentLength,responseCookie,responseStatus,responseHttpHeader,responseRedirectLocation,plShrKbn1,blShrSts1,msgCode11,msgCode12,plShrKbn2,blShrSts2,msgCode21,msgCode22,returnStatusKbn,endDate,endTime,endMSec,performance,starSendFlg,registration,registerDate,createTs,updateTs,requestId,device)
})
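// The indexing above assumes every record splits into exactly 53 fields; a short or malformed line
// would throw ArrayIndexOutOfBoundsException. A defensive variant (a sketch, not part of the
// original job) could filter first:
// val ssSafe = rr.map(_.toString().split("\\|", -1)).filter(_.length >= 53).map(a => Row(a.take(53): _*))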
val schema = StructType(Array(
StructField("requestIdAcJnl",StringType,true),
StructField("nodeId",StringType,true),
StructField("startDate",StringType,true),
StructField("startTime",StringType,true),
StructField("startMSec",StringType,true),
StructField("logInTuSkShuKnKISHcd",StringType,true),
StructField("logInTuSkKuZbTNcd",StringType,true),
StructField("logInTuSkKuZno",StringType,true),
StructField("logInTuskLoginId",StringType,true),
StructField("knrShShuKnKiShCd",StringType,true),
StructField("knrShShZkBtn",StringType,true),
StructField("knrShLogInId",StringType,true),
StructField("chkBn",StringType,true),
StructField("ipAdrs",StringType,true),
StructField("identityNo",StringType,true),
StructField("sentServer",StringType,true),
StructField("receivedServer",StringType,true),
StructField("host",StringType,true),
StructField("requestContentType",StringType,true),
StructField("requestContentLength",StringType,true),
StructField("requestParameters",StringType,true),
StructField("requestUri",StringType,true),
StructField("migRcd",StringType,true),
StructField("screenId",StringType,true),
StructField("requestQueryval",StringType,true),
StructField("requestSessionId",StringType,true),
StructField("requestHttpMethod",StringType,true),
StructField("requestCookie",StringType,true),
StructField("responseContentLength",StringType,true),
StructField("responseCookie",StringType,true),
StructField("responseStatus",StringType,true),
StructField("responseHttpHeader",StringType,true),
StructField("responseRedirectLocation",StringType,true),
StructField("plShrKbn1",StringType,true),
StructField("blShrSts1",StringType,true),
StructField("msgCode11",StringType,true),
StructField("msgCode12",StringType,true),
StructField("plShrKbn2",StringType,true),
StructField("blShrSts2",StringType,true),
StructField("msgCode21",StringType,true),
StructField("msgCode22",StringType,true),
StructField("returnStatusKbn",StringType,true),
StructField("endDate",StringType,true),
StructField("endTime",StringType,true),
StructField("endMSec",StringType,true),
StructField("performance",StringType,true),
StructField("starSendFlg",StringType,true),
StructField("registration",StringType,true),
StructField("registerDate",StringType,true),
StructField("createTs",StringType,true),
StructField("updateTs",StringType,true),
StructField("requestId",StringType,true),
StructField("device",StringType,true)
))
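// Since all 53 columns are nullable StringType, the same schema could also be built from a plain
// list of field names (an equivalent sketch):
// val schema = StructType(Seq("requestIdAcJnl", "nodeId", /* ...remaining names... */ "device")
//   .map(StructField(_, StringType, nullable = true)))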
val df = spark.createDataFrame(ss,schema)
// Register the DataFrame as a temporary view
df.createOrReplaceTempView("t")
// This is the statement that actually runs the SQL
val sql: DataFrame = spark.sql("select requestIdAcJnl from t as REQUESTIDACJNL")
sql.show()
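// With the commented-out args at the top of main, the time window could be pushed into the query
// itself, e.g. (hypothetical sketch, assuming startDate/endDate hold comparable date strings):
// val filtered = spark.sql(s"select requestIdAcJnl from t where startDate >= '$startTime' and endDate <= '$endTime'")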
// sql.write.format("jdbc")
//   .option("url","jdbc:mysql://192.168.168.48:3306/cloudera_manager?useSSL=false")
//   .option("dbtable","request_di_ac_jnl")
//   .option("driver","com.mysql.jdbc.Driver")
//   .option("user","cdh")
//   .option("password","Cdh@pwd:MySQL57!@#$")
//   .save()
// val sqlRDD: RDD[Row] = sql.rdd.repartition(1)
/**
* Save the results of the Spark SQL query to the Redis cluster
* 03-06
*/
sql.foreachPartition(iter => {
val password = "Hyron@redis:cluster"
val timeout = 10000
val database = 0
val config = new GenericObjectPoolConfig()
val hostAndPort1 = new HostAndPort("cdh03", 7380)
val hostAndPort2 = new HostAndPort("cdh03", 7381)
val hostAndPort3 = new HostAndPort("cdh04", 7380)
val hostAndPort4 = new HostAndPort("cdh04", 7381)
val hostAndPort5 = new HostAndPort("cdh05", 7380)
val hostAndPort6 = new HostAndPort("cdh05", 7381)
val hostAndPort7 = new HostAndPort("cdh06", 7380)
val hostAndPort8 = new HostAndPort("cdh06", 7381)
val hostAndPortSet = new util.HashSet[HostAndPort]()
hostAndPortSet.add(hostAndPort1)
hostAndPortSet.add(hostAndPort2)
hostAndPortSet.add(hostAndPort3)
hostAndPortSet.add(hostAndPort4)
hostAndPortSet.add(hostAndPort5)
hostAndPortSet.add(hostAndPort6)
hostAndPortSet.add(hostAndPort7)
hostAndPortSet.add(hostAndPort8)
// var jedis = new JedisCluster(hostAndPortSet, timeout, timeout, 1024, password, config)
// iter.foreach(row =>{
// jedis.lpush("requestIdAcJnl",row.toString())
// })
//
val arr1 = new ArrayBuffer[String]()
iter.foreach(row =>{
arr1.append(row.toString())
})
var jedis = new JedisCluster(hostAndPortSet,timeout,timeout,1024,password,config)
jedis.lpush("requestIdAcJnl-Test3",arr1.toString())
jedis.close() // release the cluster connections held by this partition
})
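// lpush also accepts varargs, so each row could be stored as a separate list element instead of
// one concatenated ArrayBuffer string (a sketch using the same key):
// jedis.lpush("requestIdAcJnl-Test3", arr1: _*)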
}
}