Hello. In this post I'll review 3D detection code.
The codebase is OpenPCDet, linked at the end of this post.
For the theoretical background, see the post below:
[paper review] PV-RCNN, PV-RCNN++ paper review
jaehoon-daddy.tistory.com
To summarize: in existing voxel-based detectors, the features at the RoI-pooling / head stage are so sparse that it is hard to regress box positions precisely and finely. PV-RCNN addresses this with a point-based module called Voxel Set Abstraction that fine-tunes the boxes. The architecture up to that fine-tuning step is essentially the SECOND model.
![](https://blog.kakaocdn.net/dn/cd69WZ/btszWE8Aqhz/KTNC5PNKs9kXp21cbGBs80/img.png)
VFE(1st stage)
Let's start with the voxel-based CNN part at the top of the architecture above. In the first step, the raw points go through VFE (Voxel Feature Encoding, introduced in the VoxelNet paper) to encode per-voxel features. In VoxelNet this step also ran the points through layers (i.e., it was learned).
![](https://blog.kakaocdn.net/dn/vnjvm/btszUdjBWSj/8GiH7pUjOKnKbOyoqFXWHk/img.png)
You can think of it as the preparation step before running the actual 3D convolutions.
OpenPCDet provides the VFE module variants below; here we'll look at the representative one, MeanVFE. (In my own ablation of swapping the VFE module, the difference was negligible.)
import torch

from .vfe_template import VFETemplate

class MeanVFE(VFETemplate):
def __init__(self, model_cfg, num_point_features, **kwargs):
super().__init__(model_cfg=model_cfg)
self.num_point_features = num_point_features
def get_output_feature_dim(self):
return self.num_point_features
def forward(self, batch_dict, **kwargs):
"""
Args:
batch_dict:
voxels: (num_voxels, max_points_per_voxel, C)
voxel_num_points: optional (num_voxels)
**kwargs:
Returns:
vfe_features: (num_voxels, C)
"""
voxel_features, voxel_num_points = batch_dict['voxels'], batch_dict['voxel_num_points']
points_mean = voxel_features[:, :, :].sum(dim=1, keepdim=False)
normalizer = torch.clamp_min(voxel_num_points.view(-1, 1), min=1.0).type_as(voxel_features)
points_mean = points_mean / normalizer
batch_dict['voxel_features'] = points_mean.contiguous()
return batch_dict
The code above takes the voxels produced by the sparse-conv library's voxelization and simply averages the features of the points inside each voxel (the features here being the raw point attributes: x, y, z, intensity, ...).
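To make the averaging concrete, here is a tiny standalone sketch (random data, no OpenPCDet dependency) that mimics what MeanVFE.forward does; the explicit masking stands in for the zero-padding that the dataset's voxel generator already applies to unused point slots.

```python
import torch

# toy input: 5 voxels, up to 4 points per voxel, 4 point features (x, y, z, intensity)
voxels = torch.randn(5, 4, 4)
voxel_num_points = torch.tensor([4, 2, 1, 3, 4])

# zero out the padded point slots so they do not contribute to the sum
mask = torch.arange(4).view(1, -1) < voxel_num_points.view(-1, 1)      # (5, 4)
voxels = voxels * mask.unsqueeze(-1)

points_sum = voxels.sum(dim=1)                                          # (5, 4)
normalizer = torch.clamp_min(voxel_num_points.view(-1, 1).float(), min=1.0)
voxel_features = points_sum / normalizer                                # per-voxel mean, (5, 4)
print(voxel_features.shape)                                             # torch.Size([5, 4])
```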
voxel-based backbone(1st stage)
Now it's time for the 3D convolutions. Most voxels are empty (LiDAR data is inherently sparse), so much of the progress in 3D detection for autonomous driving has been about how to handle this sparsity. One answer is sparse convolution, a way to process sparse voxels quickly. Two well-known libraries are spconv, which came out of SECOND, and torchsparse from SPVNAS (MIT). OpenPCDet uses spconv (probably because it has been around longer, and OpenPCDet was also developed in China).
![](https://blog.kakaocdn.net/dn/ccQDtZ/btszSJwjdpt/bkCguojIM5U9kLwUG4RG60/img.png)
The rough idea is that empty locations are not zero-padded and computed at all: only the non-empty voxels are stored in a hash map, and only those are computed (a toy sketch of this idea follows the figure below).
![](https://blog.kakaocdn.net/dn/WBDr0/btszXF0RmAQ/KYMAudL0TxKVI782OoB23k/img.png)
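As a rough mental model (this is not how spconv is actually implemented internally), the toy sketch below keeps only the non-empty voxels in a hash map and evaluates a 3×3×3 kernel only at those sites, skipping empty neighbors entirely:

```python
import itertools
import torch

# "hash map": voxel index (z, y, x) -> feature; only non-empty voxels are stored at all
active = {(0, 0, 0): torch.tensor([1.0]),
          (0, 0, 1): torch.tensor([2.0]),
          (3, 5, 7): torch.tensor([0.5])}

# one 3x3x3 kernel (1 input channel, 1 output channel), indexed by spatial offset
weights = {off: torch.randn(1, 1) for off in itertools.product((-1, 0, 1), repeat=3)}

out = {}
for idx in active:                            # submanifold style: outputs only at active sites
    acc = torch.zeros(1)
    for off, w in weights.items():
        nbr = (idx[0] + off[0], idx[1] + off[1], idx[2] + off[2])
        if nbr in active:                     # empty neighbors are simply skipped, never zero-padded
            acc += w @ active[nbr]
    out[idx] = acc
print({k: v.item() for k, v in out.items()})
```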
Below is the code that builds the voxel-based backbone with sparse convolutions. If the volume encoded by the VFE stage has shape (B, H, W, Z), the final output has 128 channels with a downsampled shape (B, H*, W*, Z*); a quick check of the intermediate shapes follows after the code.
from functools import partial
import torch.nn as nn
# spconv and the post_act_block helper are imported/defined at the top of OpenPCDet's spconv_backbone.py

class VoxelBackBone8x(nn.Module):
def __init__(self, model_cfg, input_channels, grid_size, **kwargs):
super().__init__()
self.model_cfg = model_cfg
norm_fn = partial(nn.BatchNorm1d, eps=1e-3, momentum=0.01)
self.sparse_shape = grid_size[::-1] + [1, 0, 0]
self.conv_input = spconv.SparseSequential(
spconv.SubMConv3d(input_channels, 16, 3, padding=1, bias=False, indice_key='subm1'),
norm_fn(16),
nn.ReLU(),
)
block = post_act_block
self.conv1 = spconv.SparseSequential(
block(16, 16, 3, norm_fn=norm_fn, padding=1, indice_key='subm1'),
)
self.conv2 = spconv.SparseSequential(
# [1600, 1408, 41] <- [800, 704, 21]
block(16, 32, 3, norm_fn=norm_fn, stride=2, padding=1, indice_key='spconv2', conv_type='spconv'),
block(32, 32, 3, norm_fn=norm_fn, padding=1, indice_key='subm2'),
block(32, 32, 3, norm_fn=norm_fn, padding=1, indice_key='subm2'),
)
self.conv3 = spconv.SparseSequential(
# [800, 704, 21] <- [400, 352, 11]
block(32, 64, 3, norm_fn=norm_fn, stride=2, padding=1, indice_key='spconv3', conv_type='spconv'),
block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm3'),
block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm3'),
)
self.conv4 = spconv.SparseSequential(
# [400, 352, 11] <- [200, 176, 5]
block(64, 64, 3, norm_fn=norm_fn, stride=2, padding=(0, 1, 1), indice_key='spconv4', conv_type='spconv'),
block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm4'),
block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm4'),
)
last_pad = 0
last_pad = self.model_cfg.get('last_pad', last_pad)
self.conv_out = spconv.SparseSequential(
# [200, 150, 5] -> [200, 150, 2]
spconv.SparseConv3d(64, 128, (3, 1, 1), stride=(2, 1, 1), padding=last_pad,
bias=False, indice_key='spconv_down2'),
norm_fn(128),
nn.ReLU(),
)
self.num_point_features = 128
self.backbone_channels = {
'x_conv1': 16,
'x_conv2': 32,
'x_conv3': 64,
'x_conv4': 64
}
def forward(self, batch_dict):
"""
Args:
batch_dict:
batch_size: int
vfe_features: (num_voxels, C)
voxel_coords: (num_voxels, 4), [batch_idx, z_idx, y_idx, x_idx]
Returns:
batch_dict:
encoded_spconv_tensor: sparse tensor
"""
voxel_features, voxel_coords = batch_dict['voxel_features'], batch_dict['voxel_coords']
batch_size = batch_dict['batch_size']
input_sp_tensor = spconv.SparseConvTensor(
features=voxel_features,
indices=voxel_coords.int(),
spatial_shape=self.sparse_shape,
batch_size=batch_size
)
x = self.conv_input(input_sp_tensor)
x_conv1 = self.conv1(x)
x_conv2 = self.conv2(x_conv1)
x_conv3 = self.conv3(x_conv2)
x_conv4 = self.conv4(x_conv3)
# for detection head
# [200, 176, 5] -> [200, 176, 2]
out = self.conv_out(x_conv4)
batch_dict.update({
'encoded_spconv_tensor': out,
'encoded_spconv_tensor_stride': 8
})
batch_dict.update({
'multi_scale_3d_features': {
'x_conv1': x_conv1,
'x_conv2': x_conv2,
'x_conv3': x_conv3,
'x_conv4': x_conv4,
}
})
batch_dict.update({
'multi_scale_3d_strides': {
'x_conv1': 1,
'x_conv2': 2,
'x_conv3': 4,
'x_conv4': 8,
}
})
return batch_dict
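As a sanity check on the shape comments in the code above, here is a quick calculation of the per-stage spatial shapes, assuming the default KITTI-style voxel grid of 1408×1600×40 (x, y, z); shapes are printed in (z, y, x) order:

```python
import numpy as np

# KITTI-style voxel grid (x, y, z) = (1408, 1600, 40); an assumption matching the default config
grid_size = np.array([1408, 1600, 40])
sparse_shape = grid_size[::-1] + [1, 0, 0]       # (z, y, x) = [41, 1600, 1408], as in __init__ above

def down(shape, stride=2, kernel=3, pad=1):
    # output size of a strided sparse conv: floor((D + 2*pad - kernel) / stride) + 1
    return [(d + 2 * pad - kernel) // stride + 1 for d in shape]

x1 = [int(v) for v in sparse_shape]              # conv1 (submanifold, stride 1): [41, 1600, 1408]
x2 = down(x1)                                    # conv2: [21, 800, 704]
x3 = down(x2)                                    # conv3: [11, 400, 352]
x4 = [(x3[0] - 3) // 2 + 1] + down(x3[1:])       # conv4 (z padded with 0): [5, 200, 176]
out = [(x4[0] - 3) // 2 + 1] + x4[1:]            # conv_out, kernel (3,1,1), stride (2,1,1): [2, 200, 176]
print(x1, x2, x3, x4, out)                       # overall stride 8 on the ground plane
```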
BEV(1st stage)
BEV (Bird's Eye View) is, as the name says, the view looking straight down along the z axis, i.e., a view that ignores z. The modules OpenPCDet provides for building BEV features are shown below.
First, let's look at HeightCompression, the standard choice used by SECOND and PV-RCNN.
import torch.nn as nn

class HeightCompression(nn.Module):
def __init__(self, model_cfg, **kwargs):
super().__init__()
self.model_cfg = model_cfg
self.num_bev_features = self.model_cfg.NUM_BEV_FEATURES
def forward(self, batch_dict):
"""
Args:
batch_dict:
encoded_spconv_tensor: sparse tensor
Returns:
batch_dict:
spatial_features:
"""
encoded_spconv_tensor = batch_dict['encoded_spconv_tensor']
spatial_features = encoded_spconv_tensor.dense()
N, C, D, H, W = spatial_features.shape
spatial_features = spatial_features.view(N, C * D, H, W)
batch_dict['spatial_features'] = spatial_features
batch_dict['spatial_features_stride'] = batch_dict['encoded_spconv_tensor_stride']
return batch_dict
Looking at the code: the voxel backbone output has shape N (batch size), C (channels), D (z), H (y), W (x), and the channel and D (z-height) axes are simply merged. After this BEV step the shape is N (batch size), C×D (channels × z size), H, W.
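For example, with the backbone output above (128 channels, z squeezed down to 2 slices), the height compression is just a reshape; the sizes below are assumed from the KITTI setting:

```python
import torch

# dense backbone output: N (batch), C (channels), D (z), H, W; the sizes are assumed (KITTI setting)
spatial_features = torch.randn(2, 128, 2, 200, 176)
N, C, D, H, W = spatial_features.shape
bev = spatial_features.view(N, C * D, H, W)   # stack the z slices into the channel axis
print(bev.shape)                              # torch.Size([2, 256, 200, 176]) -> NUM_BEV_FEATURES = 256
```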
After that, the features are run through a 2D conv backbone. OpenPCDet provides the variants below; BEVResBackbone is the one built from residual blocks.
This part is very standard, so I'll skip the details here (a minimal stand-in sketch follows for reference).
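For reference only, here is a minimal stand-in for such a BEV backbone (a plain downsample-then-upsample conv stack, not OpenPCDet's exact BaseBEVBackbone or BEVResBackbone):

```python
import torch
import torch.nn as nn

class TinyBEVBackbone(nn.Module):
    """Minimal stand-in for a BEV 2D backbone: one strided conv stage plus an upsampling deblock."""
    def __init__(self, in_ch=256, mid_ch=128, out_ch=256):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_ch, mid_ch, 3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(mid_ch), nn.ReLU(),
            nn.Conv2d(mid_ch, mid_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(mid_ch), nn.ReLU(),
        )
        self.deblock = nn.Sequential(
            nn.ConvTranspose2d(mid_ch, out_ch, 2, stride=2, bias=False),
            nn.BatchNorm2d(out_ch), nn.ReLU(),
        )

    def forward(self, spatial_features):
        return self.deblock(self.block(spatial_features))   # -> 'spatial_features_2d'

x = torch.randn(2, 256, 200, 176)
print(TinyBEVBackbone()(x).shape)   # torch.Size([2, 256, 200, 176])
```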
Head(1st stage)
The process from here on is similar to 2D detection. For an overview of the 2D detection pipeline, see the post below:
[Detection] Object Detection History, Part 1
jaehoon-daddy.tistory.com
The head types provided by OpenPCDet are as follows.
AnchorHeadSingle is the typical type where one shared set of anchors proposes regions, while AnchorHeadMulti, like YOLO, keeps separate anchors per class and classifies each anchor as that class or not (0 or 1). CenterHead is the head proposed in CenterPoint and can also be used for tracking; PointHead is specialized for PointRCNN, VoxelNeXtHead for VoxelNeXt, and TransFusionHead for BEVFusion.
Here we'll look at AnchorHeadSingle.
import numpy as np
import torch
import torch.nn as nn
from .anchor_head_template import AnchorHeadTemplate
# generate_anchors below is a @staticmethod of AnchorHeadTemplate (AnchorGenerator is imported
# from the target_assigner package in that file); it is reproduced here for reference
@staticmethod
def generate_anchors(anchor_generator_cfg, grid_size, point_cloud_range, anchor_ndim=7):
anchor_generator = AnchorGenerator(
anchor_range=point_cloud_range,
anchor_generator_config=anchor_generator_cfg
)
feature_map_size = [grid_size[:2] // config['feature_map_stride'] for config in anchor_generator_cfg]
anchors_list, num_anchors_per_location_list = anchor_generator.generate_anchors(feature_map_size)
if anchor_ndim != 7:
for idx, anchors in enumerate(anchors_list):
pad_zeros = anchors.new_zeros([*anchors.shape[0:-1], anchor_ndim - 7])
new_anchors = torch.cat((anchors, pad_zeros), dim=-1)
anchors_list[idx] = new_anchors
return anchors_list, num_anchors_per_location_list
class AnchorHeadSingle(AnchorHeadTemplate):
def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
predict_boxes_when_training=True, **kwargs):
super().__init__(
model_cfg=model_cfg, num_class=num_class, class_names=class_names, grid_size=grid_size, point_cloud_range=point_cloud_range,
predict_boxes_when_training=predict_boxes_when_training
)
self.num_anchors_per_location = sum(self.num_anchors_per_location)
self.conv_cls = nn.Conv2d(
input_channels, self.num_anchors_per_location * self.num_class,
kernel_size=1
)
self.conv_box = nn.Conv2d(
input_channels, self.num_anchors_per_location * self.box_coder.code_size,
kernel_size=1
)
if self.model_cfg.get('USE_DIRECTION_CLASSIFIER', None) is not None:
self.conv_dir_cls = nn.Conv2d(
input_channels,
self.num_anchors_per_location * self.model_cfg.NUM_DIR_BINS,
kernel_size=1
)
else:
self.conv_dir_cls = None
self.init_weights()
def init_weights(self):
pi = 0.01
nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)
def forward(self, data_dict):
spatial_features_2d = data_dict['spatial_features_2d']
cls_preds = self.conv_cls(spatial_features_2d)
box_preds = self.conv_box(spatial_features_2d)
cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
box_preds = box_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
self.forward_ret_dict['cls_preds'] = cls_preds
self.forward_ret_dict['box_preds'] = box_preds
if self.conv_dir_cls is not None:
dir_cls_preds = self.conv_dir_cls(spatial_features_2d)
dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).contiguous()
self.forward_ret_dict['dir_cls_preds'] = dir_cls_preds
else:
dir_cls_preds = None
if self.training:
targets_dict = self.assign_targets(
gt_boxes=data_dict['gt_boxes']
)
self.forward_ret_dict.update(targets_dict)
if not self.training or self.predict_boxes_when_training:
batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
batch_size=data_dict['batch_size'],
cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=dir_cls_preds
)
data_dict['batch_cls_preds'] = batch_cls_preds
data_dict['batch_box_preds'] = batch_box_preds
data_dict['cls_preds_normalized'] = False
return data_dict
The feature map from the 2D backbone is passed through conv_cls and conv_box, i.e., one more conv each; for direction classification, the feature is also passed through conv_dir_cls. As for the sizes: num_anchors_per_location is the number of anchor boxes generated at each location; conv_cls outputs (number of anchors) × (number of classes) channels, and conv_box outputs (number of anchors) × (anchor dimensions) channels, the anchor dimension usually being 7 for (x, y, z, dx, dy, dz, ry). conv_dir_cls outputs (number of anchors) × (number of direction bins) channels (2 bins, a rough prediction of whether the heading lies in 0~180° or 180~360°); this head is optional.
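As a concrete example, with the usual KITTI 3-class setting (Car / Pedestrian / Cyclist, 2 anchor rotations per class per location, 7 box parameters, 2 direction bins; these numbers are assumptions based on the default configs), the head output channels work out as:

```python
num_class = 3                      # Car, Pedestrian, Cyclist
num_anchors_per_location = 3 * 2   # 2 anchor rotations (0 and 90 degrees) per class
box_code_size = 7                  # (x, y, z, dx, dy, dz, ry)
num_dir_bins = 2                   # rough 0~180 vs 180~360 heading bin

print(num_anchors_per_location * num_class)       # conv_cls output channels: 18
print(num_anchors_per_location * box_code_size)   # conv_box output channels: 42
print(num_anchors_per_location * num_dir_bins)    # conv_dir_cls output channels: 12
```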
In SECOND's case, the if statement takes the training branch and backpropagation runs directly against the GT boxes, while the PV-RCNN family also takes the branch below and goes through one more fine-tuning stage. Each GT box is assigned to the proposed anchor box with the highest IoU against it, and training proceeds through the loss functions below.
def build_losses(self, losses_cfg):
self.add_module(
'cls_loss_func',
loss_utils.SigmoidFocalClassificationLoss(alpha=0.25, gamma=2.0)
)
reg_loss_name = 'WeightedSmoothL1Loss' if losses_cfg.get('REG_LOSS_TYPE', None) is None \
else losses_cfg.REG_LOSS_TYPE
self.add_module(
'reg_loss_func',
getattr(loss_utils, reg_loss_name)(code_weights=losses_cfg.LOSS_WEIGHTS['code_weights'])
)
self.add_module(
'dir_loss_func',
loss_utils.WeightedCrossEntropyLoss()
)
The conv_cls, conv_box, and conv_dir_cls outputs are then trained with the loss functions above.
conv_cls holds the per-anchor class probabilities and conv_box holds the per-anchor (x, y, z, dx, dy, dz, ry) values, which are regressed with the loss terms below; the terms are combined as a weighted sum into the final loss (a schematic sketch follows the formulas).
![](https://blog.kakaocdn.net/dn/K5B9H/btszOOete6P/1WoUJJaKVnlCis5oVfkZE1/img.png)
![](https://blog.kakaocdn.net/dn/bvk7JM/btszWGF0ZK9/QWNmHMKYf0v9mcwx5s0G60/img.png)
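Schematically, the 1st-stage loss is a weighted sum of the three terms. The sketch below uses weight key names that mirror the LOSS_WEIGHTS entries of the default anchor-head configs; treat the names and values as assumptions:

```python
def rpn_loss(cls_loss, loc_loss, dir_loss, weights):
    # the weights come from LOSS_WEIGHTS in the model config; the key names here
    # (cls_weight / loc_weight / dir_weight) follow the pattern of the default anchor-head configs
    return (weights['cls_weight'] * cls_loss
            + weights['loc_weight'] * loc_loss
            + weights['dir_weight'] * dir_loss)

print(rpn_loss(0.8, 1.5, 0.3, {'cls_weight': 1.0, 'loc_weight': 2.0, 'dir_weight': 0.2}))  # 3.86
```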
VoxelSetAbstraction(2nd-Stage)
![](https://blog.kakaocdn.net/dn/bJtQnf/btszUxbRxYb/oVSktefxMhOYG8mQ0KD441/img.png)
From here on is the refinement part. The figure above shows the set abstraction step of PointNet++: pick reference points with FPS, group their neighbors, and run a mini-PointNet (MLP); that one cycle is called set abstraction. Applying it to voxels is what Voxel Set Abstraction does.
That is, keypoints sampled from the raw points are matched onto the 1st-stage feature maps, and PointNet++ is run over the non-empty voxels around each matched voxel. In other words, you can think of it as running the voxels through PointNet++.
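To illustrate FPS itself, here is a short, naive O(N·K) version (OpenPCDet actually calls the CUDA kernel exposed as pointnet2_stack_utils.farthest_point_sample, as seen in the code below):

```python
import torch

def farthest_point_sample(points, num_keypoints):
    """points: (N, 3). Returns indices of num_keypoints points that are mutually far apart."""
    n = points.shape[0]
    idxs = torch.zeros(num_keypoints, dtype=torch.long)
    dist = torch.full((n,), float('inf'))
    farthest = torch.tensor(0)                   # start from an arbitrary point
    for i in range(num_keypoints):
        idxs[i] = farthest
        d = ((points - points[farthest]) ** 2).sum(dim=1)
        dist = torch.minimum(dist, d)            # distance to the closest keypoint chosen so far
        farthest = torch.argmax(dist)            # next keypoint: the point farthest from all chosen ones
    return idxs

pts = torch.randn(1000, 3)
print(farthest_point_sample(pts, 16))
```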
# The following are methods of the VoxelSetAbstraction module (voxel_set_abstraction.py)
def get_sampled_points(self, batch_dict):
"""
Args:
batch_dict:
Returns:
keypoints: (N1 + N2 + ..., 4), where 4 indicates [bs_idx, x, y, z]
"""
batch_size = batch_dict['batch_size']
if self.model_cfg.POINT_SOURCE == 'raw_points':
src_points = batch_dict['points'][:, 1:4]
batch_indices = batch_dict['points'][:, 0].long()
elif self.model_cfg.POINT_SOURCE == 'voxel_centers':
src_points = common_utils.get_voxel_centers(
batch_dict['voxel_coords'][:, 1:4],
downsample_times=1,
voxel_size=self.voxel_size,
point_cloud_range=self.point_cloud_range
)
batch_indices = batch_dict['voxel_coords'][:, 0].long()
else:
raise NotImplementedError
keypoints_list = []
for bs_idx in range(batch_size):
bs_mask = (batch_indices == bs_idx)
sampled_points = src_points[bs_mask].unsqueeze(dim=0) # (1, N, 3)
if self.model_cfg.SAMPLE_METHOD == 'FPS':
cur_pt_idxs = pointnet2_stack_utils.farthest_point_sample(
sampled_points[:, :, 0:3].contiguous(), self.model_cfg.NUM_KEYPOINTS
).long()
if sampled_points.shape[1] < self.model_cfg.NUM_KEYPOINTS:
times = int(self.model_cfg.NUM_KEYPOINTS / sampled_points.shape[1]) + 1
non_empty = cur_pt_idxs[0, :sampled_points.shape[1]]
cur_pt_idxs[0] = non_empty.repeat(times)[:self.model_cfg.NUM_KEYPOINTS]
keypoints = sampled_points[0][cur_pt_idxs[0]].unsqueeze(dim=0)
elif self.model_cfg.SAMPLE_METHOD == 'SPC':
cur_keypoints = self.sectorized_proposal_centric_sampling(
roi_boxes=batch_dict['rois'][bs_idx], points=sampled_points[0]
)
bs_idxs = cur_keypoints.new_ones(cur_keypoints.shape[0]) * bs_idx
keypoints = torch.cat((bs_idxs[:, None], cur_keypoints), dim=1)
else:
raise NotImplementedError
keypoints_list.append(keypoints)
keypoints = torch.cat(keypoints_list, dim=0) # (B, M, 3) or (N1 + N2 + ..., 4)
if len(keypoints.shape) == 3:
batch_idx = torch.arange(batch_size, device=keypoints.device).view(-1, 1).repeat(1, keypoints.shape[1]).view(-1, 1)
keypoints = torch.cat((batch_idx.float(), keypoints.view(-1, 3)), dim=1)
return keypoints
@staticmethod
def aggregate_keypoint_features_from_one_source(
batch_size, aggregate_func, xyz, xyz_features, xyz_bs_idxs, new_xyz, new_xyz_batch_cnt,
filter_neighbors_with_roi=False, radius_of_neighbor=None, num_max_points_of_part=200000, rois=None
):
"""
Args:
aggregate_func:
xyz: (N, 3)
xyz_features: (N, C)
xyz_bs_idxs: (N)
new_xyz: (M, 3)
new_xyz_batch_cnt: (batch_size), [N1, N2, ...]
filter_neighbors_with_roi: True/False
radius_of_neighbor: float
num_max_points_of_part: int
rois: (batch_size, num_rois, 7 + C)
Returns:
"""
xyz_batch_cnt = xyz.new_zeros(batch_size).int()
if filter_neighbors_with_roi:
point_features = torch.cat((xyz, xyz_features), dim=-1) if xyz_features is not None else xyz
point_features_list = []
for bs_idx in range(batch_size):
bs_mask = (xyz_bs_idxs == bs_idx)
_, valid_mask = sample_points_with_roi(
rois=rois[bs_idx], points=xyz[bs_mask],
sample_radius_with_roi=radius_of_neighbor, num_max_points_of_part=num_max_points_of_part,
)
point_features_list.append(point_features[bs_mask][valid_mask])
xyz_batch_cnt[bs_idx] = valid_mask.sum()
valid_point_features = torch.cat(point_features_list, dim=0)
xyz = valid_point_features[:, 0:3]
xyz_features = valid_point_features[:, 3:] if xyz_features is not None else None
else:
for bs_idx in range(batch_size):
xyz_batch_cnt[bs_idx] = (xyz_bs_idxs == bs_idx).sum()
pooled_points, pooled_features = aggregate_func(
xyz=xyz.contiguous(),
xyz_batch_cnt=xyz_batch_cnt,
new_xyz=new_xyz,
new_xyz_batch_cnt=new_xyz_batch_cnt,
features=xyz_features.contiguous(),
)
return pooled_features
def forward(self, batch_dict):
"""
Args:
batch_dict:
batch_size:
keypoints: (B, num_keypoints, 3)
multi_scale_3d_features: {
'x_conv4': ...
}
points: optional (N, 1 + 3 + C) [bs_idx, x, y, z, ...]
spatial_features: optional
spatial_features_stride: optional
Returns:
point_features: (N, C)
point_coords: (N, 4)
"""
keypoints = self.get_sampled_points(batch_dict)
point_features_list = []
if 'bev' in self.model_cfg.FEATURES_SOURCE:
point_bev_features = self.interpolate_from_bev_features(
keypoints, batch_dict['spatial_features'], batch_dict['batch_size'],
bev_stride=batch_dict['spatial_features_stride']
)
point_features_list.append(point_bev_features)
batch_size = batch_dict['batch_size']
new_xyz = keypoints[:, 1:4].contiguous()
new_xyz_batch_cnt = new_xyz.new_zeros(batch_size).int()
for k in range(batch_size):
new_xyz_batch_cnt[k] = (keypoints[:, 0] == k).sum()
if 'raw_points' in self.model_cfg.FEATURES_SOURCE:
raw_points = batch_dict['points']
pooled_features = self.aggregate_keypoint_features_from_one_source(
batch_size=batch_size, aggregate_func=self.SA_rawpoints,
xyz=raw_points[:, 1:4],
xyz_features=raw_points[:, 4:].contiguous() if raw_points.shape[1] > 4 else None,
xyz_bs_idxs=raw_points[:, 0],
new_xyz=new_xyz, new_xyz_batch_cnt=new_xyz_batch_cnt,
filter_neighbors_with_roi=self.model_cfg.SA_LAYER['raw_points'].get('FILTER_NEIGHBOR_WITH_ROI', False),
radius_of_neighbor=self.model_cfg.SA_LAYER['raw_points'].get('RADIUS_OF_NEIGHBOR_WITH_ROI', None),
rois=batch_dict.get('rois', None)
)
point_features_list.append(pooled_features)
for k, src_name in enumerate(self.SA_layer_names):
cur_coords = batch_dict['multi_scale_3d_features'][src_name].indices
cur_features = batch_dict['multi_scale_3d_features'][src_name].features.contiguous()
xyz = common_utils.get_voxel_centers(
cur_coords[:, 1:4], downsample_times=self.downsample_times_map[src_name],
voxel_size=self.voxel_size, point_cloud_range=self.point_cloud_range
)
pooled_features = self.aggregate_keypoint_features_from_one_source(
batch_size=batch_size, aggregate_func=self.SA_layers[k],
xyz=xyz.contiguous(), xyz_features=cur_features, xyz_bs_idxs=cur_coords[:, 0],
new_xyz=new_xyz, new_xyz_batch_cnt=new_xyz_batch_cnt,
filter_neighbors_with_roi=self.model_cfg.SA_LAYER[src_name].get('FILTER_NEIGHBOR_WITH_ROI', False),
radius_of_neighbor=self.model_cfg.SA_LAYER[src_name].get('RADIUS_OF_NEIGHBOR_WITH_ROI', None),
rois=batch_dict.get('rois', None)
)
point_features_list.append(pooled_features)
point_features = torch.cat(point_features_list, dim=-1)
batch_dict['point_features_before_fusion'] = point_features.view(-1, point_features.shape[-1])
point_features = self.vsa_point_feature_fusion(point_features.view(-1, point_features.shape[-1]))
batch_dict['point_features'] = point_features # (BxN, C)
batch_dict['point_coords'] = keypoints # (BxN, 4)
return batch_dict
First, get_sampled_points samples keypoints from the raw points. The sampling method is FPS (farthest point sampling), which picks keypoints so that they end up as far apart as possible; it was proposed in PointNet++, and OpenPCDet uses the PointNet++ modules as-is.
Next, for each feature map from the voxel-based stage, the voxels matched to the keypoints are grouped together with their neighboring non-empty voxels and run through a mini-PointNet. That happens in self.aggregate_keypoint_features_from_one_source.
Which feature maps to use is set in the config (a trimmed example of the relevant keys follows below).
The result is the set of keypoints sampled from the raw points, each now carrying a feature vector obtained by running the surrounding voxels through PointNet++.
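For reference, here is a trimmed sketch of the VSA-related config keys read by the code above, written as a Python dict; the key names come from the code itself, while the values are illustrative assumptions rather than the exact defaults:

```python
# keys as read by get_sampled_points / forward above; the values are illustrative, not exact defaults
VSA_CFG = {
    'POINT_SOURCE': 'raw_points',          # or 'voxel_centers'
    'NUM_KEYPOINTS': 2048,
    'SAMPLE_METHOD': 'FPS',                # or 'SPC' (sectorized proposal-centric, used by PV-RCNN++)
    'FEATURES_SOURCE': ['bev', 'raw_points', 'x_conv1', 'x_conv2', 'x_conv3', 'x_conv4'],
    'SA_LAYER': {
        'raw_points': {'FILTER_NEIGHBOR_WITH_ROI': False},
        'x_conv4':    {'FILTER_NEIGHBOR_WITH_ROI': False},
        # one entry per source except 'bev', plus the MLP / radius / nsample settings for each SA layer
    },
}
```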
PV-RCNN Head (RoI Head) - 2nd Stage
This stage refines the bbox predictions from the 1st stage using the keypoints produced by Voxel Set Abstraction.
![](https://blog.kakaocdn.net/dn/cRUkdx/btszRRWsSjH/rDNRh4nvj8iOt2HbkQngRK/img.png)
Grid points are 6×6×6 points uniformly sampled inside each 1st-stage bbox. For each grid point, PointNet++ is run once more over the keypoints (which already carry features from Voxel Set Abstraction), and the outputs are vectorized.
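Below is a small sketch of how 6×6×6 grid points can be generated inside an RoI box. It ignores the heading angle ry for brevity, whereas the real get_global_grid_points_of_roi also rotates the grid by ry:

```python
import torch

def dense_grid_points_in_roi(roi, grid_size=6):
    """roi: (7,) tensor (x, y, z, dx, dy, dz, ry). Returns (grid_size**3, 3) grid points (ry ignored)."""
    center, dims = roi[0:3], roi[3:6]
    # cell centers spread over [-0.5, 0.5] of the box extent: (i + 0.5) / grid_size - 0.5
    lin = (torch.arange(grid_size).float() + 0.5) / grid_size - 0.5
    zz, yy, xx = torch.meshgrid(lin, lin, lin, indexing='ij')
    local = torch.stack((xx, yy, zz), dim=-1).view(-1, 3) * dims
    # NOTE: the real get_global_grid_points_of_roi also rotates `local` around z by ry before translating
    return local + center

roi = torch.tensor([10.0, 2.0, -1.0, 4.0, 1.8, 1.6, 0.3])
print(dense_grid_points_in_roi(roi).shape)   # torch.Size([216, 3])
```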
# Excerpt from PVRCNNHead.__init__ (pvrcnn_head.py)
self.cls_layers = self.make_fc_layers(
input_channels=pre_channel, output_channels=self.num_class, fc_list=self.model_cfg.CLS_FC
)
self.reg_layers = self.make_fc_layers(
input_channels=pre_channel,
output_channels=self.box_coder.code_size * self.num_class,
fc_list=self.model_cfg.REG_FC
)
self.roi_grid_pool_layer, num_c_out = pointnet2_stack_modules.build_local_aggregation_module(
input_channels=input_channels, config=self.model_cfg.ROI_GRID_POOL
)
def roi_grid_pool(self, batch_dict):
"""
Args:
batch_dict:
batch_size:
rois: (B, num_rois, 7 + C)
point_coords: (num_points, 4) [bs_idx, x, y, z]
point_features: (num_points, C)
point_cls_scores: (N1 + N2 + N3 + ..., 1)
point_part_offset: (N1 + N2 + N3 + ..., 3)
Returns:
"""
batch_size = batch_dict['batch_size']
rois = batch_dict['rois']
point_coords = batch_dict['point_coords']
point_features = batch_dict['point_features']
point_features = point_features * batch_dict['point_cls_scores'].view(-1, 1)
global_roi_grid_points, local_roi_grid_points = self.get_global_grid_points_of_roi(
rois, grid_size=self.model_cfg.ROI_GRID_POOL.GRID_SIZE
) # (BxN, 6x6x6, 3)
global_roi_grid_points = global_roi_grid_points.view(batch_size, -1, 3) # (B, Nx6x6x6, 3)
xyz = point_coords[:, 1:4]
xyz_batch_cnt = xyz.new_zeros(batch_size).int()
batch_idx = point_coords[:, 0]
for k in range(batch_size):
xyz_batch_cnt[k] = (batch_idx == k).sum()
new_xyz = global_roi_grid_points.view(-1, 3)
new_xyz_batch_cnt = xyz.new_zeros(batch_size).int().fill_(global_roi_grid_points.shape[1])
pooled_points, pooled_features = self.roi_grid_pool_layer(
xyz=xyz.contiguous(),
xyz_batch_cnt=xyz_batch_cnt,
new_xyz=new_xyz,
new_xyz_batch_cnt=new_xyz_batch_cnt,
features=point_features.contiguous(),
) # (M1 + M2 ..., C)
pooled_features = pooled_features.view(
-1, self.model_cfg.ROI_GRID_POOL.GRID_SIZE ** 3,
pooled_features.shape[-1]
) # (BxN, 6x6x6, C)
return pooled_features
def forward(self, batch_dict):
"""
:param input_data: input dict
:return:
"""
targets_dict = self.proposal_layer(
batch_dict, nms_config=self.model_cfg.NMS_CONFIG['TRAIN' if self.training else 'TEST']
)
if self.training:
targets_dict = batch_dict.get('roi_targets_dict', None)
if targets_dict is None:
targets_dict = self.assign_targets(batch_dict)
batch_dict['rois'] = targets_dict['rois']
batch_dict['roi_labels'] = targets_dict['roi_labels']
# RoI aware pooling
pooled_features = self.roi_grid_pool(batch_dict) # (BxN, 6x6x6, C)
grid_size = self.model_cfg.ROI_GRID_POOL.GRID_SIZE
batch_size_rcnn = pooled_features.shape[0]
pooled_features = pooled_features.permute(0, 2, 1).\
contiguous().view(batch_size_rcnn, -1, grid_size, grid_size, grid_size) # (BxN, C, 6, 6, 6)
shared_features = self.shared_fc_layer(pooled_features.view(batch_size_rcnn, -1, 1))
rcnn_cls = self.cls_layers(shared_features).transpose(1, 2).contiguous().squeeze(dim=1) # (B, 1 or 2)
rcnn_reg = self.reg_layers(shared_features).transpose(1, 2).contiguous().squeeze(dim=1) # (B, C)
if not self.training:
batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
batch_size=batch_dict['batch_size'], rois=batch_dict['rois'], cls_preds=rcnn_cls, box_preds=rcnn_reg
)
batch_dict['batch_cls_preds'] = batch_cls_preds
batch_dict['batch_box_preds'] = batch_box_preds
batch_dict['cls_preds_normalized'] = False
else:
targets_dict['rcnn_cls'] = rcnn_cls
targets_dict['rcnn_reg'] = rcnn_reg
self.forward_ret_dict = targets_dict
return batch_dict
The algorithm described above actually runs in the roi_grid_pool function: get_global_grid_points_of_roi generates the grid points, and roi_grid_pool_layer runs PointNet++ over the keypoints (with their voxel features) around each grid point.
The resulting vectorized features then go through shared_fc_layer (a fully connected stage) and then through separate classification and regression layers. cls_layers is built from 1D convs and outputs one confidence value per class.
reg_layers is likewise built from 1D convs and predicts the 7 values x, y, z, dx, dy, dz, ry. In this way the head stage works on a richer receptive field built purely from points, without anchor boxes.
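A rough sketch of what that shared FC plus 1D-conv head amounts to (the channel sizes are assumptions, and this is not the exact make_fc_layers output):

```python
import torch
import torch.nn as nn

num_rois, grid_feat_dim, num_class, code_size = 100, 6 * 6 * 6 * 128, 1, 7   # channel sizes assumed

shared_fc = nn.Sequential(nn.Conv1d(grid_feat_dim, 256, 1, bias=False), nn.BatchNorm1d(256), nn.ReLU())
cls_layer = nn.Conv1d(256, num_class, 1)     # per-RoI confidence(s)
reg_layer = nn.Conv1d(256, code_size, 1)     # per-RoI (x, y, z, dx, dy, dz, ry) residuals

pooled = torch.randn(num_rois, grid_feat_dim, 1)    # flattened 6x6x6 grid features for each RoI
shared = shared_fc(pooled)                          # (num_rois, 256, 1)
print(cls_layer(shared).squeeze(-1).shape)          # torch.Size([100, 1])
print(reg_layer(shared).squeeze(-1).shape)          # torch.Size([100, 7])
```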
Loss - 2nd Stage
![](https://blog.kakaocdn.net/dn/bVU6Uo/btszZkPI8ih/Y718CiRSQVLyx72cfMhTVk/img.png)
The loss is similar to the 1st stage; the code is below.
def get_box_cls_layer_loss(self, forward_ret_dict):
loss_cfgs = self.model_cfg.LOSS_CONFIG
rcnn_cls = forward_ret_dict['rcnn_cls']
rcnn_cls_labels = forward_ret_dict['rcnn_cls_labels'].view(-1)
if loss_cfgs.CLS_LOSS == 'BinaryCrossEntropy':
rcnn_cls_flat = rcnn_cls.view(-1)
batch_loss_cls = F.binary_cross_entropy(torch.sigmoid(rcnn_cls_flat), rcnn_cls_labels.float(), reduction='none')
cls_valid_mask = (rcnn_cls_labels >= 0).float()
rcnn_loss_cls = (batch_loss_cls * cls_valid_mask).sum() / torch.clamp(cls_valid_mask.sum(), min=1.0)
elif loss_cfgs.CLS_LOSS == 'CrossEntropy':
batch_loss_cls = F.cross_entropy(rcnn_cls, rcnn_cls_labels, reduction='none', ignore_index=-1)
cls_valid_mask = (rcnn_cls_labels >= 0).float()
rcnn_loss_cls = (batch_loss_cls * cls_valid_mask).sum() / torch.clamp(cls_valid_mask.sum(), min=1.0)
else:
raise NotImplementedError
def get_box_reg_layer_loss(self, forward_ret_dict):
loss_cfgs = self.model_cfg.LOSS_CONFIG
code_size = self.box_coder.code_size
reg_valid_mask = forward_ret_dict['reg_valid_mask'].view(-1)
gt_boxes3d_ct = forward_ret_dict['gt_of_rois'][..., 0:code_size]
gt_of_rois_src = forward_ret_dict['gt_of_rois_src'][..., 0:code_size].view(-1, code_size)
rcnn_reg = forward_ret_dict['rcnn_reg'] # (rcnn_batch_size, C)
roi_boxes3d = forward_ret_dict['rois']
rcnn_batch_size = gt_boxes3d_ct.view(-1, code_size).shape[0]
fg_mask = (reg_valid_mask > 0)
fg_sum = fg_mask.long().sum().item()
tb_dict = {}
if loss_cfgs.REG_LOSS == 'smooth-l1':
rois_anchor = roi_boxes3d.clone().detach().view(-1, code_size)
rois_anchor[:, 0:3] = 0
rois_anchor[:, 6] = 0
reg_targets = self.box_coder.encode_torch(
gt_boxes3d_ct.view(rcnn_batch_size, code_size), rois_anchor
)
rcnn_loss_reg = self.reg_loss_func(
rcnn_reg.view(rcnn_batch_size, -1).unsqueeze(dim=0),
reg_targets.unsqueeze(dim=0),
) # [B, M, 7]
rcnn_loss_reg = (rcnn_loss_reg.view(rcnn_batch_size, -1) * fg_mask.unsqueeze(dim=-1).float()).sum() / max(fg_sum, 1)
rcnn_loss_reg = rcnn_loss_reg * loss_cfgs.LOSS_WEIGHTS['rcnn_reg_weight']
tb_dict['rcnn_loss_reg'] = rcnn_loss_reg.item()
if loss_cfgs.CORNER_LOSS_REGULARIZATION and fg_sum > 0:
# TODO: NEED to BE CHECK
fg_rcnn_reg = rcnn_reg.view(rcnn_batch_size, -1)[fg_mask]
fg_roi_boxes3d = roi_boxes3d.view(-1, code_size)[fg_mask]
fg_roi_boxes3d = fg_roi_boxes3d.view(1, -1, code_size)
batch_anchors = fg_roi_boxes3d.clone().detach()
roi_ry = fg_roi_boxes3d[:, :, 6].view(-1)
roi_xyz = fg_roi_boxes3d[:, :, 0:3].view(-1, 3)
batch_anchors[:, :, 0:3] = 0
rcnn_boxes3d = self.box_coder.decode_torch(
fg_rcnn_reg.view(batch_anchors.shape[0], -1, code_size), batch_anchors
).view(-1, code_size)
rcnn_boxes3d = common_utils.rotate_points_along_z(
rcnn_boxes3d.unsqueeze(dim=1), roi_ry
).squeeze(dim=1)
rcnn_boxes3d[:, 0:3] += roi_xyz
loss_corner = loss_utils.get_corner_loss_lidar(
rcnn_boxes3d[:, 0:7],
gt_of_rois_src[fg_mask][:, 0:7]
)
loss_corner = loss_corner.mean()
loss_corner = loss_corner * loss_cfgs.LOSS_WEIGHTS['rcnn_corner_weight']
rcnn_loss_reg += loss_corner
tb_dict['rcnn_loss_corner'] = loss_corner.item()
else:
raise NotImplementedError
return rcnn_loss_reg, tb_dict
If you want to dig into the details of each function, refer to the OpenPCDet framework below.
GitHub - open-mmlab/OpenPCDet: OpenPCDet Toolbox for LiDAR-based 3D Object Detection.
github.com/open-mmlab/OpenPCDet