import os import time import asyncio import requests from dotenv import load_dotenv, set_key from fastapi import FastAPI, BackgroundTasks from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from pydantic import BaseModel import uvicorn # Load environment variables load_dotenv() ENV_PATH = ".env" app = FastAPI(title="Home-Bro Satellite") # Ensure static directory exists or serve it app.mount("/static", StaticFiles(directory="static"), name="static") class Config(BaseModel): bro_name: str pico_key: str brain_url: str wake_word: str custom_wake_word_path: str = "" @app.get("/") async def read_index(): return FileResponse("static/index.html") @app.get("/config") async def get_config(): return { "bro_name": os.getenv("BRO_NAME", "Home-Bro"), "pico_key": os.getenv("PICOVOICE_API_KEY", ""), "brain_url": os.getenv("BRAIN_URL", "http://localhost:8000"), "wake_word": os.getenv("WAKE_WORD", "porcupine"), "custom_wake_word_path": os.getenv("CUSTOM_WAKE_WORD_PATH", "") } @app.post("/config") async def save_config(config: Config): # Update environment variables in memory and file settings = { "BRO_NAME": config.bro_name, "PICOVOICE_API_KEY": config.pico_key, "BRAIN_URL": config.brain_url, "WAKE_WORD": config.wake_word, "CUSTOM_WAKE_WORD_PATH": config.custom_wake_word_path } for key, value in settings.items(): os.environ[key] = value set_key(ENV_PATH, key, value) return {"status": "ok"} @app.get("/check-brain") async def check_brain(url: str): try: # Simple health check to the brain # We use a short timeout (2s) to keep UI responsive response = requests.get(f"{url}/docs", timeout=2) return {"online": response.status_code == 200} except Exception: return {"online": False} import cv2 def take_snapshot(): cam_index = int(os.getenv("CAM_INDEX", 0)) cap = cv2.VideoCapture(cam_index) if not cap.isOpened(): print("Error: Could not open camera.") return None ret, frame = cap.read() if ret: path = "snapshot.jpg" cv2.imwrite(path, frame) cap.release() return path cap.release() return None def send_to_brain(image_path): brain_url = os.getenv("BRAIN_URL") if not brain_url: return None try: with open(image_path, "rb") as f: files = {"file": f} response = requests.post(f"{brain_url}/analyze", files=files) if response.status_code == 200: return response.json() except Exception as e: print(f"Upload failed: {e}") return None import pvporcupine import pyaudio import struct def get_wake_word_params(): """Helper to get Porcupine parameters based on settings.""" keyword = os.getenv("WAKE_WORD", "porcupine") custom_path = os.getenv("CUSTOM_WAKE_WORD_PATH", "") if keyword == "custom" and custom_path: return {"library_path": None, "model_path": None, "keyword_paths": [custom_path]} # Standard builtin keywords if keyword in pvporcupine.KEYWORDS: return {"keywords": [keyword]} return {"keywords": ["porcupine"]} # Global variable to hold the task interaction_task = None async def brain_interaction_loop(): """Background loop for wake-word detection and brain interaction.""" # RE-LOAD settings inside the loop to ensure we use latest access_key = os.getenv("PICOVOICE_API_KEY") if not access_key: print("CRITICAL: PICOVOICE_API_KEY not found in .env. Wake-word detection disabled.") return pa = None audio_stream = None porcupine = None try: params = get_wake_word_params() porcupine = pvporcupine.create(access_key=access_key, **params) pa = pyaudio.PyAudio() audio_stream = pa.open( rate=porcupine.sample_rate, channels=1, format=pyaudio.paInt16, input=True, frames_per_buffer=porcupine.frame_length ) print(f"Listening for wake-word '{os.getenv('WAKE_WORD', 'porcupine')}'...") while True: # Check for cancellation await asyncio.sleep(0.01) pcm = audio_stream.read(porcupine.frame_length, exception_on_overflow=False) pcm = struct.unpack_from("h" * porcupine.frame_length, pcm) keyword_index = porcupine.process(pcm) if keyword_index >= 0: print(f"[{os.getenv('BRO_NAME', 'Home-Bro')}] Wake-word detected!") img_path = take_snapshot() if img_path: result = send_to_brain(img_path) if result: print(f"Brain says: {result.get('comment')}") except asyncio.CancelledError: print("Wake-word loop cancelled for restart...") except Exception as e: print(f"Error in wake-word loop: {e}") finally: if audio_stream is not None: audio_stream.close() if pa is not None: pa.terminate() if porcupine is not None: porcupine.delete() def start_interaction_loop(): global interaction_task interaction_task = asyncio.create_task(brain_interaction_loop()) async def restart_interaction_loop(): global interaction_task if interaction_task: interaction_task.cancel() try: await interaction_task except asyncio.CancelledError: pass start_interaction_loop() @app.post("/config") async def save_config(config: Config, background_tasks: BackgroundTasks): # Update environment variables in memory and file settings = { "BRO_NAME": config.bro_name, "PICOVOICE_API_KEY": config.pico_key, "BRAIN_URL": config.brain_url, "WAKE_WORD": config.wake_word, "CUSTOM_WAKE_WORD_PATH": config.custom_wake_word_path } for key, value in settings.items(): os.environ[key] = value set_key(ENV_PATH, key, value) # Trigger restart in the background background_tasks.add_task(restart_interaction_loop) return {"status": "ok"} @app.on_event("startup") async def startup_event(): start_interaction_loop() if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8080)