이번 논문리뷰는 BEVFusion이라는 논문으로 3D Detection에서 multi-modal 그 중에서도 camera-lidar에 관련된 논문입니다. ICRA 23에 publish되고 현재기준으로 multi model 3D detection쪽에서 opensource중에서는 최고성능을 자랑하고 multi task로 쓸 수 있는 유용한 multi-model 3D detector라서 살펴보게 되었습니다.
Intro
기존의 camera-lidar 를 fusion하여 Detection하는 모델들은 많았습니다. 대표적인 방법으로는 calibration matrix는 알고있다는 가정하에 lidar points들을 camera domain에 projection하여 fusion하는 방법입니다. 이 방법은 간편하지만 geometric distortion이 발생하하여 효과적인 geometric feature를 얻기에 부적절하다는 단점이 있습니다. 다음으로는 image를 lidar domain으로 reverse-projection을 시키는 방법이 있습니다. 이 방법 또한 image의 semantic한 정보들이 loss되는 문제가 있습니다.
해당 논문에서는 lidar, image 각각의 feature를 BEV space로 보내 fusion하는 방법을 제시하며 이를 통해 infomation loss를 최소화 시킬 수 있다고 주장합니다.
Method
BEV를 각각의 representation을 unify하는데 선택한 이유는 lidar의 geometry loss와 image의 semantic loss가 적기 때문이라고 합니다.(직관적으로 와닿지는 않습니다만..)
lidar feature를 BEV로 flatten하는건 익히 많이 쓰는 기법이라 언급할 것이 없지만 image feature를 BEV로 projection하는 것은 쉽지 않기에 그에 대해 좀 더 살펴보겠습니다. 아래의 그림은 각각의 feature를 BEV Field로 projection했을 때의 그림입니다.
Lidar-to-BEV Transformation
먼저 Lidar feature를 BEV하는 방법인데 이는 매우 간단하고 paper에 언급은 안되어있지만 아래의 코드로 확인할 수 있습니다.
'''
Args:
batch_dict:
voxel_features: (B, C, Z, Y, X), Voxel feature representation
Returns:
batch_dict:
spatial_features: (B, C, Y, X), BEV feature representation
'''
class BasicBlock2D(nn.Module):
def __init__(self, in_channels, out_channels, **kwargs):
"""
Initializes convolutional block
Args:
in_channels: int, Number of input channels
out_channels: int, Number of output channels
**kwargs: Dict, Extra arguments for nn.Conv2d
"""
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.conv = nn.Conv2d(in_channels=in_channels,
out_channels=out_channels,
**kwargs)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, features):
"""
Applies convolutional block
Args:
features: (B, C_in, H, W), Input features
Returns:
x: (B, C_out, H, W), Output features
"""
x = self.conv(features)
x = self.bn(x)
x = self.relu(x)
return x
self.block = BasicBlock2D(in_channels=self.num_bev_features * self.num_heights,
out_channels=self.num_bev_features,
**self.model_cfg.ARGS)
(flatten)
voxel_features = batch_dict["voxel_features"] # from voxelnet (B, C, Z, Y, X)
bev_features = voxel_features.flatten(start_dim=1, end_dim=2) # (B, C, Z, Y, X) -> (B, C*Z, Y, X)
bev_features = self.block(bev_features) # (B, C*Z, Y, X) -> (B, C, Y, X) 채널 dim 을 C로 맞춰주기위해 conv태움
batch_dict["spatial_features"] = bev_features
return batch_dict
Camera-to-BEV Transformation
해당 논문에서는 LSS, BEVDet를 reference하여 depth distribution을 추정합니다. 아래의 그림은 BEVDet이라는 논문에서 lift라는 methodology에 나와있는 figure입니다.
Lift
BEVDet은 아래와 같이 multi mono cam 3D detector인데 BEVspace로 각각의 image feature를 변환하여 detection을 수행하는 모델입니다.
그 중에서 Lift라는 단계에서 2D의 pixel을 3D points로 transformation합니다. 얼핏보면 NeRF의 그것과 유사합니다만 구현은 nerf의 그것과는 좀 다릅니다. NeRF는 ray(line) eq상에 실제 point를 sampling하고 각각을 mlp에 넣고 neural rendering operation하는 개념이면
def sample_along_ray(origin, direction, near, far, num_samples):
t_vals = np.linspace(near, far, num_samples)
points = origin + direction * t_vals[:, np.newaxis]
return points
# Define ray parameters
camera_pos = np.array([0, 0, 0])
ray_direction = np.array([0, 0, 1]) # Example ray direction
near_clip = 1.0
far_clip = 5.0
num_samples = 100
Lift는 depth bounding 크기를 정의하고 (D)개의 depth channel을 증가시켜서 depth를 뽑아냅니다.
즉, 하나의 pixel feature에서 ray를 쏴 일정 step마다 point를 sampling하여 D개를 뽑습니다. 여기서 기존의 feature map에 D개의 depth차원이 추가됩니다. 각각의 point마다 depth의 확률을 나타내는 $\alpha$값을 가지고 이를 해당 pixel feature(채널이 c라고하면) 에 element wise 곱을 수행하고 그렇게 얻어진 vector를 context vector라고 합니다(HxWxD). 이를 전체 pixel feature에 수행하면 총 (CxHxWxD)만큼 크기의 pointclouds가 생성이 되게 됩니다. 여기서 $\alpha$또한 learnable parameter입니다.
아래 그림은 위의 내용을 도식화하여 나타낸 그림입니다.
아래의 코드는 BEVFusion에서 Camera-to-BEV 부분입니다.
depth = torch.zeros(batch_size, img.shape[1], 1, *self.image_size).to(points[0].device)
for c in range(on_img.shape[0]):
masked_coords = cur_coords[c, on_img[c]].long()
masked_dist = dist[c, on_img[c]]
depth[b, c, 0, masked_coords[:, 0], masked_coords[:, 1]] = masked_dist
img = x.view(int(BN/6), 6, C, H, W)
D : points along the camera ray
self.dtransform = nn.Sequential(
nn.Conv2d(1, 8, 1),
nn.BatchNorm2d(8),
nn.ReLU(True),
nn.Conv2d(8, 32, 5, stride=4, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(True),
nn.Conv2d(32, 64, 5, stride=2, padding=2),
nn.BatchNorm2d(64),
nn.ReLU(True),
)
self.depthnet = nn.Sequential(
nn.Conv2d(in_channel + 64, in_channel, 3, padding=1),
nn.BatchNorm2d(in_channel),
nn.ReLU(True),
nn.Conv2d(in_channel, in_channel, 3, padding=1),
nn.BatchNorm2d(in_channel),
nn.ReLU(True),
nn.Conv2d(in_channel, self.D + self.C, 1),
)
x : image feature map from neck (BxCxHxW)
d : depth map (? from transformed lidar points, assist (no explaination in paper))
def get_cam_feats(self, x, d):
# batch, channel, feature height, feature width
B, C, fH, fW = x.shape
d = d.view(B , *d.shape[2:]) # B,1, H, W ?
x = x.view(B , C, fH, fW)
d = self.dtransform(d) # B, 64, H, W , 기존 lift에는 없는부분 inverse depth map활용
#BEVFusion에서는 lidar의 inverse depth map을 구하여 depth를 추정하는데 reference로 활용
x = torch.cat([d, x], dim=1) # (B, C + 64 , H , W )
x = self.depthnet(x) # [D(points along ray) + C(img feature channel)] x H x W
depth = x[:, : self.D].softmax(dim=1) # points depth probablity distribution
# D x H x W
x = depth.unsqueeze(1) * x[:, self.D : (self.D + self.C)].unsqueeze(2)# C x H x W
# [D x H x W] (element wise dot) [C x H x W] => D x C x H x W
x = x.view(B, self.C, self.D, fH, fW)
#B x C x D x H x W
return x
이렇게 구한 3D image feature map을 이제 lidar의 plane으로 보냅니다. 이 과정에서 calibration matrix를 사용하며 그렇게 매칭된 lidar plane에서 BEV pooling을 수행합니다.
camera-BEV translation에서 생성된 pointcloud는 (CxHxWxD)의 크기를 갖게 되고 크기가 매우 크기 때문에 계산 속도가 매우 느리게 됩니다. 이를 위해 최적화된 BEVPooling방법을 사용합니다.
Figure b에서 index는 (x,y)이고 value는 각 픽셀의 feature과 depth probability를 곱한 값이며, 같은 (x,y) 좌표의 grid에 속하는 point들을 sum pooling 하여 BEV 형태로 flatten하는 과정입니다.
또한 기존의 pooling 같은 모든 point에 대해 prefix sum을 계산한 다음 인덱스가 바뀌는 경계에서 이전 경계의 결과값을 빼는 방식인데 이는 경계에서의 값만 필요로 하는 것에 비해 사용되지 않는 partial sum을 많이 생성하여 비효율적입니다. 논문에서는 feature aggregation을 가속하기 위해, BEV grid에 직접 병렬화하는 특수한 GPU kernel을 제안 각 그리드에 GPU thread를 할당하여 그리드내의 구간 합을 계산하여 그리드의 출력들 사이의 의존성을 제거하기 대문에 multi-level tree reduction이 필요하지 않게 되어 완전한 병렬계산을 할 수 있다고 합니다.
def bev_pool(self, geom_feats, x):
geom_feats = geom_feats.to(torch.float)
x = x.to(torch.float)
B, N, D, H, W, C = x.shape
Nprime = B * N * D * H * W
# flatten x
x = x.reshape(Nprime, C)
# flatten indices
geom_feats = ((geom_feats - (self.bx - self.dx / 2.0)) / self.dx).long()
geom_feats = geom_feats.view(Nprime, 3)
batch_ix = torch.cat([torch.full([Nprime // B, 1], ix, device=x.device, dtype=torch.long) for ix in range(B)])
geom_feats = torch.cat((geom_feats, batch_ix), 1)
# filter out points that are outside box
kept = (
(geom_feats[:, 0] >= 0)
& (geom_feats[:, 0] < self.nx[0])
& (geom_feats[:, 1] >= 0)
& (geom_feats[:, 1] < self.nx[1])
& (geom_feats[:, 2] >= 0)
& (geom_feats[:, 2] < self.nx[2])
)
x = x[kept]
geom_feats = geom_feats[kept]
x = bev_pool(x, geom_feats, B, self.nx[2], self.nx[0], self.nx[1])
# collapse Z
final = torch.cat(x.unbind(dim=2), 1)
return final
Fully-Convolutional Fusion
Shared BEV space로 transformation된 두 센서의 feature는 concat등의 operation으로 쉽게 fusion할 수 있습니다. 하지만 space를 일치화하는 과정(calibration, inverse depth map feature)에서 어느정도 local mismatch가 발생하기 때문에 이를 보상하기 위한 residual block을 포함하는 convolution based BEV encoder를 사용합니다. → 걍 concat한 다음에 conv 한 번 태운거입니다.
class ConvFuser(nn.Module):
def __init__(self,model_cfg) -> None:
super().__init__()
self.model_cfg = model_cfg
in_channel = self.model_cfg.IN_CHANNEL
out_channel = self.model_cfg.OUT_CHANNEL
self.conv = nn.Sequential(
nn.Conv2d(in_channel, out_channel, 3, padding=1, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU(True)
)
def forward(self,batch_dict):
"""
Args:
batch_dict:
spatial_features_img (tensor): Bev features from image modality
spatial_features (tensor): Bev features from lidar modality
Returns:
batch_dict:
spatial_features (tensor): Bev features after muli-modal fusion
"""
img_bev = batch_dict['spatial_features_img'] #BxCixHxW
lidar_bev = batch_dict['spatial_features']. #BxClxHxW
cat_bev = torch.cat([img_bev,lidar_bev],dim=1)#Bx(Ci+Cl)xHxW
mm_bev = self.conv(cat_bev) #BxOUTxHoxWo
batch_dict['spatial_features'] = mm_bev
return batch_dict
Multi-Task Heads
BEVFusion은 multi task가 가능합니다. 즉 head에 어떤걸 붙이느냐에 따라 segmentation, detection이 가능합니다.
detection의 경우 center heatmap head(centerpoint)를 사용하였고 segmentation은 CVT를 참고, focal loss를 이용하였습니다.
def predict(self, inputs):
batch_size = inputs.shape[0]
lidar_feat = self.shared_conv(inputs)
lidar_feat_flatten = lidar_feat.view(
batch_size, lidar_feat.shape[1], -1
)
bev_pos = self.bev_pos.repeat(batch_size, 1, 1).to(lidar_feat.device)
# query initialization
dense_heatmap = self.heatmap_head(lidar_feat)
heatmap = dense_heatmap.detach().sigmoid()
padding = self.nms_kernel_size // 2
local_max = torch.zeros_like(heatmap)
local_max_inner = F.max_pool2d(
heatmap, kernel_size=self.nms_kernel_size, stride=1, padding=0
)
local_max[:, :, padding:(-padding), padding:(-padding)] = local_max_inner
# for Pedestrian & Traffic_cone in nuScenes
if self.dataset_name == "nuScenes":
local_max[ :, 8, ] = F.max_pool2d(heatmap[:, 8], kernel_size=1, stride=1, padding=0)
local_max[ :, 9, ] = F.max_pool2d(heatmap[:, 9], kernel_size=1, stride=1, padding=0)
# for Pedestrian & Cyclist in Waymo
elif self.dataset_name == "Waymo":
local_max[ :, 1, ] = F.max_pool2d(heatmap[:, 1], kernel_size=1, stride=1, padding=0)
local_max[ :, 2, ] = F.max_pool2d(heatmap[:, 2], kernel_size=1, stride=1, padding=0)
heatmap = heatmap * (heatmap == local_max)
heatmap = heatmap.view(batch_size, heatmap.shape[1], -1)
# top num_proposals among all classes
top_proposals = heatmap.view(batch_size, -1).argsort(dim=-1, descending=True)[
..., : self.num_proposals
]
top_proposals_class = top_proposals // heatmap.shape[-1]
top_proposals_index = top_proposals % heatmap.shape[-1]
query_feat = lidar_feat_flatten.gather(
index=top_proposals_index[:, None, :].expand(-1, lidar_feat_flatten.shape[1], -1),
dim=-1,
)
self.query_labels = top_proposals_class
# add category embedding
one_hot = F.one_hot(top_proposals_class, num_classes=self.num_classes).permute(0, 2, 1)
query_cat_encoding = self.class_encoding(one_hot.float())
query_feat += query_cat_encoding
query_pos = bev_pos.gather(
index=top_proposals_index[:, None, :].permute(0, 2, 1).expand(-1, -1, bev_pos.shape[-1]),
dim=1,
)
# convert to xy
query_pos = query_pos.flip(dims=[-1])
bev_pos = bev_pos.flip(dims=[-1])
query_feat = self.decoder(
query_feat, lidar_feat_flatten, query_pos, bev_pos
)
res_layer = self.prediction_head(query_feat)
res_layer["center"] = res_layer["center"] + query_pos.permute(0, 2, 1)
res_layer["query_heatmap_score"] = heatmap.gather(
index=top_proposals_index[:, None, :].expand(-1, self.num_classes, -1),
dim=-1,
)
res_layer["dense_heatmap"] = dense_heatmap
return res_layer
def forward(self, batch_dict):
feats = batch_dict['spatial_features_2d']
res = self.predict(feats)
if not self.training:
bboxes = self.get_bboxes(res)
batch_dict['final_box_dicts'] = bboxes
else:
gt_boxes = batch_dict['gt_boxes']
gt_bboxes_3d = gt_boxes[...,:-1]
gt_labels_3d = gt_boxes[...,-1].long() - 1
loss, tb_dict = self.loss(gt_bboxes_3d, gt_labels_3d, res)
batch_dict['loss'] = loss
batch_dict['tb_dict'] = tb_dict
return batch_dict
Experiment
실험에서는 image의 백본으로는 Swin-T, Lidar의 백본으로는 VoxelNet을 사용, dataset은 nuscenes 데이터셋을 사용하였습니다.
논문에서는 기존의 Fusion방식인 Lidar형식이 더 효율적이여서 Camera-to-Lidar projection 방법을 수행하는 것보다 본 논문의 BEVFusion방법이 더 효율적이라는 사실을 실험결과를 통해 말하고 있습니다.
먼 거리의 객체나 작은 객체 검출 성능이 증가하게 되는데 이는 camera의 semantic한 정보를 그대로 가지고 올 수 있기 때문이라고 말하고 있습니다. 그러면서 기존의 point-level Fusion이 최선이라는 관습을 깨버리고 있습니다.
센서 융합의 이점인 비오는 날씨에 lidar detector의 단점을 image와의 Fusion을 통해 해결하고 있습니다. 또한 밤에는 camera센서의 무쓸모를 Lidar로 보완하고 있습니다.
개인적으로는 inverse depth map을 만드는 부분 최근에 나오는 depth estimation기법들을 사용해 보완하면 더욱 성능이 좋은 detector가 되지 않을까 합니다