In the era of large language models, storing and retrieving massive amounts of information effectively has become a key challenge. This article explains in detail how to use an LLM to intelligently extract the important data, condense it, and store it in a vector database, enabling efficient knowledge retrieval and retrieval-augmented generation.

🎯 Why LLM Data Extraction and Vector Storage?

Limitations of Traditional Approaches

1. The data explosion problem

# Pain points of traditional storage
documents = [
    "In Q1 2023 the company's revenue reached $15M, with online sales accounting for 65% and offline sales for 35%.",
    "In Q2 2023 revenue grew 25% year over year, driven mainly by the launch of a new product line.",
    "In Q3 2023 the company faced supply-chain challenges, which slowed revenue growth by 5%.",
    "In Q4 2023 revenue returned to expected levels after inventory management was optimized.",
    # ... tens of thousands of records
]

# Retrieval is inefficient
def search_traditional(query):
    results = []
    for doc in documents:
        if query.lower() in doc.lower():
            results.append(doc)
    return results  # may return many irrelevant results

2. Lack of semantic understanding

  • Keyword matching cannot capture meaning
  • Contextual relationships go uncaptured
  • Similar concepts are never linked

3. LLM context-length limits

  • The base GPT-4 model has a context window of 8,192 tokens (larger-context variants exist, but every model has a hard limit)
  • Exceeding the limit means information gets dropped
  • Overly long contexts also degrade response quality
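
Before deciding how to chunk or compress, it helps to check how many tokens a document would actually consume. A minimal sketch, assuming the tiktoken package is available (the sample text and repeat factor are purely illustrative):

import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count how many tokens a text would occupy in the given model's context window."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

# Example: decide whether a document needs chunking before being sent to the LLM
doc = "In Q1 2023 the company's revenue reached $15M. " * 200
print(count_tokens(doc))  # compare against the model's context limit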

Core Advantages of Vector Storage

1. Semantic similarity retrieval

# The advantage of vector retrieval
query = "How did the company's annual revenue perform?"
# Traditional approach: only finds documents that literally contain "revenue"
# Vector approach: finds all related financial data, even without the exact keyword

similar_docs = vector_search(query, top_k=5)
# Returns: annual reports, quarterly earnings, revenue analyses, and other related documents

2. Data compression and condensation

  • Extract the key information from lengthy documents
  • Remove duplicated and irrelevant content
  • Keep the core knowledge points

3. Efficient retrieval

  • Millisecond-level similarity computation
  • Supports highly concurrent queries
  • Scales to massive data volumes
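
To make the millisecond claim concrete, here is a minimal brute-force sketch with NumPy; the corpus size and dimensionality are illustrative assumptions, and a real vector database would use an approximate index such as HNSW rather than exact dot products:

import numpy as np

# 100k document vectors of dimension 384 (illustrative sizes)
doc_vectors = np.random.rand(100_000, 384).astype(np.float32)
doc_vectors /= np.linalg.norm(doc_vectors, axis=1, keepdims=True)

query_vector = np.random.rand(384).astype(np.float32)
query_vector /= np.linalg.norm(query_vector)

# On normalized vectors, cosine similarity reduces to a dot product
scores = doc_vectors @ query_vector
top_k = np.argsort(-scores)[:5]  # indices of the 5 most similar documents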

🧠 LLM Data Extraction in Detail

1. Document Preprocessing

Text chunking strategy

from langchain.text_splitter import RecursiveCharacterTextSplitter

def preprocess_documents(documents):
    """Preprocess and chunk documents."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,       # chunk size
        chunk_overlap=200,     # overlap between adjacent chunks
        separators=["\n\n", "\n", "。", "!", "?", ";", ":", " ", ""]  # separator priority
    )

    processed_chunks = []
    for doc in documents:
        chunks = text_splitter.split_text(doc.page_content)
        for chunk in chunks:
            processed_chunks.append({
                'content': chunk,
                'source': doc.metadata.get('source', ''),
                'page': doc.metadata.get('page', 0),
                'chunk_id': len(processed_chunks)
            })

    return processed_chunks
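
preprocess_documents expects LangChain Document objects exposing page_content and metadata. A minimal usage sketch (depending on the LangChain version, Document may live in langchain.schema or langchain_core.documents; the file name is illustrative):

from langchain.schema import Document

docs = [
    Document(
        page_content="In Q1 2023 the company's revenue reached $15M, with online sales accounting for 65% ...",
        metadata={"source": "annual_report.pdf", "page": 3},
    )
]

chunks = preprocess_documents(docs)
print(len(chunks), chunks[0]['content'][:50])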

Quality filtering

import re

def filter_quality_chunks(chunks, min_length=50, max_length=2000):
    """Filter out low-quality text chunks."""
    filtered_chunks = []

    for chunk in chunks:
        content = chunk['content'].strip()

        # Length filter
        if len(content) < min_length or len(content) > max_length:
            continue

        # Content quality check
        if is_meaningful_content(content):
            filtered_chunks.append(chunk)

    return filtered_chunks

def is_meaningful_content(text):
    """Decide whether a chunk carries meaningful content."""
    # Drop chunks that are almost entirely symbols
    if len(re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)) < 20:
        return False

    # Drop chunks dominated by repeated words
    words = text.split()
    if words and len(set(words)) / len(words) < 0.3:  # too many repeated words
        return False

    return True

2. LLM Information Extraction

Key information identification

import json
import openai
from typing import List, Dict

class InformationExtractor:
    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)

    def extract_key_information(self, text: str) -> Dict:
        """Use an LLM to extract the key information from a text."""

        prompt = f"""
Please extract the key information from the following text and return it in a structured format:

Text:
{text}

Extract the following:
1. Core topic (a one- or two-sentence summary)
2. Key facts and figures
3. Important concepts and terminology
4. Causal relationships and reasoning
5. Conclusions and recommendations

Return JSON with the fields confidence_score and extracted_info.
"""

        try:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=1000
            )

            result = response.choices[0].message.content
            return self.parse_extraction_result(result)

        except Exception as e:
            print(f"Extraction failed: {e}")
            return {"error": str(e)}

    def parse_extraction_result(self, result: str) -> Dict:
        """Parse the JSON the LLM returns."""
        try:
            # Strip markdown code fences before parsing
            cleaned_result = result.strip()
            if cleaned_result.startswith('```json'):
                cleaned_result = cleaned_result[7:]
            if cleaned_result.endswith('```'):
                cleaned_result = cleaned_result[:-3]

            return json.loads(cleaned_result)
        except json.JSONDecodeError:
            return {"raw_result": result}
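
A minimal usage sketch for the extractor above (the API key is read from an environment variable; the sample sentence is illustrative):

import os

extractor = InformationExtractor(api_key=os.environ["OPENAI_API_KEY"])
info = extractor.extract_key_information(
    "In Q2 2023 revenue grew 25% year over year, driven mainly by a new product line."
)
print(info.get("extracted_info"), info.get("confidence_score"))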

Entity and relation extraction

def extract_entities_and_relations(text: str) -> Dict:
    """Extract entities and relations from a text."""

    extraction_prompt = f"""
Extract the entities and relations from the following text:

Text: {text}

Identify:
1. Named entities (people, places, organizations, products, etc.)
2. Concept entities (technical terms, domain concepts, etc.)
3. Relations between entities

Return format:
{{
    "entities": [
        {{"type": "PERSON", "name": "Zhang San", "context": "..."}},
        {{"type": "ORG", "name": "ABC Corp", "context": "..."}}
    ],
    "relations": [
        {{"subject": "Zhang San", "relation": "works at", "object": "ABC Corp"}}
    ]
}}
"""

    # Call the LLM for entity and relation extraction (OpenAI v1-style client)
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": extraction_prompt}],
        temperature=0.1
    )

    return json.loads(response.choices[0].message.content)

3. Data Condensation and Compression

Filtering redundant information

class DataCompressor:
    def __init__(self, api_key: str):
        self.extractor = InformationExtractor(api_key)
        self.client = openai.OpenAI(api_key=api_key)

    def compress_document(self, document: str) -> str:
        """Compress a document while keeping its core information."""

        # Extract the key information
        key_info = self.extractor.extract_key_information(document)

        # Generate a compressed version
        compressed = self.generate_compressed_version(document, key_info)

        # Validate the compression quality
        if self.validate_compression(document, compressed):
            return compressed
        else:
            return document  # fall back to the original if the compression is poor

    def generate_compressed_version(self, original: str, key_info: Dict) -> str:
        """Generate the compressed version."""

        prompt = f"""
Based on the key information below, produce a compressed version of the original text:

Original: {original}

Key information: {json.dumps(key_info, ensure_ascii=False)}

Requirements:
1. Keep all important facts and figures
2. Preserve logical coherence
3. Compress to 30%-50% of the original length
4. Use concise, plain language

Compressed version:
"""

        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=max(len(original.split()) // 2, 256)  # floor the budget for short or unspaced text
        )

        return response.choices[0].message.content

    def validate_compression(self, original: str, compressed: str) -> bool:
        """Validate compression quality."""

        # Compression ratio
        compression_ratio = len(compressed) / len(original)

        # The ratio should fall within a reasonable range
        if compression_ratio < 0.3 or compression_ratio > 0.8:
            return False

        # Check that the key information survived (helper assumed to be defined elsewhere)
        return self.check_information_preservation(original, compressed)

Smart summary generation

def generate_smart_summary(text: str, max_length: int = 200) -> str:
    """Generate a concise smart summary."""

    summary_prompt = f"""
Please write a concise summary of the following text:

Requirements:
1. Keep it within {max_length} characters
2. Include the most important points
3. Keep the language fluent and the logic clear
4. Highlight key figures and conclusions

Text:
{text}

Summary:
"""

    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": summary_prompt}],
        temperature=0.3,
        max_tokens=max_length // 4  # rough token estimate
    )

    return response.choices[0].message.content.strip()

🗂️ Vector Database Design and Implementation

1. Generating Vector Embeddings

Choosing an embedding model

from typing import List
from sentence_transformers import SentenceTransformer

class EmbeddingGenerator:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def generate_embedding(self, text: str) -> List[float]:
        """Generate an embedding for a single text."""
        return self.model.encode(text).tolist()

    def batch_generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings in batches."""
        embeddings = self.model.encode(texts, batch_size=32, show_progress_bar=True)
        return embeddings.tolist()

    def generate_query_embedding(self, query: str) -> List[float]:
        """Generate a query embedding."""
        # Queries can be processed differently from documents (here: L2-normalized)
        return self.model.encode([query], normalize_embeddings=True)[0].tolist()
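
A minimal usage sketch, assuming the all-MiniLM-L6-v2 weights can be downloaded; the two sentences are illustrative:

import numpy as np

emb_gen = EmbeddingGenerator()

vec_a = emb_gen.generate_embedding("How did the company's annual revenue perform?")
vec_b = emb_gen.generate_embedding("Quarterly earnings and revenue growth analysis")

# Cosine similarity between the two texts
a, b = np.array(vec_a), np.array(vec_b)
print(float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b))))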

Embedding optimization strategies

def optimize_embedding(text: str, strategy: str = "hybrid") -> str:
    """Rewrite a text so that it produces a better embedding."""
    # extract_keywords / generate_semantic_summary are helpers assumed to be defined elsewhere

    if strategy == "keyword_extraction":
        # Keep only the extracted keywords
        keywords = extract_keywords(text)
        return " ".join(keywords)

    elif strategy == "semantic_summary":
        # Embed a semantic summary instead of the full text
        summary = generate_semantic_summary(text)
        return summary

    elif strategy == "hybrid":
        # Combine the summary with the top keywords
        keywords = extract_keywords(text)
        summary = generate_semantic_summary(text)
        return f"{summary} {' '.join(keywords[:5])}"

    return text

2. Vector Database Architecture

Data model design

from pydantic import BaseModel, Field
from typing import Any, List, Dict, Optional
from datetime import datetime

class DocumentChunk(BaseModel):
    """Data model for a document chunk."""
    id: str
    content: str
    embedding: List[float]
    metadata: Dict[str, Any]
    summary: Optional[str] = None
    key_info: Optional[Dict] = None
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

class KnowledgeBase(BaseModel):
    """Data model for a knowledge base."""
    id: str
    name: str
    description: str
    chunks: List[DocumentChunk]
    embedding_model: str
    created_at: datetime = Field(default_factory=datetime.now)

Database access class

from typing import List, Dict
import chromadb

class VectorDatabase:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = None

    def create_collection(self, name: str, metadata: Dict = None):
        """Create a vector collection."""
        self.collection = self.client.create_collection(
            name=name,
            metadata=metadata or {"hnsw:space": "cosine"}
        )
        return self.collection

    def get_or_create_collection(self, name: str):
        """Get an existing collection or create it."""
        self.collection = self.client.get_or_create_collection(name=name)
        return self.collection

    def add_documents(self, documents: List[DocumentChunk]):
        """Add documents to the vector database."""

        ids = [doc.id for doc in documents]
        embeddings = [doc.embedding for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        documents_content = [doc.content for doc in documents]

        self.collection.add(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents_content
        )

    def search_similar(self, query_embedding: List[float],
                       n_results: int = 5) -> Dict:
        """Search for similar documents."""

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=['documents', 'metadatas', 'distances']
        )

        return {
            'documents': results['documents'][0],
            'metadatas': results['metadatas'][0],
            'distances': results['distances'][0]
        }

    def delete_documents(self, ids: List[str]):
        """Delete documents by id."""
        self.collection.delete(ids=ids)

    def update_document(self, id: str, embedding: List[float],
                        metadata: Dict, document: str):
        """Update a document."""
        self.collection.update(
            ids=[id],
            embeddings=[embedding],
            metadatas=[metadata],
            documents=[document]
        )
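
A minimal sketch showing how the pieces above fit together; the collection name and chunk contents are illustrative:

db = VectorDatabase(persist_directory="./chroma_db")
db.get_or_create_collection("demo_kb")

emb_gen = EmbeddingGenerator()
chunk = DocumentChunk(
    id="chunk_0",
    content="In Q2 2023 revenue grew 25% year over year.",
    embedding=emb_gen.generate_embedding("In Q2 2023 revenue grew 25% year over year."),
    metadata={"source": "demo.txt"},
)
db.add_documents([chunk])

hits = db.search_similar(emb_gen.generate_query_embedding("How fast is revenue growing?"), n_results=1)
print(hits['documents'], hits['distances'])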

3. Index Optimization

HNSW index configuration

def create_optimized_collection(client):
    """Create a collection with tuned HNSW parameters."""

    collection = client.create_collection(
        name="knowledge_base",
        metadata={
            "hnsw:space": "cosine",          # distance metric
            "hnsw:construction_ef": 128,     # ef used while building the index
            "hnsw:search_ef": 64,            # ef used while searching
            "hnsw:M": 16,                    # maximum number of links per node
            "hnsw:num_threads": 4,           # build threads
            "hnsw:resize_factor": 1.2        # index growth factor
        }
    )

    return collection

Dynamic index updates

class DynamicIndexer:
    def __init__(self, vector_db: VectorDatabase):
        self.vector_db = vector_db
        self.pending_updates = []
        self.batch_size = 100

    def add_to_batch(self, document: DocumentChunk):
        """Queue a document for batched insertion."""
        self.pending_updates.append(document)

        if len(self.pending_updates) >= self.batch_size:
            self.flush_batch()

    def flush_batch(self):
        """Write the queued documents in one batch."""
        if not self.pending_updates:
            return

        try:
            self.vector_db.add_documents(self.pending_updates)
            print(f"Added {len(self.pending_updates)} documents")
            self.pending_updates.clear()
        except Exception as e:
            print(f"Batch update failed: {e}")

    def force_flush(self):
        """Flush whatever is still pending."""
        self.flush_batch()

🔍 Retrieval-Augmented Generation (RAG) Pipeline

1. Query Preprocessing

class QueryProcessor:
    def __init__(self, embedding_generator: EmbeddingGenerator):
        self.embedding_gen = embedding_generator
        self.client = openai.OpenAI()

    def process_query(self, query: str) -> Dict:
        """Process a user query."""

        # Identify the query intent
        intent = self.identify_intent(query)

        # Expand the query
        expanded_queries = self.expand_query(query, intent)

        # Generate query embeddings
        query_embeddings = []
        for q in expanded_queries:
            embedding = self.embedding_gen.generate_query_embedding(q)
            query_embeddings.append(embedding)

        return {
            'original_query': query,
            'intent': intent,
            'expanded_queries': expanded_queries,
            'query_embeddings': query_embeddings
        }

    def identify_intent(self, query: str) -> str:
        """Classify the query intent."""

        intent_prompt = f"""
Analyse the intent of the following query:

Query: {query}

Possible intents:
- factual: factual question
- analytical: analytical question
- comparative: comparison question
- procedural: how-to question
- creative: open-ended creative question

Return only the most suitable intent type:
"""

        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": intent_prompt}],
            temperature=0.1,
            max_tokens=50
        )

        return response.choices[0].message.content.strip()

    def expand_query(self, query: str, intent: str) -> List[str]:
        """Expand a query into related variants."""

        expansion_prompt = f"""
Generate related variants for the following query:

Original query: {query}
Query intent: {intent}

Produce 3-5 related query variants to broaden retrieval coverage:
"""

        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": expansion_prompt}],
            temperature=0.3,
            max_tokens=200
        )

        variants = response.choices[0].message.content.split('\n')
        return [query] + [v.strip('- ').strip() for v in variants if v.strip()]

2. Multi-Path Retrieval

class MultiPathRetriever:
    def __init__(self, vector_db: VectorDatabase, query_processor: QueryProcessor):
        # format_vector_results / format_results / extract_keywords / generate_semantic_embedding
        # are helper methods assumed to be implemented elsewhere
        self.vector_db = vector_db
        self.query_processor = query_processor

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Multi-path retrieval."""

        # Preprocess the query
        processed_query = self.query_processor.process_query(query)

        all_results = []

        # 1. Vector-similarity retrieval
        for embedding in processed_query['query_embeddings']:
            vector_results = self.vector_db.search_similar(embedding, top_k)
            all_results.extend(self.format_vector_results(vector_results))

        # 2. Keyword retrieval
        keyword_results = self.keyword_search(query, top_k)
        all_results.extend(keyword_results)

        # 3. Semantic retrieval
        semantic_results = self.semantic_search(query, top_k)
        all_results.extend(semantic_results)

        # Deduplicate and rerank
        final_results = self.deduplicate_and_rerank(all_results, query)

        return final_results[:top_k]

    def keyword_search(self, query: str, top_k: int) -> List[Dict]:
        """Keyword retrieval."""
        keywords = self.extract_keywords(query)
        results = []

        for keyword in keywords:
            # Query the vector database with the keyword text
            keyword_results = self.vector_db.collection.query(
                query_texts=[keyword],
                n_results=top_k
            )
            results.extend(self.format_results(keyword_results))

        return results

    def semantic_search(self, query: str, top_k: int) -> List[Dict]:
        """Semantic retrieval with a different embedding model."""
        semantic_embedding = self.generate_semantic_embedding(query)

        results = self.vector_db.search_similar(semantic_embedding, top_k)
        return self.format_vector_results(results)

3. Result Fusion and Reranking

def deduplicate_and_rerank(results: List[Dict], query: str) -> List[Dict]:
    """Deduplicate results and rerank them."""

    # Deduplicate by document id
    seen_ids = set()
    unique_results = []

    for result in results:
        doc_id = result.get('id')
        if doc_id not in seen_ids:
            seen_ids.add(doc_id)
            unique_results.append(result)

    # Rerank
    for result in unique_results:
        # Compute a relevance score
        relevance_score = calculate_relevance_score(result, query)
        result['relevance_score'] = relevance_score

    # Sort by score
    sorted_results = sorted(unique_results,
                            key=lambda x: x['relevance_score'],
                            reverse=True)

    return sorted_results

def calculate_relevance_score(result: Dict, query: str) -> float:
    """Compute a combined relevance score."""

    score = 0.0

    # Vector-similarity score
    if 'distance' in result:
        score += (1 - result['distance']) * 0.6  # convert distance to similarity

    # Keyword-match score
    if 'content' in result:
        keywords = extract_keywords(query)
        content_words = set(result['content'].lower().split())

        keyword_matches = sum(1 for kw in keywords if kw.lower() in content_words)
        if keywords:
            score += (keyword_matches / len(keywords)) * 0.3

    # Semantic-relevance score
    semantic_score = calculate_semantic_relevance(result, query)
    score += semantic_score * 0.1

    return score

🚀 End-to-End Implementation Example

1. Building the Knowledge Base End to End

class KnowledgeBaseBuilder:
    def __init__(self, api_key: str):
        self.extractor = InformationExtractor(api_key)
        self.compressor = DataCompressor(api_key)
        self.embedding_gen = EmbeddingGenerator()
        self.vector_db = VectorDatabase()

    def build_knowledge_base(self, documents, kb_name: str) -> str:
        """Build a complete knowledge base from LangChain Document objects."""

        print("📚 Building the knowledge base...")

        # 1. Preprocess documents
        print("🔍 Preprocessing documents...")
        processed_chunks = preprocess_documents(documents)

        # 2. Extract and condense information
        print("🧠 Extracting and condensing information...")
        compressed_chunks = []

        for i, chunk in enumerate(processed_chunks):
            print(f"Processing chunk {i+1}/{len(processed_chunks)}")

            # Extract key information
            key_info = self.extractor.extract_key_information(chunk['content'])

            # Compress the content
            compressed_content = self.compressor.compress_document(chunk['content'])

            # Generate a summary
            summary = generate_smart_summary(compressed_content)

            compressed_chunks.append({
                'id': f"chunk_{i}",
                'content': compressed_content,
                'summary': summary,
                'key_info': key_info,
                'metadata': chunk
            })

        # 3. Generate embeddings
        print("🔢 Generating embeddings...")
        texts_to_embed = [chunk['content'] for chunk in compressed_chunks]
        embeddings = self.embedding_gen.batch_generate_embeddings(texts_to_embed)

        # Attach the embeddings to the chunks
        for i, chunk in enumerate(compressed_chunks):
            chunk['embedding'] = embeddings[i]

        # 4. Store in the vector database
        # (wrap the dicts as DocumentChunk objects so add_documents receives the model it expects)
        print("💾 Writing to the vector database...")
        self.vector_db.create_collection(kb_name)
        self.vector_db.add_documents([DocumentChunk(**chunk) for chunk in compressed_chunks])

        print(f"✅ Knowledge base built: {len(compressed_chunks)} chunks processed")

        return kb_name
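
A minimal, hypothetical end-to-end run (the file name, knowledge-base name, and Document import path are illustrative assumptions):

import os
from langchain.schema import Document

raw_docs = [
    Document(
        page_content=open("report_2023.txt", encoding="utf-8").read(),
        metadata={"source": "report_2023.txt"},
    )
]

builder = KnowledgeBaseBuilder(api_key=os.environ["OPENAI_API_KEY"])
kb_name = builder.build_knowledge_base(raw_docs, "company_reports")
print("Built knowledge base:", kb_name)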

2. Intelligent Q&A System

class IntelligentQASystem:
    def __init__(self, knowledge_base: str):
        self.vector_db = VectorDatabase()
        self.vector_db.get_or_create_collection(knowledge_base)
        self.query_processor = QueryProcessor(EmbeddingGenerator())
        self.retriever = MultiPathRetriever(self.vector_db, self.query_processor)
        self.client = openai.OpenAI()

    def answer_question(self, question: str) -> Dict:
        """Answer a question over the knowledge base."""

        # 1. Retrieve relevant documents
        relevant_docs = self.retriever.retrieve(question, top_k=3)

        # 2. Build the context
        context = self.build_context(relevant_docs)

        # 3. Generate the answer
        answer = self.generate_answer(question, context)

        # 4. Evaluate answer quality
        quality_score = self.evaluate_answer_quality(question, answer, context)

        return {
            'question': question,
            'answer': answer,
            'relevant_documents': relevant_docs,
            'context': context,
            'quality_score': quality_score,
            'sources': [doc['metadata']['source'] for doc in relevant_docs]
        }

    def build_context(self, relevant_docs: List[Dict]) -> str:
        """Build the answer context."""

        context_parts = []

        for doc in relevant_docs:
            # Use the compressed content and key information to build the context
            content = doc.get('content', '')
            summary = doc.get('summary', '')
            key_info = doc.get('key_info', {})

            # Build a compact context fragment
            context_part = f"""
Summary: {summary}

Details: {content[:500]}...

Key information: {json.dumps(key_info, ensure_ascii=False, indent=2)}
"""

            context_parts.append(context_part)

        return "\n---\n".join(context_parts)

    def generate_answer(self, question: str, context: str) -> str:
        """Generate the answer."""

        answer_prompt = f"""
Answer the user's question based on the context below.

Context:
{context}

Question: {question}

Requirements:
1. Be accurate and objective
2. Cite concrete figures and facts
3. If the information is insufficient, say so explicitly
4. Keep the logic clear and the language concise

Answer:
"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": answer_prompt}],
            temperature=0.1,
            max_tokens=1000
        )

        return response.choices[0].message.content

    def evaluate_answer_quality(self, question: str, answer: str, context: str) -> float:
        """Evaluate answer quality."""

        evaluation_prompt = f"""
Evaluate the quality of the following answer:

Question: {question}
Answer: {answer}
Context: {context[:1000]}...

Score each dimension from 0 to 1:
1. Accuracy: does the answer reflect the context correctly?
2. Relevance: does it directly answer the question?
3. Completeness: does it contain the necessary information?
4. Clarity: is it clearly expressed?

Return only the average score (between 0 and 1):
"""

        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": evaluation_prompt}],
            temperature=0.1
        )

        try:
            score = float(response.choices[0].message.content.strip())
            return max(0.0, min(1.0, score))
        except ValueError:
            return 0.5  # fall back to "medium quality"
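
A minimal usage sketch, assuming the "company_reports" knowledge base built above already exists:

qa = IntelligentQASystem("company_reports")
result = qa.answer_question("How did revenue develop over 2023?")
print(result['answer'])
print("quality:", result['quality_score'], "sources:", result['sources'])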

📊 Performance Optimization

1. Index Tuning

def optimize_vector_index(client, name: str = "knowledge_base"):
    """Tune the HNSW index.

    In Chroma the construction-time parameters (hnsw:construction_ef, hnsw:M)
    are fixed when a collection is created, so raising them means recreating
    the collection with the new metadata and re-adding the documents.
    """

    # Pull everything out of the existing collection
    collection = client.get_collection(name)
    existing = collection.get(include=["embeddings", "documents", "metadatas"])

    # Recreate the collection with higher-quality index parameters
    client.delete_collection(name)
    rebuilt = client.create_collection(
        name=name,
        metadata={
            "hnsw:construction_ef": 200,  # higher build quality
            "hnsw:M": 32,                 # more links per node
            "hnsw:search_ef": 100,        # higher search quality
        }
    )
    rebuilt.add(
        ids=existing["ids"],
        embeddings=existing["embeddings"],
        documents=existing["documents"],
        metadatas=existing["metadatas"],
    )
    return rebuilt

2. Caching

import time

class CachedRetriever:
    def __init__(self, retriever, cache_ttl: int = 3600):
        self.retriever = retriever
        self.cache_ttl = cache_ttl
        self.cache = {}

    def retrieve_with_cache(self, query: str, top_k: int = 5):
        """Retrieval with a TTL cache.

        The TTL dict below is the single cache; stacking an lru_cache decorator
        on top would bypass the expiry logic and pin stale results.
        """

        cache_key = f"{query}_{top_k}"
        current_time = time.time()

        # Check the cache
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if current_time - timestamp < self.cache_ttl:
                return cached_result

        # Run the actual retrieval
        result = self.retriever.retrieve(query, top_k)

        # Update the cache
        self.cache[cache_key] = (result, current_time)

        return result

    def clear_expired_cache(self):
        """Remove expired cache entries."""
        current_time = time.time()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if current_time - timestamp >= self.cache_ttl
        ]

        for key in expired_keys:
            del self.cache[key]

3. Batch Processing

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_documents_batch(documents: List[str], batch_size: int = 10):
    """Process documents in batches."""

    results = []

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]

        # Process the batch in parallel
        batch_results = process_batch_parallel(batch)

        results.extend(batch_results)

        print(f"Progress: {min(i + batch_size, len(documents))}/{len(documents)}")

    return results

def process_batch_parallel(batch: List[str]):
    """Process a single batch in parallel."""

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_single_document, doc)  # per-document pipeline, assumed defined elsewhere
            for doc in batch
        ]

        results = []
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Processing failed: {e}")

    return results

🎯 Best Practices

1. Data quality assurance

  • Data cleaning: remove noise and irrelevant information
  • Quality evaluation: define data-quality metrics
  • Continuous monitoring: check regularly for data-quality drift

2. Retrieval effectiveness

  • Query expansion: use synonyms and related concepts
  • Reranking: combine multiple relevance signals
  • User feedback: collect feedback on retrieval results

3. System scalability

  • Tiered storage: separate hot and cold data
  • Distributed deployment: support horizontal scaling
  • Load balancing: distribute query load sensibly

4. Security and privacy

  • Data masking: protect sensitive information (see the sketch after this list)
  • Access control: enforce fine-grained permissions
  • Audit logging: record every operation
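
As a concrete illustration of the data-masking bullet above, here is a minimal regex-based sketch that redacts phone numbers and email addresses before documents enter the knowledge base; the patterns are illustrative and would need extending for real PII requirements:

import re

def mask_sensitive_info(text: str) -> str:
    """Redact obvious PII before a document is chunked and embedded."""
    text = re.sub(r'\b\d{11}\b', '[PHONE]', text)                # 11-digit phone numbers
    text = re.sub(r'[\w.+-]+@[\w-]+\.[\w.]+', '[EMAIL]', text)   # email addresses
    return text

print(mask_sensitive_info("Contact Zhang San at 13800138000 or zhang.san@example.com"))
# -> "Contact Zhang San at [PHONE] or [EMAIL]"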

🔍 Monitoring and Maintenance

1. Performance Monitoring

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'query_count': 0,
            'avg_response_time': 0,
            'cache_hit_rate': 0,
            'error_rate': 0
        }

    def record_query(self, response_time: float, is_error: bool = False):
        """Record metrics for one query."""

        self.metrics['query_count'] += 1

        # Update the running average response time
        total_time = self.metrics['avg_response_time'] * (self.metrics['query_count'] - 1)
        self.metrics['avg_response_time'] = (total_time + response_time) / self.metrics['query_count']

        # Update the error rate (errors seen / queries seen)
        errors_so_far = self.metrics['error_rate'] * (self.metrics['query_count'] - 1)
        if is_error:
            errors_so_far += 1
        self.metrics['error_rate'] = errors_so_far / self.metrics['query_count']

    def get_metrics(self) -> Dict:
        """Return a copy of the current metrics."""
        return self.metrics.copy()

    def reset_metrics(self):
        """Reset all metrics."""
        self.metrics = {key: 0 for key in self.metrics.keys()}
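
A minimal sketch of wrapping a query with the monitor (the retriever object and query string are illustrative):

import time

monitor = PerformanceMonitor()

start = time.perf_counter()
try:
    results = retriever.retrieve("How did revenue develop over 2023?", top_k=5)
    monitor.record_query(time.perf_counter() - start)
except Exception:
    monitor.record_query(time.perf_counter() - start, is_error=True)

print(monitor.get_metrics())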

2. Data Update Strategy

import time

class KnowledgeBaseUpdater:
    def __init__(self, vector_db: VectorDatabase, embedding_gen: EmbeddingGenerator,
                 knowledge_builder: KnowledgeBaseBuilder, collection_name: str):
        self.vector_db = vector_db
        self.embedding_gen = embedding_gen
        self.knowledge_builder = knowledge_builder
        self.collection_name = collection_name

    def incremental_update(self, new_documents: List[str]):
        """Incrementally update the knowledge base."""

        # Process the new documents
        processed_chunks = preprocess_documents(new_documents)

        # Generate embeddings
        embeddings = self.embedding_gen.batch_generate_embeddings(
            [chunk['content'] for chunk in processed_chunks]
        )

        # Add to the existing collection
        for i, chunk in enumerate(processed_chunks):
            chunk['embedding'] = embeddings[i]
            chunk['id'] = f"update_{int(time.time())}_{i}"

        self.vector_db.add_documents(processed_chunks)

    def full_rebuild(self, all_documents: List[str]):
        """Rebuild the knowledge base from scratch."""

        # Drop the old collection
        try:
            self.vector_db.client.delete_collection(self.collection_name)
        except Exception:
            pass

        # Rebuild
        self.knowledge_builder.build_knowledge_base(all_documents, self.collection_name)

📚 Summary

With this LLM-extraction plus vector-storage architecture, we can build an intelligent, efficient knowledge base system:

🎯 Core value

  1. Intelligent extraction: the LLM understands the text and pulls out the important information
  2. Efficient storage: the vector database provides millisecond-level similarity retrieval
  3. Precise retrieval: semantic similarity instead of keyword matching
  4. Continuous optimization: the system keeps improving through feedback loops

🛠️ Technology stack

  • LLM: GPT-4 for information extraction, GPT-3.5 for summarization
  • Vector database: ChromaDB for lightweight applications, Milvus for large-scale deployments
  • Embedding model: the Sentence Transformers family
  • Framework: LangChain for pipeline orchestration

🚀 Application outlook

  • Enterprise knowledge bases: intelligent question-answering systems
  • Customer service: better response quality and efficiency
  • Content creation: assisted drafting and editing
  • Academic research: faster literature search and analysis

💡 Key success factors

  1. Data quality: high-quality source data is the foundation
  2. Model choice: the right embedding model and LLM
  3. Index optimization: an efficient vector-indexing strategy
  4. Continuous iteration: system optimization driven by user feedback

This architecture not only overcomes the limitations of traditional retrieval, it also gives LLM applications strong external knowledge support, making it a key piece of infrastructure for the next generation of AI applications.

