Big thanks to the Pinecone team (Adam Heerwagen and Cory Waddingham) for collaborating with us on this tutorial.
Introduction
Welcome to this tutorial on integrating Twelve Labs' Embed API with Pinecone's hosted vector database for RAG-based Q&A on videos. In this guide, you'll learn how to extract text answers from an unstructured database of videos using generative models.
We combine Twelve Labs' rich, contextual embeddings with Pinecone's vector database to store, index, and query these video embeddings, creating a chat application. This notebook demonstrates the current possibilities of these technologies with just a few lines of code.
For comparison, we also showcase the difference in developer experience between using Twelve Labs' Generate API to generate text responses and a leading open-source model, LLaVA-NeXT-Video.
Setup and Installation
Before diving into the core functionality, let's set up our environment and install the necessary libraries.
Installing Required Libraries
First, we'll install the libraries for Twelve Labs and Pinecone. Run the following command in your notebook cell:
# Install required libraries
!pip install twelvelabs pinecone-client
Next, we'll install PyAV for video formatting, along with the accelerate, bitsandbytes, and transformers libraries from Hugging Face to run our open-source model:
!pip install -q av
!pip install --upgrade -q accelerate bitsandbytes
!pip install transformers
Authentication
We need to set up our keys for the Twelve Labs API and Pinecone. We'll use Google Colab's built-in userdata module to access these keys, stored as Colab Secrets. You can find the Pinecone information in their console after signing up; they offer a free Starter tier that's more than sufficient for this demo.
You can find the Twelve Labs key in your account after signing up at api.twelvelabs.io.
from google.colab import userdata
TL_API_KEY=userdata.get('TL_API_KEY')
PINECONE_API_KEY=userdata.get('PINECONE_API_KEY')
Setting Up our Video Data
Now, we need to get our video data for embedding. You can find the video data in a Google Drive folder using this link. Copy it to a folder called "TwelveLabs-Pinecone" in your base Google Drive folder. We'll use the following cell to mount your drive and give our notebook access to the video files.
from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Pinecone"
single_video = base_folder_path + "/ad_vids/Rare Beauty By Selena Gomez - Makeup Made To Feel Good In.mp4"
split_video_dir = base_folder_path + "/split_ad_videos"
Setting up our Clients
It's time to set up our Pinecone and Twelve Labs clients. The code below imports both SDKs and initializes each client with its API key.
# Configure Pinecone
from pinecone import Pinecone, ServerlessSpec
pc = Pinecone(api_key=PINECONE_API_KEY)
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask
# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)
Preparing Embeddings and Ingesting in Pinecone
The code block below demonstrates a process for generating and storing video embeddings using the Twelve Labs API and Pinecone vector database. It defines two main functions:
- The generate_embedding function is responsible for creating and managing the embedding task:
  - It creates an embedding task using the Twelve Labs API with the specified video file and engine.
  - It defines a callback function to monitor the task's progress.
  - It waits for the task to complete and retrieves the results.
  - Finally, it extracts the embeddings along with their metadata (time ranges and scope) from the task result.
- The ingest_data function is the main function for data ingestion:
  - It calls generate_embedding to get the embeddings for the given video file.
  - It connects to the Pinecone index (called twelve-labs in this case), creating it first if it doesn't exist.
  - It prepares the vectors for upsert by formatting the embeddings with their metadata.
  - It upserts the vectors into the Pinecone index.
When running this code, you'll see progress updates as the embedding task is processed, and finally, a confirmation of how many embeddings were ingested into Pinecone. This sets the foundation for later retrieval and analysis of the video content using these embeddings.
import os

# Define a callback function to monitor task progress
def on_task_update(task: EmbeddingsTask):
print(f" Status={task.status}")
def generate_embedding(video_file):
# Create an embedding task
task = twelvelabs_client.embed.task.create(
engine_name="Marengo-retrieval-2.6",
video_file=video_file
)
print(f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}")
# Wait for the task to complete
status = task.wait_for_done(
sleep_interval=2,
callback=on_task_update
)
print(f"Embedding done: {status}")
# Retrieve the task result
task_result = twelvelabs_client.embed.task.retrieve(task.id)
# Extract and return the embeddings
embeddings = []
for v in task_result.video_embeddings:
embeddings.append({
'embedding': v.embedding.float,
'start_offset_sec': v.start_offset_sec,
'end_offset_sec': v.end_offset_sec,
'embedding_scope': v.embedding_scope
})
return embeddings, task_result
def ingest_data(video_file_path, index_name = "twelve-labs"):
"""
Generate embeddings for video and store in Pinecone
"""
    # Strip the extension and folders from video_file_path
video_name = os.path.splitext(os.path.basename(video_file_path))[0]
print(video_name)
# Connect to Pinecone index
if index_name not in pc.list_indexes().names():
pc.create_index(
name=index_name,
            dimension=1024, # The dimensionality of Twelve Labs' embedding model
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
index = pc.Index(index_name)
# Generate embeddings using Twelve Labs Embed API
embeddings, task_result = generate_embedding(video_file_path)
# Prepare vectors for upsert
vectors_to_upsert = []
for i, emb in enumerate(embeddings):
vector_id = f"{video_name}_{i}"
vectors_to_upsert.append((vector_id, emb['embedding'], {
'video_file': video_name,
'video_segment': i,
'start_time': emb['start_offset_sec'],
'end_time': emb['end_offset_sec'],
'scope': emb['embedding_scope']
}))
# Upsert embeddings to Pinecone
index.upsert(vectors=vectors_to_upsert)
return f"Ingested {len(embeddings)} embeddings for {video_file_path}"
And now we’ll use these two functions to load our video embeddings into Pinecone:
# Example usage
result = ingest_data(single_video)
print(result)
This code allows you to generate multimodal embeddings for a video using Twelve Labs' Embed API and store them in Pinecone for later retrieval. The embeddings capture various aspects of the video content, including visual, audio, and textual information, making them suitable for a wide range of AI applications.
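If you'd like a quick sanity check that the upsert landed, you can inspect the index statistics (a minimal sketch, assuming the default twelve-labs index name used above):
index = pc.Index("twelve-labs")
stats = index.describe_index_stats()
# Should report one vector per embedded segment of the video
print(f"Total vectors in index: {stats.total_vector_count}")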
Retrieving from a Text Query
We'll set up functions to embed text using Twelve Labs' Marengo model and to retrieve similar content from the Pinecone database:
- The get_text_embedding function is responsible for converting a text query into an embedding using the Twelve Labs Embed API:
  - It uses the twelvelabs_client.embed.create method to generate an embedding for the given text.
  - The engine_name parameter specifies which embedding model to use ("Marengo-retrieval-2.6").
  - The text_truncate parameter is set to "start", which means if the text is too long, it will be truncated from the start.
- The retrieve_similar_content function is the main function for content retrieval:
  - It takes a text query and the number of results to return (top_k) as parameters.
  - It calls get_text_embedding to convert the text query into an embedding.
  - It connects to the Pinecone index called twelve-labs.
  - It queries the Pinecone index for vectors similar to the query embedding, specifying the number of results to return and including metadata.
The retrieval process works by comparing the embedding of the text query with the pre-computed embeddings of video segments stored in Pinecone. This allows for fast and efficient similarity search across large video datasets.
def get_text_embedding(text_query):
# Twelve Labs Embed API supports text-to-embedding
text_embedding = twelvelabs_client.embed.create(
engine_name="Marengo-retrieval-2.6",
text=text_query,
text_truncate="start"
)
return text_embedding.text_embedding.float
def retrieve_similar_content(query, index_name="twelve-labs", top_k=5):
"""
Retrieve similar content based on query embedding
"""
# Generate query embedding
query_embedding = get_text_embedding(query)
# Connect to Pinecone index
index = pc.Index(index_name)
# Query Pinecone for similar vectors
results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
return results
Now we can use the retrieve_similar_content function with a sample text query and print out the query and the details of the top similar content found.
# Example usage
text_query = "Lipstick"
similar_content = retrieve_similar_content(text_query)
print(f"Query: '{text_query}'")
print(f"Top {len(similar_content['matches'])} similar content:")
for i, match in enumerate(similar_content['matches']):
print(f"{i+1}. Score: {match['score']:.4f}")
print(f" Video File: {match['metadata']['video_file']}")
print(f" Video ID: {match['metadata']['video_segment']}")
print(f" Time range: {match['metadata']['start_time']} - {match['metadata']['end_time']} seconds")
print(f" Scope: {match['metadata']['scope']}")
print()
This code allows you to perform semantic search on your video content using text queries. It leverages the power of Twelve Labs' multimodal embeddings to find video segments that are semantically similar to the text query, even if the exact words are not present in the video.
When running this code, you'll see the top matching video segments along with their similarity scores, video files, video IDs, time ranges, and scopes. This enables various applications such as video search, content recommendations, and more.
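If you find yourself reusing this retrieval step, you could wrap it in a small helper that returns only the top hit's file and time range (a minimal sketch built on the functions above; the helper name is our own):
def best_matching_segment(query):
    # Reuse retrieve_similar_content and keep only the top hit's metadata
    matches = retrieve_similar_content(query, top_k=1)['matches']
    if not matches:
        return None
    meta = matches[0]['metadata']
    return meta['video_file'], meta['start_time'], meta['end_time']

print(best_matching_segment("Lipstick"))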
Video Formatting
With our video embeddings in the database and the ability to query them, our first experiment links those embeddings to specific video clips rather than to the full video. We'll split the video into segments that mirror the timestamps the embedding model uses.
The split_video function below uses the av library to split a video file into smaller segments of a specified duration. Here's a brief explanation:
- The function takes an input video path, an output directory, and a segment duration (default 6 seconds).
- It opens the input video, calculates the number of frames per segment based on the video's frame rate, and then iterates through the video frames.
- For each segment, it creates a new output container, writes the frames to it, and adjusts the frame timestamps.
- The resulting segments are saved as separate MP4 files in the output directory, named with sequential numbers.
import av
def split_video(input_path, output_dir, segment_duration=6):
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
input_file_name = os.path.splitext(os.path.basename(input_path))[0]
print(input_file_name)
with av.open(input_path) as input_container:
# Get video stream
input_stream = input_container.streams.video[0]
fps = input_stream.average_rate
# Calculate how many frames are in each segment
frames_per_segment = int(segment_duration * fps)
segment_count = 0
frame_count = 0
output_container = None
output_stream = None
first_frame_timestamp = None
for frame in input_container.decode(video=0):
if frame_count % frames_per_segment == 0:
# Close previous output container if it exists
if output_container:
output_container.close()
# Create a new output container
output_path = os.path.join(output_dir, f'{input_file_name}_segment_{segment_count:03d}.mp4')
segment_count += 1
output_container = av.open(output_path, mode='w')
output_stream = output_container.add_stream('h264', rate=fps)
output_stream.width = frame.width
output_stream.height = frame.height
output_stream.pix_fmt = 'yuv420p'
# Reset the first frame timestamp for the new segment
first_frame_timestamp = frame.pts
# Adjust the frame timestamp
frame.pts -= first_frame_timestamp
# Encode frame
packet = output_stream.encode(frame)
output_container.mux(packet)
frame_count += 1
# Flush the encoder
packet = output_stream.encode(None)
output_container.mux(packet)
# Close the last output container
if output_container:
output_container.close()
split_video(input_path=single_video, output_dir=split_video_dir)
Setting up a Query
Now we have everything ready to start interacting with our generative models. Let’s define a query and retrieve the relevant content:
query = "What is this advertisement selling?"
similar_content = retrieve_similar_content(query)
Using Pegasus to Chat with our Video Clip
There are three things that we’ll need to do to use the Pegasus-1 model:
- Set up an index on Twelve Labs to host the videos – we only need to do this once
- Upload the videos to Twelve Labs – we only need to do this once per video
- Query Pegasus with our prompt and video
First, we’ll set up our Index:
engines = [
{
"name": "pegasus1.1",
"options": ["visual", "conversation"]
}
]
index_name = "ads_index"
indices_list = twelvelabs_client.index.list(name=index_name)
if len(indices_list) == 0:
index = twelvelabs_client.index.create(
name=index_name,
engines=engines,
)
print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}")
else:
index = indices_list[0]
print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")
Then, we’ll set up our uploading logic for easy use:
def upload_video_to_twelve_labs(video_path):
task = twelvelabs_client.task.create(
index_id=index.id,
file = video_path
)
print(f"Task created: id={task.id} status={task.status}")
task.wait_for_done(sleep_interval=5, callback=on_task_update)
if task.status != "ready":
raise RuntimeError(f"Indexing failed with status {task.status}")
print(f"The unique identifier of your video is {task.video_id}.")
return task.video_id
Now, we can loop through the full directory of split videos and upload them to our index on Twelve Labs.
video_ids = {}
for split_video_filename in os.listdir(split_video_dir):
split_video_path = os.path.join(split_video_dir, split_video_filename)
print(split_video_path)
split_video_name = split_video_filename.split('.')[0]
print(split_video_name)
video_id = upload_video_to_twelve_labs(split_video_path)
video_ids[split_video_name] = video_id
print(video_ids)
Calling Pegasus
All that is left is to send a simple query after linking our retrieval results to the actual video clip:
# retrieve the correct video_id for the relevant video
video_segment = int(similar_content['matches'][0]['metadata']['video_segment'])
print(f"Retrieved video segment: {video_segment}")
base_filename = os.path.splitext(os.path.basename(single_video))[0]
video_key = f"{base_filename}_segment_{video_segment:03d}"
video_id = video_ids[video_key]
res = twelvelabs_client.generate.text(
video_id=video_id,
prompt=query
)
print(f"{res.data}")
Using LLaVA-NeXT-Video
For our open-source model, we will need to:
- Turn our videos into a numpy format
- Sample them into a subset of frames for the model to consume
- Download and host the model on our GPUs
- Handle all formatting for our model and running the query
Turning our Videos into a numpy format
The read_video_pyav() function below decodes specific frames from a video using PyAV.
import av
import numpy as np
def read_video_pyav(container, indices):
'''
Decode the video with PyAV decoder.
Args:
container (av.container.input.InputContainer): PyAV container.
indices (List[int]): List of frame indices to decode.
Returns:
np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
'''
frames = []
container.seek(0)
start_index = indices[0]
end_index = indices[-1]
for i, frame in enumerate(container.decode(video=0)):
if i > end_index:
break
if i >= start_index and i in indices:
            frames.append(frame)
    # Stack the collected frames into a single RGB numpy array
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
Sampling our Video
In the code block below:
- get_total_frames(): Counts the total number of frames in a video.
- sample_video(): Samples a specified number of frames uniformly from a video.
- process_videos_in_folder(): Processes all videos in a given folder, sampling frames from each.
def get_total_frames(video_path):
"""
    Manually count the total number of frames in a video. Used as a fallback when the container reports 0 frames.
"""
container = av.open(video_path)
video_stream = container.streams.video[0]
total_frames = 0
for frame in container.decode(video_stream):
total_frames += 1
return total_frames
def sample_video(video_path, num_samples=8):
container = av.open(video_path)
video_stream = container.streams.video[0]
# sample uniformly num_samples frames from the video
total_frames = container.streams.video[0].frames
if total_frames == 0:
total_frames = get_total_frames(video_path)
indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
sampled_frames = read_video_pyav(container, indices)
return sampled_frames
def process_videos_in_folder(folder_path):
sample_info = {}
# Supported video file extensions
video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
for filename in os.listdir(folder_path):
simple_video_name = os.path.splitext(os.path.basename(filename))[0]
if filename.lower().endswith(video_extensions):
video_path = os.path.join(folder_path, filename)
try:
print("Sampling " + video_path)
sampled_clip = sample_video(video_path)
sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path}
except Exception as e:
print(f"Error processing {filename}: {str(e)}")
return sample_info
sampled_video_info = process_videos_in_folder(split_video_dir)
Loading the Model
Here we initialize the LLaVA-NeXT-Video model and its processor using the Hugging Face Transformers library, loading the model with 4-bit quantization for efficient memory usage.
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16
)
processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
"llava-hf/LLaVA-NeXT-Video-7B-hf",
quantization_config=quantization_config,
device_map='auto'
)
Asking our Model a Question
We'll use a helper to pull the segment number out of the file name that our Pinecone query points us to. The video_segment_name_from_offset() function generates a segment name from the video path and start time, which we then use to look up the specific video segment referenced by the similar_content metadata.
def video_segment_name_from_offset(video_path, start_time, segment_length = 6):
segment_number = int (start_time // segment_length)
simple_video_name = os.path.splitext(os.path.basename(video_path))[0]
return f"{simple_video_name}_segment_{segment_number:03d}"
Now we can get the sampled video to format a request to our model. First, we prepare a conversation input with both text and video content. Then, we process the input using the LLaVA-NeXT-Video processor. Finally, we generate a response using the model with specified parameters.
video_segment = similar_content['matches'][0]['metadata']['video_file']
print(video_segment)
video_offset = similar_content['matches'][0]['metadata']['start_time']
video_segment_name = video_segment_name_from_offset(video_segment, video_offset)
video_segment = sampled_video_info[video_segment_name]['sampled_video']
# Each "content" is a list of dicts and you can add image/video/text modalities
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": query},
{"type": "video"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)
inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
output = model.generate(**inputs, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)
print(generated_text[0])
Comparison
When we compare the outputs of these models, we can see a more detailed and context-aware answer from Pegasus compared to LLaVA-NeXT-Video.
However, both models clearly struggle to understand what is going on in the full video, as they were only given clips. Let’s now show what giving them the full video looks like.
Multiple Videos
Now we'll work with an unstructured folder of videos that we can ask questions about as a whole. Earlier, our retrieval found the clips most relevant to a specific query. We'll do the same thing here, but we will then feed our models the full video that the matching clip came from. We can easily do this with the metadata that we're storing in Pinecone.
We’ll start off by ingesting our videos to Pinecone:
ads_dir = os.path.join(base_folder_path,"ad_vids")
video_list = []
# Make sure we don't waste time re-embedding the original single video:
single_video_filename = os.path.splitext(os.path.basename(single_video))[0]
for filename in os.listdir(ads_dir):
if filename.endswith(".mp4") and single_video_filename not in filename:
video_list.append(ads_dir + "/" + filename)
print(video_list)
for video in video_list:
ingest_data(video)
Now we’ll set up a few questions that we can ask of our database, and retrieve the most relevant videos:
full_database_questions = ["Who is the actor in the Miss Dior video?", "What ad is Selena Gomez in?", "What is the ad for Rare Beauty about?", "Why should people buy the Rare Beauty product according to their ad?"]
question = full_database_questions[0]
similar_content_from_question = retrieve_similar_content(question)
video_name = similar_content_from_question['matches'][0]['metadata']['video_file']
Using Pegasus
Now we'll cover the additional steps needed to use Pegasus via the Python SDK. This time, our index is already set up, so we only need to upload the videos before querying.
Uploading our Videos to Twelve Labs
We iterate through videos in a directory, upload each to Twelve Labs, and store the video IDs.
for vid in os.listdir(ads_dir):
vid_path = os.path.join(ads_dir, vid)
vid_name = os.path.splitext(os.path.basename(vid_path))[0]
print(vid_path)
video_id = upload_video_to_twelve_labs(vid_path)
video_ids[vid_name] = video_id
Querying Pegasus with our Video Database
We then use the Twelve Labs client to generate text based on a video ID and a question prompt.
video_id = video_ids[video_name]
res = twelvelabs_client.generate.text(
video_id=video_id,
prompt=question
)
print(f"{res.data}")
Using LLaVA-NeXT-Video with our Video Database
Sampling Videos
We first need to sample all our videos, store them, and then access the correct sampling for the retrieved video:
sampled_database_video_info = process_videos_in_folder(ads_dir)
video_segment = sampled_database_video_info[video_name]['sampled_video']
Running our Model
Now we can run our model on the video sample.
- First we create a conversation structure with a user role, including text (question) and video content.
- Then we apply a chat template to the conversation, prepare inputs for the model (including the prompt and video segment), and set up generation parameters (max_new_tokens, do_sample, top_p).
- Finally, we use the LLaVA-NeXT-Video model to generate text based on the inputs, decode the output, and print the generated text.
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": question},
{"type": "video"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)
inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
output = model.generate(**inputs, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)
print(generated_text[0])
Comparison
When running this query on our two models, we observe that Pegasus clearly understands who Natalie Portman is and recognizes her presence in the video. In contrast, the LLaVA-NeXT-Video model either doesn't recognize Natalie Portman or can't "see" her well enough with the given sampling. Moreover, it tends to veer off-topic, resulting in a longer response and increased latency—a potential concern for production use cases.
Conclusion
This guide demonstrated how to chat with videos, either individually or as a complete set, using the Twelve Labs Embed API and Pinecone's vector database to manage retrieval.
We also compared Twelve Labs' Pegasus model with the LLaVA-NeXT-Video open-source model, evaluating the required infrastructure, developer experience, and query results. Pegasus showed promise with less operational overhead and better instruction following compared to the open-source model.
Best Practices
- For dedicated hosts, consider using Pinecone's Pod-based offerings.
- Consider the trade-off between the number of frames the open-source video model processes, its accuracy, and generation latency.
- When possible, quantize the open-source model to accelerate inference. Experiment with the trade-offs among RAM usage, speed, and quality to meet your specific needs (see the sketch below for a starting point).
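As one way to act on those last two points, you could reload the model in 8-bit instead of 4-bit and sample more frames per clip, then compare answer quality against latency. This is a sketch that reuses the objects defined earlier; it assumes your GPU has enough memory for the 8-bit weights, and the right settings will depend on your hardware:
# 8-bit quantization: roughly double the memory of 4-bit, often with better output quality
quantization_config_8bit = BitsAndBytesConfig(load_in_8bit=True)
model_8bit = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config_8bit,
    device_map="auto",
)

# Denser sampling: 16 frames instead of 8 gives the model more visual context,
# at the cost of a longer prompt and slower generation
denser_clip = sample_video(single_video, num_samples=16)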
Next Steps
When retrieving videos from a much larger set, the retrieval mechanism may be less accurate. Some potential solutions include:
- Training a linear adapter on top of the embeddings to better fit your data (see the sketch after this list).
- Re-ranking videos using Pegasus when clips from different videos are returned.
- Adding textual summary data for each video to the Pinecone entries, using Pinecone's metadata capabilities to create a hybrid search system and enhance accuracy.
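To illustrate the first idea, a linear adapter can be as small as a single square matrix applied to the query embedding before the similarity search. The sketch below is hypothetical and not part of the Twelve Labs or Pinecone APIs; training it would require labeled query-clip pairs from your own data:
import torch
import torch.nn as nn

class LinearAdapter(nn.Module):
    """Maps 1024-dim Marengo embeddings into a task-specific space of the same size."""
    def __init__(self, dim=1024):
        super().__init__()
        self.proj = nn.Linear(dim, dim, bias=False)
        # Initialize as the identity so the untrained adapter matches plain cosine search
        nn.init.eye_(self.proj.weight)

    def forward(self, x):
        return self.proj(x)

adapter = LinearAdapter()
query_vec = torch.tensor(get_text_embedding("Lipstick")).unsqueeze(0)
adapted_query = adapter(query_vec).squeeze(0).tolist()
# Pass adapted_query to index.query(...) in place of the raw embedding; train the
# adapter with a contrastive loss on (query, relevant clip) pairs to better fit your data.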
Appendix
For your reference and further exploration:
- Complete Colab Notebook
- Twelve Labs Documentation
- Pinecone’s Client and Cloud Documentation