赞
踩
二、实现
python 在线加载.cu 文件
python 加载c++ 方法采用的是torch.utils.cpp_extension包的扩展方法。
直接加载cu文件,则采用torch.utils.cpp_extension.load() 方法。
如:
from torch.utils.cpp_extension import load
#import fused_act as fused
module_path = os.path.dirname(__file__)
fused = load(
"fused",
sources=[
os.path.join(module_path, "fused_bias_act.cpp"),
os.path.join(module_path, "fused_bias_act_kernel.cu"),
],
)
具体: fused_bias_act.cpp
#include <torch/extension.h> torch::Tensor fused_bias_act_op(const torch::Tensor& input, const torch::Tensor& bias, const torch::Tensor& refer, int act, int grad, float alpha, float scale); #define CHECK_CUDA(x) TORCH_CHECK(x.type().is_cuda(), #x " must be a CUDA tensor") #define CHECK_CONTIGUOUS(x) TORCH_CHECK(x.is_contiguous(), #x " must be contiguous") #define CHECK_INPUT(x) CHECK_CUDA(x); CHECK_CONTIGUOUS(x) torch::Tensor fused_bias_act(const torch::Tensor& input, const torch::Tensor& bias, const torch::Tensor& refer, int act, int grad, float alpha, float scale) { CHECK_CUDA(input); CHECK_CUDA(bias); return fused_bias_act_op(input, bias, refer, act, grad, alpha, scale); } PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { m.def("fused_bias_act", &fused_bias_act, "fused bias act (CUDA)"); }
fused_bias_act_kernel.cu
// Copyright (c) 2019, NVIDIA Corporation. All rights reserved. // // This work is made available under the Nvidia Source Code License-NC. // To view a copy of this license, visit // https://nvlabs.github.io/stylegan2/license.html #include <torch/types.h> #include <ATen/ATen.h> #include <ATen/AccumulateType.h> #include <ATen/cuda/CUDAContext.h> #include <ATen/cuda/CUDAApplyUtils.cuh> #include <cuda.h> #include <cuda_runtime.h> template <typename scalar_t> static __global__ void fused_bias_act_kernel(scalar_t* out, const scalar_t* p_x, const scalar_t* p_b, const scalar_t* p_ref, int act, int grad, scalar_t alpha, scalar_t scale, int loop_x, int size_x, int step_b, int size_b, int use_bias, int use_ref) { int xi = blockIdx.x * loop_x * blockDim.x + threadIdx.x; scalar_t zero = 0.0; for (int loop_idx = 0; loop_idx < loop_x && xi < size_x; loop_idx++, xi += blockDim.x) { scalar_t x = p_x[xi]; if (use_bias) { x += p_b[(xi / step_b) % size_b]; } scalar_t ref = use_ref ? p_ref[xi] : zero; scalar_t y; switch (act * 10 + grad) { default: case 10: y = x; break; case 11: y = x; break; case 12: y = 0.0; break; case 30: y = (x > 0.0) ? x : x * alpha; break; case 31: y = (ref > 0.0) ? x : x * alpha; break; case 32: y = 0.0; break; } out[xi] = y * scale; } } torch::Tensor fused_bias_act_op(const torch::Tensor& input, const torch::Tensor& bias, const torch::Tensor& refer, int act, int grad, float alpha, float scale) { int curDevice = -1; cudaGetDevice(&curDevice); cudaStream_t stream = at::cuda::getCurrentCUDAStream(curDevice); auto x = input.contiguous(); auto b = bias.contiguous(); auto ref = refer.contiguous(); int use_bias = b.numel() ? 1 : 0; int use_ref = ref.numel() ? 1 : 0; int size_x = x.numel(); int size_b = b.numel(); int step_b = 1; for (int i = 1 + 1; i < x.dim(); i++) { step_b *= x.size(i); } int loop_x = 4; int block_size = 4 * 32; int grid_size = (size_x - 1) / (loop_x * block_size) + 1; auto y = torch::empty_like(x); AT_DISPATCH_FLOATING_TYPES_AND_HALF(x.scalar_type(), "fused_bias_act_kernel", [&] { fused_bias_act_kernel<scalar_t><<<grid_size, block_size, 0, stream>>>( y.data_ptr<scalar_t>(), x.data_ptr<scalar_t>(), b.data_ptr<scalar_t>(), ref.data_ptr<scalar_t>(), act, grad, alpha, scale, loop_x, size_x, step_b, size_b, use_bias, use_ref ); }); return y; }
test.py
import os import torch from torch import nn from torch.nn import functional as F from torch.autograd import Function from torch.utils.cpp_extension import load #import fused_act as fused module_path = os.path.dirname(__file__) fused = load( "fused", sources=[ os.path.join(module_path, "fused_bias_act.cpp"), os.path.join(module_path, "fused_bias_act_kernel.cu"), ], ) class FusedLeakyReLUFunctionBackward(Function): @staticmethod def forward(ctx, grad_output, out, negative_slope, scale): ctx.save_for_backward(out) ctx.negative_slope = negative_slope ctx.scale = scale empty = grad_output.new_empty(0) grad_input = fused.fused_bias_act( grad_output, empty, out, 3, 1, negative_slope, scale ) dim = [0] if grad_input.ndim > 2: dim += list(range(2, grad_input.ndim)) grad_bias = grad_input.sum(dim).detach() return grad_input, grad_bias @staticmethod def backward(ctx, gradgrad_input, gradgrad_bias): out, = ctx.saved_tensors gradgrad_out = fused.fused_bias_act( gradgrad_input, gradgrad_bias, out, 3, 1, ctx.negative_slope, ctx.scale ) return gradgrad_out, None, None, None class FusedLeakyReLUFunction(Function): @staticmethod def forward(ctx, input, bias, negative_slope, scale): empty = input.new_empty(0) out = fused.fused_bias_act(input, bias, empty, 3, 0, negative_slope, scale) ctx.save_for_backward(out) ctx.negative_slope = negative_slope ctx.scale = scale return out @staticmethod def backward(ctx, grad_output): out, = ctx.saved_tensors grad_input, grad_bias = FusedLeakyReLUFunctionBackward.apply( grad_output, out, ctx.negative_slope, ctx.scale ) return grad_input, grad_bias, None, None class FusedLeakyReLU(nn.Module): def __init__(self, channel, negative_slope=0.2, scale=2 ** 0.5): super().__init__() self.bias = nn.Parameter(torch.zeros(channel)) self.negative_slope = negative_slope self.scale = scale def forward(self, input): return fused_leaky_relu(input, self.bias, self.negative_slope, self.scale) def fused_leaky_relu(input, bias, negative_slope=0.2, scale=2 ** 0.5): if input.device.type == "cpu": rest_dim = [1] * (input.ndim - bias.ndim - 1) return ( F.leaky_relu( input + bias.view(1, bias.shape[0], *rest_dim), negative_slope=0.2 ) * scale ) else: return FusedLeakyReLUFunction.apply(input, bias, negative_slope, scale) if __name__ == '__main__': x=FusedLeakyReLU(3) res=x(torch.tensor(0.5)) print(res)
2 python 加载.cu 文件方式2
采用setup 与torch.utils.cpp_extension.CUDAExtension() 方法将c++ 文件编译,然后直接加载。
>> python setup.py install
则该包被安装到环境中。
使用:
import os import torch from torch import nn from torch.nn import functional as F from torch.autograd import Function from torch.utils.cpp_extension import load import fused_act as fused module_path = os.path.dirname(__file__) # fused = load( # "fused", # sources=[ # os.path.join(module_path, "fused_bias_act.cpp"), # os.path.join(module_path, "fused_bias_act_kernel.cu"), # ], # ) class FusedLeakyReLUFunctionBackward(Function): @staticmethod def forward(ctx, grad_output, out, negative_slope, scale): ctx.save_for_backward(out) ctx.negative_slope = negative_slope ctx.scale = scale empty = grad_output.new_empty(0) grad_input = fused.fused_bias_act( grad_output, empty, out, 3, 1, negative_slope, scale ) dim = [0] if grad_input.ndim > 2: dim += list(range(2, grad_input.ndim)) grad_bias = grad_input.sum(dim).detach() return grad_input, grad_bias @staticmethod def backward(ctx, gradgrad_input, gradgrad_bias): out, = ctx.saved_tensors gradgrad_out = fused.fused_bias_act( gradgrad_input, gradgrad_bias, out, 3, 1, ctx.negative_slope, ctx.scale ) return gradgrad_out, None, None, None class FusedLeakyReLUFunction(Function): @staticmethod def forward(ctx, input, bias, negative_slope, scale): empty = input.new_empty(0) out = fused.fused_bias_act(input, bias, empty, 3, 0, negative_slope, scale) ctx.save_for_backward(out) ctx.negative_slope = negative_slope ctx.scale = scale return out @staticmethod def backward(ctx, grad_output): out, = ctx.saved_tensors grad_input, grad_bias = FusedLeakyReLUFunctionBackward.apply( grad_output, out, ctx.negative_slope, ctx.scale ) return grad_input, grad_bias, None, None class FusedLeakyReLU(nn.Module): def __init__(self, channel, negative_slope=0.2, scale=2 ** 0.5): super().__init__() self.bias = nn.Parameter(torch.zeros(channel)) self.negative_slope = negative_slope self.scale = scale def forward(self, input): return fused_leaky_relu(input, self.bias, self.negative_slope, self.scale) def fused_leaky_relu(input, bias, negative_slope=0.2, scale=2 ** 0.5): if input.device.type == "cpu": rest_dim = [1] * (input.ndim - bias.ndim - 1) return ( F.leaky_relu( input + bias.view(1, bias.shape[0], *rest_dim), negative_slope=0.2 ) * scale ) else: return FusedLeakyReLUFunction.apply(input, bias, negative_slope, scale) if __name__ == '__main__': x=FusedLeakyReLU(3) res=x(torch.tensor(0.5)) print(res)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。