Every engineer who has shipped an AI agent has thought the same thing: "I'll just add a simple loop counter and a max_iterations check. How hard can it be?"
Here's what actually happens.
You write something like this. It looks reasonable. It even passes code review.
# The "good enough" version every team ships first
async def run_agent(task: str, max_iterations: int = 10):
    iterations = 0
    while True:
        result = await llm.complete(task)
        iterations += 1
        if iterations >= max_iterations:
            break
        if result.is_done:
            break
    return result
This code will destroy you. Not immediately — that would be merciful. It will destroy you at 3am on a Tuesday, three months after launch.
Here's the real-world failure mode nobody talks about in tutorials:
# What actually happens under concurrent load
# 50 users hit your agent endpoint simultaneously
async def run_agent_broken(task: str):
    iterations = 0
    # PROBLEM: iterations is a local variable
    # Each coroutine counts only its OWN calls
    # Nothing caps the total across all 50 runs
    while True:
        result = await llm.complete(task)  # yields to the event loop here
        iterations += 1
        # While THIS coroutine is suspended at "await",
        # 49 OTHER coroutines are ALSO running
        # Each one is making its own model calls
        # None of them ever compares iterations to a limit
        if result.needs_more_work:
            task = result.next_task  # ← infinite loop trigger
            continue  # ← jumps back to the top; any limit check below is skipped
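One way to make the limit impossible to skip is to let the loop construct itself enforce it. The sketch below is a minimal illustration, not a complete guard: `StepResult` and `fake_llm_complete` are hypothetical stand-ins for the `llm.complete` client above, and the fake model always asks for more work so that the cap is the only thing that stops the run.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StepResult:
    """Hypothetical stand-in for whatever the LLM client returns."""
    needs_more_work: bool
    next_task: str


async def fake_llm_complete(task: str) -> StepResult:
    """Hypothetical model call that always asks for another step."""
    await asyncio.sleep(0)  # yield to the event loop like a real network call
    return StepResult(needs_more_work=True, next_task=task + "!")


async def run_agent_bounded(task: str, max_iterations: int = 10) -> int:
    """Return the number of model calls made before stopping."""
    calls = 0
    # The for loop is the limit: `continue` cannot jump past it.
    for _ in range(max_iterations):
        result = await fake_llm_complete(task)
        calls += 1
        if result.needs_more_work:
            task = result.next_task
            continue
        break
    return calls


async def run_many(n: int) -> list[int]:
    """Run n agents concurrently; each keeps its own independent counter."""
    return await asyncio.gather(*(run_agent_bounded("t") for _ in range(n)))
```

This only bounds each run on its own. It does nothing about total spend across sessions, restarts, or multiple worker processes.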
"We woke up to $8,400 in OpenAI charges overnight. The agent had looped 40,000 times on a single malformed user request. Our max_iterations check had a race condition we never caught in testing." — YC-backed startup, 2025
OK, you read about the race condition. You add global state. Now you have a different problem:
# Attempt 2: Global counter (introduces new bugs)
import asyncio
from collections import defaultdict

agent_call_counts = defaultdict(int)  # global state
agent_costs = defaultdict(float)
MAX_CALLS = 50
MAX_COST = 1.00  # $1 limit per session

async def run_agent_v2(session_id: str, task: str):
    while True:
        # BUG 1: Not thread-safe without locks
        agent_call_counts[session_id] += 1
        if agent_call_counts[session_id] > MAX_CALLS:
            raise Exception("Max calls exceeded")
        result = await llm.complete(task)
        # BUG 2: Cost estimate is wrong (no token counting)
        agent_costs[session_id] += 0.01  # ← hardcoded, wrong
        # BUG 3: Memory leak: session IDs accumulate in the dicts forever
        # After 1M sessions: OOMKilled
        # BUG 4: No persistence: a restart resets all counters
        # Deploy during incident = counters reset = loop resumes
        if result.is_done:
            break
    # BUG 5: No cleanup on exit or on error, so session state outlives the run
    return result
You now have 5 new bugs in your "fixed" code. This is the DIY trap: every fix introduces new failure modes. You're not building a product anymore — you're maintaining an incident response system.
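To make two of those bugs concrete, here is a rough sketch of what fixing just the cost estimate and the cleanup might look like. Everything in it is an assumption for illustration: the per-token prices are placeholders rather than any provider's real rates, and `tracked_session` and `record_call` are hypothetical helper names. It still leaves persistence and cross-process safety untouched.

```python
from collections import defaultdict
from contextlib import contextmanager

# Hypothetical prices in USD per 1,000 tokens; substitute your provider's rates.
PRICE_PER_1K_INPUT = 0.003
PRICE_PER_1K_OUTPUT = 0.015

session_costs: dict[str, float] = defaultdict(float)


def call_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost of one model call, derived from reported token counts."""
    input_cost = (input_tokens / 1000) * PRICE_PER_1K_INPUT
    output_cost = (output_tokens / 1000) * PRICE_PER_1K_OUTPUT
    return input_cost + output_cost


def record_call(session_id: str, input_tokens: int, output_tokens: int) -> float:
    """Add one call's cost to the session and return the running total."""
    session_costs[session_id] += call_cost(input_tokens, output_tokens)
    return session_costs[session_id]


@contextmanager
def tracked_session(session_id: str):
    """Drop the session's counters on exit, even if the run raises."""
    try:
        yield
    finally:
        session_costs.pop(session_id, None)
```

Wrapping the agent loop in `with tracked_session(session_id):` releases the entry whether the run finishes or blows up, which is the piece the version above never does.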
If you want to do this right yourself, here's the minimum viable implementation. Set aside 2 weeks:
# Production-grade: what you actually need to build
from dataclasses import dataclass

import redis.asyncio as redis  # async client; the calls below are awaited
from redis.exceptions import WatchError

@dataclass
class AgentBudget:
    max_iterations: int = 50
    max_cost_usd: float = 2.00
    max_wall_time_seconds: int = 30
    max_tokens_total: int = 100000

class AgentRunawayGuard:
    def __init__(self, redis_client: redis.Redis, budget: AgentBudget):
        self.redis = redis_client  # Need Redis for persistence
        self.budget = budget

    async def check_and_increment(
        self, session_id: str, tokens_used: int, cost: float
    ) -> dict:
        calls_key = f"agent:{session_id}:calls"
        cost_key = f"agent:{session_id}:cost"
        async with self.redis.pipeline() as pipe:
            try:
                # Optimistic lock: the transaction aborts if calls_key changes
                await pipe.watch(calls_key)
                # After WATCH, the pipeline runs reads immediately
                calls = int(await pipe.get(calls_key) or 0)
                if calls >= self.budget.max_iterations:
                    return {"kill": True, "reason": "max_iterations"}
                # Atomic increment with TTL
                pipe.multi()
                pipe.incr(calls_key)
                pipe.expire(calls_key, 3600)
                pipe.incrbyfloat(cost_key, cost)
                pipe.expire(cost_key, 3600)  # the cost key needs a TTL too
                results = await pipe.execute()
                total_cost = float(results[2])  # INCRBYFLOAT returns the new total
                if total_cost >= self.budget.max_cost_usd:
                    return {"kill": True, "reason": "budget_exceeded"}
                return {"kill": False, "calls": calls + 1, "cost": total_cost}
            except WatchError:
                # Another worker changed the counter mid-transaction; fail closed
                return {"kill": True, "reason": "concurrency_error"}

# You also need:
# - Actual token counting (tiktoken integration); tokens_used is ignored above
# - Wall clock timeout (asyncio.wait_for)
# - Webhook alerts when agents are killed
# - Dashboard to see which sessions were terminated
# - Audit log for compliance
# - Tests for all the race conditions above
# Total: ~400 lines of production code + Redis infra
That's before you add token counting, wall-clock timeouts, webhook alerts, a dashboard, audit logging, and tests. You're looking at 400+ lines of production code, a Redis instance, and two weeks you'll never get back.
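To give a sense of scale, here is roughly what just one item on that list, the wall-clock timeout, could look like using `asyncio.wait_for`. This is a sketch under assumptions: `WallClockExceeded`, `run_with_deadline`, and `slow_agent` are hypothetical names, and the budget value would come from something like `AgentBudget.max_wall_time_seconds`.

```python
import asyncio


class WallClockExceeded(Exception):
    """Hypothetical error raised when a run outlives its time budget."""


async def run_with_deadline(agent_coro, max_wall_time_seconds: float):
    """Await agent_coro, cancelling it if it exceeds the time budget."""
    try:
        return await asyncio.wait_for(agent_coro, timeout=max_wall_time_seconds)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner coroutine at this point.
        raise WallClockExceeded(
            f"agent exceeded {max_wall_time_seconds}s wall-clock budget"
        ) from None


async def slow_agent() -> str:
    """Hypothetical agent run that takes far too long."""
    await asyncio.sleep(10)
    return "done"


async def quick_agent() -> str:
    """Hypothetical agent run that finishes well within budget."""
    return "ok"
```

Cancellation only takes effect at an `await`, so a run stuck in blocking, synchronous code will not be interrupted by this alone.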
Or you do this:
from soruvalab import verify

# Before passing output to next agent iteration
result = verify(agent_output)
if result["agent_runaway_score"] > 70:
    raise AgentKillSwitch("Runaway detected. Session terminated.")
# Logged. Alerted. Compliant. Done.
SoruvaGuard's runaway detection handles iteration counting, cost tracking, semantic loop detection (catching agents that rephrase the same task infinitely), and EU AI Act audit logging — in a single API call that returns in 180ms.
The Redis infrastructure, the race condition handling, the token counting, the dashboard — it's all on our side. Your side is one line.
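For a feel of what "semantic loop detection" has to catch, here is a deliberately crude sketch that flags a task when it closely matches several earlier ones. It is not how SoruvaGuard works internally; it is an assumption-laden illustration that uses character-level similarity from Python's standard `difflib`, whereas a real semantic check would more likely compare embeddings. The `threshold` and `max_repeats` defaults are arbitrary.

```python
from difflib import SequenceMatcher


def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so trivial edits don't hide repeats."""
    return " ".join(text.lower().split())


def looks_like_loop(
    history: list[str],
    new_task: str,
    threshold: float = 0.9,
    max_repeats: int = 3,
) -> bool:
    """True if new_task closely matches at least max_repeats earlier tasks."""
    candidate = normalize(new_task)
    similar = sum(
        1
        for past in history
        if SequenceMatcher(None, normalize(past), candidate).ratio() >= threshold
    )
    return similar >= max_repeats
```

A rephrasing that keeps the meaning but changes most of the wording would slip past this check, which is exactly the gap a semantic approach is meant to close.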
SoruvaGuard's agent runaway detection handles the hard parts so you don't have to. One API call. 180ms. No Redis required.