Containers

Build a multi-tenant chatbot with RAG using Amazon Bedrock and Amazon EKS

Introduction

With the availability of Generative AI models, many customers are exploring ways to build chatbot applications that can cater to a wide range of their end-customers, with each instance of chatbot specializing on a specific tenant’s contextual information, and run such multi-tenant applications at scale with a cost-efficient infrastructure familiar to their development teams. Amazon Bedrock, a fully managed cloud service, tackles this challenge head-on by offering a range of powerful foundation models (FMs) through an Application Programming Interface (API), without managing infrastructure, streamlining the development of highly adaptable and cost-effective Generative AI chatbots.

Amazon Elastic Kubernetes Service (Amazon EKS) addresses several challenges around building chatbot applications such as multi-tenancy, scaling challenges, resiliency, and cost. Amazon EKS is commonly adopted by AWS customers for large scale deployment of container-based multi-tenant applications that cater to both internal and external audiences. Various surveys and reports including from Cloud Native Computing Foundation (CNCF) Dynatrace, Splunk, and Datadog also indicate the growth of Kubernetes as the container orchestration solution of choice in enterprise environments.

This post delves into a solution for building a multi-tenant chatbot with Retrieval Augmented Generation (RAG). RAG is a common pattern where a general-purpose language model is queried with a user question along with additional contextual information extracted from private documents. We have provided step-by-step deployment instructions and sample code for the solution in a GitHub repository.

Solution overview 

As depicted in Figure 1, the solution uses Amazon EKS as the foundation for running the containerized chatbot application, with Kubernetes namespaces serving as a logical unit of compute workload per tenant. Istio is an open-source service mesh commonly used with Kubernetes for deploying their multi-tenant applications, and provides features such as traffic management, security, and observability at the pod level. We have used Istio service mesh for ingress control and routing functions as it supports external authorization, JWT validation at the edge, and request routing based on various HTTP parameters. We use an Open ID Connect (OIDC) proxy to relay authorization requests to an Identity Provider which in our case is Amazon Cognito User Pools. Requests from clients are received through a Network Load Balancer (NLB). For each tenant we use the DNS subdomain pattern to direct traffic from the end user, where the DNS subdomain maps to the IP addresses of the NLB. The NLB forwards all traffic to an Istio Ingress Gateway. A detailed discussion of how Istio supports multi-tenancy on Amazon EKS can be found in the post SaaS Identity and Routing with Istio Service Mesh and Amazon EKS, along with sample code.

High-level architecture

Figure 1 – High-level architecture

The private documents that serve as the knowledge base for the RAG implementation, are converted into vector embeddings and are indexed and stored in the in-memory Facebook AI Similarity Search (FAISS) vector database index for efficient similarity search. We use the Titan Embeddings model to convert textual data to vector embeddings, and Claude Instant for generating the response to user queries. Both models are available through Amazon Bedrock. RAG is implemented with FAISS, an open-source library for implementing in-memory storage of vector embeddings and similarity search. We use FAISS for its simplicity and smaller footprint that suit demo use cases. For enterprise implementations, vector databases such as pgvector, Amazon OpenSearch, and other AWS Partner products, would be the preferred options, supporting scalability, persistence, and ubiquitous access. Vector embeddings are generated using the Titan Embeddings model. The LangChain library provides the functionality for running queries against the text model and for managing chat history by saving it in Amazon DynamoDB. It helps orchestrate various steps during the processing of user input, including chat history retrieval, requesting the vector embedding of the chat history combined with user input from the embeddings model, using the received embeddings to search the vector database, and sending the retrieved documents along with the chat history and user input to the Claude Instant model to receive the output.

Walkthrough

Containerization of chatbot and RAG components

The chatbot application consists of two component microservices: a frontend chat interface that interacts with the client and a RAG API. Both the microservices are built as Docker container images for scheduling as Kubernetes pods on Amazon EKS.

The Dockerfile for the chat interface is shown in the following snippet.

FROM public.ecr.aws/docker/library/python:3.11.4-slim AS installer-image
WORKDIR /app
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq install -y \
    build-essential \
    curl \
    software-properties-common 2>/dev/null >/dev/null \
    && rm -rf /var/lib/apt/lists/*
ADD app/* ./
RUN pip install --user --upgrade -q -q pip && pip install --user -q -q -r requirements.txt


FROM public.ecr.aws/docker/library/python:3.11.4-slim
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq upgrade -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt install -qq -y curl 2>/dev/null >/dev/null && \
    addgroup --gid 8000 streamlit && \
    adduser --uid 8000 --gid 8000 --disabled-password --gecos "" streamlit
USER streamlit
WORKDIR /home/streamlit/app
COPY --chown=streamlit:streamlit --from=installer-image /root/.local /home/streamlit/.local/
COPY --chown=streamlit:streamlit app/*.py /home/streamlit/app/
ENV PATH=/home/streamlit/.local/bin:$PATH
EXPOSE 8501
HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health
ENTRYPOINT ["streamlit", "run", "webapp.py", "--server.port=8501", "--server.address=0.0.0.0"]

We use a lightweight Python base image from Amazon ECR Public Gallery, and use the multistage build approach. In the first stage, we build and install the required Python packages into the user home directory. In the second stage, we create a non-root Linux user and group for the Streamlit-based chat interface to run, copy the installed Python packages from the first stage image, and also copy the application code along with all the associated modules into the application working directory in the container image.

The Dockerfile for the RAG API Docker image is shown in the following code snippet.

FROM public.ecr.aws/docker/library/python:3.11.4-slim AS installer-image
WORKDIR /app
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq install -y \
    build-essential \
    curl 2>/dev/null >/dev/null \
    && rm -rf /var/lib/apt/lists/*
ADD api/requirements.txt ./
RUN pip install --upgrade -q -q pip && \
    pip install --user --upgrade -q -q pip && pip install --user -q -q -r requirements.txt && \
    python -m pip install --user -q -q botocore && \
    python -m pip install --user -q -q boto3

FROM public.ecr.aws/docker/library/python:3.11.4-slim
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq upgrade -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt install -qq -y curl 2>/dev/null >/dev/null && \
    addgroup --gid 8000 ragapi && \
    adduser --uid 8000 --gid 8000 --disabled-password --gecos "" ragapi
USER ragapi
WORKDIR /home/ragapi/app
COPY --chown=ragapi:ragapi --from=installer-image /root/.local /home/ragapi/.local/
COPY --chown=ragapi:ragapi api/app /home/ragapi/app/
ENV PATH=/home/ragapi/.local/bin:$PATH
EXPOSE 8000
ENTRYPOINT ["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "main:app"]

For the RAG API, we use the same Python base image as the frontend, and follow similar steps as outlined for the frontend. For serving the API, we use the FastAPI framework, and a combination of Gunicorn/Uvicorn to run an efficient ASGI web server.

Once the container images are built, they are pushed to Amazon Elastic Container Registry (Amazon ECR) and are ready for deployment. The container images from the ECR are pulled during the deployment of the chatbot pod on Amazon EKS.

As shown in Figure 2, a Kubernetes namespace is created for each tenant deployment, into which the chatbot pod is deployed with environment variables configured to tenant-specific settings. The chatbot pod consists of two containers, with the main container being the frontend, while the RAG API acts as a sidecar container serving the API required for the Q & A task. We also create an AWS Identity and Access Management (AWS IAM) role per tenant that has the necessary permission for the tenant-specific Amazon Simple Storage Service (Amazon S3) bucket and Amazon DynamoDB table. In order to simplify the use case, we’ve created per-tenant dedicated Amazon S3 and Amazon DynamoDB resources. The AWS IAM role is associated with a Kubernetes Service Account that’s also provisioned along with the pod, and allows the containers in the pod to access required data resources.

Amazon EKS pod architecture

Figure 2 – Amazon EKS pod architecture

It is also a common pattern to comingle tenant data into fewer buckets and tables vis-à-vis the number of tenants, with IAM conditions to generate dynamic credentials using AWS Security Token Service (STS) in order to restrict access to tenant and user specific data per incoming request. Details and sample code for implementing the dynamic credentials pattern can be found in the AWS SaaS Microservices Workshop, and the post on SaaS Data Isolation with Dynamic Credentials Using HashiCorp Vault in Amazon EKS.

The Kubernetes manifest used to deploy the service account, deployment and service is listed as follows.

apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::${ACCOUNT_ID}:role/${EKS_CLUSTER_NAME}-${TENANT}-chatbot-access-role-${RANDOM_STRING}
  name: ${SA_NAME}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot
  labels:
    app: chatbot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: chatbot
  template:
    metadata:
      labels:
        workload-tier: frontend
        app: chatbot
    spec:
      serviceAccountName: ${SA_NAME}
      containers:
        - image: ${REPO_URI_CHATBOT}:latest
          imagePullPolicy: Always
          name: chatbot
          ports:
            - containerPort: 8501
          env:
          - name: ISSUER_URI
            value: ${ISSUER_URI}
          - name: SESSIONS_TABLE
            value: ${SESSIONS_TABLE}
        - image: ${REPO_URI_RAGAPI}:latest
          imagePullPolicy: Always
          name: ragapi
          ports:
            - containerPort: 8000
          env:
          - name: CONTEXTUAL_DATA_BUCKET
            value: contextual-data-${TENANT}-${RANDOM_STRING}
          - name: CHATHISTORY_TABLE
            value: ${CHATHISTORY_TABLE}
          - name: TEXT2TEXT_MODEL_ID
            value: ${TEXT2TEXT_MODEL_ID}
          - name: EMBEDDING_MODEL_ID
            value: ${EMBEDDING_MODEL_ID}
          - name: BEDROCK_SERVICE
            value: ${BEDROCK_SERVICE}
          - name: AWS_DEFAULT_REGION
            value: ${AWS_DEFAULT_REGION}
---
kind: Service
apiVersion: v1
metadata:
  name: chatbot
  labels:
    app: chatbot
spec:
  selector:
    app: chatbot
  ports:
    - port: 80
      name: http
      targetPort: 8501

As can be seen in the container definitions, several environment variables are passed to the containers that are used by the application to locate tenant-specific data resources and API endpoints.

It is also worth noting that the locations for container images are also parametrized that can be passed during the manifest deployment, which in our case are set to the Amazon ECR endpoints for each image.

Ingesting data with FAISS and Titan embeddings model

The contextual data that the chatbot uses for generating responses to user queries is ingested into a FAISS index as part of the deployment. As shown in Figure 3, contextual data is first read from the tenant’s Amazon S3 bucket.

Ingestion of contextual data

Figure 3 – Ingestion of contextual data

The ingestion mechanism uses LangChain’s CharacterTextSplitter to chunk the ingested data into reasonable sizes, and the FAISS library converts each of the chunks into a vector embedding by calling the Titan embeddings model, creating an index of the embeddings. This indexing improves the response time for similarity search. The set of indexes so created are output in a binary formatted file and uploaded back into the Amazon S3 bucket, which is accessed by the RAG-API microservice during bootstrap and loaded into its memory. With larger vector databases, the index is created and managed by the database instance and can be queried by the application using database-specific APIs. Having a dedicated vector database decouples the loading, updating, and indexing of context data from the application, as well as the scaling of the database instance.

The following code snippet show the implementation details of the above sequence.

boto3_bedrock = boto3.client('bedrock', BEDROCK_REGION, endpoint_url=BEDROCK_ENDPOINT)
br_embeddings = BedrockEmbeddings(client=boto3_bedrock)

...

loader = CSVLoader(f"./{LOCAL_RAG_DIR}/{DATAFILE}")
documents_aws = loader.load() #
print(f"documents:loaded:size={len(documents_aws)}")

docs = CharacterTextSplitter(chunk_size=2000, chunk_overlap=400, separator=",").split_documents(documents_aws)

...

vector_db = FAISS.from_documents(
    documents=docs,
    embedding=br_embeddings, 
)

...

vector_db.save_local(f"{FAISS_INDEX_DIR}-{t}")

try:
    to_upload = os.listdir(f"./{FAISS_INDEX_DIR}-{t}")
    for file in to_upload:
        s3.Bucket(S3_BUCKET).upload_file(f"./{FAISS_INDEX_DIR}-{t}/{file}", f"{FAISS_INDEX_DIR}/{file}", )
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == "404":
        print("The object does not exist.")
    else:
        raise

Integration with Claude Instant

The RAG API microservice receives the user queries from the frontend, converts it into a prompt formatted for the appropriate FM, and interacts with the model using the LangChain library. The LangChain library is initialized to maintain chat history in a tenant-specific Amazon DynamoDB table. When a query is received, LangChain is called to retrieve similarity search results from the vector database, and send the prompt along with the matching results from the search and the previous chat history as context to the Claude Instant model. The response received from the FM is returned by the RAG API to the client through the chat interface.

The flow of a chat conversation is shown in Figure 4. When the user initiates a connection for the first time to the chat interface by pointing a browser to its Fully Qualified Domain Name (FQDN) such as tenant.example.com, the request will be intercepted by the Istio Ingress Gateway and forwarded to the OIDC proxy for authorization. As the request doesn’t contain the cookie associated with the authorization token. This redirects the user’s browser to the Hosted UI of the tenant’s User Pool. Once the user signs in, the request receives a code from the User Pool that is passed on to the OIDC Proxy which exchanges it with the User Pool for the user’s Identity Token (formatted as JSON Web Token [JWT]). The OIDC Proxy caches the JWT and returns a cookie back to the user’s browser to be used in future requests. When the request reaches the tenant’s pod, the JWT is verified against its Issuer URL to ensure that the JWT is genuine. It is also verified that the request coming from the Istio Ingress Gateway. After passing through these checks, the request reached the chat interface deployed in the tenant’s namespace.

Chat conversation flow

Figure 4 – Chat conversation flow

The following code snippet shows that there are two Amazon DynamoDB tables created for each tenant during deployment. The Sessions table keeps track of user interactions, and the ChatHistory table is used by LangChain for recording the history of chat conversation.

TENANTS="tenanta tenantb"

for t in $TENANTS
do

    export TABLE_NAME="Sessions_${t}_${RANDOM_STRING}"
    
    echo "Creating DynamoDB table ${TABLE_NAME}"
    export DDB_TABLE=$(aws dynamodb create-table \
                        --table-name ${TABLE_NAME} \
                        --attribute-definitions \
                            AttributeName=TenantId,AttributeType=S \
                        --provisioned-throughput \
                            ReadCapacityUnits=5,WriteCapacityUnits=5 \
                        --key-schema \
                            AttributeName=TenantId,KeyType=HASH \
                        --table-class STANDARD
                        )
        
    export TABLE_NAME="ChatHistory_${t}_${RANDOM_STRING}"

    echo "Creating DynamoDB table ${TABLE_NAME}"
    export DDB_TABLE=$(aws dynamodb create-table \
                        --table-name ${TABLE_NAME} \
                        --attribute-definitions \
                            AttributeName=SessionId,AttributeType=S \
                        --provisioned-throughput \
                            ReadCapacityUnits=5,WriteCapacityUnits=5 \
                        --key-schema \
                            AttributeName=SessionId,KeyType=HASH \
                        --table-class STANDARD
                        )
done

The TenantId and UserEmail are extracted from the authentication JWT by the Istio proxy sidecar and injected as headers X-Auth-Request-Tenantid and X-Auth-Request-Email, respectively. The chat application, while receiving user input, creates a SessionId associated with the TenantId and UserEmail. It also records the UNIX time of the last user interaction. The SessionId is a Version 4 UUID string. A concatenated string of TenantId:UserEmail:SessionId is sent to the RAG API along with the chat query, to be used a session identifier by the backend, so that chat histories can be tied back to specific users that may provide rich contextual information for downstream usage such as product recommendations, customer relations, technical support, etc. The following code snippet shows this processing within the application.

...

headers = _get_websocket_headers()
tenantid = headers.get("X-Auth-Request-Tenantid")
user_email = headers.get("X-Auth-Request-Email")

tenant_id = tenantid + ":" + user_email
dyn_resource = boto3.resource('dynamodb')
IDLE_TIME = 600                                     # seconds
current_time = int(datetime.now().timestamp())

...
    
if user_input:
    try:
        sessions = Sessions(dyn_resource)
        sessions_exists = sessions.exists(table_name)
        if sessions_exists:
            session = sessions.get_session(tenant_id)
            if session:
                if ((current_time - session['last_interaction']) < IDLE_TIME):
                    sessions.update_session_last_interaction(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
                    print(updated_session['session_id'])
                else:
                    sessions.update_session(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
            else:
                sessions.add_session(tenant_id)
                session = sessions.get_session(tenant_id)
    except Exception as e:
        print(f"Something went wrong: {e}")

    headers: Dict = {"accept": "application/json",
                     "Content-Type": "application/json"
                    }
    if mode == MODE_RAG:
        user_session_id = tenant_id + ":" + session["session_id"]
        data = {"q": user_input, "user_session_id": user_session_id, "verbose": True}
        resp = req.post(api_rag_ep, headers=headers, json=data)
        if resp.status_code != HTTP_OK:
            output = resp.text
        else:
            resp = resp.json()
            sources = [d['metadata']['source'] for d in resp['docs']]
            output = f"{resp['answer']} \n \n Sources: {sources}"
    else:
        print("error")
        output = f"unhandled mode value={mode}"
    st.session_state.past.append(user_input)  
    st.session_state.generated.append(output)

Since the chat interface and RAG API microservices are collocated within the same Kubernetes pod, the chat interface communicates with the RAG API over the localhost (127.0.0.1) network interface.

When the RAG API microservice starts, it loads the FAISS index into its memory, and starts listening for connections. When a request is received, it initializes an AWS boto3 client for Bedrock containing credentials, and passes the request data along with FM parameters and the Bedrock client data structure to LangChain. The LangChain library is configured to save chat conversations in a Amazon DynamoDB table, and starts a ConversationalRetrievalChain, which automatically searches for context in FAISS index, retrieves the saved history, formats the query based on a template definition, bundles the context together formatted query, sends it to the Claude Instant model, and saves the response from the model. The RAG API returns the response received from LangChain to the frontend application for presentation to the user.  The sequence of events managed by the RAG API is detailed in the following code snippet.

...

VECTOR_DB_DIR = os.path.join("/tmp", "_vectordb")
_vector_db = None
vectordb_s3_path: str = f"s3://{os.environ.get('CONTEXTUAL_DATA_BUCKET')}/faiss_index/"

if _vector_db is None:
    _vector_db = load_vector_db_faiss(vectordb_s3_path,
                                      VECTOR_DB_DIR,
                                      BEDROCK_ENDPOINT,
                                      BEDROCK_REGION)
...

@router.post("/rag")
def rag_handler(req: Request) -> Dict[str, Any]:
    docs = _vector_db.similarity_search(req.q, k=req.max_matching_docs)

...

    parameters = {
        "max_tokens_to_sample": req.maxTokenCount,
        "stop_sequences": req.stopSequences,
        "temperature": req.temperature,
        "top_k": req.topK,
        "top_p": req.topP
        }

    endpoint_name = req.text_generation_model

    session_id = req.user_session_id
    boto3_bedrock = boto3.client(service_name=BEDROCK_SERVICE)
    bedrock_llm = Bedrock(model_id=TEXT2TEXT_MODEL_ID, client=boto3_bedrock)
    bedrock_llm.model_kwargs = parameters
    
    message_history = DynamoDBChatMessageHistory(table_name=CHATHISTORY_TABLE, session_id=session_id)

    memory_chain = ConversationBufferMemory(
        memory_key="chat_history",
        chat_memory=message_history,
        input_key="question",
        ai_prefix="Assistant",
        return_messages=True
    )
    
    condense_prompt_claude = PromptTemplate.from_template("""
    Answer only with the new question.
    
    Human: How would you ask the question considering the previous conversation: {question}
    
    Assistant: Question:""")

    qa = ConversationalRetrievalChain.from_llm(
        llm=bedrock_llm, 
        retriever=_vector_db.as_retriever(search_type='similarity', search_kwargs={"k": req.max_matching_docs}), 
        memory=memory_chain,
        condense_question_prompt=condense_prompt_claude,
        chain_type='stuff', # 'refine',
    )

    qa.combine_docs_chain.llm_chain.prompt = PromptTemplate.from_template("""
    {context}

    Human: Answer the question inside the <q></q> XML tags.
    
    <q>{question}</q>
    
    Do not use any XML tags in the answer. If you don't know the answer or if the answer is not in the context say "Sorry, I don't know."

    Assistant:""")

    answer = ""
    answer = qa.run({'question': req.q })

    logger.info(f"answer received from llm,\nquestion: \"{req.q}\"\nanswer: \"{answer}\"")
    resp = {'question': req.q, 'answer': answer, 'session_id': req.user_session_id}
    if req.verbose is True:
        resp['docs'] = docs

    return resp

Conclusion

In this post, we showed you how to build a multi-tenant chatbot with RAG using Amazon Bedrock and Amazon EKS. The RAG implementation uses private data as context for large language models to produce predictable and relevant responses to chat queries. While this post demonstrates the chatbot use case, this approach and components can also be applied to use cases and tasks such as question answering, text summarization, and content creation.

Amazon EKS provides native multi-tenancy allowing workload isolation among tenants of users enabling efficient sharing of resources. We encourage you to explore Amazon Titan models and Amazon Bedrock for your Generative AI use cases and build a resilient and cost-effective solution with Amazon EKS as the underlying orchestration platform.

Checkout step-by-step deployment instructions and sample code for the solution discussed in the post, in the GitHub repository here.

If you’re interested in the concepts introduced in this post, then please feel free to reach out using social media (Farooq Ashraf, Ravi Yadav, and Jared Dean)