Building a Real-Time Data Visualization Solution with Generative AI
Leveraging OpenAI, LangChain, and Streamlit for Intelligent Data Analysis and Visualization
This article demonstrates how to build a data visualization system that uses LangChain with LLMs to transform queries into interactive visual insights. The system integrates OpenAI’s GPT-4o-mini for query interpretation and data transformation, BigQuery as a data storage solution, and Streamlit to deliver real-time visualizations.
What We’re Building
We’ll create a full-stack data analysis system that:
- Converts text questions into optimized SQL queries
- Executes queries against BigQuery and processes results
- Extracts relevant data features (like coordinates for geographic data)
- Automatically detects appropriate visualization types based on query results
- Transforms data into format-specific visualization structures
- Provides real-time, interactive visualizations using Streamlit
- Explains insights in natural language
- Renders geographic data on interactive maps and other query results as dynamic charts
Prerequisites
Before we begin, ensure you have the following installed:
- Python 3.9 or later (required by recent LangChain releases)
- OpenAI API key
- Access to a BigQuery dataset
- Necessary Python libraries: Install them using the following pip command:
pip install streamlit langchain openai google-cloud-bigquery pandas "langchain-google-community[bigquery]" python-dotenv langchain-openai plotly
Architecture Overview
Before diving into the implementation, let’s understand the architecture of our data visualization agent.
Components
- User Interface (Streamlit): Allows users to input queries and display results.
- Text-to-SQL Converter: Uses GPT-4o-mini to convert natural language questions into SQL queries.
- Query Execution: The SQL query is executed against the database.
- Coordinate Extractor: Extracts geographical coordinates from the query results for map visualizations.
- Visualization Recommender: Uses GPT-4o-mini to suggest the most suitable visualization based on the query and results.
- Data Transformer: Uses GPT-4o-mini to transform the SQL query results into a format suitable for the recommended visualization.
- Visualization Renderer (Streamlit and Plotly): Renders the recommended visualization using the transformed data.
Workflow
- Input: The user enters a question.
- Text-to-SQL Conversion: Converts the question into an SQL query.
- Query Execution: The SQL query is executed against the database.
- Coordinate Extractor: Extracts geographical coordinates from the query results for map visualizations.
- Visualization Recommendation: Based on the question, query, and results, the AI recommends a visualization type.
- Data Transformation: The query results are transformed to fit the recommended visualization format.
- Answer Generation: Generates a textual answer to the user’s question.
- Result Formatting: The answer, visualization, and geographical data are compiled into the final result.
- Output: The user interface displays the answer, visualization, and map (if applicable).
Setting Up the Database
We’ll use a sample dataset consisting of customer information. Below are the DDL statements for creating the necessary tables in the BigQuery dataset:
CREATE TABLE `demo_data_set.customer` (
customer_key STRING NOT NULL,
first_name STRING,
last_name STRING,
source_system_name STRING,
dob DATE,
gender STRING,
create_timestamp TIMESTAMP
);
CREATE TABLE `demo_data_set.customer_address` (
customer_key STRING NOT NULL,
address_key STRING NOT NULL
);
CREATE TABLE `demo_data_set.address` (
address_key STRING NOT NULL,
full_address STRING,
state STRING,
country STRING,
latitude STRING,
longitude STRING
);
Building the Data Visualization Agent
The agent’s workflow involves several steps, from converting user questions into SQL queries to recommending appropriate visualizations and presenting the final answer.
Configuring the database connection and LLM
import os

from dotenv import load_dotenv
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI

load_dotenv()

service_account_file = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
project = os.environ["GOOGLE_PROJECT"]
dataset = os.environ["BIGQUERY_DATASET"]

sql_url = (
    f"bigquery://{project}/{dataset}?credentials_path={service_account_file}"
)
db = SQLDatabase.from_uri(sql_url)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0,
                   max_tokens=10000, timeout=30, verbose=True)
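The connection settings above are read from environment variables. With python-dotenv, a local .env file along these lines can supply them (all values below are placeholders; substitute your own project details):

```shell
# .env (placeholder values)
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
GOOGLE_PROJECT=my-gcp-project
BIGQUERY_DATASET=demo_data_set
OPENAI_API_KEY=sk-...
```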
Text-to-SQL
We start by converting natural language questions into SQL queries using the LLM model:
from langchain.chains import create_sql_query_chain
sql_prompt = PromptTemplate.from_template("""
You are a SQL expert with access to a BigQuery dataset containing customers and customer addresses.
Given an input question, generate a syntactically correct SQL query to answer it. Unless explicitly requested otherwise, limit the results to {top_k} rows.
Relevant Table Information:
{table_info}
Question: {input}
Guidelines:
1. Ensure that all attribute searches are case-insensitive.
2. ALWAYS add 'LIMIT {top_k}' at the end of the query unless:
- The question explicitly asks for all records
- The query uses GROUP BY and needs to show all groups
- The query is counting records (using COUNT)
- The query calculates aggregates that need all data
Address and Location Queries:
1. For questions about addresses, locations, or properties, always include latitude and longitude columns in the SELECT clause.
Double check the user's sql query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins
- Missing LIMIT clause when returning raw records
If there are any of the above mistakes, rewrite the query.
If there are no mistakes, just reproduce the original query with no further commentary.
Provide only the final SQL query as plain text without any formatting.
If the question is not about customers or addresses, respond with "I don't know"
""")
text_to_sql = create_sql_query_chain(
model, db, sql_prompt)
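Even though the prompt asks for plain text, models occasionally wrap SQL in markdown fences. A small defensive post-processor (my own hypothetical helper, not part of the original agent) can be piped after text_to_sql to strip them:

```python
import re

def clean_sql(text: str) -> str:
    """Strip markdown code fences an LLM sometimes adds around SQL.

    Hypothetical guard: removes a leading ```sql (or bare ```) fence
    and a trailing ``` fence, plus surrounding whitespace.
    """
    text = text.strip()
    text = re.sub(r"^```(?:sql)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    return text.strip()
```

If you adopt it, wiring is one extra step, e.g. `text_to_sql | RunnableLambda(lambda q: clean_sql(q))`.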
Executing SQL Queries
Once we have the SQL query, we execute it against the database:
@RunnableLambda
def execute_query(result, config):
dispatch_custom_event(
"process.execute_query",
{
"status": ""
},
config=config
)
return {
**result,
'result': db.run_no_throw(
command=result["query"], include_columns=True)
}
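With include_columns=True, the result comes back as a string representation of a list of row dictionaries (the shape below is assumed for illustration; the exact values are invented), which is why the coordinate extractor in the next step has to parse it first:

```python
import ast

# Assumed shape of db.run_no_throw(..., include_columns=True):
# a string repr of a list of row dicts.
sample_result = "[{'state': 'NSW', 'latitude': '-33.865', 'longitude': '151.209'}]"

rows = ast.literal_eval(sample_result)  # safe parse of the repr'd rows
print(rows[0]["state"])  # NSW
print(float(rows[0]["latitude"]))
```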
Extracting Coordinates
If the query results include geographical data, we extract latitude and longitude for map visualizations:
import ast

def extract_coordinates(result: dict):
    try:
        if isinstance(result, dict):
            if "result" in result:
                result = result["result"]
                if isinstance(result, dict) and "result" in result:
                    result_str = result["result"]
                else:
                    result_str = str(result)
            else:
                return None
        else:
            return None
        try:
            if isinstance(result_str, str):
                # Parse the stringified rows safely instead of using eval()
                result_data = ast.literal_eval(result_str)
            else:
                result_data = result_str
        except Exception as e:
            print(f"Error parsing result string: {e}")
            return None
        if not isinstance(result_data, list):
            return None
        unique_lat_values = set()
        unique_long_values = set()
        for row in result_data:
            if isinstance(row, dict):
                if 'latitude' in row and row['latitude'] is not None:
                    try:
                        unique_lat_values.add(float(row['latitude']))
                    except (ValueError, TypeError):
                        pass
                if 'longitude' in row and row['longitude'] is not None:
                    try:
                        unique_long_values.add(float(row['longitude']))
                    except (ValueError, TypeError):
                        pass
        if unique_lat_values and unique_long_values:
            return {
                "latitude": list(unique_lat_values),
                "longitude": list(unique_long_values)
            }
        return None
    except Exception as e:
        print(f"Error extracting coordinates: {e}")
        return None
Recommending Visualizations
We use the LLM to recommend the most suitable visualization based on the user’s question, the SQL query, and its results:
viz_prompt = PromptTemplate.from_template("""
You are an AI assistant that recommends appropriate data visualizations for customer and address analytics. Based on the user's question, SQL query, and query results, suggest the most suitable type of graph or chart to visualize the data.
Available chart types and their best use cases:
- Bar Graphs (for 3+ categories):
* Comparing distributions across multiple categories
* Customer counts by source system
* Customer demographics across regions/states
* Age group distributions
* Monthly/yearly registration counts
- Horizontal Bar Graphs (for 2-3 categories or large value disparities):
* Binary comparisons (e.g., gender distribution)
* Limited category comparisons (2-3 items)
* Cases with large value differences between categories
- Line Graphs (for time series only):
* Customer registration trends over time
* Growth patterns by source system
* Any metric tracked over time periods
Note: X-axis MUST represent time (create_timestamp or similar)
- Pie Charts (for proportions, 3-7 categories max):
* Distribution percentages
* Market share analysis
* Proportional comparisons
Note: Total should sum to 100%
- Scatter Plots (for numeric relationships):
* Age vs other numeric metrics
* Timestamp patterns
* Distribution analysis
Note: Both axes must be numeric, non-categorical
Special Cases:
1. Geographic Data:
* If result contains latitude and longitude → No chart (will display map)
* For address/location questions → No chart (will display map)
2. Raw Data:
* Individual customer records → No chart (tabular display)
* Non-aggregated data → No chart (tabular display)
Tables in scope:
- customer: customer_key, first_name, last_name, source_system_name, dob, gender, create_timestamp
- customer_address: customer_key, address_key
- address: address_key, full_address, state, country, latitude, longitude
Question: {question}
SQL Query: {query}
SQL Result: {result}
Provide your response in the following format:
Recommended Visualization: [Chart type or "none"]. ONLY use the following names: bar, horizontal_bar, line, pie, scatter, none
Reason: [Brief explanation for your recommendation]
""")
viz_chain = (
    viz_prompt
    | model
    | parse_visualization_response
)
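The chain pipes the model's reply through parse_visualization_response, which isn't shown in the original listing. A minimal sketch, assuming the "Recommended Visualization: ... / Reason: ..." reply format the prompt requests (wrap it in RunnableLambda when wiring it into the chain):

```python
VALID_TYPES = {"bar", "horizontal_bar", "line", "pie", "scatter", "none"}

def parse_visualization_response(message) -> dict:
    """Parse the LLM's 'Recommended Visualization: ... / Reason: ...' reply.

    Minimal sketch; the original implementation may differ.
    """
    text = message.content if hasattr(message, "content") else str(message)
    chart_type, reason = "none", ""
    for line in text.splitlines():
        lowered = line.lower()
        if lowered.startswith("recommended visualization:"):
            candidate = line.split(":", 1)[1].strip().strip(".").lower()
            if candidate in VALID_TYPES:
                chart_type = candidate
        elif lowered.startswith("reason:"):
            reason = line.split(":", 1)[1].strip()
    return {"type": None if chart_type == "none" else chart_type,
            "reason": reason}
```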
Transforming Data for Visualization
Depending on the recommended chart type, we transform the data accordingly:
def create_data_transform_prompt():
base_prompt = """You are a data transformation expert. Transform the SQL query result into the exact format needed for a {chart_type} chart.
SQL Query Result: {result}
Your response must be a valid JSON object containing ONLY the chart_data field with the exact structure shown in the example.
"""
chart_prompts = {
"bar": """For a bar chart, return JSON in this EXACT format:
{{
"chart_data": {{
"labels": ["Category1", "Category2", ...],
"values": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example with SQL: "SELECT source_system_name, COUNT(*) as count FROM customer GROUP BY source_system_name"
{{
"chart_data": {{
"labels": ["System A", "System B", "System C"],
"values": [
{{
"data": [45, 32, 28],
"label": "Customer Count"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"labels": ["NSW", "VIC", "QLD"],
"values": [
{{
"data": [500000, 750000, 450000],
"label": "Total Customers"
}},
{{
"data": [35, 30, 28],
"label": "Average Age"
}}
]
}}
}}""",
"horizontal_bar": """For a horizontal bar chart, return JSON in this EXACT format:
{{
"chart_data": {{
"labels": ["Category1", "Category2", ...],
"values": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"labels": ["Male", "Female"],
"values": [
{{
"data": [75000, 78000],
"label": "Customer Count"
}}
]
}}
}}""",
"line": """For a line chart, return JSON in this EXACT format:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", ...],
"yValues": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", "2023-03", "2023-04"],
"yValues": [
{{
"data": [12500, 13600, 14800, 15200],
"label": "Monthly Registrations"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", "2023-03"],
"yValues": [
{{
"data": [5000, 5500, 6000],
"label": "System A Customers"
}},
{{
"data": [4000, 4200, 4500],
"label": "System B Customers"
}}
]
}}
}}""",
"pie": """For a pie chart, return JSON in this EXACT format:
{{
"chart_data": [
{{
"value": number,
"label": "Category Name"
}}
]
}}
Example:
{{
"chart_data": [
{{
"value": 150,
"label": "System A"
}},
{{
"value": 45,
"label": "System B"
}},
{{
"value": 25,
"label": "System C"
}}
]
}}""",
"scatter": """For a scatter plot, return JSON in this EXACT format:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": number,
"y": number,
"id": number
}}
],
"label": "Series Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": -33.865,
"y": 151.209,
"id": 1
}},
{{
"x": -37.813,
"y": 144.963,
"id": 2
}},
{{
"x": -27.470,
"y": 153.021,
"id": 3
}}
],
"label": "Customer Locations"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": -33.865,
"y": 151.209,
"id": 1
}},
{{
"x": -37.813,
"y": 144.963,
"id": 2
}}
],
"label": "Male Customers"
}},
{{
"data": [
{{
"x": -27.470,
"y": 153.021,
"id": 3
}},
{{
"x": -31.950,
"y": 115.860,
"id": 4
}}
],
"label": "Female Customers"
}}
]
}}
}}"""
}
bar_prompt = base_prompt + chart_prompts.get("bar")
horizontal_bar_prompt = base_prompt + \
chart_prompts.get("horizontal_bar")
pie_prompt = base_prompt + chart_prompts.get("pie")
scatter_prompt = base_prompt + chart_prompts.get("scatter")
line_prompt = base_prompt + chart_prompts.get("line")
return (
PromptTemplate.from_template(bar_prompt),
PromptTemplate.from_template(horizontal_bar_prompt),
PromptTemplate.from_template(pie_prompt),
PromptTemplate.from_template(scatter_prompt),
PromptTemplate.from_template(line_prompt)
)
bar_prompt, horizontal_bar_prompt, pie_prompt, scatter_prompt, line_prompt = create_data_transform_prompt()
@RunnableLambda
def transform_data_for_visualization_chain(args, config):
try:
dispatch_custom_event(
"process.transform_data_for_visualization_chain",
{
"status": ""
},
config=config
)
chart_type = args.get("visualization").get("type")
result = args.get("result")
if not chart_type or not result:
return {"chart_data": None}
if chart_type == 'bar':
transform_prompt = bar_prompt
elif chart_type == 'horizontal_bar':
transform_prompt = horizontal_bar_prompt
elif chart_type == 'pie':
transform_prompt = pie_prompt
elif chart_type == 'scatter':
transform_prompt = scatter_prompt
elif chart_type == 'line':
transform_prompt = line_prompt
else:
transform_prompt = None
assign_chart_type_and_result = RunnableLambda(
lambda args: {**args, "chart_type": args.get("visualization", {}).get(
"type"), "result": args.get("result")}
)
if transform_prompt:
transform_chain = (
assign_chart_type_and_result
| transform_prompt
| model
)
return transform_chain
return {"chart_data": None}
except Exception as e:
print(e)
print(f"Error in transform_data_for_visualization: {e}")
return {"chart_data": None}
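The transformation step trusts the model to return well-formed JSON. As a defensive addition of my own (not part of the original agent), a small structural check before rendering can catch malformed payloads:

```python
def validate_chart_data(chart_type: str, chart_data) -> bool:
    """Best-effort structural check of the transformed payload.

    Hypothetical guard: verifies the shapes described in the
    transformation prompts before handing data to the renderer.
    """
    try:
        if chart_type in ("bar", "horizontal_bar"):
            return (isinstance(chart_data, dict)
                    and isinstance(chart_data["labels"], list)
                    and all("data" in s and "label" in s
                            for s in chart_data["values"]))
        if chart_type == "line":
            return (isinstance(chart_data, dict)
                    and isinstance(chart_data["xValues"], list)
                    and all("data" in s for s in chart_data["yValues"]))
        if chart_type == "pie":
            return (isinstance(chart_data, list)
                    and all("value" in s and "label" in s for s in chart_data))
        if chart_type == "scatter":
            return (isinstance(chart_data, dict)
                    and all("data" in s for s in chart_data["series"]))
        return False
    except (KeyError, TypeError):
        return False
```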
Generating the Answer
We generate a textual answer to the user’s question using the LLM model:
create_answer_prommpt = PromptTemplate.from_template(
"""Given the following user question, corresponding SQL query, and SQL result, answer the user question.
Question: {question}
SQL Query: {query}
SQL Result: {result}
Answer: """
)
create_answer_prommpt | model
Formatting the Final Result
Next, we compile the answer, visualization, and any coordinates into the final result:
@RunnableLambda
def format_final_result(result, config):
try:
dispatch_custom_event(
"process.format_final_result",
{
"status": ""
},
config=config
)
if isinstance(result, str):
try:
result = json.loads(result)
except json.JSONDecodeError:
result = {"answer": result}
answer = ""
chart_data = None
chart_type = None
coordinates = None
# Extract chart data from AIMessage
if isinstance(result, dict):
coordinates = result.get("coordinates")
# Get chart type from visualization
visualization = result.get('visualization', {})
if isinstance(visualization, dict):
chart_type = visualization.get('type')
chart_data_msg = result.get('chart_data')
if hasattr(chart_data_msg, 'content'):
try:
content = chart_data_msg.content
content = content.replace(
'```json', '').replace('```', '').strip()
parsed_data = json.loads(content)
if isinstance(parsed_data, dict) and 'chart_data' in parsed_data:
chart_data = parsed_data['chart_data']
except json.JSONDecodeError:
print("Failed to parse chart data JSON")
chart_data = None
answer_msg = result.get('answer')
if hasattr(answer_msg, 'content'):
answer = answer_msg.content
elif isinstance(answer_msg, str):
answer = answer_msg
elif isinstance(answer_msg, dict) and 'content' in answer_msg:
answer = answer_msg['content']
else:
result_data = result.get("result", {})
if isinstance(result_data, dict) and "result" in result_data:
answer = str(result_data["result"])
else:
answer = str(result_data)
response_dict = {
"answer": answer,
"coordinates": coordinates,
"chart_data": chart_data,
"chart_type": chart_type
}
return json.dumps(response_dict)
except Exception as e:
print(f"Error in format_final_result: {e}")
return json.dumps({
"answer": "Error formatting result",
"coordinates": None,
"chart_data": None,
"chart_type": None
})
Full Chain Assembly
We assemble all the steps into a cohesive chain:
chain = (
RunnablePassthrough().assign(query=text_to_sql)
| RunnablePassthrough().assign(
result=execute_query
)
| RunnablePassthrough().assign(
coordinates=lambda x: extract_coordinates(x)
)
| RunnablePassthrough.assign(
visualization=RunnableLambda(
lambda x: {
"question": x.get("question", ""),
"query": x["query"],
"result": x.get("result", {}).get("result")
}
)
| viz_prompt
| model
| parse_visualization_response
)
| RunnablePassthrough().assign(
chart_data=transform_data_for_visualization_chain
)
| RunnablePassthrough.assign(
answer=create_answer_prommpt | model
)
| format_final_result
| StrOutputParser()
)
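Because format_final_result ends the chain with json.dumps, the caller receives a JSON string. An illustrative consumer (the payload values below are invented for illustration):

```python
import json

# Sample payload in the shape produced by format_final_result
# (values invented for illustration).
final = json.dumps({
    "answer": "There are 3 customers in NSW.",
    "coordinates": {"latitude": [-33.865], "longitude": [151.209]},
    "chart_data": None,
    "chart_type": None,
})

result = json.loads(final)
if result["coordinates"]:
    # In the Streamlit layer these lists feed the map rendering
    print("map points:", len(result["coordinates"]["latitude"]))
print(result["answer"])
```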
Integrating with Streamlit
Streamlit allows us to create an interactive web application to interface with our agent. It enables users to interact with the visualization agent, execute database queries, and view dynamic visualizations — all through natural language conversations.
While I won’t be providing the complete source code for the Streamlit app in this article, you can find the example code here. In this section, we’ll focus specifically on how to extend Streamlit with LangChain’s streaming API and custom event handlers to enhance user interaction and provide real-time feedback.
Extending Streamlit with Custom Event Handlers
When using LangChain’s streaming API, the default StreamlitCallbackHandler
can only display the LLM's status for the first step in the chain (e.g., the text-to-SQL step). To surface status updates throughout the entire chain in the Streamlit interface, we need to create a custom event handler that can receive and handle custom events.
Importing the Correct Streamlit Callback Handler
First, import the StreamlitCallbackHandler
from the appropriate module:
from langchain_community.callbacks.streamlit.streamlit_callback_handler import StreamlitCallbackHandler
Next, create a custom class that inherits from StreamlitCallbackHandler
to handle custom events:
class CustomStreamlitCallbackHandler(StreamlitCallbackHandler):
def on_custom_event(self, name: str, data: dict, **kwargs):
"""Handle custom events, update labels, and mark as complete if specified."""
if self._current_thought is not None:
custom_event_label = f"💡{name}"
self._current_thought.container.update(
new_label=custom_event_label)
content = f"**{name}:** {data}"
self._current_thought.container.markdown(content)
is_complete = data.get("is_complete", False)
if is_complete or name == "process.completed":
complete_label = f"✅ Complete, awaiting response"
self._current_thought.complete(final_label=complete_label)
else:
st.write(f"Custom Event Triggered Outside Thought Context: {data}")
def on_llm_end(self, response, **kwargs):
"""Override to ensure the label updates on LLM completion."""
super().on_llm_end(response, **kwargs)
if self._current_thought:
self._current_thought.complete(
final_label="✅ Complete, awaiting response")
def on_tool_end(self, output, **kwargs):
"""Override to ensure the label updates on tool completion."""
super().on_tool_end(output, **kwargs)
if self._current_thought:
self._current_thought.complete(final_label="✅ Tool Complete")
Dispatching Custom Events in LangChain Code
To utilize the custom event handler, dispatch custom events in your LangChain code where appropriate. For example, in the execute_query
step:
from langchain_core.callbacks import dispatch_custom_event

@RunnableLambda
def execute_query(query, config):
    # Send custom event
    dispatch_custom_event(
        "process.execute_query",
        {"status": "Executing SQL query..."},
        config=config
    )
    try:
        result = db.run_no_throw(command=query)
        dispatch_custom_event(
            "process.execute_query",
            {"status": "SQL query executed successfully.", "is_complete": True},
            config=config
        )
        return result
    except Exception as e:
        dispatch_custom_event(
            "process.execute_query",
            {"status": f"Error executing SQL query: {e}", "is_complete": True},
            config=config
        )
        return None
Updating the Streamlit App to Use the Custom Handler
In your Streamlit app, initialize the custom callback handler and pass it to your LangChain chains:
def process_query(question: str, assistant: DatabaseAssistant) -> Dict[str, Any]:
try:
chain = assistant.process_query_chain()
return chain.stream(
{"question": question, "top_k": 10},
{"callbacks": [get_streamlit_cb(st.container())]}
)
except Exception as e:
st.error(f"Error processing query: {str(e)}")
return {"answer": "Sorry, I encountered an error processing your query.", "chart_data": None, "chart_type": None}
with st.chat_message("assistant"):
response = process_query(prompt, assistant)
if response is not None:
for chunk in response:
handle_stream_response(
chunk, st.session_state.messages)
By implementing custom event handlers, we can provide real-time status updates throughout the entire processing chain, enhancing user experience by informing users about each step.
Running the Application
Now let’s run the app 🥳. You can find the complete working example in my GitHub repository. To launch the application, navigate to the directory containing your main.py
file and execute the following command:
streamlit run main.py
The application connects to the BigQuery dataset, executes the generated SQL queries, and renders dynamic visualizations and maps from the results. The custom event handlers provide real-time feedback in the Streamlit interface, enhancing interactivity and the overall user experience.
Conclusion
This visualization solution streamlines the data analysis workflow, making it accessible even to users without extensive technical expertise. You can enhance this agent further by incorporating additional visualization types, integrating more data sources, or improving its natural language understanding capabilities. Enjoy exploring its possibilities!