我正在构建一个聊天机器人应用程序,该应用程序连接到 SQL 服务器,抓取自定义数据表,将它们转换为 pandas 数据框,然后将它们与 Langchain 代理一起使用。页面加载/签名时...
我正在构建一个聊天机器人应用程序,该应用程序连接到 SQL 服务器,抓取自定义数据表,将它们转换为 pandas 数据框,然后将它们与 Langchain 代理一起使用。在页面加载/登录时,用户的电子邮件用于确定从同一数据库的不同 SQL 表中提取的敏感信息的访问级别。这些表存储在数据框中,并与我的 llm 一起传递给 pandas Langchain 代理。每当用户提出问题时,它都会调用代理,然后代理会做出回应。在我的后端,我还会检查用户是否想要某种形式的可视化,并使用 plotly、数据框和我让代理创建的自定义代码来创建它。
在本地,当我为一个用户及其电子邮件进行测试时,这种方法确实很有效。但是,我不确定当许多用户同时使用该网站时如何扩展它。我正在努力弄清楚如何确保数据帧、代理/llm 的内存和代理本身可以为每个人初始化并在众多请求中使用。
目前,这是我对网络应用程序启动的实现:
from flask import Flask, g, sessions, request, jsonify, render_template, session
from flask_session import Session
from sqlalchemy import create_engine, text
import pandas as pd
import os
import urllib.parse
from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent
from langchain_openai import AzureChatOpenAI
from langchain import prompts
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferWindowMemory
import seaborn as sns
import plotly.express as px
import re
import plotly.io as pio
import logging
from sqlalchemy.engine import URL
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__, template_folder='templates')
secret_key = os.environ['FLASK_SECRET_KEY']
app.secret_key = secret_key
user = os.environ["USER_NAME"]
password = os.environ["PASSWORD"]
hostName = os.environ["HOSTNAME"]
port = os.environ["SQLPORT"]
db = os.environ["DATABASE"]
db_context = os.environ["CONTEXT"]
params = urllib.parse.quote_plus(
"Driver={ODBC Driver 17 for SQL Server};"
f"Server=tcp:{hostName},1433;"
f"Database={db};"
f"Uid={user};"
f"Pwd={password};"
"Encrypt=yes;"
"TrustServerCertificate=yes;" #yes if running locally, no for production
"Connection Timeout=240;"
)
connection_string = f"mssql+pyodbc:///?odbc_connect={params}"
app.config['SQLALCHEMY_DATABASE_URI'] = connection_string
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
@app.route('/')
def home():
global dataframes, memory, chat_prompt, context, userName, userImage, agent
chat_prompt = chat_prompt_init()
email = request.args.get('email')
logging.info('email got')
if not email:
return render_template('no_email.html')
dataframes = load_data_frames1(email)
context = dataframes.get("context")
contractDetails = dataframes.get("contract_details")
projectDetails = dataframes.get("project_details")
storeDetails = dataframes.get("store_details")
userName = dataframes.get("name")
userImage = dataframes.get("profile_pic")
memory = ConversationBufferWindowMemory()
client = AzureChatOpenAI(
api_key=os.environ["AZURE_OPENAI_KEY"],
api_version="2023-12-01-preview",
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
deployment_name=os.environ["DEPLOYMENT_NAME"],
temperature=0
)
agent = create_pandas_dataframe_agent(llm=client, df=[storeDetails, contractDetails, projectDetails], verbose=True, agent_type=AgentType.OPENAI_FUNCTIONS, return_intermediate_steps=True)
return render_template('Index.html')
当用户按下发送按钮或点击输入他们的查询/问题时,这是我的路线:
@app.route('/process_query', methods=['POST'])
def process_query():
if not agent:
return jsonify({"error": "Agent not initialized"}), 500
# Extract query from the POST request
data = request.json
query = data.get('query')
if not query:
return jsonify({"error": "No query provided"}), 400
try:
response = agent.invoke(chat_prompt.format_prompt(query=query, chat_history=memory.buffer_as_messages, context=context).to_messages())
if response["intermediate_steps"]:
queries = extract_queries(response['intermediate_steps'])
else:
queries = None
pattern = r'```python\s(.*?)```'
code_snippets = re.findall(pattern, response["output"], re.DOTALL)
if code_snippets:
cleaned_snippets = [snippet.replace("python\n", "", 1) for snippet in code_snippets]
else:
cleaned_snippets = None
graph = extract_graph_code(output_code=cleaned_snippets, queries=queries)
clean_response = remove_code(response["output"])
memory.chat_memory.add_user_message(query)
memory.chat_memory.add_ai_message(clean_response)
return jsonify({"response": clean_response,
"graph": graph})
except Exception as e:
return jsonify({"error": str(e)}), 500
通常,如果表是简单读取且代理没有内存,那么将这些对象编码到 process_query() 函数中将非常容易。但是,我需要确保每次运行处理查询的请求时代理都不会初始化,因为我需要为每个用户提供持久内存。此外,我无法让每个请求都生成数据帧到 process_query(),因为为了抓取表并生成数据帧,平均需要 27 秒(我还没有找到简化 SQL 查询的方法,以便根据用户访问级别抓取和生成自定义表)。
我需要找到一种方法,使所有对象不作为全局变量(数据框、代理、内存)。我尝试将数据框设为 Json 对象并将它们传递到会话 cookie 中,但仅一个数据框就已经太大了。我一直在尝试在线查找可以做到这一点的不同方法,但似乎找不到好的答案。我见过人们使用 Redis 进行内存存储,但我没有,希望找到替代方案(如果没有,请告诉我)。我计划将此应用程序托管为 Azure Web 应用程序;不确定这是否有助于找到解决我的问题的方法。
我考虑过为每个用户创建一个唯一的、随机生成的 ID,并将代理、内存和数据帧存储在全局字典中,并以 ID 作为密钥来确保它们的安全。注销时,我会删除键值对,以确保字典不会变得太大,从而防止出现内存和空间问题。我非常怀疑这在生产环境中是否是最佳选择,但这是我目前能想到的全部。
我如何获取每个用户的数据/对象并将它们传递到 Flask 中的不同路由?请让我知道我还能做什么或尝试什么!