赞
踩
PyTorch 是一个非常流行的开源机器学习库,它被广泛应用于各种深度学习和科学计算领域。
图像分类
目标检测
图像分割
生成对抗网络(GANs)
姿态估计
视频分析
文本分类
命名实体识别
机器翻译
对话系统
情感分析
文本生成
语音识别
语音合成
声音事件检测
财务预测
能源需求预测
异常检测
游戏 AI
机器人控制
资源调度优化
医疗图像分析
疾病预测
药物发现
故障检测
质量检测
工艺优化
物理模拟
天文数据分析
生物信息学
推荐系统
异常检测
知识图谱
元学习
创建、操作和转换多维张量数据。
支持基本的数学运算、索引、切片等操作。
利用 GPU 加速数值计算。
使用 nn 模块定义神经网络层。
利用 autograd 机制自动计算梯度。
支持卷积网络、循环网络等常见神经网络结构。
使用优化器进行模型参数优化。
定义损失函数并进行反向传播。
在验证集或测试集上评估模型性能。
使用 Dataset 和 DataLoader 类加载和管理数据。
支持图像、文本、时间序列等多种数据类型。
实现数据增强、归一化等预处理操作。
将训练好的模型参数保存到磁盘。
从磁盘加载预训练的模型参数。
支持部分加载和继续训练等功能。
利用 TensorBoard 进行训练过程可视化。
使用 pdb 等调试工具进行模型调试。
利用 torchviz 等工具可视化模型结构。
将训练好的模型转换为 ONNX 格式,方便在其他环境中部署。
使用 torch.jit.trace 或 torch.jit.script 将模型编译为 TorchScript,用于高效推理。
支持在 C++、Java 等环境中部署 PyTorch 模型。
使用 PyTorch 的 torchvision.datasets 模块读取 CIFAR-10 数据集
对图像进行随机裁剪、翻转、色彩抖动等数据增强
将图像尺寸统一为 32x32, 并使用 CIFAR-10 的通道均值和标准差对像素值进行标准化
使用 PyTorch 的 nn 模块定义一个 ResNet18 模型
修改最后一层全连接层的输出大小为 10 (CIFAR-10 有 10 个类别)
使用 Adam 优化器和交叉熵损失函数进行监督训练
每个 epoch 在验证集上评估准确率,并保存最佳模型
在测试集上计算分类准确率、F1 score 等指标
打印混淆矩阵以分析模型在不同类别上的表现
将训练好的模型转换为 ONNX 格式
使用 ONNX Runtime 部署模型进行推理
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import numpy as np
import onnx
import onnxruntime
# Data preprocessing.
# Training pipeline adds augmentation (random crop, horizontal flip, color
# jitter); both pipelines normalize with CIFAR-10 per-channel mean/std.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# Test-time pipeline: normalization only, no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# Downloads CIFAR-10 into ./data on first run.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)
# 模型定义
class ResNet18(nn.Module):
    """torchvision ResNet-18 backbone with its classification head resized
    to ``num_classes`` outputs (10 for CIFAR-10)."""

    def __init__(self, num_classes=10):
        super().__init__()
        # Build an untrained backbone, then swap in a new final FC layer.
        backbone = resnet18(pretrained=False)
        in_features = backbone.fc.in_features
        backbone.fc = nn.Linear(in_features, num_classes)
        self.resnet = backbone

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.resnet(x)
# Prefer GPU when available, otherwise fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = ResNet18().to(device)
# Model training
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
best_acc = 0
for epoch in range(50):
    model.train()
    for images, labels in trainloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    # Per-epoch evaluation.
    # NOTE(review): this "validation" loop runs on the test set, so the
    # best checkpoint is selected on test data (leakage) -- use a held-out
    # validation split for model selection.
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    acc = 100 * correct / total
    print(f'Epoch [{epoch+1}/50], Accuracy: {acc:.2f}%')
    # Keep only the best-scoring checkpoint.
    if acc > best_acc:
        best_acc = acc
        torch.save(model.state_dict(), 'best_model.pth')
# Model evaluation: reload the best checkpoint and score the test set.
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Predicted class = argmax over the logits.
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())
# Macro-F1 weights all 10 classes equally regardless of support.
acc = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='macro')
print(f'Test Accuracy: {acc:.4f}, F1 Score: {f1:.4f}')
# Row i / column j counts samples of true class i predicted as class j.
conf_matrix = confusion_matrix(y_true, y_pred)
print('Confusion Matrix:')
print(conf_matrix)
# Model deployment: export to ONNX using a dummy (1, 3, 32, 32) input that
# fixes the traced input shape, then open an ONNX Runtime session on it.
torch.onnx.export(model, torch.randn(1, 3, 32, 32, device=device), 'cifar10_resnet18.onnx')
ort_session = onnxruntime.InferenceSession('cifar10_resnet18.onnx')
def onnx_inference(image):
    """Run one image batch through the exported ONNX model.

    Accepts a torch tensor, feeds it to the module-level ``ort_session``
    as numpy, and returns the resulting logits as a torch tensor.
    """
    first_input = ort_session.get_inputs()[0]
    first_output = ort_session.get_outputs()[0]
    (logits,) = ort_session.run([first_output.name],
                                {first_input.name: image.cpu().numpy()})
    return torch.from_numpy(logits)
使用 NLTK 对文本进行分词和词性标注
构建词汇表并将文本转换为数字序列
使用 GloVe 预训练词嵌入初始化词向量
使用 PyTorch 的 nn 模块定义一个双向 LSTM 模型
添加一个全连接层进行二分类 (正向/负向情感)
使用 Adam 优化器和二元交叉熵损失进行监督训练
每个 epoch 在验证集上计算 F1 score 并保存最佳模型
在测试集上计算准确率、精确率、召回率和 F1 score
打印分类报告以分析模型在不同情感类别上的表现
将训练好的模型转换为 TorchScript 格式
在 Flask 应用中部署模型进行在线情感分析
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.data import Field, TabularDataset, BucketIterator
from torchtext.vocab import GloVe
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
import re
# 1. Text preprocessing
# 1.1 Tokenization and POS-tagging resources (NLTK downloads are cached).
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
tokenizer = nltk.RegexpTokenizer(r'\w+')
pos_tagger = nltk.pos_tag
# 1.2 Build the vocabulary and numericalize the text.
# NOTE(review): this uses the legacy torchtext Field/TabularDataset API
# (removed in torchtext >= 0.12) -- pin an old torchtext or migrate.
TEXT = Field(tokenize=tokenizer.tokenize, lower=True, include_lengths=True)
LABEL = Field(dtype=torch.long)
train_data, test_data = TabularDataset.splits(
    path='data/', train='train.csv', test='test.csv',
    format='csv', fields=[('text', TEXT), ('label', LABEL)])
# Seed word vectors from pretrained 300-d GloVe embeddings.
TEXT.build_vocab(train_data, vectors=GloVe(name='6B', dim=300))
LABEL.build_vocab(train_data)
# 2. 模型定义
class SentimentClassifier(nn.Module):
    """LSTM sentence classifier over pretrained word embeddings.

    Embedding weights are copied from the module-level ``TEXT`` vocabulary
    (GloVe vectors), so the vocabulary must be built before construction.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Initialize the embedding table from the pretrained vectors.
        self.embedding.weight.data.copy_(TEXT.vocab.vectors)
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout,
        )
        fc_in = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_in, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Classify a padded batch shaped (seq_len, batch)."""
        emb = self.dropout(self.embedding(text))
        # Pack so the LSTM skips padding positions.
        packed = nn.utils.rnn.pack_padded_sequence(emb, text_lengths)
        out, (hidden, _) = self.lstm(packed)
        out, _ = nn.utils.rnn.pad_packed_sequence(out)
        # hidden[-2] / hidden[-1]: final states of the top layer's two
        # directions -- assumes bidirectional=True; TODO confirm otherwise.
        summary = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        return self.fc(self.dropout(summary))
# 3. Model training
# Bucket batches by length so padding within a batch is minimal.
train_iter, test_iter = BucketIterator.splits(
    (train_data, test_data), batch_size=32, sort_within_batch=True,
    sort_key=lambda x: len(x.text), device='cpu')
# vocab_size, embed_dim=300, hidden_dim=256, output_dim=1, 2 layers, bidirectional, dropout=0.5
model = SentimentClassifier(len(TEXT.vocab), 300, 256, 1, 2, True, 0.5)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters())
best_f1 = 0
for epoch in range(10):
    model.train()
    for batch in train_iter:
        optimizer.zero_grad()
        text, text_lengths = batch.text
        logits = model(text, text_lengths).squeeze(1)
        # NOTE(review): logits is squeezed toward (batch,) while the target
        # is unsqueezed to have a trailing dim of 1; confirm the resulting
        # shapes agree before BCEWithLogitsLoss broadcasts them.
        loss = criterion(logits, batch.label.unsqueeze(1).float())
        loss.backward()
        optimizer.step()
    # Per-epoch evaluation.
    # NOTE(review): the "validation" set here is the test split, so model
    # selection leaks test data -- use a separate validation split.
    model.eval()
    with torch.no_grad():
        test_preds = []
        test_labels = []
        for batch in test_iter:
            text, text_lengths = batch.text
            logits = model(text, text_lengths).squeeze(1)
            # Threshold sigmoid probabilities at 0.5 via round().
            test_preds.extend(torch.sigmoid(logits).round().tolist())
            test_labels.extend(batch.label.tolist())
        f1 = precision_recall_fscore_support(test_labels, test_preds, average='binary')[2]
    print(f'Epoch {epoch+1} - Val F1 Score: {f1:.4f}')
    # Keep only the best-F1 checkpoint.
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), 'best_model.pt')
# 4. Model evaluation: reload the best checkpoint and score the test set.
model.load_state_dict(torch.load('best_model.pt'))
model.eval()
with torch.no_grad():
    test_preds = []
    test_labels = []
    for batch in test_iter:
        text, text_lengths = batch.text
        logits = model(text, text_lengths).squeeze(1)
        # Threshold sigmoid probabilities at 0.5 via round().
        test_preds.extend(torch.sigmoid(logits).round().tolist())
        test_labels.extend(batch.label.tolist())
acc = accuracy_score(test_labels, test_preds)
prec, rec, f1, _ = precision_recall_fscore_support(test_labels, test_preds, average='binary')
print(f'Accuracy: {acc:.4f}')
print(f'Precision: {prec:.4f}, Recall: {rec:.4f}, F1: {f1:.4f}')
# 5. Model deployment
import torch.jit
# NOTE(review): torch.jit.script requires the forward to be scriptable;
# verify the packed-sequence calls compile under TorchScript.
traced_model = torch.jit.script(model)
traced_model.save('sentiment_classifier.pt')
from flask import Flask, request, jsonify
app = Flask(__name__)
model = torch.jit.load('sentiment_classifier.pt')
model.eval()
@app.route('/sentiment', methods=['POST'])
def sentiment_analysis():
    """Flask endpoint: classify request JSON {'text': ...} as Positive/Negative."""
    text = request.json['text']
    tokens = tokenizer.tokenize(text.lower())
    # Numericalize with the training vocabulary (OOV tokens map to <unk>).
    tensor = torch.LongTensor([TEXT.vocab.stoi[token] for token in tokens])
    # NOTE(review): training fed (seq_len, batch) tensors; unsqueeze(0)
    # yields (1, seq_len) here -- confirm the expected layout at serving time.
    tensor = tensor.unsqueeze(0)
    text_lengths = torch.LongTensor([len(tokens)])
    logits = model(tensor, text_lengths).squeeze()
    sentiment = 'Positive' if torch.sigmoid(logits) >= 0.5 else 'Negative'
    return jsonify({'sentiment': sentiment})
if __name__ == '__main__':
    # Debug server for local testing only; use a WSGI server in production.
    app.run(debug=True)
使用 yfinance 库获取股票历史数据
构建 time-lag 特征,如开盘价、收盘价、成交量等
将数据划分为训练集和测试集
使用 PyTorch 的 nn 模块定义一个 LSTM 时间序列模型
输入为 time-lag 特征,输出为下一个时间步的股票价格
使用 Adam 优化器和 MSE 损失函数进行监督训练
每个 epoch 在验证集上计算 RMSE 并保存最佳模型
在测试集上计算 RMSE 和 R-squared 指标
绘制实际股价和预测股价的折线图进行可视化分析
将训练好的模型转换为 TorchScript 格式
部署到 AWS Lambda 函数中提供股票价格预测 API
import yfinance as yf
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# 1. Data preprocessing
# Download daily OHLCV history for the ticker.
ticker = "AAPL"
data = yf.download(ticker, start="2018-01-01", end="2023-05-20")
# Build one-day time-lag features (previous day's open/close/volume).
data['lag_open'] = data['Open'].shift(1)
data['lag_close'] = data['Close'].shift(1)
data['lag_volume'] = data['Volume'].shift(1)
# shift(1) makes the first row NaN; drop it.
data = data.dropna()
# Chronological 80/20 train/test split (no shuffling for time series).
train_size = int(len(data) * 0.8)
train_data = data[:train_size]
test_data = data[train_size:]
# 2. 模型定义
class StockPredictionLSTM(nn.Module):
    """LSTM regressor: maps a (batch, seq_len, input_size) feature window
    to one prediction per sequence.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden-state width.
        num_layers: number of stacked LSTM layers.
        output_size: regression outputs per sequence (1 for a price).
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(StockPredictionLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return (batch, output_size) predictions for x of shape
        (batch, seq_len, input_size)."""
        # Fix: allocate the initial states on x's device/dtype. The original
        # always created them on CPU, which fails for GPU inputs.
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size,
                         device=x.device, dtype=x.dtype)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size,
                         device=x.device, dtype=x.dtype)
        out, _ = self.lstm(x, (h0, c0))
        # Regress from the final time step's hidden output only.
        out = self.fc(out[:, -1, :])
        return out
# 3. 模型训练
class StockDataset(Dataset):
    """Dataset of (features, target) pairs for next-day price prediction.

    Features are the current row's lag columns; the target is the NEXT
    row's 'Close' (the next time step's price, per the task description).
    """

    # Feature columns fed to the model (matches the model's input_size=3).
    FEATURE_COLS = ['lag_open', 'lag_close', 'lag_volume']

    def __init__(self, data):
        # data: DataFrame containing FEATURE_COLS and a 'Close' column.
        self.data = data

    def __len__(self):
        # The last row has no following day to serve as a target.
        return len(self.data) - 1

    def __getitem__(self, idx):
        # Fix: the original used `iloc[idx, :-1]` (every column except the
        # last), producing 8 features for a model built for 3, and used
        # `iloc[idx + 1, -1]` (next day's lag_volume) as the target instead
        # of the next day's closing price.
        X = self.data.iloc[idx][self.FEATURE_COLS].to_numpy(dtype='float32')
        y = np.float32(self.data.iloc[idx + 1]['Close'])
        return X, y
train_dataset = StockDataset(train_data)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# input_size must match the number of feature columns the dataset yields.
model = StockPredictionLSTM(input_size=3, hidden_size=64, num_layers=2, output_size=1)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
    for X, y in train_loader:
        optimizer.zero_grad()
        # NOTE(review): X collates to 2-D (batch, features) but the LSTM
        # expects 3-D (batch, seq_len, features) -- confirm a seq_len axis
        # (e.g. X.unsqueeze(1)) is added before relying on results.
        output = model(X.float())
        loss = criterion(output, y.unsqueeze(1).float())
        loss.backward()
        optimizer.step()
    # Logs the final mini-batch loss of the epoch, not an epoch average.
    print(f'Epoch [{epoch+1}/100], Loss: {loss.item():.4f}')
# 4. Model evaluation on the held-out chronological test split.
with torch.no_grad():
    X_test = torch.from_numpy(test_data[['lag_open', 'lag_close', 'lag_volume']].values).float()
    y_test = torch.from_numpy(test_data['Close'].values).float()
    # NOTE(review): X_test is 2-D while the LSTM was built for 3-D
    # (batch, seq_len, features) input -- verify shapes match training.
    y_pred = model(X_test)
    rmse = torch.sqrt(criterion(y_pred, y_test.unsqueeze(1))).item()
    # Coefficient of determination computed directly from tensors.
    r2 = 1 - torch.sum((y_test - y_pred.squeeze())**2) / torch.sum((y_test - torch.mean(y_test))**2)
print(f'RMSE: {rmse:.4f}')
print(f'R-squared: {r2:.4f}')
使用 OpenAI Gym 创建 CartPole-v0 环境
观察空间为小车位置、小车速度、杆子角度和杆子角速度组成的 4 维向量,行动空间为左右移动
使用 PyTorch 的 nn 模块定义一个 DQN 智能体模型
输入为当前观察,输出为左右两个动作的 Q 值
使用 Adam 优化器进行 Q 值网络的监督训练
采用 epsilon-greedy 策略进行探索和利用
使用经验回放机制提高样本利用效率
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# 1. Environment setup
env = gym.make('CartPole-v0')
# Sizes are read from the env's spaces rather than hard-coded.
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
# 2. 智能体定义
class DQNAgent(nn.Module):
    """Q-network: maps an observation vector to one Q-value per action."""

    def __init__(self, state_size, action_size):
        super(DQNAgent, self).__init__()
        # Two ReLU hidden layers (64 -> 32) and a linear Q-value head.
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, action_size)

    def forward(self, x):
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)
# 3. 训练过程
agent = DQNAgent(state_size, action_size)
optimizer = optim.Adam(agent.parameters(), lr=0.001)
criterion = nn.MSELoss()
# Epsilon-greedy exploration schedule (decayed per episode).
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01
# Experience replay buffer of (state, action, reward, next_state, done).
# NOTE(review): the buffer is never capped, so memory grows without bound
# over long runs; consider collections.deque(maxlen=...).
replay_memory = []
batch_size = 32
def train_agent(batch_size, gamma=0.99):
    """Run one DQN update step from a random replay-buffer minibatch.

    Uses the module-level ``replay_memory``, ``agent``, ``optimizer`` and
    ``criterion``. No-op until the buffer holds ``batch_size`` transitions.

    Args:
        batch_size: number of transitions to sample.
        gamma: discount factor (previously hard-coded as 0.99).
    """
    if len(replay_memory) < batch_size:
        return
    # Sample indices uniformly from the replay buffer (with replacement).
    samples = np.random.choice(len(replay_memory), batch_size)
    states, actions, rewards, next_states, dones = zip(*[replay_memory[i] for i in samples])
    # Batch the transition fields into tensors.
    states = torch.FloatTensor(states)
    next_states = torch.FloatTensor(next_states)
    actions = torch.LongTensor(actions).unsqueeze(1)
    rewards = torch.FloatTensor(rewards).unsqueeze(1)
    dones = torch.FloatTensor(dones).unsqueeze(1)
    # Q(s, a) for the actions actually taken.
    q_values = agent(states).gather(1, actions)
    # Bootstrap target from the best next-state Q-value; detach so the
    # target receives no gradient. Terminal states keep reward only.
    next_q_values = agent(next_states).max(1)[0].unsqueeze(1).detach()
    q_targets = rewards + (1 - dones) * gamma * next_q_values
    # Gradient step on the TD error.
    optimizer.zero_grad()
    loss = criterion(q_values, q_targets)
    loss.backward()
    optimizer.step()
# 4. 模型评估
def evaluate_agent(episodes=100):
    """Run the greedy policy for ``episodes`` episodes and return the mean
    cumulative reward (uses the module-level ``env`` and ``agent``)."""
    total_rewards = []
    for _ in range(episodes):
        # NOTE(review): old Gym API -- gym >= 0.26 returns (obs, info).
        state = env.reset()
        done = False
        episode_reward = 0
        while not done:
            obs = torch.FloatTensor(state).unsqueeze(0)
            # Fix: action selection is pure inference -- run it under
            # no_grad so no autograd graph is built per step.
            with torch.no_grad():
                action = torch.argmax(agent(obs)).item()
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state
        total_rewards.append(episode_reward)
    return np.mean(total_rewards)
# 5. Training and deployment
rewards = []
for episode in range(1000):
    # NOTE(review): old Gym API (obs only); gym >= 0.26 returns (obs, info).
    state = env.reset()
    done = False
    episode_reward = 0
    while not done:
        # Epsilon-greedy action selection.
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            # Fix: keep `state` as the raw observation. The original
            # overwrote `state` with a (1, state_size) tensor here, so the
            # replay buffer held a mix of raw observations and tensors and
            # train_agent's torch.FloatTensor(states) batching broke.
            with torch.no_grad():
                q_values = agent(torch.FloatTensor(state).unsqueeze(0))
            action = torch.argmax(q_values).item()
        next_state, reward, done, _ = env.step(action)
        # Store the raw transition for experience replay.
        replay_memory.append((state, action, reward, next_state, done))
        state = next_state
        episode_reward += reward
        train_agent(batch_size)
    rewards.append(episode_reward)
    # Decay exploration after each episode.
    epsilon = max(epsilon * epsilon_decay, epsilon_min)
    if (episode + 1) % 10 == 0:
        print(f'Episode {episode + 1}, Reward: {np.mean(rewards[-10:])}')
# Save the model as TorchScript for deployment (e.g. inside Docker).
torch.jit.script(agent).save('cartpole_dqn.pt')
在验证环境中评估智能体的累积奖励
绘制训练过程中的奖励曲线进行可视化分析
将训练好的 DQN 模型转换为 TorchScript 格式
部署到 Docker 容器中提供 CartPole 平衡服务
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# 1. Data preprocessing
# Training data: augmentation + normalization with CIFAR-10 statistics.
train_data = datasets.CIFAR10(root='./data', train=True, download=True,
                              transform=transforms.Compose([
                                  transforms.RandomCrop(32, padding=4),
                                  transforms.RandomHorizontalFlip(),
                                  transforms.ToTensor(),
                                  transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
                              ]))
# Test data: normalization only, no augmentation.
test_data = datasets.CIFAR10(root='./data', train=False, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
                             ]))
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)
# 2. 定义模型
class ResNet(nn.Module):
    """Small CIFAR-10 classifier: one 3x3 conv, one 2x2 max-pool and a
    linear head. (Despite the name there are no residual connections.)

    Args:
        num_classes: size of the output logits (default 10 for CIFAR-10).
    """

    def __init__(self, num_classes=10):
        super(ResNet, self).__init__()
        # 3x3 conv with padding keeps 32x32; the 2x2 pool halves it to 16x16.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Fix: after a single pool the feature map is 64 x 16 x 16, not
        # 64 x 8 x 8. The original's view(-1, 64*8*8) silently folded the
        # surplus pixels into the batch dimension (a batch of N became 4N),
        # which breaks the loss against N labels.
        self.fc = nn.Linear(64 * 16 * 16, num_classes)

    def forward(self, x):
        """Return (batch, num_classes) logits for (batch, 3, 32, 32) input."""
        x = self.conv1(x)
        x = self.pool(x)
        # Flatten per sample so the batch size is preserved.
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
model = ResNet()
# 3. Model training
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
num_epochs = 10
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # Report the average loss every 2000 mini-batches, then reset.
        if i % 2000 == 1999:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0
print('Finished Training')
# 4. Model evaluation: top-1 accuracy on the test set.
correct = 0
total = 0
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        outputs = model(images)
        # Predicted class = argmax over the logits.
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print(f'Accuracy of the network on the 10000 test images: {100 * correct // total}%')
这个示例展示了 PyTorch 在图像分类任务中的典型用法:
使用 torchvision 加载 CIFAR10 数据集,并进行数据预处理和增强。
定义一个简单的 ResNet 模型结构,包含卷积层、池化层和全连接层。
使用交叉熵损失函数和 SGD 优化器对模型进行训练。
在测试集上评估训练好的模型的分类准确率
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。