PyTorch 实现 Opencv Canny 算法
使用PyTorch 实现Canny算法,相比于纯python实现,速度会更快,因为PyTorch api 使用很多c语言的底层模块,比如卷积。另外使用Pytorch可以很方便得利用cuda的加速,所以也在GPU并行运算上也有优势。
不过相比OpenCV c++的高效,PyTorch的实现显得更冗余,效率一般。但是PyTorch 使用张量、卷积来实现图像处理,保证一定效率的同时代码更简洁,另外比较重要的是,全过程使用python语言所以程序比较方进行调试,展示算法的中间处理过程。
程序运行
完整程序:https://github.com/qyzhizi/canny-algorithm.git
运行命令:
python pytorch-canny.py -v -i data/lena.png -gk 0 -L 100 -H 200
命令行参数:
-v
: 展示中间过程处理的图片
-i
: 图片路径
-gk
: 高斯核大小,默认是3, 传入0则表示不使用高斯模糊处理
-L
: 低阈值
-H
: 高阈值
结果:
Canny 算法执行步骤
算法主要分为一下步骤:
- 高斯滤波
- 使用sobel 算子计算梯度(水平梯度, 垂直梯度)
- 计算梯度的幅值与方向
- 梯度的极大值抑制
- 滞后阈值
main 程序片段
def main():
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--image", required=True, default="lena.png", help="Path to the image")
ap.add_argument("-v", "--verbose", action='store_true', default=False, help="Path to the image")
ap.add_argument("-gk", "--gaussian_kernel_size", type=int, default=3, help="Path to the image")
ap.add_argument("-L", "--threshold_low", type=int, default=20, help="Path to the image")
ap.add_argument("-H", "--threshold_high", type=int, default=50, help="Path to the image")
args = vars(ap.parse_args())
verbose = args["verbose"]
threshold_low = args["threshold_low"]
threshold_high = args["threshold_high"]
gaussian_kernel_size = args["gaussian_kernel_size"]
print("args: " , args)
# 把工作目录切换到当前文件夹所在目录
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# 打印当前工作目录
if verbose:
print(os.getcwd())
# 读入一张图片,并转换为灰度图
im = Image.open(args["image"]).convert('L')
# 将图片数据转换为矩阵
im = np.array(im, dtype='float32')
# 将图片矩阵转换为pytorch tensor,并适配卷积输入的要求
im = torch.from_numpy(im.reshape((1, 1, im.shape[0], im.shape[1])))
if verbose:
print("im.shape: ", im.shape)
# 对图像进行高斯滤波,平滑图像,去除噪声
im = gaussian_blur(im, gaussian_kernel_size, verbose=verbose)
# sobel 算子进行边缘检测
gradient_v = functional_conv2d_vertical(im, verbose=verbose) # 竖直梯度
gradient_h = functional_conv2d_horizontal(im, verbose=verbose) # 水平梯度
# 计算梯度幅值
gradient_magnitude = get_gradient_magnitude(gradient_v, gradient_h, verbose=verbose)
# 计算梯度方向
gradient_direction_quantized = get_gradient_direction_quantized(gradient_v, gradient_h, verbose=verbose)
# 非极大值抑制
nms_tensor = nms(gradient_magnitude, gradient_direction_quantized, verbose=verbose)
# 阈值处理
# edge_detect = threshold(nms_tensor, low=threshold_low, high=threshold_high, weak=50, strong=255)
# 滞后阈值
edge_detect = hysteresis_threshold(nms_tensor, low=threshold_low, high=threshold_high)
if verbose:
print("edge_detect.shape: ", edge_detect.shape, "im.shape: ", im.shape)
plt.imshow(edge_detect.squeeze().detach().numpy(), cmap='gray')
plt.title("Edge Detection")
plt.show()
# 保存图片
save_edge_detect_as_image(edge_detect, "data/edge_detect.png", verbose=verbose)
save_edge_detect_and_origin_image_as_image(edge_detect, args["image"], "data/edge_detect_and_origin_image.png", verbose=verbose)
if __name__ == "__main__":
start_time = time.time()
print("programming strat")
main()
end_time = time.time()
print("time: ", end_time - start_time)
print("programming end")
高斯滤波
def dnorm(x: int, mu:int , sd:int)-> np.float32:
return 1 / (np.sqrt(2 * np.pi) * sd) * np.e ** (-np.power((x - mu) / sd, 2) / 2)
def gaussian_kernel(size: int, sigma: int=1, verbose: bool=False) -> np.ndarray:
kernel_1D = np.linspace(-(size // 2), size // 2, size)
for i in range(size):
kernel_1D[i] = dnorm(kernel_1D[i], 0, sigma)
if verbose:
print("kernel_1D: ", kernel_1D)
kernel_2D = np.outer(kernel_1D, kernel_1D)
if verbose:
print("kernel_2D: ", kernel_2D)
kernel_2D *= 1.0 / (kernel_2D.max() * size * size)# 归一化, 使得kernel_2D的最大值为1
if verbose:
print(kernel_2D.max())
print("kernel_2D: ", kernel_2D)
if verbose:
plt.imshow(kernel_2D, interpolation='none', cmap='gray')
plt.title("Kernel ( {}X{} )".format(size, size))
plt.show()
return kernel_2D
# 对图像进行高斯滤波,平滑图像,去除噪声
def gaussian_blur(image: torch.tensor, kernel_size: int, verbose=False) -> torch.tensor:
if kernel_size % 2 == 1 and kernel_size > 1 and kernel_size < 20:
kernel = gaussian_kernel(kernel_size, sigma=math.sqrt(kernel_size), verbose=verbose)
kernel = kernel.reshape((1, 1, kernel_size, kernel_size))
weight = torch.from_numpy(kernel).to(torch.float32)
# image padding kernel_size // 2, 使用原始图像的边缘像素进行填充
image = F.pad(image, (kernel_size // 2, kernel_size // 2, kernel_size // 2, kernel_size // 2), mode='reflect')
blur = F.conv2d(image, weight, padding=0)
if verbose:
plt.imshow(blur.squeeze().numpy(), cmap='gray')
plt.title("blur")
plt.show()
return blur
else:
return image
函数gaussian_blur
:
def gaussian_blur(image: torch.tensor, kernel_size: int, verbose=False) -> torch.tensor:
gaussian_blur
实现了高斯滤波,首先调 gaussian_kernel
获取高斯核,然后转换为tensor
类型,作为接下卷积的权重。
使用F.conv2d
进行卷积,卷积核为高斯核,卷积结果就是高斯模糊化后的图片。
sobel 算子计算梯度
# sobel 算子进行边缘检测
gradient_v = functional_conv2d_vertical(im, verbose=verbose) # 竖直梯度
gradient_h = functional_conv2d_horizontal(im, verbose=verbose) # 水平梯度
同样使用了卷积来计算梯度:F.conv2d(im, weight, stride=1, padding=0)
, 注意padding:im = F.pad(im, (1, 1, 1, 1), mode='replicate')
, 这里使用了复制模式。使用补零或翻转的模式都不如复制模式好。opencv 的实现也是复制模式。
关于sobel 算子的原理,可以参考另外一篇文章: # Pytorch卷积实现边缘检测
functional_conv2d_vertical
函数 与 functional_conv2d_horizontal
函数的代码:
def functional_conv2d_horizontal(im, verbose=False):
"""使用F.Conv2d进行边缘检测, 检测竖直方向的轮廓, 水平梯度,右方向为正方向
Args:
im (tensor): 输入的tensor图像
Returns:
tensor: 输出的tensor图像
"""
sobel_kernel = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype='float32')
sobel_kernel = sobel_kernel.reshape((1, 1, 3, 3))
weight = torch.from_numpy(sobel_kernel)
im = F.pad(im, (1, 1, 1, 1), mode='replicate')
edge_detect = F.conv2d(im, weight, stride=1, padding=0)
if verbose:
plt.imshow(edge_detect.squeeze().detach().numpy(), cmap='gray')
plt.title("horizontal")
plt.show()
return edge_detect
def functional_conv2d_vertical(im, verbose=False):
"""使用F.Conv2d进行边缘检测, 检测水平方向的轮廓, 垂直梯度,向上为正方向
Args:
im (tensor): 输入的tensor图像
Returns:
tensor: 输出的tensor图像
"""
sobel_kernel = np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype='float32')
sobel_kernel = sobel_kernel.reshape((1, 1, 3, 3))
weight = torch.from_numpy(sobel_kernel)
im = F.pad(im, (1, 1, 1, 1), mode='replicate')
edge_detect = F.conv2d(im, weight, stride=1, padding=0)
if verbose:
plt.imshow(edge_detect.squeeze().detach().numpy() , cmap='gray')
plt.title("vertical")
plt.show()
return edge_detect
计算梯度的幅值与方向
梯度幅值与方向将用于极大值抑制。
函数调用:
# 计算梯度幅值
gradient_magnitude = get_gradient_magnitude(gradient_v, gradient_h, verbose=verbose)
# 计算梯度方向
gradient_direction_quantized = get_gradient_direction_quantized(gradient_v, gradient_h, verbose=verbose)
计算幅值
默认使用L1
范数, 在opencv 的实现中,如果sobel卷积核比较大,比如16x16
会使用L2
范数。
参数:gradient_v
代表垂直方向的梯度,有正负值,向上为正方向。
gradient_h
代表水平方向的梯度,有正负值,右方向为正方向。
def get_gradient_magnitude(gradient_v: torch.tensor, gradient_h: torch.tensor, verbose=False, use_l2_norm=False) -> torch.tensor:
"""计算梯度幅值
"""
# 计算梯度的幅值
if use_l2_norm:
# use l2 norm
gradient_magnitude = torch.sqrt(gradient_v**2 + gradient_h**2)
else:
# use l1 norm
gradient_magnitude = torch.abs(gradient_v) + torch.abs(gradient_h)
# 将梯度幅值限制在0-255之间
# gradient_magnitude = gradient_magnitude * 255.0 / gradient_magnitude.max()
if verbose:
plt.imshow(gradient_magnitude.squeeze().numpy(), cmap='gray')
plt.title("Gradient Magnitude")
plt.show()
return gradient_magnitude
计算梯度方向
这里判断梯度方向时,会将梯度方向判定为[0, 45, 90, 135]
之一,用如下图表示:
这里的划分标准是:
位于[(360-22.5), 360] [0,22.5] [(180-22.5), (180+22.5)]
区间判方向为0
度
下图所示:
位于[(90-22.5), (90+22.5)] [(270-22.5), (270+22.5)]
区间判方向为90
度.
如下图所示:
同理,位于[135-22.5, 135+22.5] or [315-22.5, 315+22.5]
区间判方向为135
度. 位于[45-22.5, 45+22.5] or [225-22.5, 225+22.5]
判方向为45
度
如下图所示:
实际计算过程中,[0, 45, 90, 135]
4个方向分别用[0, 1, 2, 3]
表示。
例如:
# 将角度值转换为四个方向之一:0度,45度,90度,135度
gradient_direction_quantized = (torch.round(gradient_direction_deg / 45) % 4).int()
gradient_direction_quantized
是一个包含[0, 1, 2, 3]
的tensor.
torch.round(gradient_direction_deg / 45)
这里会进行四舍五入,比如:torch.round((135+22.5) / 45) == torch.round(3.5) == 4
计算梯度方向函数:
def get_gradient_direction_quantized(gradient_v: torch.tensor, gradient_h: torch.tensor, verbose=False) -> torch.tensor:
"""计算梯度方向
将梯度方向转换为四个方向之一: 0度,45度,90度,135度, 分别用 0,1,2,3 表示。
"""
# 计算梯度方向, 以弧度为单位
gradient_direction = torch.atan2(gradient_v, gradient_h)
# 将梯度方向转换为角度值, 并将角度值转换为0-360度之间
gradient_direction_deg = torch.rad2deg(gradient_direction) + 180
# 将角度值转换为四个方向之一:0度,45度,90度,135度
gradient_direction_quantized = (torch.round(gradient_direction_deg / 45) % 4).int()
if verbose:
print("gradient_direction_quantized: ", gradient_direction_quantized)
return gradient_direction_quantized
梯度的极大值抑制
极大值抑制算法:
进行极大值抑制时会对周边先padding一圈0,然后对某个像素进行极大值抑制时,然后判断当前像素的方向,方向分别用0, 1, 2, 3
表示,代表着0度、45度、90度,135度
。假设当前像素是135度
的方向,所以会比较135度
方向上另外2个像素,如果当前像素的坐标是(0,0)
, 那么另两个像素的坐标就是(-1,1)(1,-1)
,注意即使另两个像素不是135度,依然选这两个坐标,另两个像素是不是135度不重要,至于为什么,我也不知道如何解释。这种情况下,遍历每个像素就得到了极大值抑制的结果。
可以使用pytorch 的卷积来实现, 先定义一个卷积:
nms_conv_op = torch.nn.Conv2d(in_channels=1,out_channels=3, kernel_size=3, stride=1, padding=1, bias=False)
它的输出通道为3, 这意味着卷积的通道为3 例如:
quantized_0_list = [[[0, 0, 0],[1, 0, 0],[0, 0, 0]],
[[0, 0, 0],[0, 1, 0],[0, 0, 0]],
[[0, 0, 0],[0, 0, 1],[0, 0, 0]]]
这是选取0
度方向的卷积核,作用是选取该方向上的3个元素,每个卷积核选中某个方向上的一个元素。
第一个通道:[[0, 0, 0],[1, 0, 0],[0, 0, 0]]
将会选择当前元素左边的元素。第二个通道选择当前元素,第3个通道选择当前元素右边的元素。
45度、90度,135度
的情况也是类似,这里就不多赘述了。
由于有4个方向,因此会进行4次卷积.
假设输入gradient_magnitude
的shape 是(1,1,n,m)
, 那么输出的nms_edge
的shape 是 (1,3,n,m)
, 这里3 表示3通道,保存着对应位置某个方向上的3个元素。
mask_i = (gradient_direction_quantized == i).int()
: 获取某个方向上的mask
.
nms_direction_conv_mask_i = nms_direction_conv_i * mask_i
: 将某次卷积的结果与mask_i
相乘,只选择掩码所标识位置的值。shape 是 (1,3,n,m)
,
nms_direction_conv_mask_max_i, _ = torch.max(nms_direction_conv_mask_i, dim=1)
: 最后选择nms_direction_conv_mask_i
维度1
上(3通道)最大的值, nms_direction_conv_mask_max_i
的形状变为(1,1,n,m)
nms_edge = nms_edge + ((gradient_magnitude == nms_direction_conv_mask_max_i) * nms_direction_conv_mask_max_i)
nms_direction_conv_mask_max_i
中每个元素,表示该元素在i方向上最大的值, 但是这个最大值不一定是当前的元素。
(gradient_magnitude == nms_direction_conv_mask_max_i)
: 当前元素如果是最大的,那么选中,否者放弃。
(gradient_magnitude == nms_direction_conv_mask_max_i)
返回的0, 1
的矩阵,所以还需乘nms_direction_conv_mask_max_i
来选中对应的元素,最后加入到最终的输出nms_edge
。
完整程序:
def nms(gradient_magnitude: torch.tensor,
gradient_direction_quantized: torch.tensor,
verbose=False) -> torch.tensor:
"""非极大值抑制"""
nms_edge = torch.zeros_like(gradient_magnitude)
nms_conv_op = torch.nn.Conv2d(in_channels=1,out_channels=3,
kernel_size=3, stride=1,
padding=1, bias=False)
quantized_0_list = [[[0, 0, 0],[1, 0, 0],[0, 0, 0]],
[[0, 0, 0],[0, 1, 0],[0, 0, 0]],
[[0, 0, 0],[0, 0, 1],[0, 0, 0]]]
quantized_1_list = [[[0, 1, 0],[0, 0, 0],[0, 0, 0]],
[[0, 0, 0],[0, 1, 0],[0, 0, 0]],
[[0, 0, 0],[0, 0, 0],[0, 1, 0]]]
quantized_2_list = [[[0, 1, 0],[0, 0, 0],[0, 0, 0]],
[[0, 0, 0],[0, 1, 0],[0, 0, 0]],
[[0, 0, 0],[0, 0, 0],[0, 1, 0]]]
quantized_3_list = [[[1, 0, 0],[0, 0, 0],[0, 0, 0]],
[[0, 0, 0],[0, 1, 0],[0, 0, 0]],
[[0, 0, 0],[0, 0, 0],[0, 0, 1]]]
kernel_0 = torch.tensor(quantized_0_list, dtype=torch.float32)
kernel_1 = torch.tensor(quantized_1_list, dtype=torch.float32)
kernel_2 = torch.tensor(quantized_2_list, dtype=torch.float32)
kernel_3 = torch.tensor(quantized_3_list, dtype=torch.float32)
kernel_direction_quantized = [kernel_0,
kernel_1,
kernel_2,
kernel_3]
for i in range(4):
mask_i = (gradient_direction_quantized == i).int()
if verbose:
print("mask_i: ", mask_i)
nms_conv_op.weight.data = kernel_direction_quantized[i].reshape((3, 1, 3, 3))
nms_direction_conv_i = nms_conv_op(gradient_magnitude)
nms_direction_conv_mask_i = nms_direction_conv_i * mask_i
nms_direction_conv_mask_max_i, _ = torch.max(nms_direction_conv_mask_i, dim=1)
if verbose:
print("nms_direction_conv_mask_max_i: ", nms_direction_conv_mask_max_i)
print("nms_direction_i: ", nms_direction_conv_mask_max_i)
nms_edge = nms_edge + ((gradient_magnitude == nms_direction_conv_mask_max_i) *
nms_direction_conv_mask_max_i)
if verbose:
print("nms_edge: ", nms_edge)
# verbose = True
if verbose:
plt.imshow(nms_edge.squeeze().detach().numpy(), cmap='gray')
plt.title("Non-Maximum Suppression")
plt.show()
return nms_edge
滞后阈值
滞后阈值的目的是为了连接间断点,具有更好的鲁棒性。 例如:
# 普通双边阈值处理
edge_detect = threshold(nms_edge, low=threshold_low, high=threshold_high, weak=50, strong=255)
# 滞后阈值
edge_detect = hysteresis_threshold(nms_edge, low=threshold_low, high=threshold_high)
同样的命令:
python pytorch-canny.py -v -i data/lena.png -gk 0 -L 100 -H 200
使用滞后阈值的结果,普通双边阈值处理的结果分别是:
算法步骤:
hysteresis_threshold
输入:nms_edge
是一个极大值值抑制后的tensorlow
低阈值high
高阈值
- 分类:
- 不考虑低于
low
的元素 - 高于
high
的元素 记为强边缘,strong_mask
记录位置 - 大于
low
小于high
的元素记为弱边缘,weak_mask
记录位置
- 不考虑低于
- 将强边缘放入栈中,然后开始循环栈中的元素,先弹出栈中一个元素(强边缘),然后判断周围8个位置中是否存在弱边缘,如果存在,将弱边缘元素入栈,然后将当前元素输出到
output_index
中,表示该强边缘元素处理结束。另外由于周围8个位置中的弱边缘已经入栈,为了防止重复入栈,将weak_mask
对应位置设置为0。直到栈为空。 - 最后根据
output_index
中的位置,返回最后的边缘检测图。
程序:
def hysteresis_threshold(nms_edge: torch.tensor, low: int, high: int) -> torch.tensor:
nms_edge = nms_edge.squeeze()
strong_mask = nms_edge >= high
weak_mask = (nms_edge > low) & (nms_edge < high).int()
output_index = []
# get strong_mask index
strong_index = torch.nonzero(strong_mask).tolist()
# create stack and push strong_index
stack = deque(strong_index)
while len(stack) > 0:
# pop item
item = stack.pop()
# get item index
i, j = item[0], item[1]
# get weak_mask index
local_index = torch.nonzero(weak_mask[i-1:i+2, j-1:j+2]).tolist()
for local_index_item in local_index:
# stack.append(weak_index[i])
weak_index_item = [i-1+local_index_item[0], j-1+local_index_item[1]]
stack.append(weak_index_item)
weak_mask[weak_index_item[0], weak_index_item[1]] = 0
output_index.append(item)
# create output tensor use output_index
output_index_tensor = torch.tensor(output_index) # (N, 2)
output = torch.zeros_like(nms_edge)
output[output_index_tensor[:, 0], output_index_tensor[:, 1]] = 255
return output
与opencv 算法对比
opencv canny算法:
import cv2
import numpy as np
from matplotlib import pyplot as plt
import time
if __name__ == '__main__':
img = cv2.imread('data/lena.png',0)
start_time = time.time()
edges = cv2.Canny(img,100,200)
print("time: ", time.time() - start_time)
plt.subplot(121),plt.imshow(img,cmap = 'gray')
plt.title('Original Image'), plt.xticks([]), plt.yticks([])
plt.subplot(122),plt.imshow(edges,cmap = 'gray')
plt.title('Edge Image'), plt.xticks([]), plt.yticks([])
plt.show()
print("end")
# 保存图片
cv2.imwrite("data/opencv_edge_detect.png", edges)
同样的输入图片: opencv 输出与本文pytorch 实现的输出一样: 都使用cpu 的情况下,opencv 的处理速度非常快:是本文pytorch 实现的30倍。 本文pytorch 实现需要的时间:time: 0.0885 而opencv 实现需要的时间:time: 0.0030 非常amazing, 竟然慢这么多,笔者还运行过别人 纯python的实现 ,结论是非常慢。
sobel算子的 padding 对极大值抑制的影响
Canny算法中sobel算子padding的方式对极大值抑制的影响
参考
https://github.com/opencv/opencv/blob/2.2/modules/imgproc/src/canny.cpp https://github.com/adeveloperdiary/blog/tree/master/Computer_Vision/Canny_Edge_Detection