框架模型层
实现了一个基于Vision Transformer (ViT) 的图像分类模型,并且使用了intel_extension_for_pytorch (IPEX)库在Intel GPU上进行优化和推理。代码中主要包括几个模块,如补丁嵌入、注意力机制、多层感知机 (MLP)、Transformer 块和最终的 Vision Transformer (ViT) 模型。
- 将输入的图像切分为多个小块(patches),并通过卷积层将其投影到高维空间。img_size: 输入图像的尺寸(默认224x224)。patch_size: 每个补丁的尺寸(默认16x16)。in_channels: 输入图像的通道数(默认3,RGB图像)。embed_dim: 嵌入维度,即每个补丁映射到的高维空间大小(默认768)。
- 现了多头自注意力机制,每个输入token通过注意力机制计算与其他token的关系。embed_dim: 输入嵌入的维度。num_heads: 多头注意力机制的头数。
- 在 forward 函数中,首先计算Q, K, V (查询、键和值)矩阵,然后通过缩放点积注意力公式进行计算。最后,将结果投影回嵌入维度。
- MLP通过两层线性层和GELU激活函数来处理输入数据,并通过Dropout层来增加模型的鲁棒性。in_features: 输入特征数。hidden_features: 隐藏层的特征数。out_features: 输出特征数。dropout: 随机失活比例(默认0)。
- ransformerBlock 类是Transformer中的一个块,由多头注意力层和前馈网络(MLP)组成。embed_dim: 输入嵌入维度。num_heads: 注意力头的数量。mlp_ratio: MLP隐藏层的维度相对于嵌入维度的比率。dropout: 用于MLP的Dropout比例。
- 在前向传播中,首先对输入进行规范化并应用注意力机制,然后将结果输入到MLP层。每一步使用了残差连接来避免梯度消失问题。
- ViT模型是Transformer架构在计算机视觉任务中的应用。它首先将图像分割为补丁,使用补丁嵌入后再通过多个TransformerBlock处理。
- 确定设备(Intel GPU 或 CPU)。创建 ViT 模型并移动到指定设备。使用 IPEX 进行模型优化,提升推理效率。创建一个随机的输入张量(dummy_input)并进行推理。进行多次推理并计算推理的FPS(每秒帧数)。
- 通过 time 库测量推理过程的总耗时,然后计算每秒处理的帧数 (FPS)。
import torch
import intel_extension_for_pytorch as ipex
from torch import nn
import time
class PatchEmbedding(nn.Module):
def __init__(self, img_size=224, patch_size=16, in_channels=3, embed_dim=768):
super(PatchEmbedding, self).__init__()
self.img_size = img_size
self.patch_size = patch_size
self.grid_size = img_size // patch_size
self.num_patches = self.grid_size ** 2
self.proj = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
def forward(self, x):
x = self.proj(x)
x = x.flatten(2)
x = x.transpose(1, 2) # (B, N, D)
return x
class Attention(nn.Module):
def __init__(self, embed_dim, num_heads):
super(Attention, self).__init__()
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.scale = self.head_dim ** -0.5
self.qkv = nn.Linear(embed_dim, embed_dim * 3)
self.proj = nn.Linear(embed_dim, embed_dim)
def forward(self, x):
B, N, C = x.shape
qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim)
qkv = qkv.permute(2, 0, 3, 1, 4) # (3, B, num_heads, N, head_dim)
q, k, v = qkv[0], qkv[1], qkv[2]
attn = (q @ k.transpose(-2, -1)) * self.scale
attn = attn.softmax(dim=-1)
x = (attn @ v).transpose(1, 2).reshape(B, N, C)
x = self.proj(x)
return x
class MLP(nn.Module):
def __init__(self, in_features, hidden_features, out_features, dropout=0.):
super(MLP, self).__init__()
self.fc1 = nn.Linear(in_features, hidden_features)
self.act = nn.GELU()
self.fc2 = nn.Linear(hidden_features, out_features)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.fc1(x)
x = self.act(x)
x = self.dropout(x)
x = self.fc2(x)
x = self.dropout(x)
return x
class TransformerBlock(nn.Module):
def __init__(self, embed_dim, num_heads, mlp_ratio=4., dropout=0., attention_dropout=0.):
super(TransformerBlock, self).__init__()
self.norm1 = nn.LayerNorm(embed_dim)
self.attn = Attention(embed_dim, num_heads)
self.norm2 = nn.LayerNorm(embed_dim)
mlp_hidden_dim = int(embed_dim * mlp_ratio)
self.mlp = MLP(embed_dim, mlp_hidden_dim, embed_dim, dropout)
def forward(self, x):
x = x + self.attn(self.norm1(x))
x = x + self.mlp(self.norm2(x))
return x
class ViT(nn.Module):
def __init__(self, img_size=224, patch_size=16, in_channels=3, num_classes=1000, embed_dim=768, depth=12, num_heads=12, mlp_ratio=4., dropout=0., attention_dropout=0.):
super(ViT, self).__init__()
self.patch_embed = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
num_patches = self.patch_embed.num_patches
self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
self.dropout = nn.Dropout(dropout)
self.blocks = nn.Sequential(
*[TransformerBlock(embed_dim, num_heads, mlp_ratio, dropout, attention_dropout) for _ in range(depth)]
)
self.norm = nn.LayerNorm(embed_dim)
self.head = nn.Linear(embed_dim, num_classes)
def forward(self, x):
B = x.shape[0]
x = self.patch_embed(x)
cls_tokens = self.cls_token.expand(B, -1, -1)
x = torch.cat((cls_tokens, x), dim=1)
x = x + self.pos_embed
x = self.dropout(x)
x = self.blocks(x)
x = self.norm(x)
cls_token_final = x[:, 0]
x = self.head(cls_token_final)
return x
#---------------------------------在Intel GPU测试-----------------------------------------------------
def main():
# 确定设备
device = torch.device("xpu" if torch.xpu.is_available() else "cpu")
print(f"Using device: {device}")
# 创建模型并移动到指定设备
model = ViT(img_size=224).to(device)
# 设置模型为评估模式
model.eval()
# 使用 IPEX 进行优化
model = ipex.optimize(model)
# 创建一个随机输入张量,形状为 (batch_size, channels, height, width)
dummy_input = torch.randn(1, 3, 224, 224).to(device)
t_start = time.time()
iterations = 12800
for _ in range(iterations):
with torch.no_grad():
outputs = model(dummy_input)
elapsed_time = time.time() - t_start
latency = elapsed_time / iterations * 1000
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
# 打印输出形状
print(outputs.shape)
if __name__ == "__main__":
main()
结果
Using device: xpu
FPS: 153.79
torch.Size([1, 1000])
使用了Intel的扩展库 intel_extension_for_pytorch (ipex),并通过Intel的硬件加速设备(A770 GPU)执行训练任务。代码对经典的ResNet50模型进行训练,数据集是CIFAR-10。
- 设置学习率 LR 为 0.001。DOWNLOAD = True 表示如果本地没有数据集,则下载 CIFAR-10 数据集。DATA 指定了数据集保存的路径。
- 使用 torchvision.transforms 对CIFAR-10数据集进行预处理:将图片调整为 224x224 像素(ResNet50 的输入尺寸)。将图片数据转换为张量。对图像数据进行归一化,将像素值调整到 [-1, 1] 的范围。加载CIFAR-10训练数据集,将每个 batch 的大小设置为 256。
- 使用预定义的 ResNet50 模型,不加载预训练权重。定义交叉熵损失 CrossEntropyLoss(),适用于分类任务。使用 SGD(随机梯度下降)优化器,学习率 LR 为 0.001,动量 momentum 为 0.9。
- 使用 model.to("xpu") 和 criterion.to("xpu") 将模型和损失函数转移到Intel扩展提供的加速设备上(如Intel A770 GPU)。"xpu" 是Intel GPU的设备标识。调用 ipex.optimize() 来优化模型和优化器,以便在Intel设备上高效运行,并且设置数据类型为 bfloat16,这是一种混合精度加速方案,能够提升性能和内存效率。
- 使用 torch.xpu.amp.autocast 进行自动混合精度训练 (bfloat16),这有助于提升性能,尤其是在Intel设备上。计算输出 output 和损失 loss,然后反向传播 loss.backward() 并更新权重 optimizer.step()。
- 使用 torch.save() 保存模型和优化器的状态,以便后续恢复或继续训练。
import torch
import torchvision
############# code changes ###############
import intel_extension_for_pytorch as ipex
############# code changes ###############
LR = 0.001
DOWNLOAD = True
DATA = "/home/dev/datasets"
transform = torchvision.transforms.Compose(
[
torchvision.transforms.Resize((224, 224)),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]
)
train_dataset = torchvision.datasets.CIFAR10(
root=DATA,
train=True,
transform=transform,
download=DOWNLOAD,
)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=256)
model = torchvision.models.resnet50()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR, momentum=0.9)
model.train()
##################################### code changes ################################
model = model.to("xpu")
criterion = criterion.to("xpu")
model, optimizer = ipex.optimize(model, optimizer=optimizer, dtype=torch.bfloat16)
##################################### code changes ################################
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
######################### code changes #########################
data = data.to("xpu")
target = target.to("xpu")
with torch.xpu.amp.autocast(enabled=True, dtype=torch.bfloat16):
######################### code changes #########################
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
print(batch_idx)
torch.save(
{
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
},
"/home/dev/datasets/checkpoint.pth",
)
print("Execution finished")
结果
[2024-10-14 01:22:30,041] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to xpu (auto detect)
[WARNING] async_io requires the dev libaio .so object and headers but these were not found.
[WARNING] async_io: please install the libaio-dev package with apt
[WARNING] If libaio is already installed (perhaps from source), try setting the CFLAGS and LDFLAGS environment variables to where it can be found.
0
1
...
195
Execution finished
实现了一个基于U-Net架构的深度学习模型,并使用Intel的PyTorch扩展(intel_extension_for_pytorch)进行优化。
- 定义了一个双卷积层,包括两个卷积操作和两个ReLU激活函数。用于提取特征。
- 定义了下采样层,使用最大池化操作将输入特征图的大小减半。
- 使用转置卷积(上卷积)对输入特征图进行上采样。对来自下采样路径的特征图进行中心裁剪,并将其与上采样路径的特征图拼接。
- 实现U-Net的前向传播过程,包括下采样、上采样和特征图拼接。
- 确定使用的设备(XPU或CPU)。初始化U-Net模型并进行优化。生成随机输入张量,并多次进行模型推理以测量性能。输出每秒帧数(FPS)和模型输出的形状。
import torch
import intel_extension_for_pytorch as ipex # 引入Intel的PyTorch扩展
import time
from torch import nn
import torchvision.transforms.functional as F
class DoubleConvolution(nn.Module):
def __init__(self, in_channels: int, out_channels: int):
super().__init__()
self.first = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
self.act1 = nn.ReLU()
self.second = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
self.act2 = nn.ReLU()
def forward(self, x: torch.Tensor):
x = self.first(x)
x = self.act1(x)
x = self.second(x)
return self.act2(x)
class DownSample(nn.Module):
def __init__(self):
super().__init__()
# Max pooling layer
self.pool = nn.MaxPool2d(2)
def forward(self, x: torch.Tensor):
return self.pool(x)
class UpSample(nn.Module):
def __init__(self, in_channels: int, out_channels: int):
super().__init__()
# Up-convolution
self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
def forward(self, x: torch.Tensor):
return self.up(x)
class CropAndConcat(nn.Module):
def forward(self, x: torch.Tensor, contracting_x: torch.Tensor):
contracting_x = F.center_crop(contracting_x, [x.shape[2], x.shape[3]])
x = torch.cat([x, contracting_x], dim=1)
return x
class unet(nn.Module):
def __init__(self, in_channels=3, out_channels=19):
super().__init__()
self.down_conv = nn.ModuleList([DoubleConvolution(i, o) for i, o in
[(in_channels, 64), (64, 128), (128, 256), (256, 512)]])
self.down_sample = nn.ModuleList([DownSample() for _ in range(4)])
self.middle_conv = DoubleConvolution(512, 1024)
self.up_sample = nn.ModuleList([UpSample(i, o) for i, o in
[(1024, 512), (512, 256), (256, 128), (128, 64)]])
self.up_conv = nn.ModuleList([DoubleConvolution(i, o) for i, o in
[(1024, 512), (512, 256), (256, 128), (128, 64)]])
self.concat = nn.ModuleList([CropAndConcat() for _ in range(4)])
self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)
def forward(self, x: torch.Tensor):
pass_through = []
for i in range(len(self.down_conv)):
x = self.down_conv[i](x)
pass_through.append(x)
x = self.down_sample[i](x)
x = self.middle_conv(x)
for i in range(len(self.up_conv)):
x = self.up_sample[i](x)
x = self.concat[i](x, pass_through.pop())
x = self.up_conv[i](x)
x = self.final_conv(x)
return x
def main():
# 确定设备
device = torch.device("xpu" if torch.xpu.is_available() else "cpu")
print(f"Using device: {device}")
model = unet(out_channels=1000).to(device)
model.eval()
model = ipex.optimize(model)
input_tensor = torch.randn(1, 3, 224, 224).to(device)
t_start = time.time()
iterations = 128
for _ in range(iterations):
with torch.no_grad():
outputs = model(input_tensor)
elapsed_time = time.time() - t_start
latency = elapsed_time / iterations * 1000
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
print(f'Output shape: {outputs.shape}')
if __name__ == '__main__':
main()
结果
Using device: xpu
FPS: 109.66
Output shape: torch.Size([1, 1000, 224, 224])
使用Intel的PyTorch扩展(intel_extension_for_pytorch)通过加载预训练的DPT模型实现了对输入图像的语义分割,输出分割结果并与原图融合,最终保存为文件。
- 通过requests库下载指定URL的图像,并使用PIL的Image.open方法打开该图像。
- 加载预训练的DPT图像处理器和语义分割模型。检查是否有可用的GPU,并将模型移动到相应的设备上。
- 使用特征提取器对图像进行预处理,将其转换为模型所需的张量格式。进行多次模型推理以测量性能,并计算每秒帧数(FPS)。
- 使用双线性插值调整logits的大小,以匹配原图的尺寸。通过torch.argmax获取每个像素的预测类别。
- 预测的张量移到CPU并转换为NumPy数组,然后将其转换为PIL图像。使用Image.blend将原图与分割图进行融合,设置融合的透明度为0.5。
- 指定保存路径,检查输出文件夹是否存在,若不存在则创建。将融合后的图像保存为PNG文件,并输出保存的路径。
from transformers import DPTFeatureExtractor, DPTForSemanticSegmentation, DPTImageProcessor
from PIL import Image
import intel_extension_for_pytorch as ipex # 引入Intel的PyTorch扩展
import requests
import torch
import os
import time
# 获取图片
url = "http://images.cocodataset.org/val2017/000000026204.jpg"
image = Image.open(requests.get(url, stream=True).raw)
# 加载模型和特征提取器
feature_extractor = DPTImageProcessor.from_pretrained("Intel/dpt-large-ade")
model = DPTForSemanticSegmentation.from_pretrained("Intel/dpt-large-ade")
# 检查是否有可用的GPU
device = torch.device("xpu" if torch.xpu.is_available() else "cpu")
print(f"Using device: {device}")
# 将模型移到GPU
model.to(device)
# 将图像输入到模型,并转换为张量
inputs = feature_extractor(images=image, return_tensors="pt").to(device)
t_start = time.time()
iterations = 128
# 模型推理,输出logits
for _ in range(iterations):
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
elapsed_time = time.time() - t_start
latency = elapsed_time / iterations * 1000
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
# logits的大小
# print(logits.shape)
# 插值调整logits大小
prediction = torch.nn.functional.interpolate(
logits,
size=image.size[::-1], # 反转尺寸 (width, height)
mode="bicubic",
align_corners=False
)
# 转换logits为类别预测
prediction = torch.argmax(prediction, dim=1) + 1
# 移除维度
prediction = prediction.squeeze()
# 将预测张量移到CPU并转换为numpy数组
prediction = prediction.cpu().numpy()
# 将预测数组转换为图像
predicted_seg = Image.fromarray(prediction.astype('uint8'))
# 定义ADE20K调色板
adepallete = [
0,0,0,120,120,120,180,120,120,6,230,230,80,50,50,4,200,3,120,120,80,
140,140,140,204,5,255,230,230,230,4,250,7,224,5,255,235,255,7,150,5,
61,120,120,70,8,255,51,255,6,82,143,255,140,204,255,4,255,51,7,204,70,
3,0,102,200,61,230,250,255,6,51,11,102,255,255,7,71,255,9,224,9,7,230,
220,220,220,255,9,92,112,9,255,8,255,214,7,255,224,255,184,6,10,255,71,
255,41,10,7,255,255,224,255,8,102,8,255,255,61,6,255,194,7,255,122,8,0,
255,20,255,8,41,255,5,153,6,51,255,235,12,255,160,150,20,0,163,255,140,
140,140,250,10,15,20,255,0,31,255,0,255,31,0,255,224,0,153,255,0,0,0,255,
255,71,0,0,235,255,0,173,255,31,0,255,11,200,200,255,82,0,0,255,245,0,61,
255,0,255,112,0,255,133,255,0,0,255,163,0,255,102,0,194,255,0,0,143,255,
51,255,0,0,82,255,0,255,41,0,255,173,10,0,255,173,255,0,0,255,153,255,92,
0,255,0,255,255,0,245,255,0,102,255,173,0,255,0,20,255,184,184,0,31,255,
0,255,61,0,71,255,255,0,204,0,255,194,0,255,82,0,10,255,0,112,255,51,0,
255,0,194,255,0,122,255,0,255,163,255,153,0,0,255,10,255,112,0,143,255,0,
82,0,255,163,255,0,255,235,0,8,184,170,133,0,255,0,255,92,184,0,255,255,
0,31,0,184,255,0,214,255,255,0,112,92,255,0,0,224,255,112,224,255,70,184,
160,163,0,255,153,0,255,71,255,0,255,0,163,255,204,0,255,0,143,0,255,235,
133,255,0,255,0,235,245,0,255,255,0,122,255,245,0,10,190,212,214,255,0,0,
204,255,20,0,255,255,255,0,0,153,255,0,41,255,0,255,204,41,0,255,41,255,0,
173,0,255,0,245,255,71,0,255,122,0,255,0,255,184,0,92,255,184,255,0,0,133,
255,255,214,0,25,194,194,102,255,0,92,0,255
]
# 应用调色板到预测分割图像
predicted_seg.putpalette(adepallete)
# 将原图和分割图像融合
out = Image.blend(image, predicted_seg.convert("RGB"), alpha=0.5)
# 指定保存路径
name = "street"
output_folder = "/home/dev/datasets"
# 确保输出文件夹存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 拼接完整的路径和文件名
depth_image_path = os.path.join(output_folder, name + "_depth_image.png")
# 保存图像
out.save(depth_image_path)
print(f"图像已保存为 {depth_image_path}")
结果
Using device: xpu
FPS: 13.28
图像已保存为 /home/dev/datasets/street_depth_image.png
得到的语义分割图像如下
利用 StableDiffusionLDM3DPipeline 模型从提示词生成3D相关的图像(包括RGB图像和深度图像),并保存到指定路径。
- 通过 from_pretrained 方法从Hugging Face Model Hub加载预训练的 ldm3d-4c 模型。这个模型是特定于3D图像生成的稳定扩散模型。
- prompt: 定义生成图像时要使用的文本提示。name: 将生成的文件命名为“lemons”,以便后续保存时使用。
- 定义推理循环次数(iterations = 128),即模型将重复运行128次。在每次迭代中,通过 pipe(prompt) 使用提示词进行推理,返回结果包括 rgb_image 和 depth_image。计算推理总时间,并根据迭代次数计算平均延迟(每次推理所需的时间,单位为毫秒)和FPS(每秒帧数)。打印FPS,表示模型生成图像的速度。
- 生成的图像保存路径分别为RGB图像的 .jpg 文件和深度图像的 .png 文件。使用 save() 方法将图像保存到指定的文件路径中。
from diffusers import StableDiffusionLDM3DPipeline
import time
import torch
import intel_extension_for_pytorch as ipex # 引入Intel的PyTorch扩展
import os
# 加载模型
pipe = StableDiffusionLDM3DPipeline.from_pretrained("Intel/ldm3d-4c")
# 检查可用设备(xpu 或 CPU)
device = torch.device("xpu" if torch.xpu.is_available() else "cpu")
print(f"使用设备: {device}")
# 移到选择的设备
pipe.to(device)
# 定义提示词和输出名称
prompt = "A picture of some lemons on a table"
name = "lemons"
# 开始推理计时
t_start = time.time()
iterations = 128
# 模型推理
for _ in range(iterations):
output = pipe(prompt)
rgb_image, depth_image = output.rgb, output.depth
# 计算经过时间和 FPS
elapsed_time = time.time() - t_start
latency = elapsed_time / iterations * 1000 # 转换为毫秒
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
# 指定输出路径
output_folder = "/home/dev/datasets"
# 确保输出文件夹存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 拼接完整的路径和文件名
rgb_image_path = os.path.join(output_folder, f"{name}_ldm3d_4c_rgb.jpg")
depth_image_path = os.path.join(output_folder, f"{name}_ldm3d_4c_depth.png")
# 保存图像
rgb_image[0].save(rgb_image_path)
depth_image[0].save(depth_image_path)
print(f"RGB 图像已保存至: {rgb_image_path}")
print(f"深度图像已保存至: {depth_image_path}")
结果
Loading pipeline components...: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 7/7 [00:00<00:00, 22.55it/s]
使用设备: xpu
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 49/49 [00:14<00:00, 3.39it/s]
...
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 49/49 [00:11<00:00, 4.20it/s]
FPS: 0.08
RGB 图像已保存至: /home/dev/datasets/lemons_ldm3d_4c_rgb.jpg
深度图像已保存至: /home/dev/datasets/lemons_ldm3d_4c_depth.png
生成的图像如下
深度图像生成如下
使用 Ernie 3.0 模型 进行推理,并且根据不同模式(GPU 或 TPU)来评估模型的性能,包括推理时间(延迟和帧率)以及模型的 FLOPs(浮点运算次数)和参数数量。
- GPU模式:检查 GPU 是否可用(XPU 设备),下载并加载 Ernie 3.0 模型,并将输入数据传输到设备。
- text: 需要输入的文本。max_length: 最大文本长度。model_path 和 tokenizer_path: 模型和分词器的路径。
- profile 函数通过 thop 库计算模型的 FLOPs(浮点运算次数)。计算模型的可训练参数数量,并将 FLOPs 转换为 GFLOPs(十亿次浮点运算)和参数数量转换为百万级别。
- 使用模型的前向传播函数 (self.model(**self.inputs)) 执行推理并返回输出。运行100次推理,计算平均延迟(latency)和帧率(FPS)。输出模型的 FLOPs 和参数量。
import os
import time
import torch
import intel_extension_for_pytorch as ipex # 引入Intel的PyTorch扩展
import requests
from transformers import BertTokenizer, ErnieModel
# from tpu_perf.infer import SGInfer
from thop import profile
import numpy as np
def download_model_weights(model_path):
if not os.path.exists(os.path.join(model_path, 'pytorch_model.bin')):
model_url = "https://huggingface.co/nghuyong/ernie-3.0-medium-zh/resolve/main/pytorch_model.bin?download=true"
response = requests.get(model_url)
if response.status_code == 200:
with open(os.path.join(model_path, 'pytorch_model.bin'), 'wb') as f:
f.write(response.content)
print("权重下载完成。")
else:
print("权重下载失败")
class ernie3:
def __init__(self, mode='gpu', text="Hello, how are you?", max_length=256, model_path='/home/dev/datasets/vocab', tokenizer_path='/home/dev/datasets/vocab'):
self.mode = mode
self.text = text
self.max_length = max_length
self.tokenizer_path = tokenizer_path
self.model_path = model_path
self.tokenizer = BertTokenizer.from_pretrained(tokenizer_path)
if mode == 'gpu':
self.device = torch.device("xpu" if torch.xpu.is_available() else "cpu")
download_model_weights(model_path)
self.model = ErnieModel.from_pretrained(model_path).to(self.device)
self.inputs = self.tokenizer(text=self.text, return_tensors='pt', padding='max_length', max_length=self.max_length).to(self.device)
elif mode == 'tpu':
self.inputs = self.tokenizer(text=text, return_tensors='pt', padding='max_length', max_length=max_length)
self.input_ids = self.inputs['input_ids'].numpy().astype(np.int32)
else:
raise ValueError("Mode should be either 'gpu' or 'tpu'")
def count_parameters_and_flops(self):
flops, _ = profile(self.model, (self.inputs.input_ids, self.inputs.attention_mask), verbose=False)
params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
return flops / 1e9 * 2, params / 1e6
def forward(self):
if self.mode == 'gpu':
outputs = self.model(**self.inputs)
return outputs
elif self.mode == 'tpu':
return self.input_ids
else:
raise ValueError("Mode should be either 'gpu' or 'tpu'")
#---------------------------------intel XPU测试-----------------------------------------------------
if __name__ == '__main__':
mode = 'gpu'
model = ernie3(mode=mode)
if mode == 'gpu':
iterations = 100
t_start = time.time()
for _ in range(iterations):
with torch.no_grad():
outputs = model.forward()
elapsed_time = time.time() - t_start
flops, params = model.count_parameters_and_flops()
latency = elapsed_time / iterations * 1000
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
print(f"Latency: {latency:.2f} ms")
print(f"FLOPs: {flops} GFLOPs")
print(f"Parameters: {params} Million")
elif mode == 'tpu':
bmodel_path = "/home/aii-works/Benchmark/bmodel/language/nlp/ernie3/ernie3_1684x_f32.bmodel"
net = SGInfer(bmodel_path, devices=[0])
input = model.forward()
iterations = 100
t_start = time.time()
for _ in range(iterations):
output = net.infer_one(input)
elapsed_time = time.time() - t_start
latency = elapsed_time / iterations * 1000
FPS = 1000 / latency
print(f"FPS: {FPS:.2f}")
print(f"Latency: {latency:.2f} ms")
结果
FPS: 170.75
Latency: 5.86 ms
FLOPs: 21.764898816 GFLOPs
Parameters: 75.427584 Million
实现了用于生成文本的脚本,基于Hugging Face的Transformers库和Intel® Extension for PyTorch* (IPEX) 进行优化。它使用了GPT-2模型,并通过命令行参数来控制一些生成行为。
- 通过 argparse 来接受命令行参数。包括:dtype: 选择数据类型,可以是 float32 或 bfloat16,分别表示全精度和半精度浮点数。max-new-tokens: 生成的新tokens的最大数量。prompt: 输入的提示词,用于生成文本。greedy: 是否启用贪婪搜索,默认为 False,否则会使用beam search。batch-size: 生成文本时处理的批次大小。
- 根据 dtype 参数决定是否启用自动混合精度(AMP)。如果使用 bfloat16,则 amp_enabled 为 True,启用自动混合精度加速推理。
- 使用GPT-2模型,配置通过 AutoConfig 加载,并通过 AutoModelForCausalLM 加载模型的预训练权重。分词器 (tokenizer) 用于将自然语言转换为模型可以处理的输入格式。
- model.eval() 将模型设置为评估模式,禁用dropout等训练时特有的行为。channels_last 内存格式可以提升Intel硬件上的性能。ipex.llm.optimize 通过 Intel® Extension for PyTorch* 进一步优化模型的执行速度和内存使用。
- generate_kwargs 包含生成文本时的相关参数,如不进行采样 (do_sample=False),使用温度参数来控制文本生成的随机性。
- tokenizer 将输入提示 (prompt) 转换为token形式,并返回相应的张量。input_size 计算输入提示的token数量。提示被复制成多批次输入,用于处理多个输入(取决于 batch_size)。
- 使用 torch.inference_mode() 禁用梯度计算,以减少内存开销并加速推理。torch.cpu.amp.autocast 用于自动混合精度的推理(如果启用)。model.generate 根据输入提示生成新的文本,生成的长度由 max_new_tokens 参数控制。最后,通过 tokenizer.batch_decode 将生成的tokens解码为可读的文本,并打印生成结果及新增的token数量。
import torch
#################### code changes ####################
import intel_extension_for_pytorch as ipex
######################################################
import argparse
from transformers import (
AutoConfig,
AutoModelForCausalLM,
AutoTokenizer,
)
# args
parser = argparse.ArgumentParser("Generation script (fp32/bf16 path)", add_help=False)
parser.add_argument(
"--dtype",
type=str,
choices=["float32", "bfloat16"],
default="float32",
help="choose the weight dtype and whether to enable auto mixed precision or not",
)
parser.add_argument(
"--max-new-tokens", default=32, type=int, help="output max new tokens"
)
parser.add_argument(
"--prompt", default="What are we having for dinner?", type=str, help="input prompt"
)
parser.add_argument("--greedy", action="store_true")
parser.add_argument("--batch-size", default=1, type=int, help="batch size")
args = parser.parse_args()
print(args)
# dtype
amp_enabled = True if args.dtype != "float32" else False
amp_dtype = getattr(torch, args.dtype)
# load model
model_id = "gpt2" # 或者其他模型名称
config = AutoConfig.from_pretrained(
model_id, torchscript=True, trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
model_id,
torch_dtype=amp_dtype,
config=config,
low_cpu_mem_usage=True,
trust_remote_code=True,
)
tokenizer = AutoTokenizer.from_pretrained(
model_id,
trust_remote_code=True
)
model = model.eval()
model = model.to(memory_format=torch.channels_last)
# Intel(R) Extension for PyTorch*
#################### code changes #################### # noqa F401
model = ipex.llm.optimize(
model,
dtype=amp_dtype,
inplace=True,
deployment_mode=True,
)
###################################################### # noqa F401
# generate args
num_beams = 1 if args.greedy else 4
generate_kwargs = dict(do_sample=False, temperature=0.9, num_beams=num_beams)
# input prompt
prompt = args.prompt
input_size = tokenizer(prompt, return_tensors="pt").input_ids.size(dim=1)
print("---- Prompt size:", input_size)
prompt = [prompt] * args.batch_size
# inference
with torch.inference_mode(), torch.cpu.amp.autocast(enabled=amp_enabled):
input_ids = tokenizer(prompt, return_tensors="pt").input_ids
gen_ids = model.generate(
input_ids,
max_new_tokens=args.max_new_tokens,
**generate_kwargs
)
gen_text = tokenizer.batch_decode(gen_ids, skip_special_tokens=True)
input_tokens_lengths = [x.shape[0] for x in input_ids]
output_tokens_lengths = [x.shape[0] for x in gen_ids]
total_new_tokens = [
o - i for i, o in zip(input_tokens_lengths, output_tokens_lengths)
]
print(gen_text, total_new_tokens, flush=True)
结果
['What are we having for dinner? What are we having for dinner? What are we having for dinner? What are we having for dinner? What are we having for dinner? What are we having'] [32]
使用 Intel Extension for PyTorch (IPEX) 来进行动态量化,对一个 BERT 模型进行动态量化处理,并将其转化为一个 TorchScript 模型保存下来。整个流程包括模型准备、量化、JIT 编译并使用 torch.jit.trace 和 torch.jit.freeze 来保存一个量化后的模型。
- 加载了预训练的 BertModel,这是一个来自 transformers 库的 BERT 模型,并将其设为推理模式(eval()),适合后续量化和推理。
- 生成了随机的输入数据,模拟了 BERT 模型的输入。vocab_size 是词汇表的大小,batch_size 是输入批次的大小,seq_length 是输入序列的长度。
- 使用 IPEX 提供的默认动态量化配置映射(qconfig_mapping)。动态量化是指只对模型的部分权重进行量化,而不对所有的权重和激活值进行静态量化。
- 通过 prepare 函数,模型被准备好进行动态量化。此过程将原始模型包装,添加量化所需的 observer 和 hooks。convert 函数将准备好的模型转换为量化的模型。经过转换后,模型内部的部分计算会使用更高效的量化运算。
- 使用 torch.jit.trace 对量化后的模型进行追踪,生成一个 TorchScript 模型。torch.jit.freeze 用来优化和冻结模型,确保未使用的部分不会在模型推理中引入不必要的开销。
- 将冻结的量化模型保存为 .pt 文件,可以用于后续的加载和推理。
import torch
#################### code changes #################### # noqa F401
import intel_extension_for_pytorch as ipex
from intel_extension_for_pytorch.quantization import prepare, convert
###################################################### # noqa F401
##### Example Model ##### # noqa F401
from transformers import BertModel
model = BertModel.from_pretrained("bert-base-uncased")
model.eval()
vocab_size = model.config.vocab_size
batch_size = 128
seq_length = 512
data = torch.randint(vocab_size, size=[batch_size, seq_length])
######################### # noqa F401
qconfig_mapping = ipex.quantization.default_dynamic_qconfig_mapping
# Alternatively, define your own qconfig:
# from torch.ao.quantization import PerChannelMinMaxObserver, PlaceholderObserver, QConfig, QConfigMapping
# qconfig = QConfig(
# activation = PlaceholderObserver.with_args(dtype=torch.float, is_dynamic=True),
# weight = PerChannelMinMaxObserver.with_args(dtype=torch.qint8, qscheme=torch.per_channel_symmetric))
# qconfig_mapping = QConfigMapping().set_global(qconfig)
prepared_model = prepare(model, qconfig_mapping, example_inputs=data)
converted_model = convert(prepared_model)
with torch.no_grad():
traced_model = torch.jit.trace(
converted_model, (data,), check_trace=False, strict=False
)
traced_model = torch.jit.freeze(traced_model)
traced_model.save("/home/dev/dynamic_quantized_model.pt")
print("Saved model to: dynamic_quantized_model.pt")
结果
Saved model to: dynamic_quantized_model.pt