Files
Matthias Hinrichs 4c826d628c fixing input device
2026-01-29 22:51:03 +01:00

208 lines
6.3 KiB
Python

import os
import time
import asyncio
import requests
import struct
import math
import cv2
import pvporcupine
import pyaudio
from dotenv import load_dotenv, set_key
from fastapi import FastAPI, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
import uvicorn
# Load environment variables via python-dotenv before any handler reads them
# with os.getenv().
load_dotenv()
# Dotenv file that save_config() persists settings to through set_key().
ENV_PATH = ".env"
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the wake-word loop for the lifetime of the FastAPI application.

    Startup schedules the background interaction task. Shutdown cancels it
    and then waits for it to finish, so the task's ``finally`` clean-up has
    completed before the application is torn down.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Startup: start the background loop.
    start_interaction_loop()
    yield
    # Shutdown: stop the background loop and wait for its clean-up.
    task = interaction_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: the task ended because it was just cancelled.
            pass
# ASGI application; `lifespan` is passed in to manage startup and shutdown.
app = FastAPI(title="Home-Bro Satellite", lifespan=lifespan)
# Serve the files of the local "static" directory under the /static prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
class Config(BaseModel):
    """Request body accepted by ``POST /config``.

    ``save_config`` copies each field into the process environment and into
    the dotenv file under the variable name noted next to it.
    """

    bro_name: str  # stored as BRO_NAME
    pico_key: str  # stored as PICOVOICE_API_KEY
    brain_url: str  # stored as BRAIN_URL
    wake_word: str  # stored as WAKE_WORD
    custom_wake_word_path: str = ""  # stored as CUSTOM_WAKE_WORD_PATH
    audio_index: str = ""  # stored as AUDIO_DEVICE_INDEX
@app.get("/")
async def read_index():
    """Serve the front-end entry page, ``static/index.html``."""
    index_page = "static/index.html"
    return FileResponse(index_page)
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer favicon requests with the index page (hidden from the schema)."""
    # NOTE(review): this returns HTML rather than an icon, presumably only to
    # keep browsers from logging 404s; confirm that is intended.
    fallback_page = "static/index.html"
    return FileResponse(fallback_page)
@app.get("/config")
async def get_config():
    """Return the current settings as read from the process environment.

    Every value comes from ``os.getenv``; unset variables fall back to the
    defaults in the table below.
    """
    # (response key, environment variable, default when unset)
    fields = (
        ("bro_name", "BRO_NAME", "Home-Bro"),
        ("pico_key", "PICOVOICE_API_KEY", ""),
        ("brain_url", "BRAIN_URL", "http://localhost:8000"),
        ("wake_word", "WAKE_WORD", "porcupine"),
        ("custom_wake_word_path", "CUSTOM_WAKE_WORD_PATH", ""),
        ("audio_index", "AUDIO_DEVICE_INDEX", ""),
    )
    return {key: os.getenv(env_name, default) for key, env_name, default in fields}
@app.get("/check-brain")
async def check_brain(url: str):
    """Report whether a brain server answers at *url*.

    The probe is ``GET {url}/api/info`` with a 2 second timeout. The blocking
    HTTP call runs in the default thread-pool executor so that neither other
    requests nor the wake-word loop are stalled while waiting for the reply.

    Args:
        url: Base URL of the brain server; a trailing ``/`` is ignored.

    Returns:
        ``{"online": True}`` when the probe returns HTTP 200, otherwise
        ``{"online": False}`` (including on any connection error).
    """
    # NOTE(review): `url` comes straight from the query string, so this
    # endpoint makes the server issue a request to an arbitrary address.
    # Consider restricting it if the service is reachable by untrusted clients.
    probe_url = f"{url.rstrip('/')}/api/info"
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(
            None, lambda: requests.get(probe_url, timeout=2)
        )
    except Exception:
        # Deliberately best-effort: any failure to reach the server simply
        # means "offline" to the caller.
        return {"online": False}
    return {"online": response.status_code == 200}
def take_snapshot():
    """Grab one frame from the camera and save it as ``snapshot.jpg``.

    The camera index is read from the ``CAM_INDEX`` environment variable and
    defaults to 0 when the variable is unset or blank.

    Returns:
        The path of the written image, or ``None`` when ``CAM_INDEX`` is not
        an integer, the camera cannot be opened, no frame could be read, or
        the image could not be written.
    """
    raw_index = (os.getenv("CAM_INDEX") or "").strip()
    try:
        cam_index = int(raw_index) if raw_index else 0
    except ValueError:
        # A bad setting must not raise into the caller's loop.
        print(f"Error: Invalid CAM_INDEX {raw_index!r}.")
        return None

    cap = cv2.VideoCapture(cam_index)
    try:
        if not cap.isOpened():
            print("Error: Could not open camera.")
            return None
        ret, frame = cap.read()
        if not ret:
            return None
        path = "snapshot.jpg"
        # imwrite can signal failure through its return value.
        if not cv2.imwrite(path, frame):
            print("Error: Could not write snapshot.")
            return None
        return path
    finally:
        # Always free the device, on every return path and on exceptions.
        cap.release()
def send_to_brain(image_path, timeout=60):
    """Upload an image to the brain server and return its analysis.

    The file is posted as multipart field ``file`` to
    ``{BRAIN_URL}/analyze/pi``.

    Args:
        image_path: Path of the image file to upload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the caller indefinitely.

    Returns:
        The decoded JSON body when the server answers HTTP 200; ``None`` when
        ``BRAIN_URL`` is unset or empty, the upload fails, or the server
        answers with any other status.
    """
    brain_url = os.getenv("BRAIN_URL")
    if not brain_url:
        return None
    # Tolerate a trailing slash in the configured base URL.
    endpoint = f"{brain_url.rstrip('/')}/analyze/pi"
    try:
        with open(image_path, "rb") as image_file:
            response = requests.post(
                endpoint, files={"file": image_file}, timeout=timeout
            )
        if response.status_code == 200:
            return response.json()
        print(f"Upload failed: HTTP {response.status_code}")
    except Exception as e:
        # Best-effort boundary: a failed upload is reported, never raised.
        print(f"Upload failed: {e}")
    return None
def list_audio_devices():
    """Print each audio device PyAudio reports: index, name, input channels."""
    audio = pyaudio.PyAudio()
    print("\n--- Verfügbare Audiogeräte ---")
    device_total = audio.get_device_count()
    for index in range(device_total):
        info = audio.get_device_info_by_index(index)
        name = info['name']
        channels = info['maxInputChannels']
        print(f"Index {index}: {name} (Input Channels: {channels})")
    print("------------------------------\n")
    audio.terminate()
def get_wake_word_params():
    """Build the keyword arguments that select the wake word for Porcupine.

    Returns:
        ``{"keyword_paths": [path]}`` when ``WAKE_WORD`` is ``"custom"`` and
        ``CUSTOM_WAKE_WORD_PATH`` is non-empty; otherwise
        ``{"keywords": [name]}``, where *name* is ``WAKE_WORD`` if it is one of
        ``pvporcupine.KEYWORDS`` and ``"porcupine"`` if it is not.
    """
    wake_word = os.getenv("WAKE_WORD", "porcupine")
    model_path = os.getenv("CUSTOM_WAKE_WORD_PATH", "")

    wants_custom = wake_word == "custom"
    if wants_custom and model_path:
        return {"keyword_paths": [model_path]}

    # Unknown names fall back to the default built-in keyword.
    builtin = wake_word if wake_word in pvporcupine.KEYWORDS else "porcupine"
    return {"keywords": [builtin]}
# Handle of the background wake-word task; assigned by start_interaction_loop()
# and cancelled on restart and on application shutdown.
interaction_task = None
def _snapshot_and_report():
    """Take a camera snapshot, send it to the brain and print its comment."""
    img_path = take_snapshot()
    if not img_path:
        return
    result = send_to_brain(img_path)
    if result:
        print(f"Brain says: {result.get('comment')}")


async def brain_interaction_loop():
    """Listen for the wake word and run a snapshot analysis on each detection.

    Reads the Picovoice access key, wake-word selection and audio device
    index from the environment, opens a mono 16-bit input stream at
    Porcupine's sample rate, and feeds it to Porcupine one frame at a time.
    Every 100th frame the RMS level is computed, and a warning is printed
    when it is below 10. On detection, the camera capture and upload run in
    the default thread-pool executor so the event loop stays responsive
    while they block.

    Returns early (after printing a message) when ``PICOVOICE_API_KEY`` is
    not set. Any other error is printed and ends the loop. The audio stream,
    PyAudio and Porcupine are always released on exit.

    Raises:
        asyncio.CancelledError: Re-raised after clean-up when the task is
            cancelled, so the task is correctly marked as cancelled.
    """
    list_audio_devices()
    access_key = os.getenv("PICOVOICE_API_KEY")
    if not access_key:
        print("CRITICAL: PICOVOICE_API_KEY Missing.")
        return

    pa = None
    audio_stream = None
    porcupine = None
    try:
        params = get_wake_word_params()
        porcupine = pvporcupine.create(access_key=access_key, **params)
        pa = pyaudio.PyAudio()

        # A blank or unset index lets PyAudio pick the default input device.
        device_index = os.getenv("AUDIO_DEVICE_INDEX")
        input_device_index = int(device_index) if device_index and device_index.strip() else None

        audio_stream = pa.open(
            rate=porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=porcupine.frame_length,
            input_device_index=input_device_index
        )
        print(f"Listening for '{os.getenv('WAKE_WORD', 'porcupine')}' on device {input_device_index}...")

        loop = asyncio.get_running_loop()
        frame_length = porcupine.frame_length
        # Compile the per-frame format once instead of on every iteration.
        frame_struct = struct.Struct(f"{frame_length}h")
        frame_count = 0
        while True:
            # Yield to the event loop so HTTP requests and cancellation are
            # serviced between frames.
            await asyncio.sleep(0.01)
            # The read stays on the loop thread on purpose: cancellation is
            # only delivered at an await, so the stream can never be closed
            # while a read is still in flight.
            pcm = audio_stream.read(frame_length, exception_on_overflow=False)
            pcm_unpacked = frame_struct.unpack_from(pcm)

            frame_count += 1
            if frame_count % 100 == 0:
                rms = math.sqrt(sum(x * x for x in pcm_unpacked) / len(pcm_unpacked))
                if rms < 10:
                    print(f"DEBUG: Pegel sehr niedrig ({rms:.2f}). Check Mic!")

            if porcupine.process(pcm_unpacked) >= 0:
                print(f"[{os.getenv('BRO_NAME', 'Home-Bro')}] Wake-word detected!")
                # Camera capture and HTTP upload block, so run them off the
                # event loop.
                await loop.run_in_executor(None, _snapshot_and_report)
    except asyncio.CancelledError:
        print("Restarting Loop...")
        # Propagate so awaiters see the task as cancelled.
        raise
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if audio_stream:
            audio_stream.close()
        if pa:
            pa.terminate()
        if porcupine:
            porcupine.delete()
def start_interaction_loop():
    """Schedule a fresh wake-word task and store it in ``interaction_task``.

    Uses ``asyncio.create_task``, so it must be called while an event loop
    is running.
    """
    global interaction_task
    loop_coro = brain_interaction_loop()
    interaction_task = asyncio.create_task(loop_coro)
async def restart_interaction_loop():
    """Cancel the current wake-word task, wait for it to end, and start a new one."""
    running = interaction_task
    if running:
        running.cancel()
        try:
            # Let the old task finish its clean-up before a new one starts.
            await running
        except asyncio.CancelledError:
            pass
    start_interaction_loop()
@app.post("/config")
async def save_config(config: Config, background_tasks: BackgroundTasks):
    """Persist the submitted settings and schedule a wake-word loop restart.

    Each value is written to ``os.environ`` and to the dotenv file at
    ``ENV_PATH``. The restart is queued as a background task.

    Returns:
        ``{"status": "ok"}``.
    """
    # (environment variable, Config attribute)
    env_to_field = (
        ("BRO_NAME", "bro_name"),
        ("PICOVOICE_API_KEY", "pico_key"),
        ("BRAIN_URL", "brain_url"),
        ("WAKE_WORD", "wake_word"),
        ("CUSTOM_WAKE_WORD_PATH", "custom_wake_word_path"),
        ("AUDIO_DEVICE_INDEX", "audio_index"),
    )
    for env_name, field in env_to_field:
        new_value = getattr(config, field)
        os.environ[env_name] = new_value
        set_key(ENV_PATH, env_name, new_value)

    background_tasks.add_task(restart_interaction_loop)
    return {"status": "ok"}
# When executed as a script, serve the app on all interfaces, port 8080.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)