decoupling UI and runtime with signaling + rule signaling upon refresh

This commit is contained in:
Nicolas Vandamme
2026-03-10 22:17:08 +01:00
parent fe4a397f3e
commit fca8c07bac
10 changed files with 2648 additions and 750 deletions
@@ -0,0 +1,15 @@
# This file is part of OpenSnitch.
#
# OpenSnitch is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenSnitch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenSnitch. If not, see <http://www.gnu.org/licenses/>.
File diff suppressed because it is too large Load Diff
@@ -1,101 +1,68 @@
import os
import re
from dataclasses import dataclass, field, asdict, replace
from typing import Any
from urllib.parse import urlparse, unquote
from typing import Any, TypeVar
from collections.abc import Callable
from opensnitch.utils.xdg import xdg_config_home
from opensnitch.plugins.list_subscriptions._utils import (
to_seconds,
dedupe_subscription_identity,
derive_filename,
ensure_filename_type_suffix,
normalize_groups,
normalize_lists_dir,
now_iso,
normalize_iso_timestamp,
opt_int,
opt_str,
parse_compact_duration,
safe_filename,
to_seconds,
to_max_bytes,
)
DEFAULT_UA = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0 Safari/537.36"
DEFAULT_UA = "Mozilla/5.0 (X11; Linux x86_64; rv:148.0) Gecko/20100101 Firefox/148.0"
DEFAULT_NOTIFY_CONFIG = {
"success": {"desktop": "Lists subscriptions updated"},
"error": {"desktop": "Error updating lists subscriptions"},
}
SubscriptionLike = TypeVar(
"SubscriptionLike", "SubscriptionSpec", "MutableSubscriptionSpec"
)
def normalize_lists_dir(path: str | None) -> str:
default_dir = os.path.join(xdg_config_home, "opensnitch", "list_subscriptions")
raw = (path or "").strip()
if raw == "":
raw = default_dir
expanded = os.path.expandvars(os.path.expanduser(raw))
if not os.path.isabs(expanded):
return os.path.abspath(expanded)
return expanded
def safe_filename(value: Any) -> str:
return os.path.basename((str(value or "")).strip())
def filename_from_url(url: str | None) -> str:
try:
parsed = urlparse((url or "").strip())
return safe_filename(unquote(parsed.path or ""))
except Exception:
return ""
def slugify_name(name: str | None) -> str:
raw = (name or "").strip().lower()
if raw == "":
return "subscription.list"
slug = re.sub(r"[^a-z0-9._-]+", "-", raw).strip("-._")
if slug == "":
slug = "subscription"
if "." not in slug:
slug += ".list"
return safe_filename(slug)
def derive_filename(name: str | None, url: str | None, filename: str | None) -> str:
fn = safe_filename(filename)
if fn != "":
return fn
fn = filename_from_url(url)
if fn != "":
return fn
return slugify_name(name)
def ensure_filename_type_suffix(filename: str, list_type: str) -> str:
fn = safe_filename(filename)
base, ext = os.path.splitext(fn)
ltype = (list_type or "hosts").strip().lower()
suffix = f"-{ltype}"
if not base.lower().endswith(suffix):
base = f"{base}{suffix}" if base else ltype
if ext == "":
ext = ".txt"
return safe_filename(f"{base}{ext}")
def normalize_group(group: str | None) -> str:
raw = (group or "all").strip().lower()
raw = re.sub(r"[^a-z0-9._-]+", "-", raw).strip("-._")
return raw if raw else "all"
def normalize_groups(groups: Any) -> list[str]:
out: list[str] = []
if isinstance(groups, (list, tuple, set)):
raw_items = [str(x) for x in groups]
else:
raw_items = str(groups or "").split(",")
seen: set[str] = set()
for item in raw_items:
g = normalize_group(item)
if g in seen:
continue
seen.add(g)
out.append(g)
return out if out else ["all"]
def normalize_subscription_identities(
subscriptions: list[SubscriptionLike],
invalidate_duplicates: bool = False,
clone: (
Callable[[SubscriptionLike, str, str, str, str], SubscriptionLike] | None
) = None,
):
normalized: list[SubscriptionLike] = []
seen_filenames: dict[str, str] = {}
for sub in subscriptions:
url = (sub.url or "").strip()
if url == "":
return None
list_type = (sub.format or "hosts").strip().lower()
filename = ensure_filename_type_suffix(
derive_filename(sub.name, url, sub.filename), list_type
)
name = (sub.name or "").strip() or filename
filename, name, duplicate_same_url = dedupe_subscription_identity(
filename,
name,
url,
list_type,
seen_filenames,
)
if duplicate_same_url and invalidate_duplicates:
return None
if clone is None:
normalized.append(sub)
else:
normalized.append(clone(sub, name, url, filename, list_type))
return normalized
@dataclass(frozen=True)
@@ -140,29 +107,37 @@ class SubscriptionSpec:
name: str
url: str
filename: str
groups: tuple[str, ...] = ("all",)
groups: tuple[str, ...] = ()
enabled: bool = True
format: str = "hosts"
interval: int = 24
interval_units: str = "hours"
timeout: int = 60
timeout_units: str = "seconds"
max_size: int = 20
max_size_units: str = "MB"
interval: int | None = None
interval_units: str | None = None
timeout: int | None = None
timeout_units: str | None = None
max_size: int | None = None
max_size_units: str | None = None
interval_seconds: int = 24 * 3600
timeout_seconds: int = 60
max_bytes: int = 20 * 1024 * 1024
@staticmethod
def from_dict(d: dict[str, Any], defaults: GlobalDefaults):
if not isinstance(d, dict):
return None
def from_dict(
d: dict[str, Any] | None,
defaults: GlobalDefaults | None = None,
require_url: bool = True,
ensure_suffix: bool = True,
):
d = d or {}
defaults = defaults or GlobalDefaults.from_dict({})
name = (d.get("name") or "").strip()
url = (d.get("url") or "").strip()
list_type = str(d.get("format", "hosts") or "hosts").strip().lower()
filename = derive_filename(name, url, d.get("filename"))
filename = ensure_filename_type_suffix(filename, list_type)
if ensure_suffix and filename != "":
filename = ensure_filename_type_suffix(filename, list_type)
elif not ensure_suffix and filename == "":
filename = ""
groups_raw = d.get("groups")
if "group" in d:
legacy_group = d.get("group")
@@ -173,45 +148,41 @@ class SubscriptionSpec:
else:
groups_raw = [groups_raw, legacy_group]
groups = normalize_groups(groups_raw)
if "all" not in groups:
groups.insert(0, "all")
if not url:
if require_url and not url:
return None
if not name:
if require_url and not name:
name = filename
def _opt_int(x: Any):
try:
return int(x) if x is not None else None
except Exception:
return None
def _opt_str(x: Any):
try:
if x is None:
return None
x = (str(x) or "").strip().lower()
return x if x != "" else None
except Exception:
return None
interval_raw: Any = d.get("interval")
timeout_raw: Any = d.get("timeout")
interval_units_raw: Any = d.get("interval_units")
timeout_units_raw: Any = d.get("timeout_units")
interval = _opt_int(interval_raw) or defaults.interval
interval_units_opt = _opt_str(interval_units_raw)
interval_units = interval_units_opt or defaults.interval_units
timeout = _opt_int(timeout_raw) or defaults.timeout
timeout_units_opt = _opt_str(timeout_units_raw)
timeout_units = timeout_units_opt or defaults.timeout_units
max_size = _opt_int(d.get("max_size")) or defaults.max_size
max_size_units = _opt_str(d.get("max_size_units")) or defaults.max_size_units
interval = opt_int(interval_raw)
interval_units_opt = opt_str(interval_units_raw)
interval_units = interval_units_opt
timeout = opt_int(timeout_raw)
timeout_units_opt = opt_str(timeout_units_raw)
timeout_units = timeout_units_opt
max_size = opt_int(d.get("max_size"))
max_size_units = opt_str(d.get("max_size_units"))
default_interval_seconds = to_seconds(defaults.interval, defaults.interval_units, 24 * 3600)
default_timeout_seconds = to_seconds(defaults.timeout, defaults.timeout_units, 60)
default_max_bytes = to_max_bytes(defaults.max_size, defaults.max_size_units, 20 * 1024 * 1024)
default_interval_seconds = to_seconds(
defaults.interval, defaults.interval_units, 24 * 3600
)
default_timeout_seconds = to_seconds(
defaults.timeout, defaults.timeout_units, 60
)
default_max_bytes = to_max_bytes(
defaults.max_size, defaults.max_size_units, 20 * 1024 * 1024
)
effective_interval = interval if interval is not None else defaults.interval
effective_interval_units = interval_units or defaults.interval_units
effective_timeout = timeout if timeout is not None else defaults.timeout
effective_timeout_units = timeout_units or defaults.timeout_units
effective_max_size = max_size if max_size is not None else defaults.max_size
effective_max_size_units = max_size_units or defaults.max_size_units
interval_seconds: int | None = None
interval_is_composite = False
@@ -219,7 +190,9 @@ class SubscriptionSpec:
interval_seconds = parse_compact_duration(interval_raw)
interval_is_composite = interval_seconds is not None
if interval_seconds is None:
interval_seconds = to_seconds(interval, interval_units, default_interval_seconds)
interval_seconds = to_seconds(
effective_interval, effective_interval_units, default_interval_seconds
)
elif interval_is_composite:
interval = interval_seconds
interval_units = "composite"
@@ -230,12 +203,16 @@ class SubscriptionSpec:
timeout_seconds = parse_compact_duration(timeout_raw)
timeout_is_composite = timeout_seconds is not None
if timeout_seconds is None:
timeout_seconds = to_seconds(timeout, timeout_units, default_timeout_seconds)
timeout_seconds = to_seconds(
effective_timeout, effective_timeout_units, default_timeout_seconds
)
elif timeout_is_composite:
timeout = timeout_seconds
timeout_units = "composite"
max_bytes = to_max_bytes(max_size, max_size_units, default_max_bytes)
max_bytes = to_max_bytes(
effective_max_size, effective_max_size_units, default_max_bytes
)
return SubscriptionSpec(
name=name,
@@ -261,15 +238,15 @@ class MutableSubscriptionSpec:
name: str = ""
url: str = ""
filename: str = ""
groups: list[str] = field(default_factory=lambda: ["all"])
groups: list[str] = field(default_factory=list)
enabled: bool = True
format: str = "hosts"
interval: int = 24
interval_units: str = "hours"
timeout: int = 60
timeout_units: str = "seconds"
max_size: int = 20
max_size_units: str = "MB"
interval: int | None = None
interval_units: str | None = None
timeout: int | None = None
timeout_units: str | None = None
max_size: int | None = None
max_size_units: str | None = None
@staticmethod
def from_spec(spec: SubscriptionSpec):
@@ -289,70 +266,211 @@ class MutableSubscriptionSpec:
)
@staticmethod
def from_dict(d: dict[str, Any], defaults: GlobalDefaults):
spec = SubscriptionSpec.from_dict(d, defaults)
def from_dict(
d: dict[str, Any] | None,
defaults: GlobalDefaults | None = None,
require_url: bool = True,
ensure_suffix: bool = True,
):
spec = SubscriptionSpec.from_dict(
d,
defaults,
require_url=require_url,
ensure_suffix=ensure_suffix,
)
if spec is None:
return None
return MutableSubscriptionSpec.from_spec(spec)
d = d or {}
def _has_value(value: Any):
return value is not None and str(value).strip() != ""
return MutableSubscriptionSpec(
name=spec.name,
url=spec.url,
filename=spec.filename,
groups=list(spec.groups),
enabled=spec.enabled,
format=spec.format,
interval=(
spec.interval
if _has_value(d.get("interval")) or spec.interval_units == "composite"
else None
),
interval_units=(
spec.interval_units
if _has_value(d.get("interval_units"))
or spec.interval_units == "composite"
else None
),
timeout=(
spec.timeout
if _has_value(d.get("timeout")) or spec.timeout_units == "composite"
else None
),
timeout_units=(
spec.timeout_units
if _has_value(d.get("timeout_units"))
or spec.timeout_units == "composite"
else None
),
max_size=spec.max_size if _has_value(d.get("max_size")) else None,
max_size_units=(
spec.max_size_units if _has_value(d.get("max_size_units")) else None
),
)
def to_dict(self):
return {
data: dict[str, Any] = {
"enabled": bool(self.enabled),
"name": (self.name or "").strip(),
"url": (self.url or "").strip(),
"filename": safe_filename(self.filename),
"format": (self.format or "hosts").strip().lower(),
"groups": normalize_groups(self.groups),
"interval": int(self.interval),
"interval_units": (self.interval_units or "hours").strip().lower(),
"timeout": int(self.timeout),
"timeout_units": (self.timeout_units or "seconds").strip().lower(),
"max_size": int(self.max_size),
"max_size_units": (self.max_size_units or "MB").strip(),
}
if self.interval is not None:
data["interval"] = int(self.interval)
data["interval_units"] = (self.interval_units or "hours").strip().lower()
if self.timeout is not None:
data["timeout"] = int(self.timeout)
data["timeout_units"] = (self.timeout_units or "seconds").strip().lower()
if self.max_size is not None:
data["max_size"] = int(self.max_size)
data["max_size_units"] = (self.max_size_units or "MB").strip()
return data
@dataclass(frozen=True)
class PluginConfig:
defaults: GlobalDefaults = field(default_factory=lambda: GlobalDefaults.from_dict({}))
defaults: GlobalDefaults = field(
default_factory=lambda: GlobalDefaults.from_dict({})
)
subscriptions: list[SubscriptionSpec] = field(default_factory=list)
notify: dict[str, Any] = field(default_factory=lambda: dict(DEFAULT_NOTIFY_CONFIG))
@staticmethod
def from_dict(raw_cfg: dict[str, Any], lists_dir: str | None = None):
def from_dict(
raw_cfg: dict[str, Any],
lists_dir: str | None = None,
invalidate_duplicates: bool = False,
):
raw_cfg = raw_cfg or {}
if not isinstance(raw_cfg, dict):
raw_cfg = {}
defaults = GlobalDefaults.from_dict(raw_cfg, lists_dir)
subs: list[SubscriptionSpec] = []
seen_filenames: set[str] = set()
for item in (raw_cfg.get("subscriptions") or []):
sub = SubscriptionSpec.from_dict(item, defaults)
for item in raw_cfg.get("subscriptions") or []:
sub = SubscriptionSpec.from_dict(
item,
defaults,
)
if sub is not None:
key = os.path.normcase(sub.filename)
if key in seen_filenames:
base, ext = os.path.splitext(sub.filename)
n = 2
candidate = sub.filename
while os.path.normcase(candidate) in seen_filenames:
suffix = f"-{n}"
candidate = f"{base}{suffix}{ext}" if ext else f"{base}{suffix}"
n += 1
sub = replace(sub, filename=candidate)
if sub.name.strip() == "" or sub.name == sub.filename:
sub = replace(sub, name=candidate)
key = os.path.normcase(sub.filename)
seen_filenames.add(key)
subs.append(sub)
return PluginConfig(defaults=defaults, subscriptions=subs)
normalized_subs = normalize_subscription_identities(
subs,
invalidate_duplicates=invalidate_duplicates,
clone=lambda sub, name, url, filename, list_type: replace(
sub,
name=name,
url=url,
filename=filename,
format=list_type,
),
)
if normalized_subs is None:
normalized_subs = []
notify = raw_cfg.get("notify")
if not isinstance(notify, dict):
notify = dict(DEFAULT_NOTIFY_CONFIG)
return PluginConfig(
defaults=defaults, subscriptions=normalized_subs, notify=notify
)
@dataclass
class MutablePluginConfig:
defaults: GlobalDefaults = field(
default_factory=lambda: GlobalDefaults.from_dict({})
)
subscriptions: list[MutableSubscriptionSpec] = field(default_factory=list)
notify: dict[str, Any] = field(default_factory=lambda: dict(DEFAULT_NOTIFY_CONFIG))
@staticmethod
def from_plugin_config(config: PluginConfig):
return MutablePluginConfig(
defaults=config.defaults,
subscriptions=[
MutableSubscriptionSpec.from_spec(sub) for sub in config.subscriptions
],
notify=dict(config.notify),
)
@staticmethod
def from_dict(raw_cfg: dict[str, Any], lists_dir: str | None = None):
compiled_cfg = PluginConfig.from_dict(raw_cfg, lists_dir=lists_dir)
return MutablePluginConfig.from_plugin_config(compiled_cfg)
@staticmethod
def default(lists_dir: str | None = None):
return MutablePluginConfig(
defaults=GlobalDefaults.from_dict({}, lists_dir=lists_dir),
subscriptions=[],
notify=dict(DEFAULT_NOTIFY_CONFIG),
)
def normalize_subscriptions(self, invalidate_duplicates: bool = False):
normalized = normalize_subscription_identities(
self.subscriptions,
invalidate_duplicates=invalidate_duplicates,
clone=lambda sub, name, url, filename, list_type: MutableSubscriptionSpec(
name=name,
url=url,
filename=filename,
groups=list(sub.groups),
enabled=sub.enabled,
format=list_type,
interval=sub.interval,
interval_units=sub.interval_units,
timeout=sub.timeout,
timeout_units=sub.timeout_units,
max_size=sub.max_size,
max_size_units=sub.max_size_units,
),
)
if normalized is None:
return None
self.subscriptions = normalized
return normalized
def to_dict(self):
return {
"lists_dir": normalize_lists_dir(self.defaults.lists_dir),
"interval": int(self.defaults.interval),
"interval_units": self.defaults.interval_units,
"timeout": int(self.defaults.timeout),
"timeout_units": self.defaults.timeout_units,
"max_size": int(self.defaults.max_size),
"max_size_units": self.defaults.max_size_units,
"user_agent": (
self.defaults.user_agent
if self.defaults.user_agent is not None
else DEFAULT_UA
),
"subscriptions": [sub.to_dict() for sub in self.subscriptions],
"notify": self.notify,
}
@dataclass
class MutableActionConfig:
enabled: bool = False
defaults: GlobalDefaults = field(default_factory=lambda: GlobalDefaults.from_dict({}))
subscriptions: list[MutableSubscriptionSpec] = field(default_factory=list)
plugin: MutablePluginConfig = field(default_factory=MutablePluginConfig.default)
action_name: str = "listSubscriptionsActions"
created: str = ""
updated: str = ""
@@ -362,9 +480,14 @@ class MutableActionConfig:
@staticmethod
def from_action_dict(raw_action: dict[str, Any], lists_dir: str | None = None):
action_name = str(raw_action.get("name", "listSubscriptionsActions"))
created = str(raw_action.get("created", ""))
updated = str(raw_action.get("updated", ""))
description = str(raw_action.get("description", "Manage and auto-update blocklist subscriptions (hosts format)"))
created = normalize_iso_timestamp(raw_action.get("created"))
updated = normalize_iso_timestamp(raw_action.get("updated"), fallback=created)
description = str(
raw_action.get(
"description",
"Manage and auto-update blocklist subscriptions (hosts format)",
)
)
action_types_raw = raw_action.get("type", ["global", "main-dialog"])
if isinstance(action_types_raw, list):
action_types = [str(t) for t in action_types_raw]
@@ -372,16 +495,27 @@ class MutableActionConfig:
action_types = ["global", "main-dialog"]
actions_obj = raw_action.get("actions", {})
action_cfg = actions_obj.get("list_subscriptions", {}) if isinstance(actions_obj, dict) else {}
plugin_cfg_raw = action_cfg.get("config", {}) if isinstance(action_cfg, dict) else {}
action_cfg = (
actions_obj.get("list_subscriptions", {})
if isinstance(actions_obj, dict)
else {}
)
plugin_cfg_raw = (
action_cfg.get("config", {}) if isinstance(action_cfg, dict) else {}
)
plugin_cfg = plugin_cfg_raw if isinstance(plugin_cfg_raw, dict) else {}
compiled_cfg = PluginConfig.from_dict(plugin_cfg, lists_dir=plugin_cfg.get("lists_dir") or lists_dir)
enabled = bool(action_cfg.get("enabled", False)) if isinstance(action_cfg, dict) else False
mutable_plugin = MutablePluginConfig.from_dict(
plugin_cfg, lists_dir=plugin_cfg.get("lists_dir") or lists_dir
)
enabled = (
bool(action_cfg.get("enabled", False))
if isinstance(action_cfg, dict)
else False
)
return MutableActionConfig(
enabled=enabled,
defaults=compiled_cfg.defaults,
subscriptions=[MutableSubscriptionSpec.from_spec(s) for s in compiled_cfg.subscriptions],
plugin=mutable_plugin,
action_name=action_name,
created=created,
updated=updated,
@@ -391,51 +525,29 @@ class MutableActionConfig:
@staticmethod
def default(lists_dir: str | None = None):
defaults = GlobalDefaults.from_dict(
{
"interval": 24,
"interval_units": "hours",
"timeout": 20,
"timeout_units": "seconds",
"max_size": 50,
"max_size_units": "MB",
},
lists_dir=lists_dir,
)
created = now_iso()
return MutableActionConfig(
enabled=True,
defaults=defaults,
subscriptions=[],
plugin=MutablePluginConfig.default(lists_dir),
created=created,
updated=created,
)
def to_plugin_dict(self):
return {
"lists_dir": normalize_lists_dir(self.defaults.lists_dir),
"interval": int(self.defaults.interval),
"interval_units": self.defaults.interval_units,
"timeout": int(self.defaults.timeout),
"timeout_units": self.defaults.timeout_units,
"max_size": int(self.defaults.max_size),
"max_size_units": self.defaults.max_size_units,
"user_agent": self.defaults.user_agent if self.defaults.user_agent is not None else DEFAULT_UA,
"subscriptions": [sub.to_dict() for sub in self.subscriptions],
"notify": {
"success": {"desktop": "Lists subscriptions updated"},
"error": {"desktop": "Error updating lists subscriptions"},
},
}
def to_action_dict(self):
created = normalize_iso_timestamp(self.created)
updated = now_iso()
self.created = created
self.updated = updated
return {
"name": self.action_name,
"created": self.created,
"updated": self.updated,
"created": created,
"updated": updated,
"description": self.description,
"type": list(self.types),
"actions": {
"list_subscriptions": {
"enabled": bool(self.enabled),
"config": self.to_plugin_dict(),
"config": self.plugin.to_dict(),
}
},
}
@@ -2,8 +2,13 @@ import errno
import json
import os
import re
import time
from enum import IntEnum
from datetime import datetime
from typing import Any
from urllib.parse import urlparse, unquote
from opensnitch.utils.xdg import xdg_config_home
TIME_MULT = {
@@ -34,6 +39,14 @@ SIZE_MULT = {
}
class RuntimeEvent(IntEnum):
RUNTIME_ENABLED = 1
CONFIG_RELOADED = 2
RUNTIME_DISABLED = 3
RUNTIME_STOPPED = 4
RUNTIME_ERROR = 5
def now_iso():
return datetime.now().astimezone().isoformat()
@@ -45,6 +58,151 @@ def parse_iso(ts: str):
return None
def normalize_iso_timestamp(value: Any, fallback: str | None = None):
text = str(value or "").strip()
if text != "" and parse_iso(text) is not None:
return text
if fallback:
return fallback
return now_iso()
def opt_int(value: Any):
try:
return int(value) if value is not None else None
except Exception:
return None
def opt_str(value: Any):
try:
if value is None:
return None
normalized = (str(value) or "").strip().lower()
return normalized if normalized != "" else None
except Exception:
return None
def normalize_lists_dir(path: str | None) -> str:
default_dir = os.path.join(xdg_config_home, "opensnitch", "list_subscriptions")
raw = (path or "").strip()
if raw == "":
raw = default_dir
expanded = os.path.expandvars(os.path.expanduser(raw))
if not os.path.isabs(expanded):
return os.path.abspath(expanded)
return expanded
def safe_filename(value: Any) -> str:
return os.path.basename((str(value or "")).strip())
def filename_from_url(url: str | None) -> str:
try:
parsed = urlparse((url or "").strip())
return safe_filename(unquote(parsed.path or ""))
except Exception:
return ""
def slugify_name(name: str | None) -> str:
raw = (name or "").strip().lower()
if raw == "":
return "subscription.list"
slug = re.sub(r"[^a-z0-9._-]+", "-", raw).strip("-._")
if slug == "":
slug = "subscription"
if "." not in slug:
slug += ".list"
return safe_filename(slug)
def derive_filename(name: str | None, url: str | None, filename: str | None) -> str:
fn = safe_filename(filename)
if fn != "":
return fn
fn = filename_from_url(url)
if fn != "":
return fn
return slugify_name(name)
def ensure_filename_type_suffix(filename: str, list_type: str) -> str:
fn = safe_filename(filename)
base, ext = os.path.splitext(fn)
ltype = (list_type or "hosts").strip().lower()
suffix = f"-{ltype}"
if not base.lower().endswith(suffix):
base = f"{base}{suffix}" if base else ltype
if ext == "":
ext = ".txt"
return safe_filename(f"{base}{ext}")
def normalize_group(group: str | None) -> str:
raw = (group or "").strip().lower()
if raw == "":
return ""
raw = re.sub(r"[^a-z0-9._-]+", "-", raw).strip("-._")
return raw
def normalize_groups(groups: Any) -> list[str]:
out: list[str] = []
if isinstance(groups, (list, tuple, set)):
raw_items = [str(x) for x in groups]
else:
raw_items = str(groups or "").split(",")
seen: set[str] = set()
for item in raw_items:
g = normalize_group(item)
if g == "" or g == "all" or g in seen:
continue
seen.add(g)
out.append(g)
return out
def dedupe_subscription_identity(
filename: str,
name: str,
url: str,
list_type: str,
seen_filenames: dict[str, str] | None,
):
if seen_filenames is None:
return filename, name, False
key = filename
seen_url = seen_filenames.get(key)
if seen_url is None:
seen_filenames[key] = url
return filename, name, False
if seen_url == url:
return filename, name, True
base, ext = os.path.splitext(filename)
if ext == "":
ext = ".txt"
suffix = f"-{(list_type or 'hosts').strip().lower()}"
root = base
if root.lower().endswith(suffix):
root = root[: -len(suffix)]
root = root.rstrip("-")
n = 2
candidate = filename
while candidate in seen_filenames:
candidate = f"{root}-{n}{suffix}{ext}"
n += 1
display_name = (name or "").strip()
if display_name == "":
display_name = root or "subscription"
seen_filenames[candidate] = url
return candidate, f"{display_name} ({n - 1})", False
def to_seconds(value: Any, units: str | None, default_seconds: int):
try:
if value is None:
@@ -109,18 +267,113 @@ def write_json_atomic(path: str, obj: dict[str, Any]):
os.replace(tmp, path)
def json_lock_path(path: str) -> str:
return f"{path}.lock"
def read_json_locked(path: str, timeout: float = 5.0, poll_interval: float = 0.05):
lock_path = json_lock_path(path)
lock = FileLock(lock_path)
deadline = time.monotonic() + max(timeout, 0.0)
while os.path.exists(lock_path):
lock.break_stale()
if not os.path.exists(lock_path):
break
if time.monotonic() >= deadline:
raise TimeoutError(f"timed out waiting for lock: {lock_path}")
time.sleep(poll_interval)
return read_json(path)
def write_json_atomic_locked(
path: str,
obj: dict[str, Any],
timeout: float = 5.0,
poll_interval: float = 0.05,
):
lock = FileLock(json_lock_path(path))
deadline = time.monotonic() + max(timeout, 0.0)
while not lock.acquire():
if time.monotonic() >= deadline:
raise TimeoutError(f"timed out waiting for lock: {lock.lock_path}")
time.sleep(poll_interval)
try:
write_json_atomic(path, obj)
finally:
lock.release()
class FileLock:
def __init__(self, lock_path: str):
self.lock_path = lock_path
self.fd: int | None = None
def _read_owner_pid(self):
try:
with open(self.lock_path, "r", encoding="utf-8") as f:
raw = f.read().strip()
except FileNotFoundError:
return None
except Exception:
return -1
if raw == "":
return -1
try:
return int(raw)
except Exception:
return -1
def _pid_is_alive(self, pid: int):
if pid <= 0:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
except Exception:
return True
return True
def is_stale(self, max_age: float = 30.0):
try:
stat = os.stat(self.lock_path)
except FileNotFoundError:
return False
pid = self._read_owner_pid()
if pid is None:
return False
if pid > 0:
return not self._pid_is_alive(pid)
age = time.time() - stat.st_mtime
return age >= max(max_age, 0.0)
def break_stale(self, max_age: float = 30.0):
if not self.is_stale(max_age=max_age):
return False
try:
os.unlink(self.lock_path)
return True
except FileNotFoundError:
return True
except Exception:
return False
def acquire(self):
try:
self.fd = os.open(self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
self.fd = os.open(
self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600
)
os.write(self.fd, str(os.getpid()).encode("utf-8"))
return True
except OSError as e:
if e.errno == errno.EEXIST:
if self.break_stale():
return self.acquire()
return False
raise
@@ -5,10 +5,12 @@ import threading
import shutil
import sys
from typing import Any
from abc import ABCMeta
from datetime import datetime, timedelta
from queue import Queue
import requests
from opensnitch.proto import ui_pb2
if "PyQt6" in sys.modules:
from PyQt6 import QtCore, QtGui
@@ -21,8 +23,12 @@ else:
from PyQt5 import QtCore, QtGui
from opensnitch.dialogs.stats import StatsDialog
from opensnitch.config import Config
from opensnitch.nodes import Nodes
from opensnitch.notifications import DesktopNotifications
from opensnitch.plugins import PluginBase, PluginSignal
from opensnitch.rules import Rule
from opensnitch.database import Database
from opensnitch.utils import GenericTimer
from opensnitch.utils.xdg import xdg_config_home
from opensnitch.plugins.list_subscriptions._models import (
@@ -30,22 +36,23 @@ from opensnitch.plugins.list_subscriptions._models import (
ListMetadata,
PluginConfig,
SubscriptionSpec,
ensure_filename_type_suffix,
normalize_group,
normalize_lists_dir,
)
from opensnitch.plugins.list_subscriptions._utils import (
FileLock,
RuntimeEvent,
ensure_filename_type_suffix,
is_hosts_file_like,
normalize_groups,
normalize_lists_dir,
now_iso,
parse_iso,
read_json,
write_json_atomic,
read_json_locked,
write_json_atomic_locked,
)
ch = logging.StreamHandler()
#ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s - %(name)s - [%(levelname)s] %(message)s')
# ch.setLevel(logging.ERROR)
formatter = logging.Formatter("%(asctime)s - %(name)s - [%(levelname)s] %(message)s")
ch.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(ch)
@@ -54,15 +61,28 @@ logger.setLevel(logging.WARNING)
# -------------------- plugin core --------------------
class ListSubscriptions(PluginBase):
""" A plugin to manage list subscriptions (e.g. blocklists).
class SingletonABCMeta(ABCMeta):
_instances: dict[type, object] = {}
_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class ListSubscriptions(PluginBase, metaclass=SingletonABCMeta):
"""A plugin to manage list subscriptions (e.g. blocklists).
The plugin is configured via a JSON file specifying a list of subscriptions.
Each subscription has a URL and a local filename to save to.
The plugin periodically checks each URL for updates, using HTTP cache validators to avoid unnecessary downloads.
Metadata about each subscription is stored in a sidecar JSON file (same name + .meta.json) to track last update time, errors, backoff, etc.
Metadata about each subscription is stored in a metadata JSON file (same name + .meta.json) to track last update time, errors, backoff, etc.
The plugin exposes a results queue for the UI to display subscription status and errors.
"""
# fields overriden from parent class
name = "List_subscriptions"
version = 0
@@ -77,33 +97,90 @@ class ListSubscriptions(PluginBase):
# runtime state
scheduled_tasks: dict[str, GenericTimer] = {}
default_conf = "{0}/{1}".format(xdg_config_home, "opensnitch/actions/list_subscriptions.json")
default_lists_dir = os.path.join(xdg_config_home, "opensnitch", "list_subscriptions")
default_conf = "{0}/{1}".format(
xdg_config_home, "opensnitch/actions/list_subscriptions.json"
)
default_lists_dir = os.path.join(
xdg_config_home, "opensnitch", "list_subscriptions"
)
@classmethod
def get_instance(cls) -> "ListSubscriptions | None":
instance = SingletonABCMeta._instances.get(cls)
if isinstance(instance, cls):
return instance
return None
def __init__(self, config: dict[str, Any] | None = None):
config = config or {}
self._log = logger
if getattr(self, "_initialized", False):
self._load_action_config(config)
return
self._initialized = True
self.signal_in.connect(self.cb_signal)
self._desktop_notifications = DesktopNotifications()
self._db = Database.instance()
self._nodes = Nodes.instance()
self._ok_msg = ""
self._err_msg = ""
self._notify: dict[str, Any] | None = None
self._notify_title = "[OpenSnitch] List subscriptions downloader"
self._resultsQueue: Queue[tuple[str, bool, str]] = Queue()
self._running = False
self._app_icon = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../res/icon-white.svg")
self._app_icon = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "../../res/icon-white.svg"
)
self._cfg_dialog = None
self._cfg_action = None
self.scheduled_tasks = {}
self._startup_recheck_lock = threading.Lock()
self._startup_recheck_pending = False
self._startup_recheck_scheduled = False
self._nodes.nodesUpdated.connect(self._on_nodes_updated)
self._load_action_config(config)
if config.get("enabled") is True:
self.enabled = True
# Set up requests session with default UA
self._session: requests.Session = requests.Session()
if self._config.defaults.user_agent:
self._session.headers.update(
{"User-Agent": self._config.defaults.user_agent}
)
else:
self._session.headers.update({"User-Agent": DEFAULT_UA})
# Load config
plugin_cfg: Any = config.get("config", {})
def _emit_runtime_event(
self,
event: RuntimeEvent,
message: str,
*,
error: str | None = None,
action_path: str | None = None,
):
payload: dict[str, Any] = {
"plugin": self.get_name(),
"event": event,
"message": message,
}
if action_path:
payload["action_path"] = action_path
if error:
payload["error"] = error
self.signal_out.emit(payload)
def _load_action_config(self, action_cfg: dict[str, Any] | None = None):
action_cfg = action_cfg or {}
self.enabled = bool(action_cfg.get("enabled") is True)
plugin_cfg: Any = action_cfg.get("config", {})
if not isinstance(plugin_cfg, dict):
plugin_cfg = {}
self._config = PluginConfig.from_dict(plugin_cfg, lists_dir=self.default_lists_dir)
self._config = PluginConfig.from_dict(
plugin_cfg,
lists_dir=self.default_lists_dir,
)
self._notify = plugin_cfg.get("notify")
self._ok_msg = ""
self._err_msg = ""
if isinstance(self._notify, dict):
ok = self._notify.get("success")
err = self._notify.get("error")
@@ -118,14 +195,104 @@ class ListSubscriptions(PluginBase):
else:
self._notify = None
# Set up requests session with default UA
self._session: requests.Session = requests.Session()
if self._config.defaults.user_agent:
self._session.headers.update({"User-Agent": self._config.defaults.user_agent})
else:
self._session.headers.update({"User-Agent": DEFAULT_UA})
def _start_runtime(self, *, recheck: bool):
if not self.enabled:
return
# -------- metadata sidecar --------
for t in self.scheduled_tasks.values():
try:
t.start()
except Exception:
pass
if recheck:
if self._has_ready_local_node():
self._schedule_startup_recheck(delay=0.5)
else:
with self._startup_recheck_lock:
self._startup_recheck_pending = True
logger.warning(
"deferring startup refresh until a local node is connected"
)
def disable_runtime(self):
self.enabled = False
with self._startup_recheck_lock:
self._startup_recheck_pending = False
self.stop()
def _has_ready_local_node(self) -> bool:
for addr in self._nodes.get().keys():
if not self._nodes.is_local(addr):
continue
if self._nodes.is_connected(addr):
return True
return False
def _schedule_startup_recheck(self, *, delay: float):
with self._startup_recheck_lock:
if self._startup_recheck_scheduled:
return
self._startup_recheck_pending = False
self._startup_recheck_scheduled = True
def _run():
try:
self._startup_recheck_all()
finally:
with self._startup_recheck_lock:
self._startup_recheck_scheduled = False
timer = threading.Timer(delay, _run)
timer.daemon = True
timer.start()
def _on_nodes_updated(self, total: int):
if total <= 0 or not self.enabled:
return
with self._startup_recheck_lock:
pending = self._startup_recheck_pending
if pending and self._has_ready_local_node():
logger.warning(
"local node connected, running deferred startup refresh"
)
self._schedule_startup_recheck(delay=0.5)
def _reload_from_action_file(self, action_path: str | None = None):
action_path = (action_path or self.default_conf).strip() or self.default_conf
try:
raw_action = read_json_locked(action_path)
except Exception as exc:
logger.warning(
"failed to read action file %s: %r",
action_path,
exc,
)
return False, str(exc)
if not isinstance(raw_action, dict):
logger.warning(
"invalid action payload in %s: %r",
action_path,
type(raw_action).__name__,
)
return False, f"invalid action payload type: {type(raw_action).__name__}"
actions_obj = raw_action.get("actions", {})
if not isinstance(actions_obj, dict):
actions_obj = {}
action_cfg = actions_obj.get("list_subscriptions", {})
if not isinstance(action_cfg, dict):
action_cfg = {}
self._load_action_config(action_cfg)
self._session.headers.update(
{"User-Agent": self._config.defaults.user_agent or DEFAULT_UA}
)
self.compile()
return True, None
# -------- metadata/files handling --------
def _paths(self, sub: SubscriptionSpec):
if self._config is None:
@@ -134,6 +301,15 @@ class ListSubscriptions(PluginBase):
os.makedirs(lists_dir, mode=0o700, exist_ok=True)
sources_dir = os.path.join(lists_dir, "sources.list.d")
os.makedirs(sources_dir, mode=0o700, exist_ok=True)
safe_filename = os.path.basename((sub.filename or "").strip())
if safe_filename == "":
safe_filename = "subscription.list"
safe_filename = ensure_filename_type_suffix(safe_filename, sub.format)
list_path = os.path.join(sources_dir, safe_filename)
meta_path = list_path + ".meta.json"
return list_path, meta_path
def _subscription_dirname(self, sub: SubscriptionSpec):
safe_filename = os.path.basename((sub.filename or "").strip())
if safe_filename == "":
safe_filename = "subscription.list"
@@ -144,21 +320,21 @@ class ListSubscriptions(PluginBase):
sub_dirname = base if base else "subscription"
if not sub_dirname.lower().endswith(suffix):
sub_dirname = f"{sub_dirname}{suffix}"
sub_dir = os.path.join(sources_dir, sub_dirname)
os.makedirs(sub_dir, mode=0o700, exist_ok=True)
list_path = os.path.join(sub_dir, safe_filename)
meta_path = list_path + ".meta.json"
return list_path, meta_path
return sub_dirname
def _rules_root_dir(self):
if self._config is None:
return os.path.join(self.default_lists_dir, "rules.list.d")
return os.path.join(normalize_lists_dir(self._config.defaults.lists_dir), "rules.list.d")
return os.path.join(
normalize_lists_dir(self._config.defaults.lists_dir), "rules.list.d"
)
def _sources_root_dir(self):
if self._config is None:
return os.path.join(self.default_lists_dir, "sources.list.d")
return os.path.join(normalize_lists_dir(self._config.defaults.lists_dir), "sources.list.d")
return os.path.join(
normalize_lists_dir(self._config.defaults.lists_dir), "sources.list.d"
)
def _sync_sources_dirs(self):
if self._config is None:
@@ -166,18 +342,16 @@ class ListSubscriptions(PluginBase):
sources_dir = self._sources_root_dir()
os.makedirs(sources_dir, mode=0o700, exist_ok=True)
desired_dirs: set[str] = set()
desired_paths: set[str] = set()
for sub in self._config.subscriptions:
list_path, _ = self._paths(sub)
desired_dirs.add(os.path.dirname(list_path))
list_path, meta_path = self._paths(sub)
desired_paths.add(list_path)
desired_paths.add(meta_path)
for entry in os.listdir(sources_dir):
p = os.path.join(sources_dir, entry)
try:
if os.path.isdir(p) and not os.path.islink(p):
if p not in desired_dirs:
shutil.rmtree(p)
else:
if p not in desired_paths:
os.unlink(p)
except Exception:
pass
@@ -195,9 +369,7 @@ class ListSubscriptions(PluginBase):
if not os.path.exists(list_path):
continue
raw_groups: tuple[str, ...] = getattr(sub, "groups", tuple())
groups: list[str] = list(raw_groups)
groups.append("all")
groups = sorted(normalize_group(g) for g in set(groups))
groups = [self._subscription_dirname(sub), "all", *normalize_groups(raw_groups)]
link_name = f"{idx:02d}-{os.path.basename(list_path)}"
for group in groups:
desired.setdefault(group, {})[link_name] = list_path
@@ -218,7 +390,7 @@ class ListSubscriptions(PluginBase):
except Exception:
pass
for group_name in (existing_groups | set(desired.keys())):
for group_name in existing_groups | set(desired.keys()):
group_dir = os.path.join(rules_dir, group_name)
desired_links = desired.get(group_name, {})
if desired_links:
@@ -244,7 +416,9 @@ class ListSubscriptions(PluginBase):
in_sync = False
try:
if os.path.islink(entry_path):
in_sync = os.path.realpath(entry_path) == os.path.realpath(expected_target)
in_sync = os.path.realpath(entry_path) == os.path.realpath(
expected_target
)
except Exception:
in_sync = False
@@ -271,12 +445,91 @@ class ListSubscriptions(PluginBase):
def _load_meta(self, meta_path: str):
try:
return ListMetadata.from_dict(read_json(meta_path))
return ListMetadata.from_dict(read_json_locked(meta_path))
except Exception:
return ListMetadata()
def _save_meta(self, meta_path: str, meta: ListMetadata):
write_json_atomic(meta_path, meta.to_dict())
write_json_atomic_locked(meta_path, meta.to_dict())
def _fsync_parent_dir(self, path: str):
parent = os.path.dirname(path)
if parent == "":
return
try:
dir_fd = os.open(parent, os.O_RDONLY | getattr(os, "O_DIRECTORY", 0))
except Exception:
return
try:
os.fsync(dir_fd)
except Exception:
pass
finally:
os.close(dir_fd)
def _affected_rule_dirs(self, sub: SubscriptionSpec):
affected_dirs = {os.path.join(self._rules_root_dir(), self._subscription_dirname(sub))}
rules_root = self._rules_root_dir()
affected_dirs.add(os.path.join(rules_root, "all"))
for group in normalize_groups(sub.groups):
affected_dirs.add(os.path.join(rules_root, group))
return {
os.path.normpath(path)
for path in affected_dirs
if path.strip() != ""
}
def _reload_rules_for_updated_subscription(self, sub: SubscriptionSpec):
try:
affected_dirs = self._affected_rule_dirs(sub)
found_match = False
for addr in self._nodes.get().keys():
if not self._nodes.is_local(addr):
continue
records = self._db.get_rules(addr)
if records is None or records == -1:
continue
matched = False
while records.next():
rule = Rule.new_from_records(records)
if rule.operator.operand == Config.OPERAND_LIST_DOMAINS:
direct_dir = os.path.normpath(str(rule.operator.data or "").strip())
if direct_dir in affected_dirs:
matched = True
if not matched:
for operator in getattr(rule.operator, "list", []):
if operator.operand != Config.OPERAND_LIST_DOMAINS:
continue
nested_dir = os.path.normpath(str(operator.data or "").strip())
if nested_dir in affected_dirs:
matched = True
break
if not matched:
continue
notification = ui_pb2.Notification(
type=ui_pb2.CHANGE_RULE,
rules=[rule],
)
self._nodes.send_notification(addr, notification, None)
found_match = True
logger.warning(
"signaling affected rule '%s' for updated subscription '%s'",
rule.name,
sub.name,
)
break
if found_match is False:
logger.warning(
"no matching rules found for updated subscription '%s'",
sub.name,
)
except Exception as e:
logger.warning(
"reload rules after updating '%s' failed: %s",
sub.name,
repr(e),
)
# -------- timer lifecycle --------
@@ -285,7 +538,7 @@ class ListSubscriptions(PluginBase):
return hashlib.sha1(base.encode("utf-8")).hexdigest()[:16]
def configure(self, parent: Any = None):
if type(parent) == StatsDialog:
if type(parent) == StatsDialog: # noqa: E721
if self._cfg_action is not None:
return
@@ -293,8 +546,12 @@ class ListSubscriptions(PluginBase):
if menu is None:
return
icon_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "blocklist.svg")
icon = QtGui.QIcon(icon_path) if os.path.exists(icon_path) else QtGui.QIcon()
icon_path = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "res", "blocklist.svg"
)
icon = (
QtGui.QIcon(icon_path) if os.path.exists(icon_path) else QtGui.QIcon()
)
quit_action = self._find_quit_action(menu)
if quit_action is not None:
@@ -312,7 +569,9 @@ class ListSubscriptions(PluginBase):
else:
self._cfg_action = menu.addAction("List subscriptions")
self._cfg_action.triggered.connect(lambda *_: self._open_config_dialog(parent))
self._cfg_action.triggered.connect(
lambda *_: self._open_config_dialog(parent)
)
def _find_quit_action(self, menu: Any):
qt_key = getattr(getattr(QtCore, "Qt", object()), "Key", None)
@@ -324,7 +583,11 @@ class ListSubscriptions(PluginBase):
if txt == "quit":
return act
shortcut = act.shortcut()
if key_q is not None and shortcut and shortcut.matches(QtGui.QKeySequence(key_q)):
if (
key_q is not None
and shortcut
and shortcut.matches(QtGui.QKeySequence(key_q))
):
return act
# In OpenSnitch main actions menu, Quit is typically the last entry.
acts = [a for a in menu.actions() if not a.isSeparator()]
@@ -344,7 +607,9 @@ class ListSubscriptions(PluginBase):
if self._cfg_dialog is None:
# Some wrapped dialog types are not accepted as QWidget parents by
# PyQt6 constructors in plugin context. Use a top-level dialog.
self._cfg_dialog = _gui.ListSubscriptionsDialog(parent=None, appicon=appicon)
self._cfg_dialog = _gui.ListSubscriptionsDialog(
parent=None, appicon=appicon
)
self._cfg_dialog.show()
self._cfg_dialog.raise_()
self._cfg_dialog.activateWindow()
@@ -399,21 +664,15 @@ class ListSubscriptions(PluginBase):
if parent == StatsDialog:
pass
self._running = True
for t in self.scheduled_tasks.values():
try:
t.start()
except Exception:
pass
# Validate + force download all subscriptions at startup.
th = threading.Thread(target=self._startup_recheck_all, daemon=True)
th.start()
self._start_runtime(recheck=True)
def _startup_recheck_all(self):
if self._config is None:
if self._config is None or not self.enabled:
return
if not self._has_ready_local_node():
with self._startup_recheck_lock:
self._startup_recheck_pending = True
logger.warning("startup refresh skipped, no local node is ready yet")
return
for sub in self._config.subscriptions:
if not sub.enabled:
@@ -421,12 +680,16 @@ class ListSubscriptions(PluginBase):
try:
self.force_refresh_subscription(sub)
except Exception as e:
logger.warning("list_subscriptions: startup recheck error name='%s' err=%s", sub.name, repr(e))
logger.warning(
"startup recheck error name='%s' err=%s",
sub.name,
repr(e),
)
self._sync_global_symlinks()
def stop(self):
"""
Stop timers.
Stop timers and clear them from memory.
"""
for t in self.scheduled_tasks.values():
try:
@@ -434,7 +697,6 @@ class ListSubscriptions(PluginBase):
except Exception:
pass
self.scheduled_tasks.clear()
self._running = False
# -------- scheduled execution --------
@@ -448,15 +710,15 @@ class ListSubscriptions(PluginBase):
sub: SubscriptionSpec
key, sub = args
# due/backoff gate via sidecar meta
# due/backoff gate via metadata
_, meta_path = self._paths(sub)
meta = self._load_meta(meta_path)
if self._in_backoff(meta):
logger.warning("list_subscriptions: skip '%s' (in backoff)", sub.name)
logger.warning("skip '%s' (in backoff)", sub.name)
return
if not self._is_due(meta, sub):
logger.warning("list_subscriptions: skip '%s' (not due yet)", sub.name)
logger.warning("skip '%s' (not due yet)", sub.name)
return
th = threading.Thread(target=self.download, args=(key, sub))
@@ -488,34 +750,103 @@ class ListSubscriptions(PluginBase):
else:
result_msg = self._err_msg or f"{sub.name} failed: {', '.join(statuses)}"
if self._notify is not None and self._desktop_notifications.is_available() and self._desktop_notifications.are_enabled():
self._desktop_notifications.show(self._notify_title, result_msg, self._app_icon)
if (
self._notify is not None
and self._desktop_notifications.is_available()
and self._desktop_notifications.are_enabled()
):
self._desktop_notifications.show(
self._notify_title, result_msg, self._app_icon
)
def force_refresh_subscription(self, sub: SubscriptionSpec):
key = self._sub_key(sub)
logger.warning(
"list_subscriptions: force refresh requested name='%s' url='%s' file='%s'",
sub.name, sub.url, sub.filename
)
ok = self.download(key, sub)
logger.warning(
"list_subscriptions: force refresh finished name='%s' result=%s",
sub.name, "ok" if ok else "error"
)
ok = self.download(key, sub, force=True)
self._sync_global_symlinks()
return ok
def cb_signal(self, signal: Any):
logger.debug("cb_signal: %s, %s", self.name, signal)
def cb_signal(self, signal: dict[str, Any]):
try:
if signal == PluginSignal.ENABLE:
self.enabled = True
sig = signal.get("signal")
action_path = signal.get("action_path")
if signal['signal'] == PluginSignal.DISABLE or signal['signal'] == PluginSignal.STOP: #type: ignore[union-attr]
for t in self.scheduled_tasks:
logger.debug("cb_signal.stopping task: %s, %s", self.name, signal)
self.scheduled_tasks[t].stop()
if sig == PluginSignal.ENABLE:
logger.warning(
"cb_signal: ENABLE action_path=%r",
action_path,
)
ok, err = self._reload_from_action_file(action_path)
if ok:
self.enabled = True
self.run()
self._emit_runtime_event(
RuntimeEvent.RUNTIME_ENABLED,
"Plugin runtime enabled.",
action_path=action_path,
)
else:
self._emit_runtime_event(
RuntimeEvent.RUNTIME_ERROR,
"Failed to enable plugin runtime.",
error=err,
action_path=action_path,
)
return
if sig == PluginSignal.CONFIG_UPDATE:
logger.warning(
"cb_signal: CONFIG_UPDATE action_path=%r",
action_path,
)
self.stop()
ok, err = self._reload_from_action_file(action_path)
if ok:
if self.enabled:
self.run()
self._emit_runtime_event(
RuntimeEvent.CONFIG_RELOADED,
"Plugin runtime configuration reloaded.",
action_path=action_path,
)
else:
self._emit_runtime_event(
RuntimeEvent.RUNTIME_ERROR,
"Failed to reload plugin runtime configuration.",
error=err,
action_path=action_path,
)
return
if sig == PluginSignal.DISABLE or sig == PluginSignal.STOP:
logger.warning(
"cb_signal: %s action_path=%r",
"DISABLE" if sig == PluginSignal.DISABLE else "STOP",
action_path,
)
self.enabled = False
self.stop()
self._emit_runtime_event(
RuntimeEvent.RUNTIME_DISABLED
if sig == PluginSignal.DISABLE
else RuntimeEvent.RUNTIME_STOPPED,
"Plugin runtime disabled."
if sig == PluginSignal.DISABLE
else "Plugin runtime stopped.",
action_path=action_path,
)
return
if sig == PluginSignal.ERROR:
err = str(signal.get("error") or signal.get("message") or "")
self._emit_runtime_event(
RuntimeEvent.RUNTIME_ERROR,
"Plugin runtime reported an error.",
error=err or None,
action_path=action_path,
)
return
raise ValueError(f"unrecognized signal: {sig}")
except Exception as e:
logger.warning("cb_signal() exception: %s", repr(e))
@@ -533,7 +864,9 @@ class ListSubscriptions(PluginBase):
lc = parse_iso(meta.last_checked)
if not lc:
return True
return (datetime.now().astimezone() - lc).total_seconds() >= sub.interval_seconds
return (
datetime.now().astimezone() - lc
).total_seconds() >= sub.interval_seconds
# -------- worker: download + update metadata --------
@@ -543,15 +876,13 @@ class ListSubscriptions(PluginBase):
meta.last_result = "error"
seconds = min((2 ** max(0, meta.fail_count)) * 60, 6 * 3600)
meta.backoff_until = (datetime.now().astimezone() + timedelta(seconds=seconds)).isoformat()
meta.backoff_until = (
datetime.now().astimezone() + timedelta(seconds=seconds)
).isoformat()
def download(self, key: str, sub: SubscriptionSpec):
def download(self, key: str, sub: SubscriptionSpec, force: bool = False):
list_path, meta_path = self._paths(sub)
os.makedirs(os.path.dirname(list_path), exist_ok=True)
logger.warning(
"list_subscriptions: download start key=%s name='%s' dst='%s'",
key, sub.name, list_path
)
meta = self._load_meta(meta_path)
@@ -565,9 +896,9 @@ class ListSubscriptions(PluginBase):
# conditional headers
headers: dict[str, str] = {}
if meta.etag:
if not force and meta.etag:
headers["If-None-Match"] = meta.etag
if meta.last_modified:
if not force and meta.last_modified:
headers["If-Modified-Since"] = meta.last_modified
headers["User-Agent"] = self._config.defaults.user_agent or DEFAULT_UA
@@ -591,6 +922,7 @@ class ListSubscriptions(PluginBase):
self._resultsQueue.put((key, False, "request_error"))
return False
response_closed = False
try:
if r.status_code == 304:
meta.fail_count = 0
@@ -598,14 +930,18 @@ class ListSubscriptions(PluginBase):
meta.last_result = "not_modified"
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, True, "not_modified"))
logger.warning("list_subscriptions: download not-modified name='%s'", sub.name)
logger.warning("subscription not-modified name='%s'", sub.name)
return True
if r.status_code != 200:
self._mark_failure(meta, f"http_{r.status_code}")
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, False, f"http_{r.status_code}"))
logger.warning("list_subscriptions: download http error name='%s' code=%s", sub.name, r.status_code)
logger.warning(
"subscription download http-error name='%s' code=%s",
sub.name,
r.status_code,
)
return False
cl: str | None = r.headers.get("Content-Length")
@@ -615,7 +951,11 @@ class ListSubscriptions(PluginBase):
self._mark_failure(meta, f"too_large:{cl}")
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, False, "too_large"))
logger.warning("list_subscriptions: download too-large name='%s' len=%s", sub.name, cl)
logger.warning(
"subscription download too-large name='%s' len=%s",
sub.name,
cl,
)
return False
except Exception:
pass
@@ -634,7 +974,10 @@ class ListSubscriptions(PluginBase):
raise RuntimeError("too_large_streamed")
f.write(chunk)
if sub.format.lower() == "hosts" and len(sample_lines) < 200:
if (
sub.format.lower() == "hosts"
and len(sample_lines) < 200
):
txt = chunk.decode("utf-8", errors="ignore")
for ln in txt.splitlines():
if len(sample_lines) < 200:
@@ -645,7 +988,9 @@ class ListSubscriptions(PluginBase):
f.flush()
os.fsync(f.fileno())
if sub.format.lower() == "hosts" and not is_hosts_file_like(sample_lines):
if sub.format.lower() == "hosts" and not is_hosts_file_like(
sample_lines
):
try:
os.remove(tmp)
except Exception:
@@ -653,10 +998,14 @@ class ListSubscriptions(PluginBase):
self._mark_failure(meta, "bad_format_hosts")
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, False, "bad_format"))
logger.warning("list_subscriptions: download bad-format name='%s'", sub.name)
logger.warning(
"subscription file bad-format name='%s'",
sub.name,
)
return False
os.replace(tmp, list_path)
self._fsync_parent_dir(list_path)
except Exception as e:
try:
@@ -667,7 +1016,11 @@ class ListSubscriptions(PluginBase):
self._mark_failure(meta, repr(e))
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, False, "write_error"))
logger.warning("list_subscriptions: download write-error name='%s' err=%s", sub.name, repr(e))
logger.warning(
"subscription file write-error name='%s' err=%s",
sub.name,
repr(e),
)
return False
# update cache validators
@@ -684,16 +1037,28 @@ class ListSubscriptions(PluginBase):
meta.backoff_until = ""
meta.last_result = "updated"
self._save_meta(meta_path, meta)
logger.warning(
"subscription updated name='%s' bytes=%s",
sub.name,
downloaded,
)
r.close()
response_closed = True
self._reload_rules_for_updated_subscription(sub)
self._resultsQueue.put((key, True, "updated"))
logger.warning("list_subscriptions: download updated name='%s' bytes=%s", sub.name, downloaded)
return True
finally:
r.close()
if not response_closed:
r.close()
except Exception as e:
self._mark_failure(meta, repr(e))
self._save_meta(meta_path, meta)
self._resultsQueue.put((key, False, "unexpected_error"))
logger.warning("list_subscriptions: download unexpected-error name='%s' err=%s", sub.name, repr(e))
logger.warning(
"subscription download unexpected-error name='%s' err=%s",
sub.name,
repr(e),
)
return False
finally:

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

@@ -57,6 +57,27 @@
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="start_runtime_button">
<property name="text">
<string>Start</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="stop_runtime_button">
<property name="text">
<string>Stop</string>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="runtime_status_label">
<property name="text">
<string>Runtime: inactive</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
@@ -183,7 +204,7 @@
<item>
<widget class="QGroupBox" name="rule_actions_box">
<property name="title">
<string>Rule actions</string>
<string>Selected subscription(s) actions</string>
</property>
<layout class="QHBoxLayout" name="ruleActionsLayout">
<item>
@@ -203,7 +224,7 @@
<item>
<widget class="QPushButton" name="refresh_now_button">
<property name="text">
<string>Refresh now</string>
<string>Refresh</string>
</property>
</widget>
</item>
@@ -39,54 +39,82 @@
<item row="1" column="1">
<widget class="QLineEdit" name="name_edit"/>
</item>
<item row="2" column="0">
<item row="2" column="1">
<widget class="QLabel" name="name_error_label">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="3" column="0">
<widget class="QLabel" name="url_label">
<property name="text">
<string>URL</string>
</property>
</widget>
</item>
<item row="2" column="1">
</widget>
</item>
<item row="3" column="1">
<widget class="QLineEdit" name="url_edit"/>
</item>
<item row="3" column="0">
<item row="4" column="1">
<widget class="QLabel" name="url_error_label">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="5" column="0">
<widget class="QLabel" name="filename_label">
<property name="text">
<string>Filename</string>
</property>
</widget>
</item>
<item row="3" column="1">
</widget>
</item>
<item row="5" column="1">
<widget class="QLineEdit" name="filename_edit"/>
</item>
<item row="4" column="0">
<item row="6" column="1">
<widget class="QLabel" name="filename_error_label">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="7" column="0">
<widget class="QLabel" name="format_label">
<property name="text">
<string>Format</string>
</property>
</widget>
</item>
<item row="4" column="1">
<item row="7" column="1">
<widget class="QComboBox" name="format_combo"/>
</item>
<item row="5" column="0">
<item row="8" column="0">
<widget class="QLabel" name="groups_label">
<property name="text">
<string>Groups</string>
</property>
</widget>
</item>
<item row="5" column="1">
<item row="8" column="1">
<widget class="QComboBox" name="group_combo"/>
</item>
<item row="6" column="0">
<item row="9" column="1">
<widget class="QLabel" name="group_error_label">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="10" column="0">
<widget class="QLabel" name="interval_label">
<property name="text">
<string>Interval</string>
</property>
</widget>
</item>
<item row="6" column="1">
<item row="10" column="1">
<layout class="QHBoxLayout" name="interval_layout">
<item>
<widget class="QSpinBox" name="interval_spin"/>
@@ -96,14 +124,14 @@
</item>
</layout>
</item>
<item row="7" column="0">
<item row="11" column="0">
<widget class="QLabel" name="timeout_label">
<property name="text">
<string>Timeout</string>
</property>
</widget>
</item>
<item row="7" column="1">
<item row="11" column="1">
<layout class="QHBoxLayout" name="timeout_layout">
<item>
<widget class="QSpinBox" name="timeout_spin"/>
@@ -113,14 +141,14 @@
</item>
</layout>
</item>
<item row="8" column="0">
<item row="12" column="0">
<widget class="QLabel" name="max_size_label">
<property name="text">
<string>Max size</string>
</property>
</widget>
</item>
<item row="8" column="1">
<item row="12" column="1">
<layout class="QHBoxLayout" name="max_size_layout">
<item>
<widget class="QSpinBox" name="max_size_spin"/>
@@ -298,6 +326,13 @@
</property>
</spacer>
</item>
<item>
<widget class="QPushButton" name="test_url_button">
<property name="text">
<string>Test URL</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="cancel_button">
<property name="text">