import datetime
import os
from collections import defaultdict
from datetime import timedelta

# NOTE: project and third-party imports keep their original relative order in
# case any of them relies on import-time side effects.
from utils import (
    predict_keypoints_vitpose,
    get_edge_groups,
    get_series,
    z_score_normalization,
    modify_student_frame,
    modify_student_frame_2,
    get_video_frames,
    check_and_download_models,
)
from config import (
    CONNECTIONS_VIT_FULL,
    CONNECTIONS_FOR_ERROR,
    EDGE_GROUPS_FOR_ERRORS,
    EDGE_GROUPS_FOR_SUMMARY,
    get_thresholds,
)
from dtaidistance import dtw
import numpy as np
from scipy.signal import savgol_filter
from scipy.stats import mstats
import cv2

# Frame rate used both for the written video and for frame -> time conversion.
_OUTPUT_FPS = 30
# (width, height) each of the two side-by-side panels is resized to.
_PANEL_SIZE = (1280, 720)


def _detect_keypoints(video_path):
    """Run ``predict_keypoints_vitpose`` on one video with this module's model files."""
    return predict_keypoints_vitpose(
        video_path=video_path,
        model_path="models/vitpose-b-wholebody.pth",
        model_name="b",
        detector_path="models/yolov8s.pt",
    )


def _mean_alignment_path(serieses_teacher, serieses_student, dtw_mean, dtw_filter):
    """Fuse the per-series DTW paths into one smoothed (teacher, student) frame table.

    Each teacher/student series pair is aligned with DTW (window of 50). For
    every student frame, the teacher frames matched to it across all series
    are winsorized (``dtw_mean`` on both tails) and averaged; the resulting
    table is smoothed with a zero-order Savitzky-Golay filter of window
    ``dtw_filter`` and truncated to integers.
    """
    # Group matched teacher frames by student frame in one pass instead of
    # rescanning every DTW tuple for every student frame.
    teacher_by_student = defaultdict(list)
    for series_teacher, series_student in zip(serieses_teacher, serieses_student):
        _, paths = dtw.warping_paths(
            np.array(series_teacher), np.array(series_student), window=50
        )
        for teacher_frame, student_frame in dtw.best_path(paths):
            teacher_by_student[student_frame].append(teacher_frame)

    mean_path = []
    for student_frame in range(len(serieses_student[0])):
        matched = np.array(teacher_by_student.get(student_frame, []))
        trimmed = mstats.winsorize(matched, limits=[dtw_mean, dtw_mean])
        mean_path.append((int(trimmed.mean()), student_frame))

    smoothed = savgol_filter(
        np.array(mean_path), window_length=dtw_filter, polyorder=0, axis=0
    )
    return np.array(smoothed).astype(int)


def _format_video_time(frame, fps=_OUTPUT_FPS):
    """Return the position of output frame *frame* as ``M:SS.cc`` (centiseconds truncated).

    Integer arithmetic is used on purpose: slicing ``str(timedelta(...))``
    yields an empty string for whole-second values because the microsecond
    part is then omitted from the string.
    """
    total_centis = frame * 100 // fps
    minutes, remainder = divmod(total_centis, 60 * 100)
    seconds, centis = divmod(remainder, 100)
    return f"{minutes}:{seconds:02d}.{centis:02d}"


def _write_side_by_side(video_path, teacher_frames, student_frames):
    """Write teacher (left) and student (right) frames next to each other to *video_path*."""
    width, height = _PANEL_SIZE
    out = cv2.VideoWriter(
        video_path, cv2.VideoWriter_fourcc(*'mp4v'), _OUTPUT_FPS, (width * 2, height)
    )
    try:
        for teacher_frame, student_frame in zip(teacher_frames, student_frames):
            # Resize and join one pair at a time so the full resized video
            # never has to be held in memory.
            pair = np.concatenate(
                (cv2.resize(teacher_frame, _PANEL_SIZE), cv2.resize(student_frame, _PANEL_SIZE)),
                axis=1,
            )
            out.write(pair)
    finally:
        # Always finalize the container, even if a write fails.
        out.release()


def video_identity(dtw_mean, dtw_filter, angles_sensitive, angles_common,
                   angles_insensitive, trigger_state, video_teacher, video_student):
    """Compare a student video against a teacher video and render the result.

    Keypoints are predicted for both videos, the videos are aligned in time
    with DTW, every aligned frame pair is passed to ``modify_student_frame``,
    and the returned frames are written side by side to an MP4 file together
    with a text log of the reported messages.

    Args:
        dtw_mean: Winsorizing limit applied to both tails of the teacher
            frames matched to each student frame before averaging.
        dtw_filter: Window length of the Savitzky-Golay filter used to smooth
            the averaged alignment path.
        angles_sensitive: Forwarded to ``get_thresholds``.
        angles_common: Forwarded to ``get_thresholds``.
        angles_insensitive: Forwarded to ``get_thresholds``.
        trigger_state: Forwarded to ``modify_student_frame`` as
            ``triger_state`` (written to the log as the "trigger count").
        video_teacher: Path of the teacher video.
        video_student: Path of the student video.

    Returns:
        Tuple ``(video_path, general_summary, log_path)``: the path of the
        written video, the newline-joined message summary, and the path of
        the written log file.
    """
    check_and_download_models()

    detection_result_teacher = _detect_keypoints(video_teacher)
    detection_result_student = _detect_keypoints(video_student)

    # The last channel of the detections is dropped before building series
    # (presumably a confidence score - TODO confirm against utils).
    keypoints_teacher = detection_result_teacher[:, :, :-1]
    keypoints_student = detection_result_student[:, :, :-1]

    detection_result_teacher_angles = get_series(keypoints_teacher, EDGE_GROUPS_FOR_ERRORS).T
    detection_result_student_angles = get_series(keypoints_student, EDGE_GROUPS_FOR_ERRORS).T

    edge_groups_for_dtw = get_edge_groups(CONNECTIONS_VIT_FULL)
    serieses_teacher = z_score_normalization(get_series(keypoints_teacher, edge_groups_for_dtw))
    serieses_student = z_score_normalization(get_series(keypoints_student, edge_groups_for_dtw))

    path_array = _mean_alignment_path(serieses_teacher, serieses_student, dtw_mean, dtw_filter)

    video_teacher_loaded = get_video_frames(video_teacher)
    video_student_loaded = get_video_frames(video_student)

    # Smoothing plus integer truncation can produce repeated rows; keep each
    # (teacher, student) pair once.
    alignments = np.unique(path_array, axis=0)

    thresholds_for_errors = get_thresholds(angles_sensitive, angles_common, angles_insensitive)

    trigger_1 = []
    trigger_2 = []
    save_teacher_frames = []
    save_student_frames = []
    all_text_summaries = []

    for idx, alignment in enumerate(alignments):
        # The trigger lists returned by one call are fed into the next one.
        frame_student_out, frame_teacher_out, trigger_1, trigger_2, text_info_summary = modify_student_frame(
            detection_result_student=detection_result_student,
            detection_result_teacher_angles=detection_result_teacher_angles,
            detection_result_student_angles=detection_result_student_angles,
            video_teacher=video_teacher_loaded,
            video_student=video_student_loaded,
            alignment_frames=alignment,
            edge_groups=EDGE_GROUPS_FOR_ERRORS,
            connections=CONNECTIONS_FOR_ERROR,
            thresholds=thresholds_for_errors,
            previously_trigered=trigger_1,
            previously_trigered_2=trigger_2,
            triger_state=trigger_state,
            text_dictionary=EDGE_GROUPS_FOR_SUMMARY,
        )
        save_teacher_frames.append(frame_teacher_out)
        save_student_frames.append(frame_student_out)
        # idx is the frame index in the output video, not in either input.
        all_text_summaries.extend((log, idx) for log in text_info_summary)

    save_teacher_frames = np.array(save_teacher_frames)
    save_student_frames = np.array(save_student_frames)

    print(f"shape s: {save_student_frames.shape}")
    print(f"shape t: {save_teacher_frames.shape}")

    timestamp_str = datetime.datetime.now().strftime("%Y_%m-%d_%H_%M_%S")

    # cv2.VideoWriter does not create missing directories, so make sure the
    # output folders exist before writing.
    os.makedirs("videos", exist_ok=True)
    video_path = f"videos/pose_{timestamp_str}.mp4"
    _write_side_by_side(video_path, save_teacher_frames, save_student_frames)

    # De-duplicate, then order by frame and message so the log is deterministic.
    unique_summaries = sorted(
        set(all_text_summaries), key=lambda entry: (entry[1], str(entry[0]))
    )
    general_summary = "\n".join(
        f"{comment} on frame {frame}. Video time: {_format_video_time(frame)}"
        for comment, frame in unique_summaries
    )

    os.makedirs("logs", exist_ok=True)
    log_path = f"logs/log_{timestamp_str}.txt"

    content = f"""
Settings:

Dynamic Time Warping:
- Winsorize mean: {dtw_mean}
- Savitzky-Golay Filter: {dtw_filter}

Thresholds:
- Sensitive: {angles_sensitive}
- Standart: {angles_common}
- Insensitive: {angles_insensitive}

Patience:
- trigger count: {trigger_state}

Error logs:

{general_summary}
"""

    with open(log_path, "w", encoding="utf-8") as file:
        file.write(content)

    return video_path, general_summary, log_path