Maksym-Lysyi's picture
initial commit
# Part of this code is derived/taken from
import os
import sys
import pickle
import random
import cv2
import json_tricks as json
import numpy as np
from pycocotools.coco import COCO
from torchvision import transforms
import torchvision.transforms.functional as F
from tqdm import tqdm
from PIL import Image
from .HumanPoseEstimation import HumanPoseEstimationDataset as Dataset
from vit_utils.transform import fliplr_joints, affine_transform, get_affine_transform
import numpy as np
class COCODataset(Dataset):
COCODataset class.
def __init__(self, root_path="./datasets/COCO", data_version="train2017",
is_train=True, use_gt_bboxes=True, bbox_path="",
image_width=288, image_height=384,
scale=True, scale_factor=0.35, flip_prob=0.5, rotate_prob=0.5, rotation_factor=45., half_body_prob=0.3,
use_different_joints_weight=False, heatmap_sigma=3, soft_nms=False):
Initializes a new COCODataset object.
Image and annotation indexes are loaded and stored in memory.
Annotations are preprocessed to have a simple list of annotations to iterate over.
Bounding boxes can be loaded from the ground truth or from a pickle file (in this case, no annotations are
root_path (str): dataset root path.
Default: "./datasets/COCO"
data_version (str): desired version/folder of COCO. Possible options are "train2017", "val2017".
Default: "train2017"
is_train (bool): train or eval mode. If true, train mode is used.
Default: True
use_gt_bboxes (bool): use ground truth bounding boxes. If False, bbox_path is required.
Default: True
bbox_path (str): bounding boxes pickle file path.
Default: ""
image_width (int): image width.
Default: 288
image_height (int): image height.
Default: ``384``
color_rgb (bool): rgb or bgr color mode. If True, rgb color mode is used.
Default: True
scale (bool): scale mode.
Default: True
scale_factor (float): scale factor.
Default: 0.35
flip_prob (float): flip probability.
Default: 0.5
rotate_prob (float): rotate probability.
Default: 0.5
rotation_factor (float): rotation factor.
Default: 45.
half_body_prob (float): half body probability.
Default: 0.3
use_different_joints_weight (bool): use different joints weights.
If true, the following joints weights will be used:
[1., 1., 1., 1., 1., 1., 1., 1.2, 1.2, 1.5, 1.5, 1., 1., 1.2, 1.2, 1.5, 1.5]
Default: False
heatmap_sigma (float): sigma of the gaussian used to create the heatmap.
Default: 3
soft_nms (bool): enable soft non-maximum suppression.
Default: False
super(COCODataset, self).__init__()
self.root_path = root_path
self.data_version = data_version
self.is_train = is_train
self.use_gt_bboxes = use_gt_bboxes
self.bbox_path = bbox_path
self.scale = scale # ToDo Check
self.scale_factor = scale_factor
self.flip_prob = flip_prob
self.rotate_prob = rotate_prob
self.rotation_factor = rotation_factor
self.half_body_prob = half_body_prob
self.use_different_joints_weight = use_different_joints_weight # ToDo Check
self.heatmap_sigma = heatmap_sigma
self.soft_nms = soft_nms
# Image & annotation path
self.data_path = f"{root_path}/{data_version}"
self.annotation_path = f"{root_path}/annotations/person_keypoints_{data_version}.json"
self.image_size = (image_width, image_height)
self.aspect_ratio = image_width * 1.0 / image_height
self.heatmap_size = (int(image_width / 4), int(image_height / 4))
self.heatmap_type = 'gaussian'
self.pixel_std = 200 # I don't understand the meaning of pixel_std (=200) in the original implementation
self.num_joints = 25
self.num_joints_half_body = 15
# eye, ear, shoulder, elbow, wrist, hip, knee, ankle
self.flip_pairs = [[1, 2], [3, 4], [6, 7], [8, 9], [10, 11], [12, 13],
[15, 16], [17, 18], [19, 22], [20, 23], [21, 24]]
self.upper_body_ids = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
self.lower_body_ids = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
self.joints_weight = np.array([1., 1., 1., 1., 1., 1., 1., 1., 1.2, 1.2,
1.5, 1.5, 1., 1., 1., 1.2, 1.2, 1.5, 1.5,
1.5, 1.5, 1.5, 1.5, 1.5,
1.5]).reshape((self.num_joints, 1))
self.transform = transforms.Compose([
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
# Load COCO dataset - Create COCO object then load images and annotations
self.coco = COCO(self.annotation_path)
# Create a list of annotations and the corresponding image (each image can contain more than one detection)
""" Load bboxes and joints
- if self.use_gt_bboxes -> Load GT bboxes and joints
- else -> Load pre-predicted bboxes by a detector (as YOLOv3) and null joints
if not self.use_gt_bboxes:
bboxes must be saved as the original COCO annotations
i.e. the format must be:
bboxes = {
'<imgId>': [
'id': <annId>, # progressive id for debugging
'clean_bbox': np.array([<x>, <y>, <w>, <h>])}
with open(self.bbox_path, 'rb') as fd:
bboxes = pickle.load(fd) = []
# load annotations for each image of COCO
for imgId in tqdm(self.coco.getImgIds(), desc="Prepare images, annotations ... "):
ann_ids = self.coco.getAnnIds(imgIds=imgId, iscrowd=False) # annotation ids
img = self.coco.loadImgs(imgId)[0] # load img
if self.use_gt_bboxes:
objs = self.coco.loadAnns(ann_ids)
# sanitize bboxes
valid_objs = []
for obj in objs:
# Skip non-person objects (it should never happen)
if obj['category_id'] != 1:
# ignore objs without keypoints annotation
if max(obj['keypoints']) == 0 and max(obj['foot_kpts']) == 0:
x, y, w, h = obj['bbox']
x1 = np.max((0, x))
y1 = np.max((0, y))
x2 = np.min((img['width'] - 1, x1 + np.max((0, w - 1))))
y2 = np.min((img['height'] - 1, y1 + np.max((0, h - 1))))
# Use only valid bounding boxes
if obj['area'] > 0 and x2 >= x1 and y2 >= y1:
obj['clean_bbox'] = [x1, y1, x2 - x1, y2 - y1]
objs = valid_objs
objs = bboxes[imgId]
# for each annotation of this image, add the formatted annotation to
for obj in objs:
joints = np.zeros((self.num_joints, 2), dtype=np.float)
joints_visibility = np.ones((self.num_joints, 2), dtype=np.float)
# Add foot data to keypoints
if self.use_gt_bboxes:
""" COCO pre-processing
- Moved above
- Skip non-person objects (it should never happen)
if obj['category_id'] != 1:
# ignore objs without keypoints annotation
if max(obj['keypoints']) == 0:
# Not all joints are already present, skip them
vjoints = list(range(self.num_joints))
for idx, pt in enumerate(vjoints):
if pt == 5 or pt == 14:
continue # Neck and hip are manually filled
joints[pt, 0] = obj['keypoints'][idx * 3 + 0]
joints[pt, 1] = obj['keypoints'][idx * 3 + 1]
t_vis = int(np.clip(obj['keypoints'][idx * 3 + 2], 0, 1))
if visibility == 0 -> keypoint is not in the image.
if visibility == 1 -> keypoint is in the image BUT not visible
(e.g. behind an object).
if visibility == 2 -> keypoint looks clearly
(i.e. it is not hidden).
joints_visibility[pt, 0] = t_vis
joints_visibility[pt, 1] = t_vis
center, scale = self._box2cs(obj['clean_bbox'][:4])
# Add neck and c-hip (check utils/ for keypoints)
joints[5, 0] = (joints[6, 0] + joints[7, 0]) / 2
joints[5, 1] = (joints[6, 1] + joints[7, 1]) / 2
joints_visibility[5, :] = min(joints_visibility[6, 0],
joints_visibility[7, 0])
joints[14, 0] = (joints[12, 0] + joints[13, 0]) / 2
joints[14, 1] = (joints[12, 1] + joints[13, 1]) / 2
joints_visibility[14, :] = min(joints_visibility[12, 0],
joints_visibility[13, 0]){
'imgId': imgId,
'annId': obj['id'],
'imgPath': f"{self.root_path}/{self.data_version}/{imgId:012d}.jpg",
'center': center,
'scale': scale,
'joints': joints,
'joints_visibility': joints_visibility,
# Done check if we need prepare_data -> We should not
print('\nCOCO dataset loaded!')
# Default values
self.bbox_thre = 1.0
self.image_thre = 0.0
self.in_vis_thre = 0.2
self.nms_thre = 1.0
self.oks_thre = 0.9
def __len__(self):
return len(
def __getitem__(self, index):
# index = 0
joints_data =[index].copy()
# Load image
image = np.array(['imgPath']))
if image.ndim == 2:
# Some images are grayscale and will fail the trasform, convert to RGB
image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
raise ValueError(f"Fail to read {joints_data['imgPath']}")
joints = joints_data['joints']
joints_vis = joints_data['joints_visibility']
c = joints_data['center']
s = joints_data['scale']
score = joints_data['score'] if 'score' in joints_data else 1
r = 0
# Apply data augmentation
if self.is_train:
if (self.half_body_prob and random.random() < self.half_body_prob and
np.sum(joints_vis[:, 0]) > self.num_joints_half_body):
c_half_body, s_half_body = self._half_body_transform(joints, joints_vis)
if c_half_body is not None and s_half_body is not None:
c, s = c_half_body, s_half_body
sf = self.scale_factor
rf = self.rotation_factor
if self.scale:
# A random scale factor in [1 - sf, 1 + sf]
s = s * np.clip(random.random() * sf + 1, 1 - sf, 1 + sf)
if self.rotate_prob and random.random() < self.rotate_prob:
# A random rotation factor in [-2 * rf, 2 * rf]
r = np.clip(random.random() * rf, -rf * 2, rf * 2)
r = 0
if self.flip_prob and random.random() < self.flip_prob:
image = image[:, ::-1, :]
joints, joints_vis = fliplr_joints(joints, joints_vis,
c[0] = image.shape[1] - c[0] - 1
# Apply affine transform on joints and image
trans = get_affine_transform(c, s, self.pixel_std, r, self.image_size)
image = cv2.warpAffine(
(int(self.image_size[0]), int(self.image_size[1])),
for i in range(self.num_joints):
if joints_vis[i, 0] > 0.:
joints[i, 0:2] = affine_transform(joints[i, 0:2], trans)
# Convert image to tensor and normalize
if self.transform is not None: # I could remove this check
image = self.transform(image)
target, target_weight = self._generate_target(joints, joints_vis)
# Update metadata
joints_data['joints'] = joints
joints_data['joints_visibility'] = joints_vis
joints_data['center'] = c
joints_data['scale'] = s
joints_data['rotation'] = r
joints_data['score'] = score
# from utils.visualization import draw_points_and_skeleton, joints_dict
# image = np.rollaxis(image.detach().cpu().numpy(), 0, 3)
# joints = np.hstack((joints[:, ::-1], joints_vis[:, 0][..., None]))
# image = draw_points_and_skeleton(image.copy(), joints,
# joints_dict()['coco']['skeleton'],
# person_index=0,
# points_color_palette='gist_rainbow',
# skeleton_color_palette='jet',
# points_palette_samples=10,
# confidence_threshold=0.4)
# cv2.imshow('', image)
# cv2.waitKey(0)
return image, target.astype(np.float32), target_weight.astype(np.float32), joints_data
# Private methods
def _box2cs(self, box):
x, y, w, h = box[:4]
return self._xywh2cs(x, y, w, h)
def _xywh2cs(self, x, y, w, h):
center = np.zeros((2,), dtype=np.float32)
center[0] = x + w * 0.5
center[1] = y + h * 0.5
if w > self.aspect_ratio * h:
h = w * 1.0 / self.aspect_ratio
elif w < self.aspect_ratio * h:
w = h * self.aspect_ratio
scale = np.array(
[w * 1.0 / self.pixel_std, h * 1.0 / self.pixel_std],
if center[0] != -1:
scale = scale * 1.25
return center, scale
def _half_body_transform(self, joints, joints_vis):
upper_joints = []
lower_joints = []
for joint_id in range(self.num_joints):
if joints_vis[joint_id][0] > 0:
if joint_id in self.upper_body_ids:
if random.random() < 0.5 and len(upper_joints) > 2:
selected_joints = upper_joints
selected_joints = lower_joints \
if len(lower_joints) > 2 else upper_joints
if len(selected_joints) < 2:
return None, None
selected_joints = np.array(selected_joints, dtype=np.float32)
center = selected_joints.mean(axis=0)[:2]
left_top = np.amin(selected_joints, axis=0)
right_bottom = np.amax(selected_joints, axis=0)
w = right_bottom[0] - left_top[0]
h = right_bottom[1] - left_top[1]
if w > self.aspect_ratio * h:
h = w * 1.0 / self.aspect_ratio
elif w < self.aspect_ratio * h:
w = h * self.aspect_ratio
scale = np.array(
w * 1.0 / self.pixel_std,
h * 1.0 / self.pixel_std
scale = scale * 1.5
return center, scale
def _generate_target(self, joints, joints_vis):
:param joints: [num_joints, 2]
:param joints_vis: [num_joints, 2]
:return: target, target_weight(1: visible, 0: invisible)
target_weight = np.ones((self.num_joints, 1), dtype=np.float32)
target_weight[:, 0] = joints_vis[:, 0]
if self.heatmap_type == 'gaussian':
target = np.zeros((self.num_joints,
tmp_size = self.heatmap_sigma * 3
for joint_id in range(self.num_joints):
feat_stride = np.asarray(self.image_size) / np.asarray(self.heatmap_size)
mu_x = int(joints[joint_id][0] / feat_stride[0] + 0.5)
mu_y = int(joints[joint_id][1] / feat_stride[1] + 0.5)
# Check that any part of the gaussian is in-bounds
ul = [int(mu_x - tmp_size), int(mu_y - tmp_size)]
br = [int(mu_x + tmp_size + 1), int(mu_y + tmp_size + 1)]
if ul[0] >= self.heatmap_size[0] or ul[1] >= self.heatmap_size[1] \
or br[0] < 0 or br[1] < 0:
# If not, just return the image as is
target_weight[joint_id] = 0
# # Generate gaussian
size = 2 * tmp_size + 1
x = np.arange(0, size, 1, np.float32)
y = x[:, np.newaxis]
x0 = y0 = size // 2
# The gaussian is not normalized, we want the center value to equal 1
g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * self.heatmap_sigma ** 2))
# Usable gaussian range
g_x = max(0, -ul[0]), min(br[0], self.heatmap_size[0]) - ul[0]
g_y = max(0, -ul[1]), min(br[1], self.heatmap_size[1]) - ul[1]
# Image range
img_x = max(0, ul[0]), min(br[0], self.heatmap_size[0])
img_y = max(0, ul[1]), min(br[1], self.heatmap_size[1])
v = target_weight[joint_id]
if v > 0.5:
target[joint_id][img_y[0]:img_y[1], img_x[0]:img_x[1]] = \
g[g_y[0]:g_y[1], g_x[0]:g_x[1]]
raise NotImplementedError
if self.use_different_joints_weight:
target_weight = np.multiply(target_weight, self.joints_weight)
return target, target_weight
def _write_coco_keypoint_results(self, keypoints, res_file):
data_pack = [
'cat_id': 1, # 1 == 'person'
'cls': 'person',
'ann_type': 'keypoints',
'keypoints': keypoints
results = self._coco_keypoint_results_one_category_kernel(data_pack[0])
with open(res_file, 'w') as f:
json.dump(results, f, sort_keys=True, indent=4)
except Exception:
content = []
with open(res_file, 'r') as f:
for line in f:
content[-1] = ']'
with open(res_file, 'w') as f:
for c in content:
def _coco_keypoint_results_one_category_kernel(self, data_pack):
cat_id = data_pack['cat_id']
keypoints = data_pack['keypoints']
cat_results = []
for img_kpts in keypoints:
if len(img_kpts) == 0:
_key_points = np.array([img_kpts[k]['keypoints'] for k in range(len(img_kpts))], dtype=np.float32)
key_points = np.zeros((_key_points.shape[0], self.num_joints * 3), dtype=np.float32)
for ipt in range(self.num_joints):
key_points[:, ipt * 3 + 0] = _key_points[:, ipt, 0]
key_points[:, ipt * 3 + 1] = _key_points[:, ipt, 1]
key_points[:, ipt * 3 + 2] = _key_points[:, ipt, 2] # keypoints score.
result = [
'image_id': img_kpts[k]['image'],
'category_id': cat_id,
'keypoints': list(key_points[k]),
'score': img_kpts[k]['score'].astype(np.float32),
'center': list(img_kpts[k]['center']),
'scale': list(img_kpts[k]['scale'])
for k in range(len(img_kpts))
return cat_results
if __name__ == '__main__':
# from skimage import io
coco = COCODataset(root_path=f"{os.path.dirname(__file__)}/COCO", data_version="traincoex", rotate_prob=0., half_body_prob=0.)
item = coco[1]
# io.imsave("tmp.jpg", item[0].permute(1,2,0).numpy())
# img = np.clip(np.transpose(item[0].numpy(), (1, 2, 0))[:, :, ::-1] * np.asarray([0.229, 0.224, 0.225]) +
# np.asarray([0.485, 0.456, 0.406]), 0, 1) * 255
# cv2.imwrite('./tmp.png', img.astype(np.uint8))
# print(item[-1])