Partnerships

From MP4 to API: End-to-End Video Understanding with Marengo & Pegasus on AWS Bedrock

Eric Kim, James Le

Aug 22, 2025

15 Min

Overview

This tutorial is designed for developers and enterprise data teams looking to harness the power of video understanding within their AWS environments. Our goal is to provide a practical, hands-on guide to integrating TwelveLabs' cutting-edge video AI models—Marengo and Pegasus—through Amazon Bedrock. By the end of this tutorial, you will have built a powerful video analysis pipeline, enabling you to create searchable video libraries and generate rich, descriptive metadata automatically.

You will build a two-part Jupyter Notebook that demonstrates two core use cases:

  1. Multimodal Search: Use the Marengo Embed 2.7 model to generate multimodal vector embeddings from video files. These embeddings will be indexed in Amazon OpenSearch Serverless to create a scalable, fast, and accurate similarity search engine for your video content.

  2. Generative Video Analysis: Leverage the Pegasus 1.2 model to perform video-to-text tasks, such as creating summaries, generating chapters, and extracting structured metadata from the same set of videos.


Prerequisites

To follow this tutorial, you will need:

  • An AWS account with an IAM role configured with permissions for Amazon S3, Amazon Bedrock, and Amazon OpenSearch Serverless.

  • Model access enabled for TwelveLabs Marengo and Pegasus in your chosen AWS regions.

  • An S3 bucket to store your video files and model outputs.

  • A Python 3.10+ environment with boto3 and other required libraries installed.

  • A small collection of 5–10 short MP4 video files for analysis.


1 - Architecture & Key Concepts

The solution is architected around a serverless, event-driven workflow that is both scalable and cost-effective. For the multimodal search component, data flows from S3 to Bedrock (Marengo), back to S3 as embeddings, and then into Amazon OpenSearch Serverless.

This architecture allows you to process large video libraries efficiently. When a video is uploaded to an S3 bucket, an asynchronous call to Bedrock with the Marengo model is initiated. Bedrock processes the video and writes the resulting vector embeddings back to a designated S3 prefix. From there, the embeddings can be loaded into an Amazon OpenSearch Serverless index for low-latency similarity search.

Before we dive into the code, it's important to understand the two different types of models we'll be using:

  • Multimodal Embeddings (Marengo): An embedding is a numerical vector that represents complex data like a video, image, or text. Marengo transforms different types of media into a single, unified vector space. This is powerful because it means you can search for a visual moment in a video using a simple text query—the model understands the relationship between the words in your query and the pixels in the video. We will use this for building our search application.

  • Video-to-Text Generation (Pegasus): Pegasus is a generative model designed to "watch" a video and produce human-readable text about it. Unlike Marengo, its output is not a vector but rather descriptive text, such as a summary, a list of highlights, or answers to specific questions about the video's content.
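To make the idea of a unified vector space more concrete, here is a toy sketch in plain Python. The three-dimensional vectors are made up for illustration (real Marengo embeddings are much longer), and cosine similarity is assumed as the comparison metric, matching the index configuration used later in this tutorial.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Made-up 3-dimensional stand-ins for real embeddings (illustration only)
text_query = [0.9, 0.1, 0.0]          # e.g. "a car chase at night"
segment_chase = [0.8, 0.2, 0.1]       # a visually similar video segment
segment_interview = [0.0, 0.3, 0.9]   # an unrelated video segment

scores = {
    "chase": cosine_similarity(text_query, segment_chase),
    "interview": cosine_similarity(text_query, segment_interview),
}
best_match = max(scores, key=scores.get)
print(best_match, scores)
```

The text query and the matching segment point in nearly the same direction, so their score is close to 1, while the unrelated segment scores close to 0.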

A key practical consideration for AWS developers is region availability. The TwelveLabs models are deployed in specific AWS regions; refer to the region availability list in the AWS documentation. For instance, Marengo is available in us-east-1, while Pegasus is available in both us-east-1 and us-west-2. Note that you must first enable the model in a specific region, but when invoking it through the Bedrock API, you must use the cross-region inference endpoint. The default Bedrock model ID cannot be used directly with Pegasus; the examples in this tutorial use the prefixed ID us.twelvelabs.pegasus-1-2-v1:0 instead. Your code should handle these differences, for example by initializing a separate Boto3 client for each required region and making cross-region calls when necessary.
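One way to keep the region handling in a single place is sketched below. The helper names are hypothetical, and the mapping from region to geography prefix ("us", "eu", "apac") is an assumption about how cross-region inference profile IDs are formed; check the AWS documentation for the profiles that actually exist for your account.

```python
# Hypothetical helper: maps a region to the geography prefix assumed for
# cross-region inference profile IDs ("us", "eu", "apac").
GEO_PREFIXES = {"us": "us", "eu": "eu", "ap": "apac"}

def inference_profile_id(base_model_id: str, region: str) -> str:
    """Prefix a base model ID with the geography of the calling region."""
    geo = GEO_PREFIXES.get(region.split("-")[0])
    if geo is None:
        raise ValueError(f"No known inference profile prefix for region {region!r}")
    return f"{geo}.{base_model_id}"

def make_runtime_clients(session, regions):
    """Create one bedrock-runtime client per region from a boto3 session."""
    return {r: session.client("bedrock-runtime", region_name=r) for r in regions}

MARENGO_REGION = "us-east-1"   # regions taken from the examples in this section
PEGASUS_REGION = "us-west-2"
PEGASUS_MODEL_ID = inference_profile_id("twelvelabs.pegasus-1-2-v1:0", PEGASUS_REGION)
print(PEGASUS_MODEL_ID)
```

With a real session, `make_runtime_clients(boto3.Session(), [MARENGO_REGION, PEGASUS_REGION])` returns a dictionary of clients keyed by region, so each model call can pick the client for its own region.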


2 - Environment Setup

Before we start building our video intelligence pipeline, we need to prepare our development environment. This involves installing the necessary Python libraries, configuring the AWS SDK to communicate with Bedrock and other services, and setting up a structured S3 bucket to store our files.


2.1 - Install Dependencies

First, you'll need to install several Python packages that our script will rely on. These libraries handle everything from interacting with AWS services to managing data and performing calculations.

You can install them all with a single pip command in your terminal:
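For a notebook, a minimal sketch of that install step is shown here in Python so that it runs against the same interpreter as the kernel. The package list is assumed from the library rundown in this section, with versions left unpinned; the equivalent terminal command would be pip install boto3 botocore opensearch-py pandas numpy scikit-learn.

```python
import subprocess
import sys

# Package names assumed from the rundown in this section; versions are unpinned.
PACKAGES = ["boto3", "botocore", "opensearch-py", "pandas", "numpy", "scikit-learn"]

def pip_install_command(packages):
    """Build a pip command bound to the interpreter that runs this notebook."""
    return [sys.executable, "-m", "pip", "install", *packages]

def install(packages=PACKAGES):
    """Run pip; raises CalledProcessError if the installation fails."""
    subprocess.check_call(pip_install_command(packages))

# Show the command without running it; call install() to actually install.
print(" ".join(pip_install_command(PACKAGES)))
```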

Here’s a quick rundown of what each library does:

  • boto3: The official AWS SDK for Python. It's how we'll interact with services like Bedrock, S3, and OpenSearch.

  • opensearch-py: The official Python client for OpenSearch, which we'll use to index and search our video embeddings.

  • pandas & numpy: Powerful libraries for data manipulation and numerical operations. They are useful for handling the embedding vectors returned by Marengo.

  • scikit-learn: A machine learning library that provides helpful tools, including a function to calculate cosine similarity between vectors.

  • botocore: The low-level library that boto3 is built on. It provides core functionalities like handling credentials and AWS request signing.

Note that pandas, numpy, and scikit-learn are not strictly required here; they are only needed if you want to work with the embeddings directly, for example to calculate cosine similarity yourself.


2.2 - Configure the AWS SDK

With the libraries installed, the next step is to configure our script to communicate with your AWS account. We'll use boto3 to create client objects for the services we need.

import boto3
import json

# Define the AWS Region where your Bedrock models are enabled
# For Marengo, this might be "us-east-1"
# For Pegasus, this might be "us-west-2"
region_name = "us-east-1" 

# Initialize a session with your default AWS profile
session = boto3.Session(profile_name="default", region_name=region_name)

# Create clients for Bedrock Runtime and S3
bedrock_runtime_client = session.client("bedrock-runtime")
s3_client = session.client("s3")

print("✅ AWS clients created successfully.")

What this code does:

This script sets up the connection to AWS.

  • First, we import the boto3 library.

  • We then establish a session using your "default" AWS credentials profile. Think of this like logging into your AWS account. We also specify the region_name where you've enabled the TwelveLabs models.

  • Finally, we create specialized client objects. s3_client is for interacting with your S3 buckets, and bedrock_runtime_client is for making calls to the Bedrock models themselves.

A key point for developers is the distinction between bedrock and bedrock-runtime. The bedrock client is for management tasks, like requesting model access or managing provisioned throughput. The bedrock-runtime client, which we use here, is specifically for invoking models to get inferences (like generating embeddings or text).


2.3 - S3 Bucket Layout

A well-organized S3 bucket is crucial for a clean and scalable architecture. For this tutorial, we recommend creating separate prefixes (which function like folders) for your raw videos, the embeddings generated by Marengo, and any optional images you might use for search.

Your S3 bucket structure should look like this:


This layout helps keep your data organized, making it easier to manage permissions and track costs associated with different parts of your workflow. In the following sections, our code will upload videos to the /videos/ prefix and configure Marengo to write its output to the /embeddings/ prefix.
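A minimal sketch of the layout described above, expressed as key-building helpers. The videos and embeddings prefixes come from this tutorial; the images prefix name and the bucket name are placeholders chosen for illustration.

```python
# Prefix names: "videos" and "embeddings" come from this tutorial;
# "images" is an assumed name for the optional query images.
S3_PREFIXES = {"videos": "videos", "embeddings": "embeddings", "images": "images"}

def s3_key(kind: str, filename: str = "") -> str:
    """Build an object key under one of the prefixes above."""
    return f"{S3_PREFIXES[kind]}/{filename}"

def s3_uri(bucket: str, kind: str, filename: str = "") -> str:
    """Build a full s3:// URI for a prefix or for an object under it."""
    return f"s3://{bucket}/{s3_key(kind, filename)}"

# Resulting layout for a bucket named "my-video-bucket" (placeholder name):
#   s3://my-video-bucket/videos/       raw MP4 files
#   s3://my-video-bucket/embeddings/   Marengo output
#   s3://my-video-bucket/images/       optional query images
print(s3_uri("my-video-bucket", "videos", "Sparks_4096x2160_5994fps_SDR.mp4"))
```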


3 - Part A – Multimodal Search with Marengo

Now that our environment is set up, let's dive into the first part of our project: building a powerful multimodal search engine. In this section, we will use the TwelveLabs Marengo model on Bedrock to convert our videos into vector embeddings. We will then store these embeddings in Amazon OpenSearch Serverless and build a search application that can find moments in videos using either a text description or a sample image.


3.1 - Upload Sample Videos

First, we need some videos to work with. To make things easy, we'll use a helper function to copy a few sample videos from the Netflix Open Content library directly into the S3 bucket you configured earlier.

import re

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# A list of publicly available sample videos from Netflix
sample_videos = [
    's3://download.opencontent.netflix.com/TechblogAssets/CosmosLaundromat/encodes/CosmosLaundromat_2048x858_24fps_SDR.mp4',
    's3://download.opencontent.netflix.com/TechblogAssets/Meridian/encodes/Meridian_3840x2160_5994fps_SDR.mp4',
    's3://download.opencontent.netflix.com/TechblogAssets/Sparks/encodes/Sparks_4096x2160_5994fps_SDR.mp4'
]

# This client does not require AWS credentials to access public S3 buckets
public_s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))

def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Helper function to split an S3 URI into bucket and key."""
    match = re.match(r'^s3://([^/]+)/(.+)$', s3_uri)
    if not match:
        raise ValueError(f"Invalid S3 URI format: {s3_uri}")
    return match.group(1), match.group(2)

def copy_public_s3_object(public_s3_uri: str, dest_bucket: str, dest_key: str):
    """Copies an object from a public S3 bucket to your private bucket."""
    source_bucket, source_key = parse_s3_uri(public_s3_uri)
    print(f"Downloading from {public_s3_uri}...")
    response = public_s3_client.get_object(Bucket=source_bucket, Key=source_key)
    
    print(f"Uploading to s3://{dest_bucket}/{dest_key} ...")
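    # Stream the body instead of reading the whole file into memory;
    # upload_fileobj also switches to multipart upload for large files.
    s3_client.upload_fileobj(response['Body'], dest_bucket, dest_key)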
    print("✅ Copy completed successfully!")

# Loop through the sample videos and copy them to your S3 bucket
S3_BUCKET_NAME = "<your-bucket-name>" # Replace with your bucket name
S3_VIDEOS_PATH = "videos"

for video_uri in sample_videos:
    _, src_key = parse_s3_uri(video_uri)
    filename = src_key.split("/")[-1]
    dest_key = f"{S3_VIDEOS_PATH}/{filename}"
    copy_public_s3_object(
        public_s3_uri=video_uri,
        dest_bucket=S3_BUCKET_NAME,
        dest_key=dest_key
    )

What this code does:

This script automates the process of getting sample data. It defines a list of S3 URIs pointing to high-quality videos provided by Netflix. It then loops through this list, using a special boto3 client to download each video from its public location and upload it into the /videos/ prefix in your own S3 bucket.


3.2 - Synchronous vs. Asynchronous Embedding with Marengo

Marengo is designed to process a variety of media inputs, from large video files to short text queries. To optimize for these different use cases, Amazon Bedrock now provides two ways to invoke the model: asynchronously for large media and synchronously for real-time queries.


Asynchronous StartAsyncInvoke for Large-Scale Video Processing

For embedding large files like videos and audio, the asynchronous StartAsyncInvoke API remains the best practice. It allows you to submit a job to Bedrock and let it process in the background, which is ideal for batch-processing an entire video library without holding a connection open.

Think of it like dropping off film at a photo lab: you get a ticket (the invocation ARN) and can check on the status later to pick up the developed pictures (the embeddings). The embed_video_async function below uses this method, which is the right approach for video.

def embed_video_async(s3_uri: str, output_s3_bucket: str) -> str:
    """
    Starts an asynchronous job to generate embeddings for a video.
    
    Args:
        s3_uri (str): The S3 URI of the video to process.
        output_s3_bucket (str): The S3 bucket where results should be stored.

    Returns:
        str: The invocation ARN, which is like a job ID.
    """
    response = bedrock_runtime_client.start_async_invoke(
        modelId="twelvelabs.marengo-embed-2-7-v1:0",
        modelInput={
            "inputType": "video",
            "mediaSource": {"s3Location": {"uri": s3_uri}}
        },
        outputDataConfig={"s3OutputDataConfig": {
            "s3Uri": f"s3://{output_s3_bucket}/embeddings/"
        }}
    )
    return response["invocationArn"]

What this code does:

The embed_video_async function takes the S3 location of a video and submits it to the Marengo model (twelvelabs.marengo-embed-2-7-v1:0). We specify that the inputType is a video and point to its s3Location. We also tell Bedrock where to save the output: a JSON file in the /embeddings/ prefix of our bucket. The function returns an invocationArn, which we'll use to check the job status.
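Checking the job status could look like the sketch below, which polls the bedrock-runtime get_async_invoke operation. The function name, polling interval, and timeout are illustrative choices, not part of the original workshop code.

```python
import time

def wait_for_async_invoke(client, invocation_arn, poll_seconds=10, timeout_seconds=1800):
    """Poll a Bedrock async invocation until it completes, fails, or times out.

    `client` is a bedrock-runtime client; returns the S3 URI of the output prefix.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = client.get_async_invoke(invocationArn=invocation_arn)
        status = job["status"]  # "InProgress", "Completed", or "Failed"
        if status == "Completed":
            return job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
        if status == "Failed":
            raise RuntimeError(job.get("failureMessage", "Async invocation failed"))
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{invocation_arn} still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

A typical call would be `output_uri = wait_for_async_invoke(bedrock_runtime_client, invocation_arn)`, after which the output.json file can be read from under the returned prefix.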


Synchronous InvokeModel for Real-Time Text and Image Queries

For small, low-latency tasks like embedding a user's text search query or a sample image, you can now use the synchronous InvokeModel API. This is a significant improvement for building responsive applications, as it returns the embedding directly in the API response without the need to poll for results.

Here is how you would create a function to embed a text query synchronously:

def embed_text_sync(query_text: str) -> list:
    """
    Generates an embedding for a text query using the synchronous InvokeModel API.

    Args:
        query_text (str): The text query to embed.

    Returns:
        list: The embedding vector for the text.
    """
    # Note: Ensure your bedrock_runtime_client is configured for a region where Marengo is available
    request_body = {
        "inputType": "text",
        "inputText": query_text
    }

    response = bedrock_runtime_client.invoke_model(
        modelId="twelvelabs.marengo-embed-2-7-v1:0",
        body=json.dumps(request_body),
        contentType="application/json",
        accept="application/json"
    )
    
    response_body = json.loads(response["body"].read())
    
    # The embedding is returned directly in the response
    return response_body["embedding"]

What this code does:

This function sends a request to the Marengo model using invoke_model.

  • We specify the inputType as text and provide the inputText.

  • Unlike the async call, the response from invoke_model contains the embedding directly. We parse the JSON response and return the embedding vector.

  • This synchronous approach eliminates the need for a separate polling function, simplifying the code and reducing latency for your application's search functionality. A similar function can be used for images by providing the mediaSource in the request.
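A sketch of the image variant mentioned above follows. It assumes the image is stored in S3 and reuses the mediaSource shape from the video request; it also assumes the response has the same shape as in embed_text_sync. Both assumptions should be checked against the Bedrock model documentation before relying on them.

```python
import json

MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"

def build_image_request(image_s3_uri: str) -> dict:
    """Request body for an image embedding, mirroring the video mediaSource shape."""
    return {
        "inputType": "image",
        "mediaSource": {"s3Location": {"uri": image_s3_uri}},
    }

def embed_image_sync(client, image_s3_uri: str) -> list:
    """Embed an image stored in S3; `client` is a bedrock-runtime client."""
    response = client.invoke_model(
        modelId=MARENGO_MODEL_ID,
        body=json.dumps(build_image_request(image_s3_uri)),
        contentType="application/json",
        accept="application/json",
    )
    response_body = json.loads(response["body"].read())
    # Assumes the same response shape as embed_text_sync
    return response_body["embedding"]
```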

By using the right tool for the job—StartAsyncInvoke for batch video processing and InvokeModel for real-time queries—you can build a more efficient and responsive video search architecture.


3.3 - Parse the Embedding JSON

Once Bedrock finishes processing the video, it will place an output.json file in the specified S3 location. This file contains a list of embedding vectors, where each vector represents a short segment of the video.

The structure of the output looks like this:

{
  "data": [
    {
      "embedding": [0.123, -0.045, 0.567, ...],
      "embeddingOption": "visual-text",
      "startSec": 0.0,
      "endSec": 5.0
    },
    {
      "embedding": [-0.234, 0.089, -0.112, ...],
      "embeddingOption": "visual-text",
      "startSec": 5.0,
      "endSec": 10.0
    }
    // ... more segments
  ]
}

Each object in the data array contains the embedding vector (a list of 1,024 numbers), the start and end times of the video segment in seconds (startSec and endSec), and the type of embedding generated (embeddingOption).
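Parsing this file into one document per segment might look like the following sketch. The function names are illustrative, and the output field names (start_time, end_time, video_s3_uri) are chosen to line up with the index mapping used in this tutorial.

```python
import json

def parse_segments(output_json: str, video_s3_uri: str) -> list:
    """Turn the contents of an output.json file into one document per segment."""
    payload = json.loads(output_json)
    return [
        {
            "embedding": segment["embedding"],
            "start_time": segment["startSec"],
            "end_time": segment["endSec"],
            "video_s3_uri": video_s3_uri,
        }
        for segment in payload["data"]
    ]

def load_segments(s3_client, bucket: str, key: str, video_s3_uri: str) -> list:
    """Fetch an output.json object from S3 and parse it."""
    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return parse_segments(body.decode("utf-8"), video_s3_uri)
```

The video URI is passed in by the caller because the output file itself only carries the embeddings and timestamps.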


3.4 - Create an OpenSearch Serverless Index

To search these embeddings efficiently, we need a vector database. Amazon OpenSearch Serverless provides a fully managed solution with powerful k-Nearest Neighbor (k-NN) search capabilities.

Let's create an index configured for vector search.

# Note: You must create an OpenSearch Serverless client 'os_client' first.
# See the workshop file for details on creating the client with AWSV4SignerAuth.

INDEX_NAME = "video-search-index"

index_body = {
    "settings": { "index": { "knn": True } },
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "engine": "faiss",
                    "name": "hnsw",
                    "space_type": "cosinesimil"
                }
            },
            "start_time": {"type": "float"},
            "end_time": {"type": "float"},
            "video_s3_uri": {"type": "keyword"}
        }
    }
}

os_client.indices.create(index=INDEX_NAME, body=index_body)
print(f"✅ Index '{INDEX_NAME}' created successfully.")

What this code does:

This script defines the schema for our search index.

  • "knn": True enables the k-NN functionality.

  • The embedding field is defined as a knn_vector of dimension 1024, matching Marengo's output.

  • We specify the faiss engine and hnsw algorithm, which are highly efficient for vector search.

  • space_type is set to cosinesimil, which tells OpenSearch to measure similarity based on the angle between vectors—a great choice for multimodal embeddings.
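The index code above assumes that an os_client already exists. A minimal sketch of creating one with opensearch-py and SigV4 signing is shown here; the function names are hypothetical, the collection endpoint is whatever your OpenSearch Serverless collection reports, and the workshop file remains the reference for the exact setup.

```python
def host_from_endpoint(endpoint: str) -> str:
    """Strip the scheme and trailing slash from a collection endpoint URL."""
    return endpoint.replace("https://", "").rstrip("/")

def create_os_client(session, collection_endpoint: str, region: str):
    """Create a SigV4-signed client for an OpenSearch Serverless collection.

    `session` is a boto3.Session; opensearch-py is imported lazily.
    """
    from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

    # "aoss" is the signing service name for OpenSearch Serverless
    auth = AWSV4SignerAuth(session.get_credentials(), region, "aoss")
    return OpenSearch(
        hosts=[{"host": host_from_endpoint(collection_endpoint), "port": 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        pool_maxsize=20,
    )
```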


3.5 - Bulk-Index the Video Segments

With our index ready, we can now process our embedding files from S3 and load them into OpenSearch. This is typically done in bulk for efficiency. You would write a script to list all output.json files, parse them, and send the data to OpenSearch in batches.
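The batching step could be sketched as follows, using the bulk helper from opensearch-py. It assumes each segment is already a dictionary with the embedding, start_time, end_time, and video_s3_uri fields from the index mapping; the function names and batch size are illustrative.

```python
def build_bulk_actions(index_name: str, segments: list) -> list:
    """Convert segment documents into actions for opensearchpy.helpers.bulk.

    No "_id" is set, so OpenSearch assigns document IDs itself.
    """
    return [{"_index": index_name, "_source": segment} for segment in segments]

def bulk_index_segments(os_client, index_name: str, segments: list, batch_size: int = 200) -> int:
    """Send the segments to OpenSearch in batches; returns the number indexed."""
    from opensearchpy import helpers  # imported lazily

    actions = build_bulk_actions(index_name, segments)
    success_count, _errors = helpers.bulk(os_client, actions, chunk_size=batch_size)
    return success_count
```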


3.6 - Query with Text and Image

This is where the magic happens. Because Marengo creates embeddings in a unified vector space, we can generate an embedding for a text query or an image and use it to find similar video segments.

Here’s a function for text-to-video search:

def search_videos_by_text(query_text: str, top_k: int = 3) -> list:
    """Searches for video segments using a text query."""
    # 1. Generate an embedding for the text query using Marengo
    print(f"Generating embedding for query: '{query_text}'")
    query_embedding = embed_text_sync(query_text)

    # 2. Use the embedding to search OpenSearch
    search_body = {
        "query": { "knn": { "embedding": { "vector": query_embedding, "k": top_k } } },
        "_source": ["video_s3_uri", "start_time", "end_time"]
    }
    
    response = os_client.search(index=INDEX_NAME, body=search_body)
    return response['hits']['hits']

What this code does:

The function first calls Marengo to convert the query_text into a vector. It then uses this vector to perform a k-NN search on our OpenSearch index, asking for the top k most similar video segments. A similar function can be written for image search by calling Marengo with inputType: "image".


3.7 - Demo Search

Let's try it out. Imagine you want to find a specific scene in your video library.

query = "a car chase at night with neon lights"
search_results = search_videos_by_text(query)

# Print the top result
top_result = search_results[0]['_source']
video_url = top_result['video_s3_uri'] # You'd generate a presigned URL here
start_time = top_result['start_time']

print(f"Top match found in {video_url} at {start_time} seconds.")
# In a Jupyter Notebook, you could now display an HTML5 video player
# set to start at 'start_time'.

This would instantly find the video segment that best matches your description, demonstrating the power of multimodal search.
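The presigned URL mentioned in the code comment could be generated along these lines. The helper name and expiry are illustrative; the "#t=" suffix is a media fragment that browsers use as the playback start offset.

```python
def presigned_clip_url(s3_client, s3_uri: str, start_time: float, expires_in: int = 3600) -> str:
    """Return a temporary HTTPS URL for a video, positioned at start_time."""
    bucket, _, key = s3_uri.removeprefix("s3://").partition("/")
    url = s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )
    # "#t=<seconds>" tells an HTML5 video player where to start playback
    return f"{url}#t={start_time}"
```

In a notebook, the returned URL can be used as the src of an HTML5 video element to jump straight to the matching segment.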


3.8 - Takeaways

In this section, you have successfully built a sophisticated video search engine using a completely serverless architecture. The key takeaways are:

  • Unified Vector Space: Marengo's ability to embed different modalities (video, text, image) into a single vector space is what enables powerful cross-modal search. You can search your videos with a sentence, not just keywords.

  • Scalable Architecture: By combining Bedrock's asynchronous API with Amazon OpenSearch Serverless, you have a pipeline that can grow with your video library without managing any servers.


4 - Part B – Rich Descriptions with Pegasus

In the first part of this tutorial, we built a powerful search application using Marengo's vector embeddings. Now, we will explore the generative capabilities of the TwelveLabs Pegasus model. Pegasus is designed to "watch" a video and generate rich, human-readable text, such as summaries, highlights, and even structured metadata. This allows you to move beyond search and unlock a deeper, more contextual understanding of your video content.


4.1 - Choose a Video for Analysis

We'll continue working with the same videos we uploaded to our S3 bucket in Part A. First, let's select one of them to analyze with Pegasus. You can list the objects in your /videos/ prefix and pick one.

# Assuming s3_client and S3_BUCKET_NAME are already configured
S3_VIDEOS_PATH = "videos"

# List videos in your S3 bucket
response = s3_client.list_objects_v2(
    Bucket=S3_BUCKET_NAME, 
    Prefix=f"{S3_VIDEOS_PATH}/"
)

# Keep only MP4 objects so that zero-byte "folder" placeholders are skipped
video_keys = [
    obj['Key'] for obj in response.get('Contents', [])
    if obj['Key'].lower().endswith('.mp4')
]

if video_keys:
    # Select the first video file for analysis
    video_s3_key = video_keys[0]
    video_s3_uri = f"s3://{S3_BUCKET_NAME}/{video_s3_key}"
    print(f"✅ Selected for analysis: {video_s3_uri}")
else:
    print("❌ No videos found in the specified S3 path.")

This code snippet lists the files in your videos directory and selects the first one to work with, printing its S3 URI for confirmation.


4.2 - Get a Quick Analysis with InvokeModel

The simplest way to interact with Pegasus is through a synchronous invoke_model call. This is ideal for quick questions where you expect a relatively short, immediate answer. Let's ask Pegasus for a basic description of our selected video.

# Note: Ensure your bedrock_runtime_client is configured for a region where Pegasus is available (e.g., 'us-west-2')
prompt = "What is the video about?"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {"uri": video_s3_uri}
    },
    "temperature": 0
}

response = bedrock_runtime_client.invoke_model(
    modelId="us.twelvelabs.pegasus-1-2-v1:0", # Use the correct model ID for your region
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

# Parse the response to get the generated text
response_body = json.loads(response["body"].read())
print(response_body["message"])

What this code does:

This snippet sends a request to the Pegasus model.

  • The inputPrompt contains our question.

  • mediaSource points to the video's location in S3.

  • temperature: 0 makes the output as deterministic as possible, which suits factual descriptions. A higher temperature (e.g., 0.7) would produce more creative but less predictable results.

  • The invoke_model call blocks until the analysis is complete and returns the full response, which we then parse to extract and print the message.

For longer videos or when generating extensive text, it's better to use invoke_model_with_response_stream. This method returns the response in chunks as it's being generated, improving the user experience by showing results faster.
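A sketch of the streaming variant is shown below. It assumes that each streamed chunk decodes to a JSON object carrying its text under a "message" key, like the non-streaming response; that chunk format is an assumption and should be verified against the model documentation.

```python
import json

def stream_pegasus_text(client, model_id: str, request_body: dict):
    """Yield text fragments from Pegasus as they arrive.

    `client` is a bedrock-runtime client. Assumes each streamed chunk is a JSON
    object with its text under a "message" key.
    """
    response = client.invoke_model_with_response_stream(
        modelId=model_id,
        body=json.dumps(request_body),
        contentType="application/json",
        accept="application/json",
    )
    for event in response["body"]:
        chunk = event.get("chunk")
        if not chunk:
            continue  # skip non-chunk events
        payload = json.loads(chunk["bytes"])
        text = payload.get("message", "")
        if text:
            yield text
```

Calling `for piece in stream_pegasus_text(bedrock_runtime_client, model_id, request_body): print(piece, end="")` prints the answer incrementally instead of waiting for the full response.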


4.3 - Generate Summaries, Hashtags, and Highlights

Pegasus excels at creating different types of descriptive content based on simple, natural language prompts. You don't need complex instructions; just ask for what you want.

Here are a few examples of prompts you can use to generate useful metadata:

  • For a summary: "Summarize the video"

  • For hashtags: "Generate relevant hashtags for the video"

  • For highlights: "What are the highlighted moments of this video?"

Each of these prompts can be used in the request_body of an invoke_model or invoke_model_with_response_stream call to produce concise summaries, SEO-friendly tags, or a chronological list of key events in the video.
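Running all three prompts against one video could be wrapped in a small helper like the sketch below. The function and dictionary names are illustrative; the request fields mirror the invoke_model example in section 4.2.

```python
import json

METADATA_PROMPTS = {
    "summary": "Summarize the video",
    "hashtags": "Generate relevant hashtags for the video",
    "highlights": "What are the highlighted moments of this video?",
}

def run_prompts(client, model_id: str, video_s3_uri: str, prompts: dict = METADATA_PROMPTS) -> dict:
    """Send each prompt to Pegasus and collect the answers by name.

    `client` is a bedrock-runtime client in a region where Pegasus is enabled.
    """
    results = {}
    for name, prompt in prompts.items():
        request_body = {
            "inputPrompt": prompt,
            "mediaSource": {"s3Location": {"uri": video_s3_uri}},
            "temperature": 0,
        }
        response = client.invoke_model(
            modelId=model_id,
            body=json.dumps(request_body),
            contentType="application/json",
            accept="application/json",
        )
        results[name] = json.loads(response["body"].read())["message"]
    return results
```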


4.4 - Get Structured Metadata with Pegasus

A powerful feature of Pegasus on Bedrock is its ability to return structured output in a JSON format that you define. This is incredibly useful for programmatic workflows, such as automatically populating a content management system (CMS) or a video database.

You describe the desired structure as a JSON schema and pass it inside the responseFormat object of the request, under the schema key.

prompt = """
Generate metadata for the video with the following fields:
- title: A creative and fitting title for the video.
- description: A short, one-paragraph summary.
- mood: The overall mood or feeling of the video.
- genre: The genre that best fits the video content.
"""

# Define the JSON schema for the desired output structure
json_schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string", "description": "The title of the video"},
        "description": {"type": "string", "description": "A short summary of the video"},
        "mood": {"type": "string", "description": "The overall mood of the video"},
        "genre": {"type": "string", "description": "The genre that best fits the video"}
    },
    "required": ["title", "description", "mood", "genre"]
}

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {"s3Location": {"uri": video_s3_uri}},
    "temperature": 0.5,
    "responseFormat": {
        "schema": json_schema  # JSON schema the output must follow
    }
}

response = bedrock_runtime_client.invoke_model(
    modelId="us.twelvelabs.pegasus-1-2-v1:0", # Use the correct model ID for your region
    body=json.dumps(request_body)
)

# The "message" field of the response body contains a stringified JSON object
response_body = json.loads(response.get("body").read())
structured_metadata = json.loads(response_body["message"])

# Print the formatted JSON output
print(json.dumps(structured_metadata, indent=4))

What this code does:

This request tells Pegasus to generate specific metadata and format it according to the provided json_schema, which is passed under the schema key of the responseFormat object. The model populates the title, description, mood, and genre fields based on its analysis. The message field of the response is a JSON string that parses into a clean, predictable object you can use directly in your application workflows, without fragile text parsing.


4.5 - Combine with Search Results

The true power of the TwelveLabs models on Bedrock is realized when you combine the outputs of Marengo and Pegasus. The structured metadata and summaries generated by Pegasus can be stored directly within your OpenSearch index alongside the Marengo vector embeddings.

When a user performs a search, your application can not only return the relevant video clip (found via Marengo's vector search) but also display a rich, contextual summary (generated by Pegasus). This creates a far more informative and complete user experience, turning a simple search result into a comprehensive piece of knowledge.
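One way to store the Pegasus output alongside the embeddings is to merge it into each segment document before indexing, as in this sketch. The helper name and the "video_" field prefix are hypothetical, and the segment documents are assumed to be dictionaries with the fields from the index mapping in section 3.4.

```python
def attach_metadata(segments: list, metadata: dict, prefix: str = "video_") -> list:
    """Return copies of the segment documents with Pegasus fields added.

    Each metadata key is prefixed (e.g. "title" becomes "video_title") so it
    cannot collide with the segment's own fields.
    """
    extra = {f"{prefix}{key}": value for key, value in metadata.items()}
    return [{**segment, **extra} for segment in segments]

# Illustrative values only
segments = [{"embedding": [0.1, 0.2], "start_time": 0.0, "end_time": 5.0,
             "video_s3_uri": "s3://my-video-bucket/videos/example.mp4"}]
metadata = {"title": "Example title", "mood": "tense"}
print(attach_metadata(segments, metadata)[0]["video_title"])
```

Because the extra fields live in the same document as the vector, a k-NN hit can return the title or summary through _source without a second lookup.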


5 - Cleanup

As a best practice in any cloud environment, it's important to remove the resources you've created to avoid incurring ongoing costs. This section will guide you through decommissioning the components we set up in this tutorial.


5.1 - Delete the OpenSearch Serverless Index

First, we'll delete the OpenSearch Serverless index that stores our video embeddings. This will permanently remove the index and all the data within it. You can do this programmatically using the OpenSearch Python client.

# Assuming 'os_client' and 'INDEX_NAME' are already defined
try:
    response = os_client.indices.delete(index=INDEX_NAME)
    print(f"✅ Index '{INDEX_NAME}' deleted successfully.")
except Exception as e:
    print(f"❌ Error deleting index '{INDEX_NAME}': {e}")

This script sends a delete request to your OpenSearch Serverless collection for the specified index name.


5.2 - Empty the S3 Bucket

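Next, let's clean up the S3 bucket by deleting the sample videos, the generated embeddings, and any query images you may have uploaded. The following script lists the objects in your bucket and deletes them in a single batch operation. A single list_objects_v2 call returns at most 1,000 keys, which is more than enough for the handful of files created in this tutorial.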

# Assuming 's3_client' and 'S3_BUCKET_NAME' are already defined
try:
    # List all objects in the bucket
    response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
    
    if 'Contents' in response:
        # Prepare the list of objects for deletion
        objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
        
        # Perform the delete operation
        s3_client.delete_objects(
            Bucket=S3_BUCKET_NAME,
            Delete={'Objects': objects_to_delete}
        )
        print(f"✅ Bucket '{S3_BUCKET_NAME}' emptied successfully.")
    else:
        print(f"✅ Bucket '{S3_BUCKET_NAME}' is already empty.")
except Exception as e:
    print(f"❌ Error emptying bucket '{S3_BUCKET_NAME}': {e}")

What this code does:

It first gets a list of all files in your bucket and then passes this list to the delete_objects API call, which efficiently removes all specified content.
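For buckets that hold more objects than a single listing call returns, a paginated variant could look like this sketch. The function name is illustrative, and it does not handle versioned buckets, where older object versions would also need to be removed.

```python
def empty_bucket(s3_client, bucket: str) -> int:
    """Delete every object in the bucket, page by page; returns the count deleted."""
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if not objects:
            continue
        # Each page holds at most 1,000 keys, the limit for one delete_objects call
        s3_client.delete_objects(Bucket=bucket, Delete={"Objects": objects})
        deleted += len(objects)
    return deleted
```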


5.3 - Stop Notebook Instances

Finally, if you were running this tutorial on an Amazon SageMaker Notebook Instance or an EC2 instance, remember to stop or terminate the instance from the AWS Management Console to prevent further charges.


6 - Next Steps and Extensions

Congratulations on building a fully functional video search and analysis pipeline! The concepts you've learned are the foundation for a wide range of powerful, real-world applications. Here are a few ideas to take your project to the next level:

  • Build a Bedrock Agent for Your Videos: Create a sophisticated conversational AI using Bedrock Agents. You can build an agent that orchestrates calls to both Marengo and Pegasus. This would allow a user to have a natural conversation with their video library, asking complex questions like, "Show me all the scenes from last year's conference where our CEO was on stage, and then generate a summary of their key points." Check out a great example here: https://github.com/garystafford/twelvelabs-bedrock-search-agent

  • Power a RAG Pipeline with Amazon Kendra: Integrate Marengo's vector embeddings with Amazon Kendra, AWS's intelligent search service. By using Marengo as a custom embedding provider, you can enable Kendra to perform semantic searches across your video library and use the results to ground a Large Language Model (LLM) for a Retrieval-Augmented Generation (RAG) system. This turns your video archive into a knowledge base you can query.

  • Automate Content Workflows with AWS Step Functions: Use AWS Step Functions to orchestrate a serverless workflow. For example, you could design a state machine that triggers whenever a new video is uploaded to S3. The workflow would automatically:

    1. Generate embeddings with Marengo.

    2. Index the embeddings in OpenSearch Serverless.

    3. Generate a summary and structured metadata with Pegasus.

    4. Store the Pegasus output in Amazon DynamoDB, linked to the video.

  • Create Dynamic Video Experiences: Use the highlight and chapter generation capabilities of Pegasus to create new forms of media. For example, you could feed the timestamps of highlights into AWS Elemental MediaTailor to dynamically insert ads or chapter markers, or use the data to generate a "trailer" or sizzle reel automatically.
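To make the Step Functions idea above more concrete, here is a minimal sketch that builds an Amazon States Language definition chaining the four steps in order. The state names and Lambda ARNs are hypothetical placeholders, and the sketch assumes each step is wrapped in its own Lambda function; it is an illustration, not the workshop's implementation.

```python
import json

# Hypothetical (state name, Lambda ARN) pairs, one per workflow step.
# Replace the ARNs with functions from your own account.
STEPS = [
    ("GenerateEmbeddings", "arn:aws:lambda:us-east-1:123456789012:function:marengo-embed"),
    ("IndexEmbeddings", "arn:aws:lambda:us-east-1:123456789012:function:index-embeddings"),
    ("GeneratePegasusMetadata", "arn:aws:lambda:us-east-1:123456789012:function:pegasus-metadata"),
    ("StorePegasusOutput", "arn:aws:lambda:us-east-1:123456789012:function:store-in-dynamodb"),
]


def build_state_machine(steps):
    """Chain Task states in order; the final state ends the execution."""
    states = {}
    for i, (name, arn) in enumerate(steps):
        state = {"Type": "Task", "Resource": arn}
        if i + 1 < len(steps):
            state["Next"] = steps[i + 1][0]
        else:
            state["End"] = True
        states[name] = state
    return {
        "Comment": "Sketch of a video processing workflow",
        "StartAt": steps[0][0],
        "States": states,
    }


definition = build_state_machine(STEPS)
print(json.dumps(definition, indent=2))
```

The printed JSON could be supplied as the definition when creating a state machine. Because the Marengo embedding job runs asynchronously, a real workflow would likely add a Wait and Choice loop that polls the job status before the indexing step runs.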

Check out the full code in this repository: https://github.com/twelvelabs-io/tl-solutions-samples/tree/main/Workshops/TwelveLabs_Bedrock_Workshop



5 - Cleanup

As a best practice in any cloud environment, it's important to remove the resources you've created to avoid incurring ongoing costs. This section will guide you through decommissioning the components we set up in this tutorial.


5.1 - Delete the OpenSearch Serverless Index

First, we'll delete the OpenSearch Serverless index that stores our video embeddings. This will permanently remove the index and all the data within it. You can do this programmatically using the OpenSearch Python client.

# Assuming 'os_client' and 'INDEX_NAME' are already defined
try:
    response = os_client.indices.delete(index=INDEX_NAME)
    print(f"✅ Index '{INDEX_NAME}' deleted successfully.")
except Exception as e:
    print(f"❌ Error deleting index '{INDEX_NAME}': {e}")

This script sends a delete request to your OpenSearch Serverless collection for the specified index name.


5.2 - Empty the S3 Bucket

Next, let's clean up the S3 bucket by deleting the sample videos, the generated embeddings, and any query images you may have uploaded. The following script will list all objects in your bucket and delete them in a single batch operation.

# Assuming 's3_client' and 'S3_BUCKET_NAME' are already defined
try:
    # List all objects in the bucket
    response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
    
    if 'Contents' in response:
        # Prepare the list of objects for deletion
        objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
        
        # Perform the delete operation
        s3_client.delete_objects(
            Bucket=S3_BUCKET_NAME,
            Delete={'Objects': objects_to_delete}
        )
        print(f"✅ Bucket '{S3_BUCKET_NAME}' emptied successfully.")
    else:
        print(f"✅ Bucket '{S3_BUCKET_NAME}' is already empty.")
except Exception as e:
    print(f"❌ Error emptying bucket '{S3_BUCKET_NAME}': {e}")

What this code does:

It first gets a list of all files in your bucket and then passes this list to the delete_objects API call, which efficiently removes all specified content.


5.3 - Stop Notebook Instances

Finally, if you were running this tutorial on an Amazon SageMaker Notebook Instance or an EC2 instance, remember to stop or terminate the instance from the AWS Management Console to prevent further charges.


6 - Next Steps and Extensions

Congratulations on building a fully functional video search and analysis pipeline! The concepts you've learned are the foundation for a wide range of powerful, real-world applications. Here are a few ideas to take your project to the next level:

  • Build a Bedrock Agent for Your Videos: Create a sophisticated conversational AI using Bedrock Agents. You can build an agent that orchestrates calls to both Marengo and Pegasus. This would allow a user to have a natural conversation with their video library, asking complex questions like, "Show me all the scenes from last year's conference where our CEO was on stage, and then generate a summary of their key points." Check out a great example here: https://github.com/garystafford/twelvelabs-bedrock-search-agent

  • Power a RAG Pipeline with Amazon Kendra: Integrate Marengo's vector embeddings with Amazon Kendra, AWS's intelligent search service. By using Marengo as a custom embedding provider, you can enable Kendra to perform semantic searches across your video library and use the results to ground a Large Language Model (LLM) for a Retrieval-Augmented Generation (RAG) system. This turns your video archive into a knowledge base you can query.

  • Automate Content Workflows with AWS Step Functions: Use AWS Step Functions to orchestrate a serverless workflow. For example, you could design a state machine that triggers whenever a new video is uploaded to S3. The workflow would automatically:

    1. Generate embeddings with Marengo.

    2. Index the embeddings in OpenSearch Serverless.

    3. Generate a summary and structured metadata with Pegasus.

    4. Store the Pegasus output in Amazon DynamoDB, linked to the video.

  • Create Dynamic Video Experiences: Use the highlight and chapter generation capabilities of Pegasus to create new forms of media. For example, you could feed the timestamps of highlights into AWS Elemental MediaTailor to dynamically insert ads or chapter markers, or use the data to generate a "trailer" or sizzle reel automatically.

Check out the full code in this repository: https://github.com/twelvelabs-io/tl-solutions-samples/tree/main/Workshops/TwelveLabs_Bedrock_Workshop


Overview

This tutorial is designed for developers and enterprise data teams looking to harness the power of video understanding within their AWS environments. Our goal is to provide a practical, hands-on guide to integrating TwelveLabs' cutting-edge video AI models—Marengo and Pegasus—through Amazon Bedrock. By the end of this tutorial, you will have built a powerful video analysis pipeline, enabling you to create searchable video libraries and generate rich, descriptive metadata automatically.

You will build a two-part Jupyter Notebook that demonstrates two core use cases:

  1. Multimodal Search: Use the Marengo Embed 2.7 model to generate multimodal vector embeddings from video files. These embeddings will be indexed in Amazon OpenSearch Serverless to create a scalable, fast, and accurate similarity search engine for your video content.

  2. Generative Video Analysis: Leverage the Pegasus 1.2 model to perform video-to-text tasks, such as creating summaries, generating chapters, and extracting structured metadata from the same set of videos.


Prerequisites

To follow this tutorial, you will need:

  • An AWS account with an IAM role configured with permissions for Amazon S3, Amazon Bedrock, and Amazon OpenSearch Serverless.

  • Model access enabled for TwelveLabs Marengo and Pegasus in your chosen AWS regions.

  • An S3 bucket to store your video files and model outputs.

  • A Python 3.10+ environment with boto3 and other required libraries installed.

  • A small collection of 5–10 short MP4 video files for analysis.


1 - Architecture & Key Concepts

The solution is architected around a serverless, event-driven workflow that is both scalable and cost-effective. The primary data flow for the multimodal search component is as follows:

  1. A video is uploaded to an S3 bucket.

  2. An asynchronous call to Bedrock with the Marengo model is initiated.

  3. Bedrock processes the video and writes the resulting vector embeddings back to a designated S3 prefix.

  4. The embeddings are loaded into an Amazon OpenSearch Serverless index for low-latency similarity search.

This architecture allows you to process large video libraries efficiently.

Before we dive into the code, it's important to understand the two different types of models we'll be using:

  • Multimodal Embeddings (Marengo): An embedding is a numerical vector that represents complex data like a video, image, or text. Marengo transforms different types of media into a single, unified vector space. This is powerful because it means you can search for a visual moment in a video using a simple text query—the model understands the relationship between the words in your query and the pixels in the video. We will use this for building our search application.

  • Video-to-Text Generation (Pegasus): Pegasus is a generative model designed to "watch" a video and produce human-readable text about it. Unlike Marengo, its output is not a vector but rather descriptive text, such as a summary, a list of highlights, or answers to specific questions about the video's content.

A key practical consideration for AWS developers is region availability. The TwelveLabs models are deployed in specific AWS regions; refer to the region availability list in the AWS documentation for the current details. For instance, Marengo is available in us-east-1, while Pegasus is available in both us-east-1 and us-west-2. Note that you must first enable each model in a specific region, and that Pegasus must be invoked through its cross-region inference endpoint (for example us.twelvelabs.pegasus-1-2-v1:0, the ID used later in this tutorial), because the default Bedrock model ID cannot be used directly with Pegasus. Your code should be architected to handle these differences, which can be managed by initializing separate Boto3 clients for each required region and making cross-region calls when necessary.
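
One way to keep these regional differences in one place is a small lookup that records the region each model is called in and derives the cross-region ID from it. The sketch below is illustrative only: the MODEL_CONFIG table, the resolve_model_id helper, and the mapping from region prefix to geography prefix are assumptions rather than part of the workshop code, so check them against the regions enabled in your account.

```python
# Hypothetical configuration table; adjust the regions to those enabled in your account.
MODEL_CONFIG = {
    "marengo": {
        "region": "us-east-1",
        "base_model_id": "twelvelabs.marengo-embed-2-7-v1:0",
        "cross_region": False,
    },
    "pegasus": {
        "region": "us-west-2",
        "base_model_id": "twelvelabs.pegasus-1-2-v1:0",
        "cross_region": True,
    },
}

# Assumed mapping from the first part of a region name to the geography
# prefix that is prepended to a cross-region model ID.
GEO_PREFIXES = {"us": "us", "eu": "eu", "ap": "apac"}


def resolve_model_id(model_name: str) -> tuple:
    """Return (region, model_id) for a configured model name."""
    cfg = MODEL_CONFIG[model_name]
    region = cfg["region"]
    model_id = cfg["base_model_id"]
    if cfg["cross_region"]:
        geo = GEO_PREFIXES[region.split("-")[0]]
        model_id = f"{geo}.{model_id}"
    return region, model_id


for name in MODEL_CONFIG:
    print(name, resolve_model_id(name))
```

Each returned region can then be given its own client, for example with session.client("bedrock-runtime", region_name=region).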


2 - Environment Setup

Before we start building our video intelligence pipeline, we need to prepare our development environment. This involves installing the necessary Python libraries, configuring the AWS SDK to communicate with Bedrock and other services, and setting up a structured S3 bucket to store our files.


2.1 - Install Dependencies

First, you'll need to install several Python packages that our script will rely on. These libraries handle everything from interacting with AWS services to managing data and performing calculations.

You can install them all with a single pip command in your terminal:

pip install boto3 opensearch-py pandas numpy scikit-learn botocore

Here’s a quick rundown of what each library does:

  • boto3: The official AWS SDK for Python. It's how we'll interact with services like Bedrock, S3, and OpenSearch.

  • opensearch-py: The official Python client for OpenSearch, which we'll use to index and search our video embeddings.

  • pandas & numpy: Powerful libraries for data manipulation and numerical operations. They are useful for handling the embedding vectors returned by Marengo.

  • scikit-learn: A machine learning library that provides helpful tools, including a function to calculate cosine similarity between vectors.

  • botocore: The low-level library that boto3 is built on. It provides core functionalities like handling credentials and AWS request signing.

Note that pandas, numpy, and scikit-learn are not strictly required for this tutorial. They are only needed if you want to work with the embeddings directly, for example to calculate cosine similarity yourself.


2.2 - Configure the AWS SDK

With the libraries installed, the next step is to configure our script to communicate with your AWS account. We'll use boto3 to create client objects for the services we need.

import boto3
import json

# Define the AWS Region where your Bedrock models are enabled
# For Marengo, this might be "us-east-1"
# For Pegasus, this might be "us-west-2"
region_name = "us-east-1" 

# Initialize a session with your default AWS profile
session = boto3.Session(profile_name="default", region_name=region_name)

# Create clients for Bedrock Runtime and S3
bedrock_runtime_client = session.client("bedrock-runtime")
s3_client = session.client("s3")

print("✅ AWS clients created successfully.")

What this code does:

This script sets up the connection to AWS.

  • First, we import the boto3 library.

  • We then establish a session using your "default" AWS credentials profile. Think of this like logging into your AWS account. We also specify the region_name where you've enabled the TwelveLabs models.

  • Finally, we create specialized client objects. s3_client is for interacting with your S3 buckets, and bedrock_runtime_client is for making calls to the Bedrock models themselves.

A key point for developers is the distinction between bedrock and bedrock-runtime. The bedrock client is for management tasks, like requesting model access or managing provisioned throughput. The bedrock-runtime client, which we use here, is specifically for invoking models to get inferences (like generating embeddings or text).


2.3 - S3 Bucket Layout

A well-organized S3 bucket is crucial for a clean and scalable architecture. For this tutorial, we recommend creating separate prefixes (which function like folders) for your raw videos, the embeddings generated by Marengo, and any optional images you might use for search.

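Your S3 bucket should contain a videos/ prefix for the raw video files, an embeddings/ prefix for the Marengo output, and, optionally, a separate prefix for query images.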


This layout helps keep your data organized, making it easier to manage permissions and track costs associated with different parts of your workflow. In the following sections, our code will upload videos to the /videos/ prefix and configure Marengo to write its output to the /embeddings/ prefix.


3 - Part A – Multimodal Search with Marengo

Now that our environment is set up, let's dive into the first part of our project: building a powerful multimodal search engine. In this section, we will use the TwelveLabs Marengo model on Bedrock to convert our videos into vector embeddings. We will then store these embeddings in Amazon OpenSearch Serverless and build a search application that can find moments in videos using either a text description or a sample image.


3.1 - Upload Sample Videos

First, we need some videos to work with. To make things easy, we'll use a helper function to copy a few sample videos from the Netflix Open Content library directly into the S3 bucket you configured earlier.

import re

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# A list of publicly available sample videos from Netflix
sample_videos = [
    's3://download.opencontent.netflix.com/TechblogAssets/CosmosLaundromat/encodes/CosmosLaundromat_2048x858_24fps_SDR.mp4',
    's3://download.opencontent.netflix.com/TechblogAssets/Meridian/encodes/Meridian_3840x2160_5994fps_SDR.mp4',
    's3://download.opencontent.netflix.com/TechblogAssets/Sparks/encodes/Sparks_4096x2160_5994fps_SDR.mp4'
]

# This client sends unsigned requests, so it does not require AWS credentials to access public S3 buckets
public_s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))

def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Helper function to split an S3 URI into bucket and key."""
    match = re.match(r'^s3://([^/]+)/(.+)$', s3_uri)
    if not match:
        raise ValueError(f"Invalid S3 URI format: {s3_uri}")
    return match.group(1), match.group(2)

def copy_public_s3_object(public_s3_uri: str, dest_bucket: str, dest_key: str):
    """Copies an object from a public S3 bucket to your private bucket."""
    source_bucket, source_key = parse_s3_uri(public_s3_uri)
    print(f"Downloading from {public_s3_uri}...")
    response = public_s3_client.get_object(Bucket=source_bucket, Key=source_key)
    
    print(f"Uploading to s3://{dest_bucket}/{dest_key} ...")
    # Stream the body instead of reading the whole (potentially very large) video into memory
    s3_client.upload_fileobj(response['Body'], dest_bucket, dest_key)
    print("✅ Copy completed successfully!")

# Loop through the sample videos and copy them to your S3 bucket
S3_BUCKET_NAME = "<your-bucket-name>" # Replace with your bucket name
S3_VIDEOS_PATH = "videos"

for video_uri in sample_videos:
    _, src_key = parse_s3_uri(video_uri)
    filename = src_key.split("/")[-1]
    dest_key = f"{S3_VIDEOS_PATH}/{filename}"
    copy_public_s3_object(
        public_s3_uri=video_uri,
        dest_bucket=S3_BUCKET_NAME,
        dest_key=dest_key
    )

What this code does:

This script automates the process of getting sample data. It defines a list of S3 URIs pointing to high-quality videos provided by Netflix. It then loops through this list, using a special boto3 client to download each video from its public location and upload it into the /videos/ prefix in your own S3 bucket.


3.2 - Synchronous vs. Asynchronous Embedding with Marengo

Marengo is designed to process a variety of media inputs, from large video files to short text queries. To optimize for these different use cases, Amazon Bedrock now provides two ways to invoke the model: asynchronously for large media and synchronously for real-time queries.


Asynchronous StartAsyncInvoke for Large-Scale Video Processing

For embedding large files like videos and audio, the asynchronous StartAsyncInvoke API remains the best practice. It allows you to submit a job to Bedrock and let it process in the background, which is ideal for batch-processing an entire video library without holding a connection open.

Think of it like dropping off film at a photo lab: you get a ticket (the invocation ARN) and can check on the status later to pick up the developed pictures (the embeddings). The embed_video_async function below uses this method and is the correct approach for video.

def embed_video_async(s3_uri: str, output_s3_bucket: str) -> str:
    """
    Starts an asynchronous job to generate embeddings for a video.
    
    Args:
        s3_uri (str): The S3 URI of the video to process.
        output_s3_bucket (str): The S3 bucket where results should be stored.

    Returns:
        str: The invocation ARN, which is like a job ID.
    """
    response = bedrock_runtime_client.start_async_invoke(
        modelId="twelvelabs.marengo-embed-2-7-v1:0",
        modelInput={
            "inputType": "video",
            "mediaSource": {"s3Location": {"uri": s3_uri}}
        },
        outputDataConfig={"s3OutputDataConfig": {
            "s3Uri": f"s3://{output_s3_bucket}/embeddings/"
        }}
    )
    return response["invocationArn"]

What this code does:

The embed_video_async function takes the S3 location of a video and submits it to the Marengo model (twelvelabs.marengo-embed-2-7-v1:0). We specify that the inputType is a video and point to its s3Location. We also tell Bedrock where to save the output: a JSON file in the /embeddings/ prefix of our bucket. The function returns an invocationArn, which we'll use to check the job status.
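
Checking the job status can be sketched as a small polling loop around the Bedrock Runtime GetAsyncInvoke call. This is a minimal sketch, not the workshop's own code: the function name wait_for_async_invoke, the polling interval, and the timeout are assumptions, and the client is passed in as a parameter so the loop can be exercised without AWS access.

```python
import time


def wait_for_async_invoke(client, invocation_arn: str,
                          poll_seconds: float = 10.0,
                          timeout_seconds: float = 1800.0) -> dict:
    """Poll an async invocation until it completes, fails, or times out.

    `client` is expected to be a boto3 "bedrock-runtime" client (or any
    object exposing a compatible get_async_invoke method).
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = client.get_async_invoke(invocationArn=invocation_arn)
        status = job["status"]
        if status == "Completed":
            return job
        if status == "Failed":
            reason = job.get("failureMessage", "no failure message returned")
            raise RuntimeError(f"Async invocation failed: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {invocation_arn} still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Example usage (requires real AWS access, so it is left commented out):
# arn = embed_video_async(video_s3_uri, S3_BUCKET_NAME)
# job = wait_for_async_invoke(bedrock_runtime_client, arn)
```

For a whole library, it is usually simpler to start all jobs first and then poll the collected ARNs, rather than waiting on each video in turn.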


Synchronous InvokeModel for Real-Time Text and Image Queries

For small, low-latency tasks like embedding a user's text search query or a sample image, you can now use the synchronous InvokeModel API. This is a significant improvement for building responsive applications, as it returns the embedding directly in the API response without the need to poll for results.

Here is how you would create a function to embed a text query synchronously:

def embed_text_sync(query_text: str) -> list:
    """
    Generates an embedding for a text query using the synchronous InvokeModel API.

    Args:
        query_text (str): The text query to embed.

    Returns:
        list: The embedding vector for the text.
    """
    # Note: Ensure your bedrock_runtime_client is configured for a region where Marengo is available
    request_body = {
        "inputType": "text",
        "inputText": query_text
    }

    response = bedrock_runtime_client.invoke_model(
        modelId="twelvelabs.marengo-embed-2-7-v1:0",
        body=json.dumps(request_body),
        contentType="application/json",
        accept="application/json"
    )
    
    response_body = json.loads(response["body"].read())
    
    # The embedding is returned directly in the response
    return response_body["embedding"]

What this code does:

This function sends a request to the Marengo model using invoke_model.

  • We specify the inputType as text and provide the inputText.

  • Unlike the async call, the response from invoke_model contains the embedding directly. We parse the JSON response and return the embedding vector.

  • This synchronous approach eliminates the need for a separate polling function, simplifying the code and reducing latency for your application's search functionality. A similar function can be used for images by providing the mediaSource in the request.

By using the right tool for the job—StartAsyncInvoke for batch video processing and InvokeModel for real-time queries—you can build a more efficient and responsive video search architecture.
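
To make the text and image variants concrete, here is a hedged sketch of a helper that builds the request body for either kind of query. The helper name build_marengo_query_body is hypothetical, and the image shape simply mirrors the mediaSource structure used for video above, so verify the exact fields against the Bedrock model documentation before relying on it.

```python
import json
from typing import Optional


def build_marengo_query_body(text: Optional[str] = None,
                             image_s3_uri: Optional[str] = None) -> str:
    """Build a JSON request body for a text query or an S3-hosted image query."""
    # Exactly one of the two inputs must be supplied
    if (text is None) == (image_s3_uri is None):
        raise ValueError("Provide exactly one of text or image_s3_uri")
    if text is not None:
        body = {"inputType": "text", "inputText": text}
    else:
        # Assumed shape: the same mediaSource structure as the video request
        body = {
            "inputType": "image",
            "mediaSource": {"s3Location": {"uri": image_s3_uri}},
        }
    return json.dumps(body)


print(build_marengo_query_body(text="a dog running on a beach"))
```

The returned string can be passed as the body argument of invoke_model, exactly as in embed_text_sync.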


3.3 - Parse the Embedding JSON

Once Bedrock finishes processing the video, it will place an output.json file in the specified S3 location. This file contains a list of embedding vectors, where each vector represents a short segment of the video.

The structure of the output looks like this:

{
  "data": [
    {
      "embedding": [0.123, -0.045, 0.567, ...],
      "embeddingOption": "visual-text",
      "startSec": 0.0,
      "endSec": 5.0
    },
    {
      "embedding": [-0.234, 0.089, -0.112, ...],
      "embeddingOption": "visual-text",
      "startSec": 5.0,
      "endSec": 10.0
    }
    // ... more segments
  ]
}

Each object in the data array contains the embedding vector (a list of 1,024 numbers), the start and end time of the video segment in seconds (startSec, endSec), and the type of embedding generated.
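
As a rough illustration, the structure above can be flattened into one document per segment, using the field names of the search index defined in the next section. The function name parse_embedding_output is an assumption, and the sample payload uses three-element vectors purely to keep the example short.

```python
import json


def parse_embedding_output(raw_json: str, video_s3_uri: str) -> list:
    """Convert the contents of an output.json file into per-segment documents."""
    payload = json.loads(raw_json)
    documents = []
    for segment in payload.get("data", []):
        documents.append({
            "embedding": segment["embedding"],
            "start_time": segment["startSec"],
            "end_time": segment["endSec"],
            "video_s3_uri": video_s3_uri,
        })
    return documents


# Tiny stand-in for a real output.json (real vectors have 1,024 values)
sample = json.dumps({
    "data": [
        {"embedding": [0.1, 0.2, 0.3], "embeddingOption": "visual-text", "startSec": 0.0, "endSec": 5.0},
        {"embedding": [0.4, 0.5, 0.6], "embeddingOption": "visual-text", "startSec": 5.0, "endSec": 10.0},
    ]
})
docs = parse_embedding_output(sample, "s3://my-bucket/videos/example.mp4")
print(len(docs), docs[0]["start_time"], docs[0]["end_time"])
```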


3.4 - Create an OpenSearch Serverless Index

To search these embeddings efficiently, we need a vector database. Amazon OpenSearch Serverless provides a fully managed solution with powerful k-Nearest Neighbor (k-NN) search capabilities.

Let's create an index configured for vector search.

# Note: You must create an OpenSearch Serverless client 'os_client' first.
# See the workshop file for details on creating the client with AWSV4SignerAuth.

INDEX_NAME = "video-search-index"

index_body = {
    "settings": { "index": { "knn": True } },
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "engine": "faiss",
                    "name": "hnsw",
                    "space_type": "cosinesimil"
                }
            },
            "start_time": {"type": "float"},
            "end_time": {"type": "float"},
            "video_s3_uri": {"type": "keyword"}
        }
    }
}

os_client.indices.create(index=INDEX_NAME, body=index_body)
print(f"✅ Index '{INDEX_NAME}' created successfully.")

What this code does:

This script defines the schema for our search index.

  • "knn": True enables the k-NN functionality.

  • The embedding field is defined as a knn_vector of dimension 1024, matching Marengo's output.

  • We specify the faiss engine and hnsw algorithm, which are highly efficient for vector search.

  • space_type is set to cosinesimil, which tells OpenSearch to measure similarity based on the angle between vectors—a great choice for multimodal embeddings.
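
To see what cosine similarity measures, here is a small pure-Python version. It is only an illustration of the metric, not how OpenSearch computes it internally; in practice you could use scikit-learn or numpy for the same calculation.

```python
import math


def cosine_similarity(a, b) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("Cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Vectors pointing the same way score 1 regardless of their length,
# perpendicular vectors score 0, and opposite vectors score -1.
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))
print(cosine_similarity([1.0, 0.0], [-1.0, 0.0]))
```

Because only the direction matters, two segments with similar content score highly even if their embedding magnitudes differ.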


3.5 - Bulk-Index the Video Segments

With our index ready, we can now process our embedding files from S3 and load them into OpenSearch. This is typically done in bulk for efficiency. You would write a script to list all output.json files, parse them, and send the data to OpenSearch in batches.
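
The batching step could look something like the sketch below. The helper names build_bulk_actions and batched are hypothetical, and the documents are assumed to already use the index's field names (embedding, start_time, end_time, video_s3_uri). No document ID is set, which leaves ID assignment to OpenSearch.

```python
from itertools import islice
from typing import Iterable, Iterator


def build_bulk_actions(documents: Iterable, index_name: str) -> Iterator:
    """Wrap each segment document in a bulk "index" action."""
    for doc in documents:
        yield {"_index": index_name, "_source": doc}


def batched(items: Iterable, batch_size: int) -> Iterator:
    """Yield lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in documents; real ones would come from the parsed output.json files
documents = [
    {"embedding": [0.0, 0.1], "start_time": i * 5.0, "end_time": (i + 1) * 5.0,
     "video_s3_uri": "s3://my-bucket/videos/example.mp4"}
    for i in range(5)
]
batches = list(batched(build_bulk_actions(documents, "video-search-index"), batch_size=2))
print([len(b) for b in batches])
```

Each batch can then be sent with the opensearch-py bulk helper, for example opensearchpy.helpers.bulk(os_client, batch).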


3.6 - Query with Text and Image

This is where the magic happens. Because Marengo creates embeddings in a unified vector space, we can generate an embedding for a text query or an image and use it to find similar video segments.

Here’s a function for text-to-video search:

def search_videos_by_text(query_text: str, top_k: int = 3) -> list:
    """Searches for video segments using a text query."""
    # 1. Generate an embedding for the text query using Marengo
    #    (embed_text_sync is defined in section 3.2)
    print(f"Generating embedding for query: '{query_text}'")
    query_embedding = embed_text_sync(query_text)

    # 2. Use the embedding to search OpenSearch
    search_body = {
        "size": top_k,  # Return at most top_k hits overall
        "query": { "knn": { "embedding": { "vector": query_embedding, "k": top_k } } },
        "_source": ["video_s3_uri", "start_time", "end_time"]
    }
    
    response = os_client.search(index=INDEX_NAME, body=search_body)
    return response['hits']['hits']

What this code does:

The function first calls Marengo to convert the query_text into a vector. It then uses this vector to perform a k-NN search on our OpenSearch index, asking for the top k most similar video segments. A similar function can be written for image search by calling Marengo with inputType: "image".


3.7 - Demo Search

Let's try it out. Imagine you want to find a specific scene in your video library.

query = "a car chase at night with neon lights"
search_results = search_videos_by_text(query)

# Print the top result
top_result = search_results[0]['_source']
video_url = top_result['video_s3_uri'] # You'd generate a presigned URL here
start_time = top_result['start_time']

print(f"Top match found in {video_url} at {start_time} seconds.")
# In a Jupyter Notebook, you could now display an HTML5 video player
# set to start at 'start_time'.

This returns the video segment that best matches your description, demonstrating the power of multimodal search.
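
The two comments in the snippet above (the presigned URL and the HTML5 player) could be filled in roughly as follows. This is a sketch under assumptions: the helper names are hypothetical, the S3 client is passed in so the code can be tried without AWS access, and the player relies on the "#t=" media fragment to start playback at the matched timestamp.

```python
from html import escape


def presign_video_url(s3_client, s3_uri: str, expires_in: int = 3600) -> str:
    """Create a temporary HTTPS URL for a private S3 object."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    bucket, _, key = s3_uri[len("s3://"):].partition("/")
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )


def video_player_html(url: str, start_time: float, width: int = 640) -> str:
    """Return an HTML5 <video> tag that starts playback at start_time seconds."""
    # Escape the URL for use inside an HTML attribute, then append the
    # media fragment that tells the browser where to start
    src = f"{escape(url, quote=True)}#t={start_time}"
    return f'<video width="{width}" controls src="{src}"></video>'


print(video_player_html("https://example.com/video.mp4", 12.5))
```

In a Jupyter Notebook, the resulting string can be rendered with IPython.display.HTML.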


3.8 - Takeaways

In this section, you have successfully built a sophisticated video search engine using a completely serverless architecture. The key takeaways are:

  • Unified Vector Space: Marengo's ability to embed different modalities (video, text, image) into a single vector space is what enables powerful cross-modal search. You can search your videos with a sentence, not just keywords.

  • Scalable Architecture: By combining Bedrock's asynchronous API with Amazon OpenSearch Serverless, you have a pipeline that can grow from a handful of clips to a large video library without managing any servers.


4 - Part B – Rich Descriptions with Pegasus

In the first part of this tutorial, we built a powerful search application using Marengo's vector embeddings. Now, we will explore the generative capabilities of the TwelveLabs Pegasus model. Pegasus is designed to "watch" a video and generate rich, human-readable text, such as summaries, highlights, and even structured metadata. This allows you to move beyond search and unlock a deeper, more contextual understanding of your video content.


4.1 - Choose a Video for Analysis

We'll continue working with the same videos we uploaded to our S3 bucket in Part A. First, let's select one of them to analyze with Pegasus. You can list the objects in your /videos/ prefix and pick one.

# Assuming s3_client and S3_BUCKET_NAME are already configured
S3_VIDEOS_PATH = "videos"

# List videos in your S3 bucket
response = s3_client.list_objects_v2(
    Bucket=S3_BUCKET_NAME, 
    Prefix=f"{S3_VIDEOS_PATH}/"
)

# Keep only real files: skip any "folder" placeholder keys that end in "/"
video_keys = [obj['Key'] for obj in response.get('Contents', []) if not obj['Key'].endswith('/')]

if video_keys:
    # Select the first video file for analysis
    video_s3_key = video_keys[0]
    video_s3_uri = f"s3://{S3_BUCKET_NAME}/{video_s3_key}"
    print(f"✅ Selected for analysis: {video_s3_uri}")
else:
    print("❌ No videos found in the specified S3 path.")

This code snippet lists the files in your videos directory and selects the first one to work with, printing its S3 URI for confirmation.


4.2 - Get a Quick Analysis with InvokeModel

The simplest way to interact with Pegasus is through a synchronous invoke_model call. This is ideal for quick questions where you expect a relatively short, immediate answer. Let's ask Pegasus for a basic description of our selected video.

# Note: Ensure your bedrock_runtime_client is configured for a region where Pegasus is available (e.g., 'us-west-2')
prompt = "What is the video about?"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {"uri": video_s3_uri}
    },
    "temperature": 0
}

response = bedrock_runtime_client.invoke_model(
    modelId="us.twelvelabs.pegasus-1-2-v1:0", # Use the correct model ID for your region
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

# Parse the response to get the generated text
response_body = json.loads(response["body"].read())
print(response_body["message"])

What this code does:

This code sends a request to the Pegasus model.

  • The inputPrompt contains our question.

  • mediaSource points to the video's location in S3.

  • temperature: 0 is set to ensure the model provides the most factual, deterministic answer possible. A higher temperature (e.g., 0.7) would produce more creative but less predictable results.

  • The invoke_model call blocks until the analysis is complete and returns the full response, which we then parse to extract and print the message.

For longer videos or when generating extensive text, it's better to use invoke_model_with_response_stream. This method returns the response in chunks as it's being generated, improving the user experience by showing results faster.
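
Consuming the stream might look like the sketch below. The boto3 streaming response exposes an iterable of events whose chunk entries carry raw bytes; the assumption here, which you should verify against an actual response, is that each chunk decodes to a JSON object with the text fragment under a "message" key. The helper name collect_streamed_text is hypothetical.

```python
import json


def collect_streamed_text(event_stream, text_key: str = "message") -> str:
    """Concatenate the text fragments carried by streamed chunk events."""
    parts = []
    for event in event_stream:
        chunk = event.get("chunk")
        if not chunk:
            continue  # Skip events that carry no chunk payload
        payload = json.loads(chunk["bytes"])
        fragment = payload.get(text_key)  # Assumed field name
        if fragment:
            parts.append(fragment)
    return "".join(parts)


# Example usage (requires real AWS access, so it is left commented out):
# response = bedrock_runtime_client.invoke_model_with_response_stream(
#     modelId="us.twelvelabs.pegasus-1-2-v1:0",
#     body=json.dumps(request_body),
# )
# print(collect_streamed_text(response["body"]))
```

To display text as it arrives instead of collecting it, print each fragment inside the loop rather than appending it to the list.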


4.3 - Generate Summaries, Hashtags, and Highlights

Pegasus excels at creating different types of descriptive content based on simple, natural language prompts. You don't need complex instructions; just ask for what you want.

Here are a few examples of prompts you can use to generate useful metadata:

  • For a summary: "Summarize the video"

  • For hashtags: "Generate relevant hashtags for the video"

  • For highlights: "What are the highlighted moments of this video?"

Each of these prompts can be used in the request_body of an invoke_model or invoke_model_with_response_stream call to produce concise summaries, SEO-friendly tags, or a chronological list of key events in the video.
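
As an illustration, the three prompts can be turned into request bodies in one pass. The METADATA_PROMPTS dictionary and the build_pegasus_requests helper are hypothetical names; the body fields mirror the request used in section 4.2.

```python
import json

# Prompts taken from the list above, keyed by the kind of metadata they produce
METADATA_PROMPTS = {
    "summary": "Summarize the video",
    "hashtags": "Generate relevant hashtags for the video",
    "highlights": "What are the highlighted moments of this video?",
}


def build_pegasus_requests(video_s3_uri: str, temperature: float = 0) -> dict:
    """Return one JSON request body per prompt, keyed by prompt name."""
    return {
        name: json.dumps({
            "inputPrompt": prompt,
            "mediaSource": {"s3Location": {"uri": video_s3_uri}},
            "temperature": temperature,
        })
        for name, prompt in METADATA_PROMPTS.items()
    }


requests_by_name = build_pegasus_requests("s3://my-bucket/videos/example.mp4")
for name, body in requests_by_name.items():
    print(name, body)
```

Each body can then be passed as the body argument of invoke_model, and the results stored under the same keys.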


4.4 - Get Structured Metadata with Pegasus

A powerful feature of Pegasus on Bedrock is its ability to return structured output in a JSON format that you define. This is incredibly useful for programmatic workflows, such as automatically populating a content management system (CMS) or a video database.

To request structured output, add a responseFormat object to the request. It contains the JSON schema that describes the desired output.

prompt = """
Generate metadata for the video with the following fields:
- title: A creative and fitting title for the video.
- description: A short, one-paragraph summary.
- mood: The overall mood or feeling of the video.
- genre: The genre that best fits the video content.
"""

# Define the JSON schema for the desired output structure
json_schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string", "description": "The title of the video"},
        "description": {"type": "string", "description": "A short summary of the video"},
        "mood": {"type": "string", "description": "The overall mood of the video"},
        "genre": {"type": "string", "description": "The genre that best fits the video"}
    },
    "required": ["title", "description", "mood", "genre"]
}

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {"s3Location": {"uri": video_s3_uri}},
    "temperature": 0.5,
    "responseFormat": {
        "schema": json_schema # The schema is passed directly under the "schema" key
    }
}

response = bedrock_runtime_client.invoke_model(
    modelId="us.twelvelabs.pegasus-1-2-v1:0", # Use the correct model ID for your region
    body=json.dumps(request_body)
)

# The "message" field of the response body contains a stringified JSON object
response_body = json.loads(response.get("body").read())
structured_metadata = json.loads(response_body["message"])

# Print the formatted JSON output
print(json.dumps(structured_metadata, indent=4))

What this code does:

This request tells Pegasus to generate specific metadata and format it according to the provided json_schema. The schema is passed directly under the schema key of the responseFormat object. The model populates the title, description, mood, and genre fields based on its analysis. The response is a predictable JSON object that can be used directly in your application workflows, which avoids fragile text parsing.
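
Even with a schema in place, it can be worth checking the parsed result before writing it to a database. The sketch below is one defensive approach, not something the Bedrock API requires; the helper name parse_structured_message is an assumption.

```python
import json

REQUIRED_FIELDS = ("title", "description", "mood", "genre")


def parse_structured_message(message: str, required_fields=REQUIRED_FIELDS) -> dict:
    """Parse the model's JSON string and confirm the required fields are present."""
    try:
        data = json.loads(message)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Model output is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("Expected a JSON object at the top level")
    missing = [field for field in required_fields if field not in data]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    return data


# Stand-in for response_body["message"]
example_message = json.dumps({
    "title": "Example title",
    "description": "Example description.",
    "mood": "calm",
    "genre": "documentary",
})
print(parse_structured_message(example_message)["title"])
```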


4.5 - Combine with Search Results

The true power of the TwelveLabs models on Bedrock is realized when you combine the outputs of Marengo and Pegasus. The structured metadata and summaries generated by Pegasus can be stored directly within your OpenSearch index alongside the Marengo vector embeddings.

When a user performs a search, your application can not only return the relevant video clip (found via Marengo's vector search) but also display a rich, contextual summary (generated by Pegasus). This creates a far more informative and complete user experience, turning a simple search result into a comprehensive piece of knowledge.
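
A minimal sketch of this combination step is shown below. It assumes, for simplicity, that the Pegasus metadata is held in a dictionary keyed by video S3 URI and joined to the hits at query time, rather than being stored inside the index itself; the function name enrich_hits is hypothetical.

```python
def enrich_hits(hits: list, metadata_by_uri: dict) -> list:
    """Attach per-video Pegasus metadata to each OpenSearch hit."""
    results = []
    for hit in hits:
        source = hit["_source"]
        results.append({
            "video_s3_uri": source["video_s3_uri"],
            "start_time": source["start_time"],
            "end_time": source["end_time"],
            "score": hit.get("_score"),
            # Fall back to an empty dict when a video has no metadata yet
            "metadata": metadata_by_uri.get(source["video_s3_uri"], {}),
        })
    return results


# Stand-in data shaped like the search results and Pegasus output above
sample_hits = [
    {"_score": 0.91, "_source": {"video_s3_uri": "s3://my-bucket/videos/a.mp4",
                                 "start_time": 10.0, "end_time": 15.0}},
    {"_score": 0.84, "_source": {"video_s3_uri": "s3://my-bucket/videos/b.mp4",
                                 "start_time": 0.0, "end_time": 5.0}},
]
sample_metadata = {
    "s3://my-bucket/videos/a.mp4": {"title": "Example title", "genre": "documentary"},
}
enriched = enrich_hits(sample_hits, sample_metadata)
print(enriched[0]["metadata"], enriched[1]["metadata"])
```

Storing the metadata in the index instead follows the same idea, with the extra fields added to each document at indexing time.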


5 - Cleanup

As a best practice in any cloud environment, it's important to remove the resources you've created to avoid incurring ongoing costs. This section will guide you through decommissioning the components we set up in this tutorial.


5.1 - Delete the OpenSearch Serverless Index

First, we'll delete the OpenSearch Serverless index that stores our video embeddings. This will permanently remove the index and all the data within it. You can do this programmatically using the OpenSearch Python client.

# Assuming 'os_client' and 'INDEX_NAME' are already defined
try:
    response = os_client.indices.delete(index=INDEX_NAME)
    print(f"✅ Index '{INDEX_NAME}' deleted successfully.")
except Exception as e:
    print(f"❌ Error deleting index '{INDEX_NAME}': {e}")

This script sends a delete request to your OpenSearch Serverless collection for the specified index name.


5.2 - Empty the S3 Bucket

Next, let's clean up the S3 bucket by deleting the sample videos, the generated embeddings, and any query images you may have uploaded. The following script lists every object in your bucket and deletes them in batches. It removes everything in the bucket, so only run it against a bucket you created for this tutorial.

# Assuming 's3_client' and 'S3_BUCKET_NAME' are already defined
try:
    deleted_count = 0
    # list_objects_v2 returns at most 1,000 keys per call, so paginate
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=S3_BUCKET_NAME):
        # Prepare this page's objects for deletion
        objects_to_delete = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
        if not objects_to_delete:
            continue

        # Perform the delete operation (up to 1,000 keys per request)
        s3_client.delete_objects(
            Bucket=S3_BUCKET_NAME,
            Delete={'Objects': objects_to_delete}
        )
        deleted_count += len(objects_to_delete)

    if deleted_count:
        print(f"✅ Bucket '{S3_BUCKET_NAME}' emptied successfully ({deleted_count} objects deleted).")
    else:
        print(f"✅ Bucket '{S3_BUCKET_NAME}' is already empty.")
except Exception as e:
    print(f"❌ Error emptying bucket '{S3_BUCKET_NAME}': {e}")

What this code does:

It pages through all objects in your bucket and passes each page of keys to the delete_objects API call, which removes up to 1,000 objects per request.


5.3 - Stop Notebook Instances

Finally, if you were running this tutorial on an Amazon SageMaker Notebook Instance or an EC2 instance, remember to stop or terminate the instance from the AWS Management Console to prevent further charges.

