AWS in Plain English

New AWS, Cloud, and DevOps content every day. Follow to join our 3.5M+ monthly readers.

RAG solution on Amazon Bedrock - Part 6: Enhancing Document Indexing with Event-Driven Architecture

Generative AI Demo using the Well-Architected Machine Learning Lens PDF to prepare for Machine Learning Associate (MLA-C01) Certification

MCQ (Multiple Choice Question) GenerativeAI App for Machine Learning Associate Certification (MLA-C01)

In this part of the series, we will address the challenge of container restarts due to timeouts during the document indexing process. We will enhance our GenAI application by incorporating an event-driven approach using Amazon SQS and AWS Lambda to handle document indexing asynchronously.

Pre-requisites

RAG Application with Event-Driven Document Indexing (CDK TypeScript Implementation)

OpenSearch Serverless RAG with Cognito, ECS Fargate and Event-Driven Document Indexing

CDK Project Implementation

Ensure you have the AWS CDK installed and initialized as per the pre-requisites. The complete source code for this project is available in our GitHub repository. Clone release v0.6.0 and install the dependencies with the following commands:

git clone -b v0.6.0 https://github.com/awsdataarchitect/opensearch-bedrock-rag-cdk.git && cd opensearch-bedrock-rag-cdk
npm install

Enhancements in ECS Fargate Stack

1. Creating an SQS Queue

We will create an Amazon SQS queue to decouple the document indexing process from the ECS service. This queue will hold messages that represent document indexing tasks.

Code Changes:

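import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';

// Create an SQS queue that buffers document indexing tasks
const queue = new sqs.Queue(this, 'MyQueue', {
  queueName: 'docs-queue',
  retentionPeriod: cdk.Duration.days(1),
  // Must be at least the timeout of the consuming Lambda function (20 seconds in this article)
  visibilityTimeout: cdk.Duration.seconds(30),
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});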
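
The visibility timeout above interacts with the timeout of the Lambda function that consumes the queue (20 seconds later in this article). As a rough sanity check, the relationship can be sketched as follows. This assumes the AWS guidance that the visibility timeout should be at least six times the function timeout; the helper names are hypothetical and not part of the project.

```python
# Hypothetical helpers, not part of the project: compare a queue's visibility
# timeout with the timeout of the Lambda function that consumes it.
RECOMMENDED_MULTIPLIER = 6  # assumption: AWS guidance for SQS event sources


def recommended_visibility_timeout(function_timeout_s: int) -> int:
    """Return the suggested minimum visibility timeout in seconds."""
    return RECOMMENDED_MULTIPLIER * function_timeout_s


def check_timeouts(visibility_timeout_s: int, function_timeout_s: int) -> str:
    """Classify a (visibility timeout, function timeout) pair."""
    if visibility_timeout_s < function_timeout_s:
        # A message could reappear while it is still being processed.
        return "invalid"
    if visibility_timeout_s < recommended_visibility_timeout(function_timeout_s):
        return "below recommendation"
    return "ok"


if __name__ == "__main__":
    # Values used in this article: 30 s visibility timeout, 20 s function timeout.
    print(check_timeouts(30, 20))
```

With the values used here, the configuration is valid but leaves little headroom for retries, so a larger visibility timeout may be worth considering if messages are processed more than once.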

2. Adding IAM Policies for SQS

We need to grant the ECS task role permission to send messages to the SQS queue.

Code Changes:

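import * as iam from 'aws-cdk-lib/aws-iam';

// Allow the ECS task role to send messages to the queue
// (appService is the Fargate service shown in step 3)
appService.taskDefinition.taskRole?.attachInlinePolicy(new iam.Policy(this, 'QueuePolicy', {
  statements: [
    new iam.PolicyStatement({
      actions: ['sqs:SendMessage'],
      resources: [queue.queueArn],
      effect: iam.Effect.ALLOW,
    }),
  ],
}));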

3. Adding Environment Variable for SQS Queue URL

We will add the sqs_queue_url environment variable to the ECS task definition. This allows the ECS service to send document indexing tasks to the SQS queue.

Code Changes:

// Create a new Fargate service with the image from ECR and specify the service name
const appService = new ecs_patterns.ApplicationLoadBalancedFargateService(this, 'MyFargateService', {
  cluster,
  serviceName: 'ecs-bedrock-service',
  taskImageOptions: {
    image: ecs.ContainerImage.fromDockerImageAsset(appImageAsset),
    containerPort: 8501,
    environment: {
      'opensearch_host': props.OpenSearchEndpoint,
      'vector_index_name': props.VectorIndexName,
      'vector_field_name': props.VectorFieldName,
      'sqs_queue_url': queue.queueUrl,
    },
  },
});
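
On the application side, the container reads this variable at runtime. A minimal sketch of a fail-fast lookup is shown below; the function name `get_queue_url` is hypothetical, and only the variable name `sqs_queue_url` comes from the task definition above.

```python
import os


def get_queue_url(env=os.environ) -> str:
    """Return the queue URL injected by the task definition, or fail fast."""
    url = env.get("sqs_queue_url", "").strip()
    if not url:
        # Failing at startup is easier to diagnose than a failed send later on.
        raise RuntimeError("sqs_queue_url is not set; cannot enqueue indexing tasks")
    return url
```

Passing the environment as a parameter keeps the lookup easy to exercise in a unit test without touching the real process environment.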

Enhancements in Lambda Function

Creating a Lambda Function

We will create a Lambda function that will process messages from the SQS queue and handle document indexing.

Code Changes:

import * as lambda from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

// Create a Lambda function that indexes one batch of queued document chunks
const lambdaFunction = new lambda.Function(this, 'MyLambdaFunction', {
  functionName: 'docs-indexer',
  timeout: cdk.Duration.seconds(20),
  runtime: lambda.Runtime.PYTHON_3_9,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda/indexer'),
  environment: {
    'opensearch_host': props.OpenSearchEndpoint,
    'vector_index_name': props.VectorIndexName,
    'vector_field_name': props.VectorFieldName,
  },
});

// Grant necessary permissions to the Lambda function
// (bedrockPolicy and openSearchPolicy are defined elsewhere in the stack)
lambdaFunction.role?.attachInlinePolicy(bedrockPolicy);
lambdaFunction.role?.attachInlinePolicy(openSearchPolicy);
lambdaFunction.addToRolePolicy(new iam.PolicyStatement({
  actions: ['sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
  resources: [queue.queueArn],
  effect: iam.Effect.ALLOW,
}));

// Configure the SQS queue as an event source for the Lambda function
lambdaFunction.addEventSource(new SqsEventSource(queue));
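
The handler itself lives in lambda/indexer/index.py and is not reproduced in this article. As a rough sketch of its shape, assuming each message body is the JSON object `{"content": ...}` produced by the sender script, it could look like the following. The `index_chunk` function is a hypothetical placeholder for the embedding and OpenSearch write, not the project's actual code.

```python
import json
from typing import Callable


def index_chunk(content: str) -> None:
    """Hypothetical placeholder for the real work (embedding and OpenSearch write)."""
    print(f"Indexing chunk of {len(content)} characters")


def handler(event: dict, context: object,
            index: Callable[[str], None] = index_chunk) -> dict:
    """Process one batch of SQS records delivered by the event source mapping."""
    records = event.get("Records", [])
    for record in records:
        # Each SQS record carries the original message text in its "body" field.
        body = json.loads(record["body"])
        index(body["content"])
    # Any exception raised above fails the whole batch, so SQS makes the
    # messages visible again after the visibility timeout and they are retried.
    return {"processed": len(records)}
```

Lambda only passes `event` and `context`; the third parameter exists so the indexing step can be swapped for a stub when testing locally.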

Changes in docs_to_openSearch.py Script

SQS Client Setup and Message Sending

We will set up an SQS client in the docs_to_openSearch.py script and send each document chunk to the SQS queue as a separate message. The chunks are then indexed asynchronously by the Lambda function rather than inside the ECS container.

Code Changes:

import json
import os

import boto3

# SQS client setup (the region is hard-coded here; adjust it to match your deployment)
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = os.getenv('sqs_queue_url')  # injected by the ECS task definition

...
...

# Send each document chunk to SQS
for i in doc:
    exampleContent = i.page_content
    message = json.dumps({"content": exampleContent})
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=message
    )
    print(f"Sent message to SQS: {response.get('MessageId')}")

These additions ensure that document indexing tasks are sent to the SQS queue, allowing them to be processed asynchronously by the Lambda function.
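
For documents with many chunks, sending one message per API call can become slow. One possible variation, not used in the project itself, is to group chunks with the SQS SendMessageBatch operation, which accepts up to 10 entries per call. The sketch below assumes the same `{"content": ...}` message format; `send_chunks` and `batched` are hypothetical helper names, and `sqs_client` is expected to be a boto3 SQS client.

```python
import json
from typing import Iterator, List

MAX_BATCH_SIZE = 10  # SQS accepts at most 10 entries per SendMessageBatch call


def batched(items: List[str], size: int = MAX_BATCH_SIZE) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_chunks(sqs_client, queue_url: str, chunks: List[str]) -> int:
    """Send chunks in batches and return the number of entries SQS rejected."""
    failed = 0
    for batch in batched(chunks):
        entries = [
            # Ids only need to be unique within a single batch request.
            {"Id": str(i), "MessageBody": json.dumps({"content": text})}
            for i, text in enumerate(batch)
        ]
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed += len(response.get("Failed", []))
    return failed
```

A call such as `send_chunks(sqs, queue_url, [d.page_content for d in doc])` would replace the loop above. Batch requests also have a total payload limit, so very large chunks may still need to be sent individually.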

Deploy the Updated CDK Stack

Run the following command to deploy the updated CDK stack:

cdk deploy --all

Verify Deployment

  • Check the SQS queue and ensure it is created successfully.
  • Verify the Lambda function configuration and its association with the SQS queue.
  • Access the application by navigating to the Route 53 DNS name provided in the CDK output.

Cognito Login UI
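
The first two checks can also be scripted. The sketch below assumes boto3 SQS and Lambda clients are passed in and uses the queue and function names from the CDK code above; `verify_deployment` is a hypothetical helper, not part of the repository.

```python
def verify_deployment(sqs_client, lambda_client,
                      queue_name: str = "docs-queue",
                      function_name: str = "docs-indexer") -> dict:
    """Look up the queue and the state of its event source mapping."""
    # get_queue_url raises an error if the queue does not exist.
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]

    mappings = lambda_client.list_event_source_mappings(
        FunctionName=function_name
    )["EventSourceMappings"]

    # An SQS queue ARN ends with ":<queue name>".
    linked = [
        m for m in mappings
        if m.get("EventSourceArn", "").endswith(f":{queue_name}")
    ]
    return {
        "queue_url": queue_url,
        "mapping_states": [m.get("State") for m in linked],
    }
```

For example, `verify_deployment(boto3.client("sqs"), boto3.client("lambda"))` should return the queue URL and, once the mapping is active, a state such as "Enabled".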

Clean-up

To delete all the resources, simply run the following command:

cdk destroy --all

This command removes the resources defined in the stacks, so they no longer incur costs.

Conclusion

In this part of the series, we enhanced the document indexing process by incorporating an event-driven architecture using Amazon SQS and AWS Lambda. This approach helps avoid container restarts due to timeouts and ensures a more resilient and scalable application. Stay tuned for the next part of this series, where we will continue to enhance our RAG solution.

Recap: To catch up on the previous parts of this series, please refer to the earlier blog posts.

Published in AWS in Plain English

Written by Vivek V

AWS Ambassador | AWS Community Builder (AI Eng.) | 15x AWS All-Star Award AWS Gold Jacket | 3x AWS Certification Subject Matter Expert (SME) | 4x K8s | 5x Azure