当前位置:   article > 正文

【spark源码系列】DataType原理方法示例源码详解_spark datetype

spark datetype

spark源码系列】DataType原理方法示例源码详解

原理

Spark 的 DataType 是用于描述和操作数据类型的类。它是 Spark SQL 中的一个重要概念,DataType是所有Spark SQL数据类型的基类,用于表示表的列以及表达式的返回类型。

在 Spark 中,DataType 主要有两个作用:

  1. 描述列的数据类型:在创建表时,可以使用 DataType 来指定每个列的数据类型,例如整数、字符串、布尔值等。通过指定正确的数据类型,Spark 可以进行正确的数据解析和类型检查,确保数据的一致性和准确性。

  2. 表达式的返回类型:在 Spark SQL 中,可以使用表达式进行数据转换和计算。每个表达式都有一个返回类型,该类型由其输入类型和操作决定。DataType 用于表示这些表达式的返回类型,以便在计算过程中进行类型检查和结果处理。

Spark 的 DataType 类型层次结构是根据 SQL 标准中的数据类型分层设计的。它包括基本数据类型(如整数、字符串、布尔值)和复合数据类型(如数组、结构体、映射)。对于不同的数据类型,Spark 提供了相应的子类来实现具体的功能和行为。

Spark 的 DataType 采用了面向对象的设计原则,通过继承和多态来实现不同数据类型的共享方法和属性。通过这种方式,可以方便地操作和处理不同的数据类型,并保证了代码的可扩展性和灵活性。

另外,Spark 还提供了一些工具方法和函数,用于创建、转换和操作 DataType。例如,可以使用 DataTypes.createStructType() 创建结构体类型,使用 DataTypes.IntegerType 获取整数类型的实例,使用 DataType.typeName 获取数据类型的名称等。

总之,Spark 的 DataType 是一个重要的组件,用于描述和操作数据类型。它提供了一种统一的方式来处理不同的数据类型,并保证了数据的一致性和正确性。通过使用 DataType,可以更好地开发和维护 Spark SQL 应用程序,并在数据处理过程中获得更高的效率和准确性。

属性

  • typeName: 数据类型在JSON序列化中使用的名称。
  • json: 数据类型的紧凑JSON表示形式。
  • prettyJson: 数据类型的漂亮(缩进)JSON表示形式。
  • simpleString: 数据类型的可读字符串表示形式。
  • catalogString: 数据类型在外部目录中保存的字符串表示形式。
  • sql: 数据类型的SQL字符串表示形式。

方法

  • sameType(other: DataType): Boolean: 检查thisother是否相同类型,忽略了nullability的差异。
  • asNullable: DataType: 返回相同类型但将所有nullability字段设置为true的数据类型。
  • existsRecursively(f: (DataType) => Boolean): Boolean: 如果此DataType树的任何DataType满足给定函数f,则返回true。
  • canWrite(write: DataType, read: DataType, byName: Boolean, resolver: Resolver, context: String, storeAssignmentPolicy: StoreAssignmentPolicy.Value, addError: String => Unit): Boolean: 返回写入数据类型是否可以使用读取数据类型进行读取。

伴生对象

  • fromDDL(ddl: String): DataType: 从DDL字符串解析数据类型。
  • fromJson(json: String): DataType: 从JSON字符串解析数据类型。

示例

DoubleType

/**
 * 表示 `Double` 值的数据类型。请使用单例对象 `DataTypes.DoubleType`。
 *
 * @since 1.3.0
 */
@Stable
class DoubleType private() extends FractionalType {
   
  // 将伴生对象和该类分开,以便伴生对象也可以继承此类型。否则,伴生对象在字节码中将是 "DoubleType$" 类型。
  // 定义私有构造函数,以便只能通过伴生对象进行实例化。
  private[sql] type InternalType = Double
  @transient private[sql] lazy val tag = typeTag[InternalType]
  private[sql] val numeric = implicitly[Numeric[Double]]
  private[sql] val fractional = implicitly[Fractional[Double]]
  private[sql] val ordering =
    (x: Double, y: Double) => SQLOrderingUtil.compareDoubles(x, y)
  private[sql] val asIntegral = DoubleType.DoubleAsIfIntegral

  override private[sql] def exactNumeric = DoubleExactNumeric

  /**
   * DoubleType 的默认值大小为 8 字节。
   */
  override def defaultSize: Int = 8

  private[sql] override def physicalDataType: PhysicalDataType = PhysicalDoubleType

  private[spark] override def asNullable: DoubleType = this
}

/**
 * @since 1.3.0
 */
@Stable
case object DoubleType extends DoubleType {
   

  // 以下特质从 Scala 2.12 中复制过来;在 2.13 中不存在
  // TODO: SPARK-30011 在 Scala 2.12 支持被删除后重新审视
  trait DoubleIsConflicted extends Numeric[Double] {
   
    def plus(x: Double, y: Double): Double = x + y
    def minus(x: Double, y: Double): Double = x - y
    def times(x: Double, y: Double): Double = x * y
    def negate(x: Double): Double = -x
    def fromInt(x: Int): Double = x.toDouble
    def toInt(x: Double): Int = x.toInt
    def toLong(x: Double): Long = x.toLong
    def toFloat(x: Double): Float = x.toFloat
    def toDouble(x: Double): Double = x
    // 数值基类特质中的逻辑不正确处理 abs(-0.0)
    override def abs(x: Double): Double = math.abs(x)
    // 从 Scala 2.13 中添加的;在 2.12 中不要覆盖以便能在 2.12 中工作
    def parseString(str: String): Option[Double] =
      Try(java.lang.Double.parseDouble(str)).toOption

  }

  trait DoubleAsIfIntegral extends DoubleIsConflicted with Integral[Double] {
   
    def quot(x: Double, y: Double): Double = (BigDecimal(x) quot BigDecimal(y)).doubleValue
    def rem(x: Double, y: Double): Double = (BigDecimal(x) remainder BigDecimal(y)).doubleValue
  }

  object DoubleAsIfIntegral extends DoubleAsIfIntegral {
   
    override def compare(x: Double, y: Double): Int = java.lang.Double.compare(x, y)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

上述代码定义了 DoubleType 类和伴生对象,用于表示 Double 值的数据类型。

以下是这些类和对象的一些关键信息:

  • DoubleType 类继承自 FractionalType 类,表示分数类型的数据。它定义了 InternalType 类型为 Double,并具有相应的类型标签和排序规则。
  • DoubleType 类还定义了数值操作和精确数字操作的实现,并提供了默认大小、物理数据类型和可空性的设置。
  • DoubleType 对象是 DoubleType 类的单例对象,继承了 DoubleType 类的所有功能,并且还实现了一些特质。
  • DoubleType.DoubleIsConflicted 特质定义了 Double 值的数学运算,包括加法、减法、乘法等。它还重写了 abs 方法以正确处理 -0.0
  • DoubleType.DoubleAsIfIntegral 特质是 DoubleIsConflicted 特质的子特质,并进一步定义了 Double 值的整数运算,包括商和余数的计算。
  • DoubleType.DoubleAsIfIntegral 对象是 DoubleAsIfIntegral 特质的单例对象,并实现了比较方法来进行 Double 值之间的比较。

这些类和对象的目的是为了提供对 Double 数据类型的支持和操作,包括数值计算、比较和转换等。它们通过定义和实现相应的方法和特质,使得在程序中可以方便地使用和处理 Double 类型的数据。

IntegerType

/**
 * 表示 `Int` 值的数据类型。请使用单例对象 `DataTypes.IntegerType`。
 *
 * @since 1.3.0
 */
@Stable
class IntegerType private() extends IntegralType {
   
  // 将伴生对象和该类分开,以便伴生对象也可以继承此类型。否则,伴生对象在字节码中将是 "IntegerType$" 类型。
  // 定义私有构造函数,以便只能通过伴生对象进行实例化。
  private[sql] type InternalType = Int
  @transient private[sql] lazy val tag = typeTag[InternalType]
  private[sql] val numeric = implicitly[Numeric[Int]]
  private[sql] val integral = implicitly[Integral[Int]]
  private[sql] val ordering = implicitly[Ordering[InternalType]]
  override private[sql] val exactNumeric = IntegerExactNumeric

  /**
   * IntegerType 的默认值大小为 4 字节。
   */
  override def defaultSize: Int = 4

  private[sql] override def physicalDataType: PhysicalDataType = PhysicalIntegerType

  override def simpleString: String = "int"

  private[spark] override def asNullable: IntegerType = this
}

/**
 * @since 1.3.0
 */
@Stable
case object IntegerType extends IntegerType
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

上述代码定义了 IntegerType 类和伴生对象,用于表示 Int 值的数据类型。以下是这些类和对象的一些关键信息:

  • IntegerType 类继承自 IntegralType 类,表示整数类型的数据。它定义了 InternalType 类型为 Int,并具有相应的类型标签、数值操作和整数操作。
  • IntegerType 类还定义了默认大小、物理数据类型和可空性的设置。
  • IntegerType 对象是 IntegerType 类的单例对象,并继承了 IntegerType 类的所有功能。

这些类和对象的目的是为了提供对 Int 数据类型的支持和操作,包括数值计算、比较和转换等。它们通过定义和实现相应的方法和属性,使得在程序中可以方便地使用和处理 Int 类型的数据。

BooleanType

/**
 * 表示 `Boolean` 值的数据类型。请使用单例对象 `DataTypes.BooleanType`。
 *
 * @since 1.3.0
 */
@Stable
class BooleanType private() extends AtomicType {
   
  // 将伴生对象和该类分开,以便伴生对象也可以继承此类型。否则,伴生对象在字节码中将是 "BooleanType$" 类型。
  // 定义私有构造函数,以便只能通过伴生对象进行实例化。
  private[sql] type InternalType = Boolean
  @transient private[sql] lazy val tag = typeTag[InternalType]
  private[sql] val ordering = implicitly[Ordering[InternalType]]

  /**
   * BooleanType 的默认值大小为 1 字节。
   */
  override def defaultSize: Int = 1

  private[sql] override def physicalDataType: PhysicalDataType = PhysicalBooleanType

  private[spark] override def asNullable: BooleanType = this
}

/**
 * @since 1.3.0
 */
@Stable
case object BooleanType extends BooleanType
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

上述代码定义了 BooleanType 类和伴生对象,用于表示 Boolean 值的数据类型。以下是这些类和对象的一些关键信息:

  • BooleanType 类继承自 AtomicType 类,表示布尔类型的数据。它定义了 InternalType 类型为 Boolean,并具有相应的排序规则。
  • BooleanType 类还定义了默认大小、物理数据类型和可空性的设置。
  • BooleanType 对象是 BooleanType 类的单例对象,并继承了 BooleanType 类的所有功能。

这些类和对象的目的是为了提供对 Boolean 数据类型的支持和操作,包括比较、逻辑运算等。它们通过定义和实现相应的方法和属性,使得在程序中可以方便地使用和处理 Boolean 类型的数据。

ArrayType

/**
 * ArrayType 的伴生对象。
 *
 * @since 1.3.0
 */
@Stable
object ArrayType extends AbstractDataType {
   
  /**
   * 使用给定的元素类型构造一个 [[ArrayType]] 对象。`containsNull` 默认为 true。
   */
  def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)

  override private[sql] def defaultConcreteType: DataType = ArrayType(NullType, containsNull = true)

  override private[sql] def acceptsType(other: DataType): Boolean = {
   
    other.isInstanceOf[ArrayType]
  }

  override private[spark] def simpleString: String = "array"
}

/**
 * 多个值的集合的数据类型。
 * 在内部,它们表示为包含 ``scala.collection.Seq`` 的列。
 *
 * 请使用 `DataTypes.createArrayType()` 来创建具体的实例。
 *
 * [[ArrayType]] 对象由两个字段组成,`elementType: [[DataType]]` 和 `containsNull: Boolean`。
 * `elementType` 字段用于指定数组元素的类型。
 * `containsNull` 字段用于指定数组是否可以包含 `null` 值。
 *
 * @param elementType 值的数据类型。
 * @param containsNull 指示数组是否可以包含 `null` 值。
 *
 * @since 1.3.0
 */
@Stable
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
   

  /** 无参构造函数用于 kryo 序列化。 */
  protected def this() = this(null, false)

  private[sql] def buildFormattedString(
      prefix: String,
      stringConcat: StringConcat,
      maxDepth: Int): Unit = {
   
    if (maxDepth > 0) {
   
      stringConcat.append(
        s"$prefix-- element: ${
     elementType.typeName} (containsNull = $containsNull)\n")
      DataType.buildFormattedString(elementType, s"$prefix    |", stringConcat, maxDepth)
    }
  }

  override private[sql] def jsonValue =
    ("type" -> typeName) ~
      ("elementType" -> elementType.jsonValue) ~
      ("containsNull" -> containsNull)

  /**
   * ArrayType 的默认值大小为元素类型的默认大小。
   * 我们假设数组中平均只有 1 个元素。参见 SPARK-18853。
   */
  override def defaultSize: Int = 1 * elementType.defaultSize

  private[sql] override def physicalDataType: PhysicalDataType =
    PhysicalArrayType(elementType, containsNull)

  override def simpleString: String = s"array<${
     elementType.simpleString}>"

  override def catalogString: String = s"array<${
     elementType.catalogString}>"

  override def sql: String = s"ARRAY<${
     elementType.sql}>"

  override private[spark] def asNullable: ArrayType =
    ArrayType(elementType.asNullable, containsNull = true)

  override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
   
    f(this) || elementType.existsRecursively(f)
  }

  @transient
  private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new Ordering[ArrayData] {
   
    private[this] val elementOrdering: Ordering[Any] = elementType match {
   
      case dt: AtomicType => dt.ordering.asInstanceOf[Ordering[Any]]
      case a : ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
      case other =>
        throw new IllegalArgumentException(
          s"Type ${
     other.catalogString} does not support ordered operations")
    }

    def compare(x: ArrayData
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/603676
推荐阅读
相关标签
  

闽ICP备14008679号