前言
在数字化浪潮中,大型语言模型(LLM)发展迅猛,在智能客服、开发助手等多个领域展现出卓越的应用潜力,推动了各行业的智能化转型。在数据库查询领域,Text-to-SQL 技术正逐渐成为自然语言与结构化数据之间的关键桥梁,帮助人们以更加便捷的方式获取数据洞察。
Text-to-SQL,又简称为 T2S 或 Text2SQL,其核心功能在于将数据库场景下的自然语言(NL)问题,转化为可在关系型数据库中执行的结构化查询语言(SQL),因而也被称为 NL2SQL。在 Text-to-SQL 技术领域,Spider 等框架已得到广泛应用。Spider 构建了一个包含复杂跨领域数据库问题的数据集,凭借语义解析和强化学习技术,在复杂查询场景下能给出精准的 SQL 语句生成方案。然而,这类解决方案通常需要繁琐的前期准备,如数据标注、模型微调,对使用者的技术储备和工程能力要求较高。
在实际业务场景中,客户往往期望拥有简单易用、即插即用的解决方案,以满足日常数据查询与分析需求,而无需投入大量的技术资源进行复杂配置。
针对这一需求,我们可结合 Vanna.AI 框架与 Amazon Bedrock 所提供的大模型能力,构建面向 Amazon RDS/Aurora MySQL 和 Amazon Redshift 的高效查询分析解决方案。该方案融合了大模型强大的自然语言理解能力与 AWS 数据库服务的高性能特性,为用户开辟了一条低门槛、高效能的 Text-to-SQL 实现路径。
通过这一方案,业务用户只需用自然语言描述数据需求,系统便能自动将其转换为准确的 SQL 查询,显著降低了数据分析的技术门槛,大幅提升了数据利用效率 。
方案及功能
Vanna.AI
Vanna.AI 是 MIT 许可的开源 Python RAG(检索增强生成)框架,用于 SQL 生成和相关功能。
Vanna.AI:输出方式
下表是 Vanna.AI 使用自然语言与数据库交互的多种输出方式。
Output |
Description |
📄 SQL |
Vanna.AI 可以从自然语言问题中生成 SQL 查询。这些 SQL 查询可用于与数据库交互。 |
📁 DataFrame |
生成 SQL 查询后,Vanna.AI 可以在数据库中执行它,并以 pandas DataFrame 格式返回。 |
📊 Charts |
生成 SQL 查询后,Vanna.AI 可以在数据库中执行它,并以合适的图表类型返回。 |
❓ Follow-up questions |
Vanna 可以基于生成的 SQL 查询生成后续问题,这些后续问题可以帮助用户细化他们的查询或更详细地探索数据。 |
🔍 Explanations queries |
Vanna 可以为生成的 SQL 查询提供解释,这些解释可以帮助用户理解他们的自然语言问题是如何被解释成 SQL 查询的。 |
Vanna.AI:功能特点
下表包含 Vanna.AI 提供的关键功能,这些功能丰富了数据交互能力:
Feature |
Description |
🚀 Model Training |
在训练过程中,使用包括数据定义语言(DDL)语句、文档和 SQL 查询在内的数据源来训练 RAG 模型。 |
🤖 User Query Handling |
用户可以用自然语言提出问题,而 Vanna.AI 会通过生成 SQL 查询来做出响应。 |
📚 Documentation |
提供了详尽的文档、专门的网站和 Discord 上的支持社区,以提供全面的帮助。 |
🔌 Database Connections |
Vanna.AI 允许连接到多个数据库,使用户不仅能够获取 SQL 查询,还能通过建立与各自数据库的连接来执行这些查询。 |
🤔 AI-Generated Recommendation Questions |
这个框架包含一个生成 AI 驱动问题的功能,为用户提供可以探索的额外查询建议。 |
Vanna.AI 组件详解
Embedding model
基于对 vanna-github 项目代码的分析,我发现该项目使用了以下向量模型:
1. 默认的 Embedding模型:all-MiniLM-L6-v2
- 这是一个由 Sentence Transformers 提供的模型,用于将文本转换为向量表示
- 该模型在多个组件中被用作默认的嵌入模型
2. 自定义 Embedding 模型
- Vanna.AI 支持更换 Embedding 模型
- 本文也使用了 amazon.titan-embed-text-v2:0
Large language model
Amazon Bedrock 是一项完全托管的服务,它通过统一的 API 提供来自领先人工智能公司和亚马逊的高性能基础模型(FMs)。本文使用 DeepSeek in Amazon Bedrock 来提供大模型推理能力。
Vector Database
Vector DB 用于存储 DDL、SQL、Document 等数据,提供 Vanna.AI 的 RAG 功能。
Vanna.AI 支持多种 Vector DB,考虑到数据库的性能扩展和数据持久性需求,本文使用 Amazon RDS Postgresql/pgvector 作 vector 存储。
Vanna.AI 的向量库构造详解:
Vanna.AI 在使用 pgvector 作为向量存储时,会在 PostgreSQL 数据库中创建以下表来存储嵌入向量。
主要表是 langchain_pg_embedding,这是 LangChain 的 PGVector 实现创建的表。这个表的结构如下:
CREATE TABLE langchain_pg_embedding (
uuid UUID PRIMARY KEY,
collection_id TEXT NOT NULL,
embedding VECTOR(N), -- N 是嵌入向量的维度,取决于使用的嵌入模型
document TEXT,
cmetadata JSONB,
custom_id TEXT
);
表中的字段含义:
- uuid: 每条记录的唯一标识符
- collection_id: 集合的名称,Vanna.AI 使用三个集合:’sql’、’ddl’ 和 ‘documentation’
- embedding: 向量字段,存储文档的嵌入向量
- document: 原始文档内容
- cmetadata: JSON 格式的元数据,包含额外信息
- custom_id: 自定义 ID,Vanna.AI 使用格式如 “uuid-sql”、”uuid-ddl” 或 “uuid-doc”
此外,PGVector 还会创建索引来加速向量搜索:
CREATE INDEX ON langchain_pg_embedding
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
Vanna.AI 使用这个表存储三种类型的数据:
- SQL 查询和相关问题(collection_id = ‘sql’)
- 数据库模式定义 DDL(collection_id = ‘ddl’)
- 业务文档(collection_id = ‘documentation’)
vn.train() 方法会将训练数据转换为嵌入向量,并存储在这个表中。
vn.ask() 方法会将问题转换为嵌入向量,并搜索最相似的向量,找到相关的 SQL 查询、DDL 或文档。
通过以下 SQL 查询查看表中所有存储的文档及其集合和元数据,但不包括嵌入向量(因为它们通常是高维的,不适合直接查看)。
SELECT collection_id, document, cmetadata FROM langchain_pg_embedding;
方案部署
本文是基于 Jupyter Notebook 环境进行部署和演示。需要准备 Jupyter Notebook 环境,和安装相关 Python 依赖包。
安装相关 Python 包
pip install --upgrade pip setuptools
pip install --upgrade boto3 botocore
pip install boto3 PyMySQL vanna
pip install 'vanna[chromadb,anthropic,postgres,pymysql]'
Vanna.AI 初始化
根据不同需求,本小节分两个部署方案,两个方案在 Embedding Model,LLM,Vector DB 选型上有差异。
部署方案(1)
环境信息:
Embedding Model:all-MiniLM-L6-v2
LLM: Bedrock Deepseek
Vector DB:PG_VectorStore
Target DB:Postgresql
样例代码:
# 使用默认的embedding model all-MiniLM-L6-v2
import boto3
from vanna.pgvector.pgvector import PG_VectorStore
from vanna.bedrock.bedrock_converse import Bedrock_Converse
# AWS 凭证
AWS_ACCESS_KEY_ID = ' <your-access-key>'
AWS_SECRET_ACCESS_KEY = ' <your-secret_key>'
AWS_REGION = 'us-east-1'
# 创建 Bedrock 客户端
try:
bedrock_client = boto3.client(
'bedrock-runtime',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
print(f"✅ Bedrock 客户端创建成功 (区域: {AWS_REGION})")
except Exception as e:
print(f"❌ Bedrock 客户端创建失败: {str(e)}")
# 数据库连接变量
my_host = 'hostname'
my_dbname = 'pgvector'
my_user = 'username'
my_password = 'password'
my_port = 5432
# 配置 Bedrock 模型用于对话
bedrock_config = {
"modelId": "us.deepseek.r1-v1:0",
"allow_llm_to_see_data": True, # 设置允许LLM查看数据
"temperature": 0.7,
"max_tokens": 500
}
# 初始化 MyVanna 实例
try:
class MyVanna(PG_VectorStore, Bedrock_Converse):
def __init__(self, config=None):
if config is None:
config = {}
# 构建连接字符串
config["connection_string"] = f"postgresql://{my_user}:{my_password}@{my_host}:{my_port}/{my_dbname}"
# 初始化 PG_VectorStore
PG_VectorStore.__init__(self, config=config)
# 初始化 Bedrock_Converse
Bedrock_Converse.__init__(self, client=bedrock_client, config=bedrock_config)
# 创建 MyVanna 实例
vn = MyVanna()
print(f"✅ MyVanna 实例初始化成功!Vector Database(是: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接Vecotr数据库时发生错误: {e}")
#需要被查询的库
try:
vn.connect_to_postgres(host=my_host, dbname=my_dbname, user=my_user, password=my_password, port=my_port)
print(f"✅ 成功连接到(数据库: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接数据库时发生错误: {e}")
# 打印使用的 embedding model 信息
if hasattr(vn.embedding_function, 'model_id'):
print(f"使用的 embedding model: {vn.embedding_function.model_id}")
elif hasattr(vn.embedding_function, 'model_name'):
print(f"使用的 embedding model: {vn.embedding_function.model_name}")
else:
print(f"使用的 embedding model: 未知 (类型: {type(vn.embedding_function).__name__})")
部署方案(2)
环境信息
使用 aws embedding 模型
要使用这段代码,你需要安装 langchain-aws 包:
pip install langchain-aws
这段代码的主要变化是:
- 导入 BedrockEmbeddings 从 langchain_aws 包
- 创建一个 BedrockEmbeddings 实例,使用 Titan 嵌入模型(amazon.titan-embed-text-v2)
- 将这个嵌入函数添加到配置中,传递给 PG_VectorStore
- 这样,Vanna.AI 将使用 AWS Bedrock 的 Titan 嵌入模型而不是默认的 HuggingFace 嵌入模型或 OpenAI 嵌入模型
注意:确保你的 AWS 账户有权限访问 Titan 嵌入模型。如果你遇到权限错误,可能需要在 AWS 控制台中启用对该模型的访问权限。
样例代码:
#使用amazon.titan-embed-text-v2模型
import boto3
from langchain_aws import BedrockEmbeddings
from vanna.pgvector.pgvector import PG_VectorStore
from vanna.bedrock.bedrock_converse import Bedrock_Converse
# AWS 凭证
AWS_ACCESS_KEY_ID = '<your-access-key>'
AWS_SECRET_ACCESS_KEY = '<your-secret_key>'
AWS_REGION = 'us-east-1'
# 创建 Bedrock 客户端
try:
bedrock_client = boto3.client(
'bedrock-runtime',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
print(f"✅ Bedrock 客户端创建成功 (区域: {AWS_REGION})")
except Exception as e:
print(f"❌ Bedrock 客户端创建失败: {str(e)}")
# 数据库连接变量
my_host = '44.201.54.116'
my_dbname = 'pgvector'
my_user = 'pgvector'
my_password = 'password'
my_port = 54333
# 创建 Bedrock Titan 嵌入模型
embedding_function = BedrockEmbeddings(
client=bedrock_client,
model_id="amazon.titan-embed-text-v2:0" # Titan 嵌入模型
)
# 配置 Bedrock 模型用于对话
bedrock_config = {
"modelId": "us.deepseek.r1-v1:0",
"allow_llm_to_see_data": True, # 设置允许LLM查看数据
"temperature": 0.7,
"max_tokens": 500
}
# 初始化 MyVanna 实例
try:
class MyVanna(PG_VectorStore, Bedrock_Converse):
def __init__(self, config=None):
if config is None:
config = {}
# 构建连接字符串
config["connection_string"] = f"postgresql://{my_user}:{my_password}@{my_host}:{my_port}/{my_dbname}"
# 添加嵌入函数到配置
config["embedding_function"] = embedding_function
# 初始化 PG_VectorStore
PG_VectorStore.__init__(self, config=config)
# 初始化 Bedrock_Converse
Bedrock_Converse.__init__(self, client=bedrock_client, config=bedrock_config)
# 创建 MyVanna 实例
vn = MyVanna()
print(f"✅ MyVanna 实例初始化成功!Vector Database(是: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接Vector数据库时发生错误: {e}")
#需要被查询的库
try:
vn.connect_to_postgres(host=my_host, dbname=my_dbname, user=my_user, password=my_password, port=my_port)
print(f"✅ 成功连接到(数据库: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接数据库时发生错误: {e}")
# 打印使用的 embedding model 信息
if hasattr(vn.embedding_function, 'model_id'):
print(f"使用的 embedding model是: {vn.embedding_function.model_id}")
elif hasattr(vn.embedding_function, 'model_name'):
print(f"使用的 embedding model是: {vn.embedding_function.model_name}")
else:
print(f"使用的 embedding model是: 未知 (类型: {type(vn.embedding_function).__name__})")
数据库连接代码样例
Vanna.AI 支持很多目标数据库类型,包括 PostgreSQL、MySQL、PrestoDB、Apache Hive、ClickHouse、Snowflake、Oracle、Microsoft SQL Server、BigQuery、SQLite、DuckDB 等。下面是 Mysql 和 Postgresql 的连接样例。
- Mysql 数据库
try:
vn.connect_to_mysql(host=my_host, dbname=my_dbname, user=my_user, password=my_password, port=my_port)
print(f"✅ 成功连接到(数据库: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接数据库时发生错误: {e}")
- Postgresql 数据库
#需要被查询的库
try:
vn.connect_to_postgres(host=my_host, dbname=my_dbname, user=my_user, password=my_password, port=my_port)
print(f"✅ 成功连接到(数据库: {my_host}:{my_port}/{my_dbname})")
except Exception as e:
print(f"❌ 连接数据库时发生错误: {e}")
Text-to-SQL 效果演示
通过以上步骤,我们已经完成 Text-to-SQL 环境的部署。接下来分别通过 Jupyter Notebook 和可视化界面进行演示。
Jupyter Notebook 环境演示
训练数据
使用 vn.train 训练专有的 RAG“模型”
# The information schema query may need some tweaking depending on your database. This is a good starting point.
df_information_schema = vn.run_sql("SELECT * FROM INFORMATION_SCHEMA.COLUMNS")
# This will break up the information schema into bite-sized chunks that can be referenced by the LLM
plan = vn.get_training_plan_generic(df_information_schema)
plan
# If you like the plan, then uncomment this and run it to train
vn.train(plan=plan)
Embedding 并存入向量数据库
# The following are methods for adding training data. Make sure you modify the examples to match your database.
# DDL statements are powerful because they specify table names, colume names, types, and potentially relationships
vn.train(ddl="""
CREATE TABLE IF NOT EXISTS my-table (
id INT PRIMARY KEY,
name VARCHAR(100),
age INT
)
""")
# Sometimes you may want to add documentation about your business terminology or definitions.
vn.train(documentation="有多少个年龄小于30的人?")
# You can also add SQL queries to your training data. This is useful if you have some queries already laying around. You can just copy and paste those from your editor to begin generating new SQL.
vn.train(sql="SELECT count(*) FROM my_table WHERE age < 30")
# At any time you can inspect what training data the package is able to reference
training_data = vn.get_training_data()
training_data
删除训练数据
可以根据需求删除不需要的训练数据:
# You can remove training data if there's obsolete/incorrect information.
vn.remove_training_data(id='7db2674a-a22d-5a3e-8e87-5413cfef9756-sql')
训练业务 SQL
vn.train(sql="SELECT count(*) FROM my_table WHERE age < 30", documentation="有多少个年龄小于30的人?")
查询数据
可以通过自然语言查询需要的数据:
vn.ask("针对每个年龄段,统计人数,以10岁为一个年龄段?")
查询过程输出如下:
Add of existing embedding ID: d202524f-ce0f-5750-980c-c7cc7c9f69c9-sql
Add of existing embedding ID: 8dde0b98-0a23-50dc-8a90-787057bdb503-sql
Number of requested results 10 is greater than number of elements in index 5, updating n_results = 5
Number of requested results 10 is greater than number of elements in index 1, updating n_results = 1
Add of existing embedding ID: 5c4c38b0-e1fa-5b6d-b782-237117a6eb72-doc
SQL Prompt: [{'role': 'system', 'content': "You are a SQL expert. Please help to generate a SQL query to answer the question. Your response should ONLY be based on the given context and follow the response guidelines and format instructions. \n===Tables \n\n CREATE TABLE IF NOT EXISTS my-table (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n age INT\n )\n\n\n\n===Additional Context \n\n有多少个年龄小于30的人?\n\nThe following columns are in the ST_UNITS_OF_MEASURE table in the def database:\n\n| | TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | COLUMN_NAME | DATA_TYPE | COLUMN_COMMENT |\n|----:|:----------------|:-------------------|:--------------------|:------------------|:------------|:-----------------|\n| 741 | def | information_schema | ST_UNITS_OF_MEASURE | UNIT_NAME | varchar | |\n| 742 | def | information_schema | ST_UNITS_OF_MEASURE | UNIT_TYPE | varchar | |\n| 743 | def | information_schema | ST_UNITS_OF_MEASURE | CONVERSION_FACTOR | double | |\n| 744 | def | information_schema | ST_UNITS_OF_MEASURE | DESCRIPTION | varchar | |\n\nThe following columns are in the x$user_summary_by_file_io table in the def database:\n\n| | TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | COLUMN_NAME | DATA_TYPE | COLUMN_COMMENT |\n|-----:|:----------------|:---------------|:--------------------------|:--------------|:------------|:-----------------|\n| 3754 | def | sys | x$user_summary_by_file_io | user | varchar | |\n| 3755 | def | sys | x$user_summary_by_file_io | ios | decimal | |\n| 3756 | def | sys | x$user_summary_by_file_io | io_latency | decimal | |\n\n===Response Guidelines \n1. If the provided context is sufficient, please generate a valid SQL query without any explanations for the question. \n2. If the provided context is almost sufficient but requires knowledge of a specific string in a particular column, please generate an intermediate SQL query to find the distinct strings in that column. Prepend the query with a comment saying intermediate_sql \n3. If the provided context is insufficient, please explain why it can't be generated. \n4. Please use the most relevant table(s). \n5. If the question has been asked and answered before, please repeat the answer exactly as it was given before. \n6. Ensure that the output SQL is SQL-compliant and executable, and free of syntax errors. \n"}, {'role': 'user', 'content': '针对每个年龄段,统计人数,以10岁为一个年龄段?'}, {'role': 'assistant', 'content': "SELECT \n FLOOR(age/10)*10 AS age_group,\n CONCAT(FLOOR(age/10)*10, '-', FLOOR(age/10)*10+9) AS age_range,\n COUNT(*) AS count\nFROM my_table\nGROUP BY FLOOR(age/10)\nORDER BY age_group\n"}, {'role': 'user', 'content': '列出小于10的人名字和年龄?'}, {'role': 'assistant', 'content': 'SELECT name, age FROM my_table WHERE age < 10'}, {'role': 'user', 'content': '有多少个年龄小于30的人?'}, {'role': 'assistant', 'content': 'SELECT COUNT(*) FROM my_table WHERE age < 30'}, {'role': 'user', 'content': '有多少个年龄小于30的人?'}, {'role': 'assistant', 'content': 'SELECT count(*) FROM my_table WHERE age < 30'}, {'role': 'user', 'content': 'How many individuals are under 30 years old?'}, {'role': 'assistant', 'content': 'SELECT count(*) FROM my_table WHERE age < 30'}, {'role': 'user', 'content': '针对每个年龄段,统计人数,以10岁为一个年龄段?'}]
LLM Response: SELECT
FLOOR(age/10)*10 AS age_group,
CONCAT(FLOOR(age/10)*10, '-', FLOOR(age/10)*10+9) AS age_range,
COUNT(*) AS count
FROM my_table
GROUP BY FLOOR(age/10)
ORDER BY age_group
SELECT
FLOOR(age/10)*10 AS age_group,
CONCAT(FLOOR(age/10)*10, '-', FLOOR(age/10)*10+9) AS age_range,
COUNT(*) AS count
FROM my_table
GROUP BY FLOOR(age/10)
ORDER BY age_group
age_group age_range count
0 0 0-9 11
1 10 10-19 12
2 20 20-29 12
3 30 30-39 14
4 40 40-49 14
5 50 50-59 12
6 60 60-69 12
7 70 70-79 12
8 80 80-89 1
绘制图表
根据查询输出的数据,使用 Python 交互式可视化库 plotly 进行图表绘制。
可视化界面演示
Vanna.AI 提供了一个内置的基于 Flask 框架的 Web APP,可以直接运行后,通过更直观的界面与你的数据库对话,并且具有图表可视化的效果,还内置了简单的 RAG Model 数据的管理功能。
通过下面方式启动 Web App:
from vanna.flask import VannaFlaskApp
app = VannaFlaskApp(vn,
auth=SimplePassword(users=[{"email": "admin@mail.com", "password": "<your-password>"}]),
logo="https://<xxxxxx>.com/16540818023239040",
allow_llm_to_see_data=True,
summarization=True,
title=" XiaoliangGenBI",
subtitle="sql 生成器")
app.run(port = 8084, host = '0.0.0.0' , debug = True)
Web 页面介绍
通过 Web 页面登陆 Vanna.AI,可以看到两个模块:
查询数据
可以通过自然语言生成的 SQL,并用表格形式展示查询结果:
绘制图表
Vanna.AI 内置了强大的可视化功能,通过大型语言模型(LLM)智能生成图表代码,并基于 Python 著名的开源交互式可视化库 plotly 进行渲染展示。系统不仅能提供查询结果的 AI 智能汇总分析,还支持多样化的数据可视化呈现,包括柱状图、曲线图等多种图表类型。用户可根据具体需求灵活调整可视化方式,实现数据洞察的直观展现,使数据分析成果更加清晰明了。
训练数据
可以通过 DDL、Documentation、SQL 维度,增加训练数据:
至此,我们成功实现了通过自然语言查询业务数据的功能。极大简化了数据库查询流程,使非技术背景的用户也能轻松获取所需信息。这不仅为业务人员提供了便利,更为组织中的各类人员开辟了挖掘数据价值的新途径,有效促进了数据驱动决策的普及与应用。
*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。
参考文档
https://github.com/vanna-ai/vanna
https://docs.aws.amazon.com/zh_cn/bedrock/latest/userguide/getting-started.html
https://docs.aws.amazon.com/zh_cn/vector-databases/
本篇作者