当前位置:   article > 正文

计算机毕业设计hadoop+spark+hive物流大数据分析平台 物流预测系统 物流信息爬虫 物流大数据 机器学习 深度学习 知识图谱 大数据

计算机毕业设计hadoop+spark+hive物流大数据分析平台 物流预测系统 物流信息爬虫 物流大数据 机器学习 深度学习 知识图谱 大数据

流程:
1.Python爬虫采集物流数据等存入mysql和.csv文件;
2.使用pandas+numpy或者MapReduce对上面的数据集进行数据清洗,生成最终数据集并上传到HDFS;
3.使用hive数据仓库完成建库建表导入.csv数据集;
4.使用hive之hive_sql进行离线计算,使用spark之scala进行实时计算;
5.将计算指标使用sqoop工具导入mysql;
6.使用Flask+echarts进行可视化大屏实现、数据查询表格实现、含预测算法;

核心算法代码分享如下:

  1. from flask import Flask, render_template, request, redirect, url_for
  2. import json
  3. from flask_mysqldb import MySQL
  4. from flask import Flask, send_from_directory,render_template, request, redirect, url_for, jsonify
  5. import csv
  6. import os
  7. import pymysql
  8. # 创建应用对象
  9. app = Flask(__name__)
  10. app.config['MYSQL_HOST'] = 'bigdata'
  11. app.config['MYSQL_USER'] = 'root'
  12. app.config['MYSQL_PASSWORD'] = '123456'
  13. app.config['MYSQL_DB'] = 'hive_chinawutong'
  14. mysql = MySQL(app) # this is the instantiation
  15. @app.route('/tables01')
  16. def tables01():
  17. cur = mysql.connection.cursor()
  18. cur.execute('''SELECT replace(REPLACE(REPLACE(from_province, '区', ''), '省', ''),'市','') from_province,num FROM table01''')
  19. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  20. row_headers = ['from_province','num'] # this will extract row headers
  21. rv = cur.fetchall()
  22. json_data = []
  23. #print(json_data)
  24. for result in rv:
  25. json_data.append(dict(zip(row_headers, result)))
  26. return json.dumps(json_data, ensure_ascii=False)
  27. @app.route('/tables02')
  28. def tables02():
  29. cur = mysql.connection.cursor()
  30. cur.execute('''SELECT pub_time,num,LENGTH(pub_time) len_time FROM table02 ORDER BY len_time desc ''')
  31. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  32. row_headers = ['pub_time','num'] # this will extract row headers
  33. rv = cur.fetchall()
  34. json_data = []
  35. #print(json_data)
  36. for result in rv:
  37. json_data.append(dict(zip(row_headers, result)))
  38. return json.dumps(json_data, ensure_ascii=False)
  39. @app.route('/tables03')
  40. def tables03():
  41. cur = mysql.connection.cursor()
  42. cur.execute('''SELECT * FROM table03 order by rztime asc''')
  43. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  44. row_headers = ['rztime','num'] # this will extract row headers
  45. rv = cur.fetchall()
  46. json_data = []
  47. #print(json_data)
  48. for result in rv:
  49. json_data.append(dict(zip(row_headers, result)))
  50. return json.dumps(json_data, ensure_ascii=False)
  51. @app.route('/tables04')
  52. def tables04():
  53. cur = mysql.connection.cursor()
  54. cur.execute('''SELECT * FROM table04''')
  55. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  56. row_headers = ['yslx','num'] # this will extract row headers
  57. rv = cur.fetchall()
  58. json_data = []
  59. #print(json_data)
  60. for result in rv:
  61. json_data.append(dict(zip(row_headers, result)))
  62. return json.dumps(json_data, ensure_ascii=False)
  63. @app.route("/getmapcountryshowdata")
  64. def getmapcountryshowdata():
  65. filepath = r"D:\\wuliu_hadoop_spark_spider2025\\echarts\\data\\maps\\china.json"
  66. with open(filepath, "r", encoding='utf-8') as f:
  67. data = json.load(f)
  68. return json.dumps(data, ensure_ascii=False)
  69. @app.route('/tables05')
  70. def tables05():
  71. cur = mysql.connection.cursor()
  72. cur.execute('''SELECT * FROM table05 order by num asc''')
  73. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  74. row_headers = ['hwlx','num'] # this will extract row headers
  75. rv = cur.fetchall()
  76. json_data = []
  77. #print(json_data)
  78. for result in rv:
  79. json_data.append(dict(zip(row_headers, result)))
  80. return json.dumps(json_data, ensure_ascii=False)
  81. @app.route('/tables06')
  82. def tables06():
  83. cur = mysql.connection.cursor()
  84. cur.execute('''SELECT * FROM table06''')
  85. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  86. row_headers = ['weight_union','num'] # this will extract row headers
  87. rv = cur.fetchall()
  88. json_data = []
  89. #print(json_data)
  90. for result in rv:
  91. json_data.append(dict(zip(row_headers, result)))
  92. return json.dumps(json_data, ensure_ascii=False)
  93. @app.route('/tables07')
  94. def tables07():
  95. cur = mysql.connection.cursor()
  96. cur.execute('''SELECT * FROM table07 order by num asc''')
  97. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  98. row_headers = ['recieve_province','num'] # this will extract row headers
  99. rv = cur.fetchall()
  100. json_data = []
  101. #print(json_data)
  102. for result in rv:
  103. json_data.append(dict(zip(row_headers, result)))
  104. return json.dumps(json_data, ensure_ascii=False)
  105. @app.route('/tables08')
  106. def tables08():
  107. cur = mysql.connection.cursor()
  108. cur.execute('''SELECT * FROM table08''')
  109. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  110. row_headers = ['end_time','num'] # this will extract row headers
  111. rv = cur.fetchall()
  112. json_data = []
  113. #print(json_data)
  114. for result in rv:
  115. json_data.append(dict(zip(row_headers, result)))
  116. return json.dumps(json_data, ensure_ascii=False)
  117. @app.route('/tables09')
  118. def tables09():
  119. cur = mysql.connection.cursor()
  120. cur.execute('''SELECT * FROM table09''')
  121. #row_headers = [x[0] for x in cur.description] # this will extract row headers
  122. row_headers = ['wlmc','num'] # this will extract row headers
  123. rv = cur.fetchall()
  124. json_data = []
  125. #print(json_data)
  126. for result in rv:
  127. json_data.append(dict(zip(row_headers, result)))
  128. return json.dumps(json_data, ensure_ascii=False)
  129. @app.route('/data',methods=['GET'])
  130. def data():
  131. limit = int(request.args['limit'])
  132. page = int(request.args['page'])
  133. page = (page-1)*limit
  134. conn = pymysql.connect(host='bigdata', user='root', password='123456', port=3306, db='hive_chinawutong',
  135. charset='utf8mb4')
  136. cursor = conn.cursor()
  137. if (len(request.args) == 2):
  138. cursor.execute("select count(*) from ods_chinawutong");
  139. count = cursor.fetchall()
  140. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  141. cursor.execute("select * from ods_chinawutong limit "+str(page)+","+str(limit));
  142. data_dict = []
  143. result = cursor.fetchall()
  144. for field in result:
  145. data_dict.append(field)
  146. else:
  147. weight_union = str(request.args['weight_union'])
  148. wlmc = str(request.args['wlmc']).lower()
  149. if(weight_union=='不限'):
  150. cursor.execute("select count(*) from ods_chinawutong where wlmc like '%"+wlmc+"%'");
  151. count = cursor.fetchall()
  152. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  153. cursor.execute("select * from ods_chinawutong where wlmc like '%"+wlmc+"%' limit " + str(page) + "," + str(limit));
  154. data_dict = []
  155. result = cursor.fetchall()
  156. for field in result:
  157. data_dict.append(field)
  158. else:
  159. cursor.execute("select count(*) from ods_chinawutong where wlmc like '%"+wlmc+"%' and weight_union like '%"+weight_union+"%'");
  160. count = cursor.fetchall()
  161. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  162. cursor.execute("select * from ods_chinawutong where wlmc like '%"+wlmc+"%' and weight_union like '%"+weight_union+"%' limit " + str(page) + "," + str(limit));
  163. data_dict = []
  164. result = cursor.fetchall()
  165. for field in result:
  166. data_dict.append(field)
  167. table_result = {"code": 0, "msg": None, "count": count[0], "data": data_dict}
  168. cursor.close()
  169. conn.close()
  170. return jsonify(table_result)
# Start the Flask development server only when run directly
# (not when imported under a WSGI server such as gunicorn).
if __name__ == "__main__":
    app.run(debug=False)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/638194
推荐阅读
相关标签
  

闽ICP备14008679号