赞
踩
欢迎关注笔者的微信公众号
在计算机科学中,有两种不同类型的程序:IO 密集型和 CPU 密集型。这两种程序的主要差别在于它们在执行任务时瓶颈所在的地方。
需要注意的是,大多数程序都是一种折衷的方式,即同时存在 IO 密集型和 CPU 密集型的特征。因此,在设计程序时需要适当考虑两者的平衡,以确保程序的效率和可用性。
在设计 IO 密集型程序时,需要考虑如何提高程序的并发性和线程安全性,以最大化利用外部设备的性能。下面是一些建议:
下面是一个使用 Java 实现的简单 IO 密集型程序的例子:
import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class IoIntensiveProgram { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); File file = new File("data.txt"); List<String> lines = Files.readAllLines(Paths.get(file.getAbsolutePath()), StandardCharsets.UTF_8); for (String line : lines) { executorService.submit(new Callable<Void>() { @Override public Void call() throws Exception { processLine(line); return null; } }); } executorService.shutdown(); } private static void processLine(String line) throws IOException { // ... } }
如果把上面的例子改成 CPU 密集型程序,只需要把 processLine
函数内部改成大量的计算操作,就可以得到一个 CPU 密集型程序。
下面是一个使用 Java 实现的简单 CPU 密集型程序的例子:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CpuIntensiveProgram { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10000; i++) { executorService.submit(new Callable<Void>() { @Override public Void call() throws Exception { processData(); return null; } }); } executorService.shutdown(); } private static void processData() { double result = 0; for (int i = 0; i < 1000000; i++) { result += Math.sin(i); } } }
当然,上面只是一个简单的例子,实际上你可以根据你的需求来实现 CPU 密集型程序。
总的来说,如果你的程序更多地依赖于 I/O 操作,那么应该使用 IO 密集型程序;如果你的程序更多地依赖于 CPU 计算,那么应该使用 CPU 密集型程序。希望以上内容能帮助你更好地设计你的程序。
下面分享一些笔者最近工作中实际遇到的问题与经验。
笔者最近的一项工作是对大批数据进行分析并将分析后的数据生成离线图表文件。主要存在以下问题和难点:
首先在技术选型上笔者主要是python
+ plotly
+ numpy
+ pandas
。plotly
可以生成动态的离线图表文件,pandas
是python
生态最常用的数据分析工具。
最开始采取的是最朴素的方案,直接遍历每辆车的数据,挨个生成文件。但是发现每辆车的处理时间都超过10分钟有的甚至需要半小时,时间上不可接受。最终的方案就是采用多线程同时处理多个文件。但如何确定最佳的线程数其实也踩了很多坑。
如何针对不同类型的程序确定线程数网上有很多介绍,概括来说就是:
t h r e a d _ n u m = c p u _ n u m + 1 thread\_num = cpu\_num+1 thread_num=cpu_num+1,+1是为了预防有个线程被阻塞,cpu可以调用其他线程。
对于IO密集型程序,线程数的确定有两种说法:
最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时);(一般为2*CPU核心数)
最佳线程数 = CPU核心数/(1-阻塞系数),阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
从实际应用来看,线程数的多少没有确定公式可遵循,具体得看设备的硬件情况和程序要求。下面两张图分别是线程数=2*CPU数和线程数 = CPU数/(1-阻塞系数)的系统监控图。当线程数较小时,没有充分发挥系统能力,当然这不会有什么问题,只是可能耗费较多时间。但是当线程数过多时,系统内存占用急速上升,资源不够导致程序奔溃。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4jeTmM3F-1675827491534)(https://itbird.oss-cn-beijing.aliyuncs.com/img/2023/02/07/IO%E5%AF%86%E9%9B%86%E5%9E%8B3.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fbxU5oc3-1675827491535)(https://itbird.oss-cn-beijing.aliyuncs.com/img/2023/02/07/IO%E5%AF%86%E9%9B%86%E5%9E%8B2-16757806087952.png)]
因此,线程数的确定并不是一个固定的数值,而是需要根据系统的实际情况进行调整的。因此,最好的方法是通过测试和实验来确定线程数
import multiprocessing import threading import os import glob import numpy as np import pandas as pd cpu_num = multiprocessing.cpu_count() block_coef = 0.9 thread_num = int(np.ceil(cpu_num/(1-block_coef))) def merge(files: list = [], is_old=True): if files is None or files == []: return file_name = os.path.basename(os.path.dirname(files[0])) if is_old: file_name = file_name + '-1.csv' else: file_name = file_name + '-2.csv' df = pd.DataFrame() for f in files: print(f) tmp = pd.read_excel(f) df = pd.concat([df, tmp]) df.to_csv('./output/'+file_name, index=None, encoding='utf-8') def resolve(batch: list = []): """ 每个线程负责处理一批数据 """ for b in batch: merge(b) if __name__ == '__main__': dirs = glob.glob('./raw_data/*') file_list = [] for dir in dirs: file_list.append([os.path.join(dir, i) for i in os.listdir(dir)]) file_list = sorted(file_list) batch_size = int(len(dirs) / thread_num) split = int(np.ceil(len(dirs) / batch_size)) for i in range(split): threading.Thread(target=resolve, args=(file_list[i * batch_size:(i + 1) * batch_size],)).start()
df = util.get_dataset(car_id)
# 解析时间格式字段
df["上报时间"] = pd.to_datetime(df["上报时间"])
df['压差'] = df['电池单体电压最高值'] - df['电池单体电压最低值']
统计不同区间内的数据量
import pandas as pd import numpy as np pd.set_option('display.max_rows', None) df = pd.read_csv(r"xxx.csv") df["上报时间"] = pd.to_datetime(df["上报时间"]) df.set_index(df["上报时间"], inplace=True) df['压差'] = (df['电池单体电压最高值'] - df['电池单体电压最低值'])*1000 bins = [-1e-10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 180, 200, np.inf] result = pd.cut(df['压差'], bins=bins,right=True) # 全量数据的区间统计 result.value_counts() # 按月区间统计 d = pd.cut(data['压差'], bins=bins,right=True).groupby([data.index.year, data.index.month]).value_counts() d.index.names = ['year', 'month', 'group'] d = pd.DataFrame(d) # 查询2022年6月的数据情况 d.loc[2022].loc[6]['压差']
绘制图表,控制显示或者保存
import os import plotly import plotly.graph_objects as go def plot_fig(data, x="上报时间", y=None, y2=None, title='', mode='lines', to_html=False, carid=''): fig = go.Figure() # fig = px.line(data, x=x, y=y) if type(y) == str: fig.add_trace(go.Scatter(x=data[x], y=data[y], name=y, mode=mode)) if y == '压差': fig.update_layout(yaxis=dict(range=[data[y].min(), 0.5])) else: for col in y: fig.add_trace(go.Scatter(x=data[x], y=data[col], name=col, mode=mode)) if y2 is not None: # 设置双y轴 fig.add_trace(go.Scatter(x=data[x], y=data[y2], xaxis="x", yaxis="y2", name=y2)) fig.update_layout(yaxis2=dict(anchor='x', overlaying='y', side='right')) fig.update_xaxes( rangeslider_visible=True, rangeselector=dict( buttons=list( [ dict(count=1, label="1月", step="month", stepmode="backward"), dict(count=6, label="半年", step="month", stepmode="backward"), dict(count=1, label="今年", step="year", stepmode="todate"), dict(count=1, label="1年", step="year", stepmode="backward"), dict(step="all"), ] ) ), type="date", tickformat='%Y-%m-%d %H:%M:%S' ) fig.update_layout(width=1500, height=600, title=carid + "-" + title) if to_html: parent_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'output', carid) print(parent_dir) if not os.path.exists(parent_dir): os.mkdir(parent_dir) filename = parent_dir + '/' + title + '.html' # plotly.io.write_html(fig, filename) # 关闭自动用浏览器打开,真坑爹,浏览器莫名其妙打开 plotly.offline.plot(fig, filename=filename, auto_open=False) else: fig.show()
好像快一年没写文章了。。。最近整理下过去一年工作和科研方面的成果和经验与大家分享。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。