Automating Business Insights using GenAI

Faisal KK
8 min read · Nov 8, 2024


LLMs are advancing their capabilities day by day. Better models, new versions of existing models with improved capabilities, and models that handle very specific use cases are released almost daily.

As we all know, these models are by no means perfect yet, but they are getting better every day. Our approach now should be to identify use cases that are handled fairly well at this moment and that have the potential for a greater business impact as models mature toward true reasoning capabilities. Automating business insights is one of them, and here is how we put the pieces together.

Solution Overview

This solution uses AWS services and open-source tools hosted in AWS; no data has to move outside the VPC. The user interacts through Slack, which is integrated via API Gateway, and requests are served by Lambda. Once Lambda receives the question, it retrieves additional context from the vector store (OpenSearch indexes) and forwards the question and context to the LLM to generate SQL. The generated SQL is then tested for syntax errors and, if any are found, the LLM is asked to correct it. Once a correct SQL query is generated, it is run on the data lake and the result is returned to Slack.

Slack Integration

The Slack App integration is straightforward, so I am not explaining it in detail here. You can refer to this blog and the associated CloudFormation template for that.

If you face any difficulty with the Slack integration, add a comment and I will try to help.

Lambda

Lambda is responsible for serving user requests by coordinating the other components. The high-level flow in Lambda is:

  1. Get the input question.
  2. Enrich the question with more context. This context can come from metadata or from the requesting user (for example, if you can get the department and designation of the user from the Slack user ID, that can be used). The metadata context is retrieved from the vector store.
  3. Send the question with the context, wrapped in proper XML tags, to the LLM hosted in Bedrock. The model has additional context available via the knowledge base attached to it.
  4. Extract the SQL from the generated output and test it on the data lake. If there are any syntax errors, ask the model to correct them.
  5. Execute the SQL and send the result back to Slack along with an explanation (a minimal sketch of this flow follows the list).
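
A minimal sketch of how these steps could be tied together inside Lambda; the helper names get_user_context, get_metadata_context, generate_sql, validate_sql, and run_on_athena are hypothetical placeholders for the pieces described in the rest of this post:

def answer_question(question, slack_user_id):
    # 1. Enrich the question with user and metadata context
    user_ctx = get_user_context(slack_user_id)    # e.g. department, designation from the Slack user ID
    meta_ctx = get_metadata_context(question)     # similarity search on the vector store

    # 2. Ask the LLM (via Bedrock) to generate SQL for the question + context
    sql = generate_sql(question, user_ctx, meta_ctx)

    # 3. Validate the syntax; if it fails, ask the LLM to correct it
    error = validate_sql(sql)
    if error:
        sql = generate_sql(question, user_ctx, meta_ctx, previous_sql=sql, error=error)

    # 4. Run the query on the data lake and return the result
    return run_on_athena(sql)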

Some points to remember in the Lambda code:

  1. To avoid duplicate responses in Slack, ignore retry attempts from Slack
import json

def handler(event, context):
    """
    The main Lambda handler function.

    Args:
        event (dict): The event data from the Slack API.
        context (dict): The Lambda context object.

    Returns:
        dict: The response dictionary based on the event type.
    """
    # Ignore Slack retries caused by HTTP timeouts to avoid duplicate responses
    retry_no = event.get('headers', {}).get('x-slack-retry-num', '0')
    retry_reason = event.get('headers', {}).get('x-slack-retry-reason', 'unknown')

    if int(retry_no) > 0 and retry_reason == 'http_timeout':  # This is a retry from Slack
        print(f"Returning from retry call, call headers: {event.get('headers', {})}")
        return

    # Respond to the Slack URL-verification challenge if presented,
    # otherwise handle the Bedrock interaction
    event_body = json.loads(event.get("body"))
    response = None
    if event_body.get("type") == "url_verification":
        response = handle_challenge(event)
    else:
        response = handle_message(event)

    return response
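
The handle_challenge helper is not shown above; a minimal sketch of what it could look like, assuming an API Gateway proxy integration (the exact implementation in the referenced blog may differ), is simply to echo back the challenge token Slack sends during URL verification:

def handle_challenge(event):
    # Slack sends a one-time 'url_verification' event when the request URL is configured;
    # responding with the challenge value confirms ownership of the endpoint.
    body = json.loads(event.get("body", "{}"))
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"challenge": body.get("challenge")})
    }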

2. While returning result data to Slack, use a CSV file. Don't use the Pandas library; the simple csv module is enough.

import boto3
import time
import uuid
import re
import csv

def execute_sql_to_csv(inp_sql):
    print(f'Input SQL: {inp_sql}')
    res = execute_athena_query('db_name', inp_sql, res_return=True)
    rows = res['ResultSet']['Rows']

    # The first row returned by Athena contains the column headers
    headers = rows[0]['Data']
    headers = [header['VarCharValue'] for header in headers]

    # The remaining rows contain the data
    data = rows[1:]
    data = [[cell.get('VarCharValue', 'NULL') for cell in row['Data']] for row in data]

    # Write the result to a randomly named CSV file in /tmp
    fid = f'ai_bot_out_{str(uuid.uuid4())[:8]}'
    file_path = f'/tmp/{fid}.csv'
    with open(file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)  # Write headers
        writer.writerows(data)    # Write data

    return {'file_path': file_path}


def execute_athena_query(db, query, res_return=True, ath_workgroup='primary'):
    client = boto3.client('athena')
    s3_output = 's3://bucket-athena/query_results/users/faisal/'
    print(f"Running query {query}")
    response = client.start_query_execution(
        QueryString=query,
        WorkGroup=ath_workgroup,
        QueryExecutionContext={
            'Database': db
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        },
    )

    query_execution_id = response['QueryExecutionId']

    # Poll the execution status until the query finishes
    while True:
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            if res_return:
                results = client.get_query_results(QueryExecutionId=query_execution_id)
                return results
            break
        if query_execution_status == 'FAILED':
            print(query_status)
            raise Exception("STATUS:" + query_execution_status)
        time.sleep(2)

The next two components (reading metadata from OpenMetadata and inserting it into the vector store) should be scheduled to run at regular intervals; a tool like Airflow can help with this.

OpenMetadata

Keeping the metadata up to date is the most challenging and crucial part of making this solution work well. OpenMetadata is an open-source, all-in-one platform for data and team collaboration, data discovery, data lineage, data quality, observability, and governance. Read more here.

  • The tool comes with a great UI and features for collaboration across the team to keep the metadata up to date
  • All the metadata about tables and each of their columns is documented in the tool and kept up to date
  • The tool runs data quality and other checks regularly

OpenMetadata stores table metadata in its backend Postgres database. We can query it, save the results as JSON files, and then upload those files to the vector store.

import os
import json
import psycopg2

# Connection details for the OpenMetadata backend database;
# the password comes from the secrets entry Openmetadata_Credentials
db_config = {
    'database': 'openmetadata_db',
    'host': 'metabase-poc.c',
    'user': 'openmetadata_user',
    'password': os.environ.get('Open_Metadata_db_password'),
    'port': '5432',
}

# Directory where the per-table JSON files will be written (placeholder path)
output_directory = 'openmetadata_json'

def run_sql_query(query):
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()

        # Execute the SQL query
        cursor.execute(query)

        # Fetch all rows from the executed query
        rows = cursor.fetchall()

        # Get column names
        colnames = [desc[0] for desc in cursor.description]

        # Convert rows to a list of dictionaries
        result = [dict(zip(colnames, row)) for row in rows]

        # Close the cursor and connection
        cursor.close()
        conn.close()

        return result
    except Exception as e:
        print(f"Error: {e}")
        return None

def save_json_to_file(data, filename):
    with open(filename, 'w') as json_file:
        json.dump(data, json_file, indent=4)

def modify_fullyqualifiedname(fqn):
    # Modify the fullyqualifiedname to create a valid filename
    parts = fqn.split('.')
    modified_fqn = '_'.join(parts[2:])
    return modified_fqn

def main():
    # SQL query to run against the OpenMetadata backend
    query = """
    SELECT "json" ->> 'description' AS description, fullyqualifiedname, "json"
    FROM table_entity te
    WHERE "json" ->> 'description' IS NOT NULL;
    """

    # Run the SQL query and get the result
    result = run_sql_query(query)

    if result:
        # Ensure the output directory exists
        os.makedirs(output_directory, exist_ok=True)

        # Iterate through the result and save each JSON object as a separate file
        for row in result:
            fqn = row['fullyqualifiedname']
            json_data = row['json']

            # Modify the fullyqualifiedname to create a valid filename
            filename = modify_fullyqualifiedname(fqn)
            filepath = os.path.join(output_directory, f'{filename}.json')

            # Save the JSON data to a file
            save_json_to_file(json_data, filepath)
            print(f'Saved: {filepath}')

if __name__ == '__main__':
    main()

The output JSON will look like this:

{
    "id": "4696ed05-56bf-4560-9a1c-0081bb530ba5",
    "name": "tbl_name",
    "columns": [
        {
            "name": "job_id",
            "dataType": "BIGINT",
            "constraint": "NULL",
            "dataLength": 1,
            "description": "Unique order id (record identifier)",
            "dataTypeDisplay": "bigint",
            "fullyQualifiedName": "db_srvice_name.default.db_name.tbl_name.job_id"
        },
        {
            "name": "customer_id",
            "dataType": "BIGINT",
            "constraint": "NULL",
            "dataLength": 1,
            "dataTypeDisplay": "bigint",
            "fullyQualifiedName": "db_srvice_name.default.db_name.tbl_name.customer_id"
        }
    ],
    "deleted": false,
    "version": 8.0,
    "database": {
        "id": "31d722ee-b734-440d-a522-58eca270b8ef",
        "name": "default",
        "type": "database",
        "deleted": false,
        "fullyQualifiedName": "db_srvice_name.default"
    },
    "followers": [],
    "tableType": "External",
    "updatedAt": 1686653944463,
    "updatedBy": "user1",
    "description": "Transactional data at an order level defining all the data points related to an order",
    "serviceType": "Athena",
    "databaseSchema": {
        "id": "ca2c0e6b-c327-4f36-925c-d03c5596fe2b",
        "name": "db_name",
        "type": "databaseSchema",
        "deleted": false,
        "fullyQualifiedName": "db_srvice_name.default.db_name"
    },
    "fullyQualifiedName": "db_srvice_name.default.db_name.tbl_name"
}

OpenSearch as Vector Store

The first step is to store the JSON files above in the vector store. Amazon OpenSearch offers three vector engines to choose from, each catering to different use cases. Facebook AI Similarity Search (Faiss) is a library for efficient similarity search and clustering of dense vectors; this code base uses FAISS for similarity search.

The code for this is copied from the AWS example repo here.

# All the submodules below are available in the AWS example repo linked above.
from llm_basemodel import LanguageModel
from boto_client import Clientmodules
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_community.document_loaders import JSONLoader
from openSearchVCEmbedding import EmbeddingBedrockOpenSearch

# Parameters
index_name1 = 'bedrock-knowledge-base-default-index'
domain = 'https://OPENSEARCH.us-east-1.aoss.amazonaws.com'
vector_field = 'bedrock-knowledge-base-default-vector'
fieldname = 'id'

ebropen = EmbeddingBedrockOpenSearch(domain, vector_field, fieldname)

# 1. Add a document to the index
json_file_name = "JSON file generated from OpenMetadata in the last step"
ebropen.add_documnets(index_name1, json_file_name)  # loop through this call to upload all JSON files

# 2. Test retrieving data from the vector store
# awsauth is the SigV4 auth object for the OpenSearch collection (see the repo's helpers)
ebropen.check_if_index_exists(index_name=index_name1, region='us-east-1', host=domain, http_auth=awsauth)
vcindxdoc = ebropen.getDocumentfromIndex(index_name=index_name1)
user_query = 'User query text'
document = ebropen.getSimilaritySearch(user_query, vcindex=vcindxdoc)
result = ebropen.get_data(document)

print(result)

As noted earlier, both of these steps (reading metadata from OpenMetadata and loading it into the vector store) should run on a regular schedule; a tool like Airflow can help with this.
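
For example, a minimal Airflow 2.x DAG could wrap the two scripts above as daily tasks. The module names metadata_export and vector_store_load and the wrapper functions export_openmetadata_json and load_json_to_opensearch are hypothetical placeholders for the code shown earlier:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed wrappers around the OpenMetadata export and OpenSearch upload scripts shown above
from metadata_export import export_openmetadata_json
from vector_store_load import load_json_to_opensearch

with DAG(
    dag_id="refresh_metadata_vector_store",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    export_task = PythonOperator(
        task_id="export_openmetadata_json",
        python_callable=export_openmetadata_json,
    )
    load_task = PythonOperator(
        task_id="load_json_to_opensearch",
        python_callable=load_json_to_opensearch,
    )
    # Export the metadata first, then load it into the vector store
    export_task >> load_task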

Calling Bedrock LLMs

The LLM prompt will contain the output from the vector store and the user query.

(The code below is for understanding the flow and quick testing; it is not production ready.)

# All the submodules below are available in the AWS example repo linked above.
from llm_basemodel import LanguageModel
from boto_client import Clientmodules
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_community.document_loaders import JSONLoader
from openSearchVCEmbedding import EmbeddingBedrockOpenSearch
import re

# Params
index_name1 = 'bedrock-knowledge-base-default-index'
domain = 'https://OPENSEARCH.us-east-1.aoss.amazonaws.com'
vector_field = 'bedrock-knowledge-base-default-vector'
fieldname = 'id'

# Retrieve context from the vector store for the user query (awsauth as in the previous snippet)
ebropen = EmbeddingBedrockOpenSearch(domain, vector_field, fieldname)
ebropen.check_if_index_exists(index_name=index_name1, region='us-east-1', host=domain, http_auth=awsauth)
vcindxdoc = ebropen.getDocumentfromIndex(index_name=index_name1)
user_query = 'User query text'
document = ebropen.getSimilaritySearch(user_query, vcindex=vcindxdoc)
vect_out = ebropen.get_data(document)

# Build the prompt in the Human/Assistant format expected by Claude models
final_question = "\n\nHuman:" + vect_out + user_query + "\n\nAssistant:"

# Send this question to the LLM and get the output
bedrock_client = Clientmodules.createBedrockRuntimeClient()
language_model = LanguageModel(bedrock_client)
llm = language_model.llm

generated_out = llm.predict(final_question)
# The response contains the SQL and an explanation.
# Extract the SQL with a regex (assuming the SQL is enclosed in ```sql ... ```)
sql_pattern = re.compile(r'```sql(.*?)```', re.DOTALL)
match = sql_pattern.search(generated_out)
sql = match.group(1).strip() if match else None
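
The simple string concatenation above is enough for quick testing. For the XML-tagged prompt mentioned in the Lambda flow, a template along these lines can be used (a sketch; the tag names are just a convention I chose, not something required by Bedrock):

PROMPT_TEMPLATE = """

Human: You are an analyst who writes Athena (Presto) SQL.

<table_metadata>
{metadata_context}
</table_metadata>

<question>
{user_question}
</question>

Return the SQL enclosed in ```sql ... ``` followed by a short explanation.

Assistant:"""

final_question = PROMPT_TEMPLATE.format(
    metadata_context=vect_out,   # similarity-search output from OpenSearch
    user_question=user_query,
)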

Testing SQLs

To check whether the generated SQL is syntactically correct, the EXPLAIN command can be used; there is no need to run the actual query. See the code below. If you get an error, send the SQL back to the LLM together with the error message. Another model can also be tried for correcting the SQL. Hopefully the LLM returns a syntactically correct query within a couple of iterations; the next step is then to run the SQL on Athena and return the result to Slack.

import boto3
import time

def check_athena_query_syntax(athena_client, query_string, database, output_location):
    # Construct the EXPLAIN query
    explain_query = f"EXPLAIN {query_string}"

    try:
        # Start the query execution
        response = athena_client.start_query_execution(
            QueryString=explain_query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': output_location}
        )

        # Get the query execution ID
        query_execution_id = response['QueryExecutionId']

        # Wait for the query to complete
        while True:
            query_status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
            status = query_status['QueryExecution']['Status']['State']

            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(1)

        # Check the query status
        if status == 'SUCCEEDED':
            return "Success"
        else:
            error_message = query_status['QueryExecution']['Status']['StateChangeReason']
            return f"Error: {error_message}"

    except Exception as e:
        return f"Exception: {str(e)}"
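
Putting it together, here is a hedged sketch of the correction loop described above. extract_sql is a hypothetical wrapper around the regex extraction shown earlier, and the prompt wording and max_attempts value are only illustrative:

def generate_valid_sql(llm, question_with_context, athena_client, database, output_location, max_attempts=3):
    # Ask the LLM for SQL, validate it with EXPLAIN, and feed errors back for correction.
    response = llm.predict(question_with_context)
    sql = extract_sql(response)  # hypothetical helper: the regex extraction shown earlier
    for _ in range(max_attempts):
        status = check_athena_query_syntax(athena_client, sql, database, output_location)
        if status == "Success":
            return sql
        correction_prompt = (
            "\n\nHuman: The following Athena SQL failed validation.\n"
            f"<sql>\n{sql}\n</sql>\n<error>\n{status}\n</error>\n"
            "Return a corrected query enclosed in ```sql ... ```.\n\nAssistant:"
        )
        sql = extract_sql(llm.predict(correction_prompt))
    raise ValueError("Could not generate a syntactically valid SQL query")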

Returning Results

As discussed in the Lambda section, run the SQL and store the result as a CSV file. To send the result and explanation to Slack, save them as text or CSV files and upload them as files; Slack will expand them for a quick read.

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

def postFileInSlack(user_id, file_path, msg, slack_token):
    slackClient = WebClient(token=slack_token)
    try:
        # Call the files.upload method using the WebClient
        # Uploading files requires the `files:write` scope
        result = slackClient.files_upload_v2(
            channel=user_id,
            title=msg,
            file=file_path,
        )
        # Log the result
        print(result)

    except SlackApiError as e:
        print("Error uploading file: {}".format(e))

Future Plans

To prepare to use the power of any LLM with true reasoning abilities, one thing we can do is build a proper semantic layer of data that represents the actual business entities and how they interact. Keeping accurate metadata is the first step towards it. Once that is ready, it is easy to trial-run any LLM to see whether it can actually understand the business and produce the right answers (correct SQL).
