How to implement RAG in Microsoft Fabric
In the previous posts we started our journey with Azure OpenAI and Microsoft Fabric for Data Engineering and Business Intelligence use cases. In this video we are kicking things up a notch or two: we are going to cover how to implement RAG (retrieval-augmented generation) in Microsoft Fabric.
This video is inspired by the following tutorial provided by Microsoft; however, I had to modify a few things to make sure that the code could actually run in my Fabric environment. The modified notebook can be found below.
This cell is required to make sure that the proper version of SynapseML is installed. The version that ships with Fabric by default does not work, at least not at the time of writing.
%%configure -f
{
    "name": "synapseml",
    "conf": {
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2-15-ce5cc5c0-SNAPSHOT",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
        "spark.yarn.user.classpath.first": "true",
        "spark.sql.parquet.enableVectorizedReader": "false"
    }
}
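If you want to confirm which SynapseML build the session actually picked up, you can print the package version. A minimal sketch; this assumes the `__spark_package_version__` attribute exposed by recent SynapseML releases:

# Optional sanity check: print the SynapseML version loaded by this session
import synapse.ml.core
print(synapse.ml.core.__spark_package_version__)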
This cell initializes all the variables that we need to connect to the required Azure services: OpenAI, Document Intelligence, and AI Search.
from synapse.ml.core.platform import find_secret

doc_intel_key = find_secret(secret_name="doc-intel-key", keyvault="YOUR-keys")
doc_intel_services_location = "eastus"

openai_key = find_secret(secret_name="openaikey", keyvault="YOUR-keys")
openai_service_name = "YOURS"
openai_endpoint = "https://YOURS.openai.azure.com/"
openai_deployment_for_embeddings = "text-embedding-ada-002"
openai_deployment_for_query = "gpt-35-turbo"

aisearch_name = "YOURSEARCH"
aisearch_index_name = "YOURSEARCH-idx"  # "YOURSEARCH-index"
aisearch_key = find_secret(secret_name="YOURSEARCH-key", keyvault="YOUR-keys")
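If you prefer not to use SynapseML's find_secret helper, Fabric notebooks can also read secrets from Azure Key Vault through the built-in mssparkutils. A minimal sketch, assuming a Key Vault named YOUR-keys and an identity with get permission on its secrets:

# Alternative: pull a secret with Fabric's built-in utilities instead of find_secret
from notebookutils import mssparkutils

doc_intel_key = mssparkutils.credentials.getSecret(
    "https://YOUR-keys.vault.azure.net/", "doc-intel-key"
)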
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# path to your documents folder
document_path = "abfss://Fabric_Dev@onelake.dfs.fabric.microsoft.com/lh_fabric_demo.Lakehouse/Files/ph/"
df = spark.read.format("binaryFile").load(document_path).limit(10).cache()
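Spark's binaryFile reader returns one row per file with path, modificationTime, length, and content columns, so a quick way to verify what was picked up from the Lakehouse is:

# Quick check: list the loaded files and their sizes in bytes
display(df.select("path", "length", "modificationTime"))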
This cell reads the binary content of the PDFs and converts it into text and paragraphs using Azure AI Document Intelligence.
from synapse.ml.services.form import AnalyzeDocument
from pyspark.sql.functions import col

analyze_document = (
    AnalyzeDocument()
    .setPrebuiltModelId("prebuilt-layout")
    .setSubscriptionKey(doc_intel_key)
    .setLocation(doc_intel_services_location)
    .setImageBytesCol("content")
    .setOutputCol("result")
    .setPages("1-15")  # Here we are reading the first 15 pages of the documents for demo purposes
)

analyzed_df = (
    analyze_document.transform(df)
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
).cache()
This cell takes the content of the documents and breaks it into chunks of roughly 3,000 to 4,000 characters.
from synapse.ml.featurize.text import PageSplitter

ps = (
    PageSplitter()
    .setInputCol("output_content")
    .setMaximumPageLength(4000)
    .setMinimumPageLength(3000)
    .setOutputCol("chunks")
)

splitted_df = ps.transform(analyzed_df)
display(splitted_df)
# Each row contains all chunks for the same document as an array (vector).
# Explode will distribute and replicate the content of the array across multiple rows.
from pyspark.sql.functions import explode, col

exploded_df = splitted_df.select("path", explode(col("chunks")).alias("chunk")).select(
    "path", "chunk"
)
display(exploded_df)
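Before generating embeddings, it can be worth checking how many chunks each document produced; a quick sketch:

# Count how many chunks each document was split into
display(exploded_df.groupBy("path").count())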
Now we can take our chunks and convert them into embeddings (vectors of numbers that represent the meaning of our text).
from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_for_embeddings)
    .setCustomServiceName(openai_service_name)
    .setTextCol("chunk")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

df_embeddings = embedding.transform(exploded_df)
display(df_embeddings)
from pyspark.sql.functions import monotonically_increasing_id, lit

df_embeddings = (
    df_embeddings.drop("error")
    .withColumn(
        "idx", monotonically_increasing_id().cast("string")
    )  # create an index ID (document key) for AI Search
    .withColumn("searchAction", lit("upload"))
)
Now we write our embeddings into a vector index in Azure AI Search.
# This cell creates a new index and uploads the embeddings
from synapse.ml.services import writeToAzureSearch
import json

df_embeddings.writeToAzureSearch(
    subscriptionKey=aisearch_key,
    actionCol="searchAction",
    serviceName=aisearch_name,
    indexName=aisearch_index_name,
    keyCol="idx",
    vectorCols=json.dumps([{"name": "embeddings", "dimension": 1536}]),
)
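To verify the upload, you can ask the Azure AI Search REST API for the document count of the index. A minimal sketch, reusing the same preview api-version as the retrieval code below:

# Optional check: how many documents does the index now contain?
import requests

count_url = (
    f"https://{aisearch_name}.search.windows.net/indexes/"
    f"{aisearch_index_name}/docs/$count?api-version=2023-07-01-Preview"
)
response = requests.get(count_url, headers={"api-key": aisearch_key})
print(response.text)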
We have done the heavy lifting of reading our documents and storing them in a vector database. Now we need to write a few functions that convert a question into embeddings, look up relevant information in the vector database, and return the results.
import requests
import json


# Ask a question and convert it to embeddings
def gen_question_embedding(user_question):
    # Convert question to embedding using SynapseML
    from synapse.ml.services.openai import OpenAIEmbedding

    df_ques = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])
    embedding = (
        OpenAIEmbedding()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_embeddings)
        .setCustomServiceName(openai_service_name)
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
    )
    df_ques_embeddings = embedding.transform(df_ques)
    row = df_ques_embeddings.collect()[0]
    question_embedding = row.embeddings.tolist()
    return question_embedding


def retrieve_k_chunk(k, question_embedding):
    # Retrieve the top K entries from the AI Search index
    url = f"https://{aisearch_name}.search.windows.net/indexes/{aisearch_index_name}/docs/search?api-version=2023-07-01-Preview"
    payload = json.dumps(
        {"vector": {"value": question_embedding, "fields": "embeddings", "k": k}}
    )
    headers = {
        "Content-Type": "application/json",
        "api-key": aisearch_key,
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    output = json.loads(response.text)
    print(response.status_code)
    return output


# Generate embeddings for a sample question and retrieve the top k document chunks
user_question = "What is an employee campaign?"
retrieve_k = 5
question_embedding = gen_question_embedding(user_question)
output = retrieve_k_chunk(retrieve_k, question_embedding)

# Concatenate the content of the retrieved documents
context = [i["chunk"] for i in output["value"]]
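Since context is just a Python list of chunk strings, you can preview what came back before wiring it into a prompt:

# Preview the first few hundred characters of each retrieved chunk
for i, chunk in enumerate(context):
    print(f"--- chunk {i} ---")
    print(chunk[:300])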
Now everything is coming together: the get_context function takes a question and returns the relevant context for the LLM to create an answer, and the get_response function takes the user question, looks up the context in AI Search, and then uses a prompt to give us the answer that we seek.
from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def get_context(user_question, retrieved_k=5):
    # Generate embeddings for the question
    question_embedding = gen_question_embedding(user_question)

    # Retrieve the top K entries
    output = retrieve_k_chunk(retrieved_k, question_embedding)

    # Concatenate the content of the retrieved documents
    context = [i["chunk"] for i in output["value"]]
    return context


def make_message(role, content):
    return Row(role=role, content=content, name=role)


def get_response(user_question):
    context = get_context(user_question)

    # Write a prompt with context and user_question as variables
    prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    chat_df = spark.createDataFrame(
        [
            (
                [
                    make_message("system", prompt),
                    make_message("user", user_question),
                ],
            ),
        ]
    ).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_query)
        .setCustomServiceName(openai_service_name)
        .setMessagesCol("messages")
        .setErrorCol("error")
        .setOutputCol("chat_completions")
    )

    result_df = chat_completion.transform(chat_df).select(
        "chat_completions.choices.message.content"
    )

    result = []
    for row in result_df.collect():
        content_string = " ".join(row["content"])
        result.append(content_string)

    # Join the list into a single string
    result = " ".join(result)
    return result


response = get_response("What is an employee campaign?")
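Finally, print the answer. You can call get_response with any question about your documents; the second question below is just an example:

# Print the generated answer
print(response)

# Ask another question against the same index
print(get_response("What topics do these documents cover?"))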