
Spark in Action: Querying Logs

The Spark version used here is 1.5.
The code is as follows:
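The input file SogouQ3_utf.txt is a Sogou query log in which each line carries six tab-separated fields: time, user, word, ranking, number, and url. A hypothetical sample line (the values below are illustrative, not taken from the real dataset):

00:00:00    1234567890    [spark]    1    2    http://www.example.com/page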

// import SogouQ3_utf.txt as an RDD and convert it to a DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val data = sc.textFile("file:/usr/local/test/SogouQ3_utf.txt")
val schemaString = "time user word ranking number url"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
val rowRDD = data.map(_.split("\t")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5)))
val dataDataFrame = sqlContext.createDataFrame(rowRDD, schema)
dataDataFrame.registerTempTable("data")
dataDataFrame.printSchema()
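// printSchema() confirms that all six fields were declared as strings:
// root
//  |-- time: string (nullable = true)
//  |-- user: string (nullable = true)
//  |-- word: string (nullable = true)
//  |-- ranking: string (nullable = true)
//  |-- number: string (nullable = true)
//  |-- url: string (nullable = true)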
val df = dataDataFrame
// top 10 most frequent query words
val grouped1 = df.groupBy("word").count()
val grouped1_sort = grouped1.sort(grouped1("count").desc)
grouped1_sort.registerTempTable("word_top10")
val result1 = sqlContext.sql("select * from word_top10 limit 10")
// top 10 users by number of queries
val grouped2 = df.groupBy("user").count()
val grouped2_sort = grouped2.sort(grouped2("count").desc)
grouped2_sort.registerTempTable("user_top10")
val result2 = sqlContext.sql("select * from user_top10 limit 10")
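// note: the same ranking can also be written without a temp table,
// e.g. (a sketch): df.groupBy("user").count().orderBy(desc("count")).limit(10)
// (desc comes from org.apache.spark.sql.functions)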
// top 50 most visited sites
// regexp_extract keeps only the host part of each URL (capture group 2)
val data_regex = sqlContext.sql("""select time, user, word, ranking, number,
  regexp_extract(url, '(http|https)://([\!\w\.-]+)/', 2) from data""").toDF("time", "user", "word", "ranking", "number", "url_regex")
val grouped3 = data_regex.groupBy("url_regex").count()
val grouped3_sort = grouped3.sort(grouped3("count").desc)
grouped3_sort.registerTempTable("site_top50")
val result3 = sqlContext.sql("select * from site_top50 limit 50")
// save the results to HDFS as CSV (requires the spark-csv package on the classpath)
import com.databricks.spark.csv._
val saveOptions1 = Map("header" -> "true", "path" -> "/output1")
result1.write.format("com.databricks.spark.csv").options(saveOptions1).save()
val saveOptions2 = Map("header" -> "true", "path" -> "/output2")
result2.write.format("com.databricks.spark.csv").options(saveOptions2).save()
val saveOptions3 = Map("header" -> "true", "path" -> "/output3")
result3.write.format("com.databricks.spark.csv").options(saveOptions3).save()
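To sanity-check the output, the saved files can be read back with the same spark-csv package. Below is a minimal sketch, assuming the shell was started with the spark-csv dependency (e.g. --packages com.databricks:spark-csv_2.10:1.5.0) and that /output1 is the path written above:

// read the top-10 query words back from HDFS and display them
val check = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/output1")
check.show(10)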