How to implement RAG in Microsoft Fabric

In the previous posts we started our journey with Azure OpenAI and Microsoft Fabric for Data Engineering and Business Intelligence use cases. In this video we are kicking things up a notch or two: we are going to cover how to implement RAG (retrieval-augmented generation) in Microsoft Fabric.

This video is inspired by the following tutorial provided by Microsoft; however, I had to modify a few things to make sure that the code could actually run in my Fabric environment. The modified notebook can be found below.

This cell is required to make sure that the proper version of SynapseML is installed. The version that comes with Fabric by default does not work, at least not right now.

%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2-15-ce5cc5c0-SNAPSHOT",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
      "spark.yarn.user.classpath.first": "true",
      "spark.sql.parquet.enableVectorizedReader": "false"
  }
}

This cell initializes all the variables that we need to connect to the required Azure services: OpenAI, Document Intelligence, and AI Search.

from synapse.ml.core.platform import find_secret

# Connection settings for the Azure services used by this notebook.
# Replace the YOUR-keys / YOURS / YOURSEARCH placeholders with your own resource names.

# Document Intelligence: the key is looked up with find_secret from the named key vault.
doc_intel_key = find_secret(secret_name="doc-intel-key", keyvault="YOUR-keys")
doc_intel_services_location = "eastus"

# Azure OpenAI: key, service name, endpoint, and the deployments used for embeddings and for chat.
openai_key = find_secret(secret_name="openaikey", keyvault="YOUR-keys")
openai_service_name = "YOURS"
openai_endpoint = "https://YOURS.openai.azure.com/"
openai_deployment_for_embeddings = "text-embedding-ada-002"
openai_deployment_for_query = "gpt-35-turbo"

# AI Search: service name, index name, and key.
aisearch_name = "YOURSEARCH"
aisearch_index_name = "YOURSEARCH-idx"  # alternative index name: "YOURSEARCH-index"
aisearch_key = find_secret(secret_name="YOURSEARCH-key", keyvault="YOUR-keys")
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Read the files under document_path with Spark's "binaryFile" source.
# limit(10) keeps at most 10 rows for the demo, and cache() stores the result for reuse.
document_path = "abfss://Fabric_Dev@onelake.dfs.fabric.microsoft.com/lh_fabric_demo.Lakehouse/Files/ph/"  # path to your documents folder
df = spark.read.format("binaryFile").load(document_path).limit(10).cache()

This cell reads the binary contents of each PDF and converts them to text and paragraphs.

from synapse.ml.services.form import AnalyzeDocument
from pyspark.sql.functions import col

# Configure the Document Intelligence transformer: it uses the "prebuilt-layout" model,
# reads the file bytes from the "content" column and writes its output to the "result" column.
analyze_document = (
    AnalyzeDocument()
    .setPrebuiltModelId("prebuilt-layout")
    .setSubscriptionKey(doc_intel_key)
    .setLocation(doc_intel_services_location)
    .setImageBytesCol("content")
    .setOutputCol("result")
    .setPages(
        "1-15"
    )  # Here we are reading the first 15 pages of the documents for demo purposes
)

# Run the transformer over the loaded documents and pull the extracted text
# ("output_content") and the paragraphs out of the nested result; the DataFrame is cached.
analyzed_df = (
    analyze_document.transform(df)
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
).cache()

This cell takes the content of the documents and breaks it into chunks

from synapse.ml.featurize.text import PageSplitter

# Configure the splitter: it reads the text in "output_content" and writes the
# resulting chunks to the "chunks" column.
# NOTE(review): the 3000/4000 page-length limits are presumably in characters - confirm
# against the PageSplitter documentation.
ps = (
    PageSplitter()
    .setInputCol("output_content")
    .setMaximumPageLength(4000)
    .setMinimumPageLength(3000)
    .setOutputCol("chunks")
)

# Apply the splitter to the analyzed documents and show the result.
splitted_df = ps.transform(analyzed_df)
display(splitted_df)
# Each row holds all the chunks of one document as a single collection.
# explode() spreads that collection across multiple rows: one row per chunk,
# with the document path repeated on every row.
from pyspark.sql.functions import explode, col

exploded_df = splitted_df.select(
    col("path"),
    explode("chunks").alias("chunk"),
)
display(exploded_df)

Now we can take our chunks and convert them into embeddings (or a set of numbers that correspond to our words)

from synapse.ml.services.openai import OpenAIEmbedding

# Configure the Azure OpenAI embedding transformer: it reads the text in the "chunk"
# column, uses the embeddings deployment, and writes the vectors to "embeddings"
# and any errors to the "error" column.
embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_for_embeddings)
    .setCustomServiceName(openai_service_name)
    .setTextCol("chunk")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

# Apply the transformer to the one-chunk-per-row DataFrame and show the result.
df_embeddings = embedding.transform(exploded_df)

display(df_embeddings)
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit

# Prepare the rows for AI Search: drop the "error" column, add a string "idx"
# column, and add a "searchAction" column set to "upload" on every row.
df_embeddings = (
    df_embeddings.drop("error")
    .withColumn(
        "idx", monotonically_increasing_id().cast("string")
    )  # create the ID column that is passed as keyCol when writing to AI Search
    .withColumn("searchAction", lit("upload"))
)

Now we are writing our embeddings into a vector database in AI Search

# This cell creates a new index in AI Search and writes the embeddings into it.
from synapse.ml.services import writeToAzureSearch
import json

# "idx" is the key column, "searchAction" carries the per-row action, and
# "embeddings" is declared as a vector column with 1536 dimensions.
df_embeddings.writeToAzureSearch(
    subscriptionKey=aisearch_key,
    actionCol="searchAction",
    serviceName=aisearch_name,
    indexName=aisearch_index_name,
    keyCol="idx",
    vectorCols=json.dumps([{"name": "embeddings", "dimension": 1536}]),
)

We have done the heavy lifting of reading our documents and storing them in a vector database. Now we need to write a few functions that convert a question into embeddings, look up the relevant information in the vector database, and return the results.

import requests

# Ask a question and convert to embeddings


def gen_question_embedding(user_question):
    """Convert a question into an embedding with the Azure OpenAI embeddings deployment.

    Args:
        user_question: The question text to embed.

    Returns:
        The embedding of the question, converted to a plain list with ``tolist()``.

    Raises:
        RuntimeError: If no embedding came back for the question; the message
            includes the content of the ``errorQ`` column.
    """
    # Convert question to embedding using synapseML
    from synapse.ml.services.openai import OpenAIEmbedding

    # Single-row DataFrame holding the question in the "questions" column.
    df_ques = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])
    embedding = (
        OpenAIEmbedding()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_embeddings)
        .setCustomServiceName(openai_service_name)
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
    )
    row = embedding.transform(df_ques).collect()[0]

    # Surface a failed call explicitly instead of crashing on None.tolist().
    if row["embeddings"] is None:
        raise RuntimeError(f"Failed to embed the question: {row['errorQ']}")

    return row["embeddings"].tolist()


def retrieve_k_chunk(k, question_embedding, timeout=30):
    """Retrieve the top ``k`` entries from the AI Search index by vector search.

    Args:
        k: Number of entries to request.
        question_embedding: Embedding of the question, as a JSON-serialisable list.
        timeout: Seconds to wait for the search service before giving up.

    Returns:
        The decoded JSON body of the search response.

    Raises:
        requests.HTTPError: If the service responds with an error status.
        requests.Timeout: If the service does not respond within ``timeout`` seconds.
    """
    url = f"https://{aisearch_name}.search.windows.net/indexes/{aisearch_index_name}/docs/search?api-version=2023-07-01-Preview"

    payload = json.dumps(
        {"vector": {"value": question_embedding, "fields": "embeddings", "k": k}}
    )
    headers = {
        "Content-Type": "application/json",
        "api-key": aisearch_key,
    }

    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    print(response.status_code)

    # Fail here with the HTTP error rather than handing back an error body
    # that callers would then index with ["value"].
    response.raise_for_status()
    return response.json()


# Sample question and number of chunks to retrieve for this demo cell
user_question = "What is an employee campaign?"
retrieve_k = 5

# Generate embeddings for the question and retrieve the top k document chunks
question_embedding = gen_question_embedding(user_question)
output = retrieve_k_chunk(retrieve_k, question_embedding)

# Collect the text of the retrieved chunks into a list
context = [hit["chunk"] for hit in output["value"]]

Now everything is coming together. The get_context function takes a question and returns the context that the LLM will use to create an answer. The get_response function takes the user question, looks up the context in AI Search, then uses a prompt to give us the answer that we seek.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *

def get_context(user_question, retrieved_k=5):
    """Return the text chunks retrieved for a question.

    The question is embedded with gen_question_embedding, the top
    ``retrieved_k`` entries are fetched with retrieve_k_chunk, and the
    "chunk" field of every returned entry is collected into a list.
    """
    search_result = retrieve_k_chunk(
        retrieved_k, gen_question_embedding(user_question)
    )
    return [hit["chunk"] for hit in search_result["value"]]

def get_response(user_question, retrieved_k=5):
    """Answer a question from the context retrieved for it.

    Args:
        user_question: The question to answer.
        retrieved_k: Number of chunks to retrieve as context (passed to get_context).

    Returns:
        The chat completion content, joined into a single string.
    """
    context = get_context(user_question, retrieved_k)

    # Write a prompt with the retrieved context embedded in it
    prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    # One row whose "messages" column holds the system prompt and the user question.
    chat_df = spark.createDataFrame(
        [
            (
                [
                    make_message("system", prompt),
                    make_message("user", user_question),
                ],
            ),
        ]
    ).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_query)
        .setCustomServiceName(openai_service_name)
        .setMessagesCol("messages")
        .setErrorCol("error")
        .setOutputCol("chat_completions")
    )

    result_df = chat_completion.transform(chat_df).select(
        "chat_completions.choices.message.content"
    )

    # Each row's "content" is a list of strings (one per entry in choices);
    # flatten everything into a single space-separated string.
    return " ".join(" ".join(row["content"]) for row in result_df.collect())


def make_message(role, content):
    """Build one chat message Row; the ``name`` field is set to the role."""
    fields = {"role": role, "content": content, "name": role}
    return Row(**fields)


# Ask a sample question end to end; the answer text is stored in `response`.
response = get_response("What is an employee campaign?")

Previous
Previous

LangChain Agents to query Power BI datasets in Microsoft Fabric

Next
Next

How to generate Business Friendly #PowerBI Data Dictionary using #OpenAI #SemanticLink #msfabric