This commit is contained in:
2026-06-03 21:26:54 +02:00
parent 05e6b8d061
commit d21e5175d5
125 changed files with 41986 additions and 0 deletions
+4
View File
@@ -0,0 +1,4 @@
"""
Ax-Shell services package.
Contains background services and utilities for the shell.
"""
+357
View File
@@ -0,0 +1,357 @@
import os
import subprocess
import re
import time
from fabric.core.service import Property, Service, Signal
from fabric.utils import exec_shell_command_async, monitor_file
from gi.repository import GLib
from loguru import logger
import utils.functions as helpers
from utils.colors import Colors
class Brightness(Service):
"""Service for controlling screen brightness using ddcutil or brightnessctl backends.
The service works with RAW values (0 to max_screen) for both backends:
- brightnessctl: raw values are device-specific (e.g., 0-96000)
- ddcutil: raw values are percentages (0-100)
The 'screen' signal emits percentage values (0-100) for UI display.
"""
instance = None
DDCUTIL_PARAMS = "--disable-dynamic-sleep --sleep-multiplier=0.05"
MIN_CHANGE_THRESHOLD = 2 # Minimum brightness change to apply (percent)
CACHE_INTERVAL = 3 # Cache duration in seconds
POLL_INTERVAL = 500 # File polling interval in ms
@staticmethod
def get_initial():
"""Singleton to get Brightness service instance."""
if Brightness.instance is None:
Brightness.instance = Brightness()
return Brightness.instance
@Signal
def screen(self, value: int) -> None:
"""Signal emitted when screen brightness changes (value: percentage from 0 to 100)."""
pass
def __init__(self, backend=None, **kwargs):
"""Initialize service with automatic backend detection."""
super().__init__(**kwargs)
self._pending_raw = None
self._timer_id = None
self._poll_timer_id = None
self._lock = GLib.Mutex()
self._last_percent = -1
self._last_raw = -1
self._last_update_time = 0
self._last_file_mtime = 0
# Detect backend
self.backend = self._detect_backend(backend)
self.max_screen = self._read_max_brightness() or 100
if self.backend:
if self.backend == "ddcutil":
# Initialize brightness cache
GLib.timeout_add(100, lambda: self._update_brightness_cache())
else:
# Setup polling for brightness file
self._setup_polling()
def _setup_polling(self):
"""Setup periodic polling of brightness file."""
try:
file_path = f"/sys/class/backlight/{self._get_screen_device()}/brightness"
if os.path.exists(file_path):
# Initialize cache with current value
with open(file_path) as f:
self._last_raw = int(f.readline().strip())
self._last_percent = (
int((self._last_raw / self.max_screen) * 100)
if self.max_screen > 0
else 0
)
self._last_file_mtime = os.path.getmtime(file_path)
self._poll_timer_id = GLib.timeout_add(
self.POLL_INTERVAL, self._check_brightness_file
)
except Exception as e:
logger.error(f"Error setting up brightness polling: {e}")
def _check_brightness_file(self):
"""Periodically check brightness file for changes."""
try:
file_path = f"/sys/class/backlight/{self._get_screen_device()}/brightness"
if os.path.exists(file_path):
current_mtime = os.path.getmtime(file_path)
if current_mtime > self._last_file_mtime:
self._last_file_mtime = current_mtime
with open(file_path) as f:
raw = int(f.readline().strip())
if raw != self._last_raw:
self._last_raw = raw
percent = (
int((raw / self.max_screen) * 100)
if self.max_screen > 0
else 0
)
if (
abs(percent - self._last_percent)
>= self.MIN_CHANGE_THRESHOLD
):
self._last_percent = percent
self.emit("screen", percent)
return True
except Exception as e:
logger.error(f"Error checking brightness file: {e}")
return True
def _detect_backend(self, backend):
"""Detect appropriate backend for brightness control."""
if backend:
logger.info(f"Using forced backend: {backend}")
return backend
# Try brightnessctl first (preferred for laptop internal displays)
if helpers.executable_exists("brightnessctl"):
device = self._get_screen_device()
if device: # Non-empty string means device found
logger.info(f"Using brightnessctl backend with device: {device}")
return "brightnessctl"
else:
logger.debug(
"brightnessctl is available but no backlight devices found in /sys/class/backlight/"
)
# Try ddcutil for external monitors (via DDC/CI protocol)
if helpers.executable_exists("ddcutil"):
bus = self._detect_ddcutil_bus()
if bus != -1:
self.ddcutil_bus = bus
logger.info(f"Using ddcutil backend with I2C bus: {bus}")
return "ddcutil"
else:
logger.debug(
"ddcutil is available but no DDC/CI capable monitors detected"
)
logger.warning(
"No available backend for brightness control - no backlight devices or DDC/CI monitors found"
)
return None
def _get_screen_device(self):
"""Return first backlight device from sysfs."""
try:
return os.listdir("/sys/class/backlight")[0]
except Exception:
return ""
def _detect_ddcutil_bus(self):
"""Detect I2C bus number for ddcutil."""
try:
process = subprocess.run(
["ddcutil", "detect"], text=True, capture_output=True, timeout=2
)
if process.returncode == 0:
match = re.search(r"I2C bus:\s*/dev/i2c-(\d+)", process.stdout)
return int(match.group(1)) if match else -1
return -1
except Exception:
return -1
def _read_max_brightness(self):
"""Read maximum brightness value"""
if self.backend:
if self.backend == "ddcutil":
try:
process = subprocess.run(
[
"ddcutil",
"--bus",
str(self.ddcutil_bus),
*self.DDCUTIL_PARAMS.split(),
"getvcp",
"10",
],
text=True,
capture_output=True,
timeout=2,
)
if process.returncode == 0:
match = re.search(
r"current value\s*=\s*(\d+)\s*,\s*max value\s*=\s*(\d+)",
process.stdout,
)
if match:
return int(match.group(2))
except Exception as e:
logger.error(f"Error executing ddcutil: {e}")
else:
try:
with open(
f"/sys/class/backlight/{self._get_screen_device()}/max_brightness"
) as f:
return int(f.readline().strip())
except Exception:
return None
def _update_brightness_cache(self):
"""Update brightness cache with current value."""
if self.backend == "ddcutil":
self.screen_brightness # This will update the cache
return False
@Property(int, "read-write")
def screen_brightness(self):
"""Getter returns current brightness in RAW value (0 to max_screen)."""
if not self.backend:
return -1
if self.backend == "brightnessctl":
# Return cached raw value if available
if self._last_raw != -1:
return self._last_raw
try:
with open(
f"/sys/class/backlight/{self._get_screen_device()}/brightness"
) as f:
raw = int(f.readline().strip())
self._last_raw = raw
return raw
except Exception as e:
logger.error(f"Error reading brightness file: {e}")
return -1
elif self.backend == "ddcutil":
# Use cached value if recent enough
if (
time.time() - self._last_update_time < self.CACHE_INTERVAL
and self._last_raw != -1
):
return self._last_raw
try:
process = subprocess.run(
[
"ddcutil",
"--bus",
str(self.ddcutil_bus),
*self.DDCUTIL_PARAMS.split(),
"getvcp",
"10",
],
text=True,
capture_output=True,
timeout=2,
)
if process.returncode == 0:
match = re.search(
r"current value\s*=\s*(\d+)\s*,\s*max value\s*=\s*(\d+)",
process.stdout,
)
if match:
current = int(match.group(1))
# For ddcutil, raw value IS the current value (0-100)
self._last_raw = current
self._last_update_time = time.time()
return current
except Exception as e:
logger.error(f"Error executing ddcutil: {e}")
return self._last_raw if self._last_raw != -1 else -1
@screen_brightness.setter
def screen_brightness(self, value: int):
"""Setter accepts brightness value in RAW (0 to max_screen)."""
self._lock.lock()
try:
# Limit value between 0 and max_screen
value = max(0, min(value, self.max_screen))
# Check if change is significant enough (in percentage terms)
current_percent = (
int((self._last_raw / self.max_screen) * 100)
if self._last_raw != -1 and self.max_screen > 0
else -1
)
new_percent = (
int((value / self.max_screen) * 100) if self.max_screen > 0 else 0
)
if (
abs(new_percent - current_percent) < self.MIN_CHANGE_THRESHOLD
and self._last_raw != -1
):
return
self._pending_raw = value
# Use a single timer for applying changes
if self._timer_id:
GLib.source_remove(self._timer_id)
self._timer_id = GLib.timeout_add(50, self._apply_brightness)
finally:
self._lock.unlock()
def _apply_brightness(self):
"""Apply pending brightness change with optimized debouncing."""
self._lock.lock()
try:
if self._pending_raw is None:
self._timer_id = None
return False
raw = self._pending_raw
self._pending_raw = None
self._timer_id = None
finally:
self._lock.unlock()
try:
# Update cache before executing command for faster UI response
self._last_raw = raw
# Calculate percentage for signal emission
percent = int((raw / self.max_screen) * 100) if self.max_screen > 0 else 0
if self.backend == "brightnessctl":
self.emit("screen", percent)
exec_shell_command_async(
f"brightnessctl --device '{self._get_screen_device()}' set {raw}"
)
elif self.backend == "ddcutil":
self._last_update_time = time.time()
self.emit("screen", percent)
exec_shell_command_async(
f"ddcutil --bus {self.ddcutil_bus} {self.DDCUTIL_PARAMS} --terse setvcp 10 {raw}",
lambda exit_code, stdout, stderr: logger.error(
f"ddcutil error (code {exit_code}): {stderr}"
)
if exit_code != 0
else None,
)
except Exception as e:
logger.error(f"Error setting brightness: {e}")
return False
def cleanup(self):
"""Clean up resources when service is stopped."""
if self._timer_id:
GLib.source_remove(self._timer_id)
self._timer_id = None
if self._poll_timer_id:
GLib.source_remove(self._poll_timer_id)
self._poll_timer_id = None
+233
View File
@@ -0,0 +1,233 @@
import json
import subprocess
import threading
from typing import Optional
class Signal:
"""Simple signal implementation for monitor focus service."""
def __init__(self):
self._callbacks = []
def connect(self, callback):
"""Connect a callback to this signal."""
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
"""Emit the signal to all connected callbacks."""
for callback in self._callbacks:
try:
callback(*args, **kwargs)
except Exception as e:
print(f"Error in signal callback: {e}")
class MonitorFocusService:
"""
Service to track monitor focus changes through Hyprland events.
Listens to 'focusedmon' and 'workspace' events and emits signals
when monitor focus changes.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self._initialized = True
self._monitor_name_to_id = {}
self._monitor_info = {} # Store rich monitor information
self._current_workspace = 1
self._current_monitor_name = ""
self._listening = False
self._thread = None
# Signals
self.monitor_focused = Signal()
self.workspace_changed = Signal()
self._update_monitor_mapping()
self.start_listening()
def _update_monitor_mapping(self):
"""Update the monitor name to ID mapping with rich monitor information."""
try:
# Import here to avoid circular imports
from utils.monitor_manager import get_monitor_manager
manager = get_monitor_manager()
monitors = manager.get_monitors()
self._monitor_name_to_id = {}
self._monitor_info = {} # Store rich monitor information
for monitor in monitors:
monitor_name = monitor['name']
monitor_id = monitor['id']
self._monitor_name_to_id[monitor_name] = monitor_id
self._monitor_info[monitor_id] = {
'name': monitor_name,
'width': monitor.get('width', 1920),
'height': monitor.get('height', 1080),
'x': monitor.get('x', 0),
'y': monitor.get('y', 0),
'scale': monitor.get('scale', 1.0),
'focused': monitor.get('focused', False)
}
except ImportError:
# Fallback if monitor manager not available yet
self._monitor_name_to_id = {}
self._monitor_info = {}
def start_listening(self):
"""Start listening to Hyprland events in a separate thread."""
if self._listening:
return
self._listening = True
self._thread = threading.Thread(target=self._listen_to_hyprland, daemon=True)
self._thread.start()
def stop_listening(self):
"""Stop listening to Hyprland events."""
self._listening = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=1.0)
def _listen_to_hyprland(self):
"""Listen to Hyprland events via socat."""
try:
process = subprocess.Popen(
["socat", "-U", "-", "UNIX-CONNECT:/tmp/hypr/$HYPRLAND_INSTANCE_SIGNATURE/.socket2.sock"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
while self._listening and process.poll() is None:
if process.stdout:
line = process.stdout.readline()
if line:
self._handle_hyprland_event(line.strip())
except (subprocess.SubprocessError, FileNotFoundError) as e:
print(f"MonitorFocusService: Error listening to Hyprland: {e}")
except Exception as e:
print(f"MonitorFocusService: Unexpected error: {e}")
def _handle_hyprland_event(self, event_line: str):
"""Parse and handle Hyprland event."""
try:
if '>>' not in event_line:
return
parts = event_line.split('>>')
if len(parts) < 2:
return
event_type = parts[0]
event_data = parts[1]
if event_type == "focusedmon":
self._handle_focused_monitor(event_data)
elif event_type == "workspace":
self._handle_workspace_change(event_data)
except Exception as e:
print(f"MonitorFocusService: Error handling event '{event_line}': {e}")
def _handle_focused_monitor(self, data: str):
"""Handle focusedmon event: monitor_name,workspace_name"""
try:
parts = data.split(',')
if len(parts) >= 2:
monitor_name = parts[0]
workspace_name = parts[1]
# Update monitor mapping if needed
if monitor_name not in self._monitor_name_to_id:
self._update_monitor_mapping()
monitor_id = self._monitor_name_to_id.get(monitor_name, 0)
# Extract workspace ID from name
try:
workspace_id = int(workspace_name)
except ValueError:
workspace_id = 1
self._current_monitor_name = monitor_name
self._current_workspace = workspace_id
# Emit signal
self.monitor_focused.emit(monitor_name, monitor_id, workspace_id)
except Exception as e:
print(f"MonitorFocusService: Error in _handle_focused_monitor: {e}")
def _handle_workspace_change(self, data: str):
"""Handle workspace event: workspace_name"""
try:
workspace_name = data.strip()
# Extract workspace ID
try:
workspace_id = int(workspace_name)
except ValueError:
workspace_id = 1
self._current_workspace = workspace_id
# Emit signal
self.workspace_changed.emit(workspace_id, self._current_monitor_name)
except Exception as e:
print(f"MonitorFocusService: Error in _handle_workspace_change: {e}")
def get_current_monitor_id(self) -> int:
"""Get current monitor ID."""
return self._monitor_name_to_id.get(self._current_monitor_name, 0)
def get_current_workspace(self) -> int:
"""Get current workspace ID."""
return self._current_workspace
def get_monitor_id_by_name(self, monitor_name: str) -> Optional[int]:
"""Get monitor ID by name."""
return self._monitor_name_to_id.get(monitor_name)
def get_monitor_info(self, monitor_id: int) -> Optional[dict]:
"""Get rich monitor information by ID."""
return self._monitor_info.get(monitor_id)
def get_current_monitor_info(self) -> Optional[dict]:
"""Get rich information for current monitor."""
current_id = self.get_current_monitor_id()
return self.get_monitor_info(current_id)
def get_monitor_scale(self, monitor_id: int) -> float:
"""Get monitor scale factor by ID."""
info = self.get_monitor_info(monitor_id)
return info.get('scale', 1.0) if info else 1.0
def get_current_monitor_scale(self) -> float:
"""Get current monitor scale factor."""
return self.get_monitor_scale(self.get_current_monitor_id())
# Singleton accessor
_monitor_focus_service_instance = None
def get_monitor_focus_service() -> MonitorFocusService:
"""Get the global MonitorFocusService instance."""
global _monitor_focus_service_instance
if _monitor_focus_service_instance is None:
_monitor_focus_service_instance = MonitorFocusService()
return _monitor_focus_service_instance
+279
View File
@@ -0,0 +1,279 @@
# Standard library imports
import contextlib
# Third-party imports
import gi
from gi.repository import GLib # type: ignore
from loguru import logger
# Fabric imports
from fabric.core.service import Property, Service, Signal
from fabric.utils import bulk_connect
class PlayerctlImportError(ImportError):
"""An error to raise when playerctl is not installed."""
def __init__(self, *args):
super().__init__(
"Playerctl is not installed, please install it first",
*args,
)
# Try to import Playerctl, raise custom error if not available
try:
gi.require_version("Playerctl", "2.0")
from gi.repository import Playerctl
except ValueError:
raise PlayerctlImportError
class MprisPlayer(Service):
"""A service to manage a mpris player."""
@Signal
def exit(self, value: bool) -> bool: ...
@Signal
def changed(self) -> None: ...
def __init__(
self,
player: Playerctl.Player,
**kwargs,
):
self._signal_connectors: dict = {}
self._player: Playerctl.Player = player
super().__init__(**kwargs)
for sn in ["playback-status", "loop-status", "shuffle", "volume", "seeked"]:
self._signal_connectors[sn] = self._player.connect(
sn,
lambda *args, sn=sn: self.notifier(sn, args),
)
self._signal_connectors["exit"] = self._player.connect(
"exit",
self.on_player_exit,
)
self._signal_connectors["metadata"] = self._player.connect(
"metadata",
lambda *args: self.update_status(),
)
GLib.idle_add(lambda *args: self.update_status_once())
def update_status(self):
# schedule each notifier asynchronously.
def notify_property(prop):
if self.get_property(prop) is not None:
self.notifier(prop)
for prop in [
"metadata",
"title",
"artist",
"arturl",
"length",
]:
GLib.idle_add(lambda p=prop: (notify_property(p), False))
for prop in [
"can-seek",
"can-pause",
"can-shuffle",
"can-go-next",
"can-go-previous",
]:
GLib.idle_add(lambda p=prop: (self.notifier(p), False))
def update_status_once(self):
# schedule notifier calls for each property
def notify_all():
for prop in self.list_properties(): # type: ignore
self.notifier(prop.name)
return False
GLib.idle_add(notify_all, priority=GLib.PRIORITY_DEFAULT_IDLE)
def notifier(self, name: str, args=None):
def notify_and_emit():
self.notify(name)
self.emit("changed")
return False
GLib.idle_add(notify_and_emit, priority=GLib.PRIORITY_DEFAULT_IDLE)
def on_player_exit(self, player):
for id in list(self._signal_connectors.values()):
with contextlib.suppress(Exception):
self._player.disconnect(id)
del self._signal_connectors
GLib.idle_add(lambda: (self.emit("exit", True), False))
del self._player
def toggle_shuffle(self):
if self.can_shuffle:
# schedule the shuffle toggle in the GLib idle loop
GLib.idle_add(lambda: (setattr(self, 'shuffle', not self.shuffle), False))
# else do nothing
def play_pause(self):
if self.can_pause:
GLib.idle_add(lambda: (self._player.play_pause(), False))
def next(self):
if self.can_go_next:
GLib.idle_add(lambda: (self._player.next(), False))
def previous(self):
if self.can_go_previous:
GLib.idle_add(lambda: (self._player.previous(), False))
# Properties
@Property(str, "readable")
def player_name(self) -> int:
return self._player.get_property("player-name") # type: ignore
@Property(int, "read-write", default_value=0)
def position(self) -> int:
return self._player.get_property("position") # type: ignore
@position.setter
def position(self, new_pos: int):
self._player.set_position(new_pos)
@Property(object, "readable")
def metadata(self) -> dict:
return self._player.get_property("metadata") # type: ignore
@Property(str or None, "readable")
def arturl(self) -> str | None:
if "mpris:artUrl" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:artUrl"] # type: ignore
return None
@Property(str or None, "readable")
def length(self) -> str | None:
if "mpris:length" in self.metadata.keys(): # type: ignore # noqa: SIM118
return self.metadata["mpris:length"] # type: ignore
return None
@Property(str, "readable")
def artist(self) -> str:
artist = self._player.get_artist() # type: ignore
if isinstance(artist, (list, tuple)):
return ", ".join(artist)
return artist
@Property(str, "readable")
def album(self) -> str:
return self._player.get_album() # type: ignore
@Property(str, "readable")
def title(self) -> str:
if self._player is None:
return ""
title_data = self._player.get_title()
return title_data if isinstance(title_data, str) else ""
@Property(bool, "read-write", default_value=False)
def shuffle(self) -> bool:
return self._player.get_property("shuffle") # type: ignore
@shuffle.setter
def shuffle(self, do_shuffle: bool):
self.notifier("shuffle")
return self._player.set_shuffle(do_shuffle)
@Property(str, "readable")
def playback_status(self) -> str:
return {
Playerctl.PlaybackStatus.PAUSED: "paused",
Playerctl.PlaybackStatus.PLAYING: "playing",
Playerctl.PlaybackStatus.STOPPED: "stopped",
}.get(self._player.get_property("playback_status"), "unknown") # type: ignore
@Property(str, "read-write")
def loop_status(self) -> str:
return {
Playerctl.LoopStatus.NONE: "none",
Playerctl.LoopStatus.TRACK: "track",
Playerctl.LoopStatus.PLAYLIST: "playlist",
}.get(self._player.get_property("loop_status"), "unknown") # type: ignore
@loop_status.setter
def loop_status(self, status: str):
loop_status = {
"none": Playerctl.LoopStatus.NONE,
"track": Playerctl.LoopStatus.TRACK,
"playlist": Playerctl.LoopStatus.PLAYLIST,
}.get(status)
self._player.set_loop_status(loop_status) if loop_status else None
@Property(bool, "readable", default_value=False)
def can_go_next(self) -> bool:
return self._player.get_property("can_go_next") # type: ignore
@Property(bool, "readable", default_value=False)
def can_go_previous(self) -> bool:
return self._player.get_property("can_go_previous") # type: ignore
@Property(bool, "readable", default_value=False)
def can_seek(self) -> bool:
return self._player.get_property("can_seek") # type: ignore
@Property(bool, "readable", default_value=False)
def can_pause(self) -> bool:
return self._player.get_property("can_pause") # type: ignore
@Property(bool, "readable", default_value=False)
def can_shuffle(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
@Property(bool, "readable", default_value=False)
def can_loop(self) -> bool:
try:
self._player.set_shuffle(self._player.get_property("shuffle"))
return True
except Exception:
return False
class MprisPlayerManager(Service):
"""A service to manage mpris players."""
@Signal
def player_appeared(self, player: Playerctl.Player) -> Playerctl.Player: ...
@Signal
def player_vanished(self, player_name: str) -> str: ...
def __init__(
self,
**kwargs,
):
self._manager = Playerctl.PlayerManager.new()
bulk_connect(
self._manager,
{
"name-appeared": self.on_name_appeard,
"name-vanished": self.on_name_vanished,
},
)
self.add_players()
super().__init__(**kwargs)
def on_name_appeard(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} appeared")
new_player = Playerctl.Player.new_from_name(player_name)
manager.manage_player(new_player)
self.emit("player-appeared", new_player) # type: ignore
def on_name_vanished(self, manager, player_name: Playerctl.PlayerName):
logger.info(f"[MprisPlayer] {player_name.name} vanished")
self.emit("player-vanished", player_name.name) # type: ignore
def add_players(self):
for player in self._manager.get_property("player-names"): # type: ignore
self._manager.manage_player(Playerctl.Player.new_from_name(player)) # type: ignore
@Property(object, "readable")
def players(self):
return self._manager.get_property("players") # type: ignore
+323
View File
@@ -0,0 +1,323 @@
from typing import Any, List, Literal
import gi
from fabric.core.service import Property, Service, Signal
from fabric.utils import bulk_connect, exec_shell_command_async
from gi.repository import Gio
from loguru import logger
try:
gi.require_version("NM", "1.0")
from gi.repository import NM
except ValueError:
logger.error("Failed to start network manager")
class Wifi(Service):
"""A service to manage the wifi connection."""
@Signal
def changed(self) -> None: ...
@Signal
def enabled(self) -> bool: ...
def __init__(self, client: NM.Client, device: NM.DeviceWifi, **kwargs):
self._client: NM.Client = client
self._device: NM.DeviceWifi = device
self._ap: NM.AccessPoint | None = None
self._ap_signal: int | None = None
super().__init__(**kwargs)
self._client.connect(
"notify::wireless-enabled",
lambda *args: self.notifier("enabled", args),
)
if self._device:
bulk_connect(
self._device,
{
"notify::active-access-point": lambda *args: self._activate_ap(),
"access-point-added": lambda *args: self.emit("changed"),
"access-point-removed": lambda *args: self.emit("changed"),
"state-changed": lambda *args: self.ap_update(),
},
)
self._activate_ap()
def ap_update(self):
self.emit("changed")
for sn in [
"enabled",
"internet",
"strength",
"frequency",
"access-points",
"ssid",
"state",
"icon-name",
]:
self.notify(sn)
def _activate_ap(self):
if self._ap:
self._ap.disconnect(self._ap_signal)
self._ap = self._device.get_active_access_point()
if not self._ap:
return
self._ap_signal = self._ap.connect(
"notify::strength", lambda *args: self.ap_update()
) # type: ignore
def toggle_wifi(self):
self._client.wireless_set_enabled(not self._client.wireless_get_enabled())
# def set_active_ap(self, ap):
# self._device.access
def scan(self):
self._device.request_scan_async(
None,
lambda device, result: [
device.request_scan_finish(result),
self.emit("changed"),
],
)
def notifier(self, name: str, *args):
self.notify(name)
self.emit("changed")
return
@Property(bool, "read-write", default_value=False)
def enabled(self) -> bool: # type: ignore
return bool(self._client.wireless_get_enabled())
@enabled.setter
def enabled(self, value: bool):
self._client.wireless_set_enabled(value)
@Property(int, "readable")
def strength(self):
return self._ap.get_strength() if self._ap else -1
@Property(str, "readable")
def icon_name(self):
if not self._ap:
return "network-wireless-disabled-symbolic"
if self.internet == "activated":
return {
80: "network-wireless-signal-excellent-symbolic",
60: "network-wireless-signal-good-symbolic",
40: "network-wireless-signal-ok-symbolic",
20: "network-wireless-signal-weak-symbolic",
00: "network-wireless-signal-none-symbolic",
}.get(
min(80, 20 * round(self._ap.get_strength() / 20)),
"network-wireless-no-route-symbolic",
)
if self.internet == "activating":
return "network-wireless-acquiring-symbolic"
return "network-wireless-offline-symbolic"
@Property(int, "readable")
def frequency(self):
return self._ap.get_frequency() if self._ap else -1
@Property(int, "readable")
def internet(self):
return {
NM.ActiveConnectionState.ACTIVATED: "activated",
NM.ActiveConnectionState.ACTIVATING: "activating",
NM.ActiveConnectionState.DEACTIVATING: "deactivating",
NM.ActiveConnectionState.DEACTIVATED: "deactivated",
}.get(
self._device.get_active_connection().get_state(),
"unknown",
)
@Property(object, "readable")
def access_points(self) -> List[object]:
points: list[NM.AccessPoint] = self._device.get_access_points()
def make_ap_dict(ap: NM.AccessPoint):
return {
"bssid": ap.get_bssid(),
# "address": ap.get_
"last_seen": ap.get_last_seen(),
"ssid": NM.utils_ssid_to_utf8(ap.get_ssid().get_data())
if ap.get_ssid()
else "Unknown",
"active-ap": self._ap,
"strength": ap.get_strength(),
"frequency": ap.get_frequency(),
"icon-name": {
80: "network-wireless-signal-excellent-symbolic",
60: "network-wireless-signal-good-symbolic",
40: "network-wireless-signal-ok-symbolic",
20: "network-wireless-signal-weak-symbolic",
00: "network-wireless-signal-none-symbolic",
}.get(
min(80, 20 * round(ap.get_strength() / 20)),
"network-wireless-no-route-symbolic",
),
}
return list(map(make_ap_dict, points))
@Property(str, "readable")
def ssid(self):
if not self._ap:
return "Disconnected"
ssid = self._ap.get_ssid().get_data()
return NM.utils_ssid_to_utf8(ssid) if ssid else "Unknown"
@Property(int, "readable")
def state(self):
return {
NM.DeviceState.UNMANAGED: "unmanaged",
NM.DeviceState.UNAVAILABLE: "unavailable",
NM.DeviceState.DISCONNECTED: "disconnected",
NM.DeviceState.PREPARE: "prepare",
NM.DeviceState.CONFIG: "config",
NM.DeviceState.NEED_AUTH: "need_auth",
NM.DeviceState.IP_CONFIG: "ip_config",
NM.DeviceState.IP_CHECK: "ip_check",
NM.DeviceState.SECONDARIES: "secondaries",
NM.DeviceState.ACTIVATED: "activated",
NM.DeviceState.DEACTIVATING: "deactivating",
NM.DeviceState.FAILED: "failed",
}.get(self._device.get_state(), "unknown")
class Ethernet(Service):
"""A service to manage the ethernet connection."""
@Signal
def changed(self) -> None: ...
@Signal
def enabled(self) -> bool: ...
@Property(int, "readable")
def speed(self) -> int:
return self._device.get_speed()
@Property(str, "readable")
def internet(self) -> str:
return {
NM.ActiveConnectionState.ACTIVATED: "activated",
NM.ActiveConnectionState.ACTIVATING: "activating",
NM.ActiveConnectionState.DEACTIVATING: "deactivating",
NM.ActiveConnectionState.DEACTIVATED: "deactivated",
}.get(
self._device.get_active_connection().get_state(),
"disconnected",
)
@Property(str, "readable")
def icon_name(self) -> str:
network = self.internet
if network == "activated":
return "network-wired-symbolic"
elif network == "activating":
return "network-wired-acquiring-symbolic"
elif self._device.get_connectivity != NM.ConnectivityState.FULL:
return "network-wired-no-route-symbolic"
return "network-wired-disconnected-symbolic"
def __init__(self, client: NM.Client, device: NM.DeviceEthernet, **kwargs) -> None:
super().__init__(**kwargs)
self._client: NM.Client = client
self._device: NM.DeviceEthernet = device
for pn in (
"active-connection",
"icon-name",
"internet",
"speed",
"state",
):
self._device.connect(f"notify::{pn}", lambda *_: self.notifier(pn))
self._device.connect("notify::speed", lambda *_: print(_))
def notifier(self, pn):
self.notify(pn)
self.emit("changed")
class NetworkClient(Service):
"""A service to manage the network connections."""
@Signal
def device_ready(self) -> None: ...
def __init__(self, **kwargs):
self._client: NM.Client | None = None
self.wifi_device: Wifi | None = None
self.ethernet_device: Ethernet | None = None
super().__init__(**kwargs)
NM.Client.new_async(
cancellable=None,
callback=self._init_network_client,
**kwargs,
)
def _init_network_client(self, client: NM.Client, task: Gio.Task, **kwargs):
self._client = client
wifi_device: NM.DeviceWifi | None = self._get_device(NM.DeviceType.WIFI) # type: ignore
ethernet_device: NM.DeviceEthernet | None = self._get_device(
NM.DeviceType.ETHERNET
)
if wifi_device:
self.wifi_device = Wifi(self._client, wifi_device)
self.emit("device-ready")
if ethernet_device:
self.ethernet_device = Ethernet(client=self._client, device=ethernet_device)
self.emit("device-ready")
self.notify("primary-device")
def _get_device(self, device_type) -> Any:
devices: List[NM.Device] = self._client.get_devices() # type: ignore
return next(
(
x
for x in devices
if x.get_device_type() == device_type
and x.get_active_connection() is not None
),
None,
)
def _get_primary_device(self) -> Literal["wifi", "wired"] | None:
if not self._client:
return None
return (
"wifi"
if "wireless"
in str(self._client.get_primary_connection().get_connection_type())
else "wired"
if "ethernet"
in str(self._client.get_primary_connection().get_connection_type())
else None
)
def connect_wifi_bssid(self, bssid):
# We are using nmcli here, idk im lazy
exec_shell_command_async(
f"nmcli device wifi connect {bssid}", lambda *args: print(args)
)
@Property(str, "readable")
def primary_device(self) -> Literal["wifi", "wired"] | None:
return self._get_primary_device()