Preventing Sensitive Data Exposure in LLMs
Many organizations want to leverage LLMs to help users query their databases more effectively. However, there’s a critical challenge: enterprise databases often contain sensitive information like PII that requires strict data governance. Simply passing database results to LLM APIs (like Azure OpenAI or Gemini) could expose sensitive data and usually requires extensive security reviews and whitelisting processes that can delay deployment.
So I developed a structured data anonymization solution that creates a secure pathway between LLMs and databases. The solution uses a two-stage approach. In the first stage, the SQL is generated in a sandbox environment that mirrors the production database’s structure (either by replicating the full DDL or by including only the required columns) and is populated with synthetic data. This setup ensures the model learns the table information without any risk to real data. In the second stage, the generated SQL queries are executed against the production database, and any sensitive data in the results is detected and anonymized before being sent to the LLM. The LLM then processes this anonymized data to generate an answer, which is de-anonymized back to the original values before being presented to the user.
Below is a step-by-step breakdown of how the flow operates in my project:
User Query & BigQuery Sandbox Interaction
The user’s question is received by the system and processed through a LangGraph workflow, starting with SQL generation using Google’s Gemini 2.0 Flash model.
The system connects to a sandbox BigQuery dataset that mirrors the production schema, allowing LLMs to understand the database structure and generate appropriate SQL without accessing sensitive data.
The sandbox dataset resides in a separate GCP project, ensuring complete isolation from production data.
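One way to populate such a sandbox is to generate rows that follow the production column names but contain only made-up values. The sketch below is an assumption about how that could look, not the project's actual loader: `synthetic_customers` is a hypothetical helper, the column names simply echo the PII attributes used later in this article, and it relies on the standard library rather than Faker.

```python
import random
import string

# Hypothetical value pools; any made-up values work as long as none are real.
FIRST_NAMES = ["Alex", "Sam", "Jordan", "Taylor", "Casey"]
LAST_NAMES = ["Nguyen", "Smith", "Brown", "Wilson", "Martin"]


def synthetic_customers(n: int, seed: int = 42) -> list[dict]:
    """Build n fake customer rows that match the schema but hold no real data."""
    rng = random.Random(seed)  # a local RNG keeps the output reproducible
    rows = []
    for key in range(1, n + 1):
        handle = "".join(rng.choices(string.ascii_lowercase, k=6))
        rows.append({
            "customer_key": key,
            "first_name": rng.choice(FIRST_NAMES),
            "last_name": rng.choice(LAST_NAMES),
            "dob": f"{rng.randint(1950, 2005)}-{rng.randint(1, 12):02d}-{rng.randint(1, 28):02d}",
            "contact_value": f"{handle}@example.com",
        })
    return rows


rows = synthetic_customers(3)
```

Because the generator is seeded, reloading the sandbox produces the same rows each time, which makes SQL generated against it easier to compare between runs.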
SQL Generation with Gemini
The LLM generates SQL queries by combining:
- The user’s natural language question.
- The sandbox BigQuery dataset table structure.
The generated SQL is validated against the sandbox environment.
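A sketch of what such a validation step could look like is shown here. It uses an in-memory SQLite database as a stand-in for the sandbox, so `validate_sql` and the table definition are illustrative assumptions rather than the project's code; with BigQuery, a dry-run query against the sandbox dataset would play the same role.

```python
import sqlite3


def validate_sql(conn: sqlite3.Connection, sql: str) -> tuple[bool, str]:
    """Return (ok, message) after asking the sandbox to plan, not run, the query."""
    if not sql.lstrip().lower().startswith(("select", "with")):
        return False, "only read-only queries are allowed"
    try:
        # EXPLAIN QUERY PLAN compiles the statement, so unknown tables,
        # unknown columns and syntax errors surface without reading any rows.
        conn.execute(f"EXPLAIN QUERY PLAN {sql}")
    except sqlite3.Error as exc:
        return False, str(exc)
    return True, "ok"


# In-memory stand-in for the sandbox dataset (same column names, no real data).
sandbox = sqlite3.connect(":memory:")
sandbox.execute("CREATE TABLE customer (customer_key INTEGER, first_name TEXT)")

good = validate_sql(sandbox, "SELECT first_name FROM customer LIMIT 10")
bad = validate_sql(sandbox, "SELECT surname FROM customer")
blocked = validate_sql(sandbox, "DROP TABLE customer")
```

Only queries that pass this check would be forwarded to the production execution step, so a hallucinated column name fails early and never touches real data.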
Production Query Execution
The validated SQL is executed against the production BigQuery project through a dedicated execution node.
The query returns structured data in a format that maintains column names and data types.
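Because the results travel through the workflow as text, date columns arrive as Python reprs such as `datetime.date(1990, 5, 17)`. The implementation later in this article rewrites them with regular-expression substitutions. The sketch below shows a hypothetical alternative, `dates_to_iso`, which builds real date objects so the output is zero-padded and datetimes with omitted seconds are handled too; it assumes naive dates without timezone arguments.

```python
import datetime
import re

# Matches reprs such as datetime.date(1990, 5, 17) or
# datetime.datetime(2024, 1, 2, 3, 4) with integer arguments only.
_DATE_REPR = re.compile(r"datetime\.(date|datetime)\(([\d,\s]+)\)")


def dates_to_iso(text: str) -> str:
    """Replace date/datetime reprs in text with quoted ISO 8601 strings."""
    def _swap(match: re.Match) -> str:
        kind = match.group(1)
        args = [int(part) for part in match.group(2).split(",")]
        cls = datetime.date if kind == "date" else datetime.datetime
        return repr(cls(*args).isoformat())
    return _DATE_REPR.sub(_swap, text)


raw = "[{'dob': datetime.date(1990, 5, 17), 'created': datetime.datetime(2024, 1, 2, 3, 4)}]"
clean = dates_to_iso(raw)
# clean == "[{'dob': '1990-05-17', 'created': '2024-01-02T03:04:00'}]"
```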
PII Detection & Processing Paths
The PII detection node scans the structured BigQuery results for known PII column names; Presidio is then used for the anonymization itself.
Clean Data Path:
- Taken when no PII is detected in the query results.
- The structured data is passed directly to the LLM for answer generation.
- The LLM creates a summary and suggests follow-up questions.
PII Protection Path:
- Taken when PII is found in the BigQuery results.
- The anonymization node uses Presidio to create reversible anonymized data.
- A new question is generated that matches the style of the original question but fits the anonymized values.
- The LLM processes the anonymized data to generate an answer.
- The final step de-anonymizes the response to show actual production data.
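The core idea of the PII protection path is a reversible mapping: every sensitive value is swapped for a stand-in before the LLM call, and the mapping is applied in reverse afterwards. The toy class below illustrates that round trip under simplifying assumptions. `ReversibleMap` is hypothetical and uses numbered placeholders, whereas the project relies on Presidio with Faker-generated replacement values.

```python
from itertools import count


class ReversibleMap:
    """Toy stand-in for a reversible anonymizer: swaps known values for placeholders."""

    def __init__(self) -> None:
        self._forward: dict[str, str] = {}  # real value -> placeholder
        self._reverse: dict[str, str] = {}  # placeholder -> real value
        self._ids = count(1)

    def anonymize(self, text: str, sensitive_values: list[str]) -> str:
        # Replace longer values first so a short value never clobbers part of a longer one.
        for value in sorted(sensitive_values, key=len, reverse=True):
            if value not in self._forward:
                placeholder = f"<PII_{next(self._ids)}>"
                self._forward[value] = placeholder
                self._reverse[placeholder] = value
            text = text.replace(value, self._forward[value])
        return text

    def deanonymize(self, text: str) -> str:
        for placeholder, value in self._reverse.items():
            text = text.replace(placeholder, value)
        return text


mapper = ReversibleMap()
rows = "[{'contact_value': 'shannon21@example.org'}, {'contact_value': '9835-8416'}]"
masked = mapper.anonymize(rows, ["shannon21@example.org", "9835-8416"])

# Pretend this string came back from the LLM, which only ever saw placeholders.
llm_answer = "The contacts are <PII_1> and <PII_2>."
restored = mapper.deanonymize(llm_answer)
```

The mapping has to live for the whole request: if a fresh, empty mapper were used for the reverse step, the placeholders would pass through unchanged.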
Final Answer Delivery
The system returns:
- A concise summary of the data.
- A suggested follow-up question.
LangGraph Workflow Management
The entire process is orchestrated using LangGraph, which:
- Manages state between nodes.
- Handles conditional routing based on PII detection.
- Provides progress tracking through the pipeline.
- Ensures proper error handling at each step.
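To make the routing idea concrete without any framework, here is a plain-Python sketch of nodes that pass a state dictionary along and a router that picks the next node. It is only an illustration of the control flow: the `NODES` and `ROUTES` tables, the `trace` field and the reduced set of nodes are assumptions, and the real workflow uses LangGraph's `StateGraph` as shown in the implementation section.

```python
from typing import Any, Callable, Dict, Optional

State = Dict[str, Any]


def detect_pii(state: State) -> State:
    pii_columns = {"first_name", "contact_value"}  # hypothetical subset
    state["has_pii"] = any(col in pii_columns for col in state["columns"])
    return state


def anonymize(state: State) -> State:
    state["trace"].append("anonymize")
    return state


def generate_answer(state: State) -> State:
    state["trace"].append("generate_answer")
    return state


NODES: Dict[str, Callable[[State], State]] = {
    "detect_pii": detect_pii,
    "anonymize": anonymize,
    "generate_answer": generate_answer,
}

# Each router inspects the state and names the next node (None ends the run).
ROUTES: Dict[str, Callable[[State], Optional[str]]] = {
    "detect_pii": lambda s: "anonymize" if s["has_pii"] else "generate_answer",
    "anonymize": lambda s: "generate_answer",
    "generate_answer": lambda s: None,
}


def run(state: State, start: str = "detect_pii") -> State:
    node: Optional[str] = start
    while node is not None:
        state = NODES[node](state)
        node = ROUTES[node](state)
    return state


with_pii = run({"columns": ["contact_value"], "trace": []})
without_pii = run({"columns": ["order_total"], "trace": []})
```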
Here is the graph nodes diagram:
Implementation
Let’s look at how the solution is implemented in code. The solution is built using two main Python files, main.py and graph_chain.py:
graph_chain.py
This file contains the core workflow implementation, including LangGraph nodes, PII detection, and data anonymization logic:
from functools import partial
import re
import json
from typing import Any, Dict, Tuple, Annotated, Literal, Optional, List
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage
from langchain_core.prompts import PromptTemplate
from langchain.chains import create_sql_query_chain
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
from presidio_anonymizer.entities import OperatorConfig
from presidio_analyzer import PatternRecognizer
from faker import Faker
fake = Faker('en_AU')
Faker.seed(42)
memory = MemorySaver()
SQL_PROMPT_TEMPLATE = """
You are a bigquery SQL expert. Given an input question, create a syntactically correct SQL query to run. Unless otherwise specified, do not return more than {top_k} rows.
Here is the relevant table info:
{table_info}
Question: {input}
Important Guidelines:
1. Ensure that all attribute searches are case-insensitive.
2. ALWAYS add 'LIMIT {top_k}' at the end of the query unless:
- The question explicitly asks for all records
- The query uses GROUP BY and needs to show all groups
- The query is counting records (using COUNT)
- The query calculates aggregates that need all data
Double check the user's bigquery sql query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins
- Missing LIMIT clause when returning raw records
If there are any of the above mistakes, rewrite the query.
If there are no mistakes, just reproduce the original query with no further commentary.
If the question does not seem related to the database, just return "I don't know" as the answer.
**YOU MUST OUTPUT ONLY THE SQL QUERY AS PLAIN TEXT. DO NOT INCLUDE ANY EXPLANATIONS, INTRODUCTORY PHRASES, MARKDOWN CODE BLOCKS, OR ANY OTHER TEXT. The output must be directly executable SQL. Do not include any backticks or code fences around the SQL.**
"""
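class GraphState(TypedDict):
    # TypedDict fields cannot carry default values; the starting values are
    # supplied by the initial_state dict in create_chain().
    messages: Annotated[list, add_messages]
    sql: Optional[str]
    results: Any
    has_pii: Optional[bool]
    anonymized_data: Dict[str, Any]
    anonymized_question: Any  # holds the LLM message object, read via .content
    answer: Any  # AnswerSchema instance first, JSON string after formatting
    progress: Optional[str]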
def setup_anonymizer():
    """Initialize the Presidio reversible anonymizer."""
    return PresidioReversibleAnonymizer(analyzed_fields=[],
                                        faker_seed=42)


# Module-level instance: it holds the mapping needed for de-anonymization.
anonymizer = setup_anonymizer()


def convert_dates(obj):
    """Replace datetime.date(...) / datetime.datetime(...) reprs with quoted strings."""
    response_str = re.sub(
        r'datetime\.date\((\d+),\s*(\d+),\s*(\d+)\)', r"'\1-\2-\3'", obj)
    response_str = re.sub(r'datetime\.datetime\((\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+)(?:,\s*(\d+))?\)',
                          r"'\1-\2-\3 \4:\5:\6.\7'", response_str)
    return response_str
def create_sql_node(state: GraphState, config: RunnableConfig, llm, db_sandbox) -> GraphState:
    """Generates SQL using sandbox database schema"""
    real_question = state["messages"][-1].content
    prompt_template = PromptTemplate(
        template=SQL_PROMPT_TEMPLATE,
        input_variables=["input", "top_k", "table_info"]
    )
    sql_chain = create_sql_query_chain(llm, db_sandbox, prompt=prompt_template)
    sql = sql_chain.invoke({
        "question": real_question,
        "top_k": 10
    })
    # Strip any code fences the model added despite the prompt, then lowercase.
    cleaned_sql = sql.replace("```sql", "").replace("```", "").strip().lower()
    state["sql"] = cleaned_sql
    state["progress"] = "SQL generation completed."
    return state


def create_execute_node(state: GraphState, config: RunnableConfig, db_production) -> GraphState:
    """Executes generated SQL against production database"""
    sql = state["sql"]
    results = convert_dates(db_production.run_no_throw(
        sql, include_columns=True))
    state["results"] = results
    state["progress"] = "SQL execution completed."
    print("Production DB Response:")
    print(results)
    return state
def create_pii_detection_node(state: GraphState) -> GraphState:
    """Checks if query results contain PII based on entity configurations"""
    results = state["results"]
    # A PII column name appearing anywhere in the stringified results counts as a hit.
    has_pii = any(key in str(results) for key in entity_configs.keys())
    state["has_pii"] = has_pii
    return state
def create_anonymization_node(state: GraphState, config: RunnableConfig) -> GraphState:
    """Anonymizes detected PII in query results using Presidio"""
    results = state["results"]
    sql = state["sql"]
    anonymized_result = process_query_result(anonymizer)({
        "result": results,
        "query": sql,
        "question": state["messages"][-1].content
    })
    state["anonymized_data"] = anonymized_result
    state["progress"] = "Anonymization completed."
    return state


def create_question_generation_node(state: GraphState, config: RunnableConfig, llm) -> GraphState:
    """Reformulates original question using anonymized data"""
    anonymized_data = state["anonymized_data"]
    question_prompt = PromptTemplate.from_template(
        """Using the sample question and its corresponding SQL query below as a guide:
Sample Question: {question}
Sample SQL Query: {query}
Generate a new plain text question that matches the style of the sample question and is relevant to the anonymized SQL result provided below.
Important Guidelines:
- If the SQL result contains addresses, include the full address exactly as shown, without altering its format.
Anonymized SQL Result: {anonymized_text}"""
    )
    anonymized_question = llm.invoke(
        question_prompt.format(
            question=state["messages"][-1].content,
            query=state["sql"],
            anonymized_text=anonymized_data["anonymized_text"]
        )
    )
    print("Anonymized question:")
    print(anonymized_question.content)
    state["anonymized_question"] = anonymized_question
    state["progress"] = "Anonymized question generated."
    return state


class AnswerSchema(BaseModel):
    summary: str = Field(description="A concise summary of the answer.")
    followup_question: str = Field(
        description="A followup question the user could ask")
def create_answer_node(state: GraphState, config: RunnableConfig, llm) -> GraphState:
    """Generate answer based on the question and SQL results."""
    answer_prompt = PromptTemplate.from_template(
        """Given a user question and the corresponding SQL results, provide a clear and accurate answer based on the SQL results. If a SQL query is provided but appears incorrect or suboptimal, ignore it and focus solely on the question and SQL results. Only consider the SQL query if it is correct and adds value to your answer.
Do not change the date format in your answer.
Do not change the address format in your answer.
Question: {question}
SQL Result: {sql_result}
[Optional Reference: use only if the query is correct]
SQL Query: {query}
Answer: """
    )
    structured_llm = llm.with_structured_output(AnswerSchema)
    if state["has_pii"]:
        print("Send the anonymized data to the LLM:")
        print(state["anonymized_data"]["anonymized_text"])
        answer = structured_llm.invoke(
            answer_prompt.format(
                question=state["anonymized_question"].content,
                query=state["anonymized_data"]["anonymized_query"],
                sql_result=state["anonymized_data"]["anonymized_text"]
            )
        )
        # No de-anonymization here: a freshly created anonymizer has an empty
        # mapping and would change nothing. The deanonymize node handles it
        # with the module-level anonymizer that holds the mapping.
    else:
        answer = structured_llm.invoke(
            answer_prompt.format(
                question=state["messages"][-1].content,
                query=state["sql"],
                sql_result=state["results"]
            )
        )
    state["answer"] = answer
    state["progress"] = "Answer generated."
    return state
def create_deanonymize_node(state: GraphState, config: RunnableConfig) -> GraphState:
    """Maps anonymized values in the answer back to the original values."""
    answer = state["answer"]
    if isinstance(answer, str):
        deanonymized_answer = anonymizer.deanonymize(answer)
    elif isinstance(answer, BaseModel):
        answer_json = answer.model_dump_json()
        deanonymized_json = anonymizer.deanonymize(answer_json)
        deanonymized_answer = deanonymized_json
    else:
        deanonymized_answer = answer
    state["answer"] = deanonymized_answer
    state["progress"] = "Answer deanonymized."
    return state


def create_format_response_node(state: GraphState, config: RunnableConfig) -> GraphState:
    """Turns the answer into a JSON string for the caller."""
    answer = state.get("answer")
    sql_response = state.get("sql")
    if sql_response == "i don't know":
        response = json.dumps({"answer": "I don't know"})
    elif isinstance(answer, BaseModel):
        response = answer.model_dump_json()
    elif isinstance(answer, str):
        response = answer
    else:
        response = json.dumps({"error": "Invalid answer format"})
    state["answer"] = response
    state["progress"] = "Final response formatted."
    return state


def check_sql_route(state: GraphState) -> Literal["format_response", "execute_sql"]:
    if state["sql"].strip().lower() == "i don't know":
        return "format_response"
    else:
        return "execute_sql"


def pii_routing(state: GraphState) -> str:
    if state["has_pii"]:
        print(
            f"PII Routing: PII detected. Routing to pii_flow. State: {state}")
        return "pii_flow"
    else:
        print(
            f"PII Routing: No PII detected. Routing to simple_flow. State: {state}")
        return "simple_flow"
def build_graph(llm, db_sandbox, db_production):
    """
    Build the LangGraph workflow.
    db_production is used for executing SQL on the production DB.
    """
    nodes = {
        "sql_generation": create_sql_node,
        "execute_sql": create_execute_node,
        "detect_pii": create_pii_detection_node,
        "anonymize": create_anonymization_node,
        "generate_question": create_question_generation_node,
        "generate_answer": create_answer_node,
        "deanonymize": create_deanonymize_node,
        "format_response": create_format_response_node,
    }
    workflow = StateGraph(GraphState)
    workflow.add_node("sql_generation", partial(
        nodes["sql_generation"], llm=llm, db_sandbox=db_sandbox))
    workflow.add_node("execute_sql", partial(
        nodes["execute_sql"], db_production=db_production))
    workflow.add_node("detect_pii", nodes["detect_pii"])
    workflow.add_node("anonymize", nodes["anonymize"])
    workflow.add_node("generate_question", partial(
        nodes["generate_question"], llm=llm))
    workflow.add_node("generate_answer", partial(
        nodes["generate_answer"], llm=llm))
    workflow.add_node("deanonymize", nodes["deanonymize"])
    workflow.add_node("format_response", nodes["format_response"])
    # Skip execution entirely when the model answered "I don't know".
    workflow.add_conditional_edges(
        "sql_generation",
        check_sql_route
    )
    workflow.add_edge("execute_sql", "detect_pii")
    # Branch into the PII protection path or the clean data path.
    workflow.add_conditional_edges(
        "detect_pii",
        pii_routing,
        {
            "pii_flow": "anonymize",
            "simple_flow": "generate_answer"
        }
    )
    workflow.add_edge("anonymize", "generate_question")
    workflow.add_edge("generate_question", "generate_answer")
    workflow.add_edge("generate_answer", "deanonymize")
    workflow.add_edge("deanonymize", "format_response")
    workflow.add_edge("format_response", END)
    workflow.set_entry_point("sql_generation")
    return workflow.compile(checkpointer=memory)


def create_chain(llm, db_sandbox, db_production):
    """
    Create the complete chain invocation function.
    """
    graph = build_graph(llm, db_sandbox, db_production)
    config = {"configurable": {"thread_id": "1"}}

    def invoke_chain_stream(question: str):
        initial_state = {
            "messages": [HumanMessage(content=question)],
            "sql": "",
            "results": None,
            "has_pii": False,
            "anonymized_data": {},
            "anonymized_question": "",
            "answer": "",
            "progress": "Starting workflow"
        }
        for partial_response in graph.stream(initial_state, config,
                                             stream_mode="values"):
            yield partial_response
    return invoke_chain_stream
import ast  # literal_eval parses the result text without executing it


def process_query_result(anonymizer):
    def process(result):
        # Values collected per PII column; they become Presidio deny lists.
        entities = {
            'first_name': set(),
            'last_name': set(),
            'dob': set(),
            'full_address': set(),
            'state': set(),
            'country': set(),
            'contact_value': set()
        }
        if isinstance(result, dict) and isinstance(result.get("result"), str):
            try:
                # literal_eval only accepts Python literals, so text coming back
                # from the database can never be executed as code (unlike eval).
                result_data = ast.literal_eval(result["result"])
                if isinstance(result_data, list):
                    for row in result_data:
                        for key in entities:
                            if row.get(key) and str(row[key]).strip():
                                entities[key].add(str(row[key]).strip())
            except (ValueError, SyntaxError) as e:
                print(f"Error while evaluating result: {e}")
                return None
        else:
            print("Error: Expected result to be a dictionary with 'result' as a string.")
            return None
        for entity_name, (context_words, fake_function) in entity_configs.items():
            if entities[entity_name]:
                add_recognizer_and_operator(
                    anonymizer,
                    entity_name.upper(),
                    entities[entity_name],
                    context_words,
                    fake_function
                )
        return {
            "query": result["query"],
            "anonymized_text": anonymizer.anonymize(json.dumps(result["result"])),
            "anonymized_query": anonymizer.anonymize(json.dumps(result["query"]))
        }
    return process


def add_recognizer_and_operator(anonymizer, entity_name, entity_list, context_words, fake_function):
    if entity_list:
        # The deny list makes Presidio flag exactly the values seen in this column.
        recognizer = PatternRecognizer(
            supported_entity=entity_name,
            deny_list=list(entity_list),
            context=context_words
        )
        operator = {
            entity_name: OperatorConfig("custom", {"lambda": fake_function})
        }
        anonymizer.add_recognizer(recognizer)
        anonymizer.add_operators(operator)


# Column name -> (context words for Presidio, function producing a fake value).
entity_configs = {
    'first_name': (["first_name", "first", "name", "given"], lambda _: fake.first_name()),
    'last_name': (["last_name", "last", "name", "surname", "family"], lambda _: fake.last_name()),
    'dob': (["dob", "date of birth", "birth date", "birthdate"], lambda _: fake.date_of_birth().isoformat()),
    'full_address': (["full_address", "full address", "address", "location", "residence"], lambda _: fake.street_address()),
    'state': (["state", "province", "region"], lambda _: fake.state()),
    'country': (["country", "nation"], lambda _: fake.country()),
    'contact_value': (["contact", "email", "phone", "contact value"],
                      lambda value: fake.email() if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', str(value)) else fake.phone_number())
}
In the code above, PresidioReversibleAnonymizer is used to enable both anonymization and de-anonymization of data. Ad-hoc recognizers registered with Presidio identify sensitive fields column by column, which is possible because the SQL query results include column names when fetched with:
db_production.run_no_throw(sql, include_columns=True)
Custom entity handlers are also defined to manage different types of PII, ensuring precise control over data anonymization and restoration.
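The detection node above treats any occurrence of a PII column name in the stringified results as a hit, so a value such as "bank statement sent" would match the `state` column. A stricter variant could parse the rows and compare only the keys. The sketch below is one hypothetical way to do that: `find_pii_columns` is an assumed helper, and it fails closed by reporting every PII column when the text cannot be parsed.

```python
import ast

PII_COLUMNS = {"first_name", "last_name", "dob", "full_address",
               "state", "country", "contact_value"}


def find_pii_columns(result_text: str) -> set[str]:
    """Return the PII column names that appear as keys in the result rows."""
    try:
        rows = ast.literal_eval(result_text)  # parses literals only, never runs code
    except (ValueError, SyntaxError):
        return set(PII_COLUMNS)  # fail closed: unparseable text is treated as PII
    if not isinstance(rows, list):
        return set(PII_COLUMNS)
    found: set[str] = set()
    for row in rows:
        if isinstance(row, dict):
            found.update(PII_COLUMNS.intersection(row))
    return found


hit = find_pii_columns("[{'contact_value': 'joel28@example.org'}]")
miss = find_pii_columns("[{'note': 'bank statement sent'}]")
unknown = find_pii_columns("Error: table not found")
```

Here `hit` contains only `contact_value`, `miss` is empty even though the text contains the substring "state", and `unknown` falls back to the full column set so that error messages are never sent to the LLM unmasked.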
In my project, PII data attributes include:
- first_name
- last_name
- dob (date of birth)
- full_address
- state
- country
- contact_value (email or phone)
You will need to adjust these attributes based on your own data structure.
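When an attribute can hold more than one kind of value, as contact_value does, the fake-value function has to inspect the original first. The snippet below isolates that decision using the same email pattern as the entity configuration; `contact_kind` is a hypothetical helper name, and treating everything that is not an email as a phone number is the simplifying assumption the original lambda makes as well.

```python
import re

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def contact_kind(value: object) -> str:
    """Classify a contact_value as 'email', or 'phone' for anything else."""
    return "email" if EMAIL_RE.match(str(value).strip()) else "phone"


kinds = [contact_kind(v) for v in
         ["shannon21@example.org", "+61 3 5881 5371", "9835-8416"]]
# kinds == ["email", "phone", "phone"]
```

Replacing like with like matters for the LLM step: an email swapped for a fake email keeps the anonymized result consistent with the question being asked.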
main.py
This file handles environment setup, database connections, and user interaction:
import os
import json
from dotenv import load_dotenv
import argparse
from typing import Optional
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.globals import set_llm_cache
from langchain_core.caches import InMemoryCache
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from pydantic import BaseModel
from graph_chain import create_chain
def setup_environment():
    load_dotenv()
    set_llm_cache(InMemoryCache())
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        verbose=True
    )
    # SERVICE_ACCOUNT_FILE is resolved relative to the current working directory.
    service_account_file = f"{os.getcwd()}/{os.getenv('SERVICE_ACCOUNT_FILE')}"
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file
    return llm, service_account_file


def create_bigquery_urls(service_account_file):
    config = {
        'project': os.getenv("BIGQUERY_PROJECT"),
        'project_sandbox': os.getenv("BIGQUERY_PROJECT_SANDBOX"),
        'dataset': os.getenv("BIGQUERY_DATASET"),
        'dataset_sandbox': os.getenv("BIGQUERY_DATASET_SANDBOX")
    }
    base_url = "bigquery://{project}/{dataset}?credentials_path={creds}"
    urls = {
        'sandbox': base_url.format(
            project=config['project_sandbox'],
            dataset=config['dataset_sandbox'],
            creds=service_account_file
        ),
        'unmasked': base_url.format(
            project=config['project'],
            dataset=config['dataset'],
            creds=service_account_file
        )
    }
    db_sandbox = SQLDatabase.from_uri(urls['sandbox'])
    db_unmasked = SQLDatabase.from_uri(urls['unmasked'])
    return db_sandbox, db_unmasked
def message_to_dict(message):
    """Convert LangChain message objects to a JSON-serializable format."""
    if isinstance(message, (HumanMessage, AIMessage, SystemMessage)):
        return {"role": message.__class__.__name__, "content": message.content}
    return message


def response_to_serializable(response):
    if isinstance(response, dict):
        return {k: response_to_serializable(v) for k, v in response.items()}
    elif isinstance(response, list):
        return [response_to_serializable(item) for item in response]
    elif isinstance(response, BaseModel):
        return response.model_dump()
    try:
        return message_to_dict(response)
    except Exception:
        return str(response)


def process_question(chain, question: str) -> Optional[dict]:
    try:
        final_answer = None
        # Each streamed item is the full state after a node has run.
        for response in chain(question):
            if isinstance(response, dict):
                if "answer" in response:
                    answer_data = response["answer"]
                    if isinstance(answer_data, str):
                        try:
                            answer_data = json.loads(answer_data)
                        except json.JSONDecodeError:
                            answer_data = {"error": "Invalid answer format"}
                    final_answer = answer_data
                if "progress" in response:
                    progress = response["progress"]
                    print(f"Progress: {progress}")
        return final_answer
    except Exception as e:
        print(f"Error processing question: {str(e)}")
        return None


def print_answer(answer: dict):
    if answer:
        print("\n=== Answer ===")
        if isinstance(answer, BaseModel):
            answer = answer.model_dump()
        print(f"Summary: {answer.get('summary', 'N/A')}")
        print(f"Follow-up Question: {answer.get('followup_question', 'N/A')}")
        print("=============\n")
def interactive_mode():
    print("Setting up environment...")
    llm, service_account_file = setup_environment()
    print("Connecting to BigQuery...")
    db_sandbox, db_unmasked = create_bigquery_urls(service_account_file)
    print("Creating chain...")
    graph_chain = create_chain(llm, db_sandbox, db_unmasked)
    print("\nInteractive CLI Ready!")
    print("Type 'exit' or 'quit' to end the session")
    print("Enter your question below:\n")
    while True:
        try:
            question = input("> ")
            if question.lower() in ['exit', 'quit']:
                print("Goodbye!")
                break
            if not question.strip():
                continue
            answer = process_question(graph_chain, question)
            print_answer(answer)
        except KeyboardInterrupt:
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"An error occurred: {str(e)}")


def main():
    parser = argparse.ArgumentParser(
        description="Interactive CLI for Graph Chain")
    parser.add_argument("--question", "-q", help="Single question mode")
    args = parser.parse_args()
    if args.question:
        # Single question mode
        llm, service_account_file = setup_environment()
        db_sandbox, db_unmasked = create_bigquery_urls(service_account_file)
        graph_chain = create_chain(llm, db_sandbox, db_unmasked)
        answer = process_question(graph_chain, args.question)
        print_answer(answer)
    else:
        # Interactive mode
        interactive_mode()


if __name__ == "__main__":
    main()
Running the Application
Now that we understand how the system works and have reviewed the code, let’s get it up and running to see it in action.
Before running the application, you’ll need to install the required Python libraries. You can install them using pip:
pip install langgraph langchain langchain-community langchain-openai "langchain-google-community[bigquery]" \
langchain-google-genai langchain_experimental python-dotenv sqlalchemy-bigquery \
presidio-analyzer presidio-anonymizer spacy Faker
Environment Setup
With all dependencies installed, let’s configure our environment.
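Create a .env file in your project root with the following configuration. Note that main.py reads the SERVICE_ACCOUNT_FILE variable (a path relative to the working directory) and sets GOOGLE_APPLICATION_CREDENTIALS from it:
SERVICE_ACCOUNT_FILE=path/to/your/service-account.json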
BIGQUERY_PROJECT=your-production-project
BIGQUERY_PROJECT_SANDBOX=your-sandbox-project
BIGQUERY_DATASET=your-production-dataset
BIGQUERY_DATASET_SANDBOX=your-sandbox-dataset
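A missing variable otherwise only shows up later as a malformed BigQuery URL, so it can help to check the settings at startup. The following is a minimal sketch of such a check; `missing_settings` is a hypothetical helper and is not part of main.py.

```python
import os

REQUIRED_SETTINGS = (
    "BIGQUERY_PROJECT",
    "BIGQUERY_PROJECT_SANDBOX",
    "BIGQUERY_DATASET",
    "BIGQUERY_DATASET_SANDBOX",
)


def missing_settings(env=os.environ) -> list[str]:
    """Return the names of required settings that are unset or blank."""
    return [name for name in REQUIRED_SETTINGS
            if not str(env.get(name, "")).strip()]


# Example with a plain dict standing in for the process environment.
example_env = {"BIGQUERY_PROJECT": "prod", "BIGQUERY_DATASET": "sales"}
missing = missing_settings(example_env)
# missing == ["BIGQUERY_PROJECT_SANDBOX", "BIGQUERY_DATASET_SANDBOX"]
```

Calling `missing_settings()` without arguments after `load_dotenv()` checks the real environment, and a non-empty result is a good reason to stop before any connection is attempted.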
Running the Application
You can run the application in interactive mode:
python main.py
Let’s test it with a sample query to observe how the system manages PII detection and anonymization during database interaction.
Sample Interaction
Let’s look at a real log that shows how the system protects sensitive data:
Retrieve the phone number and email address of a customer whose firstname is Danielle
Progress: Starting workflow
Progress: SQL generation completed.
Production DB Response:
[{'contact_value': '+61 3 5881 5371'}, {'contact_value': 'shannon21@example.org'}, {'contact_value': '(08)-9632-5953'}, {'contact_value': 'tyronerobertson@example.com'}, {'contact_value': '(08)-0168-7339'}, {'contact_value': 'brandon47@example.net'}, {'contact_value': '+61-474-801-624'}, {'contact_value': 'kathryn50@example.com'}, {'contact_value': '9835-8416'}, {'contact_value': 'joel28@example.org'}]
Progress: SQL execution completed.
PII Routing: PII detected. Routing to pii_flow. State: {'messages': [HumanMessage(content='Retrieve the phone number and email address of a customer whose firstname is Danielle', additional_kwargs={}, response_metadata={}, id='df1cdabf-bdee-43e1-859b-c83909747a53')], 'sql': "select\n t2.contact_value\n from\n `customer` as t1\n inner join `contact` as t2 on t1.customer_key = t2.customer_key\n where lower(t1.first_name) = 'danielle'\n and (lower(t2.contact_type) = 'phone' or lower(t2.contact_type) = 'email')\nlimit 10", 'results': "[{'contact_value': '+61 3 5881 5371'}, {'contact_value': 'shannon21@example.org'}, {'contact_value': '(08)-9632-5953'}, {'contact_value': 'tyronerobertson@example.com'}, {'contact_value': '(08)-0168-7339'}, {'contact_value': 'brandon47@example.net'}, {'contact_value': '+61-474-801-624'}, {'contact_value': 'kathryn50@example.com'}, {'contact_value': '9835-8416'}, {'contact_value': 'joel28@example.org'}]", 'has_pii': True, 'anonymized_data': {}, 'anonymized_question': '', 'answer': '', 'progress': 'SQL execution completed.'}
Progress: SQL execution completed.
Progress: Anonymization completed.
Anonymized question:
Retrieve the phone number and email address of a customer.
Progress: Anonymized question generated.
Send the anonymized data to the LLM:
"[{'contact_value': '+61 457 015 430'}, {'contact_value': 'harrellkenneth@example.net'}, {'contact_value': '+61 8 8480 1845'}, {'contact_value': 'jeffrey28@example.com'}, {'contact_value': '3953.7672'}, {'contact_value': 'onelson@example.net'}, {'contact_value': '(02)-3164-7525'}, {'contact_value': 'yherrera@example.org'}, {'contact_value': '+61.7.0265.4235'}, {'contact_value': 'garzaanthony@example.org'}]"
Progress: Answer generated.
Progress: Answer deanonymized.
Progress: Final response formatted.
=== Answer ===
Summary: The phone number and email address of the customer are: +61 3 5881 5371, shannon21@example.org, (08)-9632-5953, tyronerobertson@example.com, (08)-0168-7339, brandon47@example.net, +61-474-801-624, kathryn50@example.com, 9835-8416 and joel28@example.org
Follow-up Question: Can you retrieve other contact information for this customer?
=============
This example demonstrates how the system:
- Safely executes the query against production
- Automatically detects PII in the contact information
- Creates anonymized data for LLM processing
- Returns de-anonymized results to the user
Conclusion
There you have it. If you find this approach useful, feel free to adapt it for your own use case. The code is ready to run, and the design can be applied to any database system that needs to balance AI capabilities with data privacy.