· last year · Aug 28, 2024, 12:10 AM
import functools
import logging
import operator
import os
from dataclasses import dataclass, field
from operator import itemgetter
from typing import Annotated, Any, Callable, Dict, Hashable, Iterable, List, TypeVar, Union

import structlog
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Qdrant
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.graph import END, StateGraph
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_exponential
20
# Load variables from a local .env file so secrets stay out of source control.
load_dotenv()

# Fail fast at import time when the OpenAI credential is missing.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY is not set in environment variables")

# structlog hands its events to the standard library (LoggerFactory below).
# An unconfigured stdlib root logger only lets WARNING and above through, so
# without this call every logger.info(...) in this module is discarded.
# basicConfig does nothing when the host application already installed handlers.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Render each event as one JSON object carrying an ISO-8601 timestamp.
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()

# Chat model shared by the RAG chain and by every crew agent.
llm = ChatOpenAI(model="gpt-4o-mini", api_key=OPENAI_API_KEY)

# Chain-of-thought instruction that create_agent combines with each agent's prompt.
cot_prompt = ChatPromptTemplate.from_template(
    "Before providing your final answer, list the logical steps or thoughts leading to your conclusion."
)
44
# RAG chain setup: transient failures are retried, permanent ones are logged.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    # A missing file will not appear on a second attempt, so fail immediately.
    retry=retry_if_not_exception_type(FileNotFoundError),
    # Surface the real exception instead of tenacity's RetryError wrapper.
    reraise=True,
)
def _load_pdf_documents(pdf_url: str) -> List[Any]:
    """Load the PDF at *pdf_url*, retrying any failure except a missing file."""
    return PyMuPDFLoader(pdf_url).load()


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,
)
def _build_retriever(splits: List[Any], collection_name: str) -> Any:
    """Embed *splits* into an in-memory Qdrant collection and return its retriever."""
    embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")
    # Consider persistent storage for larger or production use cases
    vectorstore = Qdrant.from_documents(
        splits,
        embedding_model,
        location=":memory:",
        collection_name=collection_name,
    )
    return vectorstore.as_retriever()


def setup_rag_chain(pdf_url: str, collection_name: str) -> Union[Any, None]:
    """Build a retrieval-augmented QA chain over a single PDF.

    The retry policy lives on the two private helpers rather than on this
    function: this function converts every failure into a ``None`` return, so
    a retry decorator placed here would never see an exception to retry on.

    Args:
        pdf_url: Path or URL of the PDF handed to ``PyMuPDFLoader``.
        collection_name: Name of the in-memory Qdrant collection to create.

    Returns:
        A runnable that is invoked with ``{"question": <str>}`` and returns the
        model's answer as a string, or ``None`` when loading the PDF or
        building the vector store failed (the failure is logged).
    """
    try:
        docs = _load_pdf_documents(pdf_url)
        logger.info("Successfully loaded PDF", pdf_url=pdf_url)
    except FileNotFoundError as e:
        logger.error("File not found while loading PDF", pdf_url=pdf_url, error=str(e))
        return None
    except Exception as e:
        logger.error("Unexpected error occurred while loading PDF", pdf_url=pdf_url, error=str(e))
        return None

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=0)
    splits = text_splitter.split_documents(docs)

    try:
        retriever = _build_retriever(splits, collection_name)
        logger.info(f"Successfully set up vector store and retriever: {collection_name}")
    except Exception as e:
        logger.error("Failed to set up vector store and retriever", collection_name=collection_name, error=str(e))
        return None

    rag_prompt = ChatPromptTemplate.from_template(
        "Context: {context}\n\nQuery: {question}\n\nUse the context to answer the query in a step-by-step manner."
    )

    # The chain input is a dict, so pull the question text out of it before it
    # reaches the retriever; piping into the retriever runs the retrieval.
    question = itemgetter("question")
    rag_chain = (
        {"context": question | retriever, "question": question}
        | rag_prompt
        | llm
        | StrOutputParser()
    )

    return rag_chain
83
# Build the handbook RAG chain once at import time; setup_rag_chain returns
# None when the PDF or the vector store could not be prepared.
handbook_rag_chain = setup_rag_chain(
    pdf_url="https://static.e-publishing.af.mil/production/1/af_a1/publication/afh1/afh1.pdf",
    collection_name="pilot_demo_airforce_handbook",
)
89
# Dynamic role assignment driven by the mission-progress log.
def _iter_progress_text(entries: Iterable[Any]) -> Iterable[str]:
    """Yield every string in a progress log, descending into nested lists/tuples."""
    for entry in entries:
        if isinstance(entry, str):
            yield entry
        elif isinstance(entry, (list, tuple)):
            yield from _iter_progress_text(entry)


def dynamic_role_assignment(state: Dict[str, Any]) -> str:
    """Pick the crew member who should act next from the mission progress.

    The progress log is searched as text: a plain ``in`` test against the log
    would only match an entry that *equals* the keyword, and the log holds
    whole messages (grouped in per-turn lists), so that test never matched.

    Args:
        state: Mission state. ``state["mission_progress"]`` may be a string, a
            flat list of messages, or a list of per-turn message lists. A
            missing or empty log is treated as "nothing reported yet".

    Returns:
        ``"Pilot"`` when any report mentions "critical", otherwise ``"CSO"``
        when any report mentions "navigation", otherwise ``"Copilot"``.
        Matching ignores letter case.
    """
    progress = state.get("mission_progress") or []
    if isinstance(progress, str):
        # A bare string is a single report, not an iterable of characters.
        progress = [progress]

    reports = [text.casefold() for text in _iter_progress_text(progress)]

    # "critical" is checked first so it wins when both keywords are present.
    if any("critical" in report for report in reports):
        return "Pilot"
    if any("navigation" in report for report in reports):
        return "CSO"
    return "Copilot"
97
# Structured audit record for a single agent action.
def log_agent_action(agent_name: str, action: str, result: str) -> None:
    """Emit an info-level "Agent action executed" event.

    Args:
        agent_name: Name of the agent that acted.
        action: Label of the action that was performed.
        result: Outcome of the action.
    """
    event_fields = {"agent_name": agent_name, "action": action, "result": result}
    logger.info("Agent action executed", **event_fields)
101
# Generic type variable for tool functions; bound=Any leaves it unconstrained.
T = TypeVar("T", bound=Any)
104
# Tool exposing the handbook RAG chain to the crew agents.
@tool("retrieve_information")
def retrieve_information(query: str) -> str:
    """Provides detailed information from the 'Air Force Handbook'."""
    # The docstring above is the tool description shown to the model, so it is
    # kept to exactly the sentence the description is meant to be. It replaces
    # a ``description=`` keyword on @tool, which older langchain-core releases
    # do not accept; the docstring form is supported everywhere.
    #
    # query: the question to answer from the handbook.
    # Returns the chain's answer, or a fixed notice when the chain is missing.
    if handbook_rag_chain is None:
        # setup_rag_chain returns None on failure; compare against None
        # explicitly rather than relying on the chain object's truthiness.
        logger.error("RAG chain is not initialized.")
        return "RAG chain is not initialized."
    return handbook_rag_chain.invoke({"question": query})
113
# Agent factory: role prompt + chain-of-thought instruction + tool access.
def create_agent(llm: ChatOpenAI, tools: List[Any], system_prompt: str) -> AgentExecutor:
    """Create a tool-calling agent executor for one crew role.

    Args:
        llm: Chat model that drives the agent.
        tools: Tools the agent is allowed to call.
        system_prompt: Role description used as the system message.

    Returns:
        An ``AgentExecutor`` whose prompt expects a ``messages`` list (the
        conversation so far) and manages ``agent_scratchpad`` itself.
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        # Splice the chain-of-thought instruction in as ordinary prompt
        # messages. Piping the two templates together (prompt | cot_prompt)
        # yields a runnable sequence rather than a prompt, and
        # create_openai_functions_agent needs a real prompt so it can verify
        # that the agent_scratchpad variable is present.
        *cot_prompt.messages,
        MessagesPlaceholder(variable_name="messages"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    agent = create_openai_functions_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools)
123
# One agent per crew position; all three share the handbook lookup tool and
# differ only in the role named by their system prompt.
pilot_agent = create_agent(
    llm,
    tools=[retrieve_information],
    system_prompt="You are a fully qualified pilot. Speak only as your role.",
)
copilot_agent = create_agent(
    llm,
    tools=[retrieve_information],
    system_prompt="You are a fully qualified copilot. Speak only as your role.",
)
cso_agent = create_agent(
    llm,
    tools=[retrieve_information],
    system_prompt="You are a fully qualified Combat Systems Operator. Speak only as your role.",
)
128
# Shared state passed between the nodes of the crew graph.
@dataclass
class PilotTeamState:
    """State carried through the command-and-control graph.

    Instances default to an empty transcript with the Pilot acting first.
    """

    # Conversation transcript. The Annotated metadata is the reducer LangGraph
    # calls as reducer(current, update); operator.add concatenates the two
    # lists, so a node's messages are appended instead of replacing history.
    messages: Annotated[List[str], operator.add] = field(default_factory=list)
    # Routing key read by the graph's conditional edge.
    next: str = "Pilot"
134
def create_agent_node(agent: AgentExecutor, name: str):
    """Wrap an agent executor as a graph node that reports under *name*.

    Args:
        agent: Executor to invoke with the current graph state.
        name: Crew role used to prefix the node's messages and log entries.

    Returns:
        A function taking the graph state and returning a state update of the
        form ``{"messages": [...]}``. On success the list holds the role's
        answer followed by a reasoning line; if anything raises, it holds a
        single fallback message and the error is logged.
    """
    def agent_node(state: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # The executor takes a mapping. A dataclass-based graph state
            # arrives as an object, so expose its fields as a dict.
            agent_input = state if isinstance(state, dict) else dict(vars(state))
            output = agent.invoke(agent_input)["output"]

            # An agent executor's output is normally a plain string; indexing
            # it like a dict raised on every call and forced the fallback
            # below. A structured {"answer": ..., "reasoning": ...} output is
            # still honoured when an agent provides one.
            if isinstance(output, dict):
                answer = output["answer"]
                reasoning_text = output.get("reasoning", "N/A")
            else:
                answer = str(output)
                reasoning_text = "N/A"

            reasoning = f"{name} reasoning: {reasoning_text}"
            log_agent_action(name, "invoke", answer)
            return {"messages": [f"{name}: {answer}", reasoning]}
        except Exception as e:
            # Deliberate best-effort boundary: one failing agent must not
            # abort the whole scenario.
            logger.error("Error in agent", agent_name=name, error=str(e))
            return {"messages": [f"{name}: Error occurred, reverting to fallback action."]}
    return agent_node
146
# Graph nodes, one per crew position, each labelled with its role name.
pilot_node, copilot_node, cso_node = (
    create_agent_node(crew_agent, role)
    for crew_agent, role in (
        (pilot_agent, "Pilot"),
        (copilot_agent, "Copilot"),
        (cso_agent, "CSO"),
    )
)
151
# Define and compile the command and control (C2) graph
def _read_next(state: Any) -> str:
    """Return the routing key from dict-style or attribute-style graph state."""
    return state["next"] if isinstance(state, dict) else state.next


def _candc_node(state: Any) -> Dict[str, Any]:
    """Supervisor hub: re-publish the pending hand-off without changing it.

    The routing decision itself is taken on this node's conditional edge.
    """
    return {"next": _read_next(state)}


def _single_turn(node):
    """Wrap a crew node so that, unless it names a successor, its turn ends the run."""
    def run_turn(state: Any) -> Dict[str, Any]:
        update = dict(node(state))
        # Without this nothing ever sets "FINISH" and candc would keep
        # dispatching to the same role. The caller picks the next role between
        # runs. NOTE(review): one crew turn per invocation is inferred from
        # run_scenario's outer loop; confirm this is the intended policy.
        update.setdefault("next", "FINISH")
        return update
    return run_turn


candc_graph = StateGraph(PilotTeamState)

# "candc" is the entry point and the target of every crew edge, so it has to
# be a registered node; it was previously referenced but never added.
candc_graph.add_node("candc", _candc_node)
candc_graph.add_node("Pilot", _single_turn(pilot_node))
candc_graph.add_node("Copilot", _single_turn(copilot_node))
candc_graph.add_node("CSO", _single_turn(cso_node))

# Every crew member reports back to the supervisor.
candc_graph.add_edge("Pilot", "candc")
candc_graph.add_edge("Copilot", "candc")
candc_graph.add_edge("CSO", "candc")

# The supervisor dispatches on the state's "next" value.
candc_graph.add_conditional_edges(
    "candc",
    _read_next,
    {
        "Pilot": "Pilot",
        "Copilot": "Copilot",
        "CSO": "CSO",
        "FINISH": END,
    },
)

candc_graph.set_entry_point("candc")
chain = candc_graph.compile()
174
# Record a turn's output in the mission log, then re-evaluate who acts next.
def update_mission_state(state: Dict[str, Any], new_data: List[str]) -> Dict[str, Any]:
    """Append *new_data* to the progress log and refresh the ``next`` role.

    Args:
        state: Mission state; mutated in place. Must already contain a
            ``"mission_progress"`` list.
        new_data: Messages produced by the latest turn; stored as one entry.

    Returns:
        The same ``state`` object, with ``state["next"]`` set from
        ``dynamic_role_assignment``.
    """
    progress_log = state["mission_progress"]
    progress_log.append(new_data)
    upcoming_role = dynamic_role_assignment(state)
    state["next"] = upcoming_role
    return state
180
# Scenario execution with detailed logging
def run_scenario(scenario: str, max_turns: int = 5) -> None:
    """Run the crew graph turn by turn on *scenario*, printing each turn.

    Args:
        scenario: Mission brief used as the first message of the transcript.
        max_turns: Upper bound on graph invocations. The loop previously had
            no bound, so a run that never reported completion would call the
            model forever.
    """
    messages = [scenario]
    state = {"messages": messages, "mission_progress": []}

    for _ in range(max_turns):
        sent_count = len(messages)
        result = chain.invoke(state)

        # Retained completion signal: stop as soon as the result carries an
        # "__end__" key.
        if "__end__" in result:
            logger.info("Mission scenario completed successfully.")
            return

        turn_messages = result["messages"]
        # When the graph appends to the transcript it was given, the result
        # starts with everything already held here; keep only the new tail so
        # history is not duplicated on every turn.
        if turn_messages[:sent_count] == messages:
            turn_messages = turn_messages[sent_count:]

        messages.extend(turn_messages)
        state = update_mission_state(state, turn_messages)
        print("\n".join(turn_messages))
        print("---")

    logger.info("Mission scenario stopped after reaching the turn limit.", max_turns=max_turns)
195
# Demo mission brief: a maritime search flown by a three-person P-8 crew.
scenario = """
Mission Brief: Operation Ocean Guardian

A cargo ship, the SS Meridian, has been missing for 72 hours in the North Atlantic. Your mission is to locate the ship using a P-8 Poseidon aircraft. The crew consists of a Pilot, Copilot, and Combat Systems Operator (CSO).

Objectives:

Navigate to the last known coordinates of the SS Meridian.
Search for the ship using the aircraft's sensors.
Communicate findings and coordinate with search and rescue teams.
Execute the mission, with each crew member performing their role.
"""

# Kick off the simulation with the brief above.
run_scenario(scenario=scenario)
211