🎨 Added type annotations across multiple modules (Fixes #376) [#414] by @abhisheksuran

-  Added type annotations to improve code clarity and enable static type checking.
- 🧑‍💻 Annotated all core and asyncio classes, except internal methods.
- 🩹 Renamed type variables for consistency.

CI/CD:
- 💚 Updated codecov-action to `v4.5.0` to fix tokenless upload issues in GitHub Actions CI.
- 💚 Replaced `codecov-uploader` with `codecovcli` to resolve "Bad CPU type in executable" bug in Azure Pipelines CI.

---------

Co-authored-by: abhiTronix <abhi.una12@gmail.com>
This commit is contained in:
Abhishek Suran
2024-08-11 09:49:20 +05:30
committed by GitHub
parent bba3211731
commit 31550b6480
15 changed files with 291 additions and 230 deletions
+1 -1
View File
@@ -90,7 +90,7 @@ jobs:
- name: Display exit code
run: echo "Exit code was = $EXIT_CODE"
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4.5.0
with:
name: ${{ matrix.python-version }}
token: ${{ secrets.CODECOV_TOKEN }}
+4 -4
View File
@@ -80,14 +80,14 @@ steps:
- bash: |
echo "Exit Code was: $(exit_code)"
curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
curl -Os https://uploader.codecov.io/latest/macos/codecov
curl -Os https://uploader.codecov.io/latest/macos/codecov.SHA256SUM
curl -Os https://uploader.codecov.io/latest/macos/codecov.SHA256SUM.sig
curl -Os https://cli.codecov.io/latest/macos/codecov
curl -Os https://cli.codecov.io/latest/macos/codecov.SHA256SUM
curl -Os https://cli.codecov.io/latest/macos/codecov.SHA256SUM.sig
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
chmod +x codecov
if [ "$(exit_code)" != "124" ]; then
./codecov -t $CODECOV_TOKEN -f coverage.xml -C $(Build.SourceVersion) -B $(Build.SourceBranch) -b $(Build.BuildNumber);
./codecov do-upload -t $CODECOV_TOKEN -f coverage.xml -C $(Build.SourceVersion) -B $(Build.SourceBranch) -b $(Build.BuildNumber);
else
echo "Timeout test - Skipped Codecov!";
fi
+21 -9
View File
@@ -30,6 +30,8 @@ from tqdm import tqdm
from colorlog import ColoredFormatter
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from numpy.typing import NDArray
from typing import Union
# import helper packages
from ..helper import logger_handler, mkdir_safe
@@ -64,7 +66,9 @@ class TimeoutHTTPAdapter(HTTPAdapter):
return super().send(request, **kwargs)
def create_blank_frame(frame=None, text="", logging=False):
def create_blank_frame(
frame: NDArray = None, text: str = "", logging: bool = False
) -> NDArray:
"""
## create_blank_frame
@@ -103,7 +107,11 @@ def create_blank_frame(frame=None, text="", logging=False):
return blank_frame
async def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
async def reducer(
frame: NDArray = None,
percentage: Union[int, float] = 0,
interpolation: int = cv2.INTER_LANCZOS4,
) -> NDArray:
"""
## reducer
@@ -144,7 +152,12 @@ async def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
return cv2.resize(frame, dimensions, interpolation=interpolation)
def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=False):
def generate_webdata(
path: str,
c_name: str = "webgear",
overwrite_default: bool = False,
logging: bool = False,
) -> str:
"""
## generate_webdata
@@ -209,10 +222,7 @@ def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=Fa
css_static_dir, c_name=c_name, files=["custom.css"], logging=logging
)
download_webdata(
js_static_dir,
c_name=c_name,
files=["custom.js"],
logging=logging,
js_static_dir, c_name=c_name, files=["custom.js"], logging=logging,
)
download_webdata(
favicon_dir, c_name=c_name, files=["favicon-32x32.png"], logging=logging
@@ -225,7 +235,9 @@ def generate_webdata(path, c_name="webgear", overwrite_default=False, logging=Fa
return path
def download_webdata(path, c_name="webgear", files=[], logging=False):
def download_webdata(
path: str, c_name: str = "webgear", files: list = [], logging: bool = False
) -> str:
"""
## download_webdata
@@ -326,7 +338,7 @@ def download_webdata(path, c_name="webgear", files=[], logging=False):
)
def validate_webdata(path, files=[], logging=False):
def validate_webdata(path: str, files: list = [], logging: bool = False) -> bool:
"""
## validate_auth_keys
+28 -24
View File
@@ -29,6 +29,8 @@ import string
import secrets
import platform
from collections import deque
from typing import Any, Tuple, AsyncGenerator, Union, TypeVar
from numpy.typing import NDArray
# import helper packages
from ..helper import logger_handler, import_dependency_safe, logcurr_vidgear_ver
@@ -50,6 +52,9 @@ logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(log.DEBUG)
# Type variable `T` representing class `NetGear_Async`.
T = TypeVar("T", bound="NetGear_Async")
class NetGear_Async:
"""
@@ -82,26 +87,26 @@ class NetGear_Async:
def __init__(
self,
# NetGear_Async parameters
address=None,
port=None,
protocol="tcp",
pattern=0,
receive_mode=False,
timeout=0.0,
address: str = None,
port: str = None,
protocol: str = "tcp",
pattern: int = 0,
receive_mode: bool = False,
timeout: Union[int, float] = 0.0,
# Videogear parameters
enablePiCamera=False,
stabilize=False,
source=None,
camera_num=0,
stream_mode=False,
backend=0,
colorspace=None,
resolution=(640, 480),
framerate=25,
time_delay=0,
enablePiCamera: bool = False,
stabilize: bool = False,
source: Any = None,
camera_num: int = 0,
stream_mode: bool = False,
backend: int = 0,
colorspace: str = None,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 25,
time_delay: int = 0,
# common parameters
logging=False,
**options
logging: bool = False,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the NetGear_Async class.
@@ -320,7 +325,7 @@ class NetGear_Async:
# create asyncio queue if bidirectional mode activated
self.__queue = asyncio.Queue() if self.__bi_mode else None
def launch(self):
def launch(self) -> T:
"""
Launches an asynchronous generators and loop executors for respective task.
"""
@@ -499,7 +504,7 @@ class NetGear_Async:
)
self.__logging and logger.debug(recv_confirmation)
async def recv_generator(self):
async def recv_generator(self) -> AsyncGenerator[Tuple[Any, NDArray], NDArray]:
"""
A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
"""
@@ -613,8 +618,7 @@ class NetGear_Async:
# create return type dict without data
rettype_dict = dict(
return_type=(type(return_data).__name__),
return_data=None,
return_type=(type(return_data).__name__), return_data=None,
)
# encode it
rettype_enc = msgpack.packb(rettype_dict)
@@ -677,7 +681,7 @@ class NetGear_Async:
# sleep for sometime
await asyncio.sleep(0)
async def transceive_data(self, data=None):
async def transceive_data(self, data: Any = None) -> Any:
"""
Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.
@@ -756,7 +760,7 @@ class NetGear_Async:
)
)
def close(self, skip_loop=False):
def close(self, skip_loop: bool = False) -> None:
"""
Terminates all NetGear_Async Asynchronous processes gracefully.
+15 -14
View File
@@ -26,6 +26,7 @@ import contextlib
import numpy as np
import logging as log
from os.path import expanduser
from typing import Any, Tuple, Union
# import helper packages
from .helper import (
@@ -79,18 +80,18 @@ class WebGear:
def __init__(
self,
enablePiCamera=False,
stabilize=False,
source=None,
camera_num=0,
stream_mode=False,
backend=0,
colorspace=None,
resolution=(640, 480),
framerate=25,
logging=False,
time_delay=0,
**options
enablePiCamera: bool = False,
stabilize: bool = False,
source: Any = None,
camera_num: int = 0,
stream_mode: bool = False,
backend: int = 0,
colorspace: str = None,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 25,
logging: bool = False,
time_delay: int = 0,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the WebGear class.
@@ -378,7 +379,7 @@ class WebGear:
# keeps check if producer loop should be running
self.__isrunning = True
def __call__(self):
def __call__(self) -> Starlette:
"""
Implements a custom Callable method for WebGear application.
"""
@@ -554,7 +555,7 @@ class WebGear:
# close Video Server
self.shutdown()
def shutdown(self):
def shutdown(self) -> None:
"""
Implements a Callable to be run on application shutdown
"""
+36 -34
View File
@@ -26,6 +26,7 @@ import fractions
import asyncio
import logging as log
from os.path import expanduser
from typing import Any, Union, Tuple
# import helper packages
from .helper import (
@@ -77,6 +78,7 @@ if not (aiortc is None):
from aiortc.contrib.media import MediaRelay
from aiortc.mediastreams import MediaStreamError
from av import VideoFrame # aiortc dependency
from av.frame import Frame as AVFrame # imported for type annotation
class RTC_VideoServer(VideoStreamTrack):
"""
@@ -86,18 +88,18 @@ if not (aiortc is None):
def __init__(
self,
enablePiCamera=False,
stabilize=False,
source=None,
camera_num=0,
stream_mode=False,
backend=0,
colorspace=None,
resolution=(640, 480),
framerate=25,
logging=False,
time_delay=0,
**options
enablePiCamera: bool = False,
stabilize: bool = False,
source: Any = None,
camera_num: int = 0,
stream_mode: bool = False,
backend: int = 0,
colorspace: str = None,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 25,
logging: bool = False,
time_delay: int = 0,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the RTC_VideoServer class.
@@ -194,7 +196,7 @@ if not (aiortc is None):
# handles reset signal
self.__reset_enabled = False
def launch(self):
def launch(self) -> None:
"""
Launches VideoGear stream
"""
@@ -204,7 +206,7 @@ if not (aiortc is None):
if hasattr(self.__stream, "start") and callable(self.__stream.start):
self.__stream.start()
async def next_timestamp(self):
async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
"""
VideoStreamTrack internal method for generating accurate timestamp.
"""
@@ -229,7 +231,7 @@ if not (aiortc is None):
self.is_running = True
return self._timestamp, VIDEO_TIME_BASE
async def recv(self):
async def recv(self) -> AVFrame:
"""
A coroutine function that yields `av.frame.Frame`.
"""
@@ -296,14 +298,14 @@ if not (aiortc is None):
# return `av.frame.Frame`
return frame
async def reset(self):
async def reset(self) -> None:
"""
Resets timestamp clock
"""
self.__reset_enabled = True
self.is_running = False
def terminate(self):
def terminate(self) -> None:
"""
Gracefully terminates VideoGear stream
"""
@@ -340,18 +342,18 @@ class WebGear_RTC:
def __init__(
self,
enablePiCamera=False,
stabilize=False,
source=None,
camera_num=0,
stream_mode=False,
backend=0,
colorspace=None,
resolution=(640, 480),
framerate=25,
logging=False,
time_delay=0,
**options
enablePiCamera: bool = False,
stabilize: bool = False,
source: Any = None,
camera_num: int = 0,
stream_mode: bool = False,
backend: int = 0,
colorspace: str = None,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 25,
logging: bool = False,
time_delay: int = 0,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the WebGear_RTC class.
@@ -420,9 +422,9 @@ class WebGear_RTC:
if isinstance(value, bool):
if value:
self.__relay = MediaRelay()
options["enable_infinite_frames"] = (
True # enforce infinite frames
)
options[
"enable_infinite_frames"
] = True # enforce infinite frames
logger.critical(
"Enabled live broadcasting for Peer connection(s)."
)
@@ -506,7 +508,7 @@ class WebGear_RTC:
# collects peer RTC connections
self.__pcs = set()
def __call__(self):
def __call__(self) -> Starlette:
"""
Implements a custom Callable method for WebGear_RTC application.
"""
@@ -654,7 +656,7 @@ class WebGear_RTC:
await asyncio.gather(*coros)
self.__pcs.clear()
def shutdown(self):
def shutdown(self) -> None:
"""
Gracefully shutdown video-server
"""
+19 -11
View File
@@ -24,6 +24,8 @@ import time
import queue
import logging as log
from threading import Thread, Event
from typing import TypeVar, Optional, Any
from numpy.typing import NDArray
# import helper packages
from .helper import (
@@ -57,7 +59,9 @@ if not (yt_dlp is None):
options (dict): provides ability to alter yt-dlp backend params.
"""
def __init__(self, source_url, logging=False, **stream_params):
def __init__(
self, source_url: str, logging: bool = False, **stream_params: dict
):
# initialize global params
self.__logging = logging
self.is_livestream = False
@@ -198,6 +202,10 @@ if not (yt_dlp is None):
return streams
# Type variable `T` representing class `CamGear`.
T = TypeVar("T", bound="CamGear")
class CamGear:
"""
CamGear supports a diverse range of video streams which can handle/control video stream almost any IP/USB Cameras, multimedia video file format (upto 4k tested),
@@ -212,13 +220,13 @@ class CamGear:
def __init__(
self,
source=0,
stream_mode=False,
backend=0,
colorspace=None,
logging=False,
time_delay=0,
**options
source: Any = 0,
stream_mode: bool = False,
backend: int = 0,
colorspace: str = None,
logging: bool = False,
time_delay: int = 0,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the CamGear class.
@@ -414,7 +422,7 @@ class CamGear:
# initialize stream read flag event
self.__stream_read = Event()
def start(self):
def start(self) -> T:
"""
Launches the internal *Threaded Frames Extractor* daemon.
@@ -487,7 +495,7 @@ class CamGear:
# release resources
self.stream.release()
def read(self):
def read(self) -> Optional[NDArray]:
"""
Extracts frames synchronously from monitored queue, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the queue is full.
@@ -505,7 +513,7 @@ class CamGear:
else None
)
def stop(self):
def stop(self) -> None:
"""
Safely terminates the thread, and release the multi-threaded resources.
"""
+54 -38
View File
@@ -44,6 +44,8 @@ from colorlog import ColoredFormatter
from pkg_resources import parse_version
from requests.adapters import HTTPAdapter, Retry
from ..version import __version__
from typing import List, Dict, Optional, Union
from numpy.typing import NDArray
def logger_handler():
@@ -193,11 +195,7 @@ def deprecated(parameter=None, message=None, stacklevel=2):
def import_dependency_safe(
name,
error="raise",
pkg_name=None,
min_version=None,
custom_message=None,
name, error="raise", pkg_name=None, min_version=None, custom_message=None,
):
"""
## import_dependency_safe
@@ -313,7 +311,7 @@ class TimeoutHTTPAdapter(HTTPAdapter):
return super().send(request, **kwargs)
def check_CV_version():
def check_CV_version() -> int:
"""
## check_CV_version
@@ -325,7 +323,7 @@ def check_CV_version():
return 3
def check_open_port(address, port=22):
def check_open_port(address: str, port: int = 22) -> bool:
"""
## check_open_port
@@ -346,7 +344,9 @@ def check_open_port(address, port=22):
return False
def check_WriteAccess(path, is_windows=False, logging=False):
def check_WriteAccess(
path: str, is_windows: bool = False, logging: bool = False
) -> bool:
"""
## check_WriteAccess
@@ -403,7 +403,7 @@ def check_WriteAccess(path, is_windows=False, logging=False):
return write_accessible
def check_gstreamer_support(logging=False):
def check_gstreamer_support(logging: bool = False) -> bool:
"""
## check_gstreamer_support
@@ -429,7 +429,7 @@ def check_gstreamer_support(logging=False):
return False
def get_supported_resolution(value, logging=False):
def get_supported_resolution(value: str, logging=False) -> str:
"""
## get_supported_resolution
@@ -475,7 +475,7 @@ def get_supported_resolution(value, logging=False):
return stream_resolution
def dimensions_to_resolutions(value):
def dimensions_to_resolutions(value: List[str]) -> List[str]:
"""
## dimensions_to_resolutions
@@ -502,7 +502,7 @@ def dimensions_to_resolutions(value):
)
def get_supported_vencoders(path):
def get_supported_vencoders(path: str) -> List[str]:
"""
## get_supported_vencoders
@@ -529,7 +529,7 @@ def get_supported_vencoders(path):
return [[s for s in o.split(" ")][-1] for o in outputs]
def get_supported_demuxers(path):
def get_supported_demuxers(path: str) -> List[str]:
"""
## get_supported_demuxers
@@ -551,7 +551,7 @@ def get_supported_demuxers(path):
return [o.strip() if not ("," in o) else o.split(",")[-1].strip() for o in outputs]
def get_supported_pixfmts(path):
def get_supported_pixfmts(path: str) -> List[str]:
"""
## get_supported_pixfmts
@@ -579,7 +579,7 @@ def get_supported_pixfmts(path):
return [[s for s in o[0].split(" ")][-1] for o in outputs if len(o) == 3]
def is_valid_url(path, url=None, logging=False):
def is_valid_url(path: str, url: str = None, logging: bool = False) -> bool:
"""
## is_valid_url
@@ -620,7 +620,9 @@ def is_valid_url(path, url=None, logging=False):
return False
def validate_video(path, video_path=None, logging=False):
def validate_video(
path: str, video_path: str = None, logging: bool = False
) -> Optional[dict]:
"""
## validate_video
@@ -658,7 +660,9 @@ def validate_video(path, video_path=None, logging=False):
return result if (len(result) == 2) else None
def create_blank_frame(frame=None, text="", logging=False):
def create_blank_frame(
frame: NDArray = None, text: str = "", logging: bool = False
) -> NDArray:
"""
## create_blank_frame
@@ -696,7 +700,7 @@ def create_blank_frame(frame=None, text="", logging=False):
return blank_frame
def extract_time(value):
def extract_time(value: str) -> int:
"""
## extract_time
@@ -715,7 +719,7 @@ def extract_time(value):
t_duration = re.findall(r"\d{2}:\d{2}:\d{2}(?:\.\d{2})?", stripped_data)
return (
sum(
float(x) * 60**i
float(x) * 60 ** i
for i, x in enumerate(reversed(t_duration[0].split(":")))
)
if t_duration
@@ -723,7 +727,7 @@ def extract_time(value):
)
def validate_audio(path, source=None):
def validate_audio(path: str, source: Union[str, list] = None) -> str:
"""
## validate_audio
@@ -794,7 +798,7 @@ def validate_audio(path, source=None):
return ""
def get_audio_bitrate(samplerate, channels, bit_depth):
def get_audio_bitrate(samplerate: int, channels: int, bit_depth: float) -> int:
"""
## get_audio_bitrate
@@ -810,7 +814,7 @@ def get_audio_bitrate(samplerate, channels, bit_depth):
return round((samplerate * channels * bit_depth) / 1000)
def get_video_bitrate(width, height, fps, bpp):
def get_video_bitrate(width: int, height: int, fps: float, bpp: float) -> int:
"""
## get_video_bitrate
@@ -827,7 +831,7 @@ def get_video_bitrate(width, height, fps, bpp):
return round((width * height * bpp * fps) / 1000)
def delete_file_safe(file_path):
def delete_file_safe(file_path: str) -> None:
"""
## delete_ext_safe
@@ -843,7 +847,7 @@ def delete_file_safe(file_path):
logger.exception(str(e))
def mkdir_safe(dir_path, logging=False):
def mkdir_safe(dir_path: str, logging: bool = False) -> None:
"""
## mkdir_safe
@@ -862,7 +866,9 @@ def mkdir_safe(dir_path, logging=False):
raise
def delete_ext_safe(dir_path, extensions=[], logging=False):
def delete_ext_safe(
dir_path: str, extensions: list = [], logging: bool = False
) -> None:
"""
## delete_ext_safe
@@ -898,7 +904,7 @@ def delete_ext_safe(dir_path, extensions=[], logging=False):
logging and logger.debug("Deleted file: `{}`".format(file))
def capPropId(property, logging=True):
def capPropId(property: str, logging: bool = True) -> int:
"""
## capPropId
@@ -920,7 +926,7 @@ def capPropId(property, logging=True):
return integer_value
def retrieve_best_interpolation(interpolations):
def retrieve_best_interpolation(interpolations: list) -> Optional[int]:
"""
## retrieve_best_interpolation
Retrieves best interpolation for resizing
@@ -937,7 +943,11 @@ def retrieve_best_interpolation(interpolations):
return None
def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
def reducer(
frame: NDArray = None,
percentage: Union[int, float] = 0,
interpolation: int = cv2.INTER_LANCZOS4,
) -> NDArray:
"""
## reducer
@@ -978,7 +988,7 @@ def reducer(frame=None, percentage=0, interpolation=cv2.INTER_LANCZOS4):
return cv2.resize(frame, dimensions, interpolation=interpolation)
def dict2Args(param_dict):
def dict2Args(param_dict: dict) -> list:
"""
## dict2Args
@@ -1008,8 +1018,11 @@ def dict2Args(param_dict):
def get_valid_ffmpeg_path(
custom_ffmpeg="", is_windows=False, ffmpeg_download_path="", logging=False
):
custom_ffmpeg: str = "",
is_windows: bool = False,
ffmpeg_download_path: str = "",
logging: bool = False,
) -> Union[str, bool]:
"""
## get_valid_ffmpeg_path
@@ -1100,7 +1113,9 @@ def get_valid_ffmpeg_path(
return final_path if validate_ffmpeg(final_path, logging=logging) else False
def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
def download_ffmpeg_binaries(
path: str, os_windows: bool = False, os_bit: str = ""
) -> str:
"""
## download_ffmpeg_binaries
@@ -1124,8 +1139,7 @@ def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
os.path.abspath(path), "ffmpeg-static-{}-gpl.zip".format(os_bit)
)
file_path = os.path.join(
os.path.abspath(path),
"ffmpeg-static-{}-gpl/bin/ffmpeg.exe".format(os_bit),
os.path.abspath(path), "ffmpeg-static-{}-gpl/bin/ffmpeg.exe".format(os_bit),
)
base_path, _ = os.path.split(file_name) # extract file base path
# check if file already exists
@@ -1185,7 +1199,7 @@ def download_ffmpeg_binaries(path, os_windows=False, os_bit=""):
return final_path
def validate_ffmpeg(path, logging=False):
def validate_ffmpeg(path: str, logging: bool = False) -> bool:
"""
## validate_ffmpeg
@@ -1215,7 +1229,7 @@ def validate_ffmpeg(path, logging=False):
return True
def check_output(*args, **kwargs):
def check_output(*args: Union[list, tuple], **kwargs: dict) -> bytes:
"""
## check_output
@@ -1254,7 +1268,9 @@ def check_output(*args, **kwargs):
return output if not (retrieve_stderr) else stderr
def generate_auth_certificates(path, overwrite=False, logging=False):
def generate_auth_certificates(
path: str, overwrite: bool = False, logging: bool = False
) -> tuple:
"""
## generate_auth_certificates
@@ -1366,7 +1382,7 @@ def generate_auth_certificates(path, overwrite=False, logging=False):
return (keys_dir, secret_keys_dir, public_keys_dir)
def validate_auth_keys(path, extension):
def validate_auth_keys(path: str, extension: str) -> bool:
"""
## validate_auth_keys
+12 -10
View File
@@ -30,6 +30,8 @@ import logging as log
from threading import Thread
from collections import deque
from os.path import expanduser
from numpy.typing import NDArray
from typing import Optional, Any
# import helper packages
from .helper import (
@@ -114,13 +116,13 @@ class NetGear:
def __init__(
self,
address=None,
port=None,
protocol=None,
pattern=0,
receive_mode=False,
logging=False,
**options
address: str = None,
port: str = None,
protocol: str = None,
pattern: int = 0,
receive_mode: bool = False,
logging: bool = False,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the NetGear class.
@@ -1209,7 +1211,7 @@ class NetGear:
# otherwise append recovered frame to queue
self.__queue.append(frame)
def recv(self, return_data=None):
def recv(self, return_data=None) -> Optional[NDArray]:
"""
A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a
fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
@@ -1246,7 +1248,7 @@ class NetGear:
# otherwise return NoneType
return None
def send(self, frame, message=None):
def send(self, frame: NDArray, message: Any = None) -> Optional[Any]:
"""
A Server end method, that sends the data and frames over the network to Client(s).
@@ -1484,7 +1486,7 @@ class NetGear:
# log confirmation
self.__logging and logger.debug(recv_confirmation)
def close(self, kill=False):
def close(self, kill: bool = False) -> None:
"""
Safely terminates the threads, and NetGear resources.
+19 -21
View File
@@ -25,6 +25,8 @@ import os
import time
import logging as log
from threading import Thread
from typing import Tuple, Union, TypeVar
from numpy.typing import NDArray
# import helper packages
from .helper import (
@@ -54,6 +56,9 @@ logger.addHandler(logger_handler())
logger.setLevel(log.DEBUG)
PIGear = TypeVar("PIGear", bound="PiGear")
class PiGear:
"""
PiGear implements a seamless and robust wrapper around the [picamera2](https://github.com/raspberrypi/picamera2) python library, simplifying integration with minimal code changes and ensuring a
@@ -84,13 +89,13 @@ class PiGear:
def __init__(
self,
camera_num=0,
resolution=(640, 480),
framerate=30,
colorspace=None,
logging=False,
time_delay=0,
**options
camera_num: int = 0,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 30,
colorspace: str = None,
logging: bool = False,
time_delay: int = 0,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the PiGear class.
@@ -302,11 +307,7 @@ class PiGear:
# unless format is either BGR or BGRA
(
not (colorspace is None)
or options["format"]
in [
"RGB888",
"XRGB8888",
]
or options["format"] in ["RGB888", "XRGB8888",]
) and logger.warning(
"Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
options["format"]
@@ -336,8 +337,7 @@ class PiGear:
invalid_sensor_keys = set(list(sensor)) - set(valid_sensor)
invalid_sensor_keys and logger.warning(
"Discarding sensor properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
"`, `".join(invalid_sensor_keys),
"`, `".join(valid_sensor),
"`, `".join(invalid_sensor_keys), "`, `".join(valid_sensor),
)
)
# delete all unsupported control keys
@@ -497,7 +497,7 @@ class PiGear:
# initialize termination flag
self.__terminate = False
def start(self):
def start(self) -> PIGear:
"""
Launches the internal *Threaded Frames Extractor* daemon
@@ -588,7 +588,7 @@ class PiGear:
self.__rawCapture.close()
self.__camera.close()
def read(self):
def read(self) -> NDArray:
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
@@ -608,16 +608,14 @@ class PiGear:
# clear frame
self.frame = None
# re-raise error for debugging
error_msg = (
"[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
self.__exceptions[1]
)
error_msg = "[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
self.__exceptions[1]
)
raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
# return the frame
return self.frame
def stop(self):
def stop(self) -> None:
"""
Safely terminates the thread, and release the multi-threaded resources.
"""
+15 -6
View File
@@ -26,6 +26,8 @@ import numpy as np
import logging as log
from threading import Thread, Event
from collections import OrderedDict
from typing import TypeVar
from numpy.typing import NDArray
# import helper packages
from .helper import (
@@ -48,6 +50,9 @@ logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(log.DEBUG)
# Type variable `T` representing class `ScreenGear`.
T = TypeVar("T", bound="ScreenGear")
class ScreenGear:
"""
@@ -61,7 +66,12 @@ class ScreenGear:
"""
def __init__(
self, monitor=None, backend=None, colorspace=None, logging=False, **options
self,
monitor: int = None,
backend: str = None,
colorspace: str = None,
logging: bool = False,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the ScreenGear class.
@@ -273,7 +283,7 @@ class ScreenGear:
# initialize termination flag
self.__terminate = Event()
def start(self):
def start(self) -> T:
"""
Launches the internal *Threaded Frames Extractor* daemon
@@ -284,8 +294,7 @@ class ScreenGear:
self.__thread.start()
if self.__backend == "dxcam":
self.__capture_object.start(
target_fps=self.__target_fps,
video_mode=True,
target_fps=self.__target_fps, video_mode=True,
)
self.__logging and self.__target_fps and logger.debug(
"Targeting FPS: {}".format(self.__target_fps)
@@ -361,7 +370,7 @@ class ScreenGear:
self.__capture_object.stop()
del self.__capture_object
def read(self):
def read(self) -> NDArray:
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
@@ -371,7 +380,7 @@ class ScreenGear:
# return the frame
return self.frame
def stop(self):
def stop(self) -> None:
"""
Safely terminates the thread, and release the resources.
"""
+8 -7
View File
@@ -25,6 +25,7 @@ import cv2
import numpy as np
import logging as log
from collections import deque
from typing import Optional
# import helper packages
from .helper import (
@@ -53,11 +54,11 @@ class Stabilizer:
def __init__(
self,
smoothing_radius=25,
border_type="black",
border_size=0,
crop_n_zoom=False,
logging=False,
smoothing_radius: int = 25,
border_type: str = "black",
border_size: int = 0,
crop_n_zoom: bool = False,
logging: bool = False,
):
"""
This constructor method initializes the object state and attributes of the Stabilizer class.
@@ -151,7 +152,7 @@ class Stabilizer:
# define normalized box filter
self.__box_filter = np.ones(smoothing_radius) / smoothing_radius
def stabilize(self, frame):
def stabilize(self, frame: np.ndarray) -> Optional[np.ndarray]:
"""
This method takes an unstabilized video frame, and returns a stabilized one.
@@ -382,7 +383,7 @@ class Stabilizer:
# finally return stabilized frame
return frame_stabilized
def clean(self):
def clean(self) -> None:
"""
Cleans Stabilizer resources
"""
+29 -28
View File
@@ -28,6 +28,7 @@ import logging as log
import subprocess as sp
from tqdm import tqdm
from collections import OrderedDict
from numpy.typing import NDArray
# import helper packages
from .helper import (
@@ -68,7 +69,12 @@ class StreamGear:
"""
def __init__(
self, output="", format="dash", custom_ffmpeg="", logging=False, **stream_params
self,
output: str = "",
format: str = "dash",
custom_ffmpeg: str = "",
logging: bool = False,
**stream_params: dict
):
"""
This constructor method initializes the object state and attributes of the StreamGear class.
@@ -353,9 +359,7 @@ class StreamGear:
elif os.path.isfile(abs_path) and self.__clear_assets:
# clear previous assets if specified
delete_ext_safe(
os.path.dirname(abs_path),
assets_exts,
logging=self.__logging,
os.path.dirname(abs_path), assets_exts, logging=self.__logging,
)
# check if path has valid file extension
assert abs_path.endswith(
@@ -403,7 +407,7 @@ class StreamGear:
parameter="rgb_mode",
message="The `rgb_mode` parameter is deprecated and will be removed in a future version. Only BGR format frames will be supported going forward.",
)
def stream(self, frame, rgb_mode=False):
def stream(self, frame: NDArray, rgb_mode: bool = False) -> None:
"""
Pipes `ndarray` frames to FFmpeg Pipeline for transcoding them into chunked-encoded media segments of
streaming formats such as MPEG-DASH and HLS.
@@ -463,7 +467,7 @@ class StreamGear:
)
raise ValueError # for testing purpose only
def transcode_source(self):
def transcode_source(self) -> None:
"""
Transcodes an entire video file _(with or without audio)_ into chunked-encoded media segments of
streaming formats such as MPEG-DASH and HLS.
@@ -583,8 +587,7 @@ class StreamGear:
logger.info("Input video's audio source will be used for this run.")
# assign audio codec
output_parameters["-acodec"] = self.__params.pop(
"-acodec",
"aac" if ("-streams" in self.__params) else "copy",
"-acodec", "aac" if ("-streams" in self.__params) else "copy",
)
if output_parameters["-acodec"] != "copy":
output_parameters["a_bitrate"] = bitrate # temporary handler
@@ -741,13 +744,11 @@ class StreamGear:
# process given dash/hls stream and return it
if self.__format == "dash":
processed_params = self.__generate_dash_stream(
input_params=input_params,
output_params=output_params,
input_params=input_params, output_params=output_params,
)
else:
processed_params = self.__generate_hls_stream(
input_params=input_params,
output_params=output_params,
input_params=input_params, output_params=output_params,
)
return processed_params
@@ -843,11 +844,11 @@ class StreamGear:
# otherwise calculate video-bitrate
fps = stream.pop("-framerate", 0.0)
if dimensions and isinstance(fps, (float, int)) and fps > 0:
intermediate_dict["-b:v:{}".format(stream_count)] = (
"{}k".format(
get_video_bitrate(
int(dimensions[0]), int(dimensions[1]), fps, bpp
)
intermediate_dict[
"-b:v:{}".format(stream_count)
] = "{}k".format(
get_video_bitrate(
int(dimensions[0]), int(dimensions[1]), fps, bpp
)
)
else:
@@ -862,16 +863,16 @@ class StreamGear:
audio_bitrate = stream.pop("-audio_bitrate", "")
if "-acodec" in output_params:
if audio_bitrate and audio_bitrate.endswith(("k", "M")):
intermediate_dict["-b:a:{}".format(stream_count)] = (
audio_bitrate
)
intermediate_dict[
"-b:a:{}".format(stream_count)
] = audio_bitrate
else:
# otherwise calculate audio-bitrate
if dimensions:
aspect_width = int(dimensions[0])
intermediate_dict["-b:a:{}".format(stream_count)] = (
"{}k".format(128 if (aspect_width > 800) else 96)
)
intermediate_dict[
"-b:a:{}".format(stream_count)
] = "{}k".format(128 if (aspect_width > 800) else 96)
# update output parameters
output_params.update(intermediate_dict)
# clear intermediate dict
@@ -948,9 +949,9 @@ class StreamGear:
else:
# otherwise reset to default
logger.warning("Invalid `-hls_flags` value skipped!")
output_params["-hls_flags"] = (
"delete_segments+discont_start+split_by_time"
)
output_params[
"-hls_flags"
] = "delete_segments+discont_start+split_by_time"
# clean everything at exit?
remove_at_exit = self.__params.pop("-remove_at_exit", 0)
if isinstance(remove_at_exit, int) and remove_at_exit in [
@@ -1230,7 +1231,7 @@ class StreamGear:
@deprecated(
message="The `terminate()` method will be removed in the next release. Kindly use `close()` method instead."
)
def terminate(self):
def terminate(self) -> None:
"""
!!! warning "[DEPRECATION NOTICE]: This method is now deprecated and will be removed in a future release."
@@ -1241,7 +1242,7 @@ class StreamGear:
self.close()
def close(self):
def close(self) -> None:
"""
Safely terminates various StreamGear process.
"""
+20 -15
View File
@@ -20,6 +20,8 @@ limitations under the License.
# import the necessary packages
import logging as log
from typing import TypeVar, Tuple, Union, Any
from numpy.typing import NDArray
# import helper packages
from .helper import logger_handler, logcurr_vidgear_ver
@@ -33,6 +35,9 @@ logger.propagate = False
logger.addHandler(logger_handler())
logger.setLevel(log.DEBUG)
# Type variable `T` representing class `VideoGear`.
T = TypeVar("T", bound="VideoGear")
class VideoGear:
"""
@@ -48,21 +53,21 @@ class VideoGear:
def __init__(
self,
# VideoGear parameters
enablePiCamera=False,
stabilize=False,
enablePiCamera: bool = False,
stabilize: bool = False,
# PiGear parameters
camera_num=0,
resolution=(640, 480),
framerate=30,
camera_num: int = 0,
resolution: Tuple[int, int] = (640, 480),
framerate: Union[int, float] = 30,
# CamGear parameters
source=0,
stream_mode=False,
backend=0,
source: Any = 0,
stream_mode: bool = False,
backend: int = 0,
# common parameters
time_delay=0,
colorspace=None,
logging=False,
**options
time_delay: int = 0,
colorspace: str = None,
logging: bool = False,
**options: dict
):
"""
This constructor method initializes the object state and attributes of the VideoGear class.
@@ -153,7 +158,7 @@ class VideoGear:
# initialize framerate variable
self.framerate = self.stream.framerate
def start(self):
def start(self) -> T:
"""
Launches the internal *Threaded Frames Extractor* daemon of API in use.
@@ -162,7 +167,7 @@ class VideoGear:
self.stream.start()
return self
def read(self):
def read(self) -> NDArray:
"""
Extracts frames synchronously from selected API's monitored deque, while maintaining a fixed-length frame
buffer in the memory, and blocks the thread if the deque is full.
@@ -178,7 +183,7 @@ class VideoGear:
return frame_stab
return self.stream.read()
def stop(self):
def stop(self) -> None:
"""
Safely terminates the thread, and release the respective multi-threaded resources.
"""
+10 -8
View File
@@ -26,6 +26,8 @@ import platform
import pathlib
import logging as log
import subprocess as sp
from typing import List
from numpy.typing import NDArray
# import helper packages
from .helper import (
@@ -77,11 +79,11 @@ class WriteGear:
def __init__(
self,
output="",
compression_mode=True,
custom_ffmpeg="",
logging=False,
**output_params
output: str = "",
compression_mode: bool = True,
custom_ffmpeg: str = "",
logging: bool = False,
**output_params: dict
):
"""
This constructor method initializes the object state and attributes of the WriteGear class.
@@ -369,7 +371,7 @@ class WriteGear:
"Compression Mode is disabled, Activating OpenCV built-in Writer!"
)
def write(self, frame, rgb_mode=False):
def write(self, frame: NDArray, rgb_mode: bool = False) -> None:
"""
Pipelines `ndarray` frames to respective API _(**FFmpeg** in Compression Mode & **OpenCV's VideoWriter API** in Non-Compression Mode)_.
@@ -634,7 +636,7 @@ class WriteGear:
"""
self.close()
def execute_ffmpeg_cmd(self, command=None):
def execute_ffmpeg_cmd(self, command: List = None) -> None:
"""
Executes user-defined FFmpeg Terminal command, formatted as a python list(in Compression Mode only).
@@ -758,7 +760,7 @@ class WriteGear:
self.__process.isOpened()
), "[WriteGear:ERROR] :: Failed to initialize OpenCV Writer!"
def close(self):
def close(self) -> None:
"""
Safely terminates various WriteGear process.
"""