
Spark TPC-DS/H Data Gen

Start spark-shell

$SPARK_HOME/bin/spark-shell --master local[10] --jars {PATH}/spark-sql-perf-1.2/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
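Once the shell is up, a quick sanity check (a minimal, optional step) is to import one of the jar's classes; the import fails immediately if the jar is not on the classpath:

import com.databricks.spark.sql.perf.tpcds.TPCDSTables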

Gen Data
drop database if exists
sql(s"drop database if exists $databaseName CASCADE")
Gen TPCDS Parquet
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://{IP}:9000/tpcds_parquet_tpcds_kit_1_0/1"
val database_name = "tpcds_parquet_tpcds_kit_1_0_scale_1_db"
val scale = "1"
// Partition counts for the fact tables scale linearly with the scale factor
// (see the worked example after this block).
val p = scale.toInt / 2048.0
val catalog_returns_p = (263 * p + 1).toInt
val catalog_sales_p = (2285 * p * 0.5 * 0.5 + 1).toInt
val store_returns_p = (429 * p + 1).toInt
val store_sales_p = (3164 * p * 0.5 * 0.5 + 1).toInt
val web_returns_p = (198 * p + 1).toInt
val web_sales_p = (1207 * p * 0.5 * 0.5 + 1).toInt
val format = "parquet"
val codec = "snappy"

val useDoubleForDecimal = false

val partitionTables = false
val clusterByPartitionColumns = partitionTables

import com.databricks.spark.sql.perf.tpcds.TPCDSTables
spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec)

val tables = new TPCDSTables(spark.sqlContext, tools_path, scale, useDoubleForDecimal)

tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_address", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "date_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "household_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "income_band", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "inventory", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "item", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "promotion", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "reason", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "ship_mode", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "time_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "warehouse", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_site", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_sales", catalog_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_returns", catalog_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_sales", store_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_returns", store_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_sales", web_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_returns", web_returns_p)

tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
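Once generation and table registration finish, a quick sanity check (standard Spark SQL, not part of the original flow) confirms the tables landed:

sql(s"use $database_name")
sql("show tables").show(30, false)
sql("select count(*) from store_sales").show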

// "file:///": to write to the local filesystem, three slashes are required
val data_path = "file:///home/data/tpcds_parquet_tpcds_kit_1_0/1"
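As a worked example of the partition formulas above: at scale 1000, p = 1000 / 2048 ≈ 0.488, so for the two largest fact tables

(2285 * 0.488 * 0.25 + 1).toInt   // catalog_sales -> 279 partitions
(3164 * 0.488 * 0.25 + 1).toInt   // store_sales   -> 387 partitions

which lines up with the per-table file counts listed in the "TPCDS table overview" section below (each table directory additionally holds a _SUCCESS marker).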

Gen TPCH ORC
import com.databricks.spark.sql.perf.tpch._

val tools_path = "/opt/Beaver/tpch-dbgen"
val format = "orc"
val useDoubleForDecimal = false
val partitionTables = false
val scaleFactor = "1"

val data_path = s"hdfs://{IP}:9000/tpch_${format}_${scaleFactor}"

val numPartitions = 1
val databaseName = s"tpch_${format}_${scaleFactor}_db"


val clusterByPartitionColumns = partitionTables

val tables = new TPCHTables(spark.sqlContext,
    dbgenDir = tools_path,
    scaleFactor = scaleFactor,
    useDoubleForDecimal = useDoubleForDecimal,
    useStringForDate = false)

spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "200000000")

tables.genData(
    location = data_path,
    format = format,
    overwrite = true, // overwrite any data that is already there
    partitionTables, // whether to create the partitioned fact tables
    clusterByPartitionColumns, // shuffle to get partitions coalesced into single files
    filterOutNullPartitionValues = false, // true to filter out partitions with a NULL key value
    tableFilter = "", // "" means generate all tables
    numPartitions = numPartitions) // how many dbgen partitions to run - number of input tasks

// Create the specified database
sql(s"drop database if exists $databaseName CASCADE")
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = false)
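Optionally (an extra step, not in the original flow), compute table statistics so the optimizer has row counts; ANALYZE TABLE is standard Spark SQL:

sql(s"use $databaseName")
Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier").foreach { t =>
  sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
}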


Create Metadata
Parquet create database/tables
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://{IP}:9000/{PATH}/user/sparkuser/part_tpcds_decimal_1000/"
val database_name = "sr242_parquet_part_tpcds_decimal_1000"
val scale = "1000"
val useDoubleForDecimal = false
val format = "parquet"
val partitionTables = true
import com.databricks.spark.sql.perf.tpcds.TPCDSTables

// same constructor form as in the Gen TPCDS Parquet section above
val tables = new TPCDSTables(spark.sqlContext, tools_path, scale, useDoubleForDecimal)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
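A quick check (standard Spark SQL) that the metadata landed:

spark.sql(s"show tables in $database_name").show(30, false)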
TPCH MetaData
val data_path = "hdfs://{IP}:9000/{PATH}/user/sparkuser/tpch_nonpart_1000/"
val databaseName = "arrow_tpch_nonpart_1000"
val tables = Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")
val partitionTables = false
// Note: because of the DROP just below, the databaseExists branch is effectively never taken.
spark.sql(s"DROP database if exists $databaseName CASCADE")
if (spark.catalog.databaseExists(s"$databaseName")) {
  println(s"$databaseName already exists!")
} else {
  spark.sql(s"create database if not exists $databaseName").show
  spark.sql(s"use $databaseName").show
  for (table <- tables) {
    if (spark.catalog.tableExists(s"$table")) {
      println(s"$table already exists!")
    } else {
      spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
    }
  }
  if (partitionTables) {
    for (table <- tables) {
      try {
        spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
      } catch {
        case e: Exception => println(e)
      }
    }
  }
}
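Note that "arrow" is not a data source bundled with vanilla Spark; it presumably comes from an external plugin (e.g. Intel OAP / Gluten), so the snippets in this and the next section assume that jar is on the classpath.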

Arrow create database/tables
val data_path = "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "arrow_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
val partitionTables = true
spark.sql(s"DROP database if exists $databaseName CASCADE")
if (spark.catalog.databaseExists(s"$databaseName")) {
  println(s"$databaseName already exists!")
} else {
  spark.sql(s"create database if not exists $databaseName").show
  spark.sql(s"use $databaseName").show
  for (table <- tables) {
    if (spark.catalog.tableExists(s"$table")) {
      println(s"$table already exists!")
    } else {
      spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
    }
  }
  if (partitionTables) {
    for (table <- tables) {
      try {
        spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
      } catch {
        case e: Exception => println(e)
      }
    }
  }
}



Use ALTER to update table meta information
val data_path = "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "parquet_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")

spark.sql(s"use $databaseName").show
for (table <- tables) {
    try{
        spark.sql(s"ALTER TABLE $table SET LOCATION '$data_path/$table'").show
    }catch{
            case e: Exception => println(e)
    }
}
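To confirm a new location took effect (standard Spark SQL; store_sales used as an example):

spark.sql("describe formatted store_sales").filter("col_name = 'Location'").show(false)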


TPCDS MetaData creation
val resourcePath: String = "file:/mnt/DP_disk3/tpcds-generated/"
val fileFormat: String = "parquet"
val databaseName = "trino_tpcds"

spark.sql(s"create database if not exists $databaseName").show
spark.sql(s"use $databaseName").show
Seq(
      "call_center",
      "catalog_page",
      "catalog_returns",
      "catalog_sales",
      "customer",
      "customer_address",
      "customer_demographics",
      "date_dim",
      "household_demographics",
      "income_band",
      "inventory",
      "item",
      "promotion",
      "reason",
      "ship_mode",
      "store",
      "store_returns",
      "store_sales",
      "time_dim",
      "warehouse",
      "web_page",
      "web_returns",
      "web_sales",
      "web_site"
    ).foreach { table =>
      spark.catalog.createTable(table, s"$resourcePath/$table", fileFormat)
      try {
        // recoverPartitions fails for non-partitioned tables; that case is ignored
        spark.catalog.recoverPartitions(table)
      } catch {
        case _: Throwable =>
      }
    }
TPCDS Power Test (one approach)
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
 
// Configurations:
// Path containing the 103 queries
var queries = "{tpcds-queries}"
val database = "{database_name}"
 
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1000000000.0 + " seconds")
    result
}
 
def getListOfFiles(dir: String): List[File] = {
     val d = new File(dir)
     if (d.exists && d.isDirectory) {
         // You can run a specific query by using below line.
         // d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
         d.listFiles.filter(_.isFile).toList
     } else {
         List[File]()
     }
}
val fileLists = getListOfFiles(queries)
// Map e.g. "q14a.sql" -> 14.1 and "q14b.sql" -> 14.2 so the queries sort numerically.
val sorted = fileLists.sortBy { f =>
  var str = f.getName
  str = str.replaceFirst("a", ".1")
  str = str.replaceFirst("b", ".2")
  str = str.replaceFirst(".sql", "")
  str = str.replaceFirst("q", "")
  str.toDouble
}
 
// Main program to run TPC-DS testing
spark.sql(s"use $database")
for (t <- sorted) {
  println(t)
  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
  println(fileContents)
  try {
    time{spark.sql(fileContents).show}
    // spark.sql(fileContents).explain
    Thread.sleep(2000)
  } catch {
    case e: Exception => None
  }
}
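Note that .show only materializes the first 20 rows, which can understate a query's true cost. A hedged alternative on Spark 3.x is the built-in noop sink, which forces full execution without writing any output:

time { spark.sql(fileContents).write.format("noop").mode("overwrite").save() }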
Measure each query's elapsed time
import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
import java.io.FileWriter
 
// Configurations:
// Path containing the 103 queries
var queries = "$path/tpcds-queries-nodecimal-nodate/"
val database = "tpcds_parquet_1000_db"
 
def time[R](block: => R): Double = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    val elapse = (t1 - t0) / 1000000000.0
    println("Elapsed time: " + elapse + " seconds")
    elapse
}
 
def getListOfFiles(dir: String): List[File] = {
     val d = new File(dir)
     if (d.exists && d.isDirectory) {
         // You can run a specific query by using below line.
         // d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
         d.listFiles.filter(_.isFile).toList
     } else {
         List[File]()
     }
}
val fileLists = getListOfFiles(queries)
// Map e.g. "q14a.sql" -> 14.1 and "q14b.sql" -> 14.2 so the queries sort numerically.
val sorted = fileLists.sortBy { f =>
  var str = f.getName
  str = str.replaceFirst("a", ".1")
  str = str.replaceFirst("b", ".2")
  str = str.replaceFirst(".sql", "")
  str = str.replaceFirst("q", "")
  str.toDouble
}
 
// Main program to run TPC-DS testing
spark.sql(s"use $database")


for (t <- sorted) {
  // open in append mode so repeated rounds accumulate in the same file
  val each_round_result = new FileWriter(s"./result.csv", true)

  println(t)
  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
  println(fileContents)
  try {
    val elapse = time{spark.sql(fileContents).show}
    // spark.sql(fileContents).explain
    Thread.sleep(2000)
    each_round_result.write(s"$t, $elapse, Success\n")
  } catch {
    case e: Exception =>
      each_round_result.write(s"$t, -1, Fail\n")
  }
  each_round_result.close
}
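Each round appends one line per query to ./result.csv in the form "<query file>, <elapsed seconds>, Success" (or "<query file>, -1, Fail" on error).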


The following is how to run a single specific SQL query:

val path = "{PATH}/tpcds-queries-nodecimal-nodate/q77.sql" // {PATH}: base directory placeholder
val fileContents = Source.fromFile(path).getLines.filter(!_.startsWith("--")).mkString(" ")
println(fileContents)
time{spark.sql(fileContents).show}
// Single SQL query

import org.apache.spark.sql.execution.debug._
import scala.io.Source
import java.io.File
import java.util.Arrays
import sys.process._
import java.io.FileWriter
 
// Configurations:
// Path containing the 103 queries
var queries = "/home/hadoop/tpcds/sql/"
val database = "cosn_parquet_tpcds_kit_1000"
 
def time[R](block: => R): Double = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    val elapse = (t1 - t0)/1000000000.0
    println("Elapsed time: " + elapse + " seconds")
    elapse
}



// Main program to run TPC-DS testing
spark.sql(s"use $database")

val path = s"${queries}/q13.sql"
val fileContents = Source.fromFile(path).getLines.filter(!_.startsWith("--")).mkString(" ")
println(fileContents)
time{spark.sql(fileContents).show}


Note: for a record of the Scala code entered in the shell, see .scala_history

Generating data with Hive

Before building with mvn, you can add a mirror repository:

# ~/.m2/settings.xml

<mirror>
  <id>aliyun</id>
  <mirrorOf>central</mirrorOf>
  <name>Nexus aliyun</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
cd ~
git clone https://github.com/hortonworks/hive-testbench.git
cd hive-testbench/
./tpcds-build.sh
SF=3
./tpcds-setup.sh $SF

If the run gets stuck, inspect the script directly.

If you hit the following error, check the data generation directory:

ls: `/tmp/tpcds-generate/3': No such file or directory
Generating data at scale factor 3.
TPC-DS text data generation complete.
Loading text data into external tables.
make: *** [date_dim] Error 1
Loading constraints
$ hadoop fs -du -h /tmp/tpcds-generate/3
0        0        /tmp/tpcds-generate/3/_SUCCESS
3.0 K    6.1 K    /tmp/tpcds-generate/3/call_center
1.6 M    3.1 M    /tmp/tpcds-generate/3/catalog_page
62.0 M   124.1 M  /tmp/tpcds-generate/3/catalog_returns
854.8 M  1.7 G    /tmp/tpcds-generate/3/catalog_sales
23.8 M   47.6 M   /tmp/tpcds-generate/3/customer
9.9 M    19.7 M   /tmp/tpcds-generate/3/customer_address
76.9 M   153.8 M  /tmp/tpcds-generate/3/customer_demographics
9.8 M    19.7 M   /tmp/tpcds-generate/3/date_dim
191      382      /tmp/tpcds-generate/3/dbgen_version
148.1 K  296.2 K  /tmp/tpcds-generate/3/household_demographics
328      656      /tmp/tpcds-generate/3/income_band
549.4 M  1.1 G    /tmp/tpcds-generate/3/inventory
9.7 M    19.3 M   /tmp/tpcds-generate/3/item
41.6 K   83.3 K   /tmp/tpcds-generate/3/promotion
1.4 K    2.7 K    /tmp/tpcds-generate/3/reason
1.1 K    2.2 K    /tmp/tpcds-generate/3/ship_mode
8.1 K    16.3 K   /tmp/tpcds-generate/3/store
95.1 M   190.1 M  /tmp/tpcds-generate/3/store_returns
1.1 G    2.2 G    /tmp/tpcds-generate/3/store_sales
4.9 M    9.7 M    /tmp/tpcds-generate/3/time_dim
716      1.4 K    /tmp/tpcds-generate/3/warehouse
8.5 K    16.9 K   /tmp/tpcds-generate/3/web_page
28.5 M   57.0 M   /tmp/tpcds-generate/3/web_returns
425.3 M  850.5 M  /tmp/tpcds-generate/3/web_sales
9.1 K    18.2 K   /tmp/tpcds-generate/3/web_site
Note the two thrift ports:
<property>
    <name>hive.server2.thrift.http.port</name>
    <value>xxx0</value>
</property>

<property>
    <name>hive.server2.thrift.port</name>
    <value>xxx1</value>
</property>

Do not mistakenly point beeline at the first (HTTP) port, or you will get the following error:

java.net.ConnectException: Connection refused (Connection refused) (state=08S01,code=0)

After fixing the beeline connection port, simply re-run the script. For details, look directly at the tpcds-setup.sh implementation.

./tpcds-setup.sh 3
TPCDS table overview

The listing below (scale 1000) also corroborates the partition formulas from the Gen TPCDS Parquet section: each fact table's file count is the computed partition count plus one _SUCCESS marker (e.g. catalog_sales: 279 + 1 = 280, store_sales: 387 + 1 = 388).


$ hadoop fs -count -h /data/parquet_tpcds_kit/1000/*
           1            2             15.8 K /data/parquet_tpcds_kit/1000/call_center
           1            2              1.7 M /data/parquet_tpcds_kit/1000/catalog_page
           1          130              9.9 G /data/parquet_tpcds_kit/1000/catalog_returns
           1          280            100.3 G /data/parquet_tpcds_kit/1000/catalog_sales
           1            7            610.0 M /data/parquet_tpcds_kit/1000/customer
           1            2            112.1 M /data/parquet_tpcds_kit/1000/customer_address
           1            2              7.5 M /data/parquet_tpcds_kit/1000/customer_demographics
           1            2              1.8 M /data/parquet_tpcds_kit/1000/date_dim
           1            2             30.4 K /data/parquet_tpcds_kit/1000/household_demographics
           1            2              1.2 K /data/parquet_tpcds_kit/1000/income_band
           1            7              3.9 G /data/parquet_tpcds_kit/1000/inventory
           1            2             27.4 M /data/parquet_tpcds_kit/1000/item
           1            2             77.3 K /data/parquet_tpcds_kit/1000/promotion
           1            2              2.3 K /data/parquet_tpcds_kit/1000/reason
           1            2              2.6 K /data/parquet_tpcds_kit/1000/ship_mode
           1            2             89.6 K /data/parquet_tpcds_kit/1000/store
           1          211             16.0 G /data/parquet_tpcds_kit/1000/store_returns
           1          388            125.7 G /data/parquet_tpcds_kit/1000/store_sales
           1            2              1.1 M /data/parquet_tpcds_kit/1000/time_dim
           1            2              5.8 K /data/parquet_tpcds_kit/1000/warehouse
           1            2             57.0 K /data/parquet_tpcds_kit/1000/web_page
           1           98              5.0 G /data/parquet_tpcds_kit/1000/web_returns
           1          149             46.1 G /data/parquet_tpcds_kit/1000/web_sales
           1            2             16.5 K /data/parquet_tpcds_kit/1000/web_site
tpch

Analysis of the TPCH queries:
https://www.cnblogs.com/xibuhaohao/p/11413669.html

How data generation works

tpcds
scala> tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
2024-04-10 17:26:20,973 [WARN] [main] package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Generating table call_center in database to file:home/hadoop/parquet_test/1/call_center with save mode Overwrite.
List(bash, -c, cd /home/hadoop/data/tpcds-kit/tools && ./dsdgen -table call_center -filter Y -scale 1 -RNGSEED 100 )
dsdgen Population Generator (Version 2.13.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2020
Warning: This scale factor is valid for QUALIFICATION ONLY
$ ./dsdgen -table call_center -filter Y -scale 1 -RNGSEED 100
dsdgen Population Generator (Version 2.13.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2020
Warning: This scale factor is valid for QUALIFICATION ONLY
1|AAAAAAAABAAAAAAA|1998-01-01|||2450997|NY Metro|large|1|591|8AM-12AM|Keith Cunningham|4|Matters may hear as; profita|New, cold plants can put al|Dante Cook|3|pri|4|ese|995|Park 3rd|Dr.|Suite 470|Fairview|Williamson County|TN|35709|United States|-5|0.02|
2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450876|Mid Atlantic|large|1|199|8AM-4PM|Stephen Clem|3|Classes devote largely other, standard ter|Free germans prove flatly industrial drugs. Low questions come to a equations. British, conservative|Christopher Perez|6|cally|3|pri|245|Johnson |Circle|Suite 200|Fairview|Williamson County|TN|35709|United States|-5|0.03|
3|AAAAAAAACAAAAAAA|2001-01-01|||2450876|Mid Atlantic|small|3|199|8AM-4PM|William Johnson|3|Classes devote largely other, standard ter|Ridiculous requirements must not implement about pure values. Substances know powers. Political rel|Derrick Burke|6|cally|3|pri|245|Johnson |Circle|Suite 200|Fairview|Williamson County|TN|35709|United States|-5|0.03|
4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2450872|North Midwest|large|6|1638|8AM-4PM|Lamont Greene|3|Events must find anyway |Great rates must ensure famous, other banks. As main goals get home as a |Marvin Dean|2|able|2|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.03|
5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2450872|North Midwest|medium|6|1638|8AM-12AM|Lamont Greene|3|Events must find anyway |So fresh supplies keep meanwhile religious, labour years. Rapid, careful subject|Matthew Williams|2|able|1|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.00|
6|AAAAAAAAEAAAAAAA|2002-01-01|||2450872|North Midwest|small|6|1638|8AM-4PM|Emilio Romano|6|As well novel sentences check through the plans.|Sophisticated cities fall for e|William Johnson|5|anti|1|able|927|Oak Main|ST|Suite 150|Fairview|Williamson County|TN|35709|United States|-5|0.07|

Running dsdgen directly like this produces no files; it just streams the text data to stdout, which is presumably how spark-sql-perf consumes it (see the bash command in the log above).

tpch

Compiling tpch_dbgen

def compile_tpch_dbgen(master, component, tpch_dbgen_version, tpch_dbgen_home):
    print("+++++++++++++++++++++++++++++")
    print("Install build tools, then check out and compile tpch-dbgen")
    print("+++++++++++++++++++++++++++++")
    cmd = "yum -y install make patch unzip;"
    cmd += "cd " + tpch_dbgen_home + " && git checkout 0469309147b42abac8857fa61b4cf69a6d3128a8; make clean && make; cd ..;"
    cmd += "tar -zcvf " + tpch_dbgen_home + "-" + tpch_dbgen_version + ".tar.gz" + " " + component + "-" + tpch_dbgen_version
    ssh_execute(master, cmd)  # ssh_execute: external helper that runs cmd on the master host