Module dvt.video

Video files.

Expand source code
# -*- coding: utf-8 -*-
"""Video files.
"""

from math import ceil
from numpy import zeros, uint8
from pandas import DataFrame
from cv2 import (
    VideoCapture,
    CAP_PROP_FPS,
    CAP_PROP_FRAME_COUNT,
    CAP_PROP_FRAME_HEIGHT,
    CAP_PROP_FRAME_WIDTH,
    CAP_PROP_POS_MSEC,
)

from .utils import _expand_path


class VideoFrameInput:
    """An input object for extracting single frames from an input video.

    Frames are pulled one at a time, in order, from an OpenCV
    ``VideoCapture`` handle opened on ``input_path``.

    Attributes:
        input_path (str): First element returned by ``_expand_path`` for the
            path given to the constructor.
        meta (dict): Metadata describing the video; see ``_metadata``.
        fcount (int): Index of the most recent frame returned by
            ``next_frame``, or -1 if no frame has been returned yet.
        finished (bool): True once ``next_frame`` has nothing more to return.
    """

    def __init__(self, input_path):
        """Construct a new input from a video file.

        Args:
            input_path (str): Path to the video file. Can be any file readable
                by the OpenCV function VideoCapture.
        """
        self.input_path = _expand_path(input_path)[0]
        self.meta = None
        self.fcount = -1
        self.finished = False
        self._ftotal = 0
        self._video_cap = None
        self.reset()

        super().__init__()

    def reset(self):
        """Open connection to the video file and rewind the counters.

        Any capture opened by an earlier call is released first, then a new
        ``VideoCapture`` is created and the metadata is re-read from it.
        """
        # Release explicitly so repeated resets do not rely on garbage
        # collection to free the underlying handle.
        if self._video_cap is not None:
            self._video_cap.release()

        self.fcount = -1
        self.finished = False

        self._video_cap = VideoCapture(self.input_path)
        self._ftotal = int(self._video_cap.get(CAP_PROP_FRAME_COUNT))
        self.meta = self._metadata()

    def next_frame(self):
        """Get the next frame.

        Returns:
            The frame produced by ``VideoCapture.read``, or None when the
            input is already finished or no further frame could be read.
        """
        if self.finished:
            return None

        ok, frame = self._video_cap.read()
        if not ok:
            # The frame count reported by the container is not always exact;
            # a failed read is treated as the end of the input so callers
            # looping on ``finished`` terminate.
            self.finished = True
            return None

        # only count frames that were actually delivered
        self.fcount = self.fcount + 1
        self.finished = self._ftotal == (self.fcount + 1)
        return frame

    def get_metadata(self):
        """Return metadata in a format to put into DVTOutput.

        Returns:
            A dictionary with the single key "meta" holding a one-row
            DataFrame built from ``self.meta``.
        """
        return {"meta": DataFrame(self.meta, index=[0])}

    def _metadata(self):
        """Build the metadata dictionary from the open video source.

        Returns:
            A dictionary with the frame rate, frame count and frame size as
            reported by OpenCV, plus the four components returned by
            ``_expand_path`` for the input path.
        """
        path, bname, filename, file_extension = _expand_path(self.input_path)
        return {
            "type": "video",
            "fps": self._video_cap.get(CAP_PROP_FPS),
            "frames": int(self._video_cap.get(CAP_PROP_FRAME_COUNT)),
            "height": int(self._video_cap.get(CAP_PROP_FRAME_HEIGHT)),
            "width": int(self._video_cap.get(CAP_PROP_FRAME_WIDTH)),
            "input_path": path,
            "input_bname": bname,
            "input_filename": filename,
            "input_file_extension": file_extension,
        }


class VideoBatchInput:
    """An input object for extracting batches of images from an input video.

    A buffer of ``2 * bsize`` frames is kept internally. Its first half holds
    the current batch and its second half holds the frames that follow it
    (the lookahead). Buffer slots for which no frame could be read are filled
    with zeros.

    Attributes:
        input_path (str): First element returned by ``_expand_path`` for the
            path given to the constructor.
        bsize (int): Number of frames in a batch.
        meta (dict): Metadata describing the video; see ``_metadata``.
        fcount (int): Index of the first frame of the next batch.
        finished (bool): True once the last batch has been returned.
        start (float): Value ``end`` had for the previous batch (0 at first).
        end (float): ``CAP_PROP_POS_MSEC`` reported by the capture when the
            most recent batch was produced.
        max_batch (int): ``ceil(frames / bsize)`` using the frame count
            reported by OpenCV.
    """

    def __init__(self, input_path, bsize=256):
        """Construct a new input from a video file.

        Args:
            input_path (str): Path to the video file. Can be any file readable
                by the OpenCV function VideoCapture.
            bsize (int): Number of frames to include in a batch. Defaults to
                256.
        """
        self.input_path = _expand_path(input_path)[0]
        self.bsize = bsize
        self.meta = None
        self.fcount = 0
        self.finished = False
        self.start = 0
        self.end = 0
        self.max_batch = 0
        self._video_cap = None
        self._img = None
        self._continue_read = True
        self.reset()

        super().__init__()

    def reset(self):
        """Open connection to the video file and prime the frame buffer.

        Any capture opened by an earlier call is released first. The counters
        are rewound, the metadata is re-read, a fresh zeroed buffer is
        allocated and its lookahead half is filled with the first ``bsize``
        frames.
        """
        # Release explicitly so repeated resets do not rely on garbage
        # collection to free the underlying handle.
        if self._video_cap is not None:
            self._video_cap.release()

        # start from a clean state
        self.fcount = 0
        self.finished = False
        self.start = 0
        self.end = 0
        self._video_cap = VideoCapture(self.input_path)
        self.meta = self._metadata()
        self.max_batch = ceil(self.meta["frames"] / self.bsize)

        self._img = zeros(
            (self.bsize * 2, self.meta["height"], self.meta["width"], 3),
            dtype=uint8,
        )
        # Prime the lookahead half with the first batch. The flag recorded by
        # _fill_bandwidth is kept as is (not forced back to True).
        self._fill_bandwidth()

    def next_batch(self):
        """Move forward one batch and return the current FrameBatch object.

        The lookahead half of the buffer becomes the current batch and a new
        lookahead is read. When that read yields no frame at all, the batch
        being returned is the last one and ``finished`` is set, so no
        all-zero batch is produced after the final frames.

        The returned FrameBatch shares its pixel buffer with this object; the
        next call overwrites that buffer in place.

        Returns:
            A FrameBatch object that contains the next set of frames, or None
            if the input is already finished.
        """
        if self.finished:
            return None

        # shift window over by one bandwidth
        self._img[: self.bsize, :, :, :] = self._img[self.bsize :, :, :, :]

        # refill the lookahead; unread slots are zeroed by _fill_bandwidth
        self._fill_bandwidth()
        if not self._continue_read:
            # nothing exists beyond the current batch, so it is the last one
            self.finished = True

        # update counters
        # NOTE(review): the position is sampled after the lookahead has been
        # read, so it may run ahead of the frames in the current batch --
        # confirm this is the intended meaning of ``end``.
        frame_start = self.fcount
        self.start = self.end
        self.end = self._video_cap.get(CAP_PROP_POS_MSEC)
        self.fcount = self.fcount + self.bsize

        # frame names are the consecutive frame indices covered by the batch
        fnames = list(range(int(frame_start), int(frame_start + self.bsize)))

        # return batch of frames.
        return FrameBatch(
            img=self._img,
            start=self.start,
            end=self.end,
            finished=self.finished,
            fnames=fnames,
            bnum=(frame_start // self.bsize),
        )

    def get_metadata(self):
        """Return metadata in a format to put into a DVTOutput object.

        Returns:
            A dictionary with the single key "meta" holding a one-row
            DataFrame built from ``self.meta``.
        """
        return {"meta": DataFrame(self.meta, index=[0])}

    def _metadata(self):
        """Build the metadata dictionary from the open video source.

        Returns:
            A dictionary with the frame rate, frame count and frame size as
            reported by OpenCV, plus the four components returned by
            ``_expand_path`` for the input path.
        """
        path, bname, filename, file_extension = _expand_path(self.input_path)
        return {
            "type": "video",
            "fps": self._video_cap.get(CAP_PROP_FPS),
            "frames": int(self._video_cap.get(CAP_PROP_FRAME_COUNT)),
            "height": int(self._video_cap.get(CAP_PROP_FRAME_HEIGHT)),
            "width": int(self._video_cap.get(CAP_PROP_FRAME_WIDTH)),
            "input_path": path,
            "input_bname": bname,
            "input_filename": filename,
            "input_file_extension": file_extension,
        }

    def _fill_bandwidth(self):
        """Read the next ``bsize`` frames into the lookahead half.

        Slots whose read fails are set to zero. Afterwards
        ``self._continue_read`` is True if at least one frame was stored and
        False if the whole lookahead is empty.

        This is called by ``reset`` and ``next_batch`` only; calling it
        directly makes the internal counters inconsistent.
        """
        got_any = False
        for idx in range(self.bsize):
            ok, frame = self._video_cap.read()
            if ok:
                self._img[idx + self.bsize, :, :, :] = frame
                got_any = True
            else:
                self._img[idx + self.bsize, :, :, :] = 0
        self._continue_read = got_any


class FrameBatch:
    """A collection of frames and associated metadata.

    The batch contains an array of size (bsize * 2, height, width, 3). The
    batch includes twice as many frames as given in the batch size, but an
    annotator should only return results from the first half of the data
    (the "batch"). The other data is included for annotators that need to
    look ahead of the current batch, such as the cut detectors. As produced
    by VideoBatchInput, positions past the end of the video are filled with
    zeros (all black frames).

    Attributes:
        img (np.array): A four-dimensional array containing pixels from the
            next 2*bsize of images.
        start (float): Time code at the start of the current batch.
        end (float): Time code at the end of the current batch.
        fnames (list): Names of frames in the batch.
        bnum (int): The batch number.
        bsize (int): Number of frames in a batch.
        finished (bool): Whether the producer marked this as its last batch;
            False when the keyword is not supplied.
    """

    def __init__(self, **kwargs):
        """Store the batch data.

        Keyword Args:
            img: Array whose first axis has length 2 * bsize. Required,
                because the batch size is derived from its shape.
            start: Time code at the start of the batch.
            end: Time code at the end of the batch.
            fnames: Names of the frames in the batch.
            bnum: The batch number.
            finished: Whether this is the producer's last batch.
        """
        self.img = kwargs.get("img")
        self.start = kwargs.get("start")
        self.end = kwargs.get("end")
        self.fnames = kwargs.get("fnames")
        self.bnum = kwargs.get("bnum")
        # previously accepted but dropped; keep it so consumers can see it
        self.finished = kwargs.get("finished", False)
        # the buffer holds the batch followed by an equally sized lookahead
        self.bsize = self.img.shape[0] // 2

    def get_frames(self):
        """Return the entire image dataset for the batch.

        Use this method if you need to look ahead at the following batch for
        an annotator to work. The array is returned as stored, without a
        copy. NOTE(review): earlier documentation states the images are in
        RGB space, but VideoBatchInput stores frames exactly as returned by
        ``VideoCapture.read`` -- confirm the channel order.

        Returns:
            A four-dimensional array containing pixels from the current and
            next batches of data.
        """
        return self.img

    def get_batch(self):
        """Return image data for just the current batch.

        Use this method unless you have a specific need to look ahead at new
        values in the data. NOTE(review): earlier documentation states the
        images are in RGB space -- confirm the channel order against the
        producer.

        Returns:
            A four-dimensional array containing pixels from the current batch
            of images (the first ``bsize`` entries of ``img``).
        """
        return self.img[: self.bsize, :, :, :]

    def get_frame_names(self):
        """Return frame names for the current batch of data.

        Returns:
            The ``fnames`` value given at construction; VideoBatchInput
            supplies a list of length equal to the batch size.
        """
        return self.fnames

Classes

class FrameBatch (**kwargs)

A collection of frames and associated metadata.

The batch contains an array of size (bsize * 2, height, width, 3). Positions past the end of the video file are padded with zeros (all black frames). The batch includes twice as many frames as given in the batch size, but an annotator should only return results from the first half of the data (the "batch"). The second half is included for annotators that need to look ahead of the current batch, such as the cut detectors.

Attributes

img : np.array
A four-dimensional array containing pixels from the next 2*bsize of images.
start : float
Time code at the start of the current batch.
end : float
Time code at the end of the current batch.
fnames : list
Names of frames in the batch.
bnum : int
The batch number.
bsize : int
Number of frames in a batch.
Expand source code
class FrameBatch:
    """A collection of frames and associated metadata.

    The batch contains an array of size (bsize * 2, height, width, 3). The
    batch includes twice as many frames as given in the batch size, but an
    annotator should only return results from the first half of the data
    (the "batch"). The other data is included for annotators that need to
    look ahead of the current batch, such as the cut detectors. As produced
    by VideoBatchInput, positions past the end of the video are filled with
    zeros (all black frames).

    Attributes:
        img (np.array): A four-dimensional array containing pixels from the
            next 2*bsize of images.
        start (float): Time code at the start of the current batch.
        end (float): Time code at the end of the current batch.
        fnames (list): Names of frames in the batch.
        bnum (int): The batch number.
        bsize (int): Number of frames in a batch.
        finished (bool): Whether the producer marked this as its last batch;
            False when the keyword is not supplied.
    """

    def __init__(self, **kwargs):
        """Store the batch data.

        Keyword Args:
            img: Array whose first axis has length 2 * bsize. Required,
                because the batch size is derived from its shape.
            start: Time code at the start of the batch.
            end: Time code at the end of the batch.
            fnames: Names of the frames in the batch.
            bnum: The batch number.
            finished: Whether this is the producer's last batch.
        """
        self.img = kwargs.get("img")
        self.start = kwargs.get("start")
        self.end = kwargs.get("end")
        self.fnames = kwargs.get("fnames")
        self.bnum = kwargs.get("bnum")
        # previously accepted but dropped; keep it so consumers can see it
        self.finished = kwargs.get("finished", False)
        # the buffer holds the batch followed by an equally sized lookahead
        self.bsize = self.img.shape[0] // 2

    def get_frames(self):
        """Return the entire image dataset for the batch.

        Use this method if you need to look ahead at the following batch for
        an annotator to work. The array is returned as stored, without a
        copy. NOTE(review): earlier documentation states the images are in
        RGB space, but VideoBatchInput stores frames exactly as returned by
        ``VideoCapture.read`` -- confirm the channel order.

        Returns:
            A four-dimensional array containing pixels from the current and
            next batches of data.
        """
        return self.img

    def get_batch(self):
        """Return image data for just the current batch.

        Use this method unless you have a specific need to look ahead at new
        values in the data. NOTE(review): earlier documentation states the
        images are in RGB space -- confirm the channel order against the
        producer.

        Returns:
            A four-dimensional array containing pixels from the current batch
            of images (the first ``bsize`` entries of ``img``).
        """
        return self.img[: self.bsize, :, :, :]

    def get_frame_names(self):
        """Return frame names for the current batch of data.

        Returns:
            The ``fnames`` value given at construction; VideoBatchInput
            supplies a list of length equal to the batch size.
        """
        return self.fnames

Methods

def get_batch(self)

Return image data for just the current batch.

Use this method unless you have a specific need to look ahead at new values in the data. Images are given in RGB space.

Returns

A four-dimensional array containing pixels from the current batch of images.

Expand source code
def get_batch(self):
    """Return only the frames that belong to the current batch.

    This is the accessor to use unless an annotator needs the lookahead
    frames as well. NOTE(review): earlier documentation states the images
    are in RGB space -- confirm the channel order against the producer.

    Returns:
        The first ``bsize`` entries along the first axis of the
        four-dimensional ``img`` array.
    """
    everything = slice(None)
    current = slice(None, self.bsize)
    return self.img[current, everything, everything, everything]
def get_frame_names(self)

Return frame names for the current batch of data.

Returns

A list of names of length equal to the batch size.

Expand source code
def get_frame_names(self):
    """Give back the names of the frames in the current batch.

    Returns:
        The stored ``fnames`` object itself (no copy is made). When the batch
        comes from VideoBatchInput it is a list with one entry per frame of
        the batch.
    """
    names = self.fnames
    return names
def get_frames(self)

Return the entire image dataset for the batch.

Use this method if you need to look ahead at the following batch for an annotator to work. Images are given in RGB space.

Returns

A four-dimensional array containing pixels from the current and next batches of data.

Expand source code
def get_frames(self):
    """Return every frame held by the batch, lookahead included.

    Annotators that must inspect the frames following the current batch
    should use this accessor. NOTE(review): earlier documentation states
    the images are in RGB space -- confirm the channel order against the
    producer.

    Returns:
        The stored four-dimensional ``img`` array itself (no copy is made),
        covering the current batch followed by the next one.
    """
    pixels = self.img
    return pixels
class VideoBatchInput (input_path, bsize=256)

An input object for extracting batches of images from an input video.

Construct a new input from a video file.

Args

input_path : str
Path to the video file. Can be any file readable by the OpenCV function VideoCapture.
bsize : int
Number of frames to include in a batch. Defaults to 256.
Expand source code
class VideoBatchInput:
    """An input object for extracting batches of images from an input video.

    A buffer of ``2 * bsize`` frames is kept internally. Its first half holds
    the current batch and its second half holds the frames that follow it
    (the lookahead). Buffer slots for which no frame could be read are filled
    with zeros.

    Attributes:
        input_path (str): First element returned by ``_expand_path`` for the
            path given to the constructor.
        bsize (int): Number of frames in a batch.
        meta (dict): Metadata describing the video; see ``_metadata``.
        fcount (int): Index of the first frame of the next batch.
        finished (bool): True once the last batch has been returned.
        start (float): Value ``end`` had for the previous batch (0 at first).
        end (float): ``CAP_PROP_POS_MSEC`` reported by the capture when the
            most recent batch was produced.
        max_batch (int): ``ceil(frames / bsize)`` using the frame count
            reported by OpenCV.
    """

    def __init__(self, input_path, bsize=256):
        """Construct a new input from a video file.

        Args:
            input_path (str): Path to the video file. Can be any file readable
                by the OpenCV function VideoCapture.
            bsize (int): Number of frames to include in a batch. Defaults to
                256.
        """
        self.input_path = _expand_path(input_path)[0]
        self.bsize = bsize
        self.meta = None
        self.fcount = 0
        self.finished = False
        self.start = 0
        self.end = 0
        self.max_batch = 0
        self._video_cap = None
        self._img = None
        self._continue_read = True
        self.reset()

        super().__init__()

    def reset(self):
        """Open connection to the video file and prime the frame buffer.

        Any capture opened by an earlier call is released first. The counters
        are rewound, the metadata is re-read, a fresh zeroed buffer is
        allocated and its lookahead half is filled with the first ``bsize``
        frames.
        """
        # Release explicitly so repeated resets do not rely on garbage
        # collection to free the underlying handle.
        if self._video_cap is not None:
            self._video_cap.release()

        # start from a clean state
        self.fcount = 0
        self.finished = False
        self.start = 0
        self.end = 0
        self._video_cap = VideoCapture(self.input_path)
        self.meta = self._metadata()
        self.max_batch = ceil(self.meta["frames"] / self.bsize)

        self._img = zeros(
            (self.bsize * 2, self.meta["height"], self.meta["width"], 3),
            dtype=uint8,
        )
        # Prime the lookahead half with the first batch. The flag recorded by
        # _fill_bandwidth is kept as is (not forced back to True).
        self._fill_bandwidth()

    def next_batch(self):
        """Move forward one batch and return the current FrameBatch object.

        The lookahead half of the buffer becomes the current batch and a new
        lookahead is read. When that read yields no frame at all, the batch
        being returned is the last one and ``finished`` is set, so no
        all-zero batch is produced after the final frames.

        The returned FrameBatch shares its pixel buffer with this object; the
        next call overwrites that buffer in place.

        Returns:
            A FrameBatch object that contains the next set of frames, or None
            if the input is already finished.
        """
        if self.finished:
            return None

        # shift window over by one bandwidth
        self._img[: self.bsize, :, :, :] = self._img[self.bsize :, :, :, :]

        # refill the lookahead; unread slots are zeroed by _fill_bandwidth
        self._fill_bandwidth()
        if not self._continue_read:
            # nothing exists beyond the current batch, so it is the last one
            self.finished = True

        # update counters
        # NOTE(review): the position is sampled after the lookahead has been
        # read, so it may run ahead of the frames in the current batch --
        # confirm this is the intended meaning of ``end``.
        frame_start = self.fcount
        self.start = self.end
        self.end = self._video_cap.get(CAP_PROP_POS_MSEC)
        self.fcount = self.fcount + self.bsize

        # frame names are the consecutive frame indices covered by the batch
        fnames = list(range(int(frame_start), int(frame_start + self.bsize)))

        # return batch of frames.
        return FrameBatch(
            img=self._img,
            start=self.start,
            end=self.end,
            finished=self.finished,
            fnames=fnames,
            bnum=(frame_start // self.bsize),
        )

    def get_metadata(self):
        """Return metadata in a format to put into a DVTOutput object.

        Returns:
            A dictionary with the single key "meta" holding a one-row
            DataFrame built from ``self.meta``.
        """
        return {"meta": DataFrame(self.meta, index=[0])}

    def _metadata(self):
        """Build the metadata dictionary from the open video source.

        Returns:
            A dictionary with the frame rate, frame count and frame size as
            reported by OpenCV, plus the four components returned by
            ``_expand_path`` for the input path.
        """
        path, bname, filename, file_extension = _expand_path(self.input_path)
        return {
            "type": "video",
            "fps": self._video_cap.get(CAP_PROP_FPS),
            "frames": int(self._video_cap.get(CAP_PROP_FRAME_COUNT)),
            "height": int(self._video_cap.get(CAP_PROP_FRAME_HEIGHT)),
            "width": int(self._video_cap.get(CAP_PROP_FRAME_WIDTH)),
            "input_path": path,
            "input_bname": bname,
            "input_filename": filename,
            "input_file_extension": file_extension,
        }

    def _fill_bandwidth(self):
        """Read the next ``bsize`` frames into the lookahead half.

        Slots whose read fails are set to zero. Afterwards
        ``self._continue_read`` is True if at least one frame was stored and
        False if the whole lookahead is empty.

        This is called by ``reset`` and ``next_batch`` only; calling it
        directly makes the internal counters inconsistent.
        """
        got_any = False
        for idx in range(self.bsize):
            ok, frame = self._video_cap.read()
            if ok:
                self._img[idx + self.bsize, :, :, :] = frame
                got_any = True
            else:
                self._img[idx + self.bsize, :, :, :] = 0
        self._continue_read = got_any

Methods

def get_metadata(self)

Return metadata in a format to put into a DVTOutput object.

Expand source code
def get_metadata(self):
    """Package the video metadata for storage in a DVTOutput object.

    Returns:
        A dictionary with the single key "meta" whose value is a one-row
        DataFrame (index 0) built from ``self.meta``.
    """
    table = DataFrame(self.meta, index=[0])
    return {"meta": table}
def next_batch(self)

Move forward one batch and return the current FrameBatch object.

Returns

A FrameBatch object that contains the next set of frames.

Expand source code
def next_batch(self):
    """Advance by one batch and hand back the resulting FrameBatch.

    The lookahead half of the buffer is copied into the current-batch half.
    If the previous read reported more input, a new lookahead is read;
    otherwise the input is marked finished and the lookahead is zeroed.

    The returned FrameBatch shares its pixel buffer with this object; the
    next call overwrites that buffer in place.

    Returns:
        A FrameBatch object that contains the next set of frames, or None
        when the input was already finished.
    """
    if self.finished:
        return None

    size = self.bsize

    # slide the lookahead frames into the current-batch half
    self._img[:size, :, :, :] = self._img[size:, :, :, :]

    if not self._continue_read:
        # end of the video input: this is the last batch, pad with zeros
        self.finished = True
        self._img[size:, :, :, :] = 0
    else:
        self._fill_bandwidth()

    # advance the counters
    first = self.fcount
    self.start = self.end
    self.end = self._video_cap.get(CAP_PROP_POS_MSEC)
    self.fcount = first + size

    # frame names are the consecutive frame indices covered by the batch
    return FrameBatch(
        img=self._img,
        start=self.start,
        end=self.end,
        finished=self.finished,
        fnames=list(range(int(first), int(first + size))),
        bnum=(first // size),
    )
def reset(self)

Open connection to the video file.

Expand source code
def reset(self):
    """Open connection to the video file and prime the frame buffer.

    Any capture opened by an earlier call is released first. The counters
    are rewound, the metadata is re-read, a fresh zeroed buffer is allocated
    and its lookahead half is filled with the first ``bsize`` frames.
    """
    # Release explicitly so repeated resets do not rely on garbage
    # collection to free the underlying handle.
    previous = getattr(self, "_video_cap", None)
    if previous is not None:
        previous.release()

    # start from a clean state
    self.fcount = 0
    self.finished = False
    self.start = 0
    self.end = 0
    self._video_cap = VideoCapture(self.input_path)
    self.meta = self._metadata()
    self.max_batch = ceil(self.meta["frames"] / self.bsize)

    self._img = zeros(
        (self.bsize * 2, self.meta["height"], self.meta["width"], 3),
        dtype=uint8,
    )

    # _fill_bandwidth updates self._continue_read. Its result is kept rather
    # than forced back to True, so a video shorter than one batch ends with
    # its only batch instead of being followed by an extra all-zero batch.
    self._continue_read = True
    self._fill_bandwidth()  # fill the buffer with the first batch
class VideoFrameInput (input_path)

An input object for extracting single frames from an input video.

Construct a new input from a video file.

Args

input_path : str
Path to the video file. Can be any file readable by the OpenCV function VideoCapture.
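Note : no bsize argument
The source docstring lists a bsize argument (number of frames in a batch, default 256), but this constructor accepts only input_path; frames are returned one at a time.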
Expand source code
class VideoFrameInput:
    """An input object for extracting single frames from an input video.

    Frames are pulled one at a time, in order, from an OpenCV
    ``VideoCapture`` handle opened on ``input_path``.

    Attributes:
        input_path (str): First element returned by ``_expand_path`` for the
            path given to the constructor.
        meta (dict): Metadata describing the video; see ``_metadata``.
        fcount (int): Index of the most recent frame returned by
            ``next_frame``, or -1 if no frame has been returned yet.
        finished (bool): True once ``next_frame`` has nothing more to return.
    """

    def __init__(self, input_path):
        """Construct a new input from a video file.

        Args:
            input_path (str): Path to the video file. Can be any file readable
                by the OpenCV function VideoCapture.
        """
        self.input_path = _expand_path(input_path)[0]
        self.meta = None
        self.fcount = -1
        self.finished = False
        self._ftotal = 0
        self._video_cap = None
        self.reset()

        super().__init__()

    def reset(self):
        """Open connection to the video file and rewind the counters.

        Any capture opened by an earlier call is released first, then a new
        ``VideoCapture`` is created and the metadata is re-read from it.
        """
        # Release explicitly so repeated resets do not rely on garbage
        # collection to free the underlying handle.
        if self._video_cap is not None:
            self._video_cap.release()

        self.fcount = -1
        self.finished = False

        self._video_cap = VideoCapture(self.input_path)
        self._ftotal = int(self._video_cap.get(CAP_PROP_FRAME_COUNT))
        self.meta = self._metadata()

    def next_frame(self):
        """Get the next frame.

        Returns:
            The frame produced by ``VideoCapture.read``, or None when the
            input is already finished or no further frame could be read.
        """
        if self.finished:
            return None

        ok, frame = self._video_cap.read()
        if not ok:
            # The frame count reported by the container is not always exact;
            # a failed read is treated as the end of the input so callers
            # looping on ``finished`` terminate.
            self.finished = True
            return None

        # only count frames that were actually delivered
        self.fcount = self.fcount + 1
        self.finished = self._ftotal == (self.fcount + 1)
        return frame

    def get_metadata(self):
        """Return metadata in a format to put into DVTOutput.

        Returns:
            A dictionary with the single key "meta" holding a one-row
            DataFrame built from ``self.meta``.
        """
        return {"meta": DataFrame(self.meta, index=[0])}

    def _metadata(self):
        """Build the metadata dictionary from the open video source.

        Returns:
            A dictionary with the frame rate, frame count and frame size as
            reported by OpenCV, plus the four components returned by
            ``_expand_path`` for the input path.
        """
        path, bname, filename, file_extension = _expand_path(self.input_path)
        return {
            "type": "video",
            "fps": self._video_cap.get(CAP_PROP_FPS),
            "frames": int(self._video_cap.get(CAP_PROP_FRAME_COUNT)),
            "height": int(self._video_cap.get(CAP_PROP_FRAME_HEIGHT)),
            "width": int(self._video_cap.get(CAP_PROP_FRAME_WIDTH)),
            "input_path": path,
            "input_bname": bname,
            "input_filename": filename,
            "input_file_extension": file_extension,
        }

Methods

def get_metadata(self)

Return metadata in a format to put into DVTOutput

Expand source code
def get_metadata(self):
    """Package the video metadata for storage in DVTOutput.

    Returns:
        A dictionary with the single key "meta" whose value is a one-row
        DataFrame (index 0) built from ``self.meta``.
    """
    table = DataFrame(self.meta, index=[0])
    return {"meta": table}
def next_frame(self)

Get the next frame.

Expand source code
def next_frame(self):
    """Get the next frame.

    Returns:
        The frame produced by ``VideoCapture.read``, or None when the input
        is already finished or no further frame could be read.
    """
    if self.finished:
        return None

    ok, frame = self._video_cap.read()
    if not ok:
        # The frame count reported by the container is not always exact; a
        # failed read is treated as the end of the input so callers looping
        # on ``finished`` terminate.
        self.finished = True
        return None

    # only count frames that were actually delivered
    self.fcount = self.fcount + 1
    self.finished = self._ftotal == (self.fcount + 1)
    return frame
def reset(self)

Open connection to the video file.

Expand source code
def reset(self):
    """Open connection to the video file and rewind the counters.

    Any capture opened by an earlier call is released first, then a new
    ``VideoCapture`` is created, the reported frame count is cached and the
    metadata is re-read.
    """
    # Release explicitly so repeated resets do not rely on garbage
    # collection to free the underlying handle.
    previous = getattr(self, "_video_cap", None)
    if previous is not None:
        previous.release()

    self.fcount = -1
    self.finished = False

    self._video_cap = VideoCapture(self.input_path)
    self._ftotal = int(self._video_cap.get(CAP_PROP_FRAME_COUNT))
    self.meta = self._metadata()