SparkSession: the entry point for programming Spark with the Dataset or DataFrame API.
A SparkSession can be used to create DataFrames, register a DataFrame as a table, execute SQL over that table, cache the table, read files from the local filesystem, and so on.
A SparkSession is created as follows:
```python
>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()
```
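To make the capabilities listed above concrete, here is a minimal sketch of typical usage once a session exists. The sample rows, column names, view name `people`, and the file path are illustrative assumptions, not anything prescribed by the source:

```python
# A minimal sketch of SparkSession usage; the data, column names,
# view name and file path below are illustrative assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("demo").getOrCreate()

# Create a DataFrame from local data.
df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])

# Register it as a temporary view and run SQL over it.
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 40").show()

# Cache the table, as described above.
spark.catalog.cacheTable("people")

# Reading a local file would look like this (hypothetical path):
# lines = spark.read.text("file:///tmp/words.txt")

spark.stop()
```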
The inner class of SparkSession: Builder
The config function: sets configuration options; through it you can either define custom options or take over the options of an existing SparkConf.
For an existing SparkConf, pass it in through the conf parameter.
For a (key, value) pair, the parameter names can be omitted.
So which does config actually use: the options in the SparkConf, or the user-supplied key/value pair?
Inside a locked block it checks whether the conf parameter is None: if it is, the user-supplied key/value pair becomes the configuration; otherwise all options are copied from the SparkConf.
Finally, the method returns the instance itself.
About `return self`: the method returns a reference to the instance object on which it was called.
Returning self from a method simply means that your method returns a reference to the instance object on which it was called. This can sometimes be seen in use with object oriented APIs that are designed as a fluent interface that encourages method cascading.
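Because every builder method returns self, the calls cascade. Below is a minimal sketch showing both calling styles of config and the resulting chaining; the option keys and values used are illustrative assumptions:

```python
# A minimal sketch of the two ways to call config(); the option keys
# and values here are illustrative assumptions, not required settings.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# 1) Key/value form: parameter names can be omitted.
builder = SparkSession.builder.config("spark.some.config.option", "some-value")

# 2) Existing SparkConf form: all of its options are copied into the builder.
conf = SparkConf().set("spark.executor.memory", "1g")
builder = builder.config(conf=conf)

# Each call returned the same Builder, so the calls chain fluently.
spark = builder.master("local").appName("config-demo").getOrCreate()
```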
The appName function:
Sets a name for the application; if no name is set, a randomly generated one will be used.
The getOrCreate function: gets an existing SparkSession or, if none exists, creates a new one.
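As a quick illustration, mirroring the doctests in the source below, a second getOrCreate call returns the very same session, and the second builder's options are applied to it. The keys "k1"/"k2" are just example names:

```python
# A minimal sketch: getOrCreate() reuses the global default session.
# The option keys "k1"/"k2" are illustrative, following the doctests.
from pyspark.sql import SparkSession

s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
s2 = SparkSession.builder.config("k2", "v2").getOrCreate()

assert s1 is s2                    # the existing session is returned
assert s2.conf.get("k1") == "v1"   # options from the first builder survive
assert s1.conf.get("k2") == "v2"   # new options applied to the same session
```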
For details, see the full source of the Builder class below.
```python
# Excerpt from pyspark/sql/session.py (Spark 2.x)
class Builder(object):
    """Builder for :class:`SparkSession`.
    """

    _lock = RLock()  # reentrant lock guarding the shared builder state
    _options = {}

    @since(2.0)
    def config(self, key=None, value=None, conf=None):
        """Sets a config option. Options set using this method are automatically propagated to
        both :class:`SparkConf` and :class:`SparkSession`'s own configuration.

        For an existing SparkConf, use `conf` parameter.

        >>> from pyspark.conf import SparkConf
        >>> SparkSession.builder.config(conf=SparkConf())
        <pyspark.sql.session...

        For a (key, value) pair, you can omit parameter names.

        >>> SparkSession.builder.config("spark.some.config.option", "some-value")
        <pyspark.sql.session...

        :param key: a key name string for configuration property
        :param value: a value for configuration property
        :param conf: an instance of :class:`SparkConf`
        """
        with self._lock:
            if conf is None:
                self._options[key] = str(value)
            else:
                for (k, v) in conf.getAll():
                    self._options[k] = v
            return self

    @since(2.0)
    def master(self, master):
        """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]"
        to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone
        cluster.

        :param master: a url for spark master
        """
        return self.config("spark.master", master)

    @since(2.0)
    def appName(self, name):
        """Sets a name for the application, which will be shown in the Spark web UI.

        If no application name is set, a randomly generated name will be used.

        :param name: an application name
        """
        return self.config("spark.app.name", name)

    @since(2.0)
    def enableHiveSupport(self):
        """Enables Hive support, including connectivity to a persistent Hive metastore, support
        for Hive serdes, and Hive user-defined functions.
        """
        return self.config("spark.sql.catalogImplementation", "hive")

    @since(2.0)
    def getOrCreate(self):
        """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
        new one based on the options set in this builder.

        This method first checks whether there is a valid global default SparkSession, and if
        yes, return that one. If no valid global default SparkSession exists, the method
        creates a new SparkSession and assigns the newly created SparkSession as the global
        default.

        >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
        >>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
        True

        In case an existing SparkSession is returned, the config options specified
        in this builder will be applied to the existing SparkSession.

        >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
        >>> s1.conf.get("k1") == s2.conf.get("k1")
        True
        >>> s1.conf.get("k2") == s2.conf.get("k2")
        True
        """
        with self._lock:
            from pyspark.context import SparkContext
            from pyspark.conf import SparkConf
            session = SparkSession._instantiatedSession
            if session is None or session._sc._jsc is None:
                sparkConf = SparkConf()
                for key, value in self._options.items():
                    sparkConf.set(key, value)
                sc = SparkContext.getOrCreate(sparkConf)
                # This SparkContext may be an existing one.
                for key, value in self._options.items():
                    # We need to propagate the confs
                    # before we create the SparkSession. Otherwise, confs like
                    # warehouse path and metastore url will not be set correctly (
                    # these confs cannot be changed once the SparkSession is created).
                    sc._conf.set(key, value)
                session = SparkSession(sc)
            for key, value in self._options.items():
                session._jsparkSession.sessionState().conf().setConfString(key, value)
            for key, value in self._options.items():
                session.sparkContext._conf.set(key, value)
            return session
```
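The comment inside getOrCreate explains why options are pushed into the SparkContext's conf before the session is constructed: settings such as the warehouse path cannot be changed once a SparkSession exists. A minimal sketch of setting such an option the right way, at build time; the directory used is an illustrative assumption:

```python
# A minimal sketch: spark.sql.warehouse.dir must be set before the first
# SparkSession is created; the directory below is an illustrative assumption.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("warehouse-demo") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .getOrCreate()

# The option took effect because it was set on the builder, not on a live session.
print(spark.conf.get("spark.sql.warehouse.dir"))
```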