Fix all linting errors

This commit is contained in:
Michael Hansen
2019-12-23 11:58:47 -05:00
parent ad2e208fc2
commit aa658fb29b
17 changed files with 128 additions and 40 deletions
+1
View File
@@ -1166,6 +1166,7 @@ def prefers_json() -> bool:
def quality(accept, key: str) -> float:
"""Return Accept quality for media type."""
for option in accept.options:
# pylint: disable=W0212
if accept._values_match(key, option.value):
return option.quality
return 0.0
+1 -1
View File
@@ -240,7 +240,7 @@ cp -R "${openfst_dir}"/build/bin/* "${venv}/bin/"
# -----------------------------------------------------------------------------
# opengrm
# http://www.opengrm.org/twiki/bin/view/GRM/NGramLibrary
#
#
# Required to build language models.
# -----------------------------------------------------------------------------
+4 -1
View File
@@ -24,7 +24,10 @@ disable=
unnecessary-pass,
unused-argument,
invalid-name,
broad-except
broad-except,
no-self-use,
c-extension-no-member,
too-many-nested-blocks
[FORMAT]
expected-line-ending-format=LF
+28 -9
View File
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
"""Rhasspy command-line interface"""
import argparse
import asyncio
import io
import json
import logging
# Configure logging
import logging.config
import os
@@ -31,6 +32,7 @@ mic_stdin_running = False
async def main() -> None:
"""Main method"""
global mic_stdin_running, mic_stdin_thread
# Parse command-line arguments
@@ -226,8 +228,6 @@ async def main() -> None:
logger.debug(profiles_dirs)
# Create rhasspy core
from rhasspy.core import RhasspyCore
core = RhasspyCore(args.profile, args.system_profiles, args.user_profiles)
# Add profile settings from the command line
@@ -238,7 +238,7 @@ async def main() -> None:
except Exception:
pass
logger.debug("Profile: {0}={1}".format(key, value))
logger.debug("Profile: %s=%s", key, value)
extra_settings[key] = value
core.profile.set(key, value)
@@ -304,7 +304,9 @@ async def main() -> None:
missing_files = core.check_profile()
if len(missing_files) > 0:
logger.fatal(
f"Missing required files for {profile.name}: {missing_files.keys()}. Please run download command and try again."
"Missing required files for %s: %s. Please run download command and try again.",
profile.name,
missing_files.keys(),
)
sys.exit(1)
@@ -332,6 +334,7 @@ async def main() -> None:
async def wav2text(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Transcribe WAV file(s)"""
if len(args.wav_files) > 0:
# Read WAV paths from argument list
transcriptions = {}
@@ -356,7 +359,7 @@ async def wav2text(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def text2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
# Parse sentences from command line or stdin
"""Parse sentences from command line or stdin"""
intents = {}
sentences = args.sentences if len(args.sentences) > 0 else sys.stdin
for sentence in sentences:
@@ -378,6 +381,7 @@ async def text2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def wav2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Recognize intent from WAV file(s)"""
if len(args.wav_files) > 0:
# Read WAV paths from argument list
transcriptions = {}
@@ -416,6 +420,7 @@ async def wav2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def train_profile(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Train Rhasspy profile"""
result = await core.train(reload_actors=False, no_cache=args.no_cache)
print(result)
@@ -426,6 +431,7 @@ async def train_profile(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def mic2wav(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Record voice command from microphone"""
# Listen until silence
wav_data = buffer_to_wav((await core.record_command(args.timeout)).data)
@@ -439,6 +445,7 @@ async def mic2wav(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def mic2text(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Record voice command and transcribe"""
# Listen until silence
wav_data = buffer_to_wav((await core.record_command(args.timeout)).data)
@@ -455,6 +462,7 @@ async def mic2text(core: RhasspyCore, profile: Profile, args: Any) -> None:
def read_audio_stdin(core: RhasspyCore, chunk_size: int = 960):
"""Record audio chunks from stdin"""
global mic_stdin_running
while mic_stdin_running:
audio_data = sys.stdin.buffer.read(chunk_size)
@@ -462,6 +470,7 @@ def read_audio_stdin(core: RhasspyCore, chunk_size: int = 960):
async def mic2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Record voice command, transcribe, and recognize intent"""
# Listen until silence
wav_data = buffer_to_wav((await core.record_command(args.timeout)).data)
@@ -484,6 +493,7 @@ async def mic2intent(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def word2phonemes(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Get pronunciation(s) for word(s)"""
words = args.words if len(args.words) > 0 else sys.stdin
# Get pronunciations for all words
@@ -501,6 +511,7 @@ async def word2phonemes(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def word2wav(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Speak a word's pronunciation"""
# Get pronunciation for word
all_pronunciations = (
await core.get_word_pronunciations([args.word], n=1)
@@ -530,8 +541,10 @@ def _send_frame(
width: int,
channels: int,
) -> None:
"""Send a single audio frame via MQTT"""
with io.BytesIO() as mqtt_buffer:
with wave.open(mqtt_buffer, mode="wb") as mqtt_file:
mqtt_file: wave.Wave_write = wave.open(mqtt_buffer, mode="wb")
with mqtt_file:
mqtt_file.setframerate(rate)
mqtt_file.setsampwidth(width)
mqtt_file.setnchannels(channels)
@@ -543,6 +556,7 @@ def _send_frame(
async def wav2mqtt(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Publish WAV to MQTT as audio frames"""
# hermes/audioServer/<SITE_ID>/audioFrame
topic = "hermes/audioServer/%s/audioFrame" % args.site_id
@@ -562,7 +576,7 @@ async def wav2mqtt(core: RhasspyCore, profile: Profile, args: Any) -> None:
num_chunks = int(
(args.silence_before * rate * width * channels) / chunk_size
)
for i in range(num_chunks):
for _ in range(num_chunks):
_send_frame(
core, topic, bytes(chunk_size), rate, width, channels
)
@@ -583,7 +597,7 @@ async def wav2mqtt(core: RhasspyCore, profile: Profile, args: Any) -> None:
num_chunks = int(
(args.silence_after * rate * width * channels) / chunk_size
)
for i in range(num_chunks):
for _ in range(num_chunks):
_send_frame(
core, topic, bytes(chunk_size), rate, width, channels
)
@@ -602,6 +616,7 @@ async def wav2mqtt(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def text2wav(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Speak a sentence and output WAV data"""
result = await core.speak_sentence(args)
sys.stdout.buffer.write(result.wav_data)
@@ -612,6 +627,7 @@ async def text2wav(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def text2speech(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Speak sentences"""
sentences = args.sentences
if len(sentences) == 0:
sentences = sys.stdin
@@ -627,6 +643,7 @@ async def text2speech(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def sleep(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Wait for wake word to be spoken"""
result = await core.wakeup_and_wait()
if isinstance(result, WakeWordDetected):
print(result.name)
@@ -640,6 +657,7 @@ async def sleep(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def download(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Download necessary profile files"""
await core.download_profile(delete=args.delete)
print("OK")
@@ -650,6 +668,7 @@ async def download(core: RhasspyCore, profile: Profile, args: Any) -> None:
async def check(core: RhasspyCore, profile: Profile, args: Any) -> None:
"""Verify that profile files are downloaded"""
missing_files = core.check_profile()
json.dump(missing_files, sys.stdout, indent=4)
+1
View File
@@ -1,3 +1,4 @@
"""Run Rhasspy command-line"""
import asyncio
from rhasspy import main
+2 -2
View File
@@ -116,7 +116,7 @@ class APlayAudioPlayer(RhasspyActor):
aplay_cmd.append(path)
self._logger.debug(aplay_cmd)
subprocess.run(aplay_cmd)
subprocess.run(aplay_cmd, check=True)
def play_data(self, wav_data: bytes) -> None:
"""Play a WAV buffer using aplay."""
@@ -128,7 +128,7 @@ class APlayAudioPlayer(RhasspyActor):
self._logger.debug(aplay_cmd)
# Play data
subprocess.run(aplay_cmd, input=wav_data)
subprocess.run(aplay_cmd, input=wav_data, check=True)
# -------------------------------------------------------------------------
+1
View File
@@ -118,6 +118,7 @@ class WebrtcvadCommandListener(RhasspyActor):
self.timeout_sec: float = 30
self.vad_mode: int = 0
self.vad: Optional[webrtcvad.Vad] = None
self.timeout_id: str = ""
def to_started(self, from_state: str) -> None:
"""Transition to started state."""
+2 -1
View File
@@ -667,8 +667,9 @@ class RhasspyCore:
# Copy specific file/directory
self._logger.debug("Copying %s to %s", extract_path, dest_path)
if os.path.isdir(extract_path):
if len(src_exclude) > 0:
if src_exclude:
# Ignore some files
# pylint: disable=W0640
shutil.copytree(
extract_path,
dest_path,
+5 -2
View File
@@ -238,14 +238,14 @@ class FsticuffsRecognizer(RhasspyActor):
# Add words from FST
self.words = set()
for node, data in self.graph.nodes(data=True):
for _, data in self.graph.nodes(data=True):
if "word" in data:
self.words.add(data["word"])
# Load stop words
stop_words_path = self.profile.read_path("stop_words.txt")
if os.path.exists(stop_words_path):
self._logger.debug(f"Using stop words at {stop_words_path}")
self._logger.debug("Using stop words at %s", stop_words_path)
with open(stop_words_path, "r") as stop_words_file:
self.stop_words = {
line.strip()
@@ -586,6 +586,7 @@ class FlairRecognizer(RhasspyActor):
RhasspyActor.__init__(self)
try:
# pylint: disable=E0401
from flair.models import TextClassifier, SequenceTagger
except Exception:
pass
@@ -624,6 +625,7 @@ class FlairRecognizer(RhasspyActor):
def recognize(self, text: str) -> Dict[str, Any]:
"""Run intent classifier and then named-entity recognizer."""
# pylint: disable=E0401
from flair.data import Sentence
intent = empty_intent()
@@ -666,6 +668,7 @@ class FlairRecognizer(RhasspyActor):
def load_models(self) -> None:
"""Load intent classifier and named entity recognizers."""
# pylint: disable=E0401
from flair.models import TextClassifier, SequenceTagger
# Load mapping from intent id to user intent name
+5
View File
@@ -451,14 +451,19 @@ class FlairIntentTrainer(RhasspyActor):
def train(self, intent_fst) -> None:
"""Train intent classifier and named entity recognizers."""
# pylint: disable=E0401
from flair.data import Sentence, Token
# pylint: disable=E0401
from flair.models import SequenceTagger, TextClassifier
# pylint: disable=E0401
from flair.embeddings import (
FlairEmbeddings,
StackedEmbeddings,
DocumentRNNEmbeddings,
)
# pylint: disable=E0401
from flair.data import TaggedCorpus
# pylint: disable=E0401
from flair.trainers import ModelTrainer
# Directory to look for downloaded embeddings
+22 -2
View File
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
"""Support for guessing word pronunciations"""
import os
import re
import subprocess
@@ -15,12 +15,16 @@ from rhasspy.utils import load_phoneme_map, read_dict
class SpeakWord:
"""Speak a word's pronunciation"""
def __init__(self, word: str, receiver: Optional[RhasspyActor] = None) -> None:
self.word = word
self.receiver = receiver
class WordSpoken:
"""Response to SpeakWord"""
def __init__(self, word: str, wav_data: bytes, phonemes: str) -> None:
self.word = word
self.wav_data = wav_data
@@ -28,18 +32,24 @@ class WordSpoken:
class GetWordPhonemes:
"""Get eSpeak phonemes for a word"""
def __init__(self, word: str, receiver: Optional[RhasspyActor] = None) -> None:
self.word = word
self.receiver = receiver
class WordPhonemes:
"""Response to GetWordPhonemes"""
def __init__(self, word: str, phonemes: str) -> None:
self.word = word
self.phonemes = phonemes
class GetWordPronunciations:
"""Look up or guess word pronunciation(s)"""
def __init__(
self, words: List[str], n: int = 5, receiver: Optional[RhasspyActor] = None
) -> None:
@@ -49,11 +59,15 @@ class GetWordPronunciations:
class WordPronunciations:
"""Response to GetWordPronunciations"""
def __init__(self, pronunciations: Dict[str, Dict[str, Any]]) -> None:
self.pronunciations = pronunciations
class PronunciationFailed:
"""Response when g2p fails"""
def __init__(self, reason: str) -> None:
self.reason = reason
@@ -80,11 +94,14 @@ class PhonetisaurusPronounce(RhasspyActor):
RhasspyActor.__init__(self)
self.speed = 80 # wpm for speaking
self.base_dict: Optional[Dict[str, List[str]]] = None
self.speech_system: str = ""
def to_started(self, from_state: str) -> None:
"""Transition to started state"""
self.speech_system = self.profile.get("speech_to_text.system", "pocketsphinx")
def in_started(self, message: Any, sender: RhasspyActor) -> None:
"""Handle messages in started state"""
if isinstance(message, SpeakWord):
espeak_phonemes, wav_data = self.speak(message.word)
self.send(
@@ -107,6 +124,7 @@ class PhonetisaurusPronounce(RhasspyActor):
# -------------------------------------------------------------------------
def speak(self, espeak_str: str, voice: Optional[str] = None) -> Tuple[str, bytes]:
"""Speak word pronunciation"""
# Use eSpeak to pronounce word
espeak_command = ["espeak", "-s", str(self.speed), "-x"]
@@ -133,6 +151,7 @@ class PhonetisaurusPronounce(RhasspyActor):
# -------------------------------------------------------------------------
def translate_phonemes(self, phonemes: str) -> str:
"""Get eSpeak phonemes for a pronunciation"""
# Load map from Sphinx to eSpeak phonemes
map_path = self.profile.read_path(
self.profile.get(f"speech_to_text.{self.speech_system}.phoneme_map")
@@ -150,10 +169,11 @@ class PhonetisaurusPronounce(RhasspyActor):
# -------------------------------------------------------------------------
def pronounce(self, words: List[str], n: int = 5) -> Dict[str, Dict[str, Any]]:
"""Look up or guess word pronunciation(s)"""
assert n > 0, "No pronunciations requested"
assert len(words) > 0, "No words to look up"
self._logger.debug("Getting pronunciations for %s" % words)
self._logger.debug("Getting pronunciations for %s", words)
# Load base and custom dictionaries
base_dictionary_path = self.profile.read_path(
+11 -3
View File
@@ -101,6 +101,7 @@ class PocketsphinxDecoder(RhasspyActor):
self.min_confidence: float = 0
self.preload: bool = False
self.decoder = None
self.open_transcription = False
def to_started(self, from_state: str) -> None:
"""Transition to started state."""
@@ -254,6 +255,7 @@ class PocketsphinxDecoder(RhasspyActor):
problems: Dict[str, Any] = {}
try:
# pylint: disable=W0201,W1201,W0611
import pocketsphinx # noqa: F401
except Exception:
problems[
@@ -324,8 +326,6 @@ class RemoteDecoder(RhasspyActor):
def transcribe_wav(self, wav_data: bytes) -> str:
"""POST to remote server and return response."""
import requests
headers = {"Content-Type": "audio/wav"}
self._logger.debug(
"POSTing %d byte(s) of WAV data to %s", len(wav_data), self.remote_url
@@ -361,6 +361,7 @@ class KaldiDecoder(RhasspyActor):
self.graph_dir: Optional[Path] = None
self.decode_path: Optional[Path] = None
self.decode_command: List[str] = []
self.open_transcription = False
def to_started(self, from_state: str) -> None:
"""Transition to started state."""
@@ -495,6 +496,10 @@ class HomeAssistantSTTIntegration(RhasspyActor):
self.pem_file: Optional[str] = ""
self.platform: Optional[str] = None
self.chunk_size: int = 2048
self.sample_rate: int = 16000
self.bit_rate: int = 16
self.channels: int = 1
self.language: str = "en-US"
def to_started(self, from_state: str) -> None:
"""Transition to started state."""
@@ -566,7 +571,10 @@ class HomeAssistantSTTIntegration(RhasspyActor):
with wave.open(wav_buffer, "rb") as wav_file:
# Send empty WAV as initial chunk (header only)
with io.BytesIO() as empty_wav_buffer:
with wave.open(empty_wav_buffer, "wb") as empty_wav_file:
empty_wav_file: wave.Wave_write = wave.open(
empty_wav_buffer, "wb"
)
with empty_wav_file:
empty_wav_file.setframerate(wav_file.getframerate())
empty_wav_file.setsampwidth(wav_file.getsampwidth())
empty_wav_file.setnchannels(wav_file.getnchannels())
+11 -4
View File
@@ -344,6 +344,7 @@ class MaryTTSSentenceSpeaker(RhasspyActor):
self.player: Optional[RhasspyActor] = None
self.receiver: Optional[RhasspyActor] = None
self.wav_data = bytes()
self.effects: Dict[str, Any] = {}
def to_started(self, from_state: str) -> None:
"""Transition to started state."""
@@ -356,9 +357,7 @@ class MaryTTSSentenceSpeaker(RhasspyActor):
self.voice = self.profile.get("text_to_speech.marytts.voice", None)
self.locale = self.profile.get("text_to_speech.marytts.locale", "en-US")
self.effects = self.profile.get(
"text_to_speech.marytts.effects", {}
)
self.effects = self.profile.get("text_to_speech.marytts.effects", {})
self.player = self.config["player"]
self.transition("ready")
@@ -655,12 +654,18 @@ class GoogleWaveNetSentenceSpeaker(RhasspyActor):
from google.cloud import texttospeech
client = texttospeech.TextToSpeechClient()
# pylint: disable=E1101
synthesis_input = texttospeech.types.SynthesisInput(text=sentence)
# pylint: disable=E1101
voice_params = texttospeech.types.VoiceSelectionParams(
language_code=language_code,
name=language_code + "-" + voice,
ssml_gender=self.gender,
)
# pylint: disable=E1101
audio_config = texttospeech.types.AudioConfig(
audio_encoding="LINEAR16", sample_rate_hertz=self.sample_rate
)
@@ -789,7 +794,9 @@ class HomeAssistantSentenceSpeaker(RhasspyActor):
lame_command = ["lame", "--decode", "-", "-"]
self._logger.debug(lame_command)
return subprocess.check_output(lame_command, input=audio_bytes)
return subprocess.run(
lame_command, input=audio_bytes, check=True, stdout=subprocess.PIPE
).stdout
# Assume WAV
return audio_bytes
+13 -9
View File
@@ -4,7 +4,8 @@ import os
import shutil
import subprocess
import tempfile
from typing import Any, Dict
from pathlib import Path
from typing import Any, Dict, Optional
from rhasspy.profiles import Profile
@@ -23,7 +24,9 @@ class SpeechTuner:
"""Cache import stuff upfront."""
pass
def tune(self, wav_intents: Dict[str, Dict[str, Any]]) -> None:
def tune(
self, wav_intents: Dict[str, Dict[str, Any]], mllr_path: Optional[Path] = None
) -> None:
"""Tunes a speech system with WAV file paths mapped to intents."""
pass
@@ -38,6 +41,7 @@ class SphinxTrainSpeechTuner(SpeechTuner):
"""Uses sphinxtrain tools to generate an MLLR matrix for an acoustic model."""
def tune(self, wav_intents: Dict[str, Dict[str, Any]], mllr_path=None) -> None:
"""Generate MLLR matrix for Pocketsphinx model"""
ps_config = self.profile.get("speech_to_text.pocketsphinx")
# Load decoder settings
@@ -54,15 +58,15 @@ class SphinxTrainSpeechTuner(SpeechTuner):
mdef_path,
]
logger.debug("Creating mdef.txt: %s" % mdef_command)
logger.debug("Creating mdef.txt: %s", mdef_command)
subprocess.check_call(mdef_command)
# Copy WAV files into temporary directory with unique names
fileid_intents = {}
logger.debug("Copying %s WAV file(s) to %s" % (len(wav_intents), temp_dir))
logger.debug("Copying %s WAV file(s) to %s", len(wav_intents), temp_dir)
for wav_path in list(wav_intents.keys()):
if not os.path.exists(wav_path):
logger.warn("Skipping %s (does not exist)" % wav_path)
logger.warning("Skipping %s (does not exist)", wav_path)
continue
# Copy WAV file
@@ -78,10 +82,10 @@ class SphinxTrainSpeechTuner(SpeechTuner):
# Write fileids (just the file name, no extension)
fileids_path = os.path.join(temp_dir, "fileids")
with open(fileids_path, "w") as fileids_file:
for file_id in fileid_intents.keys():
for file_id in fileid_intents:
print(file_id, file=fileids_file)
logger.debug("Wrote %s fileids" % len(fileid_intents))
logger.debug("Wrote %s fileids", len(fileid_intents))
# Write transcription.txt
transcription_path = os.path.join(temp_dir, "transcription.txt")
@@ -90,7 +94,7 @@ class SphinxTrainSpeechTuner(SpeechTuner):
text = fileid_intents[file_id]["text"].strip()
print("%s (%s.wav)" % (text, file_id), file=transcription_file)
logger.debug("Wrote %s" % transcription_path)
logger.debug("Wrote %s", transcription_path)
# Extract features
feat_params_path = os.path.join(hmm_path, "feat.params")
@@ -181,7 +185,7 @@ class SphinxTrainSpeechTuner(SpeechTuner):
logger.debug(solve_command)
subprocess.check_call(solve_command)
logger.debug("Generated MLLR matrix: %s" % mllr_path)
logger.debug("Generated MLLR matrix: %s", mllr_path)
# -----------------------------------------------------------------------------
+8 -3
View File
@@ -141,7 +141,8 @@ def recursive_remove(base_dict: Dict[Any, Any], new_dict: Dict[Any, Any]) -> Non
def buffer_to_wav(buffer: bytes) -> bytes:
"""Wraps a buffer of raw audio data (16-bit, 16Khz mono) in a WAV"""
with io.BytesIO() as wav_buffer:
with wave.open(wav_buffer, mode="wb") as wav_file:
wav_file: wave.Wave_write = wave.open(wav_buffer, mode="wb")
with wav_file:
wav_file.setframerate(16000)
wav_file.setsampwidth(2)
wav_file.setnchannels(1)
@@ -179,7 +180,8 @@ def convert_wav(wav_data: bytes, rate=16000, width=16, channels=1) -> bytes:
def maybe_convert_wav(wav_data: bytes, rate=16000, width=16, channels=1) -> bytes:
"""Converts WAV data to 16-bit, 16Khz mono if necessary."""
with io.BytesIO(wav_data) as wav_io:
with wave.open(wav_io, "rb") as wav_file:
wav_file: wave.Wave_read = wave.open(wav_io, "rb")
with wav_file:
if (
(wav_file.getframerate() != rate)
or (wav_file.getsampwidth() != width)
@@ -347,6 +349,7 @@ def make_sentences_by_intent(intent_fst: fst.Fst) -> Dict[str, Any]:
def sample_sentences_by_intent(
intent_fst_paths: Dict[str, str], num_samples: int
) -> Dict[str, Any]:
"""Generate random intents"""
from rhasspy.train.jsgf2fst import fstprintall, symbols2intent
def sample_sentences(intent_name: str, intent_fst_path: str):
@@ -444,8 +447,10 @@ def split_whitespace(s: str, **kwargs):
def get_wav_duration(wav_bytes: bytes) -> float:
"""Return the real-time duration of a WAV file"""
with io.BytesIO(wav_bytes) as wav_buffer:
with wave.open(wav_buffer) as wav_file:
wav_file: wave.Wave_read = wave.open(wav_buffer, "rb")
with wav_file:
frames = wav_file.getnframes()
rate = wav_file.getframerate()
return frames / float(rate)
+6 -1
View File
@@ -464,6 +464,7 @@ class SnowboyWakeListener(RhasspyActor):
"""Get problems at startup."""
problems: Dict[str, Any] = {}
try:
# pylint: disable=W0611
from snowboy import snowboydetect, snowboydecoder # noqa: F401
except Exception:
problems[
@@ -495,6 +496,7 @@ class PreciseWakeListener(RhasspyActor):
"""Listens for a wake word using Mycroft Precise."""
def __init__(self) -> None:
# pylint: disable=E0401
from precise_runner import ReadWriteStream
RhasspyActor.__init__(self)
@@ -562,7 +564,7 @@ class PreciseWakeListener(RhasspyActor):
if self.send_not_detected:
# Wait for all chunks to finish processing
for i in range(num_chunks):
for _ in range(num_chunks):
self.prediction_sem.acquire(timeout=0.1)
# Wait a little bit for the precise engine to finish processing
@@ -603,6 +605,7 @@ class PreciseWakeListener(RhasspyActor):
def load_runner(self) -> None:
"""Load precise runner."""
if self.engine is None:
# pylint: disable=E0401
from precise_runner import PreciseEngine
self.model_name = self.profile.get("wake.precise.model", "hey-mycroft-2.pb")
@@ -617,6 +620,7 @@ class PreciseWakeListener(RhasspyActor):
)
if self.runner is None:
# pylint: disable=E0401
from precise_runner import PreciseRunner, ReadWriteStream
self.stream = ReadWriteStream()
@@ -656,6 +660,7 @@ class PreciseWakeListener(RhasspyActor):
"""Get problems at startup."""
problems: Dict[str, Any] = {}
try:
# pylint: disable=E0401,W0611
from precise_runner import PreciseRunner, ReadWriteStream # noqa: F401
except Exception:
problems[
+7 -2
View File
@@ -58,7 +58,7 @@ class RhasspyTestCase(unittest.TestCase):
# -------------------------------------------------------------------------
def __init__(self, *args, **kwargs):
def __init__(self, *_args, **kwargs):
self.profiles = RhasspyTestCase.PROFILES
self.test_wav_path = {
p: os.path.join("etc", "test", p, "test.wav") for p in self.profiles
@@ -68,11 +68,12 @@ class RhasspyTestCase(unittest.TestCase):
for p in self.profiles
}
unittest.TestCase.__init__(self, *args, **kwargs)
unittest.TestCase.__init__(self, *_args, **kwargs)
# -------------------------------------------------------------------------
def test_transcribe(self):
"""Call async_test_transcribe"""
loop.run_until_complete(self.async_test_transcribe())
async def async_test_transcribe(self):
@@ -89,6 +90,7 @@ class RhasspyTestCase(unittest.TestCase):
# -------------------------------------------------------------------------
def test_recognize(self):
"""Call async_test_recognize"""
loop.run_until_complete(self.async_test_recognize())
async def async_test_recognize(self):
@@ -113,6 +115,7 @@ class RhasspyTestCase(unittest.TestCase):
# -------------------------------------------------------------------------
def test_training(self):
"""Call async_test_training"""
loop.run_until_complete(self.async_test_training())
async def async_test_training(self):
@@ -169,9 +172,11 @@ class RhasspyTestCase(unittest.TestCase):
# -------------------------------------------------------------------------
def test_pronounce(self):
"""Call async_test_pronounce"""
loop.run_until_complete(self.async_test_pronounce())
async def async_test_pronounce(self):
"""Test g2p model for guess pronunciation"""
for profile_name in self.profiles:
async with RhasspyTestCore(profile_name) as core:
test_info = self.test_json[profile_name]