import os import tempfile import urllib.parse import urllib.request from fabric.widgets.box import Box from fabric.widgets.button import Button from fabric.widgets.centerbox import CenterBox from fabric.widgets.circularprogressbar import CircularProgressBar from fabric.widgets.label import Label from fabric.widgets.overlay import Overlay from fabric.widgets.stack import Stack from gi.repository import Gdk, Gio, GLib, Gtk import config.data as data import modules.icons as icons from modules.cavalcade import SpectrumRender from services.mpris import MprisPlayer, MprisPlayerManager from widgets.circle_image import CircleImage vertical_mode = False if data.PANEL_THEME == "Panel" and (data.BAR_POSITION in ["Left", "Right"] or data.PANEL_POSITION in ["Start", "End"]): vertical_mode = True def get_player_icon_markup_by_name(player_name): if player_name: pn = player_name.lower() if pn == "firefox": return icons.firefox elif pn == "spotify": return icons.spotify elif pn in ("chromium", "brave"): return icons.chromium return icons.disc def add_hover_cursor(widget): widget.add_events(Gdk.EventMask.ENTER_NOTIFY_MASK | Gdk.EventMask.LEAVE_NOTIFY_MASK) widget.connect("enter-notify-event", lambda w, event: w.get_window().set_cursor( Gdk.Cursor.new_from_name(Gdk.Display.get_default(), "pointer"))) widget.connect("leave-notify-event", lambda w, event: w.get_window().set_cursor(None)) class PlayerBox(Box): def __init__(self, mpris_player=None): super().__init__(orientation="v", h_align="fill", spacing=0, h_expand=False, v_expand=not vertical_mode) self.mpris_player = mpris_player self._progress_timer_id = None self.cover = CircleImage( name="player-cover", image_file=os.path.expanduser("~/.current.wall"), size=162 if not vertical_mode else 96, h_align="center", v_align="center", ) self.cover_placerholder = CircleImage( name="player-cover", size=198 if not vertical_mode else 132, h_align="center", v_align="center", ) self.title = Label(name="player-title", h_expand=True, h_align="fill", ellipsization="end", max_chars_width=1, style_classes=["vertical"] if vertical_mode else []) self.album = Label(name="player-album", h_expand=True, h_align="fill", ellipsization="end", max_chars_width=1) self.artist = Label(name="player-artist", h_expand=True, h_align="fill", ellipsization="end", max_chars_width=1) self.progressbar = CircularProgressBar( name="player-progress", size=198 if not vertical_mode else 132, h_align="center", v_align="center", start_angle=180, end_angle=360, ) self.time = Label(name="player-time", label="--:-- / --:--") self.overlay = Overlay( child=self.cover_placerholder, overlays=[self.progressbar, self.cover], ) self.overlay_container = CenterBox(name="player-overlay", center_children=[self.overlay]) self.title.set_label("Nothing Playing") self.album.set_label("Enjoy the silence") self.artist.set_label("¯\\_(ツ)_/¯") self.progressbar.set_value(0.0) self.prev = Button( name="player-btn", child=Label(name="player-btn-label", markup=icons.prev), h_expand=False, v_expand=False, h_align="center", v_align="center", ) self.backward = Button( name="player-btn", child=Label(name="player-btn-label", markup=icons.skip_back), h_expand=False, v_expand=False, h_align="center", v_align="center", ) self.play_pause = Button( name="player-btn", child=Label(name="player-btn-label", markup=icons.play, style_classes=["play-pause"]), style_classes=["play-pause"], h_expand=False, v_expand=False, h_align="center", v_align="center", ) self.forward = Button( name="player-btn", child=Label(name="player-btn-label", markup=icons.skip_forward), h_expand=False, v_expand=False, h_align="center", v_align="center", ) self.next = Button( name="player-btn", child=Label(name="player-btn-label", markup=icons.next), h_expand=False, v_expand=False, h_align="center", v_align="center", ) add_hover_cursor(self.prev) add_hover_cursor(self.backward) add_hover_cursor(self.play_pause) add_hover_cursor(self.forward) add_hover_cursor(self.next) self.btn_box = CenterBox( name="player-btn-box", orientation="h", center_children=[ Box( orientation="h", spacing=8, h_expand=True, h_align="fill", children=[ self.prev, self.backward, self.play_pause, self.forward, self.next, ] ) ] ) self.p_children=[ self.overlay_container, self.title, self.album, self.artist, self.btn_box, self.time, ] if not vertical_mode else [ self.overlay_container, Box( orientation="v", spacing=4, h_expand=True, h_align="fill", v_expand=False, v_align="center", children=[ self.title, self.album, self.btn_box, self.artist, self.time, ] ) ] self.player_box = Box( name="player-box", orientation="v" if not vertical_mode else "h", v_align="center", spacing=4, children=self.p_children, ) self.add(self.player_box) if mpris_player: self._apply_mpris_properties() self.prev.connect("clicked", self._on_prev_clicked) self.play_pause.connect("clicked", self._on_play_pause_clicked) self.backward.connect("clicked", self._on_backward_clicked) self.forward.connect("clicked", self._on_forward_clicked) self.next.connect("clicked", self._on_next_clicked) self.mpris_player.connect("changed", self._on_mpris_changed) else: self.play_pause.get_child().set_markup(icons.stop) self.play_pause.add_style_class("stop") self.backward.add_style_class("disabled") self.forward.add_style_class("disabled") self.prev.add_style_class("disabled") self.next.add_style_class("disabled") self.progressbar.set_value(0.0) self.time.set_text("--:-- / --:--") def _apply_mpris_properties(self): mp = self.mpris_player self.title.set_visible(bool(mp.title and mp.title.strip())) if mp.title and mp.title.strip(): self.title.set_text(mp.title) self.album.set_visible(bool(mp.album and mp.album.strip())) if mp.album and mp.album.strip(): self.album.set_text(mp.album) self.artist.set_visible(bool(mp.artist and mp.artist.strip())) if mp.artist and mp.artist.strip(): self.artist.set_text(mp.artist) if mp.arturl: parsed = urllib.parse.urlparse(mp.arturl) if parsed.scheme == "file": local_arturl = urllib.parse.unquote(parsed.path) self._set_cover_image(local_arturl) elif parsed.scheme in ("http", "https"): GLib.Thread.new("download-artwork", self._download_and_set_artwork, mp.arturl) else: self._set_cover_image(mp.arturl) else: fallback = os.path.expanduser("~/.current.wall") self._set_cover_image(fallback) file_obj = Gio.File.new_for_path(fallback) monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None) monitor.connect("changed", self.on_wallpaper_changed) self._wallpaper_monitor = monitor self.update_play_pause_icon() self.progressbar.set_visible(True) self.time.set_visible(True) player_name = mp.player_name.lower() if hasattr(mp, "player_name") and mp.player_name else "" can_seek = hasattr(mp, "can_seek") and mp.can_seek if player_name == "firefox" or not can_seek: # Firefox and non-seekable players don't support progress tracking self.backward.add_style_class("disabled") self.forward.add_style_class("disabled") self.progressbar.set_value(0.0) self.time.set_text("--:-- / --:--") # Stop the timer since we can't track progress if self._progress_timer_id: GLib.source_remove(self._progress_timer_id) self._progress_timer_id = None else: # Enable seeking controls self.backward.remove_style_class("disabled") self.forward.remove_style_class("disabled") # Use adaptive timer based on playback status instead of fixed 1-second polling self._start_adaptive_progress_timer() if hasattr(mp, "can_go_previous") and mp.can_go_previous: self.prev.remove_style_class("disabled") else: self.prev.add_style_class("disabled") if hasattr(mp, "can_go_next") and mp.can_go_next: self.next.remove_style_class("disabled") else: self.next.add_style_class("disabled") def _start_adaptive_progress_timer(self): """Start progress timer with adaptive interval based on playback status""" if self._progress_timer_id: GLib.source_remove(self._progress_timer_id) # Use longer intervals when paused to reduce CPU usage if hasattr(self.mpris_player, 'playback_status') and self.mpris_player.playback_status == "playing": interval = 1000 # 1 second when playing else: interval = 5000 # 5 seconds when paused/stopped self._progress_timer_id = GLib.timeout_add(interval, self._update_progress) self._update_progress() # Update immediately def _set_cover_image(self, image_path): if image_path and os.path.isfile(image_path): self.cover.set_image_from_file(image_path) else: fallback = os.path.expanduser("~/.current.wall") self.cover.set_image_from_file(fallback) file_obj = Gio.File.new_for_path(fallback) monitor = file_obj.monitor_file(Gio.FileMonitorFlags.NONE, None) monitor.connect("changed", self.on_wallpaper_changed) self._wallpaper_monitor = monitor def _download_and_set_artwork(self, arturl): """ Download the artwork from the given URL asynchronously and update the cover image using GLib.idle_add to ensure UI updates occur on the main thread. """ try: parsed = urllib.parse.urlparse(arturl) suffix = os.path.splitext(parsed.path)[1] or ".png" with urllib.request.urlopen(arturl) as response: data = response.read() temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) temp_file.write(data) temp_file.close() local_arturl = temp_file.name except Exception: local_arturl = os.path.expanduser("~/.current.wall") GLib.idle_add(self._set_cover_image, local_arturl) return None def update_play_pause_icon(self): if self.mpris_player.playback_status == "playing": self.play_pause.get_child().set_markup(icons.pause) self.play_pause.add_style_class("playing") else: self.play_pause.get_child().set_markup(icons.play) self.play_pause.remove_style_class("playing") def on_wallpaper_changed(self, monitor, file, other_file, event): self.cover.set_image_from_file(os.path.expanduser("~/.current.wall")) def _on_prev_clicked(self, button): if self.mpris_player: self.mpris_player.previous() def _on_play_pause_clicked(self, button): if self.mpris_player: self.mpris_player.play_pause() self.update_play_pause_icon() def _on_backward_clicked(self, button): if self.mpris_player and self.mpris_player.can_seek and "disabled" not in self.backward.get_style_context().list_classes(): new_pos = max(0, self.mpris_player.position - 5000000) self.mpris_player.position = new_pos def _on_forward_clicked(self, button): if self.mpris_player and self.mpris_player.can_seek and "disabled" not in self.forward.get_style_context().list_classes(): new_pos = self.mpris_player.position + 5000000 self.mpris_player.position = new_pos def _on_next_clicked(self, button): if self.mpris_player: self.mpris_player.next() def _update_progress(self): if not self.mpris_player: if self._progress_timer_id: GLib.source_remove(self._progress_timer_id) self._progress_timer_id = None return False try: current = self.mpris_player.position except Exception: current = 0 try: total = int(self.mpris_player.length or 0) except Exception: total = 0 if total <= 0: progress = 0.0 self.time.set_text("--:-- / --:--") else: progress = (current / total) self.time.set_text(f"{self._format_time(current)} / {self._format_time(total)}") self.progressbar.set_value(progress) return True def _format_time(self, us): seconds = int(us / 1000000) minutes = seconds // 60 seconds = seconds % 60 return f"{minutes}:{seconds:02}" def _update_metadata(self): if not self.mpris_player: return False self._apply_mpris_properties() return True def _on_mpris_changed(self, *args): if not hasattr(self, "_update_pending") or not self._update_pending: self._update_pending = True GLib.idle_add(self._apply_mpris_properties_debounced) def _apply_mpris_properties_debounced(self): """Apply MPRIS properties with debouncing and restart adaptive timer""" if self.mpris_player: self._apply_mpris_properties() else: # Clean up timer when player is removed if self._progress_timer_id: GLib.source_remove(self._progress_timer_id) self._progress_timer_id = None self._update_pending = False return False class Player(Box): def __init__(self): super().__init__(name="player", orientation="v", h_align="fill", spacing=0, h_expand=False, v_expand=not vertical_mode) self.player_stack = Stack( name="player-stack", transition_type="slide-left-right", transition_duration=500, v_align="center", v_expand=not vertical_mode, ) self.switcher = Gtk.StackSwitcher( name="player-switcher" if not vertical_mode else "player-switcher-vertical", spacing=8, ) self.switcher.set_stack(self.player_stack) self.switcher.set_halign(Gtk.Align.CENTER) self.mpris_manager = MprisPlayerManager() players = self.mpris_manager.players if players: for p in players: mp = MprisPlayer(p) pb = PlayerBox(mpris_player=mp) self.player_stack.add_titled(pb, mp.player_name, mp.player_name) else: pb = PlayerBox(mpris_player=None) self.player_stack.add_titled(pb, "nothing", "Nothing Playing") self.mpris_manager.connect("player-appeared", self.on_player_appeared) self.mpris_manager.connect("player-vanished", self.on_player_vanished) self.switcher.set_visible(True) self.add(self.player_stack) self.add(self.switcher) GLib.idle_add(self._replace_switcher_labels) def on_player_appeared(self, manager, player): children = self.player_stack.get_children() if len(children) == 1 and not getattr(children[0], "mpris_player", None): self.player_stack.remove(children[0]) mp = MprisPlayer(player) pb = PlayerBox(mpris_player=mp) self.player_stack.add_titled(pb, mp.player_name, mp.player_name) self.switcher.set_visible(True) GLib.idle_add(lambda: self._update_switcher_for_player(mp.player_name)) GLib.idle_add(self._replace_switcher_labels) def on_player_vanished(self, manager, player_name): for child in self.player_stack.get_children(): if hasattr(child, "mpris_player") and child.mpris_player and child.mpris_player.player_name == player_name: self.player_stack.remove(child) break if not any(getattr(child, "mpris_player", None) for child in self.player_stack.get_children()): pb = PlayerBox(mpris_player=None) self.player_stack.add_titled(pb, "nothing", "Nothing Playing") self.switcher.set_visible(True) GLib.idle_add(self._replace_switcher_labels) def _replace_switcher_labels(self): buttons = self.switcher.get_children() for btn in buttons: if isinstance(btn, Gtk.ToggleButton): default_label = None for child in btn.get_children(): if isinstance(child, Gtk.Label): default_label = child break if default_label: label_player_name = getattr(default_label, "player_name", default_label.get_text().lower()) icon_markup = get_player_icon_markup_by_name(label_player_name) btn.remove(default_label) new_label = Label(name="player-label", markup=icon_markup) new_label.player_name = label_player_name btn.add(new_label) new_label.show_all() return False def _update_switcher_for_player(self, player_name): for btn in self.switcher.get_children(): if isinstance(btn, Gtk.ToggleButton): default_label = None for child in btn.get_children(): if isinstance(child, Gtk.Label): default_label = child break if default_label: label_player_name = getattr(default_label, "player_name", default_label.get_text().lower()) if label_player_name == player_name.lower(): icon_markup = get_player_icon_markup_by_name(player_name) btn.remove(default_label) new_label = Label(name="player-label", markup=icon_markup) new_label.player_name = player_name.lower() btn.add(new_label) new_label.show_all() return False class PlayerSmall(CenterBox): def __init__(self): super().__init__(name="player-small", orientation="h", h_align="fill", v_align="center") self._show_artist = False self._display_options = ["cavalcade", "title", "artist"] self._display_index = 0 self._current_display = "cavalcade" self.mpris_icon = Button( name="compact-mpris-icon", h_align="center", v_align="center", child=Label(name="compact-mpris-icon-label", markup=icons.disc) ) self.mpris_icon.add_events(Gdk.EventMask.BUTTON_PRESS_MASK) self.mpris_icon.connect("button-press-event", self._on_icon_button_press) child = self.mpris_icon.get_child() child.add_events(Gdk.EventMask.BUTTON_PRESS_MASK) child.connect("button-press-event", lambda widget, event: True) add_hover_cursor(self.mpris_icon) self.mpris_label = Label( name="compact-mpris-label", label="Nothing Playing", ellipsization="end", max_chars_width=26, h_align="center", ) self.mpris_button = Button( name="compact-mpris-button", h_align="center", v_align="center", child=Label(name="compact-mpris-button-label", markup=icons.play) ) self.mpris_button.add_events(Gdk.EventMask.BUTTON_PRESS_MASK) self.mpris_button.connect("button-press-event", self._on_play_pause_button_press) add_hover_cursor(self.mpris_button) self.cavalcade = SpectrumRender() self.cavalcade_box = self.cavalcade.get_spectrum_box() self.center_stack = Stack( name="compact-mpris", transition_type="crossfade", transition_duration=100, v_align="center", v_expand=False, children=[ self.cavalcade_box, self.mpris_label, ] ) self.center_stack.set_visible_child(self.cavalcade_box) self.mpris_small = CenterBox( name="compact-mpris", orientation="h", h_expand=True, h_align="fill", v_align="center", v_expand=False, start_children=self.mpris_icon, center_children=self.center_stack, end_children=self.mpris_button, ) self.add(self.mpris_small) self.mpris_manager = MprisPlayerManager() self.mpris_player = None self.current_index = 0 players = self.mpris_manager.players if players: mp = MprisPlayer(players[self.current_index]) self.mpris_player = mp self._apply_mpris_properties() self.mpris_player.connect("changed", self._on_mpris_changed) else: self._apply_mpris_properties() self.mpris_manager.connect("player-appeared", self.on_player_appeared) self.mpris_manager.connect("player-vanished", self.on_player_vanished) self.mpris_button.connect("clicked", self._on_play_pause_clicked) def _apply_mpris_properties(self): if not self.mpris_player: self.mpris_label.set_text("Nothing Playing") self.mpris_button.get_child().set_markup(icons.stop) self.mpris_icon.get_child().set_markup(icons.disc) if self._current_display != "cavalcade": self.center_stack.set_visible_child(self.mpris_label) else: self.center_stack.set_visible_child(self.cavalcade_box) return mp = self.mpris_player player_name = mp.player_name.lower() if hasattr(mp, "player_name") and mp.player_name else "" icon_markup = get_player_icon_markup_by_name(player_name) self.mpris_icon.get_child().set_markup(icon_markup) self.update_play_pause_icon() if self._current_display == "title": text = (mp.title if mp.title and mp.title.strip() else "Nothing Playing") self.mpris_label.set_text(text) self.center_stack.set_visible_child(self.mpris_label) elif self._current_display == "artist": text = (mp.artist if mp.artist else "Nothing Playing") self.mpris_label.set_text(text) self.center_stack.set_visible_child(self.mpris_label) else: self.center_stack.set_visible_child(self.cavalcade_box) def _on_icon_button_press(self, widget, event): from gi.repository import Gdk if event.type == Gdk.EventType.BUTTON_PRESS: players = self.mpris_manager.players if not players: return True if event.button == 2: self._display_index = (self._display_index + 1) % len(self._display_options) self._current_display = self._display_options[self._display_index] self._apply_mpris_properties() return True if event.button == 1: self.current_index = (self.current_index + 1) % len(players) elif event.button == 3: self.current_index = (self.current_index - 1) % len(players) if self.current_index < 0: self.current_index = len(players) - 1 mp_new = MprisPlayer(players[self.current_index]) self.mpris_player = mp_new self.mpris_player.connect("changed", self._on_mpris_changed) self._apply_mpris_properties() return True return True def _on_play_pause_button_press(self, widget, event): if event.type == Gdk.EventType.BUTTON_PRESS: if event.button == 1: if self.mpris_player: self.mpris_player.previous() self.mpris_button.get_child().set_markup(icons.prev) GLib.timeout_add(500, self._restore_play_pause_icon) elif event.button == 3: if self.mpris_player: self.mpris_player.next() self.mpris_button.get_child().set_markup(icons.next) GLib.timeout_add(500, self._restore_play_pause_icon) elif event.button == 2: if self.mpris_player: self.mpris_player.play_pause() self.update_play_pause_icon() return True return True def _restore_play_pause_icon(self): self.update_play_pause_icon() return False def _on_icon_clicked(self, widget): pass def update_play_pause_icon(self): if self.mpris_player and self.mpris_player.playback_status == "playing": self.mpris_button.get_child().set_markup(icons.pause) else: self.mpris_button.get_child().set_markup(icons.play) def _on_play_pause_clicked(self, button): if self.mpris_player: self.mpris_player.play_pause() self.update_play_pause_icon() def _on_mpris_changed(self, *args): self._apply_mpris_properties() def on_player_appeared(self, manager, player): if not self.mpris_player: mp = MprisPlayer(player) self.mpris_player = mp self._apply_mpris_properties() self.mpris_player.connect("changed", self._on_mpris_changed) def on_player_vanished(self, manager, player_name): players = self.mpris_manager.players if players and self.mpris_player and self.mpris_player.player_name == player_name: if players: self.current_index = self.current_index % len(players) new_player = MprisPlayer(players[self.current_index]) self.mpris_player = new_player self.mpris_player.connect("changed", self._on_mpris_changed) else: self.mpris_player = None elif not players: self.mpris_player = None self._apply_mpris_properties()