Partnerships

Leveraging RAG for Improved Video Processing Times with TwelveLabs and Weaviate

James Le

In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content.

Mar 18, 2025

25 Min

Big thanks to Tuana Celik and Erika Cardenas from the Weaviate team for reviewing the draft!


Video processing is computationally expensive and time-consuming, especially when analyzing long-form content. Retrieval-Augmented Generation (RAG) offers a solution by enabling systems to process only the most relevant video segments rather than entire videos. This targeted approach significantly reduces processing time while maintaining or improving response quality.

In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content. By segmenting videos and using embeddings to retrieve only the most relevant portions for analysis, we can significantly improve processing times while maintaining or even enhancing accuracy.

Our approach leverages several key technologies:

  • TwelveLabs Pegasus and Marengo models for video understanding and embedding generation

  • Weaviate vector database for efficient storage and retrieval of video segments

  • Open source LLaVA-NeXT-Video model as a comparison point for video analysis

We'll demonstrate how this RAG-based approach can reduce the computational load of video processing by focusing only on the most relevant segments, making it possible to analyze longer videos more efficiently. Whether you're building applications for content moderation, sports analysis, or educational content, this approach can help you scale your video processing capabilities while maintaining high-quality results.


1 - Set Up TwelveLabs and Weaviate


TwelveLabs

If you have not already signed up for Twelve Labs, you can do so here. Once you have your account set up, go to the Playground, click on your user icon in the top right corner of the screen, and go to API Key.

In your notebook, click the key icon on the left and create a secret named TL_API_KEY with this value.


Weaviate

If you do not have a Weaviate account, you can sign up here. Once you have an account, go to the cloud dashboard and create a new cluster. Once you have your cluster set up, you’ll need to populate two values in your notebook secrets section.

Add the URL under REST Endpoint to a WEAVIATE_URL secret, and copy the Admin key under API Keys into a WEAVIATE_API_KEY secret.


2 - Choose a GPU Runtime

We’ll need a GPU to run the LLaVA-NeXT-Video model. In your notebook go to Runtime > Change runtime type and select a T4 GPU.
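
Once the dependencies in the next section are installed, you can confirm that the runtime actually has a GPU attached with a quick check (a minimal sketch using standard PyTorch calls):

import torch

# Verify that Colab assigned a CUDA device to this runtime
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))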


3 - Setting Up Our Environment


Install Dependencies

First, we need to install the TwelveLabs and Weaviate SDKs:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

We’ll then install the remaining dependencies.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m


Set Up TwelveLabs and Weaviate SDKs

We’ll read the secrets we created earlier into our notebook:

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

We’ll then initialize the TwelveLabs Client.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

Finally, we’ll set up our Weaviate client and initialize a Video_Embeddings collection.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get the collection if it already exists, otherwise create it
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except Exception:
    collection = weaviate_client.collections.create(name="Video_Embeddings")
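
Depending on your weaviate-client version, collections.get may not raise for a missing collection, so an explicit existence check is a safer pattern. Here is a minimal sketch, assuming a v4 client that exposes collections.exists:

# Create the collection only if it does not already exist
if not weaviate_client.collections.exists("Video_Embeddings"):
    weaviate_client.collections.create(name="Video_Embeddings")

collection = weaviate_client.collections.get("Video_Embeddings")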


Setting Up Our Video Data

Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Weaviate" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"


Upscaling Our Videos

Some of our videos are too low resolution for our embeddings model. We’ll need to upscale them before using them.

We’ll create the upscale function here. read_video_pyav comes directly from the LLaVa-NeXT-Video Colab notebook; it formats videos as the NumPy array representation expected for inference.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

We’ll take the videos in our raw_video_dir, upscale them, and save them to upscaled_video_dir.

import os

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)


4 - Comparing Pegasus and LLaVa-NeXT-Video on a Single Video

Pegasus and LLaVa-NeXT-Video are both video understanding models that allow us to take a video and ask questions about it.

We’ll start by comparing Pegasus and LLaVa-NeXT-Video on a single video from our video collection. The video shows a sequence from Super Bowl XLII, where the New York Giants are playing the New England Patriots. It features the famous “Helmet Catch”: Eli Manning, the Giants’ quarterback, throws the ball to Giants receiver David Tyree, who catches it by pinning it against his helmet in the final two minutes of the game.

Now that we have context for the video, we will see how well our two models understand it when asked “What is happening in this video?”.


Using Pegasus to Chat with our Video

Before we start, we need to set up a Pegasus index to store our video.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

We then create a function to upload our video to the index. This will return a Pegasus Video ID which we can use to ask questions with our video.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifier of your video is {task.video_id}.")
    return task.video_id

We’ll upload our video and save the Pegasus Video ID to single_video_id.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

We’ll ask “What is going on in this video? Please be concise.” to ensure that Pegasus really understands the video.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus responds with this message:

The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.

We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.

Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasus responds with “This is the Super Bowl XLII game.”, which is correct.

Now, let’s see how well LLaVa-NeXT-Video understands the video.


Using LLaVa-NeXT-Video to Chat with our Video

For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

Once the video is properly sampled, we’ll set up our model.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

Then we’ll create a function to query our model.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

Finally, we’ll ask the same questions to compare the output to Pegasus.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

We get this response:

What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game

While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.

We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.


5 - RAG for Segment-Level Queries on a Single Video

Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.

However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.

To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:

  1. Index each segment of a video independently.

  2. Match user queries to the most relevant segments.

  3. Process only those specific segments with our video understanding models.

Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.
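
Once those pieces are in place, the end-to-end retrieval flow will look roughly like the sketch below (it reuses the twelve_labs_client and collection objects defined earlier; the same calls are walked through step by step later in this section):

from weaviate.classes.query import MetadataQuery, Filter

def answer_from_best_clip(question):
    # 1. Embed the text query with Marengo
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # 2. Retrieve the most similar clip-level embedding from Weaviate
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=Filter.by_property("type").equal("clip"),
    )
    best_clip = response.objects[0].properties

    # 3. Ask Pegasus about only that segment
    res = twelve_labs_client.generate.text(
        video_id=best_clip["pegasus_video_id"],
        prompt=question,
    )
    return res.data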


Using Marengo to Create Full Video and Video Clip Embeddings

We set our segment length to 10 seconds - the maximum that Marengo allows.

# Define the video segment length
segment_length = 10

We’ll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"] and video_clip_length=segment_length to ensure that Marengo returns an embedding for the entire video along with embeddings for each 10-second clip in the video.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We’ll store our Task ID in marengo_task_ids for later use when populating our Weaviate database.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id


Prepare Video Segments for RAG

To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.

First, we will create a split_video function to split the video into 10-second segments to be uploaded to Pegasus. We also need to ensure that each segment is over 4 seconds long, the minimum for Pegasus; we do this by letting the final clip overlap with the previous one whenever it would otherwise be shorter than 5 seconds. For example, a 43-second video yields four full segments (0-10 s through 30-40 s), and because the remaining 3 seconds fall below the minimum, the last segment is shifted to cover 38-43 s instead.

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

We’ll save our video segments to video_segments_dir.

split_video(single_video_file, video_segments_dir,segment_length)

We’ll then create pegasus_video_ids, a dictionary mapping file names to Pegasus Video IDs, and add the Video ID for our full video.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

We’ll then upload our video segments to Pegasus and populate pegasus_video_ids with their video IDs.

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

Finally, we need to sample frames from all of our videos and segments for use with the LLaVa-NeXT-Video model.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video
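
Each entry in sampled_video_files is a NumPy array of shape (num_frames, height, width, 3), which is the format query_llava_next passes to the processor. A quick, illustrative sanity check (the exact height and width depend on the source video):

# Inspect one sampled video to confirm the expected (frames, H, W, 3) layout
example_name = next(iter(sampled_video_files))
print(example_name, sampled_video_files[example_name].shape)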


Uploading Embeddings to Weaviate

Weaviate expects the metadata records and embedding vectors to be passed separately when uploading to a collection. We’ll create prepare_marengo_embeddings_for_weaviate to take our Marengo Task IDs and Pegasus Video IDs and prepare the records and vectors for upload.

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        # Track segment number to match against the segment file name
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

We’ll then use our function to get the records and vectors to upload to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")
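
Batch inserts in the v4 client collect errors rather than raising immediately, so it’s worth checking for failures after the batch closes. A small sketch, assuming your weaviate-client release exposes failed_objects on the collection batch:

# Inspect any objects that failed to import during the batch
failed = collection.batch.failed_objects
if failed:
    print(f"{len(failed)} objects failed to import, e.g.: {failed[0].message}")
else:
    print("All objects imported successfully")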


Testing Vector Search

Now that we have everything in the collection, we can test and see that a vector search on Weaviate returns the correct video. We're using Weaviate’s near_vector search, so if we search with a video's vector it should return a distance of zero to itself.

We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,  # Return only the closest match
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_number' in obj.properties:
        print(f"Segment: {obj.properties['segment_number']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

This output confirms our embeddings were properly stored and can be accurately retrieved.



Retrieving Relevant Video Segments for RAG

The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:

  1. We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding

  2. We search Weaviate for video segment embeddings that are most similar to our query embedding

  3. Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment

This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality.

First, we use Marengo to embed our text query.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip")) to return just the clip embeddings, ignoring the full video embeddings.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

We see that it returned the fourth clip (index 3): football_480_segment_003.mp4.

Let’s take a look at the clip:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

We see that it finds the exact moment in the video where the helmet catch is made.

Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.


Chatting with our Video Segment: Pegasus vs LLaVa-NeXT-Video

First we will see how Pegasus answers.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.

We see that Pegasus gives us a great answer, mentioning the helmet catch and how it was a crucial play for the Giants.

Now, let’s see if LLaVa-NeXT-Video gives a better answer when looking at just the segment.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'

We see that it gives an accurate answer that the ball was caught overhead. However, it does not mention that it was a helmet catch. It also starts to ramble a bit at the end.


6 - Multi Video RAG with Marengo, Weaviate, and Pegasus

Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.


Get Marengo Embeddings for All Videos

First, we’ll update the marengo_task_ids dictionary with Marengo Task IDs for all of our videos.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id


Split our Remaining Videos into Segments

We’ll then split the remainder of our videos into segments like we did before.

# Create output folder if it doesn't exist
os.makedirs(video_segments_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)


Get Pegasus Video IDs for All Videos and their Segments

We’ll then get the Pegasus Video IDs for the remaining segments and full videos. We’ll do this in parallel to save time.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")


Upload Data to Weaviate

Next, we’ll upload the rest of our data to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

added = 0
with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        # Skip segments that never received a Pegasus video ID
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )
        added += 1

print(f"Added {added} embeddings to Weaviate")


RAG Performance Evaluation: Clips vs. Full Videos

With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:

  1. Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?

  2. Processing Efficiency: What are the performance differences in terms of response time and computational resources?

We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.

Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]


Multi Video RAG with Pegasus

First, we will assess performance when querying full videos:

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

We’ll then compare this to clips.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

Now we’ll compare the answers from the selected clip vs the full video.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

We see that the answers given are accurate and comparable to each other. However, the clip processing took 20 seconds, while the full video processing took 72 seconds.



Multi Video RAG with LLaVa-NeXT-Video

Now, we will run the same experiment with the LLaVa-NeXT-Video model. But first, we must make sure all of our video segments have been sampled.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

We’ll start by using the full video queries.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We’ll then compare this to the clips.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We see that execution took the same amount of time for each. This is because we sample 40 frames from each video regardless of length.

Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

In this case LLaVa-NeXT-Video gets 2/5 correct while analyzing the clips:

  1. It correctly understands that the New York Giants are playing the New England Patriots in the first question; and

  2. It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.



7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate

Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.


Key Findings
  1. Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, we significantly improved processing speeds by querying shorter, more relevant video clips instead of entire videos.

  2. Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.

  3. Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.


Use Cases

The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:

  1. Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.

  2. Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.

  3. Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.

Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.

Big thanks to Tuana Celik and Erika Cardenas from the Weaviate team for reviewing the draft!


Video processing is computationally expensive and time-consuming, especially when analyzing long-form content. Retrieval-Augmented Generation (RAG) offers a solution by enabling systems to process only the most relevant video segments rather than entire videos. This targeted approach significantly reduces processing time while maintaining or improving response quality.

In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content. By segmenting videos and using embeddings to retrieve only the most relevant portions for analysis, we can significantly improve processing times while maintaining or even enhancing accuracy.

Our approach leverages several key technologies:

  • TwelveLabs Pegasus and Marengo models for video understanding and embedding generation

  • Weaviate vector database for efficient storage and retrieval of video segments

  • Open source LLaVA-NeXT-Video model as a comparison point for video analysis

We'll demonstrate how this RAG-based approach can reduce the computational load of video processing by focusing only on the most relevant segments, making it possible to analyze longer videos more efficiently. Whether you're building applications for content moderation, sports analysis, or educational content, this approach can help you scale your video processing capabilities while maintaining high-quality results.


1 - Set Up TwelveLabs and Weaviate


TwelveLabs

If you have not already signed up for Twelve Labs you can do so here. Once you have your account set up, go to the Playground, click on your user icon in the top right corner of the screen, and go to API Key.

In your notebook click the key icon on the left and create a secret with this value as TL_API_KEY.


Weaviate

If you do not have a Weaviate account, you can sign up here. Once you have an account, go to the cloud dashboard and create a new cluster. Once you have your cluster set up, you’ll need to populate two values in your notebook secrets section.

Add URL under REST Endpoint to a WEAVIATE_URL variable. Copy the Admin key under API Keys and save it to WEAVIATE_API_KEY.


2 - Choose a GPU Runtime

We’ll need a GPU to run the LLaVA-NeXT-Video model. In your notebook go to Runtime > Change runtime type and select a T4 GPU.


3 - Setting Up Our Environment


Install Dependencies

First, we need to install the TwelveLabs and Weaviate SDKs:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

We’ll then install the remaining dependencies.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m


Set Up TwelveLabs and Weaviate SDKs
from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

We’ll then initialize the TwelveLabs Client.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

Finally, we’ll set up our Weaviate client and initialize a Video_Embeddings collection.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")


Setting Up Our Video Data

Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Weaviate" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"


Upscaling Our Videos

Some of our videos are too low resolution for our embeddings model. We’ll need to upscale them before using them.

We’ll create the upscale function here. read_video_pyav comes directly from the LLaVa-NeXT-Video collab notebook and it formats videos in the correct numpy representation for inference.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

We’ll take the videos in our raw_video_dir , upscale them, and save them to upscaled_video_dir.

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)


4 - Comparing Pegasus and LLaVa-NeXT-Video on a Single Video

Pegasus and LLaVa-NeXT-Video are both video understanding models that allow us to take a video and ask questions about it.

We’ll start by comparing Pegasus and LLaVa-NeXT-Video on a single video from our video collection. The video shows a sequence from Super Bowl XLII where the New York Giants and playing the New England Patriots. It is a famous catch called the “Helmet Catch” where Eli Manning, the Giants Quarterback, throws the ball to the Giants receiver, David Tyree, who successfully catches the ball against his helmet in the final two minutes of the game.

Now that we have context of the video, we will determine how well our two models are able to understand the video when asked “What is happening in this video?”.


Using Pegasus to Chat with our Video

Before we start, we need to set up a Pegasus index to store our video.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

We then create a function to upload our video to the index. This will return a Pegasus Video ID which we can use to ask questions with our video.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

We’ll upload our video and save the Pegasus Video ID to single_video_id.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

We’ll ask “What is going on in this video? Please be concise.” to ensure that Pegasus really understand the video.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus responds with this message:

The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.

We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.

Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasus responds with “This is the Super Bowl XLII game.”, which is correct.

Now, let's see how well LLaVa-NeXT-Video understands the video.


Using LLaVa-NeXT-Video to Chat with our Video

For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

Once the video is properly sampled, we’ll set up our model.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

Then we’ll create a function to query our model.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

Finally, we’ll ask the same questions to compare the output to Pegasus.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

We get this response:

What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game

While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.

We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.


5 - RAG for Segment-Level Queries on a Single Video

Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.

However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.

To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:

  1. Index each segment of a video independently.

  2. Match user queries to the most relevant segments.

  3. Process only those specific segments with our video understanding models.

Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.


Using Marengo to Create Full Video and Video Clip Embeddings

We set our segment length to 10 seconds - the maximum that Marengo allows.

# Define the video segment length
segment_length = 10

We’ll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"] and video_clip_length=segment_length to ensure that Marengo returns an embedding for the entire video along with embeddings for each 10-second clip in the video.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We’ll store our Task ID in marengo_task_ids for later use when populating our Weaviate database.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id


Prepare Video Segments for RAG

To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.

First, we will create a split_video function to split the video into 10-second segments to be uploaded to Pegasus. We also need to ensure that each segment is at least 4 seconds long - the minimum for Pegasus. We do this by overlapping the last two clips whenever the final clip would otherwise be shorter than 5 seconds.
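
As an aside, if you only need plain fixed-length pieces and don't care about the minimum-length guarantee for the final clip, ffmpeg's segment muxer can do the split in a single call. The snippet below is only a sketch of that alternative (the input and output names are placeholders), not the approach used in this tutorial; note that in copy mode the cut points snap to keyframes, so segment durations are approximate.

import subprocess

# Alternative sketch: plain 10-second splitting with ffmpeg's segment muxer.
# Cut points snap to keyframes in copy mode, so segment durations are approximate.
subprocess.run([
    "ffmpeg", "-y", "-i", "input.mp4",
    "-f", "segment", "-segment_time", "10",
    "-reset_timestamps", "1",
    "-c", "copy", "input_segment_%03d.mp4",
], check=True)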

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

We’ll save the resulting segments to a new video_segments_dir directory.

split_video(single_video_file, video_segments_dir, segment_length)

We’ll then create pegasus_video_ids - a dictionary mapping file names to Pegasus Video IDs - and add the Video ID for our full video.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

We’ll then upload our video segments to Pegasus and populate pegasus_video_ids with their video IDs.

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir + segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except Exception as e:
        print("error", segment_video_file, e)
        continue

Finally, we need to sample frames from all of our videos and segments so they are ready for the LLaVa-NeXT-Video model.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video


Uploading Embeddings to Weaviate

Weaviate expects the metadata records and embedding vectors to be separate when uploading to a collection. We’ll create prepare_marengo_embeddings_for_weaviate to take our Marengo Task IDs and our Pegasus Video IDs and prepare the records and vectors for upload.

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve the Marengo full-video and clip embeddings for this task
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)

        # Track the clip number so we can match each clip embedding to its segment file
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"

            # For clip embeddings, point the file name at the corresponding segment file
            updated_file_name = video_file_name
            current_segment_number = 0
            if not is_video:
                current_segment_number = segment_number
                updated_file_name = updated_file_name.replace(".mp4", f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4", "")

            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name]

            record = {
                'video_name': video_name,
                'segment_number': current_segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }

            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]

            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

We’ll then use our function to get the records and vectors to upload to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")


Testing Vector Search

Now that we have everything in the collection, we can test and see that a vector search on Weaviate returns the correct video. We're using Weaviate’s near_vector search, so if we search with a video's vector it should return a distance of zero to itself.

We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,  # Return only the single closest match
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_number' in obj.properties:
        print(f"Segment: {obj.properties['segment_number']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

This output confirms our embeddings were properly stored and can be accurately retrieved.
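
If you want to spot-check the stored metadata for a particular segment without using a vector at all, the v4 Weaviate client also supports filtered object fetches. Here is a minimal sketch, assuming the collection set up earlier; the segment file name is just an example.

from weaviate.classes.query import Filter

# Fetch the stored record for one specific segment by its file name
result = collection.query.fetch_objects(
    filters=Filter.by_property("video_file").equal("football_480_segment_003.mp4"),
    limit=1,
)
for obj in result.objects:
    print(obj.properties["video_file"], obj.properties["start_time"], obj.properties["end_time"])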



Retrieving Relevant Video Segments for RAG

The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:

  1. We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding

  2. We search Weaviate for video segment embeddings that are most similar to our query embedding

  3. Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment

This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality.
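
Putting the three steps together, the whole flow fits in a short helper. This is just a sketch - the function name answer_from_best_clip is ours - using the twelve_labs_client and collection objects set up earlier; we’ll walk through each step individually below.

from weaviate.classes.query import MetadataQuery, Filter

def answer_from_best_clip(question):
    # Step 1: embed the text query with Marengo
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # Step 2: find the most similar clip-level embedding in Weaviate
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=Filter.by_property("type").equal("clip"),
    )
    best = response.objects[0].properties

    # Step 3: answer the question with Pegasus, using only that segment
    res = twelve_labs_client.generate.text(
        video_id=best["pegasus_video_id"],
        prompt=question,
    )
    return best["video_file"], res.data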

First, we use Marengo to embed our text query.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip")) to return just the clip embeddings, ignoring the full video embeddings.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

We see that it returned the fourth clip (index 3): football_480_segment_003.mp4.

Let's take a look at the clip:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

We see that it finds the exact moment in the video where the helmet catch is made.

Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.


Chatting with our Video Segment: Pegasus vs LLaVa-NeXT-Video

First we will see how Pegasus answers.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.

We see that Pegasus gives us a great answer - mentioning the helmet catch and how it was a crucial play for the Giants.

Now, let's see if LLaVa-NeXT-Video gives a better answer when looking at just the segment.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'

We see that it gives an accurate answer that the ball was caught overhead. However, it does not mention that it was a helmet catch, and it starts to ramble a bit at the end.


6 - Multi Video RAG with Marengo, Weaviate, and Pegasus

Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.


Get Marengo Embeddings for All Videos

First, we’ll update the marengo_task_ids dictionary with Marengo Task IDs for all of our videos.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id


Split our Remaining Videos into Segments

We’ll then split the remainder of our videos into segments like we did before.

# Create the segments folder if it doesn't exist
os.makedirs(video_segments_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file, video_segments_dir, segment_length)


Get Pegasus Video IDs for All Videos and their Segments

We’ll then get the Pegasus Video IDs for the remaining segments and full videos. We’ll do this in parallel to save time.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")


Upload Data to Weaviate

Next, we’ll upload the rest of our data to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

added = 0
with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        # Skip segments that failed to index in Pegasus
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )
        added += 1

print(f"Added {added} embeddings to Weaviate")


RAG Performance Evaluation: Clips vs. Full Videos

With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:

  1. Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?

  2. Processing Efficiency: What are the performance differences in terms of response time and computational resources?

We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.

Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]


Multi Video RAG with Pegasus

First, we will assess performance when querying the full videos:

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

We’ll then compare this to clips.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

Now we’ll compare the answers from the selected clip vs the full video.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

We see that the answers given are accurate and comparable to each other. However, the clip-based run took 20 seconds while the full-video run took 72 seconds - roughly a 3.6x speedup from processing only the relevant segments.



Multi Video RAG with LLaVa-NeXT-Video

Now, we will run the same experiment with the LLaVa-NeXT-Video model. But first, we must sample frames from all of our video segments.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

We’ll start by using the full video queries.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We’ll then compare this to the clips.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We see that execution took the same amount of time for each. This is because we sample 40 frames from each video regardless of length.
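
A quick way to see this, assuming the sampled arrays from earlier are still in memory (the two file names below are the segment and full video used in this walkthrough):

# Every entry in sampled_video_files is a fixed-size stack of 40 RGB frames,
# so LLaVa-NeXT-Video does the same amount of work for a clip or a full video.
clip_frames = sampled_video_files["football_480_segment_003.mp4"]
full_frames = sampled_video_files["football_480.mp4"]
print(clip_frames.shape, full_frames.shape)  # e.g. (40, 720, 1280, 3) for both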

Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

In this case LLaVa-NeXT-Video gets 2/5 correct while analyzing the clips:

  1. It correctly understands that the New York Giants are playing the New England Patriots in the first question; and

  2. It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.



7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate

Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.


Key Findings
  1. Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, we significantly improved processing speeds by querying shorter, more relevant video clips instead of entire videos.

  2. Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.

  3. Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.


Use Cases

The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:

  1. Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.

  2. Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.

  3. Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.

Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.

Big thanks to Tuana Celik and Erika Cardenas from the Weaviate team for reviewing the draft!


Video processing is computationally expensive and time-consuming, especially when analyzing long-form content. Retrieval-Augmented Generation (RAG) offers a solution by enabling systems to process only the most relevant video segments rather than entire videos. This targeted approach significantly reduces processing time while maintaining or improving response quality.

In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content. By segmenting videos and using embeddings to retrieve only the most relevant portions for analysis, we can significantly improve processing times while maintaining or even enhancing accuracy.

Our approach leverages several key technologies:

  • TwelveLabs Pegasus and Marengo models for video understanding and embedding generation

  • Weaviate vector database for efficient storage and retrieval of video segments

  • Open source LLaVA-NeXT-Video model as a comparison point for video analysis

We'll demonstrate how this RAG-based approach can reduce the computational load of video processing by focusing only on the most relevant segments, making it possible to analyze longer videos more efficiently. Whether you're building applications for content moderation, sports analysis, or educational content, this approach can help you scale your video processing capabilities while maintaining high-quality results.


1 - Set Up TwelveLabs and Weaviate


TwelveLabs

If you have not already signed up for Twelve Labs you can do so here. Once you have your account set up, go to the Playground, click on your user icon in the top right corner of the screen, and go to API Key.

In your notebook click the key icon on the left and create a secret with this value as TL_API_KEY.


Weaviate

If you do not have a Weaviate account, you can sign up here. Once you have an account, go to the cloud dashboard and create a new cluster. Once you have your cluster set up, you’ll need to populate two values in your notebook secrets section.

Add URL under REST Endpoint to a WEAVIATE_URL variable. Copy the Admin key under API Keys and save it to WEAVIATE_API_KEY.


2 - Choose a GPU Runtime

We’ll need a GPU to run the LLaVA-NeXT-Video model. In your notebook go to Runtime > Change runtime type and select a T4 GPU.


3 - Setting Up Our Environment


Install Dependencies

First, we need to install the TwelveLabs and Weaviate SDKs:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

We’ll then install the remaining dependencies.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m


Set Up TwelveLabs and Weaviate SDKs
from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

We’ll then initialize the TwelveLabs Client.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

Finally, we’ll set up our Weaviate client and initialize a Video_Embeddings collection.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")


Setting Up Our Video Data

Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Weaviate" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"


Upscaling Our Videos

Some of our videos are too low resolution for our embeddings model. We’ll need to upscale them before using them.

We’ll create the upscale function here. read_video_pyav comes directly from the LLaVa-NeXT-Video collab notebook and it formats videos in the correct numpy representation for inference.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

We’ll take the videos in our raw_video_dir , upscale them, and save them to upscaled_video_dir.

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)


4 - Comparing Pegasus and LLaVa-NeXT-Video on a Single Video

Pegasus and LLaVa-NeXT-Video are both video understanding models that allow us to take a video and ask questions about it.

We’ll start by comparing Pegasus and LLaVa-NeXT-Video on a single video from our video collection. The video shows a sequence from Super Bowl XLII where the New York Giants and playing the New England Patriots. It is a famous catch called the “Helmet Catch” where Eli Manning, the Giants Quarterback, throws the ball to the Giants receiver, David Tyree, who successfully catches the ball against his helmet in the final two minutes of the game.

Now that we have context of the video, we will determine how well our two models are able to understand the video when asked “What is happening in this video?”.


Using Pegasus to Chat with our Video

Before we start, we need to set up a Pegasus index to store our video.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

We then create a function to upload our video to the index. This will return a Pegasus Video ID which we can use to ask questions with our video.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

We’ll upload our video and save the Pegasus Video ID to single_video_id.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

We’ll ask “What is going on in this video? Please be concise.” to ensure that Pegasus really understand the video.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus responds with this message:

The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.

We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.

Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasus responds with This is the Super Bowl XLII game. , which is correct.

Now, lets see how well LLaVa-NeXT-Video understands the video.


Using LLaVa-NeXT-Video to Chat with our Video

For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

Once the video is properly sampled, we’ll set up our model.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

Then we’ll create a function to query our model.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

Finally, we’ll ask the same questions to compare the output to Pegasus.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

We get this response:

What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game

While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.

We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.


5 - RAG for Segment-Level Queries on a Single Video

Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.

However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.

To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:

  1. Index each segment of a video independently.

  2. Match user queries to the most relevant segments.

  3. Process only those specific segments with our video understanding models.

Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.


Using Marengo to Create Full Video and Video Clip Embeddings

We set our segment length to 10 seconds - the maximum that Marengo allows.

# Define the video segment length
segment_length = 10

We’ll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"] and video_clip_length=segment_lengthto ensure that Marengo returns embeddings for our entire video along with embeddings for each 10 second clip in the video.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We’ll store our Task ID in marengo_task_ids for later use when populating our Weaviate databse.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id


Prepare Video Segments for RAG

To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.

First, we will create a split_video function to split the video into 10 second segments to be uploaded to Pegasus. We also need to ensure that each segment is over 4 seconds long - the minimum for Pegasus. We do this by including some overlap between the last two clips if the last clip is less than 5 seconds long.

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments
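
One caveat: the command above stream-copies (-c:v copy -c:a copy) with -ss placed before -i, so each cut snaps to the nearest keyframe and segment boundaries can drift by a fraction of a second. That is fine for our purposes, but if you need frame-accurate segments you can re-encode instead, at the cost of speed. A minimal variant of the command (libx264 and aac are illustrative codec choices):

cmd = [
    "ffmpeg", "-y",
    "-ss", str(start_time),
    "-i", input_path,
    "-t", str(actual_duration),
    "-c:v", "libx264", "-preset", "veryfast",  # re-encode video for exact cut points
    "-c:a", "aac",                             # re-encode audio to match
    output_path
]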

We’ll save the segments to a new video_segments_dir directory.

split_video(single_video_file, video_segments_dir,segment_length)

We’ll then create pegasus_video_ids - a dictionary mapping file names to Pegasus Video IDs - and add the Video ID for our full video.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

We’ll then upload our video segments to Pegasus and populate pegasus_video_ids with their video IDs.
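
As a reminder, upload_video_to_twelve_labs_pegasus is the helper defined earlier in the notebook: it creates an indexing task on our Pegasus index and returns the resulting Video ID. A minimal sketch of what such a helper can look like, assuming the Pegasus index ID is stored in a pegasus_index_id variable (the exact names used earlier may differ):

def upload_video_to_twelve_labs_pegasus(video_path):
    # Create an indexing task on the Pegasus index (pegasus_index_id is assumed
    # to hold the ID of the index created earlier in the notebook)
    task = twelve_labs_client.task.create(index_id=pegasus_index_id, file=video_path)

    # Block until indexing completes
    task.wait_for_done(sleep_interval=5)

    if task.status != "ready":
        raise RuntimeError(f"Indexing failed with status {task.status}")

    # This Video ID is what we store alongside the Marengo embeddings
    return task.video_id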

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(".mp4")]

# Process each video segment
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file", segment_video_file)
        continue
    print("processing file", segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir + segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except Exception as e:
        print("error", segment_video_file, e)
        continue

Finally, we need to sample frames from all of our videos - both the full videos and the segments - for efficient use with the LLaVa-NeXT-Video model.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video


Uploading Embeddings to Weaviate

Weaviate expects the metadata records and the embedding vectors to be supplied separately when uploading to a collection. We’ll create prepare_marengo_embeddings_for_weaviate to take our Marengo Task IDs and our Pegasus Video IDs and prepare the records and vectors for upload.
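
The records produced below are stored in the collection object created earlier in the notebook, which is configured with no built-in vectorizer because we supply the Marengo vectors ourselves. For reference, here is a minimal sketch of that kind of setup with the v4 client (the collection name and property list are illustrative, not necessarily what was used earlier):

import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.config import Configure, Property, DataType

# Connect using the WEAVIATE_URL and WEAVIATE_API_KEY secrets set up earlier
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=WEAVIATE_URL,
    auth_credentials=Auth.api_key(WEAVIATE_API_KEY),
)

collection = client.collections.create(
    name="VideoSegments",  # illustrative name
    vectorizer_config=Configure.Vectorizer.none(),  # we provide Marengo vectors ourselves
    properties=[
        Property(name="video_name", data_type=DataType.TEXT),
        Property(name="segment_number", data_type=DataType.INT),
        Property(name="video_file", data_type=DataType.TEXT),
        Property(name="start_time", data_type=DataType.NUMBER),
        Property(name="end_time", data_type=DataType.NUMBER),
        Property(name="type", data_type=DataType.TEXT),
        Property(name="task_id", data_type=DataType.TEXT),
        Property(name="pegasus_video_id", data_type=DataType.TEXT),
    ],
)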

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve the Marengo full-video and clip embeddings for this task
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)

        # Track the segment number so clip embeddings can be matched to their segment files
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a full-video or clip-level segment
            is_video = segment.embedding_scope == "video"

            # For clip segments, point the file name at the corresponding segment file
            updated_file_name = video_file_name
            clip_segment_number = 0
            if not is_video:
                clip_segment_number = segment_number
                updated_file_name = updated_file_name.replace(".mp4", f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4", "")

            # Look up the Pegasus Video ID for this file, if we have one
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name]

            record = {
                'video_name': video_name,
                'segment_number': clip_segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

We’ll then use our function to get the records and vectors to upload to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")


Testing Vector Search

Now that we have everything in the collection, we can test and see that a vector search on Weaviate returns the correct video. We're using Weaviate’s near_vector search, so if we search with a video's vector it should return a distance of zero to itself.

We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,  # we only need the closest match
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if obj.properties.get('segment_number') is not None:
        print(f"Segment: {obj.properties['segment_number']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

This output confirms our embeddings were properly stored and can be accurately retrieved.



Retrieving Relevant Video Segments for RAG

The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:

  1. We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding

  2. We search Weaviate for video segment embeddings that are most similar to our query embedding

  3. Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment

This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality. The sketch below collapses the retrieval step into a single helper; the cells that follow then walk through each step individually.
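
A minimal sketch, reusing the twelve_labs_client and collection objects from earlier (retrieve_relevant_segment is just an illustrative name):

from weaviate.classes.query import MetadataQuery, Filter

def retrieve_relevant_segment(question, limit=1):
    # Step 1: embed the text query with Marengo
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # Step 2: find the closest clip-level embedding in Weaviate
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=limit,
        return_metadata=MetadataQuery(distance=True),
        filters=Filter.by_property("type").equal("clip"),
    )

    # Step 3: return the matching segment's metadata,
    # including the Pegasus Video ID we can chat with
    return response.objects[0].properties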

First, we use Marengo to embed our text query.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip")) to return just the clip embeddings, ignoring the full video embeddings.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

We see that it returned the fourth clip (index 3), football_480_segment_003.mp4.

Let’s take a look at the clip:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

We see that it finds the exact moment in the video where the helmet catch is made.

Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.


Chatting with our Video Segment: Pegasus vs. LLaVa-NeXT-Video

First we will see how Pegasus answers.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.

We see that Pegasus gives us a great answer, mentioning the helmet catch and noting that it was a crucial play for the Giants.

Now, let’s see if LLaVa-NeXT-Video gives a better answer when looking at just this segment.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'

We see that it gives a reasonably accurate answer, noting that the ball was caught overhead. However, it does not mention that it was a helmet catch, and it starts to ramble before the response is cut off at the end.


6 - Multi Video RAG with Marengo, Weaviate, and Pegasus

Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.


Get Marengo Embeddings for All Videos

First, we’ll update the marengo_task_ids dictionary with Marengo Task IDs for all of our videos.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id


Split our Remaining Videos into Segments

We’ll then split the remainder of our videos into segments like we did before.

# Make sure the segments output directory exists
os.makedirs(video_segments_dir, exist_ok=True)

# Get all full-length video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Split each video into segments
for video_file in video_files:
    split_video(upscaled_video_dir + video_file, video_segments_dir, segment_length)


Get Pegasus Video IDs for All Videos and their Segments

We’ll then get Pegasus Video IDs for the remaining segments and full videos. We’ll do this in parallel to save time.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")


Upload Data to Weaviate

Next, we’ll upload the rest of our data to Weaviate.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

added_count = 0
with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        # Skip records we couldn't get a Pegasus Video ID for
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )
        added_count += 1

print(f"Added {added_count} embeddings to Weaviate")


RAG Performance Evaluation: Clips vs. Full Videos

With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:

  1. Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?

  2. Processing Efficiency: What are the performance differences in terms of response time and computational resources?

We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.

Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]


Multi Video RAG with Pegasus

First, we will assess performance when querying full videos:

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

We’ll then compare this to clips.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

Now we’ll compare the answers from the selected clip vs the full video.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

We see that the answers from both approaches are accurate and comparable. However, the clip-based queries completed in 20 seconds, while the full-video queries took 72 seconds.



Multi Video RAG with LLaVa-NeXT-Video

Now, we will run the same experiment with the LLaVa-NeXT-Video model. But first, we must sample frames from all of our video segments.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

We’ll start by using the full video queries.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We’ll then compare this to the clips.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

We see that execution took the same amount of time for each. This is because we sample 40 frames from each video regardless of length.
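
That constant cost comes from the uniform sampling: whether the source is a 10-second clip or a several-minute video, the model sees the same 40 frames. A minimal sketch of the index selection involved (the actual sample_video helper was defined earlier; this just illustrates why video length doesn't matter):

import numpy as np

total_frames = 9000  # e.g. a 5-minute video at 30 fps
num_samples = 40

# 40 evenly spaced frame indices, regardless of how long the video is
indices = np.linspace(0, total_frames - 1, num=num_samples).astype(int)
print(len(indices))  # always 40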

Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

In this case, LLaVa-NeXT-Video gets 2 out of 5 questions correct when analyzing the clips:

  1. It correctly understands that the New York Giants are playing the New England Patriots in the first question; and

  2. It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.



7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate

Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.


Key Findings
  1. Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, we significantly improved processing speeds by querying shorter, more relevant video clips instead of entire videos.

  2. Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.

  3. Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.


Use Cases

The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:

  1. Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.

  2. Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.

  3. Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.

Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.