赞
踩
c++ cpp 并行计算筛选过滤 裁决文书网1985-2021 的300g数据
数据
数据解压以后大概300g,最开始是使用python代码进行计算,但是python实在太慢了,加上多进程也不行,
于是 使用c++ 进行 计算
c++这块最开始使用的是 i7-9700h,用的是单线程,同时运行三个程序并行计算,大概需要2-3个小时的样子。
改成c++多线程,电脑换成i9-12900h 20核心的,就快得多了。
最多也就是30分钟
解压的时候,是使用python调用zip,多进程进行解压,python解压代码如下:
从python代码也能看出来,最开始其实用的是单线程,但是很慢;后来改用多进程,还是慢,原因是磁盘io已经达到上限。这里的教训是:以后要尽量使用io吞吐量大的磁盘。
import multiprocessing
import os
import shutil
import sys
import traceback
import zipfile


def get_all_file(path):
    """Recursively collect the path of every file below ``path``.

    Paths are returned with the platform's native separator, so the result is
    identical to the old backslash-only form on Windows and still usable on
    other systems.
    """
    return_list = []
    for root, dirs, files in os.walk(path, topdown=True):
        for file_one in files:
            use_path = os.path.join(root, file_one)
            # os.walk may hand back roots that still contain '/', normalise them.
            return_list.append(use_path.replace('/', os.sep))
    return return_list


# Directory the script is started from (where the executable lives).
main_path = os.getcwd()

# Directory that holds the zip archives.
# Alternative data set used earlier: './3669万专利申请全量数据1985-2022年'
zip_dir = './zhongguo_caijue_ziliaoku/main_zip'


def get_zip_fuc(file_name, mi):
    """Extract one zip archive into a sibling folder named after the archive.

    ``file_name`` is the archive path; files that do not end in ``.zip`` are
    ignored. ``mi`` is the task index and is only used in the progress message.
    """
    if not file_name.endswith('.zip'):
        return
    zip_path = file_name
    # "<dir>/name.zip" is extracted into "<dir>/name/".
    output_dir = os.path.splitext(zip_path)[0]
    os.makedirs(output_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
    print(f'Successfully extracted {mi} {file_name} to {output_dir}')


def _report_error(exc):
    """Print the traceback of a failed worker task in the parent process."""
    traceback.print_exception(type(exc), exc, exc.__traceback__)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=20)
    for mi, file_name in enumerate(get_all_file(zip_dir)):
        # Without error_callback a failing extraction would vanish silently,
        # because the AsyncResult objects are never read.
        pool.apply_async(get_zip_fuc, args=(file_name, mi),
                         error_callback=_report_error)
    pool.close()
    pool.join()
回到c++计算这里,对于文件数量多、单个文件也很大的情况,我采取的做法比较简单:线程池中的每个线程负责一个文件的计算筛选,并通过互斥锁(线程锁)来保护全局的输出文件编号,保证各线程写出的文件名不会冲突。
// by guangdu wx:wo15985300747 // 有需要用c++加速计算的可以联系我,我可以给你封装为各种各样语言的实现 // 大数据处理的也可以一起聊聊 // 复杂网络也是一样哦 #include "pool_number.cpp" #include <thread> #include <iostream> #include <chrono> #include <vector> using namespace std; int cpu_number(){ // SYSTEM_INFO sysInfo; // GetSystemInfo(&sysInfo); // unsigned int numCores1 = sysInfo.dwNumberOfProcessors; // return numCores1; unsigned int numCores = std::thread::hardware_concurrency(); return numCores; } # include <iostream> #include <windows.h> #include <string> #include <basci/basci.h> // using namespace std; #include <iostream> #include <fstream> #include <vector> #include <string> #include <sstream> using namespace std; #include <iostream> #include <fstream> #include <vector> #include <string> using namespace std; vector<vector<string>> read_csv(string filename) { vector<vector<string>> data; ifstream file(filename); if (!file.is_open()) { cerr << "Failed to open file: " << filename << endl; return data; } enum State { UnquotedField, QuotedField, QuotedQuote }; State state = UnquotedField; vector<string> row; string field; char c; while (file.get(c)) { switch (state) { case UnquotedField: switch (c) { case ',': // end of field row.push_back(field); field.clear(); break; case '"': // start of quoted field state = QuotedField; break; case '\n': // end of row row.push_back(field); data.push_back(row); row.clear(); field.clear(); break; default: field.push_back(c); break; } break; case QuotedField: switch (c) { case '"': // end of quoted field state = QuotedQuote; break; default: field.push_back(c); break; } break; case QuotedQuote: switch (c) { case ',': // comma inside quotes row.push_back(field + "\""); field.clear(); state = UnquotedField; break; case '"': // escaped quote field.push_back('"'); state = QuotedField; break; case '\n': // end of row row.push_back(field); data.push_back(row); row.clear(); field.clear(); state = UnquotedField; break; default: // end of quote row.push_back(field); field.clear(); state = 
UnquotedField; break; } break; } } if (!field.empty()) { row.push_back(field); } if (!row.empty()) { data.push_back(row); } file.close(); // 减少复制的损耗 return std::move(data); } #include <iostream> #include <fstream> using namespace std; int getFileSize(string filePath) { ifstream file(filePath, ios::binary | ios::ate); int size = file.tellg(); file.close(); return size/(1024*1024); } #include <chrono> #include <chrono> #include <ctime> #include <iomanip> #include <sstream> #include <string> std::string getCurrentTimeStr() { // 获取当前时间 auto now = std::chrono::system_clock::now(); std::time_t time = std::chrono::system_clock::to_time_t(now); // 将时间格式化为字符串 std::stringstream ss; ss << std::put_time(std::localtime(&time), "%Y_%m_%d_%H_%M_%S"); return ss.str(); } // bool write_in(string xx, vector<string> dd_list) { // for (const auto& str : dd_list) { // if (xx.find(str) != string::npos) { // return true; // } // } // return false; // } bool write_in(string xx, const vector<string>& dd_list) { for (const string& str : dd_list) { if (xx.find(str) != string::npos) { return true; } } return false; } // #include <iostream> // #include <vector> // #include <string> // std::ostream& operator<<(std::ostream& os, const std::vector<std::string>& vec) { // os << "["; // int i=0; // for (auto it = vec.begin(); it != vec.end(); ++it) { // // if (it != vec.begin()) { // // os << ", "; // // } // os <<" "<<i<<" "<< *it<<endl; // i=i+1; // } // os << "]"; // return os; // } // void write_csv_file(string file_name,vector<vector<string>> main_list){ // } /* #include <fstream> #include <iostream> #include <vector> using namespace std; void write_csv_file(string file_name, vector<vector<string>> main_list) { ofstream file(file_name); if (file.is_open()) { for (vector<string> row : main_list) { for (int i = 0; i < row.size(); i++) { string cell = row[i]; if (cell.find_first_of(",\"\n") != string::npos) { file << "\""; for (char c : cell) { if (c == '\"') { file << "\"\""; } else { file << c; 
} } file << "\""; } else { file << cell; } if (i < row.size() - 1) { file << ","; } } file << "\n"; } file.close(); } else { cerr << "Unable to open file: " << file_name << endl; } } */ #include <fstream> #include <iostream> #include <vector> using namespace std; void write_csv_file(string file_name, vector<vector<string>>* main_list) { ofstream file(file_name); if (file.is_open()) { for (vector<string>& row : *main_list) { for (int i = 0; i < row.size(); i++) { string& cell = row[i]; if (cell.find_first_of(",\"\n") != string::npos) { file << "\""; for (char c : cell) { if (c == '\"') { file << "\"\""; } else { file << c; } } file << "\""; } else { file << cell; } if (i < row.size() - 1) { file << ","; } } file << "\n"; } file.close(); } else { cerr << "Unable to open file: " << file_name << endl; } } #include <iostream> #include <chrono> #include <ctime> #include <cstdlib> using namespace std; #include <iostream> #include <thread> #include <mutex> std::mutex mtx; // 定义一个互斥锁 int now_sum_id_number; vector<vector<string >> re_vector_2d_list_fuc(int fi,int main_i,int main_len,long long csv_i,string read_file,string new_path,int main_number,vector<string> str_list ){ std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 定义一个未加锁的unique_lock // 读取一个csv文件,获得其中的内容 enum State { UnquotedField, QuotedField, QuotedQuote }; vector<string> row; string field; char c; State state; auto start_time = std::chrono::high_resolution_clock::now(); std::chrono::time_point<std::chrono::high_resolution_clock> end_time; vector<vector<string >> data_list; // int main_len = main_list.len(); // for (node *p = main_list.head->next; p != main_list.head; p = p->next) { cout<<endl<<endl; cout<< main_i<<"/"<<main_len<<" "<<fi<<" "<<getFileSize(read_file)<<" "<<gbktoutf8(read_file)<<endl; main_i = main_i+1; ifstream file(read_file, std::ios::in); if (!file.is_open()) { cerr << "Failed to open file: " << read_file << endl; } state = UnquotedField; while (file.get(c)) { switch (state) { case 
UnquotedField: switch (c) { case ',': // end of field row.push_back(field); field.clear(); break; case '"': // start of quoted field state = QuotedField; break; case '\n': // end of row row.push_back(field); // data.push_back(row); // fi=fi+1; // if(fi%100000==0){ // end_time = std::chrono::high_resolution_clock::now(); // // cout<<"fi "<<fi<<" "<<std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count()<<" "<<row.size()<<endl; // // cout<<"fi "<<fi<<" "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" "<<row.size()<<endl; // cout<<" sum_row_number "<<main_number<<" "<<fi/10000<<" time[s] "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" row_size "<<row.size()<<" "<<row[10] <<endl; // // cout<<"file fix "<<fi/10000<<" "<<std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count()<<" "<<row.size()<<" "<<gbktoutf8 (p->str )<<" "<<row[10] <<endl; // // cout<<row<<endl; // // cout<<" "<<row[10]<<endl; // } if(write_in(gbktoutf8(row[10]),str_list)){ ;//执行写入 data_list.push_back(row); main_number= main_number+1; if(main_number%10000==0){ end_time = std::chrono::high_resolution_clock::now(); // cout<<"清空与整理a "<<main_number<<endl; cout<<" sum_row_number "<<main_number<<" "<<fi/10000<<" time[s] "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" row_size "<<row.size()<<" "<<row[10] <<endl; lock.lock(); // 手动开启线程锁 now_sum_id_number = now_sum_id_number+1; lock.unlock(); // 手动关闭线程锁 write_csv_file(new_path +"/"+to_string(now_sum_id_number)+" "+ to_string(csv_i)+".csv",data_list); data_list.clear(); csv_i=csv_i+1; } } row.clear(); field.clear(); break; default: field.push_back(c); break; } break; case QuotedField: switch (c) { case '"': // end of quoted field state = QuotedQuote; break; default: field.push_back(c); break; } break; case QuotedQuote: switch (c) { case ',': // comma inside 
quotes row.push_back(field + "\""); field.clear(); state = UnquotedField; break; case '"': // escaped quote field.push_back('"'); state = QuotedField; break; case '\n': // end of row row.push_back(field); // data.push_back(row); // fi=fi+1; // if(fi%10000==0){ // end_time = std::chrono::high_resolution_clock::now(); // // cout<<"file fix "<<fi/10000<<" "<<std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count()<<" "<<row.size()<<" "<<gbktoutf8 (p->str )<<" "<<row[10] <<endl; // cout<<" sum_row_number "<<main_number<<" "<<fi/10000<<" time[s] "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" row_size "<<row.size()<<" "<<row[10] <<endl; // // cout<<row<<endl; // // cout<<" "<<row[10]<<endl; // } if(write_in(gbktoutf8(row[10]),str_list)){ ;//执行写入 data_list.push_back(row); main_number= main_number+1; if(main_number%10000==0){ end_time = std::chrono::high_resolution_clock::now(); // cout<<"清空与整理a "<<main_number<<endl; cout<<" sum_row_number "<<main_number<<" "<<fi/10000<<" time[s] "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" row_size "<<row.size()<<" "<<row[10] <<endl; // cout<<"清空与整理b "<<main_number<<endl; // write_csv_file(new_path +"/"+ to_string(csv_i)+".csv",data_list); lock.lock(); // 手动开启线程锁 now_sum_id_number = now_sum_id_number+1; lock.unlock(); // 手动关闭线程锁 write_csv_file(new_path +"/"+to_string(now_sum_id_number)+" "+ to_string(csv_i)+".csv",data_list); data_list.clear(); csv_i=csv_i+1; } } row.clear(); field.clear(); state = UnquotedField; break; default: // end of quote row.push_back(field); field.clear(); state = UnquotedField; break; } break; } } if (!field.empty()) { row.push_back(field); } if (!row.empty()) { fi=fi+1; // if(fi%10000==0){ // cout<<"fi "<<fi<<endl; // } // data.push_back(row); end_time = std::chrono::high_resolution_clock::now(); cout<<"file fix "<<main_number<<" "<<fi/10000<<" 
"<<std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count()<<" "<<row.size()<<" "<<gbktoutf8 (read_file)<<" "<<row[10] <<endl; // cout<<" "<<row[10]<<endl; if(write_in(gbktoutf8(row[10]),str_list)){ ;//执行写入 data_list.push_back(row); main_number= main_number+1; if(main_number%10000==0){ // cout<<"清空与整理c "<<main_number<<endl; // write_csv_file(new_path +"/"+ to_string(csv_i)+".csv",data_list); lock.lock(); // 手动开启线程锁 now_sum_id_number = now_sum_id_number+1; lock.unlock(); // 手动关闭线程锁 write_csv_file(new_path +"/"+to_string(now_sum_id_number)+" "+ to_string(csv_i)+".csv",data_list); data_list.clear(); csv_i=csv_i+1; } } } end_time = std::chrono::high_resolution_clock::now(); cout<<" sum_row_number "<<main_number<<" "<<fi/10000<<" time[s] "<<(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/ 1e6 <<" row_size "<<row.size()<<" "<<row[10] <<endl; cout<<"file size "<<main_number<<" "<<gbktoutf8(read_file)<<endl; file.close(); // } cout<<"main_len "<<data_list.size()<<endl; // write_csv_file(new_path +"/"+ to_string(csv_i+1)+".csv",data_list); lock.lock(); // 手动开启线程锁 now_sum_id_number = now_sum_id_number+1; lock.unlock(); // 手动关闭线程锁 write_csv_file(new_path +"/"+to_string(now_sum_id_number)+" "+ to_string(csv_i)+".csv",data_list); return std::move(data_list); } int main() { // string date_str = "2021-06-29-01"; // 给定的日期字符串 // // 将日期字符串转换为时间点 // tm date_tm = {}; // strptime(date_str.c_str(), "%Y-%m-%d-%H", &date_tm); // time_t date_time = mktime(&date_tm); // auto date_point = chrono::system_clock::from_time_t(date_time); // // 获取当前时间点 // auto now_point = chrono::system_clock::now(); // // 判断当前时间是否超过给定时间点 // if (now_point > date_point) { // cout << "Current time has exceeded the given date." 
<< endl; // exit(0); // 退出程序 // } //记录程序开始的时间是多久 auto start = std::chrono::high_resolution_clock::now(); now_sum_id_number=0; // 标记具体的线程中的进度 // 传递一个文件路径,传递一个二维数组,写入一下。 // int csv_i=0; int csv_i=26; // 十万行写入一次,传入一次,少计算数量多次 system("chcp 65001"); u_init(); cout<<"exe path: "<<main_path<<endl; vector<vector<string>> data; string read_path = main_path+utf8togbk("/读取文件"); ulist main_list =get_all_file(read_path); cout<<main_list.len()<<endl; int main_i =0; int fi=0; string write_path = main_path +utf8togbk("/写入文件"); make_file(write_path); string new_path = write_path +utf8togbk("/")+ getCurrentTimeStr(); make_file(new_path ); // 开始读取数据了,等于0 的不要,写一个函数,传递一个值,和一个数组,返回其中的内容。 vector<string> str_list; // str_list.push_back(utf8togbk("买卖合同纠纷")); // str_list.push_back("买卖合同纠纷"); vector<vector<string>> csv_list= read_csv(utf8togbk("查询词.csv")); copy_file(utf8togbk("查询词.csv").c_str(),new_path.c_str()); // cout<<<<endl; cout<<"---------------------------------------------------------------------------------------------------------"<<endl; int main_number=0; int cxi=0; string cxd=""; for(auto & dc:csv_list){ cxd = gbktoutf8(dc[0]); if(cxd!= ""){ cout<<"查询关键词序列 在其中 "<<cxi<<" :"<<cxd<<endl; str_list.push_back(cxd); }else{ cout<<"查询关键词序列 不在其中 "<<cxi<<" :"<<cxd<<endl; // str_list.push_back(cxd); } cxi=cxi+1; } cout<<"---------------------------------------------------------------------------------------------------------"<<endl; int main_len = main_list.len(); //------------------------------------------------------------------- // auto start = std::chrono::high_resolution_clock::now(); cout<<"cpu_number "<<cpu_number()<<endl; int pool_size = cpu_number(); int task_num = 23; ThreadPool threadpool(pool_size); vector<future<vector<vector<string>>>> resVec; string read_file; for (node *p = main_list.head->next; p != main_list.head; p = p->next){ read_file = p->str; resVec.emplace_back( // 分为前后两个部分,参数要对的上。 threadpool.AddTask([fi,main_i,main_len,csv_i,read_file,new_path,main_number,str_list] 
{ return re_vector_2d_list_fuc(fi,main_i,main_len,csv_i,read_file,new_path,main_number,str_list); }) ); } int write_file_number = 0; vector<vector<vector<string>>> remain_list; /*打印每个任务的返回值*/ for (auto&& result: resVec) { remain_list.push_back(result.get()); // cout <<"data: "<< result.get() << " \n"; } for(auto dd:remain_list){ // cout<<dd<<endl; } // 获取结束时间点 auto end = std::chrono::high_resolution_clock::now(); // 计算代码执行时间(以毫秒为单位) auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); // 输出执行时间 std::cout << "run time:" << duration << " ms" << std::endl; cout << endl; system("pause"); return 0; }
c++ 运行效果,用27g大小的部分计算的,非常快 60s不到
pass 还有一个线程池文件(即上面代码中 #include 的 pool_number.cpp),有需要可以联系我,我发给你。
分析c++的 main.cpp 文件,逻辑是
获取指定文件夹下的所有文件,然后开启线程池计算,不断计算就可以了。
//转载请勿去除我的联系方式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。