Use these synchronous recipes if your integration runs inside a WSGI app, Lambda, or any other environment where asyncio is inconvenient. This walkthrough targets the live capture API (/analyze/live/*), which streams transcripts, emotion, and cognitive analytics as the media arrives. It is not the Replay endpoint (POST /analyze/stream), which simply replays an uploaded file.
Endpoint Reference
| Purpose | Method & Path | Notes |
| --- | --- | --- |
| Start session | POST /analyze/live/start | Returns session_id, chunk/events/control URLs, TTL metadata. |
| Push audio chunk | POST /analyze/live/{session_id}/chunk | Sequential binary chunks with chunk_seq counters. |
| Control session | POST /analyze/live/{session_id}/control | finalize, cancel, or keepalive. |
| Subscribe to analytics | GET /analyze/live/{session_id}/events | Single SSE stream delivering status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, and done. |
All requests require X-API-Key, and live capture must be enabled on the deployment (ENABLE_LIVE_STREAMING=true).
Requirements
pip install requests==2.32.3
- Audio input: 16-bit PCM WAV, MP3, or Opus are safe defaults (server handles transcoding).
- Chunking: 32–128 kB provides good latency without overwhelming FastAPI.
- SSE: Only one consumer may connect to events_endpoint per session.
Recipe 1 – Start a session & read SSE synchronously
```python
import json

import requests

API_KEY = "YOUR_KEY"
BASE_URL = "https://api.illocution.ai"


def start_session(filename: str, goal: str = "") -> dict:
    resp = requests.post(
        f"{BASE_URL}/analyze/live/start",
        headers={"X-API-Key": API_KEY},
        json={"filename": filename, "goal": goal},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()


def stream_events(events_url: str) -> None:
    headers = {
        "Accept": "text/event-stream",
        "X-API-Key": API_KEY,
    }
    with requests.get(events_url, headers=headers, stream=True, timeout=None) as resp:
        resp.raise_for_status()
        event_name = None
        buffer = []
        for line in resp.iter_lines(decode_unicode=True):
            if not line:
                # Blank line terminates one SSE event; dispatch what we buffered.
                if buffer:
                    # Per the SSE spec, multiple data: lines are joined with newlines.
                    payload = json.loads("\n".join(buffer))
                    print(f"{event_name or 'message'}: {payload}")
                    buffer.clear()
                    event_name = None
                continue
            if line.startswith("event:"):
                event_name = line.split("event:", 1)[1].strip()
            elif line.startswith("data:"):
                buffer.append(line.split("data:", 1)[1].strip())
```
Event payloads mirror the async cookbook: status, final_transcript, emotion, cognitive, transition, moment, summary_update, error, done.
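Beyond print-debugging, you will usually want to route each event to its own handler. The sketch below layers a dispatch table on top of the parsing in stream_events; the event names come from the endpoint table above, but the payload fields used here (e.g. "text") are illustrative assumptions, not a documented schema.

```python
import json

# Map SSE event names to handler callables; unhandled events are ignored.
def handle_final_transcript(payload: dict):
    # Assumed payload shape: {"text": "..."} -- adjust to the real schema.
    return payload.get("text")

def handle_done(payload: dict):
    return payload

HANDLERS = {
    "final_transcript": handle_final_transcript,
    "done": handle_done,
}

def dispatch(event_name: str, data: str):
    """Decode one SSE data payload and return its handler's result (None if unhandled)."""
    payload = json.loads(data)
    handler = HANDLERS.get(event_name)
    return handler(payload) if handler else None
```

Replace the print call in stream_events with dispatch(event_name, "\n".join(buffer)) to plug this in.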
Recipe 2 – Stream audio chunks synchronously
```python
from pathlib import Path


def upload_chunks(chunk_url: str, file_path: Path, chunk_size: int = 64 * 1024) -> None:
    headers = {"X-API-Key": API_KEY}
    seq = 0
    with file_path.open("rb") as media:
        while True:
            data = media.read(chunk_size)
            if not data:
                break
            files = {"chunk": (f"chunk-{seq}.bin", data, "application/octet-stream")}
            form = {"chunk_seq": str(seq)}
            resp = requests.post(chunk_url, headers=headers, data=form, files=files, timeout=30)
            resp.raise_for_status()
            ack = resp.json()
            print(f"chunk {seq} ack bytes={ack['received_bytes']}")
            seq += 1
```
Guidelines:
- chunk_seq must be exactly 0, 1, 2, .... On retry, resend the same sequence number.
- Keep chunk sizes consistent; smaller chunks reduce latency but increase HTTP overhead.
- The server rejects additional chunks after you send finalize.
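Because the server keys chunks by chunk_seq, a transient failure can be retried safely by resending the same sequence number. A minimal backoff wrapper might look like this; the attempt count and delay schedule are arbitrary choices, not API requirements, and the injectable post parameter only exists to make the helper testable.

```python
import time

import requests

def backoff_delays(attempts: int, base: float = 0.5) -> list:
    """Delay before each retry: base * 2**n (0.5s, 1s, 2s, ... by default)."""
    return [base * (2 ** n) for n in range(attempts - 1)]

def post_chunk_with_retry(chunk_url, headers, form, files,
                          attempts: int = 4, base: float = 0.5, post=requests.post):
    """POST one chunk, retrying transient failures with exponential backoff.

    The same form (and therefore the same chunk_seq) is resent on every
    attempt, so the server can deduplicate. Re-raises after the last attempt.
    """
    delays = backoff_delays(attempts, base)
    for attempt in range(attempts):
        try:
            resp = post(chunk_url, headers=headers, data=form, files=files, timeout=30)
            resp.raise_for_status()
            return resp
        except (requests.ConnectionError, requests.Timeout, requests.HTTPError):
            if attempt == attempts - 1:
                raise
            time.sleep(delays[attempt])
```

Swap this in for the bare requests.post call inside upload_chunks.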
Recipe 3 – Control the session
```python
def finalize_session(control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(control_url, headers=headers, json={"action": "finalize"}, timeout=15)
    resp.raise_for_status()
    print(f"finalize ack: {resp.json()}")


def cancel_session(control_url: str, reason: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(
        control_url,
        headers=headers,
        json={"action": "cancel", "reason": reason},
        timeout=15,
    )
    resp.raise_for_status()


def keepalive_session(control_url: str) -> None:
    headers = {"X-API-Key": API_KEY}
    resp = requests.post(control_url, headers=headers, json={"action": "keepalive"}, timeout=15)
    resp.raise_for_status()
```
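If there are long gaps between chunks (for example, waiting on a caller), the session TTL can expire. One way to hold the session open is a background loop that calls keepalive_session on a timer. This is a sketch: the 30-second default is an assumption, so tune it against the TTL metadata returned by /analyze/live/start, and pass keepalive_session from above as the send callable.

```python
import threading

def start_keepalive(control_url: str, send, interval: float = 30.0) -> threading.Event:
    """Call send(control_url) every `interval` seconds until the returned Event is set."""
    stop = threading.Event()

    def loop() -> None:
        # Event.wait returns False on timeout (keep going) and True once set (stop).
        while not stop.wait(interval):
            send(control_url)

    threading.Thread(target=loop, daemon=True).start()
    return stop

# Usage: stop = start_keepalive(session["control_endpoint"], keepalive_session)
#        ... stream chunks ...
#        stop.set()  # before finalize
```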
Recipe 4 – Full synchronous runner with threads
To ingest while listening for analytics, use a background thread for SSE.
```python
import threading
from pathlib import Path


def run_live_capture(file_path: Path) -> None:
    session = start_session(filename=file_path.name, goal="Live coaching")
    print(f"Session {session['session_id']} (conversation {session['conversation_id']}) started")

    events_thread = threading.Thread(
        target=stream_events,
        args=(session["events_endpoint"],),
        daemon=True,
    )
    events_thread.start()

    upload_chunks(session["chunk_endpoint"], file_path)
    finalize_session(session["control_endpoint"])
    events_thread.join()  # wait for `done`


if __name__ == "__main__":
    run_live_capture(Path("call.wav"))
```
Production Notes
- Replace the file reader with microphone capture: enqueue PCM frames and write them out via upload_chunks without staging on disk.
- Add exponential backoff & retry for requests.post calls; resend the same chunk_seq if you get a transient network error.
- If the SSE thread exits without a done event, inspect the last error payload and consider restarting the session.
- Monitor /metrics for event counts and agent latency to ensure the live service keeps up with ingest.
- Artifacts (timeline, summary, segmentation, agent outputs) are persisted and referenced by the final done event.
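The first note above can be sketched with a queue.Queue feeding the uploader directly, with no temp file. The function and parameter names here are illustrative: in practice a sounddevice or pyaudio capture callback would enqueue PCM frames, a None item marks end of capture, and post_chunk is assumed to wrap the multipart POST from Recipe 2.

```python
import queue

def upload_from_queue(chunk_url: str, frames: "queue.Queue", post_chunk) -> int:
    """Drain PCM frames from a queue and POST each as a sequential chunk.

    post_chunk(chunk_url, seq, data) is expected to perform the multipart
    upload from Recipe 2. A None item is the end-of-capture sentinel.
    Returns the number of chunks sent.
    """
    seq = 0
    while True:
        data = frames.get()  # blocks until the capture callback enqueues a frame
        if data is None:  # capture finished
            break
        post_chunk(chunk_url, seq, data)
        seq += 1
    return seq
```

Run this on the main thread in place of upload_chunks, then send finalize once it returns.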