赞
踩
统一API写法:
常见源写出:
- # cording:utf8
-
- from pyspark.sql import SparkSession
- from pyspark.sql.types import StructType, IntegerType, StringType
- import pyspark.sql.functions as F
- if __name__ == '__main__':
- spark = SparkSession.builder.\
- appName('write').\
- master('local[*]').\
- getOrCreate()
-
- sc = spark.sparkContext
-
- # 1.读取文件
- schema = StructType().add('user_id', StringType(), nullable=True).\
- add('movie_id', IntegerType(), nullable=True).\
- add('rank', IntegerType(), nullable=True).\
- add('ts', StringType(), nullable=True)
-
- df = spark.read.format('csv').\
- option('sep', '\t').\
- option('header', False).\
- option('encoding', 'utf-8').\
- schema(schema=schema).\
- load('../input/u.data')
-
- # write text 写出,只能写出一个列的数据,需要将df转换为单列df
- df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\
- write.\
- mode('overwrite').\
- format('text').\
- save('../output/sql/text')
-
- # write csv
- df.write.mode('overwrite').\
- format('csv').\
- option('sep',';').\
- option('header', True).\
- save('../output/sql/csv')
-
- # write json
- df.write.mode('overwrite').\
- format('json').\
- save('../output/sql/json')
-
- # write parquet
- df.write.mode('overwrite').\
- format('parquet').\
- save('../output/sql/parquet')
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
API写法:
注意:
①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)
②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码
③save()不要填参数,没有路径,是写出数据库
④dbtable属性:指定写出的表名
- # cording:utf8
-
- from pyspark.sql import SparkSession
- from pyspark.sql.types import StructType, IntegerType, StringType
- import pyspark.sql.functions as F
- if __name__ == '__main__':
- spark = SparkSession.builder.\
- appName('write').\
- master('local[*]').\
- getOrCreate()
-
- sc = spark.sparkContext
-
- # 1.读取文件
- schema = StructType().add('user_id', StringType(), nullable=True).\
- add('movie_id', IntegerType(), nullable=True).\
- add('rank', IntegerType(), nullable=True).\
- add('ts', StringType(), nullable=True)
-
- df = spark.read.format('csv').\
- option('sep', '\t').\
- option('header', False).\
- option('encoding', 'utf-8').\
- schema(schema=schema).\
- load('../input/u.data')
-
- # 2.写出df到MySQL数据库
- df.write.mode('overwrite').\
- format('jdbc').\
- option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\
- option('dbtable', 'movie_data').\
- option('user', 'root').\
- option('password', '123456').\
- save()
-
- # 读取
- df2 = spark.read.format('jdbc'). \
- option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \
- option('dbtable', 'movie_data'). \
- option('user', 'root'). \
- option('password', '123456'). \
- load()
-
- # 查看读取结果
- df2.printSchema()
- df2.show()
- '''
- JDBC写出,会自动创建表的
- 因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空
- '''
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
保存结果:
读取结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。