Revolutionizing Retail Product Recommendations with Databricks LLMOps and Lakehouse AI
In the fiercely competitive retail industry, personalized product recommendations have become a cornerstone of customer engagement and revenue generation. Traditional recommendation systems often struggle to capture nuanced customer preferences or respond dynamically to new trends. Enter Databricks’ LLMOps (Large Language Model Operations), leveraging the power of Lakehouse AI, to revolutionize how retailers approach this challenge.
The Retail Business Case: Product Recommendations
Retailers aim to deliver hyper-personalized shopping experiences that resonate with individual customer preferences. Effective product recommendations can:
- Increase Sales: Personalized suggestions drive higher conversion rates and average order values.
- Enhance Customer Retention: Relevant recommendations foster customer loyalty by improving the overall shopping experience.
- Streamline Operations: Automating product recommendations reduces manual intervention and operational costs.
Traditional systems, while effective to some extent, often lack the agility to incorporate real-time data and adapt to evolving customer behaviors. This is where Databricks’ AI-driven solutions shine.
Databricks Lakehouse AI: The Foundation of Innovation
Lakehouse AI integrates the power of a data lake with the structured capabilities of a warehouse, creating a unified platform for analytics and AI. For product recommendations in retail, this architecture enables:
- Unified Data Management: Seamless integration of transactional, behavioral, and third-party data.
- Real-Time Insights: Processing large volumes of data in near real-time to reflect the latest customer preferences.
- Scalability: Handling massive datasets with ease, a critical capability for retail giants.
This foundation supports advanced tools like the Databricks AI Gateway and Evaluation Agent to operationalize and optimize AI models effectively.
Mosaic AI Gateway: Seamless LLM Integration
The Mosaic AI Gateway acts as a bridge between enterprise data and large language models (LLMs). Retailers can:
- Centralize Model Management: Deploy and manage various LLMs from a single interface.
- Ensure Compliance: Monitor data usage to align with regulatory and organizational policies.
- Enable Customization: Fine-tune pre-trained models to align with specific retail needs, such as recommending complementary products or highlighting trending items.
By simplifying integration and providing robust governance, the AI Gateway accelerates time-to-value for LLM-driven solutions.
Mosaic AI Evaluation Agent: Measuring and Improving Model Performance
The Mosaic AI Evaluation Agent, currently in preview, is a groundbreaking tool for assessing and refining LLM outputs. It employs both automated metrics and human-in-the-loop processes to:
- Assess Quality: Metrics like coherence, relevance, and accuracy help evaluate recommendation efficacy.
- Enable Iterative Improvement: Continuous feedback loops refine model performance over time.
- Reduce Bias: Address potential biases in recommendations, ensuring fairness and inclusivity.
The AI Evaluation Agent also integrates seamlessly with Lakehouse AI, allowing retailers to monitor model performance in real-world scenarios and make adjustments as needed.
Real-World Impact on Retail
By deploying Databricks’ LLMOps with tools like the AI Gateway and Evaluation Agent, retailers can unlock transformative business outcomes:
- Enhanced Customer Engagement: Tailored recommendations delight customers, driving higher satisfaction.
- Increased Revenue: Data-driven insights lead to upselling and cross-selling opportunities.
- Operational Efficiency: Automated workflows reduce manual effort, freeing up resources for strategic initiatives.
- Future-Proofing: Retailers gain the flexibility to adapt to new trends and customer demands in real-time.
Expanded Business Value
The business value of adopting Databricks’ solutions extends beyond immediate operational benefits. It provides:
- Strategic Decision-Making: With access to unified and accurate data, retail leaders can make informed decisions about inventory, marketing strategies, and customer engagement.
- Customer-Centric Innovation: Understanding customer preferences at a granular level empowers retailers to introduce new products and services that directly address unmet needs.
- Sustainability Gains: AI-driven optimization can reduce waste by predicting demand more accurately, aligning inventory with actual customer needs.
- Competitive Advantage: Early adoption of cutting-edge AI technologies positions retailers as industry leaders, building brand reputation and trust.
Moreover, by creating a dynamic recommendation ecosystem, businesses can continuously improve their offerings and respond to market shifts swiftly, ensuring long-term profitability and relevance.
The sections below show how to build and deploy a Generative AI-powered product recommendation system using Databricks Lakehouse AI, the AI Gateway, and the Agent Evaluation framework.
1. Setting Up Databricks AI Gateway for Azure OpenAI
Why Do This?
- AI Gateway simplifies secure access to various LLM providers, such as Azure OpenAI, via a centralized interface.
- It eliminates the need to manage individual API keys for each provider.
- Provides a consistent interface for querying models, ensuring scalability and compliance.
Step 1: Set Up Azure OpenAI
- Create an Azure OpenAI resource to use GPT-4o-mini.
- Deploy GPT-4o-mini with a deployment name like gpt4o-mini-deployment.
- Generate an API key, which will be used to connect Databricks with Azure OpenAI.
Utility: This sets up the connection to Azure OpenAI, allowing the Generative AI model to process natural language queries.
Step 2: Configure Databricks AI Gateway
- Create a new Gateway in Databricks and select Azure OpenAI as the provider.
- Provide the Azure OpenAI API key, deployment name, and endpoint.
- Test the configuration by sending a sample query to ensure everything is working.
[Screenshot: testing the Gateway configuration]
Utility:
- The Gateway abstracts the connection to Azure OpenAI, enabling secure and scalable interaction with GPT-4o-mini.
- Provides unified access to multiple LLMs, which is useful if switching or adding providers.
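The sample query from Step 2 could also be sent from a notebook rather than the UI. The sketch below is a minimal illustration, not the author's exact test: the helper names `build_test_request` and `query_gateway` are hypothetical, the system prompt is made up, and the endpoint name `openai_recommendation` is taken from the notebook later in this post. It assumes the gateway endpoint accepts a chat-style payload through the MLflow deployments client.

```python
import json

def build_test_request(question, max_tokens=64):
    """Build a chat-style request body for a quick gateway smoke test."""
    return {
        "messages": [
            {"role": "system", "content": "You are a helpful retail assistant."},
            {"role": "user", "content": question},
        ],
        "max_tokens": max_tokens,
    }

def query_gateway(endpoint_name, payload):
    """Send the payload to a serving endpoint (only works inside a Databricks workspace)."""
    # Imported lazily so the payload builder can be used without MLflow installed.
    from mlflow.deployments import get_deploy_client
    client = get_deploy_client("databricks")
    return client.predict(endpoint=endpoint_name, inputs=payload)

payload = build_test_request("Suggest one product for a rainy-day hike.")
print(json.dumps(payload, indent=2))
# In a Databricks notebook (assumed endpoint name):
#   reply = query_gateway("openai_recommendation", payload)
```

Keeping the payload builder separate from the network call makes it possible to inspect the request locally before any tokens are spent.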
2. Generating and Preprocessing Product Data
Why Do This?
- A product recommendation system needs a database of products to recommend.
- This synthetic dataset simulates product names, categories, and descriptions to test the system’s functionality.
import pandas as pd
import random
# Generate synthetic product data
categories = ["Shoes", "Shirts", "Pants", "Accessories", "Outdoor"]
descriptions = [
    "Lightweight and durable for all activities",
    "Comfortable and stylish for everyday wear",
    "Breathable and perfect for workouts",
    "Fashionable and professional for office use",
    "Ideal for adventures and outdoor activities"
]
# Create a DataFrame
data = []
for i in range(1, 101):  # Generate 100 products
    category = random.choice(categories)
    description = random.choice(descriptions)
    product_name = f"{category} - {random.randint(1000, 9999)}"
    data.append({"product_id": i, "name": product_name, "description": f"{description} ({category})"})
product_df = pd.DataFrame(data)
Let’s save the data into the Azure Storage Data Lake:
# Save to CSV in the external location
output_path = "abfss://landing@blogmedium0026.dfs.core.windows.net/raw_products.csv"
# Convert to Spark DataFrame
spark_df = spark.createDataFrame(product_df)
# Write as CSV to ADLS
spark_df.write.format("csv").mode("overwrite").option("header", True).save(output_path)
# Verify the file has been written
display(dbutils.fs.ls("abfss://landing@blogmedium0026.dfs.core.windows.net/"))
# Read the CSV back from the landing container
input_path = "abfss://landing@blogmedium0026.dfs.core.windows.net/raw_products.csv"
product_df = spark.read.csv(input_path, header=True, inferSchema=True)
# Persist the product catalog as a Delta table
delta_output_path = "abfss://retail@blogmedium0026.dfs.core.windows.net/delta/product_catalog"
product_df.write.format("delta").mode("overwrite").save(delta_output_path)
Utility:
- Simulates real-world product data, which is essential for testing recommendation models.
- Includes diversity in categories and descriptions to reflect typical user queries.
- Storing the data in Azure Data Lake Storage (ADLS) ensures it is accessible across all Databricks workflows.
- ADLS integration with Databricks provides scalability and reliability for handling large datasets.
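Because the product names are drawn at random, every run of the generator produces a different catalog, and the expected product names used in the evaluation set later in this post (for example "Shoes - 2909") would no longer exist. One possible way to keep runs repeatable is to seed a local random generator. The sketch below is an assumption layered on the original code: the function name `generate_products` and the `seed` parameter are hypothetical.

```python
import random

CATEGORIES = ["Shoes", "Shirts", "Pants", "Accessories", "Outdoor"]

def generate_products(n, seed):
    """Return n synthetic products; the same seed always yields the same catalog."""
    rng = random.Random(seed)  # local generator, leaves the global random state untouched
    products = []
    for product_id in range(1, n + 1):
        category = rng.choice(CATEGORIES)
        products.append({
            "product_id": product_id,
            "name": f"{category} - {rng.randint(1000, 9999)}",
        })
    return products

first_run = generate_products(5, seed=42)
second_run = generate_products(5, seed=42)
print(first_run == second_run)  # True: identical seeds give identical names
```

With a fixed seed, ground-truth product names written into an evaluation set stay valid across notebook re-runs.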
3. Compute Embeddings
- Use a pretrained model like all-MiniLM-L6-v2 to compute embeddings.
- Convert descriptions into embeddings.
- Store the result in a Delta table.
# Path to Delta table
delta_catalog_path = "abfss://retail@blogmedium0026.dfs.core.windows.net/delta/product_catalog"
# Load Delta Table
product_df = spark.read.format("delta").load(delta_catalog_path)
display(product_df)
from sentence_transformers import SentenceTransformer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType
# Load the pre-trained sentence transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Define a UDF to compute embeddings
@udf(ArrayType(FloatType()))
def compute_embedding(text):
    return model.encode(text).tolist()
# Apply the UDF to generate embeddings for product descriptions
product_df_with_embeddings = product_df.withColumn("embedding", compute_embedding(col("description")))
display(product_df_with_embeddings)
# Path to save Delta table with embeddings
embedding_output_path = "abfss://retail@blogmedium0026.dfs.core.windows.net/delta/product_embeddings"
# Save as Delta Table
product_df_with_embeddings.write.format("delta").mode("overwrite").save(embedding_output_path)
4. Product Recommendations
The recommendation workflow combines three components:
- Databricks AI Gateway for securely accessing Azure OpenAI’s GPT-4o-mini.
- Agent Evaluation Framework for systematic evaluation.
- Model Serving for real-time recommendations.
Building the Recommendation Model
Why Do This?
- The recommendation model forms the core of the system, translating user queries into actionable recommendations.
- GPT-4o-mini processes natural language queries and generates product recommendations.
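The model described here works in two stages: it first retrieves the products whose embeddings are closest to the query, then asks the LLM to choose among them. The sketch below illustrates only the retrieval and prompt-building stages, using hand-written three-dimensional vectors in place of real all-MiniLM-L6-v2 embeddings. The toy catalog, the `retrieve` and `build_prompt` helpers, and the vectors are all hypothetical; the prompt wording follows the notebook later in this post.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def retrieve(query_vec, catalog, k):
    """Return the names of the k products most similar to the query vector."""
    ranked = sorted(catalog, key=lambda p: cosine(query_vec, p["embedding"]), reverse=True)
    return [p["name"] for p in ranked[:k]]

def build_prompt(query, candidates):
    """Combine the shortlisted product names with the user's query for the LLM."""
    product_context = ", ".join(candidates)
    return (f"Here are some products: {product_context}. "
            f"Based on this, what would you recommend for '{query}'?")

# Toy catalog with made-up embeddings (real embeddings would have 384 dimensions)
toy_catalog = [
    {"name": "Shoes - 1001", "embedding": [0.9, 0.1, 0.0]},
    {"name": "Shirts - 2002", "embedding": [0.1, 0.9, 0.0]},
    {"name": "Outdoor - 3003", "embedding": [0.6, 0.0, 0.8]},
]
query_vec = [1.0, 0.0, 0.0]
candidates = retrieve(query_vec, toy_catalog, k=2)
print(candidates)  # ['Shoes - 1001', 'Outdoor - 3003']
print(build_prompt("running shoes", candidates))
```

Shortlisting by similarity first keeps the prompt small and restricts the LLM to products that actually exist in the catalog.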
Evaluating the Model
Why Do This?
- Evaluation ensures the model performs well by comparing its predictions to expected results.
- The Databricks Agent Evaluation Framework provides structured, repeatable testing for Generative AI models.
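To make "comparing predictions to expected results" concrete, the sketch below computes a simple set overlap between a predicted product list and an expected one. This is a hand-rolled illustration and not part of the Agent Evaluation Framework, which applies its own judges and metrics. The function names are hypothetical, and "Shirts - 1200" is a made-up product used only to show a miss.

```python
def normalize(names):
    """Strip whitespace and lowercase names so ' Shoes - 4481' matches 'Shoes - 4481'."""
    return {name.strip().lower() for name in names if name.strip()}

def overlap_metrics(predicted, expected):
    """Return precision, recall, and exact-match for two lists of product names."""
    pred, exp = normalize(predicted), normalize(expected)
    hits = pred & exp
    precision = len(hits) / len(pred) if pred else 0.0
    recall = len(hits) / len(exp) if exp else 0.0
    return {"precision": precision, "recall": recall, "exact_match": pred == exp}

# A comma-separated LLM reply, split the same way the pipeline splits it
predicted = "Shoes - 2909, Shoes - 4481, Shirts - 1200".split(",")
expected = ["Shoes - 2909", "Shoes - 4481", "Shoes - 1883", "Shoes - 8293"]
metrics = overlap_metrics(predicted, expected)
print(metrics)
```

Here two of the three predicted names appear in the expected list, so precision is 2/3 and recall is 2/4. Normalizing whitespace matters because splitting a comma-separated reply leaves a leading space on every item after the first.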
Serving the Model
Why Do This?
- Serving the model allows it to process real-time user queries.
- Databricks Model Serving ensures scalable, low-latency inference.
Utility:
- Deploys the recommendation model as a serving endpoint for real-time use cases.
- Stores evaluation metrics in MLflow for tracking and comparison across model versions.
- Ensures continuous monitoring and improvement.
- Automates the evaluation of Generative AI models.
- Provides metrics like exact match accuracy to gauge performance against expected responses.
- MLflow logging ensures the model is versioned and ready for deployment.
Results:
- [Screenshot: evaluation using the Databricks agent]
- [Screenshot: details of each evaluation request, which can be inspected individually]
- [Screenshot: metrics]
- [Screenshot: model registered in Unity Catalog]
Notebook:
%pip install databricks-agents mlflow
dbutils.library.restartPython()
import numpy as np
import pandas as pd
# Define a helper function for cosine similarity
def cosine_similarity(vec1, vec2):
    vec1 = np.array(vec1)
    vec2 = np.array(vec2)
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    # Guard against zero-length vectors to avoid division by zero
    return dot_product / (norm_vec1 * norm_vec2) if norm_vec1 > 0 and norm_vec2 > 0 else 0.0
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole
import mlflow
class GatewayGenAIRecommendationPipeline(mlflow.pyfunc.PythonModel):
    def __init__(self, gateway_endpoint, embedding_table):
        self.gateway_endpoint = gateway_endpoint
        self.embedding_table = embedding_table  # Pandas DataFrame with embeddings
    def rank_products_by_embeddings(self, query_embedding):
        # Compute similarity for each product embedding
        self.embedding_table["similarity"] = self.embedding_table["embedding"].apply(
            lambda emb: cosine_similarity(query_embedding, emb)
        )
        # Sort products by similarity and return the top 10
        return self.embedding_table.sort_values(by="similarity", ascending=False).head(10)
    def recommend_products(self, query, top_products):
        """
        Use GenAI to generate product recommendations.
        :param query: User query string
        :param top_products: List of top-ranked product names
        :return: List of recommended products
        """
        # Combine query with top product names
        product_context = ", ".join(top_products)
        enriched_query = f"Here are some products: {product_context}. Based on this, what would you recommend for '{query}'?"
        # Query the AI Gateway
        w = WorkspaceClient()
        response = w.serving_endpoints.query(
            name=self.gateway_endpoint,
            messages=[
                ChatMessage(
                    role=ChatMessageRole.SYSTEM,
                    content="You are a product recommendation assistant. Respond only with product names separated by commas."
                ),
                ChatMessage(
                    role=ChatMessageRole.USER,
                    content=enriched_query
                ),
            ],
            max_tokens=128,
        )
        recommendations = response.choices[0].message.content
        return recommendations.split(",")  # Split recommendations into a list
    def predict(self, context, model_input):
        # Extract query; a DataFrame input yields a Series, so take its first value
        query = model_input["query"]
        if isinstance(query, pd.Series):
            query = query.iloc[0]
        # Generate query embedding (relies on the notebook-level SentenceTransformer `model`)
        query_embedding = model.encode(query).tolist()
        # Rank products based on embeddings
        top_products = self.rank_products_by_embeddings(query_embedding)["name"].tolist()
        # Use GenAI for final recommendations
        return self.recommend_products(query, top_products)
evaluation_df = pd.DataFrame({
    "query": ["What are the best running shoes?", "What are good office shoes?"],
    "ground_truth": [["Shoes - 2909"," Shoes - 4481"," Shoes - 1883"," Shoes - 8293"," Shoes - 3457"," Shoes - 2526"], ["Shoes - 7373"," Shoes - 8050"," Shoes - 9423"," Shoes - 4540"," Shoes - 7469"]],
    "predicted": [[]] * 2
})
eval_set = pd.DataFrame([
    {
        "request_id": "request-1",
        "request": {"query": "What are the best shoes for running?"},
        "expected_response": ["Shoes - 2909"," Shoes - 4481"," Shoes - 1883"," Shoes - 8293"," Shoes - 3457"," Shoes - 2526"],
    },
    {
        "request_id": "request-2",
        "request": {"query": "What are good shoes for office wear?"},
        "expected_response": ["Shoes - 7373"," Shoes - 8050"," Shoes - 9423"," Shoes - 4540"," Shoes - 7469"],
    },
])
# Load Delta table with embeddings
embedding_df = spark.read.format("delta").load(
"abfss://retail@blogmedium0026.dfs.core.windows.net/delta/product_embeddings"
)
# Convert to Pandas
embedding_table = embedding_df.toPandas()
from sentence_transformers import SentenceTransformer
import mlflow
import os
from mlflow.models.signature import infer_signature
# Example input data (replace with your actual input data)
example_input = pd.DataFrame({"query": ["What are the best shoes for running?"]})
# Example output data (replace with your model's output)
example_output = pd.DataFrame({"recommendations": [["Shoes - 123", "Shoes - 456"]]})
# Infer the signature
signature = infer_signature(example_input, example_output)
model = SentenceTransformer('all-MiniLM-L6-v2')
embedding_df = spark.read.format("delta").load("abfss://retail@blogmedium0026.dfs.core.windows.net/delta/product_embeddings").toPandas()
display(embedding_df)
with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_param("model_type", "Generative AI")
    mlflow.log_param("llm_provider", "OpenAI via AI Gateway")
    # Instantiate and log the pipeline
    recommendation_pipeline = GatewayGenAIRecommendationPipeline(
        gateway_endpoint="openai_recommendation",
        embedding_table=embedding_df
    )
    logged_model_info = mlflow.pyfunc.log_model(
        artifact_path="genai_with_embeddings_pipeline",
        python_model=recommendation_pipeline,
        signature=signature
    )
    # Evaluate the logged model with the Databricks Agent Evaluation framework
    mlflow.evaluate(data=eval_set, model=logged_model_info.model_uri,
                    model_type="databricks-agent")
run_id = run.info.run_id
print(f"Run ID: {run_id}")
model_uri = f"runs:/{run_id}/genai_with_embeddings_pipeline"
loaded_model = mlflow.pyfunc.load_model(model_uri)
recommendations = []
for index, row in evaluation_df.iterrows():
    query = row["query"]
    # Generate predictions using the model
    predicted = loaded_model.predict(pd.DataFrame({"query": [query]}))
    # Add predictions to the DataFrame
    evaluation_df.at[index, "predicted"] = predicted
os.environ['MODEL'] = model_uri
# Point the registry at Unity Catalog before registering under a three-level name
mlflow.set_registry_uri("databricks-uc")
mlflow.register_model(model_uri, "dev_catalog.model_product_rec.product_recommendation")
import mlflow
from mlflow.deployments import get_deploy_client
mlflow.set_registry_uri("databricks-uc")
client = get_deploy_client("databricks")
# Create a serving endpoint that routes all traffic to version 1 of the registered model
endpoint = client.create_endpoint(
    name="product_recommendation",
    config={
        "served_entities": [
            {
                "entity_name": "dev_catalog.model_product_rec.product_recommendation",
                "entity_version": "1",
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ],
        "traffic_config": {
            "routes": [
                {
                    "served_model_name": "product_recommendation-1",
                    "traffic_percentage": 100
                }
            ]
        }
    }
)
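Once the endpoint is ready, it can be queried with a JSON body. The sketch below builds such a body in the `dataframe_records` format, one of the input formats Databricks Model Serving accepts for pyfunc models, using the single `query` column from the model signature. The helper name `build_scoring_payload` is hypothetical, and the commented-out call assumes the endpoint name `product_recommendation` created above.

```python
import json

def build_scoring_payload(queries):
    """Wrap queries in the `dataframe_records` format accepted by Model Serving."""
    return {"dataframe_records": [{"query": q} for q in queries]}

payload = build_scoring_payload(["What are the best shoes for running?"])
body = json.dumps(payload)
print(body)
# Inside a workspace, the same payload could be sent with the MLflow deployments client:
#   from mlflow.deployments import get_deploy_client
#   get_deploy_client("databricks").predict(endpoint="product_recommendation", inputs=payload)
```

Since the pipeline's `predict` method works on one query at a time, sending a single record per request is the safest assumption.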
Conclusion
Databricks’ innovative approach to LLMOps, underpinned by the Lakehouse AI platform, empowers retailers to reimagine their product recommendation strategies. By integrating advanced tools like the AI Gateway and Evaluation Agent, businesses can harness the full potential of generative AI to deliver exceptional value to their customers.
The future of retail lies in data-driven personalization, and with Databricks leading the charge, the possibilities are limitless.