223 lines
6.5 KiB
Python
223 lines
6.5 KiB
Python
import asyncio
import os
import time

import requests
import uvicorn
from dotenv import load_dotenv, set_key
from fastapi import BackgroundTasks, FastAPI, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
|
|
|
# Load environment variables; ENV_PATH is the file that the config endpoint
# rewrites through set_key().
load_dotenv()
ENV_PATH = ".env"

app = FastAPI(title="Home-Bro Satellite")

# Ensure the static directory exists before mounting it: StaticFiles checks
# the directory when it is constructed and raises RuntimeError if it is
# missing, which would stop the whole application from importing.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
class Config(BaseModel):
    """Request body accepted by ``POST /config``.

    All fields are plain strings. Only ``custom_wake_word_path`` is optional;
    it defaults to the empty string.
    """

    # Stored as the BRO_NAME environment variable.
    bro_name: str
    # Stored as the PICOVOICE_API_KEY environment variable.
    pico_key: str
    # Stored as the BRAIN_URL environment variable.
    brain_url: str
    # Stored as the WAKE_WORD environment variable.
    wake_word: str
    # Stored as the CUSTOM_WAKE_WORD_PATH environment variable.
    custom_wake_word_path: str = ""
|
|
|
|
@app.get("/")
async def read_index():
    """Serve ``static/index.html`` for the root URL."""
    index_page = "static/index.html"
    return FileResponse(index_page)
|
|
|
|
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer favicon requests with an empty ``204 No Content`` response.

    This module references no icon asset, so rather than sending an HTML
    document under an icon URL the handler tells the client there is nothing
    to download. The route is excluded from the OpenAPI schema.
    """
    return Response(status_code=204)
|
|
|
|
@app.get("/config")
async def get_config():
    """Return the current settings as read from the process environment.

    Each response key is backed by one environment variable; the listed
    fallback is returned when that variable is not set.
    """
    # (response key, environment variable, fallback value)
    fields = (
        ("bro_name", "BRO_NAME", "Home-Bro"),
        ("pico_key", "PICOVOICE_API_KEY", ""),
        ("brain_url", "BRAIN_URL", "http://localhost:8000"),
        ("wake_word", "WAKE_WORD", "porcupine"),
        ("custom_wake_word_path", "CUSTOM_WAKE_WORD_PATH", ""),
    )
    return {key: os.getenv(env_name, fallback) for key, env_name, fallback in fields}
|
|
|
|
@app.post("/config")
async def save_config(config: Config, background_tasks: BackgroundTasks):
    """Persist new settings and restart the wake-word loop so they take effect.

    The values are written both to ``os.environ`` (for the running process)
    and to the ``.env`` file at ``ENV_PATH`` (for the next start).

    A second handler for ``POST /config`` is declared later in this file.
    Routes are matched in registration order, so this earlier handler is the
    one that serves requests; it therefore has to queue the restart itself.

    Args:
        config: The submitted settings.
        background_tasks: Injected by FastAPI; used to run the restart after
            the response has been sent.

    Returns:
        ``{"status": "ok"}``.
    """
    settings = {
        "BRO_NAME": config.bro_name,
        "PICOVOICE_API_KEY": config.pico_key,
        "BRAIN_URL": config.brain_url,
        "WAKE_WORD": config.wake_word,
        "CUSTOM_WAKE_WORD_PATH": config.custom_wake_word_path,
    }

    for key, value in settings.items():
        os.environ[key] = value
        set_key(ENV_PATH, key, value)

    # brain_interaction_loop reads the access key and wake word only once,
    # when it starts, so a running loop must be restarted to pick up changes.
    background_tasks.add_task(restart_interaction_loop)

    return {"status": "ok"}
|
|
|
|
@app.get("/check-brain")
async def check_brain(url: str):
    """Report whether the brain service at ``url`` serves its ``/docs`` page.

    Args:
        url: Base URL of the brain service; a trailing slash is tolerated.

    Returns:
        ``{"online": True}`` when ``GET <url>/docs`` answers with HTTP 200,
        ``{"online": False}`` on any other status or on a request error
        (connection failure, timeout, malformed URL, ...).
    """
    # NOTE(review): ``url`` comes straight from the query string, so this
    # endpoint makes the server fetch an arbitrary address. Confirm the
    # service is only reachable from trusted clients.
    target = f"{url.rstrip('/')}/docs"

    def _probe():
        # Short timeout (2s) keeps the UI responsive when the brain is down.
        return requests.get(target, timeout=2)

    try:
        # requests is a blocking client; run it on the default executor so the
        # event loop (and every other request) is not stalled while waiting.
        response = await asyncio.get_running_loop().run_in_executor(None, _probe)
    except requests.RequestException:
        return {"online": False}

    return {"online": response.status_code == 200}
|
|
|
|
import cv2
|
|
|
|
def take_snapshot():
    """Capture one frame from the configured camera and save it as a JPEG.

    The camera index is read from the ``CAM_INDEX`` environment variable
    (default ``0``).

    Returns:
        ``"snapshot.jpg"`` when a frame was captured and written, or ``None``
        when the camera cannot be opened, no frame can be read, or the image
        cannot be written.

    Raises:
        ValueError: If ``CAM_INDEX`` is set to something that is not an
            integer.
    """
    cam_index = int(os.getenv("CAM_INDEX", 0))
    cap = cv2.VideoCapture(cam_index)
    try:
        if not cap.isOpened():
            print("Error: Could not open camera.")
            return None

        ret, frame = cap.read()
        if not ret:
            return None

        path = "snapshot.jpg"
        # imwrite signals failure through its boolean return value; without
        # this check a stale or missing file would be reported as a snapshot.
        if not cv2.imwrite(path, frame):
            print("Error: Could not write snapshot.")
            return None
        return path
    finally:
        # Single release point for every exit path, including open failure.
        cap.release()
|
|
|
|
def send_to_brain(image_path, timeout=60):
    """Upload an image to the brain's ``/analyze`` endpoint.

    The brain base URL is read from the ``BRAIN_URL`` environment variable.
    Failures are reported on stdout and turned into a ``None`` result so that
    callers can keep running.

    Args:
        image_path: Path of the image file to upload.
        timeout: Seconds to wait for the brain before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            caller if the brain stops answering.

    Returns:
        The decoded JSON body when the brain answers with HTTP 200; ``None``
        when ``BRAIN_URL`` is unset, the upload fails, or another status is
        returned.
    """
    brain_url = os.getenv("BRAIN_URL")
    if not brain_url:
        return None

    try:
        with open(image_path, "rb") as f:
            response = requests.post(
                f"{brain_url}/analyze",
                files={"file": f},
                timeout=timeout,
            )
        if response.status_code == 200:
            return response.json()
        print(f"Upload failed: HTTP {response.status_code}")
    except Exception as e:
        # Deliberately broad: this is a best-effort call made from the
        # wake-word loop, which must survive any upload problem.
        print(f"Upload failed: {e}")
    return None
|
|
|
|
import pvporcupine
|
|
import pyaudio
|
|
import struct
|
|
|
|
def get_wake_word_params():
    """Build the keyword arguments for ``pvporcupine.create`` from settings.

    Reads ``WAKE_WORD`` (default ``"porcupine"``) and
    ``CUSTOM_WAKE_WORD_PATH`` (default empty) from the environment.

    Returns:
        A dict with ``keyword_paths`` (plus ``library_path`` and
        ``model_path`` set to ``None``) when the wake word is ``"custom"``
        and a path is configured; otherwise a dict with ``keywords`` holding
        the configured built-in keyword, or ``"porcupine"`` if the configured
        one is not in ``pvporcupine.KEYWORDS``.
    """
    wake_word = os.getenv("WAKE_WORD", "porcupine")
    ppn_path = os.getenv("CUSTOM_WAKE_WORD_PATH", "")

    wants_custom = wake_word == "custom" and bool(ppn_path)
    if wants_custom:
        return {"library_path": None, "model_path": None, "keyword_paths": [ppn_path]}

    # Unknown names fall back to the stock "porcupine" keyword.
    builtin = wake_word if wake_word in pvporcupine.KEYWORDS else "porcupine"
    return {"keywords": [builtin]}
|
|
|
|
# Handle to the wake-word task: None until start_interaction_loop() stores the
# asyncio task it creates; restart_interaction_loop() cancels and replaces it.
interaction_task = None
|
|
|
|
async def brain_interaction_loop():
    """Background loop for wake-word detection and brain interaction.

    Settings (``PICOVOICE_API_KEY`` and the wake-word parameters) are read
    once when the loop starts, so the task has to be restarted to pick up a
    changed configuration. Without an access key the function prints a
    message and returns immediately.

    On each detected wake word a camera snapshot is taken and uploaded to the
    brain, and the brain's ``comment`` is printed. The audio stream, PyAudio
    instance and Porcupine handle are always released on exit.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the task is
            cancelled, so the task ends in the cancelled state.
    """
    access_key = os.getenv("PICOVOICE_API_KEY")
    if not access_key:
        print("CRITICAL: PICOVOICE_API_KEY not found in .env. Wake-word detection disabled.")
        return

    pa = None
    audio_stream = None
    porcupine = None

    try:
        params = get_wake_word_params()
        porcupine = pvporcupine.create(access_key=access_key, **params)

        pa = pyaudio.PyAudio()
        audio_stream = pa.open(
            rate=porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=porcupine.frame_length,
        )

        print(f"Listening for wake-word '{os.getenv('WAKE_WORD', 'porcupine')}'...")

        loop = asyncio.get_running_loop()
        frame_length = porcupine.frame_length
        # Compile the per-frame format once instead of on every iteration.
        frame_format = struct.Struct("h" * frame_length)

        while True:
            # Yield to the event loop; this is also where a cancellation
            # request is delivered while the loop is idle.
            await asyncio.sleep(0.01)

            # NOTE(review): this read is a synchronous call on the event loop
            # thread. It is kept here (not moved to a worker thread) so the
            # stream can never be closed in ``finally`` while a read is still
            # in flight.
            raw = audio_stream.read(frame_length, exception_on_overflow=False)
            pcm = frame_format.unpack_from(raw)

            if porcupine.process(pcm) < 0:
                continue

            print(f"[{os.getenv('BRO_NAME', 'Home-Bro')}] Wake-word detected!")

            # Camera capture and the HTTP upload are blocking and can take
            # seconds; run them on the default executor so the web server
            # keeps answering requests in the meantime.
            img_path = await loop.run_in_executor(None, take_snapshot)
            if not img_path:
                continue

            result = await loop.run_in_executor(None, send_to_brain, img_path)
            if result:
                print(f"Brain says: {result.get('comment')}")

    except asyncio.CancelledError:
        print("Wake-word loop cancelled for restart...")
        # Propagate so the task is actually marked as cancelled.
        raise
    except Exception as e:
        print(f"Error in wake-word loop: {e}")
    finally:
        if audio_stream is not None:
            audio_stream.close()
        if pa is not None:
            pa.terminate()
        if porcupine is not None:
            porcupine.delete()
|
|
|
|
def start_interaction_loop():
    """Schedule the wake-word loop as an asyncio task and keep its handle.

    The created task is stored in the module-level ``interaction_task``.
    """
    global interaction_task
    loop_coro = brain_interaction_loop()
    interaction_task = asyncio.create_task(loop_coro)
|
|
|
|
async def restart_interaction_loop():
    """Cancel the current wake-word task, if any, then start a new one.

    The old task is awaited after cancellation so that it has finished before
    the replacement is created.
    """
    global interaction_task
    running = interaction_task
    if running:
        running.cancel()
        try:
            await running
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            pass
    start_interaction_loop()
|
|
|
|
@app.post("/config")
async def save_config(config: Config, background_tasks: BackgroundTasks):
    """Store the submitted settings and queue a wake-word loop restart.

    Every setting is written to ``os.environ`` and to the ``.env`` file at
    ``ENV_PATH``; the restart runs as a background task after the response.

    NOTE(review): another ``POST /config`` handler is registered earlier in
    this file under the same function name. Confirm which of the two is meant
    to serve requests and remove the other.
    """
    # (environment variable, Config attribute), written in this order.
    env_fields = (
        ("BRO_NAME", "bro_name"),
        ("PICOVOICE_API_KEY", "pico_key"),
        ("BRAIN_URL", "brain_url"),
        ("WAKE_WORD", "wake_word"),
        ("CUSTOM_WAKE_WORD_PATH", "custom_wake_word_path"),
    )

    for env_name, attr in env_fields:
        env_value = getattr(config, attr)
        os.environ[env_name] = env_value
        set_key(ENV_PATH, env_name, env_value)

    # Trigger restart in the background
    background_tasks.add_task(restart_interaction_loop)

    return {"status": "ok"}
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Start the wake-word loop when the application starts up."""
    start_interaction_loop()
|
|
|
|
if __name__ == "__main__":
    # Run as a script: serve the app on all interfaces, port 8080.
    uvicorn.run(app, host="0.0.0.0", port=8080)
|