$SPARK_HOME/bin/spark-shell --master local[10] --jars {PATH}/spark-sql-perf-1.2/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
sql(s"drop database if exists $databaseName CASCADE")
val tools_path = "/opt/Beaver/tpcds-kit/tools" val data_path = "hdfs://{IP}:9000/tpcds_parquet_tpcds_kit_1_0/1" val database_name = "tpcds_parquet_tpcds_kit_1_0_scale_1_db" val scale = "1" val p = scale.toInt / 2048.0 val catalog_returns_p = (263 * p + 1).toInt val catalog_sales_p = (2285 * p * 0.5 * 0.5 + 1).toInt val store_returns_p = (429 * p + 1).toInt val store_sales_p = (3164 * p * 0.5 * 0.5 + 1).toInt val web_returns_p = (198 * p + 1).toInt val web_sales_p = (1207 * p * 0.5 * 0.5 + 1).toInt val format = "parquet" val codec = "snappy" val useDoubleForDecimal = false val partitionTables = false val clusterByPartitionColumns = partitionTables import com.databricks.spark.sql.perf.tpcds.TPCDSTables spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec) val tables = new TPCDSTables(spark.sqlContext, tools_path, scale, useDoubleForDecimal) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_page", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer", 6) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_address", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_demographics", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "date_dim", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "household_demographics", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "income_band", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "inventory", 6) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "item", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "promotion", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "reason", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "ship_mode", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "time_dim", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "warehouse", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_page", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_site", 1) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_sales", catalog_sales_p) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_returns", catalog_returns_p) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_sales", store_sales_p) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_returns", store_returns_p) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_sales", web_sales_p) tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_returns", web_returns_p) 
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
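Once createExternalTables returns, it is worth sanity-checking that the metastore entries resolve against the generated files. A minimal sketch, reusing the database_name defined above:

// List the registered tables and spot-check one dimension and one fact table.
spark.catalog.listTables(database_name).show(false)
spark.sql(s"select count(*) from $database_name.call_center").show
spark.sql(s"select count(*) from $database_name.store_sales").show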
// "file:///": writing to the local filesystem requires three slashes
val data_path = "file:///home/data/tpcds_parquet_tpcds_kit_1_0/1"
import com.databricks.spark.sql.perf.tpch._

val tools_path = "/opt/Beaver/tpch-dbgen"
val format = "orc"
val useDoubleForDecimal = false
val partitionTables = false
val scaleFactor = "1"
val data_path = s"hdfs://{IP}:9000/tpch_${format}_${scaleFactor}"
val numPartitions = 1
val databaseName = s"tpch_${format}_${scaleFactor}_db"
val clusterByPartitionColumns = partitionTables

val tables = new TPCHTables(spark, spark.sqlContext,
  dbgenDir = tools_path,
  scaleFactor = scaleFactor,
  useDoubleForDecimal = useDoubleForDecimal,
  useStringForDate = false)

spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "200000000")

tables.genData(
  location = data_path,
  format = format,
  overwrite = true, // overwrite the data that is already there
  partitionTables = partitionTables, // do not create the partitioned fact tables
  clusterByPartitionColumns = clusterByPartitionColumns, // shuffle to get partitions coalesced into single files
  filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
  tableFilter = "", // "" means generate all tables
  numPartitions = numPartitions) // how many dbgen partitions to run - number of input tasks

// Create the specified database
sql(s"drop database if exists $databaseName CASCADE")
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = false)
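A quick check that the TPC-H database is usable; a minimal sketch assuming the databaseName created above (at scale factor 1, lineitem should hold roughly 6 million rows):

spark.sql(s"use $databaseName")
spark.sql("show tables").show(false)
spark.sql("select count(*) from lineitem").show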
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://{IP}:9000/{PATH}/user/sparkuser/part_tpcds_decimal_1000/"
val database_name = "sr242_parquet_part_tpcds_decimal_1000"
val scale = "1000"
val useDoubleForDecimal = false
val format = "parquet"
val partitionTables = true
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
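Because partitionTables is true here, discoverPartitions should have registered the fact-table partitions. A minimal sketch to confirm, assuming the database_name above:

spark.sql(s"use $database_name")
spark.sql("show partitions store_sales").show(5, false)
spark.sql("select count(*) from store_sales").show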
val data_path= "hdfs://{IP}:9000/{PATH}/user/sparkuser/tpch_nonpart_1000/" val databaseName = "arrow_tpch_nonpart_1000" val tables = Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier") val partitionTables = false spark.sql(s"DROP database if exists $databaseName CASCADE") if (spark.catalog.databaseExists(s"$databaseName")) { println(s"$databaseName has exists!") }else{ spark.sql(s"create database if not exists $databaseName").show spark.sql(s"use $databaseName").show for (table <- tables) { if (spark.catalog.tableExists(s"$table")){ println(s"$table has exists!") }else{ spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow") } } if (partitionTables) { for (table <- tables) { try{ spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show }catch{ case e: Exception => println(e) } } } }
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/" val databaseName = "arrow_part_tpcds_decimal_1000" val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site") val partitionTables = true spark.sql(s"DROP database if exists $databaseName CASCADE") if (spark.catalog.databaseExists(s"$databaseName")) { println(s"$databaseName has exists!") }else{ spark.sql(s"create database if not exists $databaseName").show spark.sql(s"use $databaseName").show for (table <- tables) { if (spark.catalog.tableExists(s"$table")){ println(s"$table has exists!") }else{ spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow") } } if (partitionTables) { for (table <- tables) { try{ spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show }catch{ case e: Exception => println(e) } } } }
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "parquet_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
spark.sql(s"use $databaseName").show
for (table <- tables) {
  try {
    spark.sql(s"ALTER TABLE $table SET LOCATION '$data_path/$table'").show
  } catch {
    case e: Exception => println(e)
  }
}
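To confirm the new locations took effect, the metastore can be read back with DESCRIBE FORMATTED; a minimal sketch reusing the tables list above:

for (table <- tables) {
  // The 'Location' row of DESCRIBE FORMATTED shows where the table now points.
  spark.sql(s"DESCRIBE FORMATTED $table").filter("col_name = 'Location'").show(false)
}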
val resourcePath: String = "file:/mnt/DP_disk3/tpcds-generated/" val fileFormat: String = "parquet" val databaseName = "trino_tpcds" spark.sql(s"create database if not exists $databaseName").show spark.sql(s"use $databaseName").show Seq( "call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site" ).foreach { table => spark.catalog.createTable(table, s"$resourcePath/$table", fileFormat) try { spark.catalog.recoverPartitions(table) } catch { case _: Throwable => } }
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._

// Configurations:
// Directory containing the 103 queries
var queries = "{tpcds-queries}"
val database = "{database_name}"

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1000000000.0 + " seconds")
  result
}

def getListOfFiles(dir: String): List[File] = {
  val d = new File(dir)
  if (d.exists && d.isDirectory) {
    // You can run a specific query by using the line below.
    // d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
    d.listFiles.filter(_.isFile).toList
  } else {
    List[File]()
  }
}

val fileLists = getListOfFiles(queries)
val sorted = fileLists.sortBy { f =>
  f.getName match {
    case name =>
      var str = name
      str = str.replaceFirst("a", ".1")
      str = str.replaceFirst("b", ".2")
      str = str.replaceFirst(".sql", "")
      str = str.replaceFirst("q", "")
      str.toDouble
  }
}

// Main program to run TPC-DS testing
spark.sql(s"use $database")
for (t <- sorted) {
  println(t)
  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
  println(fileContents)
  try {
    time { spark.sql(fileContents).show }
    // spark.sql(fileContents).explain
    Thread.sleep(2000)
  } catch {
    case e: Exception => None
  }
}
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
import java.io.FileWriter

// Configurations:
// Directory containing the 103 queries
var queries = "$path/tpcds-queries-nodecimal-nodate/"
val database = "tpcds_parquet_1000_db"

def time[R](block: => R): Double = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  val elapse = (t1 - t0) / 1000000000.0
  println("Elapsed time: " + elapse + " seconds")
  elapse
}

def getListOfFiles(dir: String): List[File] = {
  val d = new File(dir)
  if (d.exists && d.isDirectory) {
    // You can run a specific query by using the line below.
    // d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
    d.listFiles.filter(_.isFile).toList
  } else {
    List[File]()
  }
}

val fileLists = getListOfFiles(queries)
val sorted = fileLists.sortBy { f =>
  f.getName match {
    case name =>
      var str = name
      str = str.replaceFirst("a", ".1")
      str = str.replaceFirst("b", ".2")
      str = str.replaceFirst(".sql", "")
      str = str.replaceFirst("q", "")
      str.toDouble
  }
}

// Main program to run TPC-DS testing
spark.sql(s"use $database")
for (t <- sorted) {
  val each_round_result = new FileWriter(s"./result.csv", true)
  println(t)
  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
  println(fileContents)
  try {
    val elapse = time { spark.sql(fileContents).show }
    // spark.sql(fileContents).explain
    Thread.sleep(2000)
    each_round_result.write(s"$t, $elapse, Success\n")
  } catch {
    case e: Exception =>
      each_round_result.write(s"$t, -1, Fail\n")
  }
  each_round_result.close
}
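After a full run, result.csv can be read back to get a pass/fail count and the total elapsed time. A minimal sketch, assuming the "query file, elapsed seconds, Success|Fail" layout written by the loop above:

import scala.io.Source
val rows = Source.fromFile("./result.csv").getLines.map(_.split(",").map(_.trim)).toList
val ok   = rows.filter(_(2) == "Success")
val fail = rows.filter(_(2) == "Fail")
println(s"Succeeded: ${ok.size}, failed: ${fail.size}")
println(s"Total elapsed: ${ok.map(_(1).toDouble).sum} seconds")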
The following shows how to run one specific SQL query, reusing the queries directory and the time helper defined above:
val path = s"${queries}/q77.sql"
val fileContents = Source.fromFile(path).getLines.filter(!_.startsWith("--")).mkString(" ")
println(fileContents)
time{spark.sql(fileContents).show}
// Run a single SQL query
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
import java.io.FileWriter

// Configurations:
// Directory containing the 103 queries
var queries = "/home/hadoop/tpcds/sql/"
val database = "cosn_parquet_tpcds_kit_1000"

def time[R](block: => R): Double = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  val elapse = (t1 - t0) / 1000000000.0
  println("Elapsed time: " + elapse + " seconds")
  elapse
}

// Main program to run TPC-DS testing
spark.sql(s"use $database")
val path = s"${queries}/q13.sql"
val fileContents = Source.fromFile(path).getLines.filter(!_.startsWith("--")).mkString(" ")
println(fileContents)
time { spark.sql(fileContents).show }
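When a single query regresses, its plan is often as useful as the wall time. A minimal sketch, reusing the fileContents and time helper from the block above:

val df = spark.sql(fileContents)
df.explain(true)   // print the parsed, analyzed, optimized and physical plans
time { df.show }   // then execute it with the same timing helper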
Note: for a record of the Scala commands that have been run, see .scala_history.
A repository mirror can be added before building with mvn:
#~/.m2/settings.xml
<mirror>
  <id>aliyun</id>
  <mirrorOf>central</mirrorOf>
  <name>Nexus aliyun</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
cd ~
git clone https://github.com/hortonworks/hive-testbench.git
cd hive-testbench/
./tpcds-build.sh
SF=3
./tpcds-setup.sh $SF
If the run gets stuck, read the script itself.
If you hit the following error, check the data-generation directory:
ls: `/tmp/tpcds-generate/3': No such file or directory
Generating data at scale factor 3.
TPC-DS text data generation complete.
Loading text data into external tables.
make: *** [date_dim] Error 1
Loading constraints
$ hadoop fs -du -h /tmp/tpcds-generate/3
0        0        /tmp/tpcds-generate/3/_SUCCESS
3.0 K    6.1 K    /tmp/tpcds-generate/3/call_center
1.6 M    3.1 M    /tmp/tpcds-generate/3/catalog_page
62.0 M   124.1 M  /tmp/tpcds-generate/3/catalog_returns
854.8 M  1.7 G    /tmp/tpcds-generate/3/catalog_sales
23.8 M   47.6 M   /tmp/tpcds-generate/3/customer
9.9 M    19.7 M   /tmp/tpcds-generate/3/customer_address
76.9 M   153.8 M  /tmp/tpcds-generate/3/customer_demographics
9.8 M    19.7 M   /tmp/tpcds-generate/3/date_dim
191      382      /tmp/tpcds-generate/3/dbgen_version
148.1 K  296.2 K  /tmp/tpcds-generate/3/household_demographics
328      656      /tmp/tpcds-generate/3/income_band
549.4 M  1.1 G    /tmp/tpcds-generate/3/inventory
9.7 M    19.3 M   /tmp/tpcds-generate/3/item
41.6 K   83.3 K   /tmp/tpcds-generate/3/promotion
1.4 K    2.7 K    /tmp/tpcds-generate/3/reason
1.1 K    2.2 K    /tmp/tpcds-generate/3/ship_mode
8.1 K    16.3 K   /tmp/tpcds-generate/3/store
95.1 M   190.1 M  /tmp/tpcds-generate/3/store_returns
1.1 G    2.2 G    /tmp/tpcds-generate/3/store_sales
4.9 M    9.7 M    /tmp/tpcds-generate/3/time_dim
716      1.4 K    /tmp/tpcds-generate/3/warehouse
8.5 K    16.9 K   /tmp/tpcds-generate/3/web_page
28.5 M   57.0 M   /tmp/tpcds-generate/3/web_returns
425.3 M  850.5 M  /tmp/tpcds-generate/3/web_sales
9.1 K    18.2 K   /tmp/tpcds-generate/3/web_site
<property>
  <name>hive.server2.thrift.http.port</name>
  <value>xxx0</value>
</property>
<property>
  <name>hive.server2.thrift.port</name>
  <value>xxx1</value>
</property>
Do not mistakenly point beeline at the first port (hive.server2.thrift.http.port),
or you will get the following error:
java.net.ConnectException: Connection refused (Connection refused) (state=08S01,code=0)
After fixing the beeline connection port, simply re-run the script. For the details, look directly at how the tpcds-setup.sh script is implemented.
./tpcds-setup.sh 3
$ hadoop fs -count -h /data/parquet_tpcds_kit/1000/*
1   2    15.8 K   /data/parquet_tpcds_kit/1000/call_center
1   2    1.7 M    /data/parquet_tpcds_kit/1000/catalog_page
1   130  9.9 G    /data/parquet_tpcds_kit/1000/catalog_returns
1   280  100.3 G  /data/parquet_tpcds_kit/1000/catalog_sales
1   7    610.0 M  /data/parquet_tpcds_kit/1000/customer
1   2    112.1 M  /data/parquet_tpcds_kit/1000/customer_address
1   2    7.5 M    /data/parquet_tpcds_kit/1000/customer_demographics
1   2    1.8 M    /data/parquet_tpcds_kit/1000/date_dim
1   2    30.4 K   /data/parquet_tpcds_kit/1000/household_demographics
1   2    1.2 K    /data/parquet_tpcds_kit/1000/income_band
1   7    3.9 G    /data/parquet_tpcds_kit/1000/inventory
1   2    27.4 M   /data/parquet_tpcds_kit/1000/item
1   2    77.3 K   /data/parquet_tpcds_kit/1000/promotion
1   2    2.3 K    /data/parquet_tpcds_kit/1000/reason
1   2    2.6 K    /data/parquet_tpcds_kit/1000/ship_mode
1   2    89.6 K   /data/parquet_tpcds_kit/1000/store
1   211  16.0 G   /data/parquet_tpcds_kit/1000/store_returns
1   388  125.7 G  /data/parquet_tpcds_kit/1000/store_sales
1   2    1.1 M    /data/parquet_tpcds_kit/1000/time_dim
1   2    5.8 K    /data/parquet_tpcds_kit/1000/warehouse
1   2    57.0 K   /data/parquet_tpcds_kit/1000/web_page
1   98   5.0 G    /data/parquet_tpcds_kit/1000/web_returns
1   149  46.1 G   /data/parquet_tpcds_kit/1000/web_sales
1   2    16.5 K   /data/parquet_tpcds_kit/1000/web_site
Analysis of the TPC-H queries:
https://www.cnblogs.com/xibuhaohao/p/11413669.html
scala> tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
2024-04-10 17:26:20,973 [WARN] [main] package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Generating table call_center in database to file:home/hadoop/parquet_test/1/call_center with save mode Overwrite.
List(bash, -c, cd /home/hadoop/data/tpcds-kit/tools && ./dsdgen -table call_center -filter Y -scale 1 -RNGSEED 100 )
dsdgen Population Generator (Version 2.13.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2020
Warning: This scale factor is valid for QUALIFICATION ONLY
$ ./dsdgen -table call_center -filter Y -scale 1 -RNGSEED 100
dsdgen Population Generator (Version 2.13.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2020
Warning: This scale factor is valid for QUALIFICATION ONLY
1|AAAAAAAABAAAAAAA|1998-01-01|||2450997|NY Metro|large|1|591|8AM-12AM|Keith Cunningham|4|Matters may hear as; profita|New, cold plants can put al|Dante Cook|3|pri|4|ese|995|Park 3rd|Dr.|Suite 470|Fairview|Williamson County|TN|35709|United States|-5|0.02|
2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450876|Mid Atlantic|large|1|199|8AM-4PM|Stephen Clem|3|Classes devote largely other, standard ter|Free germans prove flatly industrial drugs. Low questions come to a equations. British, conservative|Christopher Perez|6|cally|3|pri|245|Johnson |Circle|Suite 200|Fairview|Williamson County|TN|35709|United States|-5|0.03|
3|AAAAAAAACAAAAAAA|2001-01-01|||2450876|Mid Atlantic|small|3|199|8AM-4PM|William Johnson|3|Classes devote largely other, standard ter|Ridiculous requirements must not implement about pure values. Substances know powers. Political rel|Derrick Burke|6|cally|3|pri|245|Johnson |Circle|Suite 200|Fairview|Williamson County|TN|35709|United States|-5|0.03|
4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2450872|North Midwest|large|6|1638|8AM-4PM|Lamont Greene|3|Events must find anyway |Great rates must ensure famous, other banks. As main goals get home as a |Marvin Dean|2|able|2|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.03|
5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2450872|North Midwest|medium|6|1638|8AM-12AM|Lamont Greene|3|Events must find anyway |So fresh supplies keep meanwhile religious, labour years. Rapid, careful subject|Matthew Williams|2|able|1|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.00|
6|AAAAAAAAEAAAAAAA|2002-01-01|||2450872|North Midwest|small|6|1638|8AM-4PM|Emilio Romano|6|As well novel sentences check through the plans.|Sophisticated cities fall for e|William Johnson|5|anti|1|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.07|
Running dsdgen directly like this (with -filter Y) does not write any files; it dumps the generated rows straight to stdout.
Compiling tpch_dbgen
def compile_tpch_dbgen(master, component, tpch_dbgen_version, tpch_dbgen_home):
    print("+++++++++++++++++++++++++++++")
    print("Install build tools, then compile and package tpch_dbgen")
    print("+++++++++++++++++++++++++++++")
    cmd = "yum -y install make patch unzip;"
    cmd += "cd " + tpch_dbgen_home + " && git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8; make clean && make; cd ..;"
    cmd += "tar -zcvf " + tpch_dbgen_home + "-" + tpch_dbgen_version + ".tar.gz" + " " + component + "-" + tpch_dbgen_version
    ssh_execute(master, cmd)