AI Sparks

Code Implementation for Building a Bulletproof Agent Workflow with PydanticAI Using Strict Schemas, Tool Injection, and Model-Agnostic Execution

In this tutorial, we build a production-ready agentic workflow that prioritizes reliability over best-effort generation by enforcing strict, typed outputs at every step. We use PydanticAI to define clear response schemas, wire in tools through dependency injection, and ensure that the agent can safely interact with external systems, such as a database, without breaking execution. By running everything in a notebook-friendly, async-first setup, we show how to move beyond brittle chatbot patterns toward robust agentic systems suited to real business workflows.

!pip -q install "pydantic-ai-slim[openai]" pydantic


import os, json, sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional, List


from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent, RunContext, ModelRetry


if not os.environ.get("OPENAI_API_KEY"):
   try:
       from google.colab import userdata
       os.environ["OPENAI_API_KEY"] = (userdata.get("OPENAI_API_KEY") or "").strip()
   except Exception:
       pass


if not os.environ.get("OPENAI_API_KEY"):
   import getpass
   os.environ["OPENAI_API_KEY"] = getpass.getpass("Paste your OPENAI_API_KEY: ").strip()


assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is required."

We set up the environment and make sure that all required libraries are available for the agent to run. We load the OpenAI API key securely in a Colab-compatible way (environment variable first, then Colab secrets, then an interactive prompt) so that the tutorial works without manual configuration changes. We also import the dependencies that are shared across the schemas, tools, and agent logic.
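The key lookup above is a fallback chain: check one source, and move to the next only if it yields nothing. A minimal sketch of that pattern follows, with the sources passed in as plain callables so it can be exercised without Colab. The names `resolve_secret`, `fallbacks`, and `broken_source` are hypothetical and not part of any library.

```python
from typing import Callable, Iterable, Mapping, Optional


def resolve_secret(
    name: str,
    env: Mapping[str, str],
    fallbacks: Iterable[Callable[[str], Optional[str]]] = (),
) -> str:
    """Return the first non-empty value for `name`, checking `env` before each fallback."""
    value = (env.get(name) or "").strip()
    if value:
        return value
    for source in fallbacks:
        try:
            value = (source(name) or "").strip()
        except Exception:
            # A failing source (e.g. no Colab runtime) is skipped, not fatal.
            continue
        if value:
            return value
    raise RuntimeError(f"{name} is required but no source provided it.")


def broken_source(name: str) -> Optional[str]:
    # Simulates a secret store that is not available in this runtime.
    raise ImportError("secret store unavailable")


# The environment wins when it already holds the key.
from_env = resolve_secret("DEMO_KEY", {"DEMO_KEY": " abc "})
# Otherwise fallbacks are tried in order; the broken one is skipped.
from_fallback = resolve_secret("DEMO_KEY", {}, [broken_source, lambda n: "xyz"])
print(from_env, from_fallback)
```

Passing the environment in as a mapping, rather than reading `os.environ` directly, keeps the helper easy to test; in a notebook one would likely pass `os.environ` itself.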

Priority = Literal["low", "medium", "high", "critical"]
ActionType = Literal["create_ticket", "update_ticket", "query_ticket", "list_open_tickets", "no_action"]
Confidence = Literal["low", "medium", "high"]


class TicketDraft(BaseModel):
   title: str = Field(..., min_length=8, max_length=120)
   customer: str = Field(..., min_length=2, max_length=60)
   priority: Priority
   category: Literal["billing", "bug", "feature_request", "security", "account", "other"]
   description: str = Field(..., min_length=20, max_length=1000)
   expected_outcome: str = Field(..., min_length=10, max_length=250)


class AgentDecision(BaseModel):
   action: ActionType
   reason: str = Field(..., min_length=20, max_length=400)
   confidence: Confidence
   ticket: Optional[TicketDraft] = None
   ticket_id: Optional[int] = None
   follow_up_questions: List[str] = Field(default_factory=list, max_length=5)


   @field_validator("follow_up_questions")
   @classmethod
   def short_questions(cls, v):
       for q in v:
           if len(q) > 140:
               raise ValueError("Each follow-up question must be <= 140 characters.")
       return v

We define strict data models that act as a contract between the agent and the rest of the system. We use typed fields and validation rules to ensure that every agent response follows a predictable structure. By enforcing these schemas, we prevent malformed outputs from silently propagating through the workflow.
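To see what "preventing malformed outputs" looks like in practice, the sketch below validates two payloads against a trimmed-down model. `MiniDecision` and `failing_fields` are hypothetical names introduced for illustration; the field rules mirror the ones in `AgentDecision`, and the example assumes Pydantic v2.

```python
from typing import List, Literal

from pydantic import BaseModel, Field, ValidationError, field_validator


class MiniDecision(BaseModel):
    # Hypothetical, trimmed-down stand-in for AgentDecision.
    action: Literal["create_ticket", "no_action"]
    reason: str = Field(..., min_length=20, max_length=400)
    follow_up_questions: List[str] = Field(default_factory=list, max_length=5)

    @field_validator("follow_up_questions")
    @classmethod
    def short_questions(cls, v):
        for q in v:
            if len(q) > 140:
                raise ValueError("Each follow-up question must be <= 140 characters.")
        return v


def failing_fields(payload: dict) -> set:
    """Return the names of top-level fields that fail validation (empty set if valid)."""
    try:
        MiniDecision.model_validate(payload)
        return set()
    except ValidationError as exc:
        return {str(err["loc"][0]) for err in exc.errors()}


good = {"action": "no_action", "reason": "The request is already resolved upstream."}
bad = {
    "action": "delete_everything",        # not one of the allowed literals
    "reason": "too short",                # below min_length=20
    "follow_up_questions": ["?" * 141],   # question longer than 140 characters
}

print(failing_fields(good))
print(sorted(failing_fields(bad)))
```

Each rejected field is reported separately, which is what makes the error message useful as feedback to a model that is asked to retry.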

@dataclass
class SupportDeps:
   db: sqlite3.Connection
   tenant: str
   policy: dict


def utc_now_iso() -> str:
   return datetime.now(timezone.utc).isoformat()


def init_db() -> sqlite3.Connection:
   conn = sqlite3.connect(":memory:", check_same_thread=False)
   conn.execute("""
       CREATE TABLE tickets (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           tenant TEXT NOT NULL,
           title TEXT NOT NULL,
           customer TEXT NOT NULL,
           priority TEXT NOT NULL,
           category TEXT NOT NULL,
           description TEXT NOT NULL,
           expected_outcome TEXT NOT NULL,
           status TEXT NOT NULL,
           created_at TEXT NOT NULL,
           updated_at TEXT NOT NULL
       );
   """)
   conn.commit()
   return conn


def seed_ticket(db: sqlite3.Connection, tenant: str, ticket: TicketDraft, status: str = "open") -> int:
   now = utc_now_iso()
   cur = db.execute(
       """
       INSERT INTO tickets
           (tenant, title, customer, priority, category, description, expected_outcome, status, created_at, updated_at)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
       """,
       (
           tenant,
           ticket.title,
           ticket.customer,
           ticket.priority,
           ticket.category,
           ticket.description,
           ticket.expected_outcome,
           status,
           now,
           now,
       ),
   )
   db.commit()
   return int(cur.lastrowid)

We create a dependency layer and set up a lightweight in-memory SQLite database for persistence. We model real-world runtime dependencies, such as the database connection and tenant policies, and make them injectable into the agent. We also define helper functions that safely insert and manage ticket data at runtime, using parameterized queries throughout.
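One practical benefit of injecting the tenant alongside the connection is that every query can be scoped to that tenant without the caller (or the model) supplying it. The sketch below illustrates this with a reduced schema; `DemoDeps`, `open_titles`, and the `globex` tenant are hypothetical and exist only for this example.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class DemoDeps:
    # Hypothetical, reduced version of SupportDeps.
    db: sqlite3.Connection
    tenant: str


def open_titles(deps: DemoDeps) -> list:
    """List open ticket titles visible to the injected tenant only."""
    rows = deps.db.execute(
        "SELECT title FROM tickets WHERE tenant=? AND status='open' ORDER BY id",
        (deps.tenant,),
    ).fetchall()
    return [r[0] for r in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY, tenant TEXT, title TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO tickets (tenant, title, status) VALUES (?, ?, ?)",
    [
        ("acme_corp", "Double-charged on invoice 8831", "open"),
        ("acme_corp", "Old login bug", "closed"),
        ("globex", "Export to CSV request", "open"),
    ],
)
conn.commit()

# Same connection, different injected tenant: each sees only its own rows.
acme = DemoDeps(db=conn, tenant="acme_corp")
globex = DemoDeps(db=conn, tenant="globex")
print(open_titles(acme))
print(open_titles(globex))
```

Because the tenant comes from the dependency object rather than from a function argument, a tool built this way cannot be steered to another tenant's data by the prompt.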

def build_agent(model_name: str) -> Agent[SupportDeps, AgentDecision]:
   agent = Agent(
       f"openai:{model_name}",
       output_type=AgentDecision,
       output_retries=2,
       instructions=(
           "You are a production support triage agent.\n"
           "Return an output that matches the AgentDecision schema.\n"
           "Use tools when you need DB state.\n"
           "Never invent ticket IDs.\n"
           "If the user intent is unclear, ask concise follow-up questions.\n"
       ),
   )


   @agent.tool
   def create_ticket(ctx: RunContext[SupportDeps], ticket: TicketDraft) -> int:
       deps = ctx.deps
       if ticket.priority in ("critical", "high") and deps.policy.get("require_security_phrase_for_critical", False):
           if ticket.category == "security" and "incident" not in ticket.description.lower():
               raise ModelRetry("For security high/critical, include the word 'incident' in description and retry.")
       return seed_ticket(deps.db, deps.tenant, ticket, status="open")


   @agent.tool
   def update_ticket_status(
       ctx: RunContext[SupportDeps],
       ticket_id: int,
       status: Literal["open", "in_progress", "resolved", "closed"],
   ) -> dict:
       deps = ctx.deps
       now = utc_now_iso()
       cur = deps.db.execute("SELECT id FROM tickets WHERE tenant=? AND id=?", (deps.tenant, ticket_id))
       if not cur.fetchone():
           raise ModelRetry(f"Ticket {ticket_id} not found for this tenant. Ask for the correct ticket_id.")
       deps.db.execute(
           "UPDATE tickets SET status=?, updated_at=? WHERE tenant=? AND id=?",
           (status, now, deps.tenant, ticket_id),
       )
       deps.db.commit()
       return {"ticket_id": ticket_id, "status": status, "updated_at": now}


   @agent.tool
   def query_ticket(ctx: RunContext[SupportDeps], ticket_id: int) -> dict:
       deps = ctx.deps
       cur = deps.db.execute(
           """
           SELECT id, title, customer, priority, category, status, created_at, updated_at
           FROM tickets WHERE tenant=? AND id=?
           """,
           (deps.tenant, ticket_id),
       )
       row = cur.fetchone()
       if not row:
           raise ModelRetry(f"Ticket {ticket_id} not found. Ask the user for a valid ticket_id.")
       keys = ["id", "title", "customer", "priority", "category", "status", "created_at", "updated_at"]
       return dict(zip(keys, row))


   @agent.tool
   def list_open_tickets(ctx: RunContext[SupportDeps], limit: int = 5) -> list:
       deps = ctx.deps
       limit = max(1, min(int(limit), 20))
       cur = deps.db.execute(
           """
           SELECT id, title, priority, category, status, updated_at
           FROM tickets
           WHERE tenant=? AND status IN ('open','in_progress')
           ORDER BY updated_at DESC
           LIMIT ?
           """,
           (deps.tenant, limit),
       )
       rows = cur.fetchall()
       return [
           {"id": r[0], "title": r[1], "priority": r[2], "category": r[3], "status": r[4], "updated_at": r[5]}
           for r in rows
       ]


   @agent.output_validator
   def validate_decision(ctx: RunContext[SupportDeps], out: AgentDecision) -> AgentDecision:
       deps = ctx.deps
       if out.action == "create_ticket" and out.ticket is None:
           raise ModelRetry("You chose create_ticket but did not provide ticket. Provide ticket fields and retry.")
       if out.action in ("update_ticket", "query_ticket") and out.ticket_id is None:
           raise ModelRetry("You chose update/query but did not provide ticket_id. Ask for ticket_id and retry.")
       if out.ticket and out.ticket.priority == "critical" and not deps.policy.get("allow_critical", True):
           raise ModelRetry("This tenant does not allow 'critical'. Downgrade to 'high' and retry.")
       return out


   return agent

This block contains the core agent logic, built around a model-agnostic PydanticAI agent. We register typed tools for creating, updating, querying, and listing tickets, which lets the agent interact with the external environment in a controlled manner. We also add an output validator that raises ModelRetry, so the agent is asked to correct itself whenever its decision violates a business rule.
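The self-correction described above depends on a validator raising an exception whose message is sent back to the model for another attempt. The loop below is a conceptual, library-free sketch of that idea and not PydanticAI's actual implementation; `RetrySignal`, `run_with_retries`, and the scripted `fake_model` are hypothetical stand-ins.

```python
from typing import Callable, List, Optional


class RetrySignal(Exception):
    """Hypothetical stand-in for a 'please try again' exception such as ModelRetry."""


def run_with_retries(
    generate: Callable[[Optional[str]], dict],
    validate: Callable[[dict], dict],
    max_retries: int = 2,
) -> dict:
    """Call `generate`, feeding validator feedback back in until it passes or retries run out."""
    feedback: Optional[str] = None
    for _ in range(max_retries + 1):  # one initial attempt plus `max_retries` retries
        candidate = generate(feedback)
        try:
            return validate(candidate)
        except RetrySignal as exc:
            feedback = str(exc)
    raise RuntimeError(f"Gave up after {max_retries} retries: {feedback}")


def validate_decision(out: dict) -> dict:
    # Same rule as the tutorial's validator: update/query needs a ticket_id.
    if out["action"] in ("update_ticket", "query_ticket") and out.get("ticket_id") is None:
        raise RetrySignal("You chose update/query but did not provide ticket_id.")
    return out


seen_feedback: List[Optional[str]] = []


def fake_model(feedback: Optional[str]) -> dict:
    # Scripted 'model': forgets ticket_id first, fixes it once it receives feedback.
    seen_feedback.append(feedback)
    if feedback is None:
        return {"action": "query_ticket", "ticket_id": None}
    return {"action": "query_ticket", "ticket_id": 1}


result = run_with_retries(fake_model, validate_decision)
print(result, len(seen_feedback))
```

The retry budget matters: without a cap, a model that never satisfies the rule would loop indefinitely, which is why the sketch fails loudly once the retries are exhausted.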

db = init_db()
deps = SupportDeps(
   db=db,
   tenant="acme_corp",
   policy={"allow_critical": True, "require_security_phrase_for_critical": True},
)


seed_ticket(
   db,
   deps.tenant,
   TicketDraft(
       title="Double-charged on invoice 8831",
       customer="Riya",
       priority="high",
       category="billing",
       description="Customer reports they were billed twice for invoice 8831 and wants a refund and confirmation email.",
       expected_outcome="Issue a refund and confirm resolution to customer.",
   ),
)
seed_ticket(
   db,
   deps.tenant,
   TicketDraft(
       title="App crashes on login after update",
       customer="Sam",
       priority="high",
       category="bug",
       description="After latest update, the app crashes immediately on login. Reproducible on two devices; needs investigation.",
       expected_outcome="Provide a fix or workaround and restore successful logins.",
   ),
)


agent = build_agent("gpt-4o-mini")


async def run_case(prompt: str):
   res = await agent.run(prompt, deps=deps)
   out = res.output
   print(json.dumps(out.model_dump(), indent=2))
   return out


case_a = await run_case(
   "We suspect account takeover: multiple password reset emails and unauthorized logins. "
   "Customer=Leila. Priority=critical. Open a security ticket."
)


case_b = await run_case("List our open tickets and summarize what to tackle first.")


case_c = await run_case("What is the status of ticket 1? If it's open, move it to in_progress.")


agent_alt = build_agent("gpt-4o")
alt_res = await agent_alt.run(
   "Create a feature request ticket: customer=Noah wants 'export to CSV' in analytics dashboard; priority=medium.",
   deps=deps,
)


print(json.dumps(alt_res.output.model_dump(), indent=2))

We put everything together by seeding initial data and running the agent asynchronously in a notebook-safe way. We run several realistic scenarios to demonstrate how the agent reasons, calls tools, and returns schema-valid output. We also show how easily we can swap the underlying model while keeping the same workflow, schemas, and tools intact.
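The top-level `await` calls above work because notebooks already provide a running event loop. In a plain script, a hedged equivalent is to wrap the calls in `asyncio.run`; prompts that do not depend on each other could also be dispatched concurrently with `asyncio.gather`. The sketch uses a hypothetical `fake_run_case` coroutine in place of `agent.run`, so it needs no API key.

```python
import asyncio


async def fake_run_case(prompt: str) -> dict:
    # Hypothetical stand-in for `agent.run(prompt, deps=deps)`; sleeps instead of calling a model.
    await asyncio.sleep(0.01)
    return {"prompt": prompt, "action": "no_action"}


async def main() -> list:
    prompts = ["List our open tickets.", "What is the status of ticket 1?"]
    # gather() schedules both coroutines concurrently and preserves input order in the results.
    return await asyncio.gather(*(fake_run_case(p) for p in prompts))


# In a script there is no running loop, so asyncio.run() creates one.
results = asyncio.run(main())
print([r["prompt"] for r in results])
```

Concurrency is only safe for cases that do not rely on each other's side effects; a prompt that updates a ticket and another that reads the same ticket should still be awaited in sequence.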

In conclusion, we have shown how a type-safe agent can reason, call tools, validate its output, and recover from errors without manual intervention. We kept the logic model-agnostic, which lets us change the underlying LLM while keeping the same schemas and tools, an important property for long-term maintainability. Overall, combining strict schema enforcement, dependency injection, and async execution narrows the reliability gap in agentic AI and provides a solid foundation for building dependable production systems.

