import os
import shutil
import argparse
import re
import json

import numpy as np
import cv2
import torch
from tqdm import tqdm

try:
    import mmpose  # noqa: F401
except Exception as e:
    print(e)
    print("mmpose error, installing transformer_utils")
    os.system("pip install ./main/transformer_utils")


def extract_frame_number(file_name):
    """Return the 5-digit frame index encoded in a per-frame file name, or None."""
    match = re.search(r"(\d{5})", file_name)
    if match:
        return int(match.group(1))
    return None


def merge_npz_files(npz_files, output_file):
    """Stack the per-frame .npz outputs into a single .npz, ordered by frame number."""
    npz_files = sorted(npz_files, key=lambda x: extract_frame_number(os.path.basename(x)))
    merged_data = {}
    for file in npz_files:
        data = np.load(file)
        for key in data.files:
            if key not in merged_data:
                merged_data[key] = []
            merged_data[key].append(data[key])
    for key in merged_data:
        merged_data[key] = np.stack(merged_data[key], axis=0)
    np.savez(output_file, **merged_data)


def npz_to_npz(pkl_path, npz_path):
    """Repack the merged SMPL-X parameters into the flattened layout expected downstream."""
    # Load the merged per-frame parameters (stored as .npz).
    pkl_example = np.load(pkl_path, allow_pickle=True)
    n = pkl_example["expression"].shape[0]  # number of frames
    # Concatenate all rotation parameters into a single pose vector per frame.
    full_pose = np.concatenate(
        [
            pkl_example["global_orient"],
            pkl_example["body_pose"],
            pkl_example["jaw_pose"],
            pkl_example["leye_pose"],
            pkl_example["reye_pose"],
            pkl_example["left_hand_pose"],
            pkl_example["right_hand_pose"],
        ],
        axis=1,
    )
    np.savez(
        npz_path,
        betas=np.zeros(300),
        poses=full_pose.reshape(n, -1),
        expressions=np.zeros((n, 100)),
        trans=pkl_example["transl"].reshape(n, -1),
        model="smplx2020",
        gender="neutral",
        mocap_frame_rate=30,
    )


def get_json(root_dir, output_dir):
    """Write a test-split JSON listing every video/motion pair found in root_dir."""
    clips = []
    all_length = 0
    for file_name in os.listdir(root_dir):
        if not file_name.endswith(".mp4"):
            continue
        video_id = file_name[:-4]
        root = root_dir
        try:
            length = np.load(os.path.join(root, video_id + ".npz"), allow_pickle=True)["poses"].shape[0]
            all_length += length
        except Exception as e:
            print("can't open", file_name, e)
            continue
        clip = {
            "video_id": video_id,
            "video_path": root,
            # "audio_path": root,
            "motion_path": root,
            "mode": "test",
            "start_idx": 0,
            "end_idx": length,
        }
        clips.append(clip)
    if all_length < 1:
        print(f"skip {root_dir}: total frame count is less than 1")
        return 0
    with open(output_dir, "w") as f:
        json.dump(clips, f, indent=4)
    return all_length


def infer(video_input, in_threshold, num_people, render_mesh, inferer, OUT_FOLDER):
    """Run SMPLer-X on every frame of a video and save the merged motion next to it."""
    shutil.rmtree(f"{OUT_FOLDER}/smplx", ignore_errors=True)
    os.makedirs(f"{OUT_FOLDER}/smplx", exist_ok=True)
    multi_person = num_people
    cap = cv2.VideoCapture(video_input)
    video_name = os.path.basename(video_input)
    frame = 0
    while True:
        success, original_img = cap.read()
        if not success:
            break
        frame += 1
        _, _, _ = inferer.infer(original_img, in_threshold, frame, multi_person, not render_mesh)
    cap.release()
    # Merge the per-frame outputs, convert them to the flattened layout, then clean up.
    npz_files = [os.path.join(OUT_FOLDER, "smplx", x) for x in os.listdir(os.path.join(OUT_FOLDER, "smplx"))]
    merged_npz = os.path.join(OUT_FOLDER, video_name.replace(".mp4", ".npz"))
    merge_npz_files(npz_files, merged_npz)
    shutil.rmtree(f"{OUT_FOLDER}/smplx", ignore_errors=True)
    npz_to_npz(merged_npz, merged_npz)
    # Copy the source video next to its motion file.
    destination = merged_npz.replace(".npz", ".mp4")
    shutil.copy(video_input, destination)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--video_folder_path", type=str, default="")
    parser.add_argument("--data_save_path", type=str, default="")
    parser.add_argument("--json_save_path", type=str, default="")
    args = parser.parse_args()

    video_folder = args.video_folder_path
    DEFAULT_MODEL = "smpler_x_s32"
    OUT_FOLDER = args.data_save_path
    os.makedirs(OUT_FOLDER, exist_ok=True)

    num_gpus = 1 if torch.cuda.is_available() else -1
    index = torch.cuda.current_device() if torch.cuda.is_available() else -1  # currently unused

    # Imported here so the mmpose/transformer_utils check above runs first.
    from main.inference import Inferer

    inferer = Inferer(DEFAULT_MODEL, num_gpus, OUT_FOLDER)
    for video_input in tqdm(os.listdir(video_folder)):
        if not video_input.endswith(".mp4"):
            continue
        # threshold 0.5, single person, no mesh rendering
        infer(os.path.join(video_folder, video_input), 0.5, False, False, inferer, OUT_FOLDER)
    get_json(OUT_FOLDER, args.json_save_path)
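
# A minimal usage sketch. The script name and all paths below are hypothetical;
# it assumes this file lives in the SMPLer-X repo root with its model checkpoints
# and the main/ package in place.
#
#   python process_videos.py \
#       --video_folder_path ./raw_videos \
#       --data_save_path ./smplx_outputs \
#       --json_save_path ./smplx_outputs/test.json
#
# For each ./raw_videos/<id>.mp4 this produces ./smplx_outputs/<id>.npz plus a copy
# of the video, and finally writes the test-split JSON at --json_save_path.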