Hello. In this post I will review 3D detection code.
The code base is OpenPCDet, linked below.
For the theoretical background, please refer to the post below.
To summarize: existing voxel-based detectors struggle to regress box locations accurately and finely at the ROI-pooling / head stage because the features there are sparse. This model tackles that problem with a point-based approach, building a module called Voxel Set Abstraction and using it to fine-tune (refine) the boxes. You can think of the architecture up to that fine-tuning step as essentially the SECOND model.
VFE(1st stage)
Let's look at the upper part of the model architecture above, the voxel-based CNN. As the first step, the raw points go through VFE (Voxel Feature Encoding, the term introduced in the VoxelNet paper) to produce per-voxel feature encodings. In VoxelNet this step also went through layers, i.e., the encoding itself was learned.
You can think of it as the preparation step before running the actual 3D convolutions.
OpenPCDet provides the VFE modules listed below; here we look at the representative MeanVFE. (In my own ablation study of swapping the VFE module, the difference was not significant.)
# pcdet/models/backbones_3d/vfe/mean_vfe.py
import torch

from .vfe_template import VFETemplate


class MeanVFE(VFETemplate):
    def __init__(self, model_cfg, num_point_features, **kwargs):
        super().__init__(model_cfg=model_cfg)
        self.num_point_features = num_point_features

    def get_output_feature_dim(self):
        return self.num_point_features

    def forward(self, batch_dict, **kwargs):
        """
        Args:
            batch_dict:
                voxels: (num_voxels, max_points_per_voxel, C)
                voxel_num_points: optional (num_voxels)
            **kwargs:
        Returns:
            vfe_features: (num_voxels, C)
        """
        voxel_features, voxel_num_points = batch_dict['voxels'], batch_dict['voxel_num_points']
        points_mean = voxel_features[:, :, :].sum(dim=1, keepdim=False)
        normalizer = torch.clamp_min(voxel_num_points.view(-1, 1), min=1.0).type_as(voxel_features)
        points_mean = points_mean / normalizer
        batch_dict['voxel_features'] = points_mean.contiguous()
        return batch_dict
In the code above, the points have already been voxelized (the voxelization itself is done with the sparse-conv library's voxel generator), and the features of the points inside each voxel (here, the raw point attributes x, y, z, intensity, ...) are simply averaged.
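To make the averaging concrete, here is a minimal, self-contained sketch of the same computation on made-up tensors (the shapes and values are purely illustrative):

import torch

# hypothetical toy input: 2 voxels, up to 3 points per voxel, 4 point features (x, y, z, intensity)
voxels = torch.tensor([
    [[0.1, 0.2, 0.3, 0.5], [0.2, 0.2, 0.3, 0.4], [0.0, 0.0, 0.0, 0.0]],   # voxel 0: 2 valid points + padding
    [[1.0, 1.0, 1.0, 0.9], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]],   # voxel 1: 1 valid point + padding
])
voxel_num_points = torch.tensor([2, 1])

# same operations as MeanVFE.forward: sum over the point axis, divide by the (clamped) point count
points_mean = voxels.sum(dim=1) / torch.clamp_min(voxel_num_points.view(-1, 1).float(), min=1.0)
print(points_mean)  # voxel 0 -> mean of its two points, voxel 1 -> its single point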
voxel-based backbone(1st stage)
Now it is time to run 3D convolutions, but most voxels are empty (LiDAR is inherently sparse), so much of the progress in 3D detection for autonomous driving has come from how to handle this sparsity. One of those techniques is sparse convolution, a way to process sparse voxels quickly. Well-known libraries are spconv, which came out of SECOND, and torchsparse from SPVNAS (MIT, segmentation). pcdet uses spconv (probably because it has been around longer, and pcdet is also a framework developed in China).
The rough idea is that empty regions are never zero-padded and never computed at all: only the non-empty voxels are stored in a hash map, and convolution is computed only for them.
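To make the idea concrete, here is a toy illustration (not how spconv is actually implemented) of collecting only the non-empty voxels into a coordinate-to-index hash map, which is the kind of structure a rulebook-style sparse convolution is built on:

import torch

# hypothetical dense occupancy grid: 1 means the voxel contains points
dense_grid = torch.zeros(1, 4, 4, 4)
dense_grid[0, 1, 2, 3] = 1.0
dense_grid[0, 3, 0, 1] = 1.0

# store only non-empty voxels: coordinate tuple -> row index in a compact feature matrix
coords = dense_grid[0].nonzero()                      # (N, 3) integer coordinates of non-empty voxels
hash_map = {tuple(c.tolist()): i for i, c in enumerate(coords)}
features = torch.randn(len(hash_map), 16)             # one feature row per non-empty voxel

# a sparse convolution only visits these entries (plus neighbor lookups in the same hash map),
# instead of sliding a kernel over every location of the dense grid
print(hash_map)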
Below is the code that builds the voxel-based backbone with sparse convolutions. If we call the encoded shape after the first VFE stage B, H, W, Z, the final output is a 128-channel tensor of downsampled spatial shape B, H*, W*, Z*.
# pcdet/models/backbones_3d/spconv_backbone.py
# (post_act_block is a sparse-conv + BatchNorm + ReLU helper defined in the same file)
class VoxelBackBone8x(nn.Module):
    def __init__(self, model_cfg, input_channels, grid_size, **kwargs):
        super().__init__()
        self.model_cfg = model_cfg
        norm_fn = partial(nn.BatchNorm1d, eps=1e-3, momentum=0.01)

        self.sparse_shape = grid_size[::-1] + [1, 0, 0]

        self.conv_input = spconv.SparseSequential(
            spconv.SubMConv3d(input_channels, 16, 3, padding=1, bias=False, indice_key='subm1'),
            norm_fn(16),
            nn.ReLU(),
        )
        block = post_act_block

        self.conv1 = spconv.SparseSequential(
            block(16, 16, 3, norm_fn=norm_fn, padding=1, indice_key='subm1'),
        )

        self.conv2 = spconv.SparseSequential(
            # [1600, 1408, 41] <- [800, 704, 21]
            block(16, 32, 3, norm_fn=norm_fn, stride=2, padding=1, indice_key='spconv2', conv_type='spconv'),
            block(32, 32, 3, norm_fn=norm_fn, padding=1, indice_key='subm2'),
            block(32, 32, 3, norm_fn=norm_fn, padding=1, indice_key='subm2'),
        )

        self.conv3 = spconv.SparseSequential(
            # [800, 704, 21] <- [400, 352, 11]
            block(32, 64, 3, norm_fn=norm_fn, stride=2, padding=1, indice_key='spconv3', conv_type='spconv'),
            block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm3'),
            block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm3'),
        )

        self.conv4 = spconv.SparseSequential(
            # [400, 352, 11] <- [200, 176, 5]
            block(64, 64, 3, norm_fn=norm_fn, stride=2, padding=(0, 1, 1), indice_key='spconv4', conv_type='spconv'),
            block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm4'),
            block(64, 64, 3, norm_fn=norm_fn, padding=1, indice_key='subm4'),
        )

        last_pad = 0
        last_pad = self.model_cfg.get('last_pad', last_pad)
        self.conv_out = spconv.SparseSequential(
            # [200, 150, 5] -> [200, 150, 2]
            spconv.SparseConv3d(64, 128, (3, 1, 1), stride=(2, 1, 1), padding=last_pad,
                                bias=False, indice_key='spconv_down2'),
            norm_fn(128),
            nn.ReLU(),
        )
        self.num_point_features = 128
        self.backbone_channels = {
            'x_conv1': 16,
            'x_conv2': 32,
            'x_conv3': 64,
            'x_conv4': 64
        }

    def forward(self, batch_dict):
        """
        Args:
            batch_dict:
                batch_size: int
                vfe_features: (num_voxels, C)
                voxel_coords: (num_voxels, 4), [batch_idx, z_idx, y_idx, x_idx]
        Returns:
            batch_dict:
                encoded_spconv_tensor: sparse tensor
        """
        voxel_features, voxel_coords = batch_dict['voxel_features'], batch_dict['voxel_coords']
        batch_size = batch_dict['batch_size']
        input_sp_tensor = spconv.SparseConvTensor(
            features=voxel_features,
            indices=voxel_coords.int(),
            spatial_shape=self.sparse_shape,
            batch_size=batch_size
        )

        x = self.conv_input(input_sp_tensor)

        x_conv1 = self.conv1(x)
        x_conv2 = self.conv2(x_conv1)
        x_conv3 = self.conv3(x_conv2)
        x_conv4 = self.conv4(x_conv3)

        # for detection head
        # [200, 176, 5] -> [200, 176, 2]
        out = self.conv_out(x_conv4)

        batch_dict.update({
            'encoded_spconv_tensor': out,
            'encoded_spconv_tensor_stride': 8
        })
        batch_dict.update({
            'multi_scale_3d_features': {
                'x_conv1': x_conv1,
                'x_conv2': x_conv2,
                'x_conv3': x_conv3,
                'x_conv4': x_conv4,
            }
        })
        batch_dict.update({
            'multi_scale_3d_strides': {
                'x_conv1': 1,
                'x_conv2': 2,
                'x_conv3': 4,
                'x_conv4': 8,
            }
        })

        return batch_dict
BEV(1st stage)
BEV (Bird's Eye View) literally means the view seen looking straight down along the z axis, i.e., a view that ignores z. The modules that build the BEV map in the pcdet framework are listed below.
First, let's look at HeightCompression, the standard one used by SECOND and PV-RCNN.
class HeightCompression(nn.Module):
    def __init__(self, model_cfg, **kwargs):
        super().__init__()
        self.model_cfg = model_cfg
        self.num_bev_features = self.model_cfg.NUM_BEV_FEATURES

    def forward(self, batch_dict):
        """
        Args:
            batch_dict:
                encoded_spconv_tensor: sparse tensor
        Returns:
            batch_dict:
                spatial_features:
        """
        encoded_spconv_tensor = batch_dict['encoded_spconv_tensor']
        spatial_features = encoded_spconv_tensor.dense()
        N, C, D, H, W = spatial_features.shape
        spatial_features = spatial_features.view(N, C * D, H, W)
        batch_dict['spatial_features'] = spatial_features
        batch_dict['spatial_features_stride'] = batch_dict['encoded_spconv_tensor_stride']
        return batch_dict
Looking at the code above, the dense tensor coming out of the voxel backbone has shape N (batch size), C (channels), D (z height), H, W (the two BEV spatial dimensions); the channel and D (z-height) dimensions are simply merged. After this BEV step the shape becomes N (batch size), C x D (number of channels times the z feature size), H, W.
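A minimal, shape-only sketch of that reshape, with made-up sizes, looks like this:

import torch

# assume the backbone output, after .dense(), has 128 channels and a z-depth of 2
spatial_features = torch.randn(1, 128, 2, 200, 176)       # (N, C, D, H, W)
N, C, D, H, W = spatial_features.shape
bev_features = spatial_features.view(N, C * D, H, W)       # channels and z-height merged
print(bev_features.shape)                                  # torch.Size([1, 256, 200, 176])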
After that, the BEV map is run through 2D convolutions. pcdet provides the variants below; BEVResBackbone uses residual blocks.
This part is very standard, so I will skip it.
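Still, for reference, here is a heavily simplified sketch (not the actual BaseBEVBackbone/BEVResBackbone code) of the usual pattern: strided 2D conv blocks that downsample the BEV map, and transposed convs that bring each scale back to a common resolution before concatenation:

import torch
import torch.nn as nn

class TinyBEVBackbone(nn.Module):
    """Illustrative only: one full-resolution stage, one downsampled stage, upsampled and concatenated."""
    def __init__(self, in_channels=256):
        super().__init__()
        self.block1 = nn.Sequential(nn.Conv2d(in_channels, 128, 3, stride=1, padding=1), nn.BatchNorm2d(128), nn.ReLU())
        self.block2 = nn.Sequential(nn.Conv2d(128, 256, 3, stride=2, padding=1), nn.BatchNorm2d(256), nn.ReLU())
        self.up2 = nn.Sequential(nn.ConvTranspose2d(256, 128, 2, stride=2), nn.BatchNorm2d(128), nn.ReLU())

    def forward(self, x):
        x1 = self.block1(x)                 # full-resolution BEV features
        x2 = self.up2(self.block2(x1))      # downsample, then upsample back to the same resolution
        return torch.cat([x1, x2], dim=1)   # multi-scale BEV feature map ('spatial_features_2d')

print(TinyBEVBackbone()(torch.randn(1, 256, 200, 176)).shape)  # torch.Size([1, 256, 200, 176])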
Head(1st stage)
The process from here on is similar to 2D detection. For the 2D detection pipeline, please refer to the post below.
The head types provided by pcdet are as follows.
AnchorHeadSingle is the typical type in which a single anchor head proposes regions; AnchorHeadMulti has a separate anchor head per class, like YOLO, and decides per anchor whether it belongs to that class (0 or 1). CenterHead is the head proposed in CenterPoint and can also be used for tracking; PointHead is specialized for PointRCNN, VoxelNeXtHead for VoxelNeXt, and TransFusionHead for BEVFusion.
Here we will look at AnchorHeadSingle.
# pcdet/models/dense_heads/anchor_head_single.py
import numpy as np
import torch.nn as nn

from .anchor_head_template import AnchorHeadTemplate


# For reference: the anchors themselves are generated in the parent class AnchorHeadTemplate
# (anchor_head_template.py), which AnchorHeadSingle inherits from, by the staticmethod below.
@staticmethod
def generate_anchors(anchor_generator_cfg, grid_size, point_cloud_range, anchor_ndim=7):
    anchor_generator = AnchorGenerator(
        anchor_range=point_cloud_range,
        anchor_generator_config=anchor_generator_cfg
    )
    feature_map_size = [grid_size[:2] // config['feature_map_stride'] for config in anchor_generator_cfg]
    anchors_list, num_anchors_per_location_list = anchor_generator.generate_anchors(feature_map_size)

    if anchor_ndim != 7:
        for idx, anchors in enumerate(anchors_list):
            pad_zeros = anchors.new_zeros([*anchors.shape[0:-1], anchor_ndim - 7])
            new_anchors = torch.cat((anchors, pad_zeros), dim=-1)
            anchors_list[idx] = new_anchors

    return anchors_list, num_anchors_per_location_list


class AnchorHeadSingle(AnchorHeadTemplate):
    def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
                 predict_boxes_when_training=True, **kwargs):
        super().__init__(
            model_cfg=model_cfg, num_class=num_class, class_names=class_names, grid_size=grid_size, point_cloud_range=point_cloud_range,
            predict_boxes_when_training=predict_boxes_when_training
        )

        self.num_anchors_per_location = sum(self.num_anchors_per_location)

        self.conv_cls = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.num_class,
            kernel_size=1
        )
        self.conv_box = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.box_coder.code_size,
            kernel_size=1
        )

        if self.model_cfg.get('USE_DIRECTION_CLASSIFIER', None) is not None:
            self.conv_dir_cls = nn.Conv2d(
                input_channels,
                self.num_anchors_per_location * self.model_cfg.NUM_DIR_BINS,
                kernel_size=1
            )
        else:
            self.conv_dir_cls = None
        self.init_weights()

    def init_weights(self):
        pi = 0.01
        nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
        nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)

    def forward(self, data_dict):
        spatial_features_2d = data_dict['spatial_features_2d']

        cls_preds = self.conv_cls(spatial_features_2d)
        box_preds = self.conv_box(spatial_features_2d)

        cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]
        box_preds = box_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]

        self.forward_ret_dict['cls_preds'] = cls_preds
        self.forward_ret_dict['box_preds'] = box_preds

        if self.conv_dir_cls is not None:
            dir_cls_preds = self.conv_dir_cls(spatial_features_2d)
            dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).contiguous()
            self.forward_ret_dict['dir_cls_preds'] = dir_cls_preds
        else:
            dir_cls_preds = None

        if self.training:
            targets_dict = self.assign_targets(
                gt_boxes=data_dict['gt_boxes']
            )
            self.forward_ret_dict.update(targets_dict)

        if not self.training or self.predict_boxes_when_training:
            batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
                batch_size=data_dict['batch_size'],
                cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=dir_cls_preds
            )
            data_dict['batch_cls_preds'] = batch_cls_preds
            data_dict['batch_box_preds'] = batch_box_preds
            data_dict['cls_preds_normalized'] = False

        return data_dict
The feature map from the 2D backbone is passed through conv_cls and conv_box, i.e., one more 1x1 conv each, and then through conv_dir_cls for direction classification. As for sizes: num_anchors_per_location is the number of anchor boxes generated at each location; conv_cls outputs (number of anchors) x (number of classes) channels; conv_box outputs (number of anchors) x (anchor dim) channels, where the anchor dim is usually 7 for (x, y, z, dx, dy, dz, ry); and conv_dir_cls (optional) outputs (number of anchors) x (number of direction bins) channels, where 2 bins give a coarse bin-based prediction of whether the heading lies in 0~180 or 180~360 degrees.
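As a concrete example of those channel sizes (the numbers below are made up, roughly in the spirit of a 3-class KITTI config):

num_class = 3
num_anchors_per_location = 3 * 2   # e.g. 2 yaw orientations per class at every BEV cell -> 6 anchors
code_size = 7                      # (x, y, z, dx, dy, dz, ry)
num_dir_bins = 2

print(num_anchors_per_location * num_class)      # conv_cls output channels: 18
print(num_anchors_per_location * code_size)      # conv_box output channels: 42
print(num_anchors_per_location * num_dir_bins)   # conv_dir_cls output channels: 12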
For SECOND, the training branch of the if statement is the end of the story: targets are assigned from the GT boxes and the loss back-propagates directly. For the PV-RCNN family, the lower branch is also taken, so the boxes go through one more fine-tuning (refinement) stage. Each proposed anchor box is assigned to the GT box with the largest IoU and trained with the loss functions below.
def build_losses(self, losses_cfg):
    self.add_module(
        'cls_loss_func',
        loss_utils.SigmoidFocalClassificationLoss(alpha=0.25, gamma=2.0)
    )
    reg_loss_name = 'WeightedSmoothL1Loss' if losses_cfg.get('REG_LOSS_TYPE', None) is None \
        else losses_cfg.REG_LOSS_TYPE
    self.add_module(
        'reg_loss_func',
        getattr(loss_utils, reg_loss_name)(code_weights=losses_cfg.LOSS_WEIGHTS['code_weights'])
    )
    self.add_module(
        'dir_loss_func',
        loss_utils.WeightedCrossEntropyLoss()
    )
The conv_cls, conv_box, and conv_dir_cls outputs above are trained with these loss functions.
conv_cls gives the class probability for each generated anchor box, and conv_box gives (x, y, z, dx, dy, dz, ry) for each anchor box, which is regressed with the loss above. The individual terms are combined by a weighted sum into the final loss.
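The weighted sum looks roughly like the sketch below; the weight names mirror the LOSS_WEIGHTS entries in the configs, but the values here are placeholders:

# hypothetical weights in the spirit of LOSS_WEIGHTS from the yaml configs
cls_weight, loc_weight, dir_weight = 1.0, 2.0, 0.2

def total_rpn_loss(cls_loss, box_loss, dir_loss):
    # focal classification loss + smooth-L1 box regression loss + direction-bin cross-entropy
    return cls_weight * cls_loss + loc_weight * box_loss + dir_weight * dir_loss

print(total_rpn_loss(0.5, 0.8, 0.3))  # 0.5*1.0 + 0.8*2.0 + 0.3*0.2 = 2.16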
VoxelSetAbstraction(2nd-Stage)
From here on is the refinement stage. The figure above shows the set abstraction step of PointNet++: sample reference points with FPS, group their neighbors, and run a mini-PointNet (MLP) on each group; this one cycle is called set abstraction. Applying it to voxels is what Voxel Set Abstraction refers to.
In other words, keypoints sampled from the raw points are projected onto the 1st-stage feature maps, and for each keypoint the non-empty voxels around the corresponding location are grouped and run through PointNet++. You can think of it as running PointNet++ on voxels.
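A stripped-down, purely illustrative version of one set-abstraction cycle (a naive ball query plus a shared MLP and max pooling; not the CUDA ops pcdet actually calls) could look like this:

import torch
import torch.nn as nn

def naive_set_abstraction(points, feats, keypoints, mlp, radius=1.0, nsample=16):
    """points: (N, 3), feats: (N, C_in), keypoints: (M, 3) -> (M, C_out) pooled features."""
    out_dim = mlp[-1].out_features
    dists = torch.cdist(keypoints, points)                            # (M, N) pairwise distances
    pooled = []
    for m in range(keypoints.shape[0]):
        idx = torch.nonzero(dists[m] < radius).flatten()[:nsample]    # naive "ball query" around keypoint m
        if idx.numel() == 0:                                          # no neighbor inside the radius
            pooled.append(torch.zeros(out_dim))
            continue
        local = mlp(feats[idx])                                       # shared point-wise MLP (mini-PointNet)
        pooled.append(local.max(dim=0).values)                        # symmetric max pooling over the group
    return torch.stack(pooled)

mlp = nn.Sequential(nn.Linear(4, 32), nn.ReLU(), nn.Linear(32, 32))
points, feats, keypoints = torch.randn(1000, 3), torch.randn(1000, 4), torch.randn(8, 3)
print(naive_set_abstraction(points, feats, keypoints, mlp).shape)     # torch.Size([8, 32])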
# pcdet/models/backbones_3d/pfe/voxel_set_abstraction.py
# (methods of the VoxelSetAbstraction module, shown as excerpts)
def get_sampled_points(self, batch_dict):
    """
    Args:
        batch_dict:
    Returns:
        keypoints: (N1 + N2 + ..., 4), where 4 indicates [bs_idx, x, y, z]
    """
    batch_size = batch_dict['batch_size']
    if self.model_cfg.POINT_SOURCE == 'raw_points':
        src_points = batch_dict['points'][:, 1:4]
        batch_indices = batch_dict['points'][:, 0].long()
    elif self.model_cfg.POINT_SOURCE == 'voxel_centers':
        src_points = common_utils.get_voxel_centers(
            batch_dict['voxel_coords'][:, 1:4],
            downsample_times=1,
            voxel_size=self.voxel_size,
            point_cloud_range=self.point_cloud_range
        )
        batch_indices = batch_dict['voxel_coords'][:, 0].long()
    else:
        raise NotImplementedError

    keypoints_list = []
    for bs_idx in range(batch_size):
        bs_mask = (batch_indices == bs_idx)
        sampled_points = src_points[bs_mask].unsqueeze(dim=0)  # (1, N, 3)
        if self.model_cfg.SAMPLE_METHOD == 'FPS':
            cur_pt_idxs = pointnet2_stack_utils.farthest_point_sample(
                sampled_points[:, :, 0:3].contiguous(), self.model_cfg.NUM_KEYPOINTS
            ).long()

            if sampled_points.shape[1] < self.model_cfg.NUM_KEYPOINTS:
                times = int(self.model_cfg.NUM_KEYPOINTS / sampled_points.shape[1]) + 1
                non_empty = cur_pt_idxs[0, :sampled_points.shape[1]]
                cur_pt_idxs[0] = non_empty.repeat(times)[:self.model_cfg.NUM_KEYPOINTS]

            keypoints = sampled_points[0][cur_pt_idxs[0]].unsqueeze(dim=0)

        elif self.model_cfg.SAMPLE_METHOD == 'SPC':
            cur_keypoints = self.sectorized_proposal_centric_sampling(
                roi_boxes=batch_dict['rois'][bs_idx], points=sampled_points[0]
            )
            bs_idxs = cur_keypoints.new_ones(cur_keypoints.shape[0]) * bs_idx
            keypoints = torch.cat((bs_idxs[:, None], cur_keypoints), dim=1)
        else:
            raise NotImplementedError

        keypoints_list.append(keypoints)

    keypoints = torch.cat(keypoints_list, dim=0)  # (B, M, 3) or (N1 + N2 + ..., 4)
    if len(keypoints.shape) == 3:
        batch_idx = torch.arange(batch_size, device=keypoints.device).view(-1, 1).repeat(1, keypoints.shape[1]).view(-1, 1)
        keypoints = torch.cat((batch_idx.float(), keypoints.view(-1, 3)), dim=1)

    return keypoints


@staticmethod
def aggregate_keypoint_features_from_one_source(
        batch_size, aggregate_func, xyz, xyz_features, xyz_bs_idxs, new_xyz, new_xyz_batch_cnt,
        filter_neighbors_with_roi=False, radius_of_neighbor=None, num_max_points_of_part=200000, rois=None
):
    """
    Args:
        aggregate_func:
        xyz: (N, 3)
        xyz_features: (N, C)
        xyz_bs_idxs: (N)
        new_xyz: (M, 3)
        new_xyz_batch_cnt: (batch_size), [N1, N2, ...]
        filter_neighbors_with_roi: True/False
        radius_of_neighbor: float
        num_max_points_of_part: int
        rois: (batch_size, num_rois, 7 + C)
    Returns:
    """
    xyz_batch_cnt = xyz.new_zeros(batch_size).int()
    if filter_neighbors_with_roi:
        point_features = torch.cat((xyz, xyz_features), dim=-1) if xyz_features is not None else xyz
        point_features_list = []
        for bs_idx in range(batch_size):
            bs_mask = (xyz_bs_idxs == bs_idx)
            _, valid_mask = sample_points_with_roi(
                rois=rois[bs_idx], points=xyz[bs_mask],
                sample_radius_with_roi=radius_of_neighbor, num_max_points_of_part=num_max_points_of_part,
            )
            point_features_list.append(point_features[bs_mask][valid_mask])
            xyz_batch_cnt[bs_idx] = valid_mask.sum()

        valid_point_features = torch.cat(point_features_list, dim=0)
        xyz = valid_point_features[:, 0:3]
        xyz_features = valid_point_features[:, 3:] if xyz_features is not None else None
    else:
        for bs_idx in range(batch_size):
            xyz_batch_cnt[bs_idx] = (xyz_bs_idxs == bs_idx).sum()

    pooled_points, pooled_features = aggregate_func(
        xyz=xyz.contiguous(),
        xyz_batch_cnt=xyz_batch_cnt,
        new_xyz=new_xyz,
        new_xyz_batch_cnt=new_xyz_batch_cnt,
        features=xyz_features.contiguous(),
    )
    return pooled_features


def forward(self, batch_dict):
    """
    Args:
        batch_dict:
            batch_size:
            keypoints: (B, num_keypoints, 3)
            multi_scale_3d_features: {
                'x_conv4': ...
            }
            points: optional (N, 1 + 3 + C) [bs_idx, x, y, z, ...]
            spatial_features: optional
            spatial_features_stride: optional
    Returns:
        point_features: (N, C)
        point_coords: (N, 4)
    """
    keypoints = self.get_sampled_points(batch_dict)

    point_features_list = []
    if 'bev' in self.model_cfg.FEATURES_SOURCE:
        point_bev_features = self.interpolate_from_bev_features(
            keypoints, batch_dict['spatial_features'], batch_dict['batch_size'],
            bev_stride=batch_dict['spatial_features_stride']
        )
        point_features_list.append(point_bev_features)

    batch_size = batch_dict['batch_size']

    new_xyz = keypoints[:, 1:4].contiguous()
    new_xyz_batch_cnt = new_xyz.new_zeros(batch_size).int()
    for k in range(batch_size):
        new_xyz_batch_cnt[k] = (keypoints[:, 0] == k).sum()

    if 'raw_points' in self.model_cfg.FEATURES_SOURCE:
        raw_points = batch_dict['points']

        pooled_features = self.aggregate_keypoint_features_from_one_source(
            batch_size=batch_size, aggregate_func=self.SA_rawpoints,
            xyz=raw_points[:, 1:4],
            xyz_features=raw_points[:, 4:].contiguous() if raw_points.shape[1] > 4 else None,
            xyz_bs_idxs=raw_points[:, 0],
            new_xyz=new_xyz, new_xyz_batch_cnt=new_xyz_batch_cnt,
            filter_neighbors_with_roi=self.model_cfg.SA_LAYER['raw_points'].get('FILTER_NEIGHBOR_WITH_ROI', False),
            radius_of_neighbor=self.model_cfg.SA_LAYER['raw_points'].get('RADIUS_OF_NEIGHBOR_WITH_ROI', None),
            rois=batch_dict.get('rois', None)
        )
        point_features_list.append(pooled_features)

    for k, src_name in enumerate(self.SA_layer_names):
        cur_coords = batch_dict['multi_scale_3d_features'][src_name].indices
        cur_features = batch_dict['multi_scale_3d_features'][src_name].features.contiguous()

        xyz = common_utils.get_voxel_centers(
            cur_coords[:, 1:4], downsample_times=self.downsample_times_map[src_name],
            voxel_size=self.voxel_size, point_cloud_range=self.point_cloud_range
        )

        pooled_features = self.aggregate_keypoint_features_from_one_source(
            batch_size=batch_size, aggregate_func=self.SA_layers[k],
            xyz=xyz.contiguous(), xyz_features=cur_features, xyz_bs_idxs=cur_coords[:, 0],
            new_xyz=new_xyz, new_xyz_batch_cnt=new_xyz_batch_cnt,
            filter_neighbors_with_roi=self.model_cfg.SA_LAYER[src_name].get('FILTER_NEIGHBOR_WITH_ROI', False),
            radius_of_neighbor=self.model_cfg.SA_LAYER[src_name].get('RADIUS_OF_NEIGHBOR_WITH_ROI', None),
            rois=batch_dict.get('rois', None)
        )
        point_features_list.append(pooled_features)

    point_features = torch.cat(point_features_list, dim=-1)

    batch_dict['point_features_before_fusion'] = point_features.view(-1, point_features.shape[-1])
    point_features = self.vsa_point_feature_fusion(point_features.view(-1, point_features.shape[-1]))

    batch_dict['point_features'] = point_features  # (BxN, C)
    batch_dict['point_coords'] = keypoints  # (BxN, 4)
    return batch_dict
First, the get_sampled_points function samples keypoints from the raw points. It uses FPS (farthest point sampling), which picks keypoints so that they end up as far apart from each other as possible. FPS was proposed in PointNet++, and the pcdet framework uses the PointNet++ modules as-is.
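The CUDA kernel pcdet calls (pointnet2_stack_utils.farthest_point_sample) is not shown here, but a naive pure-PyTorch version of the same idea is:

import torch

def farthest_point_sample_naive(points, num_samples):
    """points: (N, 3) -> indices of num_samples points that are mutually far apart."""
    n = points.shape[0]
    selected = torch.zeros(num_samples, dtype=torch.long)
    min_dist = torch.full((n,), float('inf'))
    farthest = torch.tensor(0)                        # start from an arbitrary point
    for i in range(num_samples):
        selected[i] = farthest
        dist = ((points - points[farthest]) ** 2).sum(dim=1)
        min_dist = torch.minimum(min_dist, dist)      # distance to the closest already-selected point
        farthest = torch.argmax(min_dist)             # next keypoint = point farthest from the selected set
    return selected

points = torch.randn(1000, 3)
print(farthest_point_sample_naive(points, 16))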
Next, for each feature map of the voxel-based stage, the keypoints are matched to voxels, grouped together with the neighboring non-empty voxels, and passed through a mini-PointNet. That happens in the self.aggregate_keypoint_features_from_one_source function.
Which feature maps to use can be selected in the config.
The result is the set of keypoints sampled from the raw points, each carrying a vectorized feature obtained by running the surrounding voxels through PointNet++.
PVRCNN Head (ROI Head) - 2nd Stage
This is the stage that refines the bbox predictions from the 1st stage using the keypoints produced by Voxel Set Abstraction.
Grid points are generated by uniformly sampling 6x6x6 points inside each 1st-stage bbox; then, for each grid point, the keypoints (which now carry the voxel set abstraction features) are aggregated with PointNet++ once more, and the resulting values are vectorized.
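A simplified sketch of generating those 6x6x6 grid points for an axis-aligned box is shown below; the real get_global_grid_points_of_roi also rotates the grid by the box's yaw, which is omitted here:

import torch

def dense_grid_points_in_box(box, grid_size=6):
    """box: (7,) = (cx, cy, cz, dx, dy, dz, ry); returns (grid_size**3, 3) points (yaw ignored)."""
    cx, cy, cz, dx, dy, dz = box[:6]
    # cell centers in [-0.5, 0.5] along each axis, scaled to the box size
    idx = (torch.arange(grid_size).float() + 0.5) / grid_size - 0.5
    gx, gy, gz = torch.meshgrid(idx * dx, idx * dy, idx * dz, indexing='ij')
    local = torch.stack([gx, gy, gz], dim=-1).view(-1, 3)     # local coordinates inside the box
    return local + torch.stack([cx, cy, cz])                  # shift to the box center

roi = torch.tensor([10.0, 2.0, -1.0, 4.0, 1.8, 1.6, 0.3])
print(dense_grid_points_in_box(roi).shape)                    # torch.Size([216, 3])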
# pcdet/models/roi_heads/pvrcnn_head.py (excerpts from the PVRCNNHead class)

# from __init__: the classification / regression FC heads and the RoI-grid pooling layer
self.cls_layers = self.make_fc_layers(
    input_channels=pre_channel, output_channels=self.num_class, fc_list=self.model_cfg.CLS_FC
)
self.reg_layers = self.make_fc_layers(
    input_channels=pre_channel,
    output_channels=self.box_coder.code_size * self.num_class,
    fc_list=self.model_cfg.REG_FC
)
self.roi_grid_pool_layer, num_c_out = pointnet2_stack_modules.build_local_aggregation_module(
    input_channels=input_channels, config=self.model_cfg.ROI_GRID_POOL
)


def roi_grid_pool(self, batch_dict):
    """
    Args:
        batch_dict:
            batch_size:
            rois: (B, num_rois, 7 + C)
            point_coords: (num_points, 4)  [bs_idx, x, y, z]
            point_features: (num_points, C)
            point_cls_scores: (N1 + N2 + N3 + ..., 1)
            point_part_offset: (N1 + N2 + N3 + ..., 3)
    Returns:
    """
    batch_size = batch_dict['batch_size']
    rois = batch_dict['rois']
    point_coords = batch_dict['point_coords']
    point_features = batch_dict['point_features']

    point_features = point_features * batch_dict['point_cls_scores'].view(-1, 1)

    global_roi_grid_points, local_roi_grid_points = self.get_global_grid_points_of_roi(
        rois, grid_size=self.model_cfg.ROI_GRID_POOL.GRID_SIZE
    )  # (BxN, 6x6x6, 3)
    global_roi_grid_points = global_roi_grid_points.view(batch_size, -1, 3)  # (B, Nx6x6x6, 3)

    xyz = point_coords[:, 1:4]
    xyz_batch_cnt = xyz.new_zeros(batch_size).int()
    batch_idx = point_coords[:, 0]
    for k in range(batch_size):
        xyz_batch_cnt[k] = (batch_idx == k).sum()

    new_xyz = global_roi_grid_points.view(-1, 3)
    new_xyz_batch_cnt = xyz.new_zeros(batch_size).int().fill_(global_roi_grid_points.shape[1])
    pooled_points, pooled_features = self.roi_grid_pool_layer(
        xyz=xyz.contiguous(),
        xyz_batch_cnt=xyz_batch_cnt,
        new_xyz=new_xyz,
        new_xyz_batch_cnt=new_xyz_batch_cnt,
        features=point_features.contiguous(),
    )  # (M1 + M2 ..., C)

    pooled_features = pooled_features.view(
        -1, self.model_cfg.ROI_GRID_POOL.GRID_SIZE ** 3,
        pooled_features.shape[-1]
    )  # (BxN, 6x6x6, C)
    return pooled_features


def forward(self, batch_dict):
    """
    :param input_data: input dict
    :return:
    """
    targets_dict = self.proposal_layer(
        batch_dict, nms_config=self.model_cfg.NMS_CONFIG['TRAIN' if self.training else 'TEST']
    )
    if self.training:
        targets_dict = batch_dict.get('roi_targets_dict', None)
        if targets_dict is None:
            targets_dict = self.assign_targets(batch_dict)
            batch_dict['rois'] = targets_dict['rois']
            batch_dict['roi_labels'] = targets_dict['roi_labels']

    # RoI aware pooling
    pooled_features = self.roi_grid_pool(batch_dict)  # (BxN, 6x6x6, C)

    grid_size = self.model_cfg.ROI_GRID_POOL.GRID_SIZE
    batch_size_rcnn = pooled_features.shape[0]
    pooled_features = pooled_features.permute(0, 2, 1).\
        contiguous().view(batch_size_rcnn, -1, grid_size, grid_size, grid_size)  # (BxN, C, 6, 6, 6)

    shared_features = self.shared_fc_layer(pooled_features.view(batch_size_rcnn, -1, 1))
    rcnn_cls = self.cls_layers(shared_features).transpose(1, 2).contiguous().squeeze(dim=1)  # (B, 1 or 2)
    rcnn_reg = self.reg_layers(shared_features).transpose(1, 2).contiguous().squeeze(dim=1)  # (B, C)

    if not self.training:
        batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
            batch_size=batch_dict['batch_size'], rois=batch_dict['rois'], cls_preds=rcnn_cls, box_preds=rcnn_reg
        )
        batch_dict['batch_cls_preds'] = batch_cls_preds
        batch_dict['batch_box_preds'] = batch_box_preds
        batch_dict['cls_preds_normalized'] = False
    else:
        targets_dict['rcnn_cls'] = rcnn_cls
        targets_dict['rcnn_reg'] = rcnn_reg

        self.forward_ret_dict = targets_dict

    return batch_dict
The algorithm described above actually runs inside the roi_grid_pool function: get_global_grid_points_of_roi generates the grid points, and roi_grid_pool_layer runs PointNet++ around each grid point over the keypoints (which carry the voxel features).
The resulting vectorized features then go through shared_fc_layer (fully connected layers) and finally through the classification and regression layers. cls_layers is built from 1D convs and outputs one value per class, i.e., each class confidence.
reg_layers is likewise built from 1D convs and predicts the 7 values x, y, z, dx, dy, dz, ry. In this way the head stage runs on a richer receptive field built purely from points, without anchor boxes.
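Ignoring the PointNet++ pooling itself, the tensor shapes through the refinement head roughly follow this sketch (the channel sizes are made up; the real ones come from the model config):

import torch
import torch.nn as nn

B_N, grid, C = 4, 6, 128                          # hypothetical: 4 RoIs, 6x6x6 grid, 128-dim pooled features
pooled_features = torch.randn(B_N, grid ** 3, C)   # output of roi_grid_pool: (BxN, 216, C)

# flatten each RoI's grid features into one long vector, as in PVRCNNHead.forward
x = pooled_features.permute(0, 2, 1).contiguous().view(B_N, -1, 1)   # (BxN, C*216, 1)

# stand-ins for shared_fc_layer / cls_layers / reg_layers, which pcdet builds from 1D convs (make_fc_layers)
shared_fc = nn.Conv1d(C * grid ** 3, 256, kernel_size=1)
cls_layer = nn.Conv1d(256, 1, kernel_size=1)       # one confidence per RoI
reg_layer = nn.Conv1d(256, 7, kernel_size=1)       # (x, y, z, dx, dy, dz, ry) residuals per RoI

shared = shared_fc(x)                              # (BxN, 256, 1)
print(cls_layer(shared).squeeze(-1).shape, reg_layer(shared).squeeze(-1).shape)  # (4, 1) (4, 7)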
Loss - 2nd Stage
The loss is similar to the 1st stage; the code is below.
# pcdet/models/roi_heads/roi_head_template.py
def get_box_cls_layer_loss(self, forward_ret_dict):
    loss_cfgs = self.model_cfg.LOSS_CONFIG
    rcnn_cls = forward_ret_dict['rcnn_cls']
    rcnn_cls_labels = forward_ret_dict['rcnn_cls_labels'].view(-1)
    if loss_cfgs.CLS_LOSS == 'BinaryCrossEntropy':
        rcnn_cls_flat = rcnn_cls.view(-1)
        batch_loss_cls = F.binary_cross_entropy(torch.sigmoid(rcnn_cls_flat), rcnn_cls_labels.float(), reduction='none')
        cls_valid_mask = (rcnn_cls_labels >= 0).float()
        rcnn_loss_cls = (batch_loss_cls * cls_valid_mask).sum() / torch.clamp(cls_valid_mask.sum(), min=1.0)
    elif loss_cfgs.CLS_LOSS == 'CrossEntropy':
        batch_loss_cls = F.cross_entropy(rcnn_cls, rcnn_cls_labels, reduction='none', ignore_index=-1)
        cls_valid_mask = (rcnn_cls_labels >= 0).float()
        rcnn_loss_cls = (batch_loss_cls * cls_valid_mask).sum() / torch.clamp(cls_valid_mask.sum(), min=1.0)
    else:
        raise NotImplementedError

    # weighting and return (these closing lines were cut off in the original snippet)
    rcnn_loss_cls = rcnn_loss_cls * loss_cfgs.LOSS_WEIGHTS['rcnn_cls_weight']
    tb_dict = {'rcnn_loss_cls': rcnn_loss_cls.item()}
    return rcnn_loss_cls, tb_dict


def get_box_reg_layer_loss(self, forward_ret_dict):
    loss_cfgs = self.model_cfg.LOSS_CONFIG
    code_size = self.box_coder.code_size
    reg_valid_mask = forward_ret_dict['reg_valid_mask'].view(-1)
    gt_boxes3d_ct = forward_ret_dict['gt_of_rois'][..., 0:code_size]
    gt_of_rois_src = forward_ret_dict['gt_of_rois_src'][..., 0:code_size].view(-1, code_size)
    rcnn_reg = forward_ret_dict['rcnn_reg']  # (rcnn_batch_size, C)
    roi_boxes3d = forward_ret_dict['rois']
    rcnn_batch_size = gt_boxes3d_ct.view(-1, code_size).shape[0]

    fg_mask = (reg_valid_mask > 0)
    fg_sum = fg_mask.long().sum().item()

    tb_dict = {}

    if loss_cfgs.REG_LOSS == 'smooth-l1':
        rois_anchor = roi_boxes3d.clone().detach().view(-1, code_size)
        rois_anchor[:, 0:3] = 0
        rois_anchor[:, 6] = 0
        reg_targets = self.box_coder.encode_torch(
            gt_boxes3d_ct.view(rcnn_batch_size, code_size), rois_anchor
        )
        rcnn_loss_reg = self.reg_loss_func(
            rcnn_reg.view(rcnn_batch_size, -1).unsqueeze(dim=0),
            reg_targets.unsqueeze(dim=0),
        )  # [B, M, 7]
        rcnn_loss_reg = (rcnn_loss_reg.view(rcnn_batch_size, -1) * fg_mask.unsqueeze(dim=-1).float()).sum() / max(fg_sum, 1)
        rcnn_loss_reg = rcnn_loss_reg * loss_cfgs.LOSS_WEIGHTS['rcnn_reg_weight']
        tb_dict['rcnn_loss_reg'] = rcnn_loss_reg.item()

        if loss_cfgs.CORNER_LOSS_REGULARIZATION and fg_sum > 0:
            # TODO: NEED to BE CHECK
            fg_rcnn_reg = rcnn_reg.view(rcnn_batch_size, -1)[fg_mask]
            fg_roi_boxes3d = roi_boxes3d.view(-1, code_size)[fg_mask]

            fg_roi_boxes3d = fg_roi_boxes3d.view(1, -1, code_size)
            batch_anchors = fg_roi_boxes3d.clone().detach()
            roi_ry = fg_roi_boxes3d[:, :, 6].view(-1)
            roi_xyz = fg_roi_boxes3d[:, :, 0:3].view(-1, 3)
            batch_anchors[:, :, 0:3] = 0
            rcnn_boxes3d = self.box_coder.decode_torch(
                fg_rcnn_reg.view(batch_anchors.shape[0], -1, code_size), batch_anchors
            ).view(-1, code_size)

            rcnn_boxes3d = common_utils.rotate_points_along_z(
                rcnn_boxes3d.unsqueeze(dim=1), roi_ry
            ).squeeze(dim=1)
            rcnn_boxes3d[:, 0:3] += roi_xyz

            loss_corner = loss_utils.get_corner_loss_lidar(
                rcnn_boxes3d[:, 0:7],
                gt_of_rois_src[fg_mask][:, 0:7]
            )
            loss_corner = loss_corner.mean()
            loss_corner = loss_corner * loss_cfgs.LOSS_WEIGHTS['rcnn_corner_weight']

            rcnn_loss_reg += loss_corner
            tb_dict['rcnn_loss_corner'] = loss_corner.item()
    else:
        raise NotImplementedError

    return rcnn_loss_reg, tb_dict
If you want to look inside these functions in more detail, please refer to the pcdet framework linked below.