AI Agent正在成为AI应用的新范式,从简单的问答机器人到能够自主执行复杂任务的智能助手。本文将手把手教您搭建第一个AI Agent,涵盖从环境搭建到功能实现的全过程,让您快速入门AI Agent开发。

AI Agent基础概念

什么是AI Agent?

AI Agent(人工智能代理)是指能够感知环境、自主决策并执行任务的智能实体。

1
2
传统AI应用:用户输入 → AI处理 → 输出结果
AI Agent应用:用户需求 → Agent分析 → 自主执行 → 持续优化

核心特征

  • 自主性:能够独立执行任务,无需实时监督
  • 学习能力:通过经验改进性能
  • 适应性:能适应不同的环境和需求变化
  • 协作性:可以与其他Agent或人类协作

AI Agent的分类

按复杂度分层

  1. 简单Agent:基于规则的反应式Agent
  2. 学习Agent:具备学习能力的适应性Agent
  3. 认知Agent:具有复杂推理和规划能力的Agent

按功能分层

  1. 任务执行Agent:专注于特定任务的执行
  2. 对话Agent:以对话为主的交互式Agent
  3. 自主Agent:具备高度自主决策能力的Agent

开发环境搭建

1. Python开发环境

环境要求

1
2
3
4
5
6
# Python版本要求
Python >= 3.8

# 推荐使用conda管理环境
conda create -n ai-agent python=3.10
conda activate ai-agent

核心依赖安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 基础依赖
pip install openai python-dotenv requests

# AI Agent框架
pip install langchain langchain-openai

# 工具集成
pip install google-search-results serpapi
pip install wikipedia-api
pip install yfinance # 金融数据

# 开发工具
pip install jupyter notebook
pip install streamlit # Web界面

2. API密钥配置

创建环境配置文件

1
2
3
4
# .env文件
OPENAI_API_KEY=your_openai_api_key_here
SERPAPI_API_KEY=your_serpapi_key_here
GOOGLE_API_KEY=your_google_api_key_here

安全配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Agent配置
AGENT_MODEL = "gpt-4"
AGENT_TEMPERATURE = 0.7
AGENT_MAX_TOKENS = 2000

# 工具配置
MAX_SEARCH_RESULTS = 5
SEARCH_ENGINE = "google"

第一个AI Agent:智能问答助手

1. 基础Agent架构

创建Agent基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// BaseAgent.java
import java.util.Map;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;

public abstract class BaseAgent {
protected Map<String, Object> config;
protected Logger logger;

public BaseAgent(Map<String, Object> config) {
this.config = config;
this.logger = Logger.getLogger(this.getClass().getName());
setupLogging();
}

private void setupLogging() {
Logger.getGlobal().setLevel(Level.INFO);
}

public abstract String processRequest(String request);

public abstract List<String> getCapabilities();

protected void logRequest(String request) {
logger.info("Processing request: " + request.substring(0, Math.min(100, request.length())) + "...");
}

protected void logResponse(String response) {
logger.info("Generated response: " + response.length() + " characters");
}
}

实现简单问答Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// SimpleQAAgent.java
import java.util.Arrays;
import java.util.List;

public class SimpleQAAgent extends BaseAgent {
private String apiKey;
private String model;
private int maxTokens;
private double temperature;

public SimpleQAAgent(Map<String, Object> config) {
super(config);
this.apiKey = (String) config.get("OPENAI_API_KEY");
this.model = (String) config.getOrDefault("AGENT_MODEL", "gpt-3.5-turbo");
this.maxTokens = ((Number) config.getOrDefault("AGENT_MAX_TOKENS", 1000)).intValue();
this.temperature = ((Number) config.getOrDefault("AGENT_TEMPERATURE", 0.7)).doubleValue();
}

@Override
public List<String> getCapabilities() {
return Arrays.asList(
"回答一般问题",
"提供信息解释",
"进行简单对话"
);
}

@Override
public String processRequest(String request) {
logRequest(request);

try {
// 使用OpenAI Java SDK调用API
OpenAIClient client = new OpenAIClient(apiKey);

ChatCompletionRequest completionRequest = ChatCompletionRequest.builder()
.model(model)
.messages(Arrays.asList(
new ChatMessage("system", "你是一个友好的AI助手,请用中文回答用户的问题。"),
new ChatMessage("user", request)
))
.maxTokens(maxTokens)
.temperature(temperature)
.build();

ChatCompletionResult result = client.createChatCompletion(completionRequest);
String answer = result.getChoices().get(0).getMessage().getContent();

logResponse(answer);
return answer;

} catch (Exception e) {
logger.severe("Error processing request: " + e.getMessage());
return "抱歉,我遇到了一些问题,请稍后再试。";
}
}
}

2. 运行第一个Agent

创建主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Main.java
import java.util.Map;
import java.util.HashMap;
import java.util.Scanner;

public class Main {
public static void main(String[] args) {
// 配置信息
Map<String, Object> config = new HashMap<>();
config.put("OPENAI_API_KEY", "your_openai_api_key_here");
config.put("AGENT_MODEL", "gpt-3.5-turbo");
config.put("AGENT_MAX_TOKENS", 1000);
config.put("AGENT_TEMPERATURE", 0.7);

// 创建Agent实例
SimpleQAAgent agent = new SimpleQAAgent(config);

// 显示Agent能力
System.out.println("🤖 AI Agent 已启动!");
System.out.println("📋 能力列表:");
for (String capability : agent.getCapabilities()) {
System.out.println(" • " + capability);
}

System.out.println("\n💬 输入您的问题(输入 'quit' 退出):");

Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("\n👤 您:");
String userInput = scanner.nextLine();

if (userInput.toLowerCase().matches("quit|exit|q")) {
System.out.println("👋 再见!");
break;
}

String response = agent.processRequest(userInput);
System.out.println("🤖 Agent:" + response);
}

scanner.close();
}
}

运行测试

1
2
3
4
5
# 编译Java代码
javac -cp ".:lib/*" *.java

# 运行Agent
java -cp ".:lib/*" Main

增强Agent:添加工具调用能力

1. 工具系统设计

定义工具接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// BaseTool.java
import java.util.Map;
import java.util.List;

public abstract class BaseTool {
public abstract String getName();

public abstract String getDescription();

public abstract String execute(Map<String, Object> kwargs);

public Map<String, Object> getSchema() {
Map<String, Object> schema = new java.util.HashMap<>();
schema.put("name", getName());
schema.put("description", getDescription());

Map<String, Object> parameters = new java.util.HashMap<>();
parameters.put("type", "object");
parameters.put("properties", getParametersSchema());
parameters.put("required", getRequiredParameters());

schema.put("parameters", parameters);
return schema;
}

public abstract Map<String, Object> getParametersSchema();

public abstract List<String> getRequiredParameters();
}

实现搜索工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// SearchTool.java
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SearchTool extends BaseTool {
private String apiKey;
private String baseUrl;
private HttpClient httpClient;
private ObjectMapper objectMapper;

public SearchTool(String apiKey) {
this.apiKey = apiKey;
this.baseUrl = "https://serpapi.com/search";
this.httpClient = HttpClient.newHttpClient();
this.objectMapper = new ObjectMapper();
}

@Override
public String getName() {
return "web_search";
}

@Override
public String getDescription() {
return "搜索网页信息,返回相关结果";
}

@Override
public String execute(Map<String, Object> kwargs) {
String query = (String) kwargs.get("query");
Integer numResults = kwargs.containsKey("num_results") ?
(Integer) kwargs.get("num_results") : 5;

try {
String url = baseUrl + "?q=" + java.net.URLEncoder.encode(query, "UTF-8") +
"&api_key=" + apiKey + "&num=" + numResults;

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build();

HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());

Map<String, Object> data = objectMapper.readValue(response.body(), Map.class);
List<Map<String, Object>> organicResults = (List<Map<String, Object>>)
data.getOrDefault("organic_results", new ArrayList<>());

List<Map<String, Object>> results = new ArrayList<>();
for (int i = 0; i < Math.min(numResults, organicResults.size()); i++) {
Map<String, Object> result = organicResults.get(i);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("title", result.getOrDefault("title", ""));
resultMap.put("link", result.getOrDefault("link", ""));
resultMap.put("snippet", result.getOrDefault("snippet", ""));
results.add(resultMap);
}

return formatResults(results);

} catch (Exception e) {
return "搜索失败:" + e.getMessage();
}
}

private String formatResults(List<Map<String, Object>> results) {
if (results.isEmpty()) {
return "未找到相关结果";
}

StringBuilder formatted = new StringBuilder("🔍 搜索结果:\n\n");
for (int i = 0; i < results.size(); i++) {
Map<String, Object> result = results.get(i);
formatted.append(String.format("%d. %s\n", i + 1, result.get("title")));
formatted.append(String.format(" %s\n", result.get("link")));
String snippet = (String) result.get("snippet");
formatted.append(String.format(" %s...\n\n",
snippet.substring(0, Math.min(100, snippet.length()))));
}

return formatted.toString();
}

@Override
public Map<String, Object> getParametersSchema() {
Map<String, Object> schema = new HashMap<>();

Map<String, Object> querySchema = new HashMap<>();
querySchema.put("type", "string");
querySchema.put("description", "搜索查询关键词");

Map<String, Object> numResultsSchema = new HashMap<>();
numResultsSchema.put("type", "integer");
numResultsSchema.put("description", "返回结果数量");
numResultsSchema.put("default", 5);
numResultsSchema.put("minimum", 1);
numResultsSchema.put("maximum", 10);

schema.put("query", querySchema);
schema.put("num_results", numResultsSchema);

return schema;
}

@Override
public List<String> getRequiredParameters() {
return java.util.Arrays.asList("query");
}

实现计算器工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// CalculatorTool.java
import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.HashMap;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class CalculatorTool extends BaseTool {
private ScriptEngine engine;

public CalculatorTool() {
ScriptEngineManager mgr = new ScriptEngineManager();
this.engine = mgr.getEngineByName("JavaScript");
}

@Override
public String getName() {
return "calculator";
}

@Override
public String getDescription() {
return "执行数学计算,支持基本运算和高级函数";
}

@Override
public String execute(Map<String, Object> kwargs) {
String expression = (String) kwargs.get("expression");

try {
// 使用JavaScript引擎安全评估数学表达式
Object result = engine.eval(expression);
return String.format("计算结果:%s = %s", expression, result);

} catch (ScriptException e) {
return "计算错误:" + e.getMessage();
}
}

@Override
public Map<String, Object> getParametersSchema() {
Map<String, Object> schema = new HashMap<>();

Map<String, Object> expressionSchema = new HashMap<>();
expressionSchema.put("type", "string");
expressionSchema.put("description", "数学表达式,如 '2 + 3 * 4' 或 'Math.sin(Math.PI/2)'");

schema.put("expression", expressionSchema);
return schema;
}

@Override
public List<String> getRequiredParameters() {
return Arrays.asList("expression");
}
}

2. 智能Agent实现

创建工具管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# tool_manager.py
from typing import Dict, List
from .tools.base_tool import BaseTool

class ToolManager:
def __init__(self):
self.tools: Dict[str, BaseTool] = {}

def register_tool(self, tool: BaseTool):
self.tools[tool.name] = tool

def get_tool(self, name: str) -> BaseTool:
return self.tools.get(name)

def get_all_tools(self) -> List[BaseTool]:
return list(self.tools.values())

def get_tools_schema(self) -> List[Dict]:
return [tool.get_schema() for tool in self.tools.values()]

def execute_tool(self, tool_name: str, **kwargs) -> str:
tool = self.get_tool(tool_name)
if not tool:
return f"工具 '{tool_name}' 不存在"

try:
return tool.execute(**kwargs)
except Exception as e:
return f"工具执行失败:{str(e)}"

实现智能Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# smart_agent.py
import json
from .base_agent import BaseAgent
from .tool_manager import ToolManager

class SmartAgent(BaseAgent):
def __init__(self):
super().__init__(Config().__dict__)
self.tool_manager = ToolManager()
self._setup_tools()
self.conversation_history = []

def _setup_tools(self):
# 注册工具
from .tools.search_tool import SearchTool
from .tools.calculator_tool import CalculatorTool

self.tool_manager.register_tool(
SearchTool(self.config.get("SERPAPI_API_KEY"))
)
self.tool_manager.register_tool(CalculatorTool())

def get_capabilities(self) -> List[str]:
capabilities = [
"智能问答",
"网页搜索",
"数学计算",
"多工具协同"
]

# 添加工具能力
for tool in self.tool_manager.get_all_tools():
capabilities.append(f"使用{tool.description}")

return capabilities

def process_request(self, request: str) -> str:
self.log_request(request)

# 添加到对话历史
self.conversation_history.append({"role": "user", "content": request})

try:
# 构建系统提示
system_prompt = self._build_system_prompt()

# 调用AI模型
response = self._call_ai_model(system_prompt, request)

# 解析响应
result = self._parse_response(response)

self.log_response(result)
return result

except Exception as e:
self.logger.error(f"Error processing request: {e}")
return "抱歉,我遇到了一些问题,请稍后再试。"

def _build_system_prompt(self) -> str:
tools_schema = self.tool_manager.get_tools_schema()

prompt = """你是一个智能AI助手,可以使用各种工具来帮助用户解决问题。

可用工具:
"""

for tool in tools_schema:
prompt += f"- {tool['name']}: {tool['description']}\n"

prompt += """

当用户提出需要工具辅助的问题时,请按以下格式回复:
{
"thought": "分析用户需求并选择合适的工具",
"tool": "工具名称",
"parameters": {
"参数名": "参数值"
}
}

如果不需要使用工具,直接回答用户的问题。

请用中文回复。"""

return prompt

def _call_ai_model(self, system_prompt: str, user_request: str):
import openai

openai.api_key = self.config.get("OPENAI_API_KEY")

messages = [
{"role": "system", "content": system_prompt}
]

# 添加对话历史
messages.extend(self.conversation_history[-10:]) # 只保留最近10条

response = openai.ChatCompletion.create(
model=self.config.get("AGENT_MODEL", "gpt-4"),
messages=messages,
max_tokens=self.config.get("AGENT_MAX_TOKENS", 2000),
temperature=self.config.get("AGENT_TEMPERATURE", 0.7)
)

return response.choices[0].message.content

def _parse_response(self, response: str) -> str:
try:
# 尝试解析JSON格式的工具调用
tool_call = json.loads(response)

if "tool" in tool_call and "parameters" in tool_call:
# 执行工具调用
tool_name = tool_call["tool"]
parameters = tool_call["parameters"]

self.logger.info(f"Executing tool: {tool_name} with params: {parameters}")

tool_result = self.tool_manager.execute_tool(tool_name, **parameters)

# 将工具结果添加到对话历史
self.conversation_history.append({
"role": "assistant",
"content": f"工具调用结果:{tool_result}"
})

return tool_result

except json.JSONDecodeError:
# 不是工具调用,直接返回响应
pass

# 添加到对话历史
self.conversation_history.append({"role": "assistant", "content": response})

return response

Agent的记忆与学习能力

1. 对话记忆系统

实现记忆管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# memory_manager.py
from typing import List, Dict, Any
import json
from datetime import datetime

class MemoryManager:
def __init__(self, max_memory: int = 100):
self.memories: List[Dict[str, Any]] = []
self.max_memory = max_memory

def add_memory(self, memory_type: str, content: Any, metadata: Dict = None):
memory = {
"type": memory_type,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}

self.memories.append(memory)

# 限制记忆数量
if len(self.memories) > self.max_memory:
self.memories.pop(0)

def get_memories(self, memory_type: str = None, limit: int = 10) -> List[Dict]:
memories = self.memories

if memory_type:
memories = [m for m in memories if m["type"] == memory_type]

return memories[-limit:]

def search_memories(self, query: str, limit: int = 5) -> List[Dict]:
# 简单的关键词搜索
results = []
query_lower = query.lower()

for memory in reversed(self.memories):
content_str = str(memory["content"]).lower()
if query_lower in content_str:
results.append(memory)
if len(results) >= limit:
break

return results

def save_memories(self, file_path: str):
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(self.memories, f, ensure_ascii=False, indent=2)

def load_memories(self, file_path: str):
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.memories = json.load(f)
except FileNotFoundError:
self.memories = []

集成到Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# memory_enabled_agent.py
from .smart_agent import SmartAgent
from .memory_manager import MemoryManager

class MemoryEnabledAgent(SmartAgent):
def __init__(self):
super().__init__()
self.memory_manager = MemoryManager()
self.memory_file = "agent_memory.json"
self.load_memory()

def process_request(self, request: str) -> str:
# 搜索相关记忆
relevant_memories = self.memory_manager.search_memories(request, limit=3)

# 将相关记忆添加到上下文
if relevant_memories:
memory_context = "\n\n相关历史信息:\n"
for memory in relevant_memories:
memory_context += f"- {memory['content'][:100]}...\n"
else:
memory_context = ""

# 处理请求
response = super().process_request(request + memory_context)

# 保存到记忆
self.memory_manager.add_memory(
"conversation",
f"用户: {request}\n助手: {response}",
{"topic": self._extract_topic(request)}
)

# 定期保存记忆
if len(self.memory_manager.memories) % 10 == 0:
self.save_memory()

return response

def _extract_topic(self, request: str) -> str:
# 简单的主题提取
keywords = ["搜索", "计算", "问题", "帮助", "查询"]
for keyword in keywords:
if keyword in request:
return keyword
return "general"

def load_memory(self):
self.memory_manager.load_memories(self.memory_file)

def save_memory(self):
self.memory_manager.save_memories(self.memory_file)

Agent的用户界面

1. 命令行界面

增强的CLI界面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# cli_interface.py
import cmd
from .memory_enabled_agent import MemoryEnabledAgent

class AgentCLI(cmd.Cmd):
intro = "🤖 欢迎使用AI Agent!输入 'help' 查看命令,输入 'quit' 退出。"
prompt = "👤 您:"

def __init__(self):
super().__init__()
self.agent = MemoryEnabledAgent()
print("📋 Agent能力:")
for capability in self.agent.get_capabilities():
print(f" • {capability}")
print()

def default(self, line):
if line.strip():
response = self.agent.process_request(line)
print(f"🤖 Agent:{response}")
else:
print("请输入您的问题...")

def do_quit(self, arg):
"""退出Agent"""
print("👋 再见!")
self.agent.save_memory() # 保存记忆
return True

def do_help(self, arg):
"""显示帮助信息"""
print("""
可用命令:
- quit: 退出程序
- help: 显示此帮助信息

直接输入问题即可与Agent对话!

特殊功能:
- Agent会记住对话历史
- 可以调用搜索、计算等工具
- 支持多轮对话
""")

def do_memory(self, arg):
"""查看记忆"""
memories = self.agent.memory_manager.get_memories(limit=5)
if memories:
print("🧠 最近记忆:")
for memory in memories:
print(f" {memory['timestamp'][:19]}: {memory['content'][:100]}...")
else:
print("暂无记忆")

def do_clear_memory(self, arg):
"""清除记忆"""
self.agent.memory_manager.memories.clear()
self.agent.save_memory()
print("🗑️ 记忆已清除")

def main():
AgentCLI().cmdloop()

if __name__ == "__main__":
main()

2. Web界面

使用Streamlit创建Web界面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# web_interface.py
import streamlit as st
from .memory_enabled_agent import MemoryEnabledAgent

class WebInterface:
def __init__(self):
if 'agent' not in st.session_state:
st.session_state.agent = MemoryEnabledAgent()
if 'messages' not in st.session_state:
st.session_state.messages = []

def run(self):
st.title("🤖 AI Agent 智能助手")
st.markdown("---")

# 显示Agent能力
with st.expander("📋 Agent能力"):
for capability in st.session_state.agent.get_capabilities():
st.write(f"• {capability}")

# 显示对话历史
self.display_chat_history()

# 用户输入
user_input = st.text_input("输入您的问题:", key="user_input")

if st.button("发送", key="send_button") and user_input:
self.process_user_input(user_input)

# 侧边栏功能
self.sidebar_functions()

def display_chat_history(self):
st.subheader("💬 对话历史")

for message in st.session_state.messages[-10:]: # 只显示最近10条
with st.chat_message(message["role"]):
st.write(message["content"])

def process_user_input(self, user_input):
# 添加用户消息
st.session_state.messages.append({
"role": "user",
"content": user_input
})

# 获取Agent响应
with st.spinner("Agent正在思考..."):
response = st.session_state.agent.process_request(user_input)

# 添加Agent响应
st.session_state.messages.append({
"role": "assistant",
"content": response
})

# 重新运行以更新界面
st.rerun()

def sidebar_functions(self):
with st.sidebar:
st.header("🛠️ 工具面板")

if st.button("🧠 查看记忆"):
memories = st.session_state.agent.memory_manager.get_memories(limit=5)
if memories:
st.subheader("最近记忆")
for memory in memories:
st.write(f"**{memory['timestamp'][:19]}**")
st.write(memory['content'][:200] + "...")
st.divider()
else:
st.write("暂无记忆")

if st.button("🗑️ 清除记忆"):
st.session_state.agent.memory_manager.memories.clear()
st.session_state.agent.save_memory()
st.success("记忆已清除!")

if st.button("📊 Agent状态"):
st.subheader("Agent状态")
st.write(f"对话轮数: {len(st.session_state.messages) // 2}")
st.write(f"记忆数量: {len(st.session_state.agent.memory_manager.memories)}")
st.write(f"可用工具: {len(st.session_state.agent.tool_manager.get_all_tools())}")

def main():
interface = WebInterface()
interface.run()

if __name__ == "__main__":
main()

运行Web界面

1
2
3
4
5
# 安装Streamlit(如果还没安装)
pip install streamlit

# 运行Web界面
streamlit run web_interface.py

高级Agent特性

1. 多Agent协作

创建Agent协作系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# multi_agent_system.py
from typing import List, Dict
from .memory_enabled_agent import MemoryEnabledAgent

class MultiAgentSystem:
def __init__(self):
self.agents: Dict[str, MemoryEnabledAgent] = {}
self.task_router = TaskRouter()

def register_agent(self, name: str, agent: MemoryEnabledAgent):
self.agents[name] = agent

def process_request(self, request: str) -> str:
# 路由到合适的Agent
agent_name = self.task_router.route_task(request)

if agent_name in self.agents:
agent = self.agents[agent_name]
return agent.process_request(request)
else:
return "抱歉,没有找到合适的Agent来处理您的请求。"

def get_all_capabilities(self) -> Dict[str, List[str]]:
capabilities = {}
for name, agent in self.agents.items():
capabilities[name] = agent.get_capabilities()
return capabilities

class TaskRouter:
def __init__(self):
self.routing_rules = {
"search": ["search_agent"],
"calculate": ["calculator_agent"],
"general": ["general_agent"]
}

def route_task(self, request: str) -> str:
request_lower = request.lower()

for keyword, agents in self.routing_rules.items():
if keyword in request_lower:
return agents[0]

return "general_agent"

2. Agent学习与进化

实现学习系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# learning_system.py
import json
from typing import Dict, List
from datetime import datetime

class LearningSystem:
def __init__(self):
self.feedback_data = []
self.performance_metrics = {}

def collect_feedback(self, request: str, response: str, rating: int, comments: str = ""):
feedback = {
"timestamp": datetime.now().isoformat(),
"request": request,
"response": response,
"rating": rating,
"comments": comments,
"performance": self.analyze_performance(request, response)
}

self.feedback_data.append(feedback)

# 实时学习和改进
self.learn_from_feedback(feedback)

def analyze_performance(self, request: str, response: str) -> Dict:
# 分析响应质量
return {
"response_length": len(response),
"has_tools_used": self.detect_tool_usage(response),
"relevance_score": self.calculate_relevance(request, response),
"helpfulness_score": self.calculate_helpfulness(response)
}

def learn_from_feedback(self, feedback: Dict):
# 从反馈中学习
if feedback["rating"] < 3: # 低评分的反馈
self.identify_improvement_areas(feedback)

if feedback["rating"] >= 4: # 高评分的反馈
self.extract_best_practices(feedback)

def identify_improvement_areas(self, feedback: Dict):
# 识别改进领域
issues = []

if feedback["performance"]["response_length"] < 50:
issues.append("响应过于简短")

if not feedback["performance"]["has_tools_used"]:
issues.append("应该使用更多工具")

if feedback["performance"]["relevance_score"] < 0.7:
issues.append("响应相关性不足")

return issues

def extract_best_practices(self, feedback: Dict):
# 提取最佳实践
practices = []

if feedback["performance"]["helpfulness_score"] > 0.8:
practices.append({
"pattern": "highly_helpful_response",
"example": feedback["response"][:200],
"score": feedback["performance"]["helpfulness_score"]
})

return practices

def generate_improvement_report(self) -> Dict:
# 生成改进报告
report = {
"total_feedback": len(self.feedback_data),
"average_rating": self.calculate_average_rating(),
"common_issues": self.identify_common_issues(),
"best_practices": self.get_best_practices(),
"recommendations": self.generate_recommendations()
}

return report

def calculate_average_rating(self) -> float:
if not self.feedback_data:
return 0.0

total_rating = sum(f["rating"] for f in self.feedback_data)
return total_rating / len(self.feedback_data)

def identify_common_issues(self) -> List[str]:
# 识别常见问题
issue_counts = {}
for feedback in self.feedback_data:
issues = self.identify_improvement_areas(feedback)
for issue in issues:
issue_counts[issue] = issue_counts.get(issue, 0) + 1

# 返回最常见的问题
sorted_issues = sorted(issue_counts.items(), key=lambda x: x[1], reverse=True)
return [issue for issue, count in sorted_issues[:5]]

def generate_recommendations(self) -> List[str]:
# 生成改进建议
recommendations = []

avg_rating = self.calculate_average_rating()
if avg_rating < 3.0:
recommendations.append("需要显著改进响应质量")

common_issues = self.identify_common_issues()
for issue in common_issues:
if "响应过于简短" in issue:
recommendations.append("增加响应详细程度,提供更多上下文信息")
if "应该使用更多工具" in issue:
recommendations.append("增强工具集成能力,扩展可用工具集")

return recommendations

部署与运维

1. Docker容器化部署

创建Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*

# 复制requirements文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建数据目录
RUN mkdir -p /app/data

# 设置环境变量
ENV PYTHONPATH=/app

# 暴露端口(如果使用Web界面)
EXPOSE 8501

# 启动命令
CMD ["python", "main.py"]

Docker Compose配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# docker-compose.yml
version: '3.8'

services:
ai-agent:
build: .
ports:
- "8501:8501"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- SERPAPI_API_KEY=${SERPAPI_API_KEY}
volumes:
- ./data:/app/data
- ./logs:/app/logs
restart: unless-stopped

# 可选:添加Redis用于缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped

volumes:
redis_data:

2. 监控和日志

添加监控功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# monitoring.py
import logging
import time
from functools import wraps
from typing import Dict, Any

class AgentMonitor:
def __init__(self):
self.metrics = {
"requests_total": 0,
"requests_success": 0,
"requests_failed": 0,
"average_response_time": 0,
"tool_usage": {},
"error_types": {}
}
self.response_times = []

def record_request(self, success: bool = True):
self.metrics["requests_total"] += 1
if success:
self.metrics["requests_success"] += 1
else:
self.metrics["requests_failed"] += 1

def record_response_time(self, response_time: float):
self.response_times.append(response_time)

# 保持最近1000个响应时间的记录
if len(self.response_times) > 1000:
self.response_times.pop(0)

# 更新平均响应时间
self.metrics["average_response_time"] = sum(self.response_times) / len(self.response_times)

def record_tool_usage(self, tool_name: str):
if tool_name not in self.metrics["tool_usage"]:
self.metrics["tool_usage"][tool_name] = 0
self.metrics["tool_usage"][tool_name] += 1

def record_error(self, error_type: str):
if error_type not in self.metrics["error_types"]:
self.metrics["error_types"][error_type] = 0
self.metrics["error_types"][error_type] += 1

def get_metrics(self) -> Dict[str, Any]:
return self.metrics.copy()

def generate_report(self) -> str:
report = "📊 Agent监控报告\n\n"

report += f"总请求数: {self.metrics['requests_total']}\n"
report += f"成功请求: {self.metrics['requests_success']}\n"
report += f"失败请求: {self.metrics['requests_failed']}\n"
report += ".2f"

if self.metrics["tool_usage"]:
report += "\n🛠️ 工具使用统计:\n"
for tool, count in self.metrics["tool_usage"].items():
report += f" {tool}: {count}次\n"

if self.metrics["error_types"]:
report += "\n❌ 错误类型统计:\n"
for error_type, count in self.metrics["error_types"].items():
report += f" {error_type}: {count}次\n"

return report

def monitor_request(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
start_time = time.time()

try:
result = func(self, *args, **kwargs)
self.monitor.record_request(success=True)
return result
except Exception as e:
self.monitor.record_request(success=False)
self.monitor.record_error(type(e).__name__)
raise
finally:
response_time = time.time() - start_time
self.monitor.record_response_time(response_time)

return wrapper

总结与展望

Agent开发的核心要点

开发阶段 关键要点 实现建议
基础搭建 选择合适的框架和模型 从LangChain + OpenAI开始
功能扩展 添加工具调用能力 从搜索和计算工具入手
记忆系统 实现对话记忆 使用简单的文件存储
用户界面 提供友好的交互 从CLI开始,逐步添加Web界面
监控运维 添加监控和日志 实现基本的性能监控

学习路径建议

初学者路线(1-2周)

  1. 环境搭建:安装Python,配置OpenAI API
  2. 基础Agent:实现简单的问答Agent
  3. 工具集成:添加搜索和计算工具
  4. 界面开发:创建命令行或Web界面

中级开发者(2-4周)

  1. 记忆系统:实现对话记忆功能
  2. 多Agent协作:学习多个Agent之间的协作
  3. 学习能力:添加反馈学习机制
  4. 性能优化:优化响应速度和资源使用

高级开发者(1个月+)

  1. 自定义工具:开发专用的业务工具
  2. 企业集成:集成企业级系统和API
  3. 安全加固:实现安全措施和权限控制
  4. 大规模部署:学习分布式部署和扩展

最佳实践总结

  1. 从小开始:从简单的功能开始,逐步扩展
  2. 模块化设计:将Agent分解为可复用的组件
  3. 错误处理:完善的异常处理和错误恢复
  4. 用户体验:提供清晰的状态反馈和帮助信息
  5. 持续改进:收集用户反馈,不断优化Agent

未来展望

技术发展趋势

  • 多模态Agent:支持文本、图像、语音等多种输入输出
  • 自主学习:Agent能够自主发现新知识和技能
  • 群体智能:多个Agent协作形成智能网络
  • 个性化定制:根据用户偏好定制Agent行为

应用场景拓展

  • 企业助手:处理日常办公任务
  • 教育辅导:提供个性化学习指导
  • 创意协作:辅助艺术创作和设计
  • 自动化运维:智能监控和故障处理

通过这篇指南,您已经掌握了构建AI Agent的核心技能。从简单的问答助手到功能丰富的智能Agent,从单机运行到分布式部署,AI Agent的世界正在向您敞开大门。

系列文章导航

本文是AI Agent开发系列的第一篇,与AIGC系列形成互补:

后续Agent系列文章:

  • 《AI Agent架构设计模式》
  • 《企业级Agent开发实践》
  • 《Agent安全与隐私保护》

参考资料

  1. LangChain官方文档
  2. OpenAI API文档
  3. Streamlit文档
  4. Docker最佳实践
  5. Python Agent框架对比