"""Home-Bro Satellite: wake-word listener plus a small FastAPI settings UI.

On startup a background asyncio task listens on the microphone for a
Porcupine wake word.  When the wake word is detected, a camera snapshot is
taken and uploaded to the configured "brain" service, whose reply is printed.
The HTTP API serves the static UI and lets the user read and change settings,
which are persisted to the ``.env`` file.
"""
import asyncio
import math
import os
import struct
import time
from ctypes import CFUNCTYPE, c_char_p, c_int, cdll

import cv2
import pvporcupine
import pyaudio
import requests
import uvicorn
from dotenv import load_dotenv, set_key
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Load environment variables
load_dotenv()
ENV_PATH = ".env"

# (connect, read) timeouts in seconds for the snapshot upload.  The values are
# a judgement call: without any timeout a dead brain would hang the upload
# forever.
BRAIN_REQUEST_TIMEOUT = (5, 60)

# Signature of the error callback passed to snd_lib_error_set_handler.
_ALSA_ERROR_HANDLER_TYPE = CFUNCTYPE(
    None, c_char_p, c_int, c_char_p, c_int, c_char_p
)


def _ignore_alsa_error(filename, line, function, err, fmt):
    """Discard an ALSA error report (installed as the library's handler)."""


# ctypes callback objects must stay referenced for as long as C code may call
# them; keeping this at module level prevents it from being garbage-collected
# while libasound still holds the function pointer.
_ALSA_ERROR_HANDLER = _ALSA_ERROR_HANDLER_TYPE(_ignore_alsa_error)


# Logic to suppress ALSA/JACK error spam on Linux
def suppress_alsa_errors():
    """Install a no-op error handler in libasound, if the library is present.

    This is best-effort: when ``libasound.so.2`` cannot be loaded or lacks
    the expected symbol, nothing is changed.
    """
    try:
        asound = cdll.LoadLibrary("libasound.so.2")
        asound.snd_lib_error_set_handler(_ALSA_ERROR_HANDLER)
    except (OSError, AttributeError):
        # Library or symbol not available; nothing to silence.
        pass


suppress_alsa_errors()

app = FastAPI(title="Home-Bro Satellite")
app.mount("/static", StaticFiles(directory="static"), name="static")


class Config(BaseModel):
    """Request body of ``POST /config``; every value is stored as a string."""

    bro_name: str
    pico_key: str
    brain_url: str
    wake_word: str
    custom_wake_word_path: str = ""
    audio_index: str = ""


@app.get("/")
async def read_index():
    """Serve the single-page UI."""
    return FileResponse("static/index.html")


@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer favicon requests with the index page.

    NOTE(review): this returns HTML rather than an icon; presumably a
    placeholder -- confirm before changing.
    """
    return FileResponse("static/index.html")


@app.get("/config")
async def get_config():
    """Return the current settings as read from the process environment.

    NOTE(review): the response includes the Picovoice API key and this file
    applies no authentication, while the server binds to 0.0.0.0.
    """
    return {
        "bro_name": os.getenv("BRO_NAME", "Home-Bro"),
        "pico_key": os.getenv("PICOVOICE_API_KEY", ""),
        "brain_url": os.getenv("BRAIN_URL", "http://localhost:8000"),
        "wake_word": os.getenv("WAKE_WORD", "porcupine"),
        "custom_wake_word_path": os.getenv("CUSTOM_WAKE_WORD_PATH", ""),
        "audio_index": os.getenv("AUDIO_DEVICE_INDEX", ""),
    }


@app.get("/check-brain")
async def check_brain(url: str):
    """Report whether ``<url>/api/info`` answers with HTTP 200.

    Args:
        url: Base URL of the brain service, supplied by the client.

    Returns:
        ``{"online": bool}``; any failure while probing counts as offline.
    """

    def _probe():
        response = requests.get(f"{url.rstrip('/')}/api/info", timeout=2)
        return response.status_code == 200

    try:
        # requests is blocking; run it in a worker thread so the event loop
        # (and with it the wake-word task) keeps running during the probe.
        loop = asyncio.get_running_loop()
        online = await loop.run_in_executor(None, _probe)
    except Exception:
        # Health probe: a malformed URL, DNS failure, timeout, ... all simply
        # mean "not reachable".
        online = False
    return {"online": online}


def take_snapshot():
    """Grab one frame from the camera and write it to ``snapshot.jpg``.

    The camera index is read from ``CAM_INDEX`` (default 0); a value that is
    not an integer falls back to 0 with a warning.

    Returns:
        The path of the written image, or ``None`` when the camera cannot be
        opened, no frame can be read, or the image cannot be written.
    """
    raw_index = os.getenv("CAM_INDEX", "0")
    try:
        cam_index = int(raw_index)
    except ValueError:
        print(f"Warning: invalid CAM_INDEX {raw_index!r}; using camera 0.")
        cam_index = 0

    cap = cv2.VideoCapture(cam_index)
    try:
        if not cap.isOpened():
            print("Error: Could not open camera.")
            return None
        ret, frame = cap.read()
        if not ret:
            return None
        path = "snapshot.jpg"
        # imwrite signals failure through its return value.
        if not cv2.imwrite(path, frame):
            print(f"Error: Could not write {path}.")
            return None
        return path
    finally:
        # Always hand the device back, including on the error paths.
        cap.release()


def send_to_brain(image_path):
    """Upload an image to ``<BRAIN_URL>/analyze/pi`` and return the reply.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        The decoded JSON body on HTTP 200; ``None`` when ``BRAIN_URL`` is
        unset, the upload fails, or the brain answers with another status.
    """
    brain_url = os.getenv("BRAIN_URL")
    if not brain_url:
        return None
    try:
        with open(image_path, "rb") as f:
            response = requests.post(
                f"{brain_url.rstrip('/')}/analyze/pi",
                files={"file": f},
                timeout=BRAIN_REQUEST_TIMEOUT,
            )
        if response.status_code == 200:
            return response.json()
        print(f"Upload failed: HTTP {response.status_code}")
    except (OSError, requests.RequestException, ValueError) as e:
        # OSError: unreadable file; RequestException: network/HTTP problems;
        # ValueError: reply body is not valid JSON.
        print(f"Upload failed: {e}")
    return None


def list_audio_devices():
    """Print every PortAudio device with its index and input channel count."""
    pa = pyaudio.PyAudio()
    try:
        print("\n--- Verfügbare Audiogeräte ---")
        for i in range(pa.get_device_count()):
            dev = pa.get_device_info_by_index(i)
            print(
                f"Index {i}: {dev['name']} "
                f"(Input Channels: {dev['maxInputChannels']})"
            )
        print("------------------------------\n")
    finally:
        pa.terminate()


def get_wake_word_params():
    """Build the keyword arguments for ``pvporcupine.create``.

    Returns:
        ``{"keyword_paths": [path]}`` when ``WAKE_WORD`` is ``"custom"`` and
        ``CUSTOM_WAKE_WORD_PATH`` is set; ``{"keywords": [name]}`` for a
        keyword listed in ``pvporcupine.KEYWORDS``; otherwise the
        ``"porcupine"`` keyword as fallback.
    """
    keyword = os.getenv("WAKE_WORD", "porcupine")
    custom_path = os.getenv("CUSTOM_WAKE_WORD_PATH", "")
    if keyword == "custom" and custom_path:
        return {"keyword_paths": [custom_path]}
    if keyword in pvporcupine.KEYWORDS:
        return {"keywords": [keyword]}
    return {"keywords": ["porcupine"]}


# The currently running wake-word task, or None before startup.
interaction_task = None


def _handle_wake_word():
    """Take a snapshot, upload it, and print the brain's comment (blocking)."""
    img_path = take_snapshot()
    if not img_path:
        return
    result = send_to_brain(img_path)
    if result:
        print(f"Brain says: {result.get('comment')}")


async def brain_interaction_loop():
    """Listen for the wake word and trigger a snapshot upload on detection.

    Runs until cancelled or until an error occurs while setting up or reading
    audio.  Audio and Porcupine resources are released on every exit path.
    Returns early when ``PICOVOICE_API_KEY`` is not set.

    Raises:
        asyncio.CancelledError: re-raised after cleanup when the task is
            cancelled.
    """
    pa = None
    audio_stream = None
    porcupine = None
    try:
        list_audio_devices()
        access_key = os.getenv("PICOVOICE_API_KEY")
        if not access_key:
            print("CRITICAL: PICOVOICE_API_KEY Missing.")
            return

        porcupine = pvporcupine.create(
            access_key=access_key, **get_wake_word_params()
        )
        pa = pyaudio.PyAudio()
        device_index = os.getenv("AUDIO_DEVICE_INDEX")
        # Empty/blank means "let PortAudio pick the default input device".
        input_device_index = (
            int(device_index)
            if device_index and device_index.strip()
            else None
        )
        audio_stream = pa.open(
            rate=porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=porcupine.frame_length,
            input_device_index=input_device_index,
        )
        print(
            f"Listening for '{os.getenv('WAKE_WORD', 'porcupine')}' "
            f"on device {input_device_index}..."
        )

        frame_length = porcupine.frame_length
        # One signed 16-bit value per sample; compiled once instead of
        # rebuilding the format string for every frame.
        frame_struct = struct.Struct(f"{frame_length}h")
        loop = asyncio.get_running_loop()
        frame_count = 0
        while True:
            # Yield to the event loop so HTTP requests get served.
            await asyncio.sleep(0.01)
            # NOTE: this read blocks for up to one frame.  It is kept on the
            # loop thread on purpose: moving it to a worker thread would let
            # the cleanup below close the stream while a read is in flight.
            pcm = audio_stream.read(frame_length, exception_on_overflow=False)
            samples = frame_struct.unpack_from(pcm)

            frame_count += 1
            if frame_count % 100 == 0:
                # Periodic level check to spot a muted or wrong microphone.
                rms = math.sqrt(sum(x * x for x in samples) / len(samples))
                if rms < 10:
                    print(
                        f"DEBUG: Pegel sehr niedrig ({rms:.2f}). Check Mic!"
                    )

            if porcupine.process(samples) >= 0:
                print(
                    f"[{os.getenv('BRO_NAME', 'Home-Bro')}] "
                    "Wake-word detected!"
                )
                try:
                    # Camera capture and upload can take seconds; run them in
                    # a worker thread so the web UI stays responsive.
                    await loop.run_in_executor(None, _handle_wake_word)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # One failed interaction must not stop the listener.
                    print(f"Interaction failed: {e}")
    except asyncio.CancelledError:
        print("Restarting Loop...")
        # Propagate so the task is really marked as cancelled.
        raise
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if audio_stream:
            audio_stream.close()
        if pa:
            pa.terminate()
        if porcupine:
            porcupine.delete()


def start_interaction_loop():
    """Schedule ``brain_interaction_loop`` as a task on the running loop."""
    global interaction_task
    interaction_task = asyncio.create_task(brain_interaction_loop())


async def restart_interaction_loop():
    """Cancel the running wake-word task, wait for it, and start a new one."""
    global interaction_task
    if interaction_task:
        interaction_task.cancel()
        try:
            await interaction_task
        except asyncio.CancelledError:
            pass
    start_interaction_loop()


@app.post("/config")
async def save_config(config: Config, background_tasks: BackgroundTasks):
    """Apply new settings, persist them to ``.env``, and restart the listener.

    The restart is queued as a background task, so it runs after the response
    has been sent.
    """
    settings = {
        "BRO_NAME": config.bro_name,
        "PICOVOICE_API_KEY": config.pico_key,
        "BRAIN_URL": config.brain_url,
        "WAKE_WORD": config.wake_word,
        "CUSTOM_WAKE_WORD_PATH": config.custom_wake_word_path,
        "AUDIO_DEVICE_INDEX": config.audio_index,
    }
    for key, value in settings.items():
        # Update the live process and the file so both stay in sync.
        os.environ[key] = value
        set_key(ENV_PATH, key, value)
    background_tasks.add_task(restart_interaction_loop)
    return {"status": "ok"}


@app.on_event("startup")
async def startup_event():
    """Start the wake-word listener when the server starts."""
    start_interaction_loop()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)