mirror of
https://github.com/abhiTronix/vidgear.git
synced 2026-02-06 19:03:18 +00:00
- ✨ Added type annotations to improve code clarity and enable static type checking. - 🧑💻 Annotated all core and asyncio classes, except internal methods. - 🩹 Renamed type variables for consistency. CI/CD: - 💚 Updated codecov-action to `v4.5.0` to fix tokenless upload issues in GitHub Actions CI. - 💚 Replaced `codecov-uploader` with `codecovcli` to resolve "Bad CPU type in executable" bug in Azure Pipelines CI. --------- Co-authored-by: abhiTronix <abhi.una12@gmail.com>
This commit is contained in:
@@ -90,7 +90,7 @@ jobs:
|
||||
- name: Display exit code
|
||||
run: echo "Exit code was = $EXIT_CODE"
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v3
|
||||
uses: codecov/codecov-action@v4.5.0
|
||||
with:
|
||||
name: ${{ matrix.python-version }}
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
|
||||
+4
-4
@@ -80,14 +80,14 @@ steps:
|
||||
- bash: |
|
||||
echo "Exit Code was: $(exit_code)"
|
||||
curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
|
||||
curl -Os https://uploader.codecov.io/latest/macos/codecov
|
||||
curl -Os https://uploader.codecov.io/latest/macos/codecov.SHA256SUM
|
||||
curl -Os https://uploader.codecov.io/latest/macos/codecov.SHA256SUM.sig
|
||||
curl -Os https://cli.codecov.io/latest/macos/codecov
|
||||
curl -Os https://cli.codecov.io/latest/macos/codecov.SHA256SUM
|
||||
curl -Os https://cli.codecov.io/latest/macos/codecov.SHA256SUM.sig
|
||||
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
|
||||
shasum -a 256 -c codecov.SHA256SUM
|
||||
chmod +x codecov
|
||||
if [ "$(exit_code)" != "124" ]; then
|
||||
./codecov -t $CODECOV_TOKEN -f coverage.xml -C $(Build.SourceVersion) -B $(Build.SourceBranch) -b $(Build.BuildNumber);
|
||||
./codecov do-upload -t $CODECOV_TOKEN -f coverage.xml -C $(Build.SourceVersion) -B $(Build.SourceBranch) -b $(Build.BuildNumber);
|
||||
else
|
||||
echo "Timeout test - Skipped Codecov!";
|
||||
fi
|
||||
|
||||
@@ -30,6 +30,8 @@ from tqdm import tqdm
|
||||
from colorlog import ColoredFormatter
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.packages.urllib3.util.retry import Retry
|
||||
from numpy.typing import NDArray
|
||||
from typing import Union
|
||||
|
||||
# import helper packages
|
||||
from ..helper import logger_handler, mkdir_safe
|
||||
@@ -64,7 +66,9 @@ class TimeoutHTTPAdapter(HTTPAdapter):
|
||||
return super().send(request, **kwargs)
|
||||
|
||||
|
||||
def create_blank_frame(frame=None, text="", logging=False):
|
||||
def create_blank_frame(
|
||||
frame: NDArray = None, text: str = "", logging: bool = False
|
||||
) -> NDArray:
|
||||
"""
|
||||
## create_blank_frame
|
||||
|
||||
@@ -103,7 +107,11 @@ def create_blank_frame(frame=None, text="", logging=False):
|
||||
return blank_frame
|
||||
|
||||
|
||||
async def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
|
||||
async def reducer(
|
||||
frame: NDArray = None,
|
||||
percentage: Union[int, float] = 0,
|
||||
interpolation: int = cv2.INTER_LANCZOS4,
|
||||
) -> NDArray:
|
||||
"""
|
||||
## reducer
|
||||
|
||||
@@ -144,7 +152,12 @@ async def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
|
||||
return cv2.resize(frame, dimensions, interpolation=interpolation)
|
||||
|
||||
|
||||
def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=False):
|
||||
def generate_webdata(
|
||||
path: str,
|
||||
c_name: str = "webgear",
|
||||
overwrite_default: bool = False,
|
||||
logging: bool = False,
|
||||
) -> str:
|
||||
"""
|
||||
## generate_webdata
|
||||
|
||||
@@ -209,10 +222,7 @@ def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=Fa
|
||||
css_static_dir, c_name=c_name, files=["custom.css"], logging=logging
|
||||
)
|
||||
download_webdata(
|
||||
js_static_dir,
|
||||
c_name=c_name,
|
||||
files=["custom.js"],
|
||||
logging=logging,
|
||||
js_static_dir, c_name=c_name, files=["custom.js"], logging=logging,
|
||||
)
|
||||
download_webdata(
|
||||
favicon_dir, c_name=c_name, files=["favicon-32x32.png"], logging=logging
|
||||
@@ -225,7 +235,9 @@ def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=Fa
|
||||
return path
|
||||
|
||||
|
||||
def download_webdata(path, c_name="webgear", files=[], logging=False):
|
||||
def download_webdata(
|
||||
path: str, c_name: str = "webgear", files: list = [], logging: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
## download_webdata
|
||||
|
||||
@@ -326,7 +338,7 @@ def download_webdata(path, c_name="webgear", files=[], logging=False):
|
||||
)
|
||||
|
||||
|
||||
def validate_webdata(path, files=[], logging=False):
|
||||
def validate_webdata(path: str, files: list = [], logging: bool = False) -> bool:
|
||||
"""
|
||||
## validate_auth_keys
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ import string
|
||||
import secrets
|
||||
import platform
|
||||
from collections import deque
|
||||
from typing import Any, Tuple, AsyncGenerator, Union, TypeVar
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from ..helper import logger_handler, import_dependency_safe, logcurr_vidgear_ver
|
||||
@@ -50,6 +52,9 @@ logger.propagate = False
|
||||
logger.addHandler(logger_handler())
|
||||
logger.setLevel(log.DEBUG)
|
||||
|
||||
# Type variable `T` representing class `NetGear_Async`.
|
||||
T = TypeVar("T", bound="NetGear_Async")
|
||||
|
||||
|
||||
class NetGear_Async:
|
||||
"""
|
||||
@@ -82,26 +87,26 @@ class NetGear_Async:
|
||||
def __init__(
|
||||
self,
|
||||
# NetGear_Async parameters
|
||||
address=None,
|
||||
port=None,
|
||||
protocol="tcp",
|
||||
pattern=0,
|
||||
receive_mode=False,
|
||||
timeout=0.0,
|
||||
address: str = None,
|
||||
port: str = None,
|
||||
protocol: str = "tcp",
|
||||
pattern: int = 0,
|
||||
receive_mode: bool = False,
|
||||
timeout: Union[int, float] = 0.0,
|
||||
# Videogear parameters
|
||||
enablePiCamera=False,
|
||||
stabilize=False,
|
||||
source=None,
|
||||
camera_num=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
colorspace=None,
|
||||
resolution=(640, 480),
|
||||
framerate=25,
|
||||
time_delay=0,
|
||||
enablePiCamera: bool = False,
|
||||
stabilize: bool = False,
|
||||
source: Any = None,
|
||||
camera_num: int = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
colorspace: str = None,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 25,
|
||||
time_delay: int = 0,
|
||||
# common parameters
|
||||
logging=False,
|
||||
**options
|
||||
logging: bool = False,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the NetGear_Async class.
|
||||
@@ -320,7 +325,7 @@ class NetGear_Async:
|
||||
# create asyncio queue if bidirectional mode activated
|
||||
self.__queue = asyncio.Queue() if self.__bi_mode else None
|
||||
|
||||
def launch(self):
|
||||
def launch(self) -> T:
|
||||
"""
|
||||
Launches an asynchronous generators and loop executors for respective task.
|
||||
"""
|
||||
@@ -499,7 +504,7 @@ class NetGear_Async:
|
||||
)
|
||||
self.__logging and logger.debug(recv_confirmation)
|
||||
|
||||
async def recv_generator(self):
|
||||
async def recv_generator(self) -> AsyncGenerator[Tuple[Any, NDArray], NDArray]:
|
||||
"""
|
||||
A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
|
||||
"""
|
||||
@@ -613,8 +618,7 @@ class NetGear_Async:
|
||||
|
||||
# create return type dict without data
|
||||
rettype_dict = dict(
|
||||
return_type=(type(return_data).__name__),
|
||||
return_data=None,
|
||||
return_type=(type(return_data).__name__), return_data=None,
|
||||
)
|
||||
# encode it
|
||||
rettype_enc = msgpack.packb(rettype_dict)
|
||||
@@ -677,7 +681,7 @@ class NetGear_Async:
|
||||
# sleep for sometime
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def transceive_data(self, data=None):
|
||||
async def transceive_data(self, data: Any = None) -> Any:
|
||||
"""
|
||||
Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.
|
||||
|
||||
@@ -756,7 +760,7 @@ class NetGear_Async:
|
||||
)
|
||||
)
|
||||
|
||||
def close(self, skip_loop=False):
|
||||
def close(self, skip_loop: bool = False) -> None:
|
||||
"""
|
||||
Terminates all NetGear_Async Asynchronous processes gracefully.
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ import contextlib
|
||||
import numpy as np
|
||||
import logging as log
|
||||
from os.path import expanduser
|
||||
from typing import Any, Tuple, Union
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -79,18 +80,18 @@ class WebGear:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
enablePiCamera=False,
|
||||
stabilize=False,
|
||||
source=None,
|
||||
camera_num=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
colorspace=None,
|
||||
resolution=(640, 480),
|
||||
framerate=25,
|
||||
logging=False,
|
||||
time_delay=0,
|
||||
**options
|
||||
enablePiCamera: bool = False,
|
||||
stabilize: bool = False,
|
||||
source: Any = None,
|
||||
camera_num: int = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
colorspace: str = None,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 25,
|
||||
logging: bool = False,
|
||||
time_delay: int = 0,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the WebGear class.
|
||||
@@ -378,7 +379,7 @@ class WebGear:
|
||||
# keeps check if producer loop should be running
|
||||
self.__isrunning = True
|
||||
|
||||
def __call__(self):
|
||||
def __call__(self) -> Starlette:
|
||||
"""
|
||||
Implements a custom Callable method for WebGear application.
|
||||
"""
|
||||
@@ -554,7 +555,7 @@ class WebGear:
|
||||
# close Video Server
|
||||
self.shutdown()
|
||||
|
||||
def shutdown(self):
|
||||
def shutdown(self) -> None:
|
||||
"""
|
||||
Implements a Callable to be run on application shutdown
|
||||
"""
|
||||
|
||||
@@ -26,6 +26,7 @@ import fractions
|
||||
import asyncio
|
||||
import logging as log
|
||||
from os.path import expanduser
|
||||
from typing import Any, Union, Tuple
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -77,6 +78,7 @@ if not (aiortc is None):
|
||||
from aiortc.contrib.media import MediaRelay
|
||||
from aiortc.mediastreams import MediaStreamError
|
||||
from av import VideoFrame # aiortc dependency
|
||||
from av.frame import Frame as AVFrame # imported for type annotation
|
||||
|
||||
class RTC_VideoServer(VideoStreamTrack):
|
||||
"""
|
||||
@@ -86,18 +88,18 @@ if not (aiortc is None):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
enablePiCamera=False,
|
||||
stabilize=False,
|
||||
source=None,
|
||||
camera_num=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
colorspace=None,
|
||||
resolution=(640, 480),
|
||||
framerate=25,
|
||||
logging=False,
|
||||
time_delay=0,
|
||||
**options
|
||||
enablePiCamera: bool = False,
|
||||
stabilize: bool = False,
|
||||
source: Any = None,
|
||||
camera_num: int = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
colorspace: str = None,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 25,
|
||||
logging: bool = False,
|
||||
time_delay: int = 0,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the RTC_VideoServer class.
|
||||
@@ -194,7 +196,7 @@ if not (aiortc is None):
|
||||
# handles reset signal
|
||||
self.__reset_enabled = False
|
||||
|
||||
def launch(self):
|
||||
def launch(self) -> None:
|
||||
"""
|
||||
Launches VideoGear stream
|
||||
"""
|
||||
@@ -204,7 +206,7 @@ if not (aiortc is None):
|
||||
if hasattr(self.__stream, "start") and callable(self.__stream.start):
|
||||
self.__stream.start()
|
||||
|
||||
async def next_timestamp(self):
|
||||
async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
|
||||
"""
|
||||
VideoStreamTrack internal method for generating accurate timestamp.
|
||||
"""
|
||||
@@ -229,7 +231,7 @@ if not (aiortc is None):
|
||||
self.is_running = True
|
||||
return self._timestamp, VIDEO_TIME_BASE
|
||||
|
||||
async def recv(self):
|
||||
async def recv(self) -> AVFrame:
|
||||
"""
|
||||
A coroutine function that yields `av.frame.Frame`.
|
||||
"""
|
||||
@@ -296,14 +298,14 @@ if not (aiortc is None):
|
||||
# return `av.frame.Frame`
|
||||
return frame
|
||||
|
||||
async def reset(self):
|
||||
async def reset(self) -> None:
|
||||
"""
|
||||
Resets timestamp clock
|
||||
"""
|
||||
self.__reset_enabled = True
|
||||
self.is_running = False
|
||||
|
||||
def terminate(self):
|
||||
def terminate(self) -> None:
|
||||
"""
|
||||
Gracefully terminates VideoGear stream
|
||||
"""
|
||||
@@ -340,18 +342,18 @@ class WebGear_RTC:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
enablePiCamera=False,
|
||||
stabilize=False,
|
||||
source=None,
|
||||
camera_num=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
colorspace=None,
|
||||
resolution=(640, 480),
|
||||
framerate=25,
|
||||
logging=False,
|
||||
time_delay=0,
|
||||
**options
|
||||
enablePiCamera: bool = False,
|
||||
stabilize: bool = False,
|
||||
source: Any = None,
|
||||
camera_num: int = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
colorspace: str = None,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 25,
|
||||
logging: bool = False,
|
||||
time_delay: int = 0,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the WebGear_RTC class.
|
||||
@@ -420,9 +422,9 @@ class WebGear_RTC:
|
||||
if isinstance(value, bool):
|
||||
if value:
|
||||
self.__relay = MediaRelay()
|
||||
options["enable_infinite_frames"] = (
|
||||
True # enforce infinite frames
|
||||
)
|
||||
options[
|
||||
"enable_infinite_frames"
|
||||
] = True # enforce infinite frames
|
||||
logger.critical(
|
||||
"Enabled live broadcasting for Peer connection(s)."
|
||||
)
|
||||
@@ -506,7 +508,7 @@ class WebGear_RTC:
|
||||
# collects peer RTC connections
|
||||
self.__pcs = set()
|
||||
|
||||
def __call__(self):
|
||||
def __call__(self) -> Starlette:
|
||||
"""
|
||||
Implements a custom Callable method for WebGear_RTC application.
|
||||
"""
|
||||
@@ -654,7 +656,7 @@ class WebGear_RTC:
|
||||
await asyncio.gather(*coros)
|
||||
self.__pcs.clear()
|
||||
|
||||
def shutdown(self):
|
||||
def shutdown(self) -> None:
|
||||
"""
|
||||
Gracefully shutdown video-server
|
||||
"""
|
||||
|
||||
+19
-11
@@ -24,6 +24,8 @@ import time
|
||||
import queue
|
||||
import logging as log
|
||||
from threading import Thread, Event
|
||||
from typing import TypeVar, Optional, Any
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -57,7 +59,9 @@ if not (yt_dlp is None):
|
||||
options (dict): provides ability to alter yt-dlp backend params.
|
||||
"""
|
||||
|
||||
def __init__(self, source_url, logging=False, **stream_params):
|
||||
def __init__(
|
||||
self, source_url: str, logging: bool = False, **stream_params: dict
|
||||
):
|
||||
# initialize global params
|
||||
self.__logging = logging
|
||||
self.is_livestream = False
|
||||
@@ -198,6 +202,10 @@ if not (yt_dlp is None):
|
||||
return streams
|
||||
|
||||
|
||||
# Type variable `T` representing class `CamGear`.
|
||||
T = TypeVar("T", bound="CamGear")
|
||||
|
||||
|
||||
class CamGear:
|
||||
"""
|
||||
CamGear supports a diverse range of video streams which can handle/control video stream almost any IP/USB Cameras, multimedia video file format (upto 4k tested),
|
||||
@@ -212,13 +220,13 @@ class CamGear:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
source=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
colorspace=None,
|
||||
logging=False,
|
||||
time_delay=0,
|
||||
**options
|
||||
source: Any = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
colorspace: str = None,
|
||||
logging: bool = False,
|
||||
time_delay: int = 0,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the CamGear class.
|
||||
@@ -414,7 +422,7 @@ class CamGear:
|
||||
# initialize stream read flag event
|
||||
self.__stream_read = Event()
|
||||
|
||||
def start(self):
|
||||
def start(self) -> T:
|
||||
"""
|
||||
Launches the internal *Threaded Frames Extractor* daemon.
|
||||
|
||||
@@ -487,7 +495,7 @@ class CamGear:
|
||||
# release resources
|
||||
self.stream.release()
|
||||
|
||||
def read(self):
|
||||
def read(self) -> Optional[NDArray]:
|
||||
"""
|
||||
Extracts frames synchronously from monitored queue, while maintaining a fixed-length frame buffer in the memory,
|
||||
and blocks the thread if the queue is full.
|
||||
@@ -505,7 +513,7 @@ class CamGear:
|
||||
else None
|
||||
)
|
||||
|
||||
def stop(self):
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
Safely terminates the thread, and release the multi-threaded resources.
|
||||
"""
|
||||
|
||||
+54
-38
@@ -44,6 +44,8 @@ from colorlog import ColoredFormatter
|
||||
from pkg_resources import parse_version
|
||||
from requests.adapters import HTTPAdapter, Retry
|
||||
from ..version import __version__
|
||||
from typing import List, Dict, Optional, Union
|
||||
from numpy.typing import NDArray
|
||||
|
||||
|
||||
def logger_handler():
|
||||
@@ -193,11 +195,7 @@ def deprecated(parameter=None, message=None, stacklevel=2):
|
||||
|
||||
|
||||
def import_dependency_safe(
|
||||
name,
|
||||
error="raise",
|
||||
pkg_name=None,
|
||||
min_version=None,
|
||||
custom_message=None,
|
||||
name, error="raise", pkg_name=None, min_version=None, custom_message=None,
|
||||
):
|
||||
"""
|
||||
## import_dependency_safe
|
||||
@@ -313,7 +311,7 @@ class TimeoutHTTPAdapter(HTTPAdapter):
|
||||
return super().send(request, **kwargs)
|
||||
|
||||
|
||||
def check_CV_version():
|
||||
def check_CV_version() -> int:
|
||||
"""
|
||||
## check_CV_version
|
||||
|
||||
@@ -325,7 +323,7 @@ def check_CV_version():
|
||||
return 3
|
||||
|
||||
|
||||
def check_open_port(address, port=22):
|
||||
def check_open_port(address: str, port: int = 22) -> bool:
|
||||
"""
|
||||
## check_open_port
|
||||
|
||||
@@ -346,7 +344,9 @@ def check_open_port(address, port=22):
|
||||
return False
|
||||
|
||||
|
||||
def check_WriteAccess(path, is_windows=False, logging=False):
|
||||
def check_WriteAccess(
|
||||
path: str, is_windows: bool = False, logging: bool = False
|
||||
) -> bool:
|
||||
"""
|
||||
## check_WriteAccess
|
||||
|
||||
@@ -403,7 +403,7 @@ def check_WriteAccess(path, is_windows=False, logging=False):
|
||||
return write_accessible
|
||||
|
||||
|
||||
def check_gstreamer_support(logging=False):
|
||||
def check_gstreamer_support(logging: bool = False) -> bool:
|
||||
"""
|
||||
## check_gstreamer_support
|
||||
|
||||
@@ -429,7 +429,7 @@ def check_gstreamer_support(logging=False):
|
||||
return False
|
||||
|
||||
|
||||
def get_supported_resolution(value, logging=False):
|
||||
def get_supported_resolution(value: str, logging=False) -> str:
|
||||
"""
|
||||
## get_supported_resolution
|
||||
|
||||
@@ -475,7 +475,7 @@ def get_supported_resolution(value, logging=False):
|
||||
return stream_resolution
|
||||
|
||||
|
||||
def dimensions_to_resolutions(value):
|
||||
def dimensions_to_resolutions(value: List[str]) -> List[str]:
|
||||
"""
|
||||
## dimensions_to_resolutions
|
||||
|
||||
@@ -502,7 +502,7 @@ def dimensions_to_resolutions(value):
|
||||
)
|
||||
|
||||
|
||||
def get_supported_vencoders(path):
|
||||
def get_supported_vencoders(path: str) -> List[str]:
|
||||
"""
|
||||
## get_supported_vencoders
|
||||
|
||||
@@ -529,7 +529,7 @@ def get_supported_vencoders(path):
|
||||
return [[s for s in o.split(" ")][-1] for o in outputs]
|
||||
|
||||
|
||||
def get_supported_demuxers(path):
|
||||
def get_supported_demuxers(path: str) -> List[str]:
|
||||
"""
|
||||
## get_supported_demuxers
|
||||
|
||||
@@ -551,7 +551,7 @@ def get_supported_demuxers(path):
|
||||
return [o.strip() if not ("," in o) else o.split(",")[-1].strip() for o in outputs]
|
||||
|
||||
|
||||
def get_supported_pixfmts(path):
|
||||
def get_supported_pixfmts(path: str) -> List[str]:
|
||||
"""
|
||||
## get_supported_pixfmts
|
||||
|
||||
@@ -579,7 +579,7 @@ def get_supported_pixfmts(path):
|
||||
return [[s for s in o[0].split(" ")][-1] for o in outputs if len(o) == 3]
|
||||
|
||||
|
||||
def is_valid_url(path, url=None, logging=False):
|
||||
def is_valid_url(path: str, url: str = None, logging: bool = False) -> bool:
|
||||
"""
|
||||
## is_valid_url
|
||||
|
||||
@@ -620,7 +620,9 @@ def is_valid_url(path, url=None, logging=False):
|
||||
return False
|
||||
|
||||
|
||||
def validate_video(path, video_path=None, logging=False):
|
||||
def validate_video(
|
||||
path: str, video_path: str = None, logging: bool = False
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
## validate_video
|
||||
|
||||
@@ -658,7 +660,9 @@ def validate_video(path, video_path=None, logging=False):
|
||||
return result if (len(result) == 2) else None
|
||||
|
||||
|
||||
def create_blank_frame(frame=None, text="", logging=False):
|
||||
def create_blank_frame(
|
||||
frame: NDArray = None, text: str = "", logging: bool = False
|
||||
) -> NDArray:
|
||||
"""
|
||||
## create_blank_frame
|
||||
|
||||
@@ -696,7 +700,7 @@ def create_blank_frame(frame=None, text="", logging=False):
|
||||
return blank_frame
|
||||
|
||||
|
||||
def extract_time(value):
|
||||
def extract_time(value: str) -> int:
|
||||
"""
|
||||
## extract_time
|
||||
|
||||
@@ -715,7 +719,7 @@ def extract_time(value):
|
||||
t_duration = re.findall(r"\d{2}:\d{2}:\d{2}(?:\.\d{2})?", stripped_data)
|
||||
return (
|
||||
sum(
|
||||
float(x) * 60**i
|
||||
float(x) * 60 ** i
|
||||
for i, x in enumerate(reversed(t_duration[0].split(":")))
|
||||
)
|
||||
if t_duration
|
||||
@@ -723,7 +727,7 @@ def extract_time(value):
|
||||
)
|
||||
|
||||
|
||||
def validate_audio(path, source=None):
|
||||
def validate_audio(path: str, source: Union[str, list] = None) -> str:
|
||||
"""
|
||||
## validate_audio
|
||||
|
||||
@@ -794,7 +798,7 @@ def validate_audio(path, source=None):
|
||||
return ""
|
||||
|
||||
|
||||
def get_audio_bitrate(samplerate, channels, bit_depth):
|
||||
def get_audio_bitrate(samplerate: int, channels: int, bit_depth: float) -> int:
|
||||
"""
|
||||
## get_audio_bitrate
|
||||
|
||||
@@ -810,7 +814,7 @@ def get_audio_bitrate(samplerate, channels, bit_depth):
|
||||
return round((samplerate * channels * bit_depth) / 1000)
|
||||
|
||||
|
||||
def get_video_bitrate(width, height, fps, bpp):
|
||||
def get_video_bitrate(width: int, height: int, fps: float, bpp: float) -> int:
|
||||
"""
|
||||
## get_video_bitrate
|
||||
|
||||
@@ -827,7 +831,7 @@ def get_video_bitrate(width, height, fps, bpp):
|
||||
return round((width * height * bpp * fps) / 1000)
|
||||
|
||||
|
||||
def delete_file_safe(file_path):
|
||||
def delete_file_safe(file_path: str) -> None:
|
||||
"""
|
||||
## delete_ext_safe
|
||||
|
||||
@@ -843,7 +847,7 @@ def delete_file_safe(file_path):
|
||||
logger.exception(str(e))
|
||||
|
||||
|
||||
def mkdir_safe(dir_path, logging=False):
|
||||
def mkdir_safe(dir_path: str, logging: bool = False) -> None:
|
||||
"""
|
||||
## mkdir_safe
|
||||
|
||||
@@ -862,7 +866,9 @@ def mkdir_safe(dir_path, logging=False):
|
||||
raise
|
||||
|
||||
|
||||
def delete_ext_safe(dir_path, extensions=[], logging=False):
|
||||
def delete_ext_safe(
|
||||
dir_path: str, extensions: list = [], logging: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
## delete_ext_safe
|
||||
|
||||
@@ -898,7 +904,7 @@ def delete_ext_safe(dir_path, extensions=[], logging=False):
|
||||
logging and logger.debug("Deleted file: `{}`".format(file))
|
||||
|
||||
|
||||
def capPropId(property, logging=True):
|
||||
def capPropId(property: str, logging: bool = True) -> int:
|
||||
"""
|
||||
## capPropId
|
||||
|
||||
@@ -920,7 +926,7 @@ def capPropId(property, logging=True):
|
||||
return integer_value
|
||||
|
||||
|
||||
def retrieve_best_interpolation(interpolations):
|
||||
def retrieve_best_interpolation(interpolations: list) -> Optional[int]:
|
||||
"""
|
||||
## retrieve_best_interpolation
|
||||
Retrieves best interpolation for resizing
|
||||
@@ -937,7 +943,11 @@ def retrieve_best_interpolation(interpolations):
|
||||
return None
|
||||
|
||||
|
||||
def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
|
||||
def reducer(
|
||||
frame: NDArray = None,
|
||||
percentage: Union[int, float] = 0,
|
||||
interpolation: int = cv2.INTER_LANCZOS4,
|
||||
) -> NDArray:
|
||||
"""
|
||||
## reducer
|
||||
|
||||
@@ -978,7 +988,7 @@ def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
|
||||
return cv2.resize(frame, dimensions, interpolation=interpolation)
|
||||
|
||||
|
||||
def dict2Args(param_dict):
|
||||
def dict2Args(param_dict: dict) -> list:
|
||||
"""
|
||||
## dict2Args
|
||||
|
||||
@@ -1008,8 +1018,11 @@ def dict2Args(param_dict):
|
||||
|
||||
|
||||
def get_valid_ffmpeg_path(
|
||||
custom_ffmpeg="", is_windows=False, ffmpeg_download_path="", logging=False
|
||||
):
|
||||
custom_ffmpeg: str = "",
|
||||
is_windows: bool = False,
|
||||
ffmpeg_download_path: str = "",
|
||||
logging: bool = False,
|
||||
) -> Union[str, bool]:
|
||||
"""
|
||||
## get_valid_ffmpeg_path
|
||||
|
||||
@@ -1100,7 +1113,9 @@ def get_valid_ffmpeg_path(
|
||||
return final_path if validate_ffmpeg(final_path, logging=logging) else False
|
||||
|
||||
|
||||
def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
|
||||
def download_ffmpeg_binaries(
|
||||
path: str, os_windows: bool = False, os_bit: str = ""
|
||||
) -> str:
|
||||
"""
|
||||
## download_ffmpeg_binaries
|
||||
|
||||
@@ -1124,8 +1139,7 @@ def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
|
||||
os.path.abspath(path), "ffmpeg-static-{}-gpl.zip".format(os_bit)
|
||||
)
|
||||
file_path = os.path.join(
|
||||
os.path.abspath(path),
|
||||
"ffmpeg-static-{}-gpl/bin/ffmpeg.exe".format(os_bit),
|
||||
os.path.abspath(path), "ffmpeg-static-{}-gpl/bin/ffmpeg.exe".format(os_bit),
|
||||
)
|
||||
base_path, _ = os.path.split(file_name) # extract file base path
|
||||
# check if file already exists
|
||||
@@ -1185,7 +1199,7 @@ def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
|
||||
return final_path
|
||||
|
||||
|
||||
def validate_ffmpeg(path, logging=False):
|
||||
def validate_ffmpeg(path: str, logging: bool = False) -> bool:
|
||||
"""
|
||||
## validate_ffmpeg
|
||||
|
||||
@@ -1215,7 +1229,7 @@ def validate_ffmpeg(path, logging=False):
|
||||
return True
|
||||
|
||||
|
||||
def check_output(*args, **kwargs):
|
||||
def check_output(*args: Union[list, tuple], **kwargs: dict) -> bytes:
|
||||
"""
|
||||
## check_output
|
||||
|
||||
@@ -1254,7 +1268,9 @@ def check_output(*args, **kwargs):
|
||||
return output if not (retrieve_stderr) else stderr
|
||||
|
||||
|
||||
def generate_auth_certificates(path, overwrite=False, logging=False):
|
||||
def generate_auth_certificates(
|
||||
path: str, overwrite: bool = False, logging: bool = False
|
||||
) -> tuple:
|
||||
"""
|
||||
## generate_auth_certificates
|
||||
|
||||
@@ -1366,7 +1382,7 @@ def generate_auth_certificates(path, overwrite=False, logging=False):
|
||||
return (keys_dir, secret_keys_dir, public_keys_dir)
|
||||
|
||||
|
||||
def validate_auth_keys(path, extension):
|
||||
def validate_auth_keys(path: str, extension: str) -> bool:
|
||||
"""
|
||||
## validate_auth_keys
|
||||
|
||||
|
||||
+12
-10
@@ -30,6 +30,8 @@ import logging as log
|
||||
from threading import Thread
|
||||
from collections import deque
|
||||
from os.path import expanduser
|
||||
from numpy.typing import NDArray
|
||||
from typing import Optional, Any
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -114,13 +116,13 @@ class NetGear:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
address=None,
|
||||
port=None,
|
||||
protocol=None,
|
||||
pattern=0,
|
||||
receive_mode=False,
|
||||
logging=False,
|
||||
**options
|
||||
address: str = None,
|
||||
port: str = None,
|
||||
protocol: str = None,
|
||||
pattern: int = 0,
|
||||
receive_mode: bool = False,
|
||||
logging: bool = False,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the NetGear class.
|
||||
@@ -1209,7 +1211,7 @@ class NetGear:
|
||||
# otherwise append recovered frame to queue
|
||||
self.__queue.append(frame)
|
||||
|
||||
def recv(self, return_data=None):
|
||||
def recv(self, return_data=None) -> Optional[NDArray]:
|
||||
"""
|
||||
A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a
|
||||
fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
|
||||
@@ -1246,7 +1248,7 @@ class NetGear:
|
||||
# otherwise return NoneType
|
||||
return None
|
||||
|
||||
def send(self, frame, message=None):
|
||||
def send(self, frame: NDArray, message: Any = None) -> Optional[Any]:
|
||||
"""
|
||||
A Server end method, that sends the data and frames over the network to Client(s).
|
||||
|
||||
@@ -1484,7 +1486,7 @@ class NetGear:
|
||||
# log confirmation
|
||||
self.__logging and logger.debug(recv_confirmation)
|
||||
|
||||
def close(self, kill=False):
|
||||
def close(self, kill: bool = False) -> None:
|
||||
"""
|
||||
Safely terminates the threads, and NetGear resources.
|
||||
|
||||
|
||||
+19
-21
@@ -25,6 +25,8 @@ import os
|
||||
import time
|
||||
import logging as log
|
||||
from threading import Thread
|
||||
from typing import Tuple, Union, TypeVar
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -54,6 +56,9 @@ logger.addHandler(logger_handler())
|
||||
logger.setLevel(log.DEBUG)
|
||||
|
||||
|
||||
PIGear = TypeVar("PIGear", bound="PiGear")
|
||||
|
||||
|
||||
class PiGear:
|
||||
"""
|
||||
PiGear implements a seamless and robust wrapper around the [picamera2](https://github.com/raspberrypi/picamera2) python library, simplifying integration with minimal code changes and ensuring a
|
||||
@@ -84,13 +89,13 @@ class PiGear:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
camera_num=0,
|
||||
resolution=(640, 480),
|
||||
framerate=30,
|
||||
colorspace=None,
|
||||
logging=False,
|
||||
time_delay=0,
|
||||
**options
|
||||
camera_num: int = 0,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 30,
|
||||
colorspace: str = None,
|
||||
logging: bool = False,
|
||||
time_delay: int = 0,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the PiGear class.
|
||||
@@ -302,11 +307,7 @@ class PiGear:
|
||||
# unless format is either BGR or BGRA
|
||||
(
|
||||
not (colorspace is None)
|
||||
or options["format"]
|
||||
in [
|
||||
"RGB888",
|
||||
"XRGB8888",
|
||||
]
|
||||
or options["format"] in ["RGB888", "XRGB8888",]
|
||||
) and logger.warning(
|
||||
"Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
|
||||
options["format"]
|
||||
@@ -336,8 +337,7 @@ class PiGear:
|
||||
invalid_sensor_keys = set(list(sensor)) - set(valid_sensor)
|
||||
invalid_sensor_keys and logger.warning(
|
||||
"Discarding sensor properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
|
||||
"`, `".join(invalid_sensor_keys),
|
||||
"`, `".join(valid_sensor),
|
||||
"`, `".join(invalid_sensor_keys), "`, `".join(valid_sensor),
|
||||
)
|
||||
)
|
||||
# delete all unsupported control keys
|
||||
@@ -497,7 +497,7 @@ class PiGear:
|
||||
# initialize termination flag
|
||||
self.__terminate = False
|
||||
|
||||
def start(self):
|
||||
def start(self) -> PIGear:
|
||||
"""
|
||||
Launches the internal *Threaded Frames Extractor* daemon
|
||||
|
||||
@@ -588,7 +588,7 @@ class PiGear:
|
||||
self.__rawCapture.close()
|
||||
self.__camera.close()
|
||||
|
||||
def read(self):
|
||||
def read(self) -> NDArray:
|
||||
"""
|
||||
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
|
||||
and blocks the thread if the deque is full.
|
||||
@@ -608,16 +608,14 @@ class PiGear:
|
||||
# clear frame
|
||||
self.frame = None
|
||||
# re-raise error for debugging
|
||||
error_msg = (
|
||||
"[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
|
||||
self.__exceptions[1]
|
||||
)
|
||||
error_msg = "[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
|
||||
self.__exceptions[1]
|
||||
)
|
||||
raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
|
||||
# return the frame
|
||||
return self.frame
|
||||
|
||||
def stop(self):
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
Safely terminates the thread, and release the multi-threaded resources.
|
||||
"""
|
||||
|
||||
@@ -26,6 +26,8 @@ import numpy as np
|
||||
import logging as log
|
||||
from threading import Thread, Event
|
||||
from collections import OrderedDict
|
||||
from typing import TypeVar
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -48,6 +50,9 @@ logger.propagate = False
|
||||
logger.addHandler(logger_handler())
|
||||
logger.setLevel(log.DEBUG)
|
||||
|
||||
# Type variable `T` representing class `ScreenGear`.
|
||||
T = TypeVar("T", bound="ScreenGear")
|
||||
|
||||
|
||||
class ScreenGear:
|
||||
"""
|
||||
@@ -61,7 +66,12 @@ class ScreenGear:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, monitor=None, backend=None, colorspace=None, logging=False, **options
|
||||
self,
|
||||
monitor: int = None,
|
||||
backend: str = None,
|
||||
colorspace: str = None,
|
||||
logging: bool = False,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the ScreenGear class.
|
||||
@@ -273,7 +283,7 @@ class ScreenGear:
|
||||
# initialize termination flag
|
||||
self.__terminate = Event()
|
||||
|
||||
def start(self):
|
||||
def start(self) -> T:
|
||||
"""
|
||||
Launches the internal *Threaded Frames Extractor* daemon
|
||||
|
||||
@@ -284,8 +294,7 @@ class ScreenGear:
|
||||
self.__thread.start()
|
||||
if self.__backend == "dxcam":
|
||||
self.__capture_object.start(
|
||||
target_fps=self.__target_fps,
|
||||
video_mode=True,
|
||||
target_fps=self.__target_fps, video_mode=True,
|
||||
)
|
||||
self.__logging and self.__target_fps and logger.debug(
|
||||
"Targeting FPS: {}".format(self.__target_fps)
|
||||
@@ -361,7 +370,7 @@ class ScreenGear:
|
||||
self.__capture_object.stop()
|
||||
del self.__capture_object
|
||||
|
||||
def read(self):
|
||||
def read(self) -> NDArray:
|
||||
"""
|
||||
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
|
||||
and blocks the thread if the deque is full.
|
||||
@@ -371,7 +380,7 @@ class ScreenGear:
|
||||
# return the frame
|
||||
return self.frame
|
||||
|
||||
def stop(self):
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
Safely terminates the thread, and release the resources.
|
||||
"""
|
||||
|
||||
@@ -25,6 +25,7 @@ import cv2
|
||||
import numpy as np
|
||||
import logging as log
|
||||
from collections import deque
|
||||
from typing import Optional
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -53,11 +54,11 @@ class Stabilizer:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
smoothing_radius=25,
|
||||
border_type="black",
|
||||
border_size=0,
|
||||
crop_n_zoom=False,
|
||||
logging=False,
|
||||
smoothing_radius: int = 25,
|
||||
border_type: str = "black",
|
||||
border_size: int = 0,
|
||||
crop_n_zoom: bool = False,
|
||||
logging: bool = False,
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the Stabilizer class.
|
||||
@@ -151,7 +152,7 @@ class Stabilizer:
|
||||
# define normalized box filter
|
||||
self.__box_filter = np.ones(smoothing_radius) / smoothing_radius
|
||||
|
||||
def stabilize(self, frame):
|
||||
def stabilize(self, frame: np.ndarray) -> Optional[np.ndarray]:
|
||||
"""
|
||||
This method takes an unstabilized video frame, and returns a stabilized one.
|
||||
|
||||
@@ -382,7 +383,7 @@ class Stabilizer:
|
||||
# finally return stabilized frame
|
||||
return frame_stabilized
|
||||
|
||||
def clean(self):
|
||||
def clean(self) -> None:
|
||||
"""
|
||||
Cleans Stabilizer resources
|
||||
"""
|
||||
|
||||
+29
-28
@@ -28,6 +28,7 @@ import logging as log
|
||||
import subprocess as sp
|
||||
from tqdm import tqdm
|
||||
from collections import OrderedDict
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -68,7 +69,12 @@ class StreamGear:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, output="", format="dash", custom_ffmpeg="", logging=False, **stream_params
|
||||
self,
|
||||
output: str = "",
|
||||
format: str = "dash",
|
||||
custom_ffmpeg: str = "",
|
||||
logging: bool = False,
|
||||
**stream_params: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the StreamGear class.
|
||||
@@ -353,9 +359,7 @@ class StreamGear:
|
||||
elif os.path.isfile(abs_path) and self.__clear_assets:
|
||||
# clear previous assets if specified
|
||||
delete_ext_safe(
|
||||
os.path.dirname(abs_path),
|
||||
assets_exts,
|
||||
logging=self.__logging,
|
||||
os.path.dirname(abs_path), assets_exts, logging=self.__logging,
|
||||
)
|
||||
# check if path has valid file extension
|
||||
assert abs_path.endswith(
|
||||
@@ -403,7 +407,7 @@ class StreamGear:
|
||||
parameter="rgb_mode",
|
||||
message="The `rgb_mode` parameter is deprecated and will be removed in a future version. Only BGR format frames will be supported going forward.",
|
||||
)
|
||||
def stream(self, frame, rgb_mode=False):
|
||||
def stream(self, frame: NDArray, rgb_mode: bool = False) -> None:
|
||||
"""
|
||||
Pipes `ndarray` frames to FFmpeg Pipeline for transcoding them into chunked-encoded media segments of
|
||||
streaming formats such as MPEG-DASH and HLS.
|
||||
@@ -463,7 +467,7 @@ class StreamGear:
|
||||
)
|
||||
raise ValueError # for testing purpose only
|
||||
|
||||
def transcode_source(self):
|
||||
def transcode_source(self) -> None:
|
||||
"""
|
||||
Transcodes an entire video file _(with or without audio)_ into chunked-encoded media segments of
|
||||
streaming formats such as MPEG-DASH and HLS.
|
||||
@@ -583,8 +587,7 @@ class StreamGear:
|
||||
logger.info("Input video's audio source will be used for this run.")
|
||||
# assign audio codec
|
||||
output_parameters["-acodec"] = self.__params.pop(
|
||||
"-acodec",
|
||||
"aac" if ("-streams" in self.__params) else "copy",
|
||||
"-acodec", "aac" if ("-streams" in self.__params) else "copy",
|
||||
)
|
||||
if output_parameters["-acodec"] != "copy":
|
||||
output_parameters["a_bitrate"] = bitrate # temporary handler
|
||||
@@ -741,13 +744,11 @@ class StreamGear:
|
||||
# process given dash/hls stream and return it
|
||||
if self.__format == "dash":
|
||||
processed_params = self.__generate_dash_stream(
|
||||
input_params=input_params,
|
||||
output_params=output_params,
|
||||
input_params=input_params, output_params=output_params,
|
||||
)
|
||||
else:
|
||||
processed_params = self.__generate_hls_stream(
|
||||
input_params=input_params,
|
||||
output_params=output_params,
|
||||
input_params=input_params, output_params=output_params,
|
||||
)
|
||||
return processed_params
|
||||
|
||||
@@ -843,11 +844,11 @@ class StreamGear:
|
||||
# otherwise calculate video-bitrate
|
||||
fps = stream.pop("-framerate", 0.0)
|
||||
if dimensions and isinstance(fps, (float, int)) and fps > 0:
|
||||
intermediate_dict["-b:v:{}".format(stream_count)] = (
|
||||
"{}k".format(
|
||||
get_video_bitrate(
|
||||
int(dimensions[0]), int(dimensions[1]), fps, bpp
|
||||
)
|
||||
intermediate_dict[
|
||||
"-b:v:{}".format(stream_count)
|
||||
] = "{}k".format(
|
||||
get_video_bitrate(
|
||||
int(dimensions[0]), int(dimensions[1]), fps, bpp
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -862,16 +863,16 @@ class StreamGear:
|
||||
audio_bitrate = stream.pop("-audio_bitrate", "")
|
||||
if "-acodec" in output_params:
|
||||
if audio_bitrate and audio_bitrate.endswith(("k", "M")):
|
||||
intermediate_dict["-b:a:{}".format(stream_count)] = (
|
||||
audio_bitrate
|
||||
)
|
||||
intermediate_dict[
|
||||
"-b:a:{}".format(stream_count)
|
||||
] = audio_bitrate
|
||||
else:
|
||||
# otherwise calculate audio-bitrate
|
||||
if dimensions:
|
||||
aspect_width = int(dimensions[0])
|
||||
intermediate_dict["-b:a:{}".format(stream_count)] = (
|
||||
"{}k".format(128 if (aspect_width > 800) else 96)
|
||||
)
|
||||
intermediate_dict[
|
||||
"-b:a:{}".format(stream_count)
|
||||
] = "{}k".format(128 if (aspect_width > 800) else 96)
|
||||
# update output parameters
|
||||
output_params.update(intermediate_dict)
|
||||
# clear intermediate dict
|
||||
@@ -948,9 +949,9 @@ class StreamGear:
|
||||
else:
|
||||
# otherwise reset to default
|
||||
logger.warning("Invalid `-hls_flags` value skipped!")
|
||||
output_params["-hls_flags"] = (
|
||||
"delete_segments+discont_start+split_by_time"
|
||||
)
|
||||
output_params[
|
||||
"-hls_flags"
|
||||
] = "delete_segments+discont_start+split_by_time"
|
||||
# clean everything at exit?
|
||||
remove_at_exit = self.__params.pop("-remove_at_exit", 0)
|
||||
if isinstance(remove_at_exit, int) and remove_at_exit in [
|
||||
@@ -1230,7 +1231,7 @@ class StreamGear:
|
||||
@deprecated(
|
||||
message="The `terminate()` method will be removed in the next release. Kindly use `close()` method instead."
|
||||
)
|
||||
def terminate(self):
|
||||
def terminate(self) -> None:
|
||||
"""
|
||||
!!! warning "[DEPRECATION NOTICE]: This method is now deprecated and will be removed in a future release."
|
||||
|
||||
@@ -1241,7 +1242,7 @@ class StreamGear:
|
||||
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Safely terminates various StreamGear process.
|
||||
"""
|
||||
|
||||
+20
-15
@@ -20,6 +20,8 @@ limitations under the License.
|
||||
|
||||
# import the necessary packages
|
||||
import logging as log
|
||||
from typing import TypeVar, Tuple, Union, Any
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import logger_handler, logcurr_vidgear_ver
|
||||
@@ -33,6 +35,9 @@ logger.propagate = False
|
||||
logger.addHandler(logger_handler())
|
||||
logger.setLevel(log.DEBUG)
|
||||
|
||||
# Type variable `T` representing class `VideoGear`.
|
||||
T = TypeVar("T", bound="VideoGear")
|
||||
|
||||
|
||||
class VideoGear:
|
||||
"""
|
||||
@@ -48,21 +53,21 @@ class VideoGear:
|
||||
def __init__(
|
||||
self,
|
||||
# VideoGear parameters
|
||||
enablePiCamera=False,
|
||||
stabilize=False,
|
||||
enablePiCamera: bool = False,
|
||||
stabilize: bool = False,
|
||||
# PiGear parameters
|
||||
camera_num=0,
|
||||
resolution=(640, 480),
|
||||
framerate=30,
|
||||
camera_num: int = 0,
|
||||
resolution: Tuple[int, int] = (640, 480),
|
||||
framerate: Union[int, float] = 30,
|
||||
# CamGear parameters
|
||||
source=0,
|
||||
stream_mode=False,
|
||||
backend=0,
|
||||
source: Any = 0,
|
||||
stream_mode: bool = False,
|
||||
backend: int = 0,
|
||||
# common parameters
|
||||
time_delay=0,
|
||||
colorspace=None,
|
||||
logging=False,
|
||||
**options
|
||||
time_delay: int = 0,
|
||||
colorspace: str = None,
|
||||
logging: bool = False,
|
||||
**options: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the VideoGear class.
|
||||
@@ -153,7 +158,7 @@ class VideoGear:
|
||||
# initialize framerate variable
|
||||
self.framerate = self.stream.framerate
|
||||
|
||||
def start(self):
|
||||
def start(self) -> T:
|
||||
"""
|
||||
Launches the internal *Threaded Frames Extractor* daemon of API in use.
|
||||
|
||||
@@ -162,7 +167,7 @@ class VideoGear:
|
||||
self.stream.start()
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
def read(self) -> NDArray:
|
||||
"""
|
||||
Extracts frames synchronously from selected API's monitored deque, while maintaining a fixed-length frame
|
||||
buffer in the memory, and blocks the thread if the deque is full.
|
||||
@@ -178,7 +183,7 @@ class VideoGear:
|
||||
return frame_stab
|
||||
return self.stream.read()
|
||||
|
||||
def stop(self):
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
Safely terminates the thread, and release the respective multi-threaded resources.
|
||||
"""
|
||||
|
||||
@@ -26,6 +26,8 @@ import platform
|
||||
import pathlib
|
||||
import logging as log
|
||||
import subprocess as sp
|
||||
from typing import List
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# import helper packages
|
||||
from .helper import (
|
||||
@@ -77,11 +79,11 @@ class WriteGear:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
output="",
|
||||
compression_mode=True,
|
||||
custom_ffmpeg="",
|
||||
logging=False,
|
||||
**output_params
|
||||
output: str = "",
|
||||
compression_mode: bool = True,
|
||||
custom_ffmpeg: str = "",
|
||||
logging: bool = False,
|
||||
**output_params: dict
|
||||
):
|
||||
"""
|
||||
This constructor method initializes the object state and attributes of the WriteGear class.
|
||||
@@ -369,7 +371,7 @@ class WriteGear:
|
||||
"Compression Mode is disabled, Activating OpenCV built-in Writer!"
|
||||
)
|
||||
|
||||
def write(self, frame, rgb_mode=False):
|
||||
def write(self, frame: NDArray, rgb_mode: bool = False) -> None:
|
||||
"""
|
||||
Pipelines `ndarray` frames to respective API _(**FFmpeg** in Compression Mode & **OpenCV's VideoWriter API** in Non-Compression Mode)_.
|
||||
|
||||
@@ -634,7 +636,7 @@ class WriteGear:
|
||||
"""
|
||||
self.close()
|
||||
|
||||
def execute_ffmpeg_cmd(self, command=None):
|
||||
def execute_ffmpeg_cmd(self, command: List = None) -> None:
|
||||
"""
|
||||
|
||||
Executes user-defined FFmpeg Terminal command, formatted as a python list(in Compression Mode only).
|
||||
@@ -758,7 +760,7 @@ class WriteGear:
|
||||
self.__process.isOpened()
|
||||
), "[WriteGear:ERROR] :: Failed to initialize OpenCV Writer!"
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Safely terminates various WriteGear process.
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user