← LOGBOOK LOG-300
WORKING · SOFTWARE ·
LANGGRAPHLANGCHAINLLMAGENTSSTATE-MACHINEPYTHONPIPELINEAI

LangGraph for Multi-Step AI Pipelines

Using LangGraph's StateGraph to wire LLM nodes into a DAG — how TypedDict state flows through extraction, context-building, and note generation stages without turning into callback soup.

Goal

Build a multi-step AI pipeline that takes a scanned exam PDF, extracts questions, and generates study notes per question — across 5 distinct LLM stages. The challenge: managing state cleanly across stages without tangling it into a mess of function calls and global variables.

LangGraph solves this with a StateGraph — a directed acyclic graph where each node is a function that receives the current state, does its work, and returns the fields it wants to update.

The State Shape

Everything the pipeline knows lives in two TypedDicts: a top-level State and a nested QuestionState.

class QuestionState(TypedDict):
    question: str
    topic: Optional[str]
    key_concepts: Optional[List[str]]
    note_help: Optional[str]
    first_pass_note: Optional[str]
    second_pass_note: Optional[str]
    reviewed_note: Optional[str]
    final_answer: Optional[str]

class State(TypedDict):
    pdf_path: str
    paper_text: Optional[str]
    semester: Optional[str]
    course: Optional[str]
    subject: Optional[str]
    paper_code: Optional[str]
    scheme: Optional[str]
    total_marks: Optional[int]
    questions: List[QuestionState]

Every Optional field starts as None. Nodes only fill in what they own — the rest passes through untouched. This makes each node’s scope explicit from the type signature alone.

Wiring the Graph

from langgraph.graph import END, START, StateGraph

workflow = StateGraph(State)

workflow.add_node("ocr_extractor", ocr_extractor)
workflow.add_node("extraction_agent_node", extraction_agent_node)
workflow.add_node("parallel_question_processor", parallel_question_processor)

workflow.add_edge("ocr_extractor", "extraction_agent_node")
workflow.add_edge("extraction_agent_node", "parallel_question_processor")

workflow.set_entry_point("ocr_extractor")
workflow.set_finish_point("parallel_question_processor")

app = workflow.compile()

The compiled graph runs with a single call:

result = app.invoke({"pdf_path": "exam.pdf"})

LangGraph handles routing between nodes and merges partial state returns. A node returning {"questions": [...]} updates only that key — it doesn’t need to pass the entire state back.

Visualizing the DAG

One practical benefit of the graph abstraction: you can render it.

from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

Shows the node topology directly in a Jupyter cell. Useful for debugging edge misconnections before running any LLMs.

What the Pattern Buys

Compared to chaining functions with result = fn_a(fn_b(fn_c(input))):

ApproachState ManagementDebuggabilityExtensibility
Nested function callsManually threadedHard to isolateRe-thread everything
LangGraph StateGraphDeclared, typedIsolate per nodeAdd node + edge

The real payoff shows up when adding a new stage. Adding a fact_checker_node between teacher_evaluate and clarity_booster is three lines: define the function, add_node, add_edge. The rest of the graph doesn’t need to know.

What to Watch

Nested state mutation. The questions list inside State is a list of dicts. When a downstream node updates individual questions, it needs to return the full updated list — partial list updates don’t merge automatically the way scalar fields do. This caught some bugs: returning {"questions": [updated_q]} replaced the list with a single element rather than patching one entry.

“Parallel” isn’t actually parallel (yet). The parallel_question_processor node iterates questions sequentially in a loop. LangGraph supports fan-out to true parallel branches, but that requires splitting each question into its own subgraph path and joining them back — more setup, warranted once the question count grows past ~10.

No retry logic built in. If Gemini times out mid-pipeline, the whole run fails. LangGraph doesn’t add retry semantics automatically — wrap individual node calls with try/except and either retry or log + skip.

What’s Next

  • Wire up fan-out parallel processing per question — each QuestionState gets its own branch
  • Add a conditional edge: if final_answer passes a quality check, skip to storage; otherwise route back to teacher evaluation
  • Try LangGraph Cloud for persistent state across runs (resume a failed pipeline without re-running OCR)