Andre Fomin Andre Fomin

How to implement RAG in Microsoft Fabric

In the previous posts we were starting our journey with Azure OpenAI and Microsoft Fabric for Data Engineering and Business Intelligence use cases. In this video we are kicking things up a notch or two as we are going to cover how to implement RAG (retrieval augmented architecture) in Microsoft Fabric.

In the previous posts we were starting our journey with Azure OpenAI and Microsoft Fabric for Data Engineering and Business Intelligence use cases. In this video we are kicking things up a notch or two as we are going to cover how to implement RAG (retrieval augmented architecture) in Microsoft Fabric.

This video is inspired by the following tutorial provided by Microsoft however, I had to modify a few things to make sure that the code could actually run in my Fabric environment. The modified notebook can be found below.

This cell is required to make sure that the proper version of SynapseML is installed. The version that comes with Fabric by default - does not work, at least not right now.

%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2-15-ce5cc5c0-SNAPSHOT",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
      "spark.yarn.user.classpath.first": "true",
      "spark.sql.parquet.enableVectorizedReader": "false"
  }
}

This cell initializes all the variables that we need to connect to all the required Azure services: OpenAI, Document Intelligence, AI Search

from synapse.ml.core.platform import find_secret

doc_intel_key = find_secret(secret_name="doc-intel-key", keyvault="YOUR-keys")
doc_intel_services_location = "eastus"

openai_key = find_secret(secret_name="openaikey", keyvault="YOUR-keys")
openai_service_name = "YOURS"
openai_endpoint = "https://YOURS.openai.azure.com/"
openai_deployment_for_embeddings = "text-embedding-ada-002"
openai_deployment_for_query = "gpt-35-turbo"

aisearch_name = "YOURSEARCH"
aisearch_index_name = "YOURSEARCH-idx"#"YOURSEARCH-index"
aisearch_key = find_secret(secret_name="YOURSEARCH-key", keyvault="YOUR-keys")
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

document_path = "abfss://Fabric_Dev@onelake.dfs.fabric.microsoft.com/lh_fabric_demo.Lakehouse/Files/ph/"  # path to your documents folder
df = spark.read.format("binaryFile").load(document_path).limit(10).cache()

This cell reads the binary contents from the PDF and converts it to text and to paragraphs

from synapse.ml.services.form import AnalyzeDocument
from pyspark.sql.functions import col

analyze_document = (
    AnalyzeDocument()
    .setPrebuiltModelId("prebuilt-layout")
    .setSubscriptionKey(doc_intel_key)
    .setLocation(doc_intel_services_location)
    .setImageBytesCol("content")
    .setOutputCol("result")
    .setPages(
        "1-15"
    )  # Here we are reading the first 15 pages of the documents for demo purposes
)

analyzed_df = (
    analyze_document.transform(df)
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
).cache()

This cell takes the content of the documents and breaks it into chunks

from synapse.ml.featurize.text import PageSplitter

ps = (
    PageSplitter()
    .setInputCol("output_content")
    .setMaximumPageLength(4000)
    .setMinimumPageLength(3000)
    .setOutputCol("chunks")
)

splitted_df = ps.transform(analyzed_df)
display(splitted_df)
# Each column contains many chunks for the same document as a vector.
# Explode will distribute and replicate the content of a vecor across multple rows
from pyspark.sql.functions import explode, col

exploded_df = splitted_df.select("path", explode(col("chunks")).alias("chunk")).select(
    "path", "chunk"
)
display(exploded_df)

Now we can take our chunks and convert them into embeddings (or a set of numbers that correspond to our words)

from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_for_embeddings)
    .setCustomServiceName(openai_service_name)
    .setTextCol("chunk")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

df_embeddings = embedding.transform(exploded_df)

display(df_embeddings)
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit

df_embeddings = (
    df_embeddings.drop("error")
    .withColumn(
        "idx", monotonically_increasing_id().cast("string")
    )  # create index ID for ACS
    .withColumn("searchAction", lit("upload"))
)

Now we are writing our embeddings into a vector database in AI Search

#this cell creates a new index, 
from synapse.ml.services import writeToAzureSearch
import json

df_embeddings.writeToAzureSearch(
    subscriptionKey=aisearch_key,
    actionCol="searchAction",
    serviceName=aisearch_name,
    indexName=aisearch_index_name,
    keyCol="idx",
    vectorCols=json.dumps([{"name": "embeddings", "dimension": 1536}]),
)

We have done the heavy lifting of reading our documents and storing them in a vector database, now we need to write a few functions to allow us to convert a question into embeddings and then look up relevant information in the vector database and return the results.

import requests

# Ask a question and convert to embeddings


def gen_question_embedding(user_question):
    # Convert question to embedding using synapseML
    from synapse.ml.services.openai import OpenAIEmbedding

    df_ques = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])
    embedding = (
        OpenAIEmbedding()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_embeddings)
        .setCustomServiceName(openai_service_name)
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
    )
    df_ques_embeddings = embedding.transform(df_ques)
    row = df_ques_embeddings.collect()[0]
    question_embedding = row.embeddings.tolist()
    return question_embedding


def retrieve_k_chunk(k, question_embedding):
    # Retrieve the top K entries
    url = f"https://{aisearch_name}.search.windows.net/indexes/{aisearch_index_name}/docs/search?api-version=2023-07-01-Preview"

    payload = json.dumps(
        {"vector": {"value": question_embedding, "fields": "embeddings", "k": k}}
    )
    headers = {
        "Content-Type": "application/json",
        "api-key": aisearch_key,
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    output = json.loads(response.text)
    print(response.status_code)
    return output


# Generate embeddings for the question and retrieve the top k document chunks
question_embedding = gen_question_embedding(user_question)
output = retrieve_k_chunk(retrieve_k, question_embedding)

# Concatenate the content of retrieved documents
context = [i["chunk"] for i in output["value"]]

Now everything is coming together, GetContext function takes a question and returns the context back to the LLM to create an answer. Get_Response function takes the user question, looks up the context in AI search, then uses a prompt to give us the answer that we seek.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *

def get_context(user_question, retrieved_k = 5):
    # Generate embeddings for the question
    question_embedding = gen_question_embedding(user_question)

    # Retrieve the top K entries
    output = retrieve_k_chunk(retrieved_k, question_embedding)

    # concatenate the content of the retrieved documents
    context = [i["chunk"] for i in output["value"]]

    return context

def get_response(user_question):
    context = get_context(user_question)

    # Write a prompt with context and user_question as variables 
    prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    chat_df = spark.createDataFrame(
        [
            (
                [
                    make_message(
                        "system", prompt
                    ),
                    make_message("user", user_question),
                ],
            ),
        ]
    ).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setSubscriptionKey(openai_key)
        .setDeploymentName(openai_deployment_for_query)
        .setCustomServiceName(openai_service_name)
        .setMessagesCol("messages")
        .setErrorCol("error")
        .setOutputCol("chat_completions")
    )


    result_df = chat_completion.transform(chat_df).select("chat_completions.choices.message.content")

    result = []
    for row in result_df.collect():
        content_string = ' '.join(row['content'])
        result.append(content_string)

    # Join the list into a single string
    result = ' '.join(result)
    
    return result    


def make_message(role, content):
    return Row(role=role, content=content, name=role)


response = get_response("What is an employee campaign?")

Read More
Andre Fomin Andre Fomin

How to generate Business Friendly #PowerBI Data Dictionary using #OpenAI #SemanticLink #msfabric

Learn how to generate a business-friendly PowerBI data dictionary using OpenAI, SemanticLink, and MSFabric!

In this video, we explore how to incorporate Azure OpenAI into the world of Business Intelligence and Microsoft Fabric.

We use the Semantic Link library to create our data dictionary. Then we enrich it using Azure OpenAI to derive business audience-friendly descriptions from our DAX measure definitions.

Check out the video to learn more! #PowerBI #OpenAI #SemanticLink #MSFabric

In this video I will continue to explore how we can incorporate Azure OpenAI, LLM, GenAI, not even sure what right buzz words to use here 🙂 , in the world of Business Intelligence and Microsoft Fabric. I will use Fabric notebooks along with Semantic Link library to first create the data dictionary of our datasets deployed in a current workspace, listing the datasets, measures and table columns. Then we will use Azure OpenAI to take our DAX measure definitions and derive business audience friendly descriptions from them.

first, we use Semantic Link library to create our tables Datasets, Measures and Columns (the Columns table also have the corresponding table name, but the Measures and Columns tables do not have a corresponding data set name in them, so we have to add it ourselves.

datasets = fabric.list_datasets()
spark_datasets = spark.createDataFrame(rename_columns(datasets))
 
 
spark_datasets.write.mode("overwrite").saveAsTable("pbi_dataset")
 
spark.sql("drop table if exists lh_fabric_demo.pbi_measure")
for index, row in datasets.iterrows():
    dataset_name = row["Dataset_Name"]
    measures = fabric.list_measures(dataset = dataset_name)
    if len(measures)> 0:
        rename_columns(measures)
        spark_measures = spark.createDataFrame(measures)
        spark_measures = spark_measures.withColumn("Dataset_Name", F.lit(dataset_name))
 
        spark_measures_ai = spark_measures.withColumn( \
        "Measure_Definition", \
        F.concat_ws(" = ", spark_measures["Measure_Name"], spark_measures["Measure_Expression"]) \
        ).withColumn( \
        "Measure_Description_AI", \
        generate_measure_description_from_DAX(F.col("Measure_Definition")))
        spark_measures_ai.write.mode("append").saveAsTable("pbi_measure")
spark.sql("drop table if exists lh_fabric_demo.pbi_table")
for index, row in datasets.iterrows():
    dataset_name = row["Dataset_Name"]
    tables = fabric.list_tables(dataset = dataset_name)

    if len(tables)> 0:
        rename_columns(tables)
        spark_tables = spark.createDataFrame(tables).withColumn("Dataset_Name", F.lit(dataset_name))
        spark_tables.write.mode("append").saveAsTable("pbi_table")

spark.sql("drop table if exists lh_fabric_demo.pbi_column")
for index, row in datasets.iterrows():
    dataset_name = row["Dataset_Name"]
    cols = fabric.list_columns(dataset = dataset_name)
    if len(cols)> 0:
        rename_columns(cols)
        spark_cols = spark.createDataFrame(cols).withColumn("Dataset_Name", F.lit(dataset_name))
        spark_cols.write.mode("append").saveAsTable("pbi_column")        

Now we can just query them using Spark SQL

%%sql
select * from lh_fabric_demo.pbi_measure limit 10
Read More
Andre Fomin Andre Fomin

Data Enrichment with OpenAI in Fabric

Dive into the world of Azure OpenAI and its groundbreaking impact on data engineering. This post explores the cutting-edge applications of large language models (LLMs) for data enrichment, offering a deep dive into practical use cases that underscore the transformative potential of Azure OpenAI in the field. If you're poised to elevate your data strategies and harness the power of LLMs, this essential read is your gateway to the future of data engineering.

I've just put together a video that I'm really excited about. It's about using Azure OpenAI to make your data work smarter, not harder. If you've been curious about how big language models (like the ones you hear about in the news) can fit into the work you do every day, this one's for you.

Why You Should Care:

Ever wonder how you can do more with your data? That's where Azure OpenAI comes in. It's a tool that can seriously crank up your data's value, making it tell you things you hadn't thought possible. We're talking about enriching data with insights you had not thought were possible.

What's In The Video:

I'll walk you through some real-deal examples of Azure OpenAI in action. You'll see how it can tackle everything from figuring out the mood of customer feedback to getting the lowdown on different demographics. It's like giving your data a superpower.

The Future Is Now:

This is the moment where data engineering takes a giant leap forward. The question isn't if you'll start using these tools, but how quickly you can get them into your workflow. The ideas I'm sharing are just the start. The real magic happens when you start playing around with Azure OpenAI yourself.

Let's Dive In:

Check out the video and see what you think. It's an exciting time to be in data engineering, and Azure OpenAI is right at the heart of it. Let's explore what's possible together.


Read More