赞
踩
设置 PySpark 环境,PySpark 需要一个 SparkSession 来与 Spark 功能进行交互。我们还需要配置 Spark 支持 Hive,并包含 MySQL 连接器 JAR 以实现数据库连接。
from pyspark.sql import SparkSession
# Build (or reuse) the shared SparkSession: Hive support is enabled and the
# MySQL Connector/J jar is placed on the driver classpath for JDBC access.
builder = SparkSession.builder
builder = builder.enableHiveSupport()
builder = builder.config(
    "spark.driver.extraClassPath",
    "lib/mysql-connector-java-8.0.30.jar",
)
spark = builder.getOrCreate()
这里我们需要引入 MySQL JDBC 驱动的依赖 jar(mysql-connector-java),否则写入 MySQL 时 Spark 会找不到驱动类。
我们的招聘数据存储在一个 CSV 文件中。我们将定义一个函数将这些数据读入 Spark DataFrame 中。我们将指定一个模式以确保每列的数据类型正确。
from pyspark.sql.types import StringType, StructType


def read_data_from_csv(path):
    """Read the recruitment CSV at *path* into a Spark DataFrame.

    The first row is treated as the header, and every column is declared
    as a nullable string so no value is silently coerced to another type.
    """
    column_names = [
        "recruitment_positions",
        "recruitment_city",
        "recruitment_salary",
        "recruitment_experience",
        "recruitment_skills",
        "recruitment_company",
        "recruitment_industry",
        "recruitment_scale",
    ]
    schema = StructType()
    for column_name in column_names:
        schema = schema.add(column_name, StringType(), True)
    return (
        spark.read
        .option("header", True)
        .schema(schema)
        .csv(path)
    )


# Example usage:
df = read_data_from_csv("../data_prepare/data.csv")
接下来,我们将对招聘数据进行一些基本的数据分析。我们将选择相关列并根据需要应用转换。
def data_analysis(df):
    """Project the raw postings into the ``recruitment_data`` layout.

    Registers *df* as the temp view ``job`` and selects/renames columns to
    match the MySQL table: education is hard-coded to '本科', and the city
    column doubles as area and address. Only the first 10 rows are kept.
    Shows the result and returns it.
    """
    df.createTempView("job")
    query = """ SELECT recruitment_positions, recruitment_salary, recruitment_skills AS recruitment_requirements, recruitment_experience, '本科' AS recruiting_educational_qualifications, recruitment_company, recruitment_scale AS company_stage, recruitment_industry, recruitment_skills, recruitment_city, recruitment_city AS recruitment_area, recruitment_city AS recruitment_address, recruitment_scale FROM job LIMIT 10 """
    result = spark.sql(query)
    result.show()
    return result


# Example usage:
df = data_analysis(df)
一旦我们分析了数据,可能希望将其存储在 MySQL 数据库中以进行进一步处理或报告。我们将定义一个函数将 DataFrame 写入 MySQL,导入数据之前需要创建mysql表。
-- Target table for the analyzed recruitment data.
-- Fix: the COMMENT on `recruitment_requirements` duplicated the one on
-- `recruitment_positions` ('招聘职位'); corrected to '招聘要求'.
CREATE TABLE `recruitment_data` (
  `recruitment_data_id` int NOT NULL AUTO_INCREMENT COMMENT '招聘数据ID',
  `recruitment_positions` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘职位',
  `recruitment_salary` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘薪资',
  `recruitment_requirements` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘要求',
  `recruitment_experience` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘经历',
  `recruiting_educational_qualifications` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘学历',
  `recruitment_company` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘公司',
  `company_stage` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '公司阶段',
  `recruitment_industry` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘行业',
  `recruitment_skills` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘技能',
  `recruitment_city` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘城市',
  `recruitment_area` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘区域',
  `recruitment_address` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘地址',
  `recruitment_scale` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘规模',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`recruitment_data_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26698 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='招聘数据'
使用 PySpark 以追加(append)模式将 DataFrame 写入 MySQL:
def write_data_to_mysql(df):
    """Append *df* to the MySQL table ``recruitment_data`` over JDBC.

    Fix: the original used the legacy driver class ``com.mysql.jdbc.Driver``;
    with Connector/J 8.x on the classpath (``mysql-connector-java-8.0.30.jar``)
    the correct class is ``com.mysql.cj.jdbc.Driver`` — the old name is only a
    deprecated alias that logs a warning.
    """
    (
        df.write.format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/project05928")
        # Connector/J 8.x driver class.
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "recruitment_data")
        .option("user", "root")
        # NOTE(review): hard-coded credentials — move to config/env in real use.
        .option("password", "12345678")
        .mode("append")
        .save()
    )
# 示例用法:
write_data_to_mysql(df)
在本博客文章中,我们探讨了如何使用 PySpark 分析招聘数据。我们从设置 PySpark 环境开始,然后读取 CSV 文件中的数据,进行数据分析,最后将分析后的数据导入到 MySQL 数据库中。PySpark 的可伸缩性和性能使其成为处理大量招聘数据的理想选择。通过利用 PySpark 的功能,组织可以获得有价值的见解,优化他们的招聘流程并做出数据驱动的决策。
"""End-to-end job: read the recruitment CSV, reshape it, append to MySQL.

Fix relative to the collapsed original: the JDBC driver class is updated from
the legacy ``com.mysql.jdbc.Driver`` to ``com.mysql.cj.jdbc.Driver``, which is
the correct class for the Connector/J 8.0.30 jar configured on the classpath.
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType


def read_data_from_csv(path):
    """Read the recruitment CSV at *path*; every column is a nullable string."""
    schema = StructType() \
        .add("recruitment_positions", StringType(), True) \
        .add("recruitment_city", StringType(), True) \
        .add("recruitment_salary", StringType(), True) \
        .add("recruitment_experience", StringType(), True) \
        .add("recruitment_skills", StringType(), True) \
        .add("recruitment_company", StringType(), True) \
        .add("recruitment_industry", StringType(), True) \
        .add("recruitment_scale", StringType(), True)
    df = spark.read \
        .option("header", True) \
        .schema(schema) \
        .csv(path)
    return df


def data_ana(df):
    """Rename/derive columns to match ``recruitment_data``; keep 10 rows."""
    df.createTempView("job")
    df = spark.sql(""" select recruitment_positions, recruitment_salary, recruitment_skills as recruitment_requirements, recruitment_experience, '本科' as recruiting_educational_qualifications, recruitment_company, recruitment_scale as company_stage, recruitment_industry, recruitment_skills, recruitment_city, recruitment_city as recruitment_area, recruitment_city as recruitment_address, recruitment_scale from job limit 10 """)
    df.show()
    return df


def write_data2mysql(df):
    """Append *df* to MySQL table ``recruitment_data`` over JDBC."""
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/project05928") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "recruitment_data") \
        .option("user", "root") \
        .option("password", "12345678") \
        .mode("append") \
        .save()


if __name__ == '__main__':
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .config("spark.driver.extraClassPath", "lib/mysql-connector-java-8.0.30.jar") \
        .getOrCreate()
    df = read_data_from_csv("../data_prepare/data.csv")
    df = data_ana(df)
    write_data2mysql(df)
    spark.stop()
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。