
PySpark Notes: the pyspark.sql module (SparkSession)


SparkSession: the entry point for programming Spark with the Dataset and DataFrame API.

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over those tables, cache tables, and read files (for example, from the local filesystem).

Use the following to create a SparkSession:

    >>> from pyspark.sql import SparkSession
    >>> spark = SparkSession.builder \
    ...     .master("local") \
    ...     .appName("Word Count") \
    ...     .config("spark.some.config.option", "some-value") \
    ...     .getOrCreate()
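With this session in hand, the uses listed above look like the following minimal sketch (the data, the table name people, and the file path are made-up placeholders for illustration):

    >>> df = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "age"])
    >>> df.createOrReplaceTempView("people")           # register the DataFrame as a table
    >>> spark.sql("SELECT name FROM people WHERE age > 1").show()
    >>> spark.catalog.cacheTable("people")             # cache the table
    >>> df2 = spark.read.text("/path/to/file.txt")     # hypothetical local file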

SparkSession's inner class: Builder

The config method: sets a configuration option. You can either supply your own key/value options or take the settings from an existing SparkConf.

For an existing SparkConf, pass it in via the conf parameter.
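For example (this doctest appears verbatim in the config source reproduced below):

    >>> from pyspark.conf import SparkConf
    >>> SparkSession.builder.config(conf=SparkConf())
    <pyspark.sql.session...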

 

To set a single (key, value) pair, the parameter names can be omitted.
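Again from the docstring:

    >>> SparkSession.builder.config("spark.some.config.option", "some-value")
    <pyspark.sql.session...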

So which values does config end up storing: the entries of a passed-in SparkConf, or the user-supplied key/value pair?

Inside a locked section, it checks whether the conf parameter is None. If it is, the user-supplied key/value pair is stored; otherwise, every entry of the given SparkConf is copied into the builder's options.

Finally, the method returns the instance itself (return self).
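This is exactly what the body of config does (excerpted from the Builder source shown later in this section):

    with self._lock:
        if conf is None:
            self._options[key] = str(value)
        else:
            for (k, v) in conf.getAll():
                self._options[k] = v
        return self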

 

About return self: it returns the instance object on which the method was called.

Returning self from a method simply means that your method returns a reference to the instance object on which it was called. This can sometimes be seen in use with object oriented APIs that are designed as a fluent interface that encourages method cascading.
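A minimal toy illustration of the pattern (a sketch, not Spark's actual Builder class):

    class ToyBuilder:
        """Toy fluent interface: each setter returns the instance."""
        def __init__(self):
            self._options = {}

        def config(self, key, value):
            self._options[key] = value
            return self  # returning self allows calls to be chained

    b = ToyBuilder().config("a", 1).config("b", 2)  # method cascading
    print(b._options)  # {'a': 1, 'b': 2}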
 


The appName method:

Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated one is used.
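For example (assuming no session exists yet; "Word Count" is just a sample name, and appName is a standard SparkContext property):

    >>> spark = SparkSession.builder.appName("Word Count").getOrCreate()
    >>> spark.sparkContext.appName
    'Word Count'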


The getOrCreate method: returns an existing SparkSession if there is one; otherwise it creates a new one based on the options set in this builder.
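Its doctests (visible in the source below) show the singleton behavior: a second call returns the same global session, and the options set on the second builder are applied to it:

    >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
    >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
    >>> s1.conf.get("k2") == s2.conf.get("k2")
    True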

The details are in the source. The full Builder class (from pyspark/sql/session.py) is reproduced below; getOrCreate is defined at the end:

    class Builder(object):
        """Builder for :class:`SparkSession`.
        """

        _lock = RLock()  # lock that serializes option updates and session creation
        _options = {}

        @since(2.0)
        def config(self, key=None, value=None, conf=None):
            """Sets a config option. Options set using this method are automatically propagated to
            both :class:`SparkConf` and :class:`SparkSession`'s own configuration.

            For an existing SparkConf, use `conf` parameter.

            >>> from pyspark.conf import SparkConf
            >>> SparkSession.builder.config(conf=SparkConf())
            <pyspark.sql.session...

            For a (key, value) pair, you can omit parameter names.

            >>> SparkSession.builder.config("spark.some.config.option", "some-value")
            <pyspark.sql.session...

            :param key: a key name string for configuration property
            :param value: a value for configuration property
            :param conf: an instance of :class:`SparkConf`
            """
            with self._lock:
                if conf is None:
                    self._options[key] = str(value)
                else:
                    for (k, v) in conf.getAll():
                        self._options[k] = v
                return self

        @since(2.0)
        def master(self, master):
            """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]"
            to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone
            cluster.

            :param master: a url for spark master
            """
            return self.config("spark.master", master)

        @since(2.0)
        def appName(self, name):
            """Sets a name for the application, which will be shown in the Spark web UI.

            If no application name is set, a randomly generated name will be used.

            :param name: an application name
            """
            return self.config("spark.app.name", name)

        @since(2.0)
        def enableHiveSupport(self):
            """Enables Hive support, including connectivity to a persistent Hive metastore, support
            for Hive serdes, and Hive user-defined functions.
            """
            return self.config("spark.sql.catalogImplementation", "hive")

        @since(2.0)
        def getOrCreate(self):
            """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
            new one based on the options set in this builder.

            This method first checks whether there is a valid global default SparkSession, and if
            yes, return that one. If no valid global default SparkSession exists, the method
            creates a new SparkSession and assigns the newly created SparkSession as the global
            default.

            >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
            >>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
            True

            In case an existing SparkSession is returned, the config options specified
            in this builder will be applied to the existing SparkSession.

            >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
            >>> s1.conf.get("k1") == s2.conf.get("k1")
            True
            >>> s1.conf.get("k2") == s2.conf.get("k2")
            True
            """
            with self._lock:
                from pyspark.context import SparkContext
                from pyspark.conf import SparkConf
                session = SparkSession._instantiatedSession
                if session is None or session._sc._jsc is None:
                    sparkConf = SparkConf()
                    for key, value in self._options.items():
                        sparkConf.set(key, value)
                    sc = SparkContext.getOrCreate(sparkConf)
                    # This SparkContext may be an existing one.
                    for key, value in self._options.items():
                        # We need to propagate the confs
                        # before we create the SparkSession. Otherwise, confs like
                        # warehouse path and metastore url will not be set correctly (
                        # these confs cannot be changed once the SparkSession is created).
                        sc._conf.set(key, value)
                    session = SparkSession(sc)
                for key, value in self._options.items():
                    session._jsparkSession.sessionState().conf().setConfString(key, value)
                for key, value in self._options.items():
                    session.sparkContext._conf.set(key, value)
                return session
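Putting the Builder methods together, a typical setup might look like the following sketch (the master URL and application name are placeholders):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("local[4]")          # run locally with 4 cores
             .appName("my-app")
             .enableHiveSupport()         # optional: persistent Hive metastore
             .getOrCreate())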

 
