Partnerships
Leveraging RAG for Improved Video Processing Times with TwelveLabs and Weaviate


James Le
In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content.


Mar 18, 2025
25 Min
Big thanks to Tuana Celik and Erika Cardenas from the Weaviate team for reviewing the draft!
Video processing is computationally expensive and time-consuming, especially when analyzing long-form content. Retrieval-Augmented Generation (RAG) offers a solution by enabling systems to process only the most relevant video segments rather than entire videos. This targeted approach significantly reduces processing time while maintaining or improving response quality.
In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content. By segmenting videos and using embeddings to retrieve only the most relevant portions for analysis, we can significantly improve processing times while maintaining or even enhancing accuracy.
Our approach leverages several key technologies:
TwelveLabs Pegasus and Marengo models for video understanding and embedding generation
Weaviate vector database for efficient storage and retrieval of video segments
Open source LLaVA-NeXT-Video model as a comparison point for video analysis
We'll demonstrate how this RAG-based approach can reduce the computational load of video processing by focusing only on the most relevant segments, making it possible to analyze longer videos more efficiently. Whether you're building applications for content moderation, sports analysis, or educational content, this approach can help you scale your video processing capabilities while maintaining high-quality results.
1 - Set Up TwelveLabs and Weaviate
TwelveLabs
If you have not already signed up for Twelve Labs, you can do so here. Once you have your account set up, go to the Playground, click on your user icon in the top right corner of the screen, and go to API Key.
In your notebook, click the key icon on the left and create a secret with this value as TL_API_KEY.
Weaviate
If you do not have a Weaviate account, you can sign up here. Once you have an account, go to the cloud dashboard and create a new cluster. Once you have your cluster set up, you’ll need to populate two values in your notebook secrets section.
Add the URL under REST Endpoint to a WEAVIATE_URL variable. Copy the Admin key under API Keys and save it to WEAVIATE_API_KEY.
2 - Choose a GPU Runtime
We'll need a GPU to run the LLaVA-NeXT-Video model. In your notebook, go to Runtime > Change runtime type and select a T4 GPU.
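Once the runtime has switched, you can optionally confirm that the GPU is visible. This is just a quick sanity check, assuming torch is already available in the Colab runtime (it comes preinstalled on Colab GPU images):
import torch

# Quick check that the Colab runtime can see the T4 GPU
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device:", torch.cuda.get_device_name(0))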
3 - Setting Up Our Environment
Install Dependencies
First, we need to install the TwelveLabs and Weaviate SDKs:
!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"
We’ll then install the remaining dependencies.
!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate
!python -m pip install -U bitsandbytes
!python -m pip install git
!python -m pip install pillow
!python -m pip install sentencepiece
Set Up TwelveLabs and Weaviate SDKs
from google.colab import userdata

TL_API_KEY = userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")
We’ll then initialize the TwelveLabs Client.
from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)
Finally, we'll set up our Weaviate client and initialize a Video_Embeddings collection.
import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")
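One housekeeping note: the v4 Weaviate Python client keeps an open connection to your cluster, so it is good practice to release it once you are completely done with the notebook. Shown here for reference; run it at the end of your session:
# Close the Weaviate connection when you are finished with the notebook
weaviate_client.close()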
Setting Up Our Video Data
Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Weaviate" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.
from google.colab import drive
drive.mount('/content/drive')

base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"
upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"
Upscaling Our Videos
Some of our videos are too low resolution for our embeddings model. We’ll need to upscale them before using them.
We'll create the upscale function here. read_video_pyav comes directly from the LLaVa-NeXT-Video Colab notebook; it formats videos into the numpy representation the model expects for inference.
import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
We'll take the videos in our raw_video_dir, upscale them, and save them to upscaled_video_dir.
import os

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)

        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]

        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue

        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)

        # Upscale the video
        upscale_video(input_file_path, output_file_path)
4 - Comparing Pegasus and LLaVa-NeXT-Video on a Single Video
Pegasus and LLaVa-NeXT-Video are both video understanding models that allow us to take a video and ask questions about it.
We'll start by comparing Pegasus and LLaVa-NeXT-Video on a single video from our video collection. The video shows a sequence from Super Bowl XLII where the New York Giants are playing the New England Patriots. It features the famous "Helmet Catch": Eli Manning, the Giants quarterback, throws the ball to receiver David Tyree, who catches it by pinning it against his helmet in the final two minutes of the game.
Now that we have context for the video, we will see how well our two models understand it when asked "What is happening in this video?".
Using Pegasus to Chat with our Video
Before we start, we need to set up a Pegasus index to store our video.
models = [
    {
        "name": "pegasus1.2",
        "options": ["visual"]
    }
]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models
    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")
We then create a function to upload our video to the index. This will return a Pegasus Video ID which we can use to ask questions with our video.
# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")

def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file=video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)
    if task.status != "ready":
        raise RuntimeError(f"Indexing failed with status {task.status}")

    print(f"The unique identifier of your video is {task.video_id}.")
    return task.video_id
We'll upload our video and save the Pegasus Video ID to single_video_id.
# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)
We'll ask "What is going on in this video? Please be concise." to ensure that Pegasus really understands the video.
single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
    video_id=single_video_id,
    prompt=single_video_query
)
print(f"{res.data}")
Pegasus responds with this message:
The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.
We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.
Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.
res = twelve_labs_client.generate.text(
    video_id=single_video_id,
    prompt="What game is this?"
)
print(f"{res.data}")
Pegasus responds with "This is the Super Bowl XLII game.", which is correct.
Now, let's see how well LLaVa-NeXT-Video understands the video.
Using LLaVa-NeXT-Video to Chat with our Video
For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.
def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # Sample num_samples frames uniformly from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    sampled_frames = read_video_pyav(container, indices)
    return sampled_frames

sampled_video = sample_video(single_video_file, num_samples=40)
Once the video is properly sampled, we’ll set up our model.
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)
Then we’ll create a function to query our model.
def query_llava_next(query, model, processor, sampled_video):
    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
            ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)
    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]
Finally, we’ll ask the same questions to compare the output to Pegasus.
llava_next_result = query_llava_next(single_video_query, llava_next_model, llava_next_processor, sampled_video)
print(llava_next_result)
We get this response:
What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game
While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.
We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.
llava_next_result = query_llava_next("what game is this?", llava_next_model, llava_next_processor, sampled_video)
print(llava_next_result)
USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.
5 - RAG for Segment-Level Queries on a Single Video
Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.
However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.
To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:
Index each segment of a video independently.
Match user queries to the most relevant segments.
Process only those specific segments with our video understanding models.
Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.
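To make the end goal concrete, here is a minimal sketch of the query path we will assemble over the rest of this post. It assumes the twelve_labs_client and collection objects created above, plus a pegasus_video_id property on each stored segment, all of which we set up below:
# 1. Embed the user's question with Marengo
question = "What technique did David Tyree use to catch the ball?"
embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=question,
    text_truncate="start",
)
query_vector = embedding.text_embedding.segments[0].embeddings_float

# 2. Find the most similar 10-second clip in Weaviate
response = collection.query.near_vector(near_vector=query_vector, limit=1)
best_clip = response.objects[0].properties

# 3. Ask Pegasus about only that clip
answer = twelve_labs_client.generate.text(
    video_id=best_clip["pegasus_video_id"],
    prompt=question,
)
print(answer.data)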
Using Marengo to Create Full Video and Video Clip Embeddings
We set our segment length to 10 seconds - the maximum that Marengo allows.
# Define the video segment length
segment_length = 10
We'll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"] and video_clip_length=segment_length to ensure that Marengo returns an embedding for the entire video along with embeddings for each 10-second clip in the video.
task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")
Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We'll store our Task ID in marengo_task_ids for later use when populating our Weaviate database.
single_video_task_id = task.id

marengo_task_ids = {}
single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id
Prepare Video Segments for RAG
To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.
First, we will create a split_video function to split the video into 10-second segments to be uploaded to Pegasus. We also need to ensure that each segment is over 4 seconds long, the minimum for Pegasus. We do this by adding some overlap between the last two clips whenever the last clip would otherwise be shorter than 5 seconds.
import os import subprocess import json def split_video(input_path, output_dir, segment_duration=10): """ Split a video into segments of the specified duration. Regular segments will be exactly segment_duration seconds. The last segment will be at least 5 seconds long, potentially overlapping with the previous segment if needed. Args: input_path: Path to the input video file output_dir: Directory to save the output segments segment_duration: Duration of each segment in seconds (default: 10) """ # Minimum length for the last segment min_last_segment_len = 5 # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Get base filename without extension base_name = os.path.splitext(os.path.basename(input_path))[0] # Get video duration using ffprobe probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", input_path ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) video_info = json.loads(probe_result.stdout) duration = float(video_info["format"]["duration"]) except Exception as e: print(f"Error getting video duration: {e}") return 0 # Calculate number of full segments num_full_segments = int(duration / segment_duration) # Calculate remaining duration remaining_duration = duration - (num_full_segments * segment_duration) # Determine total number of segments and if we need to adjust the last segment if remaining_duration > 0: if remaining_duration < min_last_segment_len: # Last segment would be too short, so we'll adjust its start time num_segments = num_full_segments + 1 needs_adjustment = True else: # Last segment is already long enough num_segments = num_full_segments + 1 needs_adjustment = False else: # No remaining duration, all segments are complete num_segments = num_full_segments needs_adjustment = False print(f"Video {base_name} is {duration:.2f} seconds long") print(f"Creating {num_segments} segments") # Create each segment for i in range(num_segments): # For regular segments, start at the segment boundary if i < num_full_segments: start_time = i * segment_duration actual_duration = segment_duration else: # This is the last segment if needs_adjustment: # Start earlier to ensure it's at least min_last_segment_len seconds start_time = duration - min_last_segment_len actual_duration = min_last_segment_len else: # Last segment is already long enough start_time = i * segment_duration actual_duration = remaining_duration output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4") # For all segments, use copy mode for speed cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", input_path, "-t", str(actual_duration), "-c:v", "copy", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error creating segment {i+1}: {result.stderr[:100]}...") else: end_time = start_time + actual_duration if i == num_segments - 1 and needs_adjustment: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)") else: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s") print(f"Successfully split {base_name} into {num_segments} segments") return num_segments
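To make the boundary logic concrete, here is a quick illustration (not part of the pipeline) of the segment boundaries split_video would produce for a hypothetical 23-second input: two full 10-second segments, plus a final segment that starts early so it stays at least 5 seconds long.
# Hypothetical 23-second video, 10-second segments, 5-second minimum for the last one
duration, segment_duration, min_last = 23.0, 10, 5

num_full = int(duration // segment_duration)        # 2 full segments
remaining = duration - num_full * segment_duration  # 3.0 seconds left over

segments = [(i * segment_duration, (i + 1) * segment_duration) for i in range(num_full)]
if remaining > 0:
    # If the leftover is too short, start the last segment earlier so it overlaps the previous one
    start = duration - min_last if remaining < min_last else num_full * segment_duration
    segments.append((start, duration))

print(segments)  # [(0, 10), (10, 20), (18.0, 23.0)]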
We'll save our video segments to a new video_segments_dir.
split_video(single_video_file, video_segments_dir,segment_length)
We'll then create pegasus_video_ids, a dictionary mapping file names to Pegasus Video IDs, and add the Video ID for our full video.
pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id
We'll then upload our video segments to Pegasus and populate pegasus_video_ids with their video IDs.
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video segment
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file", segment_video_file)
        continue
    print("processing file", segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir + segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error", segment_video_file)
        continue
Finally, we need to sample frames from all of our videos and segments for efficient use with the LLaVa-NeXT-Video model.
sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video
Uploading Embeddings to Weaviate
Weaviate expects the metadata records and embedding vectors to be separate when uploading to a collection. We'll create prepare_marengo_embeddings_for_weaviate to take our Marengo Task IDs and our Pegasus Video IDs and prepare the records and vectors for upload.
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids): # Prepare data for Weaviate upload records = [] vectors = [] for video_file_name in marengo_task_ids.keys(): marengo_task_id = marengo_task_ids[video_file_name] # Retrieve marengo full video and clip embeddings marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id) #track segment number to match with fiel segment_number = 0 for segment in marengo_embeddings_result.video_embedding.segments: # Determine if this is a video or clip segment is_video = segment.embedding_scope == "video" #Update the file name if segment updated_file_name = video_file_name if not is_video: updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4") segment_number += 1 video_name = video_file_name.replace(".mp4","") pegasus_video_id = None if updated_file_name in pegasus_video_ids: pegasus_video_id = pegasus_video_ids[updated_file_name] record = { 'video_name':video_name, 'segment_number': 0 if is_video else segment_number, 'video_file': updated_file_name, 'start_time': getattr(segment, 'start_offset_sec', 0), 'end_time': getattr(segment, 'end_offset_sec', 0), 'type': 'video' if is_video else 'clip', 'task_id': marengo_task_id, 'pegasus_video_id': pegasus_video_id } # Get the embedding vector embedding_vector = [float(x) for x in segment.embeddings_float] # Add to our lists records.append(record) vectors.append(embedding_vector) # Print summary print(f"Prepared {len(records)} segments for upload to Weaviate") print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}") print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}") return records, vectors
We’ll then use our function to get the records and vectors to upload to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")
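As a quick sanity check on the upload, you can ask Weaviate how many objects the collection now holds and compare that with len(records). This is a minimal sketch using the v4 client's aggregate API:
# Count the objects now stored in the Video_Embeddings collection
count = collection.aggregate.over_all(total_count=True)
print(f"Objects in Video_Embeddings: {count.total_count}")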
Testing Vector Search
Now that we have everything in the collection, we can test that a vector search on Weaviate returns the correct video. We're using Weaviate's near_vector search, so if we search with a video's own vector it should return a distance of zero to itself. We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.
from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)
This output confirms our embeddings were properly stored and can be accurately retrieved.
Retrieving Relevant Video Segments for RAG
The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:
We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding
We search Weaviate for video segment embeddings that are most similar to our query embedding
Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment
This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality.
First, we use Marengo to embed our text query.
sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)
query_vector = embedding.text_embedding.segments[0].embeddings_float
Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip")) to return just the clip embeddings, ignoring the full-video embeddings.
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)
We see that it returned the fourth clip (index 3), football_480_segment_003.mp4. Let's take a look at the clip:
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])
plt.close()  # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100)
HTML(anim.to_html5_video())
We see that it finds the exact moment in the video where the helmet catch is made.
Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.
Chatting with our Video Segment: Pegasus vs LLaVa-NeXT-Video
First we will see how Pegasus answers.
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")

print(sample_question)
res = twelve_labs_client.generate.text(
    video_id=pegasus_video_id,
    prompt=sample_question
)
print(f"{res.data}")
What technique did David Tyree use to catch the ball?
David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.
We see that Pegasus gives us a great answer, mentioning the helmet catch and how it is a crucial play for the Giants.
Now, let's see if LLaVa-NeXT-Video gives a better answer when looking at the segment.
video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]

generated_text = query_llava_next(sample_question, llava_next_model, llava_next_processor, sampled_video)
print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'
We see that it gives an accurate answer that the ball was caught overhead. However, it does not mention that it was a helmet catch. It also starts to ramble a bit at the end.
6 - Multi Video RAG with Marengo, Weaviate, and Pegasus
Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.
Get Marengo Embeddings for All Videos
First, we'll update the marengo_task_ids dictionary with Marengo Task IDs for all of our videos.
for video_file_name in os.listdir(upscaled_video_dir):
    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue
    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id
Split our Remaining Videos into Segments
We’ll then split the remainder of our videos into segments like we did before.
# Create the segment output folder if it doesn't exist
os.makedirs(video_segments_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file, video_segments_dir, segment_length)
Get Pegasus Video IDs for All Videos and their Segments
We'll then get the Pegasus Video IDs for the remaining segments and full videos. We'll do this in parallel to save time.
import concurrent.futures import os from tqdm import tqdm # Use standard tqdm instead of tqdm.notebook def process_video(video_path): video_file_name = video_path.split("/")[-1] try: video_id = upload_video_to_twelve_labs_pegasus(video_path) return video_file_name, video_id except Exception as e: print(f"Error processing {video_file_name}: {str(e)}") return video_file_name, None # Filter out videos that are already processed segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')] full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')] all_video_files = segment_video_files + full_video_files videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids] print(f"Processing {len(videos_to_process)} videos in parallel...") # Use ThreadPoolExecutor for I/O-bound operations like API calls with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit all tasks and create a dictionary mapping futures to their video files future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process} # Process results as they complete with a progress bar for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)): video_file_name, video_id = future.result() if video_id: pegasus_video_ids[video_file_name] = video_id print("All videos processed!") print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Upload Data to Weaviate
Next, we’ll upload the rest of our data to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")
RAG Performance Evaluation: Clips vs. Full Videos
With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:
Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?
Processing Efficiency: What are the performance differences in terms of response time and computational resources?
We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.
Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.
video_questions = [
    "In the American Football Video, what are the teams playing?",
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?",
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]
Multi Video RAG with Pegasus
First, we will assess performance when querying full videos:
from weaviate.classes.query import MetadataQuery, Filter import time pegasus_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_full_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
We’ll then compare this to clips.
pegasus_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_clip_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
Now we’ll compare the answers from the selected clip vs the full video.
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
We see that the answers given are accurate and comparable to each other. However, the clip processing took 20 seconds, while the full video processing took 72 seconds, roughly a 3.6x speedup from querying only the retrieved clips.
Multi Video RAG with LLaVa-NeXT-Video
Now, we will run the same experiment with the LLaVa-NeXT-Video model. But first, we must sample all of our video segments.
for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video
We’ll start by using the full video queries.
llava_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_full_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We’ll then compare this to the clips.
from weaviate.classes.query import MetadataQuery import time llava_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_clip_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We see that execution took the same amount of time for each. This is because we sample 40 frames from each video regardless of length.
Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
In this case LLaVa-NeXT-Video gets 2/5 correct while analyzing the clips:
It correctly understands that the New York Giants are playing the New England Patriots in the first question; and
It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.
7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate
Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.
Key Findings
Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, we significantly improved processing speeds by querying shorter, more relevant video clips instead of entire videos.
Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.
Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.
Use Cases
The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:
Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.
Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.
Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.
Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.
Big thanks to Tuana Celik and Erika Cardenas from the Weaviate team for reviewing the draft!
Video processing is computationally expensive and time-consuming, especially when analyzing long-form content. Retrieval-Augmented Generation (RAG) offers a solution by enabling systems to process only the most relevant video segments rather than entire videos. This targeted approach significantly reduces processing time while maintaining or improving response quality.
In this post, we'll explore how to combine Twelve Labs' video understanding capabilities with Weaviate's vector database to create an efficient RAG system for video content. By segmenting videos and using embeddings to retrieve only the most relevant portions for analysis, we can significantly improve processing times while maintaining or even enhancing accuracy.
Our approach leverages several key technologies:
TwelveLabs Pegasus and Marengo models for video understanding and embedding generation
Weaviate vector database for efficient storage and retrieval of video segments
Open source LLaVA-NeXT-Video model as a comparison point for video analysis
We'll demonstrate how this RAG-based approach can reduce the computational load of video processing by focusing only on the most relevant segments, making it possible to analyze longer videos more efficiently. Whether you're building applications for content moderation, sports analysis, or educational content, this approach can help you scale your video processing capabilities while maintaining high-quality results.
1 - Set Up TwelveLabs and Weaviate
TwelveLabs
If you have not already signed up for Twelve Labs you can do so here. Once you have your account set up, go to the Playground, click on your user icon in the top right corner of the screen, and go to API Key
.
In your notebook click the key icon on the left and create a secret with this value as TL_API_KEY
.
Weaviate
If you do not have a Weaviate account, you can sign up here. Once you have an account, go to the cloud dashboard and create a new cluster. Once you have your cluster set up, you’ll need to populate two values in your notebook secrets section.
Add URL under REST Endpoint
to a WEAVIATE_URL
variable. Copy the Admin
key under API Keys
and save it to WEAVIATE_API_KEY
.
2 - Choose a GPU Runtime
We’ll need a GPU to run the LLaVA-NeXT-Video model. In your notebook go to Runtime > Change runtime type
and select a T4 GPU
.
3 - Setting Up Our Environment
Install Dependencies
First, we need to install the TwelveLabs and Weaviate SDKs:
!python -m pip install -U -q twelvelabs !python -m pip install -U -q "weaviate-client>=4.0.0"
We’ll then install the remaining dependencies.
!python -m pip install torch !python -m pip install -q av !python -m pip install --upgrade -q accelerate !python -m pip install -U bitsandbytes !python -m pip install git
!python -m pip install pillow !python -m pip install sentencepiece !python -m
Set Up TwelveLabs and Weaviate SDKs
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') weaviate_url = userdata.get("WEAVIATE_URL") weaviate_api_key = userdata.get("WEAVIATE_API_KEY")
We’ll then initialize the TwelveLabs Client.
from twelvelabs import TwelveLabs # Initialize the Twelve Labs client twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)
Finally, we’ll set up our Weaviate client and initialize a Video_Embeddings
collection.
import weaviate from weaviate.classes.init import Auth # Connect to Weaviate Cloud weaviate_client = weaviate.connect_to_weaviate_cloud( cluster_url=weaviate_url, auth_credentials=Auth.api_key(weaviate_api_key), ) # Get or create collection try: collection = weaviate_client.collections.get("Video_Embeddings") except: collection = weaviate_client.collections.create(name="Video_Embeddings")
Setting Up Our Video Data
Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Weaviate" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate" raw_video_dir = base_folder_path + "/sports_videos" upscaled_video_dir = base_folder_path + "/upscaled_videos/" video_segments_dir = base_folder_path + "/video_segments/"
Upscaling Our Videos
Some of our videos are too low resolution for our embeddings model. We’ll need to upscale them before using them.
We’ll create the upscale function here. read_video_pyav
comes directly from the LLaVa-NeXT-Video collab notebook and it formats videos in the correct numpy representation for inference.
import av import numpy as np def upscale_video(input_file, output_file, target_width=1280, target_height=720): input_container = av.open(input_file) output_container = av.open(output_file, mode='w') input_stream = input_container.streams.video[0] output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate) output_stream.width = target_width output_stream.height = target_height output_stream.pix_fmt = 'yuv420p' for frame in input_container.decode(input_stream): frame = frame.reformat(width=target_width, height=target_height) packet = output_stream.encode(frame) output_container.mux(packet) # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the containers input_container.close() output_container.close() def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame) return np.stack([x.to_ndarray(format="rgb24") for x in frames])
We’ll take the videos in our raw_video_dir
, upscale them, and save them to upscaled_video_dir
.
# Create output directory if it doesn't exist if not os.path.exists(upscaled_video_dir): os.makedirs(upscaled_video_dir) # Iterate over all files in the raw video directory for filename in os.listdir(raw_video_dir): # Check if the file is a video file if filename.endswith(".mp4"): print(filename) # Get the file name without extension input_file_no_ext = os.path.splitext(filename)[0] # Define the output file name output_file = f"{input_file_no_ext}_480.mp4" if output_file in os.listdir(upscaled_video_dir): continue # Define the full path for the input and output files input_file_path = os.path.join(raw_video_dir, filename) output_file_path = os.path.join(upscaled_video_dir, output_file) # Upscale the video upscale_video(input_file_path, output_file_path)
4 - Comparing Pegasus and LLaVa-NeXT-Video on a Single Video
Pegasus and LLaVa-NeXT-Video are both video understanding models that allow us to take a video and ask questions about it.
We’ll start by comparing Pegasus and LLaVa-NeXT-Video on a single video from our video collection. The video shows a sequence from Super Bowl XLII where the New York Giants and playing the New England Patriots. It is a famous catch called the “Helmet Catch” where Eli Manning, the Giants Quarterback, throws the ball to the Giants receiver, David Tyree, who successfully catches the ball against his helmet in the final two minutes of the game.
Now that we have context of the video, we will determine how well our two models are able to understand the video when asked “What is happening in this video?”.
Using Pegasus to Chat with our Video
Before we start, we need to set up a Pegasus index to store our video.
models = [ { "name": "pegasus1.2", "options": ["visual"] } ] index_name = "sports_videos" indices_list = twelve_labs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelve_labs_client.index.create( name=index_name, models=models ) print(f"A new index has been created: id={index.id} name={index.name} models={index.models}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} models={index.models}")
We then create a function to upload our video to the index. This will return a Pegasus Video ID which we can use to ask questions with our video.
# Monitor the status of the video task def on_task_update(task): print(f" Status={task.status}") def upload_video_to_twelve_labs_pegasus(video_path): task = twelve_labs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifer of your video is {task.video_id}.") return task.video_id
We’ll upload our video and save the Pegasus Video ID to single_video_id
.
# Define the video file path single_video_file = upscaled_video_dir + "football_480.mp4" single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)
We’ll ask “What is going on in this video? Please be concise.” to ensure that Pegasus really understand the video.
single_video_query = "What is going on in this video? Please be concise." res = twelve_labs_client.generate.text( video_id=single_video_id, prompt=single_video_query ) print(f"{res.data}")
Pegasus responds with this message:
The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.
We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.
Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.
res = twelve_labs_client.generate.text( video_id=single_video_id, prompt="What game is this?" ) print(f"{res.data}")
Pegasus responds with This is the Super Bowl XLII game.
, which is correct.
Now, lets see how well LLaVa-NeXT-Video understands the video.
Using LLaVa-NeXT-Video to Chat with our Video
For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.
def sample_video(video_path, num_samples=8): container = av.open(video_path) # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames sampled_video = sample_video(single_video_file, num_samples=40)
Once the video is properly sampled, we’ll set up our model.
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
Then we’ll create a function to query our model.
def query_llava_next(query,model,processor,sampled_video): # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) # prompt_len = len(prompt) inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) return generated_text[0]
Finally, we’ll ask the same questions to compare the output to Pegasus.
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
We get this response:
What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game
While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.
We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.
llava_next_result = query_llava_next("what game is this?", llava_next_model, llava_next_processor, sampled_video)
print(llava_next_result)
USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.
5 - RAG for Segment-Level Queries on a Single Video
Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.
However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.
To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:
Index each segment of a video independently.
Match user queries to the most relevant segments.
Process only those specific segments with our video understanding models.
Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.
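To make the end goal concrete, here is a minimal sketch of the retrieve-then-answer loop we are building toward. It assumes the twelve_labs_client and Weaviate collection from the setup steps, and that clip embeddings (each carrying a pegasus_video_id property) have already been indexed - both of which we do in the next few steps. The helper name answer_from_best_clip is ours for illustration, not part of either SDK.

from weaviate.classes.query import MetadataQuery, Filter

def answer_from_best_clip(question):
    # 1. Embed the text query with Marengo
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # 2. Retrieve the most similar clip from Weaviate
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=Filter.by_property("type").equal("clip"),
    )
    best_clip = response.objects[0].properties

    # 3. Ask Pegasus about just that clip
    res = twelve_labs_client.generate.text(
        video_id=best_clip["pegasus_video_id"],
        prompt=question,
    )
    return best_clip["video_file"], res.data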
Using Marengo to Create Full Video and Video Clip Embeddings
We set our segment length to 10 seconds - the maximum that Marengo allows.
# Define the video segment length (in seconds)
segment_length = 10
We’ll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"] and video_clip_length=segment_length to ensure that Marengo returns embeddings for our entire video along with embeddings for each 10 second clip in the video.
task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")
Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We’ll store our Task ID in marengo_task_ids for later use when populating our Weaviate database.
single_video_task_id = task.id

marengo_task_ids = {}
single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id
Prepare Video Segments for RAG
To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.
First, we will create a split_video function to split the video into 10-second segments to be uploaded to Pegasus. We also need to ensure that each segment is at least 4 seconds long - the minimum that Pegasus accepts. We do this by overlapping the last two clips whenever the final clip would otherwise be shorter than 5 seconds.
import os
import subprocess
import json

def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.

    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially
    overlapping with the previous segment if needed.

    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """
    # Minimum length for the last segment
    min_last_segment_len = 5

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]

    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]

    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0

    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)

    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)

    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False

    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")

    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration

        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")

        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]

        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")

    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments
We’ll save the segments to a new video_segments_dir directory:
split_video(single_video_file, video_segments_dir,segment_length)
We’ll then create pegasus_video_ids - a dictionary mapping file names to Pegasus Video IDs - and add the Video ID for our full video.
pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id
We’ll then upload our video segments to Pegasus and populate pegasus_video_ids with their video IDs.
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]

# Process each video segment
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file", segment_video_file)
        continue
    print("processing file", segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir + segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except Exception as e:
        print("error", segment_video_file, e)
        continue
Finally, we need to sample frames from our video segments and full videos so they can be used with the LLaVa-NeXT-Video model.
sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video
Uploading Embeddings to Weaviate
Weaviate expects the metadata records and embedding vectors to be separate when uploading to a collection. We’ll create prepare_marengo_embeddings_for_weaviate to take our Marengo Task IDs and our Pegasus Video IDs and prepare the records and vectors for upload.
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids):
    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():
        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve Marengo full-video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)

        # Track the segment number to match each clip with its segment file
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"

            # Update the file name if this is a clip segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4", f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4", "")

            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name]

            record = {
                'video_name': video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }

            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]

            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors
We’ll then use our function to get the records and vectors to upload to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")
Testing Vector Search
Now that we have everything in the collection, we can test and see that a vector search on Weaviate returns the correct video. We're using Weaviate’s near_vector search, so if we search with a video's vector it should return a distance of zero to itself.
We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.
from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)
This output confirms our embeddings were properly stored and can be accurately retrieved.
Retrieving Relevant Video Segments for RAG
The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:
We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding
We search Weaviate for video segment embeddings that are most similar to our query embedding
Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment
This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality.
First, we use Marengo to embed our text query.
sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)
query_vector = embedding.text_embedding.segments[0].embeddings_float
Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip")) to return just the clip embeddings, ignoring the full video embeddings.
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)
We see that it returned the fourth clip (index 3), football_480_segment_003.mp4.
Let’s take a look at the clip:
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0, :, :, :])
plt.close()  # this is required to not display the generated image

def init():
    im.set_data(video[0, :, :, :])

def animate(i):
    im.set_data(video[i, :, :, :])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100)
HTML(anim.to_html5_video())
We see that it finds the exact moment in the video where the helmet catch is made.
Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.
Chatting with our Video Segment: Pegasus vs. LLaVa-NeXT-Video
First we will see how Pegasus answers.
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")

print(sample_question)
res = twelve_labs_client.generate.text(
    video_id=pegasus_video_id,
    prompt=sample_question
)
print(f"{res.data}")
What technique did David Tyree use to catch the ball?
David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.
We see that Pegasus gives us a great answer - mentioning the helmet catch and noting that it was a crucial play for the Giants.
Now, let’s see if LLaVa-NeXT-Video gives a better answer when looking at a segment.
video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]

generated_text = query_llava_next(sample_question, llava_next_model, llava_next_processor, sampled_video)
print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'
We see that it gives an accurate answer in that the ball was caught overhead. However, it does not mention that it was a helmet catch, and it starts to ramble a bit at the end.
6 - Multi Video RAG with Marengo, Weaviate, and Pegasus
Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.
Get Marengo Embeddings for All Videos
First, we’ll update the marengo_task_ids dictionary with Marengo Task IDs for all of our videos.
for video_file_name in os.listdir(upscaled_video_dir):
    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue
    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id
Split our Remaining Videos into Segments
We’ll then split the remainder of our videos into segments like we did before.
# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file, video_segments_dir, segment_length)
Get Pegasus Video IDs for All Videos and their Segments
We’ll then get the Pegasus Video IDs for the remaining segments and full videos. We’ll do this in parallel to save time.
import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files
videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path
                       for video_path in videos_to_process}

    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Upload Data to Weaviate
Next, we’ll upload the rest of our data to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids)

added = 0
with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        # Skip segments that were not successfully indexed by Pegasus
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )
        added += 1

print(f"Added {added} embeddings to Weaviate")
RAG Performance Evaluation: Clips vs. Full Videos
With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:
Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?
Processing Efficiency: What are the performance differences in terms of response time and computational resources?
We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.
Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.
video_questions = [ "In the American Football Video, what are the teams playing?", "What technique does David Tyree use to catch the ball?", "In the tennis match video, who is playing?", "What foot does Messi shoot at the goal with?", "When does Keri Strug hurt her foot?" ]
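Both evaluations below repeat the same embed-and-retrieve step, differing only in whether they filter for clip or full-video embeddings. As an aside, you could factor that step into a small helper like the sketch below (the retrieve_segment name is ours, not from either SDK); the loops that follow keep the inline version from the original notebook.

from weaviate.classes.query import MetadataQuery, Filter

def retrieve_segment(question, scope="clip"):
    # Embed the question with Marengo
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # Return the best-matching record, where scope is "clip" or "video"
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=Filter.by_property("type").equal(scope),
    )
    return response.objects[0].properties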
Multi Video RAG with Pegasus
First, we will assess performance when querying full videos:
from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )
    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )
    pegasus_full_video_answers.append([question, selected_video_name, res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
We’ll then compare this to clips.
pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )
    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )
    pegasus_clip_video_answers.append([question, selected_video_name, res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
Now we’ll compare the answers from the selected clip vs the full video.
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
We see that the answers given are accurate and comparable to each other. However, the clip processing took 20 seconds, while the full video processing took 72 seconds.
Multi Video RAG with LLaVa-NeXT-Video
Now, we will run the same experiment with the LLaVa-NeXT-Video model. But first, we must sample frames from all of our new video segments.
for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video
We’ll start by using the full video queries.
llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )
    selected_video_file = response.objects[0].properties["video_file"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question, llava_next_model, llava_next_processor, sampled_video)
    # Record the file retrieved for this question (not the name from the earlier Pegasus loop)
    llava_full_video_answers.append([question, selected_video_file, generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We’ll then compare this to the clips.
from weaviate.classes.query import MetadataQuery
import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )
    selected_video_file = response.objects[0].properties["video_file"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question, llava_next_model, llava_next_processor, sampled_video)
    llava_clip_video_answers.append([question, selected_video_file, generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We see that execution took the same amount of time for both approaches. This is because we sample 40 frames from each video regardless of its length, so LLaVa-NeXT-Video does the same amount of work for a 10-second clip as for a full video.
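To see why the runtime is flat, here is a small illustration of the frame indices that sample_video computes for two hypothetical videos (the frame counts below are made up for illustration, not taken from our actual files):

import numpy as np

def sampled_indices(total_frames, num_samples=40):
    # Same arithmetic as sample_video: evenly spaced indices across the video
    return np.arange(0, total_frames, total_frames / num_samples).astype(int)

# A hypothetical 10-second clip at 30 fps (300 frames) and a 2-minute video (3600 frames)
print(len(sampled_indices(300)))   # 40
print(len(sampled_indices(3600)))  # 40

Either way, LLaVa-NeXT-Video sees exactly 40 frames, so its inference time tracks the number of sampled frames rather than the video duration.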
Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
In this case LLaVa-NeXT-Video gets 2/5 correct while analyzing the clips:
It correctly understands that the New York Giants are playing the New England Patriots in the first question; and
It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.
7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate
Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.
Key Findings
Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, we significantly improved processing speeds by querying shorter, more relevant video clips instead of entire videos.
Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.
Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.
Use Cases
The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:
Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.
Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.
Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.
Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.
We can see from the response that Pegasus has a deep understanding of the video. It understands that this is a football game between the Giants and Patriots. It also understands that Eli Manning threw the ball, David Tyree caught it, and that it is a pivotal moment in the game.
Pegasus does not mention that it is the Super Bowl, so we’ll ask to make sure.
res = twelve_labs_client.generate.text( video_id=single_video_id, prompt="What game is this?" ) print(f"{res.data}")
Pegasus responds with This is the Super Bowl XLII game.
, which is correct.
Now, lets see how well LLaVa-NeXT-Video understands the video.
Using LLaVa-NeXT-Video to Chat with our Video
For LLaVa-NeXT-Video, we need to prepare our video data in a specific format before inference. This involves sampling frames uniformly throughout the video, as the model doesn't process the entire video stream at once. We'll create a sampling function that extracts 40 evenly distributed frames from each video, ensuring we capture the key moments throughout the content. This sampling approach is adapted from the official LLaVA-NeXT-Video implementation. After sampling, we'll load the model from Hugging Face Hub, format our inputs according to the model's requirements, and run inference to generate responses to our queries.
def sample_video(video_path, num_samples=8): container = av.open(video_path) # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames sampled_video = sample_video(single_video_file, num_samples=40)
Once the video is properly sampled, we’ll set up our model.
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
Then we’ll create a function to query our model.
def query_llava_next(query,model,processor,sampled_video): # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) # prompt_len = len(prompt) inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) return generated_text[0]
Finally, we’ll ask the same questions to compare the output to Pegasus.
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
We get this response:
What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game
While this model does recognize that there is a football game happening between the Giants and the Patriots, it incorrectly identifies the game as Super Bowl III. It also misses the most important part of the video - the helmet catch.
We’ll ask the same “what game is this?” question that we asked before. We see that the model is closer, but still incorrect.
llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
USER: what game is this? ASSISTANT: The image you've provided is of a football game in progress, specifically from Super Bowl XLI. It is the New England Patriots versus the Giants. The players in the image are Giants and Patriots.
5 - RAG for Segment-Level Queries on a Single Video
Our comparison shows that Pegasus delivers superior results when analyzing complete videos, offering more accurate and coherent responses with faster processing times.
However, we can likely improve the performance of our models by narrowing their focus to only the most relevant video segments. This is where Retrieval-Augmented Generation (RAG) becomes valuable - instead of processing entire videos, we can identify and analyze only the segments that contain information relevant to a specific query.
To implement this approach, we'll leverage TwelveLabs' Marengo model, which specializes in creating high-quality embeddings that capture the semantic content of video segments. These embeddings allow us to:
Index each segment of a video independently.
Match user queries to the most relevant segments.
Process only those specific segments with our video understanding models.
Let's begin by dividing our video into segments and generating embeddings for each one using the Marengo model. These embeddings will serve as the foundation of our RAG system.
Using Marengo to Create Full Video and Video Clip Embeddings
We set our segment length to 10 seconds - the maximum that Marengo allows.
# Define the video segment length segment_length = 10
We’ll then use Marengo to embed our video. Note: we set video_embedding_scopes=["clip", "video"]
and video_clip_length=segment_length
to ensure that Marengo returns embeddings for our entire video along with embeddings for each 10 second clip in the video.
task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=single_video_file, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}")
Once the embedding is done, we can save the Marengo Task ID to retrieve those embeddings when we need them. We’ll store our Task ID in marengo_task_ids
for later use when populating our Weaviate databse.
single_video_task_id = task.id marengo_task_ids = {} single_video_file_name = single_video_file.split("/")[-1] marengo_task_ids[single_video_file_name] = single_video_task_id
Prepare Video Segments for RAG
To create an efficient RAG pipeline, we want to associate the Pegasus Video ID with the Marengo Task ID in our database. This will allow us to chat with that video segment when it is returned in our vector search. To do this we will also upload each segment of the video to Pegasus to be indexed.
First, we will create a split_video
function to split the video into 10 second segments to be uploaded to Pegasus. We also need to ensure that each segment is over 4 seconds long - the minimum for Pegasus. We do this by including some overlap between the last two clips if the last clip is less than 5 seconds long.
import os import subprocess import json def split_video(input_path, output_dir, segment_duration=10): """ Split a video into segments of the specified duration. Regular segments will be exactly segment_duration seconds. The last segment will be at least 5 seconds long, potentially overlapping with the previous segment if needed. Args: input_path: Path to the input video file output_dir: Directory to save the output segments segment_duration: Duration of each segment in seconds (default: 10) """ # Minimum length for the last segment min_last_segment_len = 5 # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Get base filename without extension base_name = os.path.splitext(os.path.basename(input_path))[0] # Get video duration using ffprobe probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", input_path ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) video_info = json.loads(probe_result.stdout) duration = float(video_info["format"]["duration"]) except Exception as e: print(f"Error getting video duration: {e}") return 0 # Calculate number of full segments num_full_segments = int(duration / segment_duration) # Calculate remaining duration remaining_duration = duration - (num_full_segments * segment_duration) # Determine total number of segments and if we need to adjust the last segment if remaining_duration > 0: if remaining_duration < min_last_segment_len: # Last segment would be too short, so we'll adjust its start time num_segments = num_full_segments + 1 needs_adjustment = True else: # Last segment is already long enough num_segments = num_full_segments + 1 needs_adjustment = False else: # No remaining duration, all segments are complete num_segments = num_full_segments needs_adjustment = False print(f"Video {base_name} is {duration:.2f} seconds long") print(f"Creating {num_segments} segments") # Create each segment for i in range(num_segments): # For regular segments, start at the segment boundary if i < num_full_segments: start_time = i * segment_duration actual_duration = segment_duration else: # This is the last segment if needs_adjustment: # Start earlier to ensure it's at least min_last_segment_len seconds start_time = duration - min_last_segment_len actual_duration = min_last_segment_len else: # Last segment is already long enough start_time = i * segment_duration actual_duration = remaining_duration output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4") # For all segments, use copy mode for speed cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", input_path, "-t", str(actual_duration), "-c:v", "copy", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error creating segment {i+1}: {result.stderr[:100]}...") else: end_time = start_time + actual_duration if i == num_segments - 1 and needs_adjustment: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)") else: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s") print(f"Successfully split {base_name} into {num_segments} segments") return num_segments
We’ll save our videos to a new video_segments_dir
split_video(single_video_file, video_segments_dir,segment_length)
We’ll then create pegasus_video_ids
- a dictionary mapping file names with Pegasus Video IDs and add the Video ID for our full video.
pegasus_video_ids = {} fname = single_video_file.split("/")[-1] pegasus_video_ids[fname] = single_video_id
We’ll then upload our video segments to Pegasus and populate pegasus_video_ids
with their video IDs.
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))] # Process each video for segment_video_file in segment_video_files: if segment_video_file in pegasus_video_ids: print("skip file",segment_video_file) continue print("processing file",segment_video_file) try: video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file) pegasus_video_ids[segment_video_file] = video_id except: print("error",segment_video_file) continue
Finally we need to segment all of our videos for efficient use with the LLaVa-NeXT-Video model.
sampled_video_files = {} for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video for video_file in os.listdir(upscaled_video_dir): print(video_file) sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
Uploading Embeddings to Weaviate
Weaviate expects the metadata records and embeddings vectors to be separate when uploading to a collection. We’ll create prepare_marengo_embeddings_for_weaviate
to take our Marengo Task IDs and our Pegasus Video IDs and prepare the records
and vectors
for upload.
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids): # Prepare data for Weaviate upload records = [] vectors = [] for video_file_name in marengo_task_ids.keys(): marengo_task_id = marengo_task_ids[video_file_name] # Retrieve marengo full video and clip embeddings marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id) #track segment number to match with fiel segment_number = 0 for segment in marengo_embeddings_result.video_embedding.segments: # Determine if this is a video or clip segment is_video = segment.embedding_scope == "video" #Update the file name if segment updated_file_name = video_file_name if not is_video: updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4") segment_number += 1 video_name = video_file_name.replace(".mp4","") pegasus_video_id = None if updated_file_name in pegasus_video_ids: pegasus_video_id = pegasus_video_ids[updated_file_name] record = { 'video_name':video_name, 'segment_number': 0 if is_video else segment_number, 'video_file': updated_file_name, 'start_time': getattr(segment, 'start_offset_sec', 0), 'end_time': getattr(segment, 'end_offset_sec', 0), 'type': 'video' if is_video else 'clip', 'task_id': marengo_task_id, 'pegasus_video_id': pegasus_video_id } # Get the embedding vector embedding_vector = [float(x) for x in segment.embeddings_float] # Add to our lists records.append(record) vectors.append(embedding_vector) # Print summary print(f"Prepared {len(records)} segments for upload to Weaviate") print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}") print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}") return records, vectors
We’ll then use our function to get the records and vectors to upload to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
Testing Vector Search
Now that we have everything in the collection, we can test and see that a vector search on Weaviate returns the correct video. We're using Weaviate’s near_vector
search, so if we search with a video's vector it should return a distance of zero to itself.
We'll search using vector #5 from our collection, which should return the corresponding video segment with a distance of zero.
from weaviate.classes.query import MetadataQuery, Filter # Use a specific vector for the query query_vector = vectors[5] # Perform vector search response = collection.query.near_vector( near_vector=query_vector, limit=1, # Increased limit to get more results return_metadata=MetadataQuery(distance=True), ) print(f"Found {len(response.objects)} results for vector search") for obj in response.objects: print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}") if 'segment_id' in obj.properties: print(f"Segment: {obj.properties['segment_id']}") if 'text' in obj.properties and obj.properties['text']: print(f"Text: {obj.properties['text']}") print(f"Distance: {obj.metadata.distance}") print("-" * 50)
This output confirms our embeddings were properly stored and can be accurately retrieved.
Retrieving Relevant Video Segments for RAG
The core of our RAG pipeline is the ability to match user questions with the most relevant video segments. This process works in three key steps:
We use TwelveLabs' Marengo model to convert the user's text query into a vector embedding
We search Weaviate for video segment embeddings that are most similar to our query embedding
Once we identify the most relevant video segment, we use its associated Pegasus video ID to generate accurate responses specific to that segment
This targeted approach allows us to process only the most relevant portions of video content, significantly improving both efficiency and response quality.
First, we use Marengo to embed our text query.
sample_question = "What technique did David Tyree use to catch the ball?" embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=sample_question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float
Then we find the most relevant clip. We use filters=(Filter.by_property("type").equal("clip"))
to return just the clip embeddings, ignoring the full video embeddings.
response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) video_file = response.objects[0].properties.get("video_file") print(video_file)
We see that it returned the fourth clip (index 3) football_480_segment_003.mp4
Lets take a look at the clip:
import matplotlib.pyplot as plt from matplotlib import animation from IPython.display import HTML video_file = response.objects[0].properties.get("video_file") video = sampled_video_files[video_file] fig = plt.figure() im = plt.imshow(video[0,:,:,:]) plt.close() # this is required to not display the generated image def init(): im.set_data(video[0,:,:,:]) def animate(i): im.set_data(video[i,:,:,:]) return im anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100) HTML(anim.to_html5_video())
We see that it finds the exact moment in the video where the helmet catch is made.
Now that we know we have the proper segment, we can see how Pegasus and LLaVa-NeXT-Video perform with a shorter clip.
Chatting with our Video Segment: Pegasus vs LLaVA-NeXT-Video
First we will see how Pegasus answers.
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")

print(sample_question)
res = twelve_labs_client.generate.text(
    video_id=pegasus_video_id,
    prompt=sample_question
)
print(f"{res.data}")
What technique did David Tyree use to catch the ball?
David Tyree used a technique where he pressed the ball against his helmet to secure the catch. This was a crucial play that allowed the New York Giants to maintain possession and continue their drive.
We see that Pegasus gives us a great answer, mentioning the helmet catch and noting that it was a crucial play for the Giants.
Now, let's see whether LLaVA-NeXT-Video gives a better answer when looking at the segment.
video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]

generated_text = query_llava_next(sample_question, llava_next_model, llava_next_processor, sampled_video)
print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: The player who was caught in the act of catching the ball used a two-handed, over-the-head catch technique. This technique involves using both hands to secure the ball while elevating it above the receiver's head. It can be a risky maneuver if the ball is deflected from the intended target, but it can also be a very effective way to grab the ball out of the air or catch it in a traffic jam. David Tyree'
We see that it gives a reasonably accurate answer, noting that the ball was caught overhead. However, it does not mention that it was a helmet catch, and it starts to ramble a bit at the end.
6 - Multi Video RAG with Marengo, Weaviate, and Pegasus
Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.
Get Marengo Embeddings for All Videos
First, we'll update the marengo_task_ids dictionary with Marengo task IDs for all of our videos.
for video_file_name in os.listdir(upscaled_video_dir):
    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")
    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id
Split our Remaining Videos into Segments
We’ll then split the remainder of our videos into segments like we did before.
# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file, video_segments_dir, segment_length)
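The split_video helper here was defined earlier in the notebook. For readers building their own, a minimal, hypothetical version using ffmpeg's segment muxer (assuming ffmpeg is installed on the system) could look like the following; the notebook's actual implementation may differ.

import os
import subprocess

def split_video_ffmpeg(input_path, output_dir, segment_length):
    """Hypothetical sketch: cut a video into fixed-length segments with ffmpeg's segment muxer."""
    os.makedirs(output_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(input_path))[0]
    out_pattern = os.path.join(output_dir, f"{base}_segment_%03d.mp4")
    # Note: "-c copy" avoids re-encoding, so cuts land on keyframes and segment lengths are approximate
    subprocess.run([
        "ffmpeg", "-i", input_path,
        "-c", "copy", "-map", "0",
        "-f", "segment", "-segment_time", str(segment_length),
        "-reset_timestamps", "1",
        out_pattern,
    ], check=True)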
Get Pegasus Video IDs for All Videos and their Segments
We'll then get the Pegasus video IDs for the remaining segments and full videos. We'll do this in parallel to save time.
import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files
videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path
                       for video_path in videos_to_process}

    # Process results as they complete, with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Upload Data to Weaviate
Next, we’ll upload the rest of our data to Weaviate.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids, pegasus_video_ids)

added = 0
with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        # Skip records whose segment or video has no Pegasus video ID
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )
        added += 1

print(f"Added {added} embeddings to Weaviate")
RAG Performance Evaluation: Clips vs. Full Videos
With our Marengo embeddings and Pegasus video IDs successfully indexed in Weaviate, we can now evaluate the effectiveness of our RAG system. This evaluation will focus on two critical aspects:
Answer Quality: How accurately does the system respond to queries when using clip-level retrieval versus full-video retrieval?
Processing Efficiency: What are the performance differences in terms of response time and computational resources?
We'll run a series of targeted queries against both approaches - retrieving relevant video clips and retrieving entire videos - to measure these differences quantitatively. This comparison will demonstrate how RAG can significantly improve video processing by focusing only on the most relevant segments, particularly for longer videos or complex queries that reference specific moments.
Let's begin by defining a set of diverse test questions that span different sports and require understanding specific actions or events within our videos.
video_questions = [
    "In the American Football Video, what are the teams playing?",
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?",
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]
Multi Video RAG with Pegasus
First, we will assess performance when querying the full videos:
from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []
start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )
    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )
    pegasus_full_video_answers.append([question, selected_video_name, res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
We’ll then compare this to clips.
pegasus_clip_video_answers = []
start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )
    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )
    pegasus_clip_video_answers.append([question, selected_video_name, res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
Now we’ll compare the answers from the selected clip vs the full video.
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
We see that the answers given are accurate and comparable to each other. However, the clip-based processing took 20 seconds, while the full-video processing took 72 seconds, roughly a 3.6x speedup from querying only the relevant clips.
Multi Video RAG with LLaVa-NeXT-Video
Now, we will run the same experiment with the LLaVA-NeXT-Video model. But first, we must sample frames from all of our video segments.
for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file, num_samples=40)
    sampled_video_files[video_file] = sampled_video
We’ll start by using the full video queries.
llava_full_video_answers = []
start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )
    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question, llava_next_model, llava_next_processor, sampled_video)
    llava_full_video_answers.append([question, selected_video_file, generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We’ll then compare this to the clips.
from weaviate.classes.query import MetadataQuery
import time

llava_clip_video_answers = []
start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )
    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )
    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question, llava_next_model, llava_next_processor, sampled_video)
    llava_clip_video_answers.append([question, selected_video_file, generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
We see that execution took the same amount of time for each. This is because we sample 40 frames from each video regardless of length.
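For reference, the fixed-frame sampling that sample_video performs can be sketched roughly as follows with PyAV and NumPy. This is a hypothetical reimplementation rather than the notebook's exact helper, but it shows why runtime is independent of clip length: the model always sees the same number of frames.

import av
import numpy as np

def sample_video_uniform(path, num_samples=40):
    """Hypothetical sketch: decode a video and keep num_samples evenly spaced RGB frames."""
    container = av.open(path)
    frames = [frame.to_ndarray(format="rgb24") for frame in container.decode(video=0)]
    container.close()

    # Pick evenly spaced frame indices across the whole clip
    indices = np.linspace(0, len(frames) - 1, num_samples).astype(int)
    return np.stack([frames[i] for i in indices])  # shape: (num_samples, H, W, 3)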
Now, we’ll take a look at the answers that LLaVa-NeXT-Video gives for clips and full videos.
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):
    print("question", clip_answer[0])
    print("clip: ", clip_answer[2])
    print("full: ", full_answer[2])
    print("\n")
In this case, LLaVA-NeXT-Video gets 2 of 5 questions correct when analyzing the clips:
It correctly identifies that the New York Giants are playing the New England Patriots in the first question; and
It correctly identifies that the tennis match is between Roger Federer and Novak Djokovic in the third question.
7 - Conclusion: Leveraging RAG for Efficient Video Understanding with TwelveLabs and Weaviate
Our exploration of Retrieval-Augmented Generation (RAG) for video processing has demonstrated significant benefits in both efficiency and accuracy. By combining TwelveLabs' advanced video understanding capabilities with Weaviate's powerful vector database, we've created a system that intelligently processes only the most relevant video segments rather than entire videos.
Key Findings
Performance Improvements: When using TwelveLabs' Pegasus with our Weaviate-powered RAG system, querying shorter, more relevant video clips instead of entire videos cut response time for our five test questions from 72 seconds to 20 seconds, roughly a 3.6x speedup.
Enhanced Accuracy: For open-source models like LLaVa-NeXT-Video, focusing on specific video segments dramatically improved answer accuracy, enabling more precise responses to queries about video content.
Scalable Architecture: Our RAG pipeline demonstrates how TwelveLabs' embedding models (Marengo) and Weaviate's vector database create a powerful foundation for efficient video understanding. Weaviate's ability to store and retrieve high-dimensional embeddings with low latency is crucial for making this approach practical in real-world applications.
Use Cases
The integration of TwelveLabs' video understanding capabilities with Weaviate's vector database enables powerful applications across numerous industries:
Media & Entertainment: Content creators can quickly locate specific scenes across large video libraries, enabling efficient editing, content repurposing, and clip generation for social media.
Sports Analytics: Coaches and analysts can instantly retrieve relevant plays from game footage by describing the action they're looking for, without manually scrubbing through hours of video.
Retail & E-commerce: Retailers can transform their product demonstration videos into interactive shopping experiences by enabling customers to ask specific questions like "How do I adjust the strap?" or "Show me how it fits in a backpack" and instantly receive the relevant video segment.
Together, TwelveLabs and Weaviate can create powerful video RAG systems that significantly enhance how we interact with and extract value from video content at scale.