Skip to content

Video Processing

Configuration dataclass

Source code in src/video_processing/configuration.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class Configuration:
    """Runtime configuration for the video-processing pipeline.

    Instances are normally created from parsed command-line arguments via
    :meth:`from_args`.  When ``mode == "offset"`` both marker offsets are
    mandatory; this is enforced in ``__post_init__``.
    """

    mode: str  # processing mode; "offset" requires both marker offsets below
    video_path: str
    faces_output_dir: str
    face_rectangle: Rect
    camera_rectangle: Rect
    camera_output_dir: str
    pixel_diff_threshold: float
    variance_filter_value: float
    mean_filter_value: float
    variance_filter_tolerance: float
    mean_filter_tolerance: float
    face_detection_model_path: str
    face_detection_min_size: tuple[int, int]
    samples_per_sequence: int
    offset_before_marker: int | None
    offset_after_marker: int | None

    @classmethod
    def from_args(cls, args) -> Self:
        """Create Configuration object from parsed command-line arguments."""
        return cls(
            mode=args.mode,
            video_path=args.video_path,
            faces_output_dir=args.faces_output_dir,
            face_rectangle=Rect(
                args.face_rect_x,
                args.face_rect_y,
                args.face_rect_width,
                args.face_rect_height,
            ),
            camera_rectangle=Rect(
                args.camera_rect_x,
                args.camera_rect_y,
                args.camera_rect_width,
                args.camera_rect_height,
            ),
            camera_output_dir=args.camera_output_dir,
            pixel_diff_threshold=args.pixel_diff_threshold,
            variance_filter_value=args.variance_filter_value,
            mean_filter_value=args.mean_filter_value,
            variance_filter_tolerance=args.variance_filter_tolerance,
            mean_filter_tolerance=args.mean_filter_tolerance,
            face_detection_model_path=args.face_detection_model_path,
            face_detection_min_size=(
                args.face_detection_min_width,
                args.face_detection_min_height,
            ),
            samples_per_sequence=args.samples_per_sequence,
            offset_before_marker=args.offset_before_marker,
            offset_after_marker=args.offset_after_marker,
        )

    def __post_init__(self):
        """Validate cross-field constraints after dataclass initialization.

        Raises:
            ValueError: if ``mode`` is ``"offset"`` and either marker offset
                is missing.
        """
        # Previously used `assert`, which is silently stripped under `python -O`,
        # so invalid configurations could slip through; raise explicitly instead.
        if self.mode == "offset":
            if self.offset_before_marker is None or self.offset_after_marker is None:
                raise ValueError(
                    "mode 'offset' requires both offset_before_marker and "
                    "offset_after_marker to be set"
                )

from_args(args) classmethod

Create Configuration object from parsed command-line arguments.

Source code in src/video_processing/configuration.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@classmethod
def from_args(cls, args) -> Self:
    """Build a Configuration from parsed command-line arguments."""
    # Assemble the composite values first, then construct in one call.
    face_rect = Rect(
        args.face_rect_x,
        args.face_rect_y,
        args.face_rect_width,
        args.face_rect_height,
    )
    camera_rect = Rect(
        args.camera_rect_x,
        args.camera_rect_y,
        args.camera_rect_width,
        args.camera_rect_height,
    )
    min_face_size = (args.face_detection_min_width, args.face_detection_min_height)

    return cls(
        mode=args.mode,
        video_path=args.video_path,
        faces_output_dir=args.faces_output_dir,
        face_rectangle=face_rect,
        camera_rectangle=camera_rect,
        camera_output_dir=args.camera_output_dir,
        pixel_diff_threshold=args.pixel_diff_threshold,
        variance_filter_value=args.variance_filter_value,
        mean_filter_value=args.mean_filter_value,
        variance_filter_tolerance=args.variance_filter_tolerance,
        mean_filter_tolerance=args.mean_filter_tolerance,
        face_detection_model_path=args.face_detection_model_path,
        face_detection_min_size=min_face_size,
        samples_per_sequence=args.samples_per_sequence,
        offset_before_marker=args.offset_before_marker,
        offset_after_marker=args.offset_after_marker,
    )

VideoProcessor

Class to process the screen recording and transform it into a dataset of image files.

Source code in src/video_processing/processor.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class VideoProcessor:
    """Class to process the screen recording and transform it into a dataset of image files."""

    def __init__(self, config: Configuration):
        """Build the filter chain and prepare the output directories.

        Args:
            config: Fully populated processing configuration.
        """
        self.config = config

        # Filters decide whether the face-corner region of a frame should be
        # rejected; they are consulted in the tuple order below.
        self.statistics_filter = StatisticsFilter(
            variance=config.variance_filter_value,
            mean=config.mean_filter_value,
            variance_tolerance=config.variance_filter_tolerance,
            mean_tolerance=config.mean_filter_tolerance,
        )
        self.similarity_filter = SimilarityFilter(
            mse_threshold=config.pixel_diff_threshold,
        )
        self.face_detection_filter = FaceDetectionFilter(
            cascade_weights_path=config.face_detection_model_path,
            min_size=config.face_detection_min_size,
        )
        self.filters = (
            self.statistics_filter,
            self.similarity_filter,
            self.face_detection_filter,
        )

        # Running counters updated while scanning the video.
        self.frames_processed = 0
        self.frames_saved = 0
        self.curr_face_idx = 1  # face indices start at 1; 0 denotes the static icon
        self.previous_frame = None  # previous grayscale face region (for similarity filter)
        self.filter_counts = {
            FilterType.SIMILARITY: 0,
            FilterType.STATISTICS: 0,
            FilterType.FACE_DETECTION: 0,
        }

        self.sequence_markers: list[SequenceMarker] = [
            SequenceMarker(0, 0)
        ]  # Starts with no face (static icon)

        os.makedirs(config.faces_output_dir, exist_ok=True)
        os.makedirs(config.camera_output_dir, exist_ok=True)

    def find_face_sequences(self):
        """Find sequences in video stream and extract unique faces."""
        with video_capture(self.config.video_path) as cap:
            video_props = VideoProperties.from_capture(cap)
            self._print_configuration_info(video_props)

            is_now_inside_face_sequence = False

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                face, face_grayscale = self._extract_face(frame)

                # For the first frame that contains the static icon after a sequence of faces, a marker is added.
                if is_now_inside_face_sequence:
                    if self._is_static_icon_in_bottom_left_corner(face_grayscale):
                        is_now_inside_face_sequence = False
                        self.sequence_markers.append(
                            SequenceMarker(self.frames_processed, 0)
                        )

                # Check whether the region contains a face that begins a new sequence
                # (after a run of static icons); if so, add a marker and save the face image.
                is_new_face = self._is_new_face_in_bottom_left_corner(face_grayscale)
                if is_new_face:
                    self.save_face_file(face, self.frames_processed, self.curr_face_idx)
                    self.sequence_markers.append(
                        SequenceMarker(self.frames_processed, self.curr_face_idx)
                    )
                    is_now_inside_face_sequence = True
                    self.frames_saved += 1
                    self.curr_face_idx += 1

                self.previous_frame = face_grayscale.copy()
                self.frames_processed += 1

                if self.frames_processed % 1000 == 0:
                    self._print_progress(video_props)

        self._print_summary()
        self._print_sequence_markers_summary()

    def save_sampled_camera_regions_from_sequences(
        self, frame_idx_to_face_idx: dict[int, int]
    ):
        """Save the camera region for each of the given sampled frames.

        Reads the video sequentially and, whenever the current frame index is
        present in the mapping, crops the camera rectangle and writes it to disk.
        The mapping is consumed (entries are popped) as frames are saved, and
        reading stops early once it is empty.

        Args:
            frame_idx_to_face_idx: Mapping of frame indices to sample to the
                face index present on those frames. Mutated (emptied) in place.
        """

        with video_capture(self.config.video_path) as cap:
            current_frame_idx = 0

            while len(frame_idx_to_face_idx) > 0:
                ret, frame = cap.read()
                if not ret:
                    break

                if current_frame_idx in frame_idx_to_face_idx:
                    face_idx = frame_idx_to_face_idx.pop(current_frame_idx)
                    camera_region = self._extract_camera_region(frame)

                    self._save_camera_region_file(
                        camera_region, current_frame_idx, face_idx
                    )

                current_frame_idx += 1

    def sample_frames_from_sequences_naive(
        self, samples_per_sequence: int
    ) -> dict[int, int]:
        """Sample given number of frames from each sequence.

        Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.
        """
        # NOTE(review): `assert` is stripped under `python -O`; consider raising
        # ValueError for these input validations instead.
        assert (
            len(self.sequence_markers) > 0
        ), "No sequences found to sample frames from."
        assert (
            samples_per_sequence > 1
        ), "Number of samples per sequence must be greater than 1."
        sequence_lengths = self.get_sequence_lengths()
        assert (
            len(sequence_lengths) > 0
        ), "No sequence lengths found to sample frames from."

        frame_idx_to_face_idx = {}

        for marker, length in zip(self.sequence_markers, sequence_lengths):
            sequence_start = marker.frame_number

            if (
                length <= samples_per_sequence
            ):  # If sequence is shorter than desired samples, take all frames
                sampled_frames = list(range(sequence_start, sequence_start + length))
            else:  # Calculate evenly distributed indices
                # NOTE(review): step uses length/(n-1) here, while the offset-based
                # sampler uses (end-start)/(n-1) == (length-1)/(n-1); middle samples
                # are slightly right-shifted relative to it — confirm intended.
                step = length / (samples_per_sequence - 1)
                sampled_frames = [
                    (
                        sequence_start + length - 1
                        if i
                        == samples_per_sequence
                        - 1  # Last sample should be the last frame of the sequence
                        else sequence_start + int(i * step)
                    )
                    for i in range(samples_per_sequence)
                ]

            for frame_idx in sampled_frames:
                frame_idx_to_face_idx[frame_idx] = marker.face_index

        return frame_idx_to_face_idx

    def sample_frames_from_sequence_offset_based(
        self, sequence_descriptors: list[SequenceDescriptor], samples_per_sequence: int
    ) -> dict[int, int]:
        """Sampling strategy based on sequence descriptors - defined start and end frames.

        Returns a mapping of sampled frame indices to the face index of the
        sequence they belong to; the last sample is always the end frame.
        """
        frame_idx_to_face_idx = {}

        for descriptor in sequence_descriptors:
            sequence_start = descriptor.start_frame
            sequence_end = descriptor.end_frame
            face_index = descriptor.face_index

            if (
                sequence_end - sequence_start + 1 <= samples_per_sequence
            ):  # If sequence is shorter than desired samples, take all frames
                sampled_frames = list(range(sequence_start, sequence_end + 1))
            else:  # Calculate evenly distributed indices
                step = (sequence_end - sequence_start) / (samples_per_sequence - 1)
                sampled_frames = [
                    (
                        sequence_end
                        if i == samples_per_sequence - 1
                        else sequence_start + int(i * step)
                    )
                    for i in range(samples_per_sequence)
                ]

            for frame_idx in sampled_frames:
                frame_idx_to_face_idx[frame_idx] = face_index

        return frame_idx_to_face_idx

    def get_offset_sequence_bounds(
        self, frames_before_marker: int, frames_after_marker: int
    ) -> list[SequenceDescriptor]:
        """Recalculate sequence bounds as offsets from the sequence markers.

        Sequence markers mark the frames where the system recognized a face or the static icon was displayed.
        Some time before recognizing a face, the person must have appeared in the camera view.
        Some time after recognizing a face (not necessarily when the icon appears in the corner),
        the person must have left the camera view.

        This approach calculates the start and end frames of each sequence based on offsets from the markers.
        Throw away the sequences that (supposedly) do not contain a face (face_index == 0).

        Note: start_frame may go negative for markers close to the start of the
        video; such indices are never matched when re-reading frames — confirm
        whether clamping to 0 is desired.
        """
        return [
            SequenceDescriptor(
                start_frame=marker.frame_number - frames_before_marker,
                end_frame=marker.frame_number + frames_after_marker,
                face_index=marker.face_index,
            )
            for marker in self.sequence_markers
            if marker.face_index > 0
        ]

    @staticmethod
    def _validate_rectangle(rect: Rect, video_props: VideoProperties):
        """Validate that the rectangle is within the bounds of the video properties."""
        if (
            rect.x < 0
            or rect.y < 0
            or rect.x + rect.width > video_props.width
            or rect.y + rect.height > video_props.height
        ):
            raise ValueError(
                f"Error: Rectangle is outside video bounds ({video_props.width}x{video_props.height})"
            )

    def _is_new_face_in_bottom_left_corner(self, face_grayscale) -> bool:
        """Return True when the region passes every filter (i.e. looks like a new face).

        The rejection counter of the first filter that rejects is incremented.
        """
        for frame_filter in self.filters:
            if frame_filter.should_filter_out(face_grayscale, self.previous_frame):
                self.filter_counts[frame_filter.type()] += 1
                return False

        return True

    def _is_static_icon_in_bottom_left_corner(self, face_grayscale) -> bool:
        """Check if the region matches the statistics for the icon.

        Icon is displayed when there is no face.
        The pixel values are not an exact match, so checking statistics.
        """
        return self.statistics_filter.should_filter_out(face_grayscale, None)

    def _extract_face(self, frame):
        """Crop the configured face rectangle, rotate it 90° clockwise, and
        return the (color, grayscale) pair."""
        rect = self.config.face_rectangle
        face = frame[rect.y : rect.y + rect.height, rect.x : rect.x + rect.width]
        face = cv2.rotate(face, cv2.ROTATE_90_CLOCKWISE)
        face_grayscale = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
        # NOTE(review): cvtColor already returns an ndarray; np.array makes a
        # copy — possibly redundant.
        face_grayscale = np.array(face_grayscale)
        return face, face_grayscale

    def _extract_camera_region(self, frame):
        """Crop the configured camera rectangle and rotate it 90° clockwise."""
        rect = self.config.camera_rectangle
        camera_region = frame[
            rect.y : rect.y + rect.height, rect.x : rect.x + rect.width
        ]
        camera_region = cv2.rotate(camera_region, cv2.ROTATE_90_CLOCKWISE)
        return camera_region

    def _save_camera_region_file(self, camera_region, frame_idx: int, face_idx: int):
        """Write a cropped camera region to the camera output directory.

        Raises:
            RuntimeError: if cv2.imwrite reports failure.
        """
        filename = f"frame_{frame_idx:06d}_with_face_{face_idx:06d}.jpg"
        filepath = os.path.join(self.config.camera_output_dir, filename)
        success = cv2.imwrite(filepath, camera_region)

        if not success:
            raise RuntimeError(
                f"Failed to save camera region from frame {frame_idx} to {filepath}"
            )

        # NOTE(review): "(unknown)" looks like a lost placeholder — probably
        # meant to interpolate {filepath}; confirm against the original source.
        print(f"Saved camera region from frame {frame_idx}: (unknown)")

    def save_face_file(self, face, frame_idx: int, face_idx: int):
        """Write an extracted face image to the faces output directory.

        Raises:
            RuntimeError: if cv2.imwrite reports failure.
        """
        filename = f"frame_{frame_idx:06d}_face_{face_idx:06d}.jpg"
        filepath = os.path.join(self.config.faces_output_dir, filename)
        success = cv2.imwrite(filepath, face)

        if not success:
            raise RuntimeError(f"Failed to save face {frame_idx} to {filepath}")

        # NOTE(review): "(unknown)" likely a lost {filepath} placeholder — confirm.
        print(f"Saved face from frame {frame_idx:06d}: (unknown)")

    def _print_configuration_info(self, video_props: VideoProperties):
        """Print the video properties and the active configuration."""
        print(f"Video Properties: {video_props}")
        print(f"Configuration: {self.config}")
        print(f"Face rectangle: {self.config.face_rectangle}")

    def _print_summary(self):
        """Print counters collected during a completed extraction run."""
        print(f"Extraction complete!")
        print(f"Total frames processed: {self.frames_processed}")
        print(f"Successfully saved: {self.frames_saved} images")
        print(
            f"Filtered by pixel similarity: {self.filter_counts[FilterType.SIMILARITY]}"
        )
        print(f"Filtered by statistics: {self.filter_counts[FilterType.STATISTICS]}")
        print(
            f"Filtered by face detection: {self.filter_counts[FilterType.FACE_DETECTION]}"
        )
        print(f"Total filtered: {sum(self.filter_counts.values())}")
        # NOTE(review): raises ZeroDivisionError when frames_processed == 0.
        print(
            f"Filtering efficiency: {((self.frames_processed - self.frames_saved) / self.frames_processed * 100):.1f}% frames filtered out"
        )
        print(f"Output directory: {self.config.faces_output_dir}")

    def _print_progress(self, video_props: VideoProperties):
        """Print percentage progress through the video."""
        progress = (self.frames_processed / video_props.total_frames) * 100
        print(
            f"Progress: {progress:.1f}% - Processed {self.frames_processed}/{video_props.total_frames} frames, Saved {self.frames_saved} unique"
        )

    def _print_sequence_markers_summary(self):
        """Print every recorded sequence marker."""
        print("-" * 40)
        print(f"Sequence markers: {len(self.sequence_markers)}")
        for marker in self.sequence_markers:
            print(
                f"Frame {marker.frame_number:06d}, Face Index {marker.face_index:06d}"
            )

    def get_sequence_lengths(self) -> list[int]:
        """
        Convert sequence markers into a list of sequence lengths.

        Sequence markers mark the beginning of new sequences. This function calculates
        the length of each sequence by finding the distance between consecutive markers.

        Returns:
            list[int]: List of sequence lengths in frames
        """
        if len(self.sequence_markers) < 2:
            return []

        sequence_lengths = []

        for i in range(len(self.sequence_markers) - 1):
            current_marker = self.sequence_markers[i]
            next_marker = self.sequence_markers[i + 1]

            # Length is the distance between consecutive markers
            sequence_length = next_marker.frame_number - current_marker.frame_number
            sequence_lengths.append(sequence_length)

        # For the last sequence, calculate length from last marker to end of video
        # NOTE(review): this guard is always true here (len >= 2 was checked above).
        if self.sequence_markers:
            last_marker = self.sequence_markers[-1]
            last_sequence_length = self.frames_processed - last_marker.frame_number
            sequence_lengths.append(last_sequence_length)

        return sequence_lengths

    def print_sequence_analysis(self):
        """Print detailed analysis of sequence lengths and patterns."""
        sequence_lengths = self.get_sequence_lengths()

        if not sequence_lengths:
            print("No sequences found for analysis.")
            return

        print("-" * 60)
        print("SEQUENCE ANALYSIS")
        print("-" * 60)

        # Basic statistics
        total_sequences = len(sequence_lengths)
        min_length = min(sequence_lengths)
        max_length = max(sequence_lengths)
        avg_length = sum(sequence_lengths) / len(sequence_lengths)

        print(f"Total sequences: {total_sequences}")
        print(f"Sequence lengths: {sequence_lengths}")
        print(f"Min length: {min_length} frames")
        print(f"Max length: {max_length} frames")
        print(f"Average length: {avg_length:.1f} frames")

        # Detailed breakdown
        print("\nSequence breakdown:")
        for i, (marker, length) in enumerate(
            zip(self.sequence_markers, sequence_lengths)
        ):
            sequence_type = (
                "Face sequence" if marker.face_index > 0 else "Static icon sequence"
            )
            print(
                f"Sequence {i+1:2d}: Frame {marker.frame_number:06d} -> {length:3d} frames ({sequence_type})"
            )

        # Pattern analysis: face sequences vs. static-icon sequences
        face_sequences = [
            length
            for marker, length in zip(self.sequence_markers, sequence_lengths)
            if marker.face_index > 0
        ]
        icon_sequences = [
            length
            for marker, length in zip(self.sequence_markers, sequence_lengths)
            if marker.face_index == 0
        ]

        if face_sequences:
            print(
                f"\nFace sequences: {len(face_sequences)} total, avg length: {sum(face_sequences)/len(face_sequences):.1f} frames"
            )
        if icon_sequences:
            print(
                f"Icon sequences: {len(icon_sequences)} total, avg length: {sum(icon_sequences)/len(icon_sequences):.1f} frames"
            )

find_face_sequences()

Find sequences in video stream and extract unique faces.

Source code in src/video_processing/processor.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def find_face_sequences(self):
    """Find sequences in video stream and extract unique faces.

    Scans the video frame by frame, recording a SequenceMarker whenever a
    new face sequence starts (face saved to disk) or the static icon
    reappears (face sequence ended). Updates the processing counters.
    """
    with video_capture(self.config.video_path) as cap:
        video_props = VideoProperties.from_capture(cap)
        self._print_configuration_info(video_props)

        is_now_inside_face_sequence = False

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            face, face_grayscale = self._extract_face(frame)

            # For the first frame that contains the static icon after a sequence of faces, a marker is added.
            if is_now_inside_face_sequence:
                if self._is_static_icon_in_bottom_left_corner(face_grayscale):
                    is_now_inside_face_sequence = False
                    self.sequence_markers.append(
                        SequenceMarker(self.frames_processed, 0)
                    )

            # Check whether the region contains a face that begins a new sequence
            # (after a run of static icons); if so, add a marker and save the face image.
            is_new_face = self._is_new_face_in_bottom_left_corner(face_grayscale)
            if is_new_face:
                self.save_face_file(face, self.frames_processed, self.curr_face_idx)
                self.sequence_markers.append(
                    SequenceMarker(self.frames_processed, self.curr_face_idx)
                )
                is_now_inside_face_sequence = True
                self.frames_saved += 1
                self.curr_face_idx += 1

            self.previous_frame = face_grayscale.copy()
            self.frames_processed += 1

            if self.frames_processed % 1000 == 0:
                self._print_progress(video_props)

    self._print_summary()
    self._print_sequence_markers_summary()

get_offset_sequence_bounds(frames_before_marker, frames_after_marker)

Recalculate sequence bounds as offsets from the sequence markers.

Sequence markers mark the frames where the system recognized a face or the static icon was displayed. Some time before recognizing a face, the person must have appeared in the camera view. Some time after recognizing a face (not necessarily when the icon appears in the corner), the person must have left the camera view.

This approach calculates the start and end frames of each sequence based on offsets from the markers. Throw away the sequences that (supposedly) do not contain a face (face_index == 0).

Source code in src/video_processing/processor.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def get_offset_sequence_bounds(
    self, frames_before_marker: int, frames_after_marker: int
) -> list[SequenceDescriptor]:
    """Derive sequence bounds from fixed offsets around each face marker.

    A marker records the frame where a face (or the static icon) was first
    recognized.  The person appears in the camera some frames before the
    marker and leaves some frames after it, so each face sequence is modeled
    as ``[marker - frames_before_marker, marker + frames_after_marker]``.
    Markers for static-icon frames (``face_index == 0``) are discarded.
    """
    descriptors: list[SequenceDescriptor] = []
    for marker in self.sequence_markers:
        if marker.face_index <= 0:
            # Static-icon marker: no face present, nothing to sample.
            continue
        descriptors.append(
            SequenceDescriptor(
                start_frame=marker.frame_number - frames_before_marker,
                end_frame=marker.frame_number + frames_after_marker,
                face_index=marker.face_index,
            )
        )
    return descriptors

get_sequence_lengths()

Convert sequence markers into a list of sequence lengths.

Sequence markers mark the beginning of new sequences. This function calculates the length of each sequence by finding the distance between consecutive markers.

Returns:

Type Description
list[int]

list[int]: List of sequence lengths in frames

Source code in src/video_processing/processor.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_sequence_lengths(self) -> list[int]:
    """Return the length, in frames, of every recorded sequence.

    Each sequence marker opens a new sequence; a sequence's length is the
    gap to the next marker, and the final sequence extends to the last
    processed frame.  With fewer than two markers no lengths are reported.

    Returns:
        list[int]: one length per sequence marker, in order.
    """
    markers = self.sequence_markers
    if len(markers) < 2:
        return []

    # Distance between each marker and its successor.
    lengths = [
        later.frame_number - earlier.frame_number
        for earlier, later in zip(markers, markers[1:])
    ]

    # The final sequence runs from the last marker to the end of the video.
    lengths.append(self.frames_processed - markers[-1].frame_number)

    return lengths

print_sequence_analysis()

Print detailed analysis of sequence lengths and patterns.

Source code in src/video_processing/processor.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def print_sequence_analysis(self):
    """Print detailed analysis of sequence lengths and patterns.

    Reports overall min/max/average sequence lengths, a per-sequence
    breakdown, and separate averages for face vs. static-icon sequences.
    Prints a notice and returns early when no sequences were recorded.
    """
    sequence_lengths = self.get_sequence_lengths()

    if not sequence_lengths:
        print("No sequences found for analysis.")
        return

    print("-" * 60)
    print("SEQUENCE ANALYSIS")
    print("-" * 60)

    # Basic statistics
    total_sequences = len(sequence_lengths)
    min_length = min(sequence_lengths)
    max_length = max(sequence_lengths)
    avg_length = sum(sequence_lengths) / len(sequence_lengths)

    print(f"Total sequences: {total_sequences}")
    print(f"Sequence lengths: {sequence_lengths}")
    print(f"Min length: {min_length} frames")
    print(f"Max length: {max_length} frames")
    print(f"Average length: {avg_length:.1f} frames")

    # Detailed breakdown: one line per sequence (face_index > 0 means a face)
    print("\nSequence breakdown:")
    for i, (marker, length) in enumerate(
        zip(self.sequence_markers, sequence_lengths)
    ):
        sequence_type = (
            "Face sequence" if marker.face_index > 0 else "Static icon sequence"
        )
        print(
            f"Sequence {i+1:2d}: Frame {marker.frame_number:06d} -> {length:3d} frames ({sequence_type})"
        )

    # Pattern analysis: split lengths by sequence type
    face_sequences = [
        length
        for marker, length in zip(self.sequence_markers, sequence_lengths)
        if marker.face_index > 0
    ]
    icon_sequences = [
        length
        for marker, length in zip(self.sequence_markers, sequence_lengths)
        if marker.face_index == 0
    ]

    if face_sequences:
        print(
            f"\nFace sequences: {len(face_sequences)} total, avg length: {sum(face_sequences)/len(face_sequences):.1f} frames"
        )
    if icon_sequences:
        print(
            f"Icon sequences: {len(icon_sequences)} total, avg length: {sum(icon_sequences)/len(icon_sequences):.1f} frames"
        )

sample_frames_from_sequence_offset_based(sequence_descriptors, samples_per_sequence)

Sampling strategy based on sequence descriptors - defined start and end frames.

Source code in src/video_processing/processor.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def sample_frames_from_sequence_offset_based(
    self, sequence_descriptors: list[SequenceDescriptor], samples_per_sequence: int
) -> dict[int, int]:
    """Sample frames from sequences whose start/end bounds are given explicitly.

    Each descriptor supplies a start frame, an end frame, and the face index
    shown in that span.  Samples are spread evenly across the span, with the
    last sample pinned to the end frame; spans shorter than the requested
    sample count contribute every frame.  Returns a mapping of sampled frame
    index -> face index.
    """
    sampled: dict[int, int] = {}

    for desc in sequence_descriptors:
        start = desc.start_frame
        end = desc.end_frame
        span = end - start + 1

        if span <= samples_per_sequence:
            # Fewer frames than requested samples: take every frame.
            frames = range(start, end + 1)
        else:
            stride = (end - start) / (samples_per_sequence - 1)
            frames = [
                start + int(i * stride) for i in range(samples_per_sequence - 1)
            ]
            frames.append(end)  # final sample is always the last frame

        for frame_idx in frames:
            sampled[frame_idx] = desc.face_index

    return sampled

sample_frames_from_sequences_naive(samples_per_sequence)

Sample given number of frames from each sequence.

Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.

Source code in src/video_processing/processor.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def sample_frames_from_sequences_naive(
    self, samples_per_sequence: int
) -> dict[int, int]:
    """Sample given number of frames from each sequence.

    Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.
    """
    assert (
        len(self.sequence_markers) > 0
    ), "No sequences found to sample frames from."
    assert (
        samples_per_sequence > 1
    ), "Number of samples per sequence must be greater than 1."
    lengths = self.get_sequence_lengths()
    assert len(lengths) > 0, "No sequence lengths found to sample frames from."

    mapping: dict[int, int] = {}

    for marker, seq_len in zip(self.sequence_markers, lengths):
        first = marker.frame_number
        last = first + seq_len - 1

        if seq_len <= samples_per_sequence:
            # Short sequence: every frame becomes a sample.
            picks = list(range(first, last + 1))
        else:
            # Spread samples evenly; pin the final sample onto the last frame.
            stride = seq_len / (samples_per_sequence - 1)
            picks = [first + int(i * stride) for i in range(samples_per_sequence - 1)]
            picks.append(last)

        for frame_idx in picks:
            mapping[frame_idx] = marker.face_index

    return mapping

save_sampled_camera_regions_from_sequences(frame_idx_to_face_idx)

Save camera regions from evenly distributed frames within each sequence.

For each sequence, this method samples frames evenly distributed across the sequence length and saves the camera region for each sampled frame.

Parameters:

Name Type Description Default
frame_idx_to_face_idx

Mapping of sampled frame indices to the face index present on each frame (entries are consumed as they are saved)

required
Source code in src/video_processing/processor.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def save_sampled_camera_regions_from_sequences(
    self, frame_idx_to_face_idx: dict[int, int]
):
    """Save the camera region for each requested frame of the video.

    Reads the video once from the start and, for every frame index present in
    the mapping, crops the configured camera rectangle and writes it to disk.

    Args:
        frame_idx_to_face_idx: Mapping of frame indices to the face index shown
            on each frame. NOTE: consumed destructively (entries are popped),
            so the caller's dict is emptied by this call.
    """

    with video_capture(self.config.video_path) as cap:
        current_frame_idx = 0

        # Stop as soon as every requested frame has been saved.
        while len(frame_idx_to_face_idx) > 0:
            ret, frame = cap.read()
            if not ret:
                break

            if current_frame_idx in frame_idx_to_face_idx:
                face_idx = frame_idx_to_face_idx.pop(current_frame_idx)
                camera_region = self._extract_camera_region(frame)

                self._save_camera_region_file(
                    camera_region, current_frame_idx, face_idx
                )

            current_frame_idx += 1

naive_strategy(config)

Naive sampling strategy that samples frames from each sequence.

Source code in src/video_processing/processor.py
426
427
428
429
430
431
432
433
434
435
436
def naive_strategy(config: Configuration):
    """Naive sampling strategy that samples frames from each sequence."""
    processor = VideoProcessor(config)
    processor.find_face_sequences()
    processor.print_sequence_analysis()

    # Map sampled frame indices to face indices, then persist the camera crops.
    frame_map = processor.sample_frames_from_sequences_naive(config.samples_per_sequence)
    processor.save_sampled_camera_regions_from_sequences(frame_map)

offset_strategy(config)

Offset sampling strategy that samples frames based on sequence descriptors.

Source code in src/video_processing/processor.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def offset_strategy(config: Configuration):
    """Offset sampling strategy that samples frames based on sequence descriptors."""
    processor = VideoProcessor(config)
    processor.find_face_sequences()
    processor.print_sequence_analysis()

    # Both offsets are mandatory for this strategy.
    assert config.offset_before_marker is not None
    assert config.offset_after_marker is not None

    descriptors = processor.get_offset_sequence_bounds(
        config.offset_before_marker, config.offset_after_marker
    )
    frame_map = processor.sample_frames_from_sequence_offset_based(
        descriptors, config.samples_per_sequence
    )
    processor.save_sampled_camera_regions_from_sequences(frame_map)

configuration

Configuration dataclass

Source code in src/video_processing/configuration.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class Configuration:
    """Runtime configuration for the video-processing pipeline.

    Usually constructed via :meth:`from_args` from parsed CLI arguments.
    """

    mode: str  # Sampling strategy; "offset" requires both offset_* fields.
    video_path: str
    faces_output_dir: str
    face_rectangle: Rect  # Region showing the recognized face / static icon.
    camera_rectangle: Rect  # Region showing the camera view.
    camera_output_dir: str
    pixel_diff_threshold: float  # MSE threshold for the similarity filter.
    variance_filter_value: float
    mean_filter_value: float
    variance_filter_tolerance: float
    mean_filter_tolerance: float
    face_detection_model_path: str  # Haar cascade XML file path.
    face_detection_min_size: tuple[int, int]
    samples_per_sequence: int
    offset_before_marker: int | None
    offset_after_marker: int | None

    @classmethod
    def from_args(cls, args) -> Self:
        """Create Configuration object from parsed command-line arguments."""
        return cls(
            mode=args.mode,
            video_path=args.video_path,
            faces_output_dir=args.faces_output_dir,
            face_rectangle=Rect(
                args.face_rect_x,
                args.face_rect_y,
                args.face_rect_width,
                args.face_rect_height,
            ),
            camera_rectangle=Rect(
                args.camera_rect_x,
                args.camera_rect_y,
                args.camera_rect_width,
                args.camera_rect_height,
            ),
            camera_output_dir=args.camera_output_dir,
            pixel_diff_threshold=args.pixel_diff_threshold,
            variance_filter_value=args.variance_filter_value,
            mean_filter_value=args.mean_filter_value,
            variance_filter_tolerance=args.variance_filter_tolerance,
            mean_filter_tolerance=args.mean_filter_tolerance,
            face_detection_model_path=args.face_detection_model_path,
            face_detection_min_size=(
                args.face_detection_min_width,
                args.face_detection_min_height,
            ),
            samples_per_sequence=args.samples_per_sequence,
            offset_before_marker=args.offset_before_marker,
            offset_after_marker=args.offset_after_marker,
        )

    def __post_init__(self):
        """Validate mode-dependent invariants.

        Raises:
            ValueError: If mode is "offset" but either offset is missing.
        """
        # Raise instead of assert: asserts are stripped under `python -O`,
        # which would silently disable this validation.
        if self.mode == "offset":
            if self.offset_before_marker is None or self.offset_after_marker is None:
                raise ValueError(
                    "mode 'offset' requires both offset_before_marker and "
                    "offset_after_marker to be set"
                )

from_args(args) classmethod

Create Configuration object from parsed command-line arguments.

Source code in src/video_processing/configuration.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@classmethod
def from_args(cls, args) -> Self:
    """Create Configuration object from parsed command-line arguments."""
    # Build the compound fields first, then assemble the dataclass.
    face_rect = Rect(
        args.face_rect_x,
        args.face_rect_y,
        args.face_rect_width,
        args.face_rect_height,
    )
    camera_rect = Rect(
        args.camera_rect_x,
        args.camera_rect_y,
        args.camera_rect_width,
        args.camera_rect_height,
    )
    min_face_size = (args.face_detection_min_width, args.face_detection_min_height)

    return cls(
        mode=args.mode,
        video_path=args.video_path,
        faces_output_dir=args.faces_output_dir,
        face_rectangle=face_rect,
        camera_rectangle=camera_rect,
        camera_output_dir=args.camera_output_dir,
        pixel_diff_threshold=args.pixel_diff_threshold,
        variance_filter_value=args.variance_filter_value,
        mean_filter_value=args.mean_filter_value,
        variance_filter_tolerance=args.variance_filter_tolerance,
        mean_filter_tolerance=args.mean_filter_tolerance,
        face_detection_model_path=args.face_detection_model_path,
        face_detection_min_size=min_face_size,
        samples_per_sequence=args.samples_per_sequence,
        offset_before_marker=args.offset_before_marker,
        offset_after_marker=args.offset_after_marker,
    )

filters

FaceDetectionFilter

Bases: FaceFilter

Source code in src/video_processing/filters.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class FaceDetectionFilter(FaceFilter):
    """Rejects frames in which the Haar cascade detects no face."""

    def __init__(self, cascade_weights_path: str, min_size: tuple[int, int]):
        self.cascade_classifier = self.create_haar_cascade_classifier(
            cascade_weights_path
        )
        self.min_size = min_size

    @staticmethod
    def create_haar_cascade_classifier(cascade_path: str) -> cv2.CascadeClassifier:
        """Create and return a Haar cascade classifier for face detection.

        Args:
            cascade_path: Path to the Haar cascade XML file

        Returns:
            cv2.CascadeClassifier: Loaded Haar cascade classifier
        """
        classifier = cv2.CascadeClassifier()
        if not classifier.load(cascade_path):
            raise ValueError(
                f"Could not load Haar cascade classifier from {cascade_path}"
            )
        return classifier

    def should_filter_out(
        self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
    ) -> bool:
        """Filter out if no face is detected in the frame."""
        detections = self.cascade_classifier.detectMultiScale(
            gray_frame,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=self.min_size,
            flags=cv2.CASCADE_SCALE_IMAGE,
        )
        return len(detections) == 0

    def type(self) -> FilterType:
        """Return the type of filter."""
        return FilterType.FACE_DETECTION

create_haar_cascade_classifier(cascade_path) staticmethod

Create and return a Haar cascade classifier for face detection.

Parameters:

Name Type Description Default
cascade_path str

Path to the Haar cascade XML file

required

Returns:

Type Description
CascadeClassifier

cv2.CascadeClassifier: Loaded Haar cascade classifier

Source code in src/video_processing/filters.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@staticmethod
def create_haar_cascade_classifier(cascade_path: str) -> cv2.CascadeClassifier:
    """Load a Haar cascade for face detection, raising if it cannot be read.

    Args:
        cascade_path: Path to the Haar cascade XML file

    Returns:
        cv2.CascadeClassifier: Loaded Haar cascade classifier
    """
    cascade = cv2.CascadeClassifier()
    if cascade.load(cascade_path):
        return cascade
    raise ValueError(f"Could not load Haar cascade classifier from {cascade_path}")

should_filter_out(gray_frame, previous_frame)

Filter out if no face is detected in the frame.

Source code in src/video_processing/filters.py
103
104
105
106
107
108
109
110
111
112
113
114
def should_filter_out(
    self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
) -> bool:
    """Filter out if no face is detected in the frame."""
    # previous_frame is ignored: face detection only looks at the current frame.
    detections = self.cascade_classifier.detectMultiScale(
        gray_frame,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=self.min_size,
        flags=cv2.CASCADE_SCALE_IMAGE,
    )
    return not len(detections)

type()

Return the type of filter.

Source code in src/video_processing/filters.py
133
134
135
def type(self) -> FilterType:
    """Return the type of filter (FilterType.FACE_DETECTION)."""
    return FilterType.FACE_DETECTION

FaceFilter

Bases: ABC

Source code in src/video_processing/filters.py
14
15
16
17
18
19
20
21
22
23
24
25
class FaceFilter(ABC):
    """Abstract interface for per-frame filters used during face detection."""

    @abstractmethod
    def should_filter_out(
        self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
    ) -> bool:
        """Return True when the frame should be discarded.

        Args:
            gray_frame: Current frame region as a grayscale array.
            previous_frame: Previous grayscale frame, or None on the first frame.
        """
        pass

    @abstractmethod
    def type(self) -> FilterType:
        """Return the type of filter."""
        pass

type() abstractmethod

Return the type of filter.

Source code in src/video_processing/filters.py
22
23
24
25
@abstractmethod
def type(self) -> FilterType:
    """Return the type of filter (used as a key for rejection counters)."""
    pass

SimilarityFilter

Bases: FaceFilter

Source code in src/video_processing/filters.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class SimilarityFilter(FaceFilter):
    """Drops frames that are nearly identical to the previous one (low MSE)."""

    def __init__(self, mse_threshold: float):
        self.mse_threshold = mse_threshold

    def mse(self, frame1: np.ndarray, frame2: np.ndarray) -> float:
        """Calculate Mean Squared Error (MSE) between two frames."""
        delta = frame1.astype("float") - frame2.astype("float")
        return float(np.mean(delta**2))

    def should_filter_out(
        self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
    ) -> bool:
        """Filter out if the current frame is similar to the previous one based on MSE."""
        if previous_frame is None:
            # First frame: nothing to compare against.
            return False
        return self.mse(gray_frame, previous_frame) < self.mse_threshold

    def type(self) -> FilterType:
        """Return the type of filter."""
        return FilterType.SIMILARITY

mse(frame1, frame2)

Calculate Mean Squared Error (MSE) between two frames.

Source code in src/video_processing/filters.py
44
45
46
47
def mse(self, frame1: np.ndarray, frame2: np.ndarray) -> float:
    """Calculate Mean Squared Error (MSE) between two frames."""
    delta = frame1.astype("float") - frame2.astype("float")
    return float(np.mean(np.square(delta)))

should_filter_out(gray_frame, previous_frame)

Filter out if the current frame is similar to the previous one based on MSE.

Source code in src/video_processing/filters.py
33
34
35
36
37
38
39
40
41
42
def should_filter_out(
    self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
) -> bool:
    """Filter out if the current frame is similar to the previous one based on MSE."""
    if previous_frame is None:
        return False

    mse = self.mse(gray_frame, previous_frame)
    is_similar = mse < self.mse_threshold
    return is_similar

type()

Return the type of filter.

Source code in src/video_processing/filters.py
49
50
51
def type(self) -> FilterType:
    """Return the type of filter (FilterType.SIMILARITY)."""
    return FilterType.SIMILARITY

StatisticsFilter

Bases: FaceFilter

Source code in src/video_processing/filters.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class StatisticsFilter(FaceFilter):
    """Matches frames whose variance/mean fall within tolerance of target values."""

    def __init__(
        self,
        variance: float,
        mean: float,
        variance_tolerance: float,
        mean_tolerance: float,
    ):
        self.variance = variance
        self.mean = mean
        self.variance_tolerance = variance_tolerance
        self.mean_tolerance = mean_tolerance

    def calculate_image_stats(self, gray_frame: np.ndarray) -> tuple[float, float]:
        """Calculate variance and mean of a grayscale image.

        Args:
            gray_frame: Grayscale image as numpy array
        Returns:
            tuple: (variance, mean)
        """
        as_float = gray_frame.astype(np.float64)
        return float(np.var(as_float)), float(np.mean(as_float))

    def should_filter_out(
        self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
    ) -> bool:
        """Filter out if the frame's statistics match the configured values."""
        variance, mean = self.calculate_image_stats(gray_frame)
        within_variance = abs(variance - self.variance) <= self.variance_tolerance
        within_mean = abs(mean - self.mean) <= self.mean_tolerance
        return within_variance and within_mean

    def type(self) -> FilterType:
        """Return the type of filter."""
        return FilterType.STATISTICS

calculate_image_stats(gray_frame)

Calculate variance and mean of a grayscale image.

Parameters:

Name Type Description Default
gray_frame ndarray

Grayscale image as numpy array

required

Returns: tuple: (variance, mean)

Source code in src/video_processing/filters.py
78
79
80
81
82
83
84
85
86
87
88
def calculate_image_stats(self, gray_frame: np.ndarray) -> tuple[float, float]:
    """Calculate variance and mean of a grayscale image.

    Args:
        gray_frame: Grayscale image as numpy array
    Returns:
        tuple: (variance, mean)
    """
    pixels = gray_frame.astype(np.float64)
    return float(pixels.var()), float(pixels.mean())

should_filter_out(gray_frame, previous_frame)

Filter out if the frame's statistics match the configured values.

Source code in src/video_processing/filters.py
68
69
70
71
72
73
74
75
76
def should_filter_out(
    self, gray_frame: np.ndarray, previous_frame: np.ndarray | None
) -> bool:
    """Filter out if the frame's statistics match the configured values."""
    variance, mean = self.calculate_image_stats(gray_frame)
    variance_match = abs(variance - self.variance) <= self.variance_tolerance
    mean_match = abs(mean - self.mean) <= self.mean_tolerance

    return variance_match and mean_match

type()

Return the type of filter.

Source code in src/video_processing/filters.py
90
91
92
def type(self) -> FilterType:
    """Return the type of filter (FilterType.STATISTICS)."""
    return FilterType.STATISTICS

processor

VideoProcessor

Class to process the screen recording and transform it into a dataset of image files.

Source code in src/video_processing/processor.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class VideoProcessor:
    """Class to process the screen recording and transform it into a dataset of image files."""

    def __init__(self, config: Configuration):
        """Initialize the filter chain, counters, and output directories.

        Args:
            config: Pipeline configuration (crop rectangles, filter thresholds,
                model path, output directories).
        """
        self.config = config

        self.statistics_filter = StatisticsFilter(
            variance=config.variance_filter_value,
            mean=config.mean_filter_value,
            variance_tolerance=config.variance_filter_tolerance,
            mean_tolerance=config.mean_filter_tolerance,
        )
        self.similarity_filter = SimilarityFilter(
            mse_threshold=config.pixel_diff_threshold,
        )
        self.face_detection_filter = FaceDetectionFilter(
            cascade_weights_path=config.face_detection_model_path,
            min_size=config.face_detection_min_size,
        )
        # Order matters: filters are applied in this order and short-circuit.
        self.filters = (
            self.statistics_filter,
            self.similarity_filter,
            self.face_detection_filter,
        )

        # Processing state and counters.
        self.frames_processed = 0
        self.frames_saved = 0
        self.curr_face_idx = 1  # Face indices start at 1; 0 denotes "no face".
        self.previous_frame = None  # Last grayscale face-region crop seen.
        # Per-filter rejection tallies, reported in the final summary.
        self.filter_counts = {
            FilterType.SIMILARITY: 0,
            FilterType.STATISTICS: 0,
            FilterType.FACE_DETECTION: 0,
        }

        self.sequence_markers: list[SequenceMarker] = [
            SequenceMarker(0, 0)
        ]  # Starts with no face (static icon)

        os.makedirs(config.faces_output_dir, exist_ok=True)
        os.makedirs(config.camera_output_dir, exist_ok=True)

    def find_face_sequences(self):
        """Find sequences in video stream and extract unique faces.

        Walks the video frame by frame, appending a SequenceMarker whenever the
        face region transitions between "new face" and "static icon", and saving
        a crop of each newly detected face.
        """
        with video_capture(self.config.video_path) as cap:
            video_props = VideoProperties.from_capture(cap)
            self._print_configuration_info(video_props)

            is_now_inside_face_sequence = False

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                face, face_grayscale = self._extract_face(frame)

                # For the first frame that contains the static icon after a sequence of faces, a marker is added.
                if is_now_inside_face_sequence:
                    if self._is_static_icon_in_bottom_left_corner(face_grayscale):
                        is_now_inside_face_sequence = False
                        self.sequence_markers.append(
                            SequenceMarker(self.frames_processed, 0)
                        )

                # Check whether the region contains a face that begins a new sequence
                # (after a run of static icons); if so, add a marker and save the face image.
                is_new_face = self._is_new_face_in_bottom_left_corner(face_grayscale)
                if is_new_face:
                    self.save_face_file(face, self.frames_processed, self.curr_face_idx)
                    self.sequence_markers.append(
                        SequenceMarker(self.frames_processed, self.curr_face_idx)
                    )
                    is_now_inside_face_sequence = True
                    self.frames_saved += 1
                    self.curr_face_idx += 1

                self.previous_frame = face_grayscale.copy()
                self.frames_processed += 1

                # Periodic progress report.
                if self.frames_processed % 1000 == 0:
                    self._print_progress(video_props)

        self._print_summary()
        self._print_sequence_markers_summary()

    def save_sampled_camera_regions_from_sequences(
        self, frame_idx_to_face_idx: dict[int, int]
    ):
        """Save the camera region for each requested frame of the video.

        Reads the video once from the start and, for every frame index present
        in the mapping, crops the configured camera rectangle and writes it to
        disk.

        Args:
            frame_idx_to_face_idx: Mapping of frame indices to the face index
                shown on each frame. NOTE: consumed destructively (entries are
                popped), so the caller's dict is emptied by this call.
        """

        with video_capture(self.config.video_path) as cap:
            current_frame_idx = 0

            # Stop as soon as every requested frame has been saved.
            while len(frame_idx_to_face_idx) > 0:
                ret, frame = cap.read()
                if not ret:
                    break

                if current_frame_idx in frame_idx_to_face_idx:
                    face_idx = frame_idx_to_face_idx.pop(current_frame_idx)
                    camera_region = self._extract_camera_region(frame)

                    self._save_camera_region_file(
                        camera_region, current_frame_idx, face_idx
                    )

                current_frame_idx += 1

    def sample_frames_from_sequences_naive(
        self, samples_per_sequence: int
    ) -> dict[int, int]:
        """Sample given number of frames from each sequence.

        Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.
        """
        assert (
            len(self.sequence_markers) > 0
        ), "No sequences found to sample frames from."
        assert (
            samples_per_sequence > 1
        ), "Number of samples per sequence must be greater than 1."
        lengths = self.get_sequence_lengths()
        assert len(lengths) > 0, "No sequence lengths found to sample frames from."

        mapping: dict[int, int] = {}

        for marker, seq_len in zip(self.sequence_markers, lengths):
            first = marker.frame_number
            last = first + seq_len - 1

            if seq_len <= samples_per_sequence:
                # Short sequence: every frame becomes a sample.
                picks = list(range(first, last + 1))
            else:
                # Spread samples evenly; pin the final sample onto the last frame.
                stride = seq_len / (samples_per_sequence - 1)
                picks = [
                    first + int(i * stride) for i in range(samples_per_sequence - 1)
                ]
                picks.append(last)

            for frame_idx in picks:
                mapping[frame_idx] = marker.face_index

        return mapping

    def sample_frames_from_sequence_offset_based(
        self, sequence_descriptors: list[SequenceDescriptor], samples_per_sequence: int
    ) -> dict[int, int]:
        """Sampling strategy based on sequence descriptors - defined start and end frames.

        Returns a mapping of sampled frame indices to the face index of their sequence.
        """
        mapping: dict[int, int] = {}

        for desc in sequence_descriptors:
            start = desc.start_frame
            end = desc.end_frame
            total = end - start + 1

            if total <= samples_per_sequence:
                # Sequence shorter than the sample budget: keep every frame.
                picks = list(range(start, end + 1))
            else:
                # Spread samples evenly; pin the final sample onto the last frame.
                stride = (end - start) / (samples_per_sequence - 1)
                picks = [
                    start + int(i * stride) for i in range(samples_per_sequence - 1)
                ]
                picks.append(end)

            for frame_idx in picks:
                mapping[frame_idx] = desc.face_index

        return mapping

    def get_offset_sequence_bounds(
        self, frames_before_marker: int, frames_after_marker: int
    ) -> list[SequenceDescriptor]:
        """Recalculate sequence bounds as offsets from the sequence markers.

        A marker records when the system recognized a face (or the static icon).
        The person appears in the camera view some time before recognition and
        leaves some time after, so each sequence is rebuilt as a fixed window
        around its marker. Markers without a face (face_index == 0) are dropped.

        NOTE(review): start_frame can go negative for markers near frame 0 if
        frames_before_marker is large — confirm downstream tolerates this.
        """
        bounds = []
        for marker in self.sequence_markers:
            if marker.face_index <= 0:
                continue  # No face on this sequence; skip it.
            bounds.append(
                SequenceDescriptor(
                    start_frame=marker.frame_number - frames_before_marker,
                    end_frame=marker.frame_number + frames_after_marker,
                    face_index=marker.face_index,
                )
            )
        return bounds

    @staticmethod
    def _validate_rectangle(rect: Rect, video_props: VideoProperties):
        """Validate that the rectangle is within the bounds of the video properties."""
        fits_horizontally = 0 <= rect.x and rect.x + rect.width <= video_props.width
        fits_vertically = 0 <= rect.y and rect.y + rect.height <= video_props.height
        if not (fits_horizontally and fits_vertically):
            raise ValueError(
                f"Error: Rectangle is outside video bounds ({video_props.width}x{video_props.height})"
            )

    def _is_new_face_in_bottom_left_corner(self, face_grayscale) -> bool:
        """Return True when no filter rejects the region, i.e. a new face is shown.

        Filters run in order; the first rejection wins and is tallied in
        filter_counts for the final summary.
        """
        for frame_filter in self.filters:
            if frame_filter.should_filter_out(face_grayscale, self.previous_frame):
                self.filter_counts[frame_filter.type()] += 1
                return False

        return True

    def _is_static_icon_in_bottom_left_corner(self, face_grayscale) -> bool:
        """Check if the region matches the statistics for the icon.

        Icon is displayed when there is no face.
        The pixel values are not an exact match, so checking statistics.
        """
        # The statistics filter returns True when variance/mean fall within the
        # configured tolerances; the previous frame is irrelevant here.
        return self.statistics_filter.should_filter_out(face_grayscale, None)

    def _extract_face(self, frame):
        """Crop the configured face rectangle, rotate it upright, and grayscale it.

        Returns:
            tuple: (BGR face crop, grayscale copy of the crop)
        """
        r = self.config.face_rectangle
        cropped = frame[r.y : r.y + r.height, r.x : r.x + r.width]
        cropped = cv2.rotate(cropped, cv2.ROTATE_90_CLOCKWISE)
        gray = np.array(cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY))
        return cropped, gray

    def _extract_camera_region(self, frame):
        """Crop the configured camera rectangle and rotate it upright."""
        r = self.config.camera_rectangle
        region = frame[r.y : r.y + r.height, r.x : r.x + r.width]
        return cv2.rotate(region, cv2.ROTATE_90_CLOCKWISE)

    def _save_camera_region_file(self, camera_region, frame_idx: int, face_idx: int):
        """Write one camera-region crop as a JPEG into camera_output_dir.

        Raises:
            RuntimeError: If cv2.imwrite reports failure.
        """
        filename = f"frame_{frame_idx:06d}_with_face_{face_idx:06d}.jpg"
        filepath = os.path.join(self.config.camera_output_dir, filename)
        success = cv2.imwrite(filepath, camera_region)

        if not success:
            raise RuntimeError(
                f"Failed to save camera region from frame {frame_idx} to {filepath}"
            )

        print(f"Saved camera region from frame {frame_idx}: (unknown)")

    def save_face_file(self, face, frame_idx: int, face_idx: int):
        """Write one face crop as a JPEG into faces_output_dir.

        Raises:
            RuntimeError: If cv2.imwrite reports failure.
        """
        filename = f"frame_{frame_idx:06d}_face_{face_idx:06d}.jpg"
        filepath = os.path.join(self.config.faces_output_dir, filename)
        success = cv2.imwrite(filepath, face)

        if not success:
            raise RuntimeError(f"Failed to save face {frame_idx} to {filepath}")

        print(f"Saved face from frame {frame_idx:06d}: (unknown)")

    def _print_configuration_info(self, video_props: VideoProperties):
        """Print the video properties and active configuration at startup."""
        print(f"Video Properties: {video_props}")
        print(f"Configuration: {self.config}")
        print(f"Face rectangle: {self.config.face_rectangle}")

    def _print_summary(self):
        """Print aggregate extraction statistics after a full pass over the video."""
        print("Extraction complete!")
        print(f"Total frames processed: {self.frames_processed}")
        print(f"Successfully saved: {self.frames_saved} images")
        print(
            f"Filtered by pixel similarity: {self.filter_counts[FilterType.SIMILARITY]}"
        )
        print(f"Filtered by statistics: {self.filter_counts[FilterType.STATISTICS]}")
        print(
            f"Filtered by face detection: {self.filter_counts[FilterType.FACE_DETECTION]}"
        )
        print(f"Total filtered: {sum(self.filter_counts.values())}")
        # Guard against ZeroDivisionError when the capture yielded no frames
        # (e.g. an unreadable or empty video file).
        if self.frames_processed > 0:
            filtered_pct = (
                (self.frames_processed - self.frames_saved)
                / self.frames_processed
                * 100
            )
            print(f"Filtering efficiency: {filtered_pct:.1f}% frames filtered out")
        print(f"Output directory: {self.config.faces_output_dir}")

    def _print_progress(self, video_props: VideoProperties):
        """Report the percentage of the video processed so far."""
        done = self.frames_processed
        total = video_props.total_frames
        progress = done / total * 100
        print(
            f"Progress: {progress:.1f}% - Processed {done}/{total} frames, Saved {self.frames_saved} unique"
        )

    def _print_sequence_markers_summary(self):
        """List every recorded sequence marker (frame number and face index)."""
        print("-" * 40)
        print(f"Sequence markers: {len(self.sequence_markers)}")
        for m in self.sequence_markers:
            print(f"Frame {m.frame_number:06d}, Face Index {m.face_index:06d}")

    def get_sequence_lengths(self) -> list[int]:
        """
        Convert sequence markers into a list of sequence lengths.

        Sequence markers mark the beginning of new sequences; each length is the
        frame distance to the next marker. The final sequence runs from the last
        marker to the end of the processed video.

        Returns:
            list[int]: List of sequence lengths in frames
        """
        markers = self.sequence_markers
        if len(markers) < 2:
            return []

        # Distance between each pair of consecutive markers.
        lengths = [
            later.frame_number - earlier.frame_number
            for earlier, later in zip(markers, markers[1:])
        ]
        # The last sequence extends from the final marker to the end of the video.
        lengths.append(self.frames_processed - markers[-1].frame_number)
        return lengths

    def print_sequence_analysis(self):
        """Print detailed analysis of sequence lengths and patterns."""
        lengths = self.get_sequence_lengths()

        if not lengths:
            print("No sequences found for analysis.")
            return

        separator = "-" * 60
        print(separator)
        print("SEQUENCE ANALYSIS")
        print(separator)

        # Aggregate statistics over all sequences.
        print(f"Total sequences: {len(lengths)}")
        print(f"Sequence lengths: {lengths}")
        print(f"Min length: {min(lengths)} frames")
        print(f"Max length: {max(lengths)} frames")
        print(f"Average length: {sum(lengths) / len(lengths):.1f} frames")

        # Per-sequence breakdown; face_index == 0 marks a static-icon sequence.
        print("\nSequence breakdown:")
        pairs = list(zip(self.sequence_markers, lengths))
        for i, (marker, length) in enumerate(pairs, start=1):
            if marker.face_index > 0:
                sequence_type = "Face sequence"
            else:
                sequence_type = "Static icon sequence"
            print(
                f"Sequence {i:2d}: Frame {marker.frame_number:06d} -> {length:3d} frames ({sequence_type})"
            )

        # Split lengths by sequence kind for the summary lines.
        face_sequences = [length for marker, length in pairs if marker.face_index > 0]
        icon_sequences = [length for marker, length in pairs if marker.face_index == 0]

        if face_sequences:
            print(
                f"\nFace sequences: {len(face_sequences)} total, avg length: {sum(face_sequences)/len(face_sequences):.1f} frames"
            )
        if icon_sequences:
            print(
                f"Icon sequences: {len(icon_sequences)} total, avg length: {sum(icon_sequences)/len(icon_sequences):.1f} frames"
            )

find_face_sequences()

Find sequences in video stream and extract unique faces.

Source code in src/video_processing/processor.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def find_face_sequences(self):
    """Find sequences in video stream and extract unique faces."""
    with video_capture(self.config.video_path) as cap:
        video_props = VideoProperties.from_capture(cap)
        self._print_configuration_info(video_props)

        # True while we are inside a run of frames showing the same face.
        is_now_inside_face_sequence = False

        while True:
            ret, frame = cap.read()
            if not ret:
                # End of stream or read failure: stop processing.
                break

            face, face_grayscale = self._extract_face(frame)

            # For the first frame that contains the static icon after a sequence of faces, a marker is added.
            if is_now_inside_face_sequence:
                if self._is_static_icon_in_bottom_left_corner(face_grayscale):
                    is_now_inside_face_sequence = False
                    self.sequence_markers.append(
                        SequenceMarker(self.frames_processed, 0)
                    )

            # Check whether the region contains a face that begins a new sequence
            # (after a run of static icons); if so, a marker is added and the face image is saved.
            is_new_face = self._is_new_face_in_bottom_left_corner(face_grayscale)
            if is_new_face:
                self.save_face_file(face, self.frames_processed, self.curr_face_idx)
                self.sequence_markers.append(
                    SequenceMarker(self.frames_processed, self.curr_face_idx)
                )
                is_now_inside_face_sequence = True
                self.frames_saved += 1
                self.curr_face_idx += 1

            self.previous_frame = face_grayscale.copy()
            self.frames_processed += 1

            # Periodic progress report every 1000 frames.
            if self.frames_processed % 1000 == 0:
                self._print_progress(video_props)

    self._print_summary()
    self._print_sequence_markers_summary()

get_offset_sequence_bounds(frames_before_marker, frames_after_marker)

Recalculate sequence bounds as offsets from the sequence markers.

Sequence markers mark the frames where the system recognized a face or the static icon was displayed. Some time before recognizing a face, the person must have appeared in the camera view. Some time after recognizing a face (not necessarily when the icon appears in the corner), the person must have left the camera view.

This approach calculates the start and end frames of each sequence based on offsets from the markers. Throw away the sequences that (supposedly) do not contain a face (face_index == 0).

Source code in src/video_processing/processor.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def get_offset_sequence_bounds(
    self, frames_before_marker: int, frames_after_marker: int
) -> list[SequenceDescriptor]:
    """Recalculate sequence bounds as offsets from the sequence markers.

    Sequence markers mark the frames where the system recognized a face or the static icon was displayed.
    Some time before recognizing a face, the person must have appeared in the camera view.
    Some time after recognizing a face (not necessarily when the icon appears in the corner),
    the person must have left the camera view.

    This approach calculates the start and end frames of each sequence based on offsets from the markers.
    Throw away the sequences that (supposedly) do not contain a face (face_index == 0).
    """
    bounds: list[SequenceDescriptor] = []
    for marker in self.sequence_markers:
        # Skip markers without a face (face_index == 0 flags static-icon frames).
        if not marker.face_index > 0:
            continue
        bounds.append(
            SequenceDescriptor(
                start_frame=marker.frame_number - frames_before_marker,
                end_frame=marker.frame_number + frames_after_marker,
                face_index=marker.face_index,
            )
        )
    return bounds

get_sequence_lengths()

Convert sequence markers into a list of sequence lengths.

Sequence markers mark the beginning of new sequences. This function calculates the length of each sequence by finding the distance between consecutive markers.

Returns:

Type Description
list[int]

list[int]: List of sequence lengths in frames

Source code in src/video_processing/processor.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_sequence_lengths(self) -> list[int]:
    """
    Convert sequence markers into a list of sequence lengths.

    Sequence markers mark the beginning of new sequences; each length is the
    frame distance to the next marker. The final sequence runs from the last
    marker to the end of the processed video.

    Returns:
        list[int]: List of sequence lengths in frames
    """
    markers = self.sequence_markers
    if len(markers) < 2:
        return []

    # Distance between each pair of consecutive markers.
    lengths = [
        later.frame_number - earlier.frame_number
        for earlier, later in zip(markers, markers[1:])
    ]
    # The last sequence extends from the final marker to the end of the video.
    lengths.append(self.frames_processed - markers[-1].frame_number)
    return lengths

print_sequence_analysis()

Print detailed analysis of sequence lengths and patterns.

Source code in src/video_processing/processor.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def print_sequence_analysis(self):
    """Print detailed analysis of sequence lengths and patterns."""
    lengths = self.get_sequence_lengths()

    if not lengths:
        print("No sequences found for analysis.")
        return

    separator = "-" * 60
    print(separator)
    print("SEQUENCE ANALYSIS")
    print(separator)

    # Aggregate statistics over all sequences.
    print(f"Total sequences: {len(lengths)}")
    print(f"Sequence lengths: {lengths}")
    print(f"Min length: {min(lengths)} frames")
    print(f"Max length: {max(lengths)} frames")
    print(f"Average length: {sum(lengths) / len(lengths):.1f} frames")

    # Per-sequence breakdown; face_index == 0 marks a static-icon sequence.
    print("\nSequence breakdown:")
    pairs = list(zip(self.sequence_markers, lengths))
    for i, (marker, length) in enumerate(pairs, start=1):
        if marker.face_index > 0:
            sequence_type = "Face sequence"
        else:
            sequence_type = "Static icon sequence"
        print(
            f"Sequence {i:2d}: Frame {marker.frame_number:06d} -> {length:3d} frames ({sequence_type})"
        )

    # Split lengths by sequence kind for the summary lines.
    face_sequences = [length for marker, length in pairs if marker.face_index > 0]
    icon_sequences = [length for marker, length in pairs if marker.face_index == 0]

    if face_sequences:
        print(
            f"\nFace sequences: {len(face_sequences)} total, avg length: {sum(face_sequences)/len(face_sequences):.1f} frames"
        )
    if icon_sequences:
        print(
            f"Icon sequences: {len(icon_sequences)} total, avg length: {sum(icon_sequences)/len(icon_sequences):.1f} frames"
        )

sample_frames_from_sequence_offset_based(sequence_descriptors, samples_per_sequence)

Sampling strategy based on sequence descriptors - defined start and end frames.

Source code in src/video_processing/processor.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def sample_frames_from_sequence_offset_based(
    self, sequence_descriptors: list[SequenceDescriptor], samples_per_sequence: int
) -> dict[int, int]:
    """Sampling strategy based on sequence descriptors - defined start and end frames."""
    frame_idx_to_face_idx = {}

    for descriptor in sequence_descriptors:
        sequence_start = descriptor.start_frame
        sequence_end = descriptor.end_frame
        face_index = descriptor.face_index

        if (
            sequence_end - sequence_start + 1 <= samples_per_sequence
        ):  # If sequence is shorter than desired samples, take all frames
            sampled_frames = list(range(sequence_start, sequence_end + 1))
        else:  # Calculate evenly distributed indices
            step = (sequence_end - sequence_start) / (samples_per_sequence - 1)
            sampled_frames = [
                (
                    sequence_end
                    if i == samples_per_sequence - 1
                    else sequence_start + int(i * step)
                )
                for i in range(samples_per_sequence)
            ]

        for frame_idx in sampled_frames:
            frame_idx_to_face_idx[frame_idx] = face_index

    return frame_idx_to_face_idx

sample_frames_from_sequences_naive(samples_per_sequence)

Sample given number of frames from each sequence.

Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.

Source code in src/video_processing/processor.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def sample_frames_from_sequences_naive(
    self, samples_per_sequence: int
) -> dict[int, int]:
    """Sample given number of frames from each sequence.

    Return a mapping of sampled frame indices (keys) to face indices (values) present on those frames.
    """
    assert (
        len(self.sequence_markers) > 0
    ), "No sequences found to sample frames from."
    assert (
        samples_per_sequence > 1
    ), "Number of samples per sequence must be greater than 1."
    sequence_lengths = self.get_sequence_lengths()
    assert (
        len(sequence_lengths) > 0
    ), "No sequence lengths found to sample frames from."

    mapping: dict[int, int] = {}

    for marker, length in zip(self.sequence_markers, sequence_lengths):
        start = marker.frame_number

        if length <= samples_per_sequence:
            # Sequence shorter than the requested sample count: take every frame.
            chosen = list(range(start, start + length))
        else:
            # Evenly spaced samples; the final sample is pinned to the sequence's
            # last frame so the end is always included.
            step = length / (samples_per_sequence - 1)
            last = samples_per_sequence - 1
            chosen = [
                start + length - 1 if i == last else start + int(i * step)
                for i in range(samples_per_sequence)
            ]

        for frame_idx in chosen:
            mapping[frame_idx] = marker.face_index

    return mapping

save_sampled_camera_regions_from_sequences(frame_idx_to_face_idx)

Save camera regions from evenly distributed frames within each sequence.

For each sequence, this method samples frames evenly distributed across the sequence length and saves the camera region for each sampled frame.

Parameters:

Name Type Description Default
frame_idx_to_face_idx

Mapping of sampled frame indices to the face index present on each frame; entries are consumed as the corresponding frames are saved


required
Source code in src/video_processing/processor.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def save_sampled_camera_regions_from_sequences(
    self, frame_idx_to_face_idx: dict[int, int]
):
    """Save camera regions for the given sampled frames.

    Reads the video sequentially and, for every frame whose index appears in the
    mapping, extracts the camera region and writes it to disk.

    Args:
        frame_idx_to_face_idx: Mapping of frame indices to the face index present
            on that frame. NOTE: the mapping is consumed (entries are popped) as
            frames are saved; the loop stops early once it is empty.
    """

    with video_capture(self.config.video_path) as cap:
        current_frame_idx = 0

        # Stop as soon as every requested frame has been saved (or the video ends).
        while len(frame_idx_to_face_idx) > 0:
            ret, frame = cap.read()
            if not ret:
                # End of stream before all requested frames were found.
                break

            if current_frame_idx in frame_idx_to_face_idx:
                face_idx = frame_idx_to_face_idx.pop(current_frame_idx)
                camera_region = self._extract_camera_region(frame)

                self._save_camera_region_file(
                    camera_region, current_frame_idx, face_idx
                )

            current_frame_idx += 1

naive_strategy(config)

Naive sampling strategy that samples frames from each sequence.

Source code in src/video_processing/processor.py
426
427
428
429
430
431
432
433
434
435
436
def naive_strategy(config: Configuration):
    """Naive sampling strategy that samples frames from each sequence."""
    processor = VideoProcessor(config)
    processor.find_face_sequences()
    processor.print_sequence_analysis()

    # Sample evenly from each detected sequence, then persist the camera regions.
    frame_map = processor.sample_frames_from_sequences_naive(
        config.samples_per_sequence
    )
    processor.save_sampled_camera_regions_from_sequences(frame_map)

offset_strategy(config)

Offset sampling strategy that samples frames based on sequence descriptors.

Source code in src/video_processing/processor.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def offset_strategy(config: Configuration):
    """Offset sampling strategy that samples frames based on sequence descriptors.

    Raises:
        ValueError: If the offset configuration values required by this
            strategy are missing.
    """
    processor = VideoProcessor(config)
    processor.find_face_sequences()
    processor.print_sequence_analysis()

    # Validate explicitly instead of `assert`, which is stripped under `python -O`.
    if config.offset_before_marker is None or config.offset_after_marker is None:
        raise ValueError(
            "offset strategy requires offset_before_marker and offset_after_marker"
        )
    seq_descriptors = processor.get_offset_sequence_bounds(
        config.offset_before_marker, config.offset_after_marker
    )
    samples = processor.sample_frames_from_sequence_offset_based(
        seq_descriptors, config.samples_per_sequence
    )
    processor.save_sampled_camera_regions_from_sequences(samples)

sequence_marker

SequenceMarker dataclass

Information about beginning of a frame sequence in a video.

face_index is 0 if no face was detected in the frame.

Source code in src/video_processing/sequence_marker.py
 4
 5
 6
 7
 8
 9
10
11
12
@dataclass
class SequenceMarker:
    """Information about beginning of a frame sequence in a video.

    face_index is 0 if no face was detected in the frame.
    """

    # Index (within the processed video) of the frame where the sequence starts.
    frame_number: int
    # Running index of the detected face; 0 when no face was detected in the frame.
    face_index: int