Code Implementation for Building a Bulletproof Agent Workflow with PydanticAI Using Robust Schemas, Tool Injection, and Model-Agnostic Release

In this tutorial, we build a production-ready workflow that prioritizes reliability over effort-intensive production by enforcing strong, typed results at every step. We use PydanticAI to define transparent response schemes, wire in tools with dependency injection, and ensure that the agent can safely interact with external systems, such as databases, without breaking the law. By implementing everything in a notebook-friendly, async-first set, we show how to move beyond weak chatbot patterns to robust agent systems suited for real business workflows.
!pip -q install "pydantic-ai-slim[openai]" pydantic
import os, json, sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional, List
from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent, RunContext, ModelRetry
if not os.environ.get("OPENAI_API_KEY"):
try:
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = (userdata.get("OPENAI_API_KEY") or "").strip()
except Exception:
pass
if not os.environ.get("OPENAI_API_KEY"):
import getpass
os.environ["OPENAI_API_KEY"] = getpass.getpass("Paste your OPENAI_API_KEY: ").strip()
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is required."
We set up the workspace and make sure that all the required libraries are available for the agent to work properly. We securely upload the OpenAI API key in a Colab-compatible way so that the tutorial works without manual configuration changes. We also import all important dependencies that will be shared across schemas, tools, and agent logic.
Priority = Literal["low", "medium", "high", "critical"]
ActionType = Literal["create_ticket", "update_ticket", "query_ticket", "list_open_tickets", "no_action"]
Confidence = Literal["low", "medium", "high"]
class TicketDraft(BaseModel):
title: str = Field(..., min_length=8, max_length=120)
customer: str = Field(..., min_length=2, max_length=60)
priority: Priority
category: Literal["billing", "bug", "feature_request", "security", "account", "other"]
description: str = Field(..., min_length=20, max_length=1000)
expected_outcome: str = Field(..., min_length=10, max_length=250)
class AgentDecision(BaseModel):
action: ActionType
reason: str = Field(..., min_length=20, max_length=400)
confidence: Confidence
ticket: Optional[TicketDraft] = None
ticket_id: Optional[int] = None
follow_up_questions: List[str] = Field(default_factory=list, max_length=5)
@field_validator("follow_up_questions")
@classmethod
def short_questions(cls, v):
for q in v:
if len(q) > 140:
raise ValueError("Each follow-up question must be <= 140 characters.")
return v
We define robust data models that act as a contract between the agent and the rest of the system. We use typed fields and validation rules to ensure that every agent response follows a predictable structure. By enforcing these schemas, we prevent negative results from silently propagating through the workflow.
@dataclass
class SupportDeps:
db: sqlite3.Connection
tenant: str
policy: dict
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def init_db() -> sqlite3.Connection:
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("""
CREATE TABLE tickets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant TEXT NOT NULL,
title TEXT NOT NULL,
customer TEXT NOT NULL,
priority TEXT NOT NULL,
category TEXT NOT NULL,
description TEXT NOT NULL,
expected_outcome TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
""")
conn.commit()
return conn
def seed_ticket(db: sqlite3.Connection, tenant: str, ticket: TicketDraft, status: str = "open") -> int:
now = utc_now_iso()
cur = db.execute(
"""
INSERT INTO tickets
(tenant, title, customer, priority, category, description, expected_outcome, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
tenant,
ticket.title,
ticket.customer,
ticket.priority,
ticket.category,
ticket.description,
ticket.expected_outcome,
status,
now,
now,
),
)
db.commit()
return int(cur.lastrowid)
We create a dependency layer and implement a lightweight SQLite database for processing. We model real-world runtime dependencies, such as database connections and employer policies, and have them injected into the agent. We also define helper functions that securely enter and manage ticket data while it is still running.
def build_agent(model_name: str) -> Agent[SupportDeps, AgentDecision]:
agent = Agent(
f"openai:{model_name}",
output_type=AgentDecision,
output_retries=2,
instructions=(
"You are a production support triage agent.n"
"Return an output that matches the AgentDecision schema.n"
"Use tools when you need DB state.n"
"Never invent ticket IDs.n"
"If the user intent is unclear, ask concise follow-up questions.n"
),
)
@agent.tool
def create_ticket(ctx: RunContext[SupportDeps], ticket: TicketDraft) -> int:
deps = ctx.deps
if ticket.priority in ("critical", "high") and deps.policy.get("require_security_phrase_for_critical", False):
if ticket.category == "security" and "incident" not in ticket.description.lower():
raise ModelRetry("For security high/critical, include the word 'incident' in description and retry.")
return seed_ticket(deps.db, deps.tenant, ticket, status="open")
@agent.tool
def update_ticket_status(
ctx: RunContext[SupportDeps],
ticket_id: int,
status: Literal["open", "in_progress", "resolved", "closed"],
) -> dict:
deps = ctx.deps
now = utc_now_iso()
cur = deps.db.execute("SELECT id FROM tickets WHERE tenant=? AND id=?", (deps.tenant, ticket_id))
if not cur.fetchone():
raise ModelRetry(f"Ticket {ticket_id} not found for this tenant. Ask for the correct ticket_id.")
deps.db.execute(
"UPDATE tickets SET status=?, updated_at=? WHERE tenant=? AND id=?",
(status, now, deps.tenant, ticket_id),
)
deps.db.commit()
return {"ticket_id": ticket_id, "status": status, "updated_at": now}
@agent.tool
def query_ticket(ctx: RunContext[SupportDeps], ticket_id: int) -> dict:
deps = ctx.deps
cur = deps.db.execute(
"""
SELECT id, title, customer, priority, category, status, created_at, updated_at
FROM tickets WHERE tenant=? AND id=?
""",
(deps.tenant, ticket_id),
)
row = cur.fetchone()
if not row:
raise ModelRetry(f"Ticket {ticket_id} not found. Ask the user for a valid ticket_id.")
keys = ["id", "title", "customer", "priority", "category", "status", "created_at", "updated_at"]
return dict(zip(keys, row))
@agent.tool
def list_open_tickets(ctx: RunContext[SupportDeps], limit: int = 5) -> list:
deps = ctx.deps
limit = max(1, min(int(limit), 20))
cur = deps.db.execute(
"""
SELECT id, title, priority, category, status, updated_at
FROM tickets
WHERE tenant=? AND status IN ('open','in_progress')
ORDER BY updated_at DESC
LIMIT ?
""",
(deps.tenant, limit),
)
rows = cur.fetchall()
return [
{"id": r[0], "title": r[1], "priority": r[2], "category": r[3], "status": r[4], "updated_at": r[5]}
for r in rows
]
@agent.output_validator
def validate_decision(ctx: RunContext[SupportDeps], out: AgentDecision) -> AgentDecision:
deps = ctx.deps
if out.action == "create_ticket" and out.ticket is None:
raise ModelRetry("You chose create_ticket but did not provide ticket. Provide ticket fields and retry.")
if out.action in ("update_ticket", "query_ticket") and out.ticket_id is None:
raise ModelRetry("You chose update/query but did not provide ticket_id. Ask for ticket_id and retry.")
if out.ticket and out.ticket.priority == "critical" and not deps.policy.get("allow_critical", True):
raise ModelRetry("This tenant does not allow 'critical'. Downgrade to 'high' and retry.")
return out
return agent
It contains the core agent logic for integrating the model-agnostic PydanticAI agent. We register typed tools for creating, querying, reviewing, and listing tickets, allowing the agent to interact with the external environment in a controlled manner. We also use output validation so that the agent can correct itself whenever its decisions violate business rules.
db = init_db()
deps = SupportDeps(
db=db,
tenant="acme_corp",
policy={"allow_critical": True, "require_security_phrase_for_critical": True},
)
seed_ticket(
db,
deps.tenant,
TicketDraft(
title="Double-charged on invoice 8831",
customer="Riya",
priority="high",
category="billing",
description="Customer reports they were billed twice for invoice 8831 and wants a refund and confirmation email.",
expected_outcome="Issue a refund and confirm resolution to customer.",
),
)
seed_ticket(
db,
deps.tenant,
TicketDraft(
title="App crashes on login after update",
customer="Sam",
priority="high",
category="bug",
description="After latest update, the app crashes immediately on login. Reproducible on two devices; needs investigation.",
expected_outcome="Provide a fix or workaround and restore successful logins.",
),
)
agent = build_agent("gpt-4o-mini")
async def run_case(prompt: str):
res = await agent.run(prompt, deps=deps)
out = res.output
print(json.dumps(out.model_dump(), indent=2))
return out
case_a = await run_case(
"We suspect account takeover: multiple password reset emails and unauthorized logins. "
"Customer=Leila. Priority=critical. Open a security ticket."
)
case_b = await run_case("List our open tickets and summarize what to tackle first.")
case_c = await run_case("What is the status of ticket 1? If it's open, move it to in_progress.")
agent_alt = build_agent("gpt-4o")
alt_res = await agent_alt.run(
"Create a feature request ticket: customer=Noah wants 'export to CSV' in analytics dashboard; priority=medium.",
deps=deps,
)
print(json.dumps(alt_res.output.model_dump(), indent=2))
We put everything together by entering the initial data and running the agent in parallel, in a notebook-safe way. We create several real-world scenarios to demonstrate how the agent triggers, calls tools, and returns valid schema results. We also show how easily we can change the underlying model while keeping the same workflow and credentials intact.
In conclusion, we have shown how a type-safe agent can reason, call tools, validate its output, and recover from errors without manual intervention. We kept the logic model-agnostic, allowing us to change underlying LLMs while maintaining the same schemas and tools, which is important for long-term sustainability. Overall, we have shown that combining robust schema implementation, dependency injection, and async implementation closes the reliability gap in agent AI and provides a solid foundation for building reliable production systems.
Check it out Full Codes Here. Also, feel free to follow us Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to Our newspaper. Wait! are you on telegram? now you can join us on telegram too.


