# NOTE(review): the three lines "Spaces:" / "Running" / "Running" were non-code
# status text (Hugging Face Spaces page residue) that made the file unparseable;
# preserved here as a comment.
import datetime
import os
from datetime import timedelta

import cv2
import numpy as np
from dtaidistance import dtw
from scipy.signal import savgol_filter
from scipy.stats import mstats

from config import (
    CONNECTIONS_VIT_FULL,
    CONNECTIONS_FOR_ERROR,
    EDGE_GROUPS_FOR_ERRORS,
    EDGE_GROUPS_FOR_SUMMARY,
    get_thresholds
)
from utils import (
    predict_keypoints_vitpose,
    get_edge_groups,
    get_series,
    z_score_normalization,
    modify_student_frame,
    modify_student_frame_2,
    get_video_frames,
    check_and_download_models
)
def video_identity(dtw_mean, dtw_filter, angles_sensitive, angles_common, angles_insensitive, trigger_state, video_teacher, video_student): | |
check_and_download_models() | |
detection_result_teacher = predict_keypoints_vitpose( | |
video_path=video_teacher, | |
model_path="models/vitpose-b-wholebody.pth", | |
model_name="b", | |
detector_path="models/yolov8s.pt" | |
) | |
detection_result_student = predict_keypoints_vitpose( | |
video_path=video_student, | |
model_path="models/vitpose-b-wholebody.pth", | |
model_name="b", | |
detector_path="models/yolov8s.pt" | |
) | |
detection_result_teacher_angles = get_series(detection_result_teacher[:, :,:-1], EDGE_GROUPS_FOR_ERRORS).T | |
detection_result_student_angles = get_series(detection_result_student[:, :,:-1], EDGE_GROUPS_FOR_ERRORS).T | |
edge_groups_for_dtw = get_edge_groups(CONNECTIONS_VIT_FULL) | |
serieses_teacher = get_series(detection_result_teacher[:, :,:-1], edge_groups_for_dtw) | |
serieses_student = get_series(detection_result_student[:, :,:-1], edge_groups_for_dtw) | |
serieses_teacher = z_score_normalization(serieses_teacher) | |
serieses_student = z_score_normalization(serieses_student) | |
list_of_paths = [] | |
for idx in range(len(serieses_teacher)): | |
series_teacher = np.array(serieses_teacher[idx]) | |
series_student = np.array(serieses_student[idx]) | |
_ , paths = dtw.warping_paths(series_teacher, series_student, window=50) | |
path = dtw.best_path(paths) | |
list_of_paths.append(path) | |
all_dtw_tupples = [] | |
for path in list_of_paths: | |
all_dtw_tupples.extend(path) | |
mean_path = [] | |
for student_frame in range(len(serieses_student[0])): | |
frame_from_teacher = [] | |
for frame_teacher in all_dtw_tupples: | |
if frame_teacher[1] == student_frame: | |
frame_from_teacher.append(frame_teacher[0]) | |
mean_path.append((int(mstats.winsorize(np.array(frame_from_teacher), limits=[dtw_mean, dtw_mean]).mean()), student_frame)) | |
path_array = np.array(mean_path) | |
smoothed_data = savgol_filter(path_array, window_length=dtw_filter, polyorder=0, axis=0) | |
path_array = np.array(smoothed_data).astype(int) | |
video_teacher_loaded = get_video_frames(video_teacher) | |
video_student_loaded = get_video_frames(video_student) | |
alignments = np.unique(path_array, axis=0) | |
threshouds_for_errors = get_thresholds(angles_sensitive, angles_common, angles_insensitive) | |
# ====================================================================================== | |
trigger_1 = [] | |
trigger_2 = [] | |
save_teacher_frames = [] | |
save_student_frames = [] | |
all_text_summaries = [] | |
for idx, alignment in enumerate(alignments): | |
frame_student_out, frame_teacher_out, trigger_1, trigger_2, text_info_summary = modify_student_frame( | |
detection_result_student=detection_result_student, | |
detection_result_teacher_angles=detection_result_teacher_angles, | |
detection_result_student_angles=detection_result_student_angles, | |
video_teacher=video_teacher_loaded, | |
video_student=video_student_loaded, | |
alignment_frames=alignment, | |
edge_groups=EDGE_GROUPS_FOR_ERRORS, | |
connections=CONNECTIONS_FOR_ERROR, | |
thresholds=threshouds_for_errors, | |
previously_trigered=trigger_1, | |
previously_trigered_2=trigger_2, | |
triger_state=trigger_state, | |
text_dictionary=EDGE_GROUPS_FOR_SUMMARY | |
) | |
save_teacher_frames.append(frame_teacher_out) | |
save_student_frames.append(frame_student_out) | |
text_info_summary = [(log, idx) for log in text_info_summary] | |
all_text_summaries.extend(text_info_summary) | |
save_teacher_frames = np.array(save_teacher_frames) | |
save_student_frames = np.array(save_student_frames) | |
save_teacher_frames_resized = np.array([cv2.resize(frame, (1280, 720)) for frame in save_teacher_frames]) | |
save_student_frames_resized = np.array([cv2.resize(frame, (1280, 720)) for frame in save_student_frames]) | |
# print(f"video shape: {save_student_frames.shape}") | |
print(f"shape s: {save_student_frames.shape}") | |
print(f"shape t: {save_teacher_frames.shape}") | |
concat_video = [] | |
# print(alignments) | |
concat_video = np.concatenate((save_teacher_frames_resized, save_student_frames_resized), axis=2) | |
concat_video = np.array(concat_video) | |
current_time = datetime.datetime.now() | |
timestamp_str = current_time.strftime("%Y_%m-%d_%H_%M_%S") | |
video_path = f"videos/pose_{timestamp_str}.mp4" | |
out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), 30, (1280*2, 720)) | |
for frame in concat_video: | |
out.write(frame) | |
out.release() | |
all_text_summaries_clean = list(set(all_text_summaries)) | |
all_text_summaries_clean.sort(key=lambda x: x[1]) | |
general_summary = [] | |
for log in all_text_summaries_clean: | |
comment, frame = log | |
total_seconds = frame / 30 | |
general_summary.append(f"{comment} on frame {frame}. Video time: {str(timedelta(seconds=total_seconds))[3:-4]}") | |
general_summary = "\n".join(general_summary) | |
log_path = f"logs/log_{timestamp_str}.txt" | |
content = f""" | |
Settings: | |
Dynamic Time Warping: | |
- Winsorize mean: {dtw_mean} | |
- Savitzky-Golay Filter: {dtw_filter} | |
Thresholds: | |
- Sensitive: {angles_sensitive} | |
- Standart: {angles_common} | |
- Insensitive: {angles_insensitive} | |
Patience: | |
- trigger count: {trigger_state} | |
Error logs: | |
{general_summary} | |
""" | |
with open(log_path, "w") as file: | |
file.write(content) | |
return video_path, general_summary, log_path |