Vertex AI RAG Engine doesn't auto-sync when documents change. Here's how to build an event-driven pipeline with Eventarc, Cloud Functions, and Pub/Sub batching to trigger ImportRagFiles automatically using Terraform.
You've built your RAG Engine corpus, imported files from GCS, and tested retrieval with Gemini. Then someone uploads new documents to the bucket and... nothing happens. The corpus still returns stale results.
Vertex AI RAG Engine doesn't watch your GCS bucket for changes. You need to call ImportRagFiles whenever documents are added or updated. This post builds the event-driven pipeline with Terraform: GCS object events flow through Eventarc to a Cloud Function (2nd gen) that triggers the import. A Pub/Sub-based batching layer prevents redundant imports when multiple files land at once. 🎯
🏗️ Architecture Overview
GCS Bucket (docs added/updated/deleted)
↓ Eventarc (object.finalize / object.delete)
Cloud Function 2nd Gen (calls ImportRagFiles)
↓ On failure
Cloud Logging + Alert Policy → Notification Channel
Why Cloud Functions 2nd gen? Gen 2 functions are built on Cloud Run and use Eventarc for triggers, giving you native GCS event support with retry policies, concurrency control, and longer timeouts (up to 60 minutes for HTTP-triggered functions, and 9 minutes for event-driven ones like this). Gen 1 functions work but lack the Eventarc integration and timeout flexibility.
🔧 Terraform: The Full Pipeline
Required APIs and Permissions
GCS-to-Eventarc triggers require the Cloud Storage service account to publish to Pub/Sub. This is the permission step most people miss:
# sync/apis.tf
resource "google_project_service" "required" {
for_each = toset([
"cloudfunctions.googleapis.com",
"cloudbuild.googleapis.com",
"run.googleapis.com",
"eventarc.googleapis.com",
"aiplatform.googleapis.com",
"storage.googleapis.com",
])
project = var.project_id
service = each.value
}
# GCS service account needs Pub/Sub publisher role for Eventarc
data "google_storage_project_service_account" "gcs_account" {
  project = var.project_id
}
resource "google_project_iam_member" "gcs_pubsub_publishing" {
project = var.project_id
role = "roles/pubsub.publisher"
member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}
Service Account for the Cloud Function
# sync/iam.tf
resource "google_service_account" "rag_sync" {
account_id = "${var.environment}-rag-sync-sa"
display_name = "RAG Sync Cloud Function SA"
project = var.project_id
}
# Permission to call ImportRagFiles
resource "google_project_iam_member" "vertex_ai_user" {
project = var.project_id
role = "roles/aiplatform.user"
member = "serviceAccount:${google_service_account.rag_sync.email}"
}
# Permission to receive Eventarc events
resource "google_project_iam_member" "eventarc_receiver" {
project = var.project_id
role = "roles/eventarc.eventReceiver"
member = "serviceAccount:${google_service_account.rag_sync.email}"
}
# Permission for Cloud Run (gen2 functions run on Cloud Run)
resource "google_project_iam_member" "run_invoker" {
project = var.project_id
role = "roles/run.invoker"
member = "serviceAccount:${google_service_account.rag_sync.email}"
}
# Read access to the GCS data source bucket
resource "google_storage_bucket_iam_member" "read_docs" {
bucket = var.rag_docs_bucket_name
role = "roles/storage.objectViewer"
member = "serviceAccount:${google_service_account.rag_sync.email}"
}
Cloud Function Source Code
# sync/function_source/main.py
import logging
import os

import functions_framework
import vertexai
from vertexai.preview import rag

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

PROJECT_ID = os.environ["PROJECT_ID"]
LOCATION = os.environ["LOCATION"]
RAG_CORPUS_ID = os.environ["RAG_CORPUS_ID"]
GCS_BUCKET = os.environ["GCS_BUCKET"]
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "512"))
CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", "100"))

# The RAG API expects the full resource name; accept either form in the env var
CORPUS_NAME = (
    RAG_CORPUS_ID
    if RAG_CORPUS_ID.startswith("projects/")
    else f"projects/{PROJECT_ID}/locations/{LOCATION}/ragCorpora/{RAG_CORPUS_ID}"
)


@functions_framework.cloud_event
def handle_gcs_event(cloud_event):
    """Triggered by GCS object finalize/delete events."""
    data = cloud_event.data
    file_name = data["name"]
    event_type = cloud_event["type"]
    logger.info(f"Event: {event_type}, File: {file_name}")

    # Skip non-document files
    supported_extensions = (".pdf", ".txt", ".md", ".html", ".docx", ".csv")
    if not file_name.lower().endswith(supported_extensions):
        logger.info(f"Skipping unsupported file type: {file_name}")
        return  # CloudEvent functions have no HTTP response; returning just acks

    # Import the specific file that changed
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    gcs_uri = f"gs://{GCS_BUCKET}/{file_name}"
    try:
        rag.import_files(
            corpus_name=CORPUS_NAME,
            paths=[gcs_uri],
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            max_embedding_requests_per_min=300,
        )
        logger.info(f"Import triggered for: {gcs_uri}")
    except Exception:
        logger.exception(f"Import failed for {gcs_uri}")
        raise  # Let Eventarc retry

# Don't forget sync/function_source/requirements.txt:
#   functions-framework
#   google-cloud-aiplatform
Key design choice: We import the specific file that changed (paths=[gcs_uri]) rather than re-importing the entire bucket. This makes each sync fast and targeted. For deletions, RAG Engine handles cleanup when the file is no longer accessible at the source URI during the next full import.
Package and Deploy the Function
# sync/function.tf
# Zip the function source
data "archive_file" "rag_sync" {
type = "zip"
source_dir = "${path.module}/function_source"
output_path = "${path.module}/function_source.zip"
}
# Upload to GCS for Cloud Functions deployment
resource "google_storage_bucket" "function_source" {
name = "${var.project_id}-rag-sync-source"
location = var.region
uniform_bucket_level_access = true
lifecycle_rule {
action { type = "Delete" }
condition { age = 30 }
}
}
resource "google_storage_bucket_object" "function_zip" {
name = "rag-sync-${data.archive_file.rag_sync.output_md5}.zip"
bucket = google_storage_bucket.function_source.name
source = data.archive_file.rag_sync.output_path
}
# Deploy Cloud Function 2nd gen with GCS Eventarc trigger
resource "google_cloudfunctions2_function" "rag_sync" {
name = "${var.environment}-rag-sync"
location = var.region
project = var.project_id
build_config {
runtime = "python312"
entry_point = "handle_gcs_event"
source {
storage_source {
bucket = google_storage_bucket.function_source.name
object = google_storage_bucket_object.function_zip.name
}
}
}
service_config {
max_instance_count = 1 # Prevent concurrent imports
min_instance_count = 0
timeout_seconds = 540 # 9 minutes, the max for event-driven gen2 functions
available_memory = "256Mi"
service_account_email = google_service_account.rag_sync.email
environment_variables = {
PROJECT_ID = var.project_id
LOCATION = var.region
RAG_CORPUS_ID = var.rag_corpus_id
GCS_BUCKET = var.rag_docs_bucket_name
CHUNK_SIZE = var.chunk_size
CHUNK_OVERLAP = var.chunk_overlap
}
}
event_trigger {
trigger_region = var.region
event_type = "google.cloud.storage.object.v1.finalized"
retry_policy = "RETRY_POLICY_RETRY"
service_account_email = google_service_account.rag_sync.email
event_filters {
attribute = "bucket"
value = var.rag_docs_bucket_name
}
}
depends_on = [
google_project_iam_member.gcs_pubsub_publishing,
google_project_iam_member.eventarc_receiver,
google_project_iam_member.run_invoker,
]
}
max_instance_count = 1 is important. It prevents concurrent imports from overwhelming your embedding model's QPM quota. With a single instance, excess events are retried by Eventarc's underlying Pub/Sub subscription until the instance frees up, so files are effectively processed one at a time.
⚠️ Handling Deletions
The google.cloud.storage.object.v1.finalized event fires on creates and updates, but not deletes. For deletions, you have two options:
Option 1: A second trigger for deletes. A gen2 function accepts only one event_trigger block, so deploy a second function (or a variant of this one) with event_type = "google.cloud.storage.object.v1.deleted", then find the matching RagFile with rag.list_files() and remove it with rag.delete_file().
Option 2: Scheduled full re-import. Run a nightly Cloud Scheduler job that does a full import_files on the entire bucket. RAG Engine compares against existing files and removes vectors for deleted documents. This is simpler and handles edge cases better.
For most teams, option 2 is the pragmatic choice. Event-driven imports handle creates/updates in near-real-time, and the nightly re-import catches deletions.
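A sketch of that nightly job, assuming it runs as an HTTP-triggered function invoked by Cloud Scheduler and reuses the same env vars as the sync function. The function name and the `corpus_resource_name` helper are mine, not from the SDK; the import call mirrors the one above, and it assumes re-import prunes deleted sources as described.

```python
# Sketch of the Option 2 nightly full re-import (hypothetical function name)
import os


def corpus_resource_name(project: str, location: str, corpus_id: str) -> str:
    """Build the full RagCorpus resource name the RAG API expects."""
    if corpus_id.startswith("projects/"):
        return corpus_id  # already fully qualified
    return f"projects/{project}/locations/{location}/ragCorpora/{corpus_id}"


def nightly_reimport(request):
    """HTTP entry point, e.g. invoked nightly by Cloud Scheduler."""
    import vertexai
    from vertexai.preview import rag

    project = os.environ["PROJECT_ID"]
    location = os.environ["LOCATION"]
    vertexai.init(project=project, location=location)

    rag.import_files(
        corpus_name=corpus_resource_name(
            project, location, os.environ["RAG_CORPUS_ID"]
        ),
        paths=[f"gs://{os.environ['GCS_BUCKET']}/"],  # whole-bucket prefix
        chunk_size=int(os.environ.get("CHUNK_SIZE", "512")),
        chunk_overlap=int(os.environ.get("CHUNK_OVERLAP", "100")),
    )
    return "OK", 200
```

Pair it with a google_cloud_scheduler_job that hits the function's URL with an OIDC token for the rag_sync service account.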
📐 Batching: Handling Bulk Uploads
When someone uploads 50 files at once, you get 50 Eventarc events. With max_instance_count = 1, Cloud Run queues these requests and processes them sequentially. Each file gets imported individually, which works but is slower than a single bulk import.
For better performance on bulk uploads, add a Pub/Sub batching layer:
# Alternative: route GCS events through Pub/Sub instead of Eventarc
resource "google_pubsub_topic" "rag_sync" {
  name    = "${var.environment}-rag-sync-topic"
  project = var.project_id
}

# Without this, nothing publishes to the topic: GCS must be told to notify it
resource "google_storage_notification" "rag_docs" {
  bucket         = var.rag_docs_bucket_name
  topic          = google_pubsub_topic.rag_sync.id
  payload_format = "JSON_API_V1"
  event_types    = ["OBJECT_FINALIZE", "OBJECT_DELETE"]
  depends_on     = [google_project_iam_member.gcs_pubsub_publishing]
}

resource "google_pubsub_subscription" "rag_sync_batch" {
  name                       = "${var.environment}-rag-sync-batch"
  topic                      = google_pubsub_topic.rag_sync.name
  project                    = var.project_id
  ack_deadline_seconds       = 600
  message_retention_duration = "86400s"
  push_config {
    push_endpoint = google_cloudfunctions2_function.rag_sync.url
    oidc_token {
      service_account_email = google_service_account.rag_sync.email
    }
  }
}
Note that a push subscription still delivers one message per HTTP request, so by itself this doesn't batch. For true batching, switch to a pull subscription and drain it on a schedule: collect the pending file URIs and call import_files once with all of them. This reduces embedding API calls and import overhead.
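One way to implement that (a sketch, not the only design): swap the push subscription above for a pull subscription, and have a scheduled drainer function pull pending notifications, dedupe the URIs, and make a single import_files call. `drain_and_import` and the `SUBSCRIPTION` env var are hypothetical names; the Pub/Sub calls are the standard google-cloud-pubsub client, and the payload parsing assumes JSON_API_V1 notifications (which carry `bucket` and `name` fields).

```python
# Sketch of a pull-based batch drain for bulk uploads
import json
import os


def uris_from_messages(payloads: list[dict]) -> list[str]:
    """Extract deduplicated gs:// URIs from GCS notification payloads."""
    seen, uris = set(), []
    for p in payloads:
        uri = f"gs://{p['bucket']}/{p['name']}"
        if uri not in seen:
            seen.add(uri)
            uris.append(uri)
    return uris


def drain_and_import(request):
    """HTTP entry point, e.g. invoked every few minutes by Cloud Scheduler."""
    from google.cloud import pubsub_v1
    import vertexai
    from vertexai.preview import rag

    project = os.environ["PROJECT_ID"]
    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(project, os.environ["SUBSCRIPTION"])

    response = subscriber.pull(subscription=sub_path, max_messages=100)
    if not response.received_messages:
        return "Nothing to import", 200

    payloads = [json.loads(m.message.data) for m in response.received_messages]
    uris = uris_from_messages(payloads)

    vertexai.init(project=project, location=os.environ["LOCATION"])
    rag.import_files(  # one bulk call instead of one call per file
        corpus_name=os.environ["RAG_CORPUS_ID"],
        paths=uris,
        chunk_size=int(os.environ.get("CHUNK_SIZE", "512")),
        chunk_overlap=int(os.environ.get("CHUNK_OVERLAP", "100")),
    )
    # Ack only after a successful import so failures get redelivered
    subscriber.acknowledge(
        subscription=sub_path,
        ack_ids=[m.ack_id for m in response.received_messages],
    )
    return f"Imported {len(uris)} files", 200
```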
🔔 Monitoring
# sync/monitoring.tf
resource "google_monitoring_alert_policy" "rag_sync_errors" {
display_name = "${var.environment}-rag-sync-errors"
project = var.project_id
combiner = "OR"
conditions {
display_name = "Cloud Function errors"
condition_threshold {
filter = "resource.type=\"cloud_run_revision\" AND resource.labels.service_name=\"${var.environment}-rag-sync\" AND metric.type=\"run.googleapis.com/request_count\" AND metric.labels.response_code_class!=\"2xx\""
comparison = "COMPARISON_GT"
threshold_value = 5
duration = "300s"
aggregations {
  alignment_period   = "300s"
  per_series_aligner = "ALIGN_SUM" # sum request counts; ALIGN_COUNT would count data points
}
}
}
notification_channels = [var.notification_channel_id]
}
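The policy assumes var.notification_channel_id already points at a channel. If you don't have one, a minimal email channel looks like this (the address is a placeholder):

```hcl
resource "google_monitoring_notification_channel" "email" {
  display_name = "${var.environment}-rag-sync-alerts"
  project      = var.project_id
  type         = "email"
  labels = {
    email_address = "team@example.com" # placeholder
  }
}
```

You can then pass google_monitoring_notification_channel.email.id as the channel ID.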
📐 Environment Configuration
# environments/dev.tfvars
max_instance_count = 1
chunk_size = 300
chunk_overlap = 50
timeout_seconds = 300
# environments/prod.tfvars
max_instance_count = 1 # Still 1 to protect QPM quota
chunk_size = 512
chunk_overlap = 100
timeout_seconds = 540
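These tfvars assume matching declarations; a sketch of what sync/variables.tf might look like (names mirror the resources above). Note that max_instance_count and timeout_seconds are hard-coded in function.tf earlier; wire these variables into service_config if you want them to vary per environment:

```hcl
# sync/variables.tf (sketch)
variable "project_id" { type = string }
variable "region" { type = string }
variable "environment" { type = string }
variable "rag_corpus_id" { type = string }
variable "rag_docs_bucket_name" { type = string }
variable "notification_channel_id" { type = string }

variable "chunk_size" {
  type    = number
  default = 512
}

variable "chunk_overlap" {
  type    = number
  default = 100
}

# Set in the tfvars but hard-coded in function.tf above
variable "max_instance_count" {
  type    = number
  default = 1
}

variable "timeout_seconds" {
  type    = number
  default = 540
}
```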
⏭️ What's Next
This is Post 4 of the GCP RAG Pipeline with Terraform series.
- Post 1: Vertex AI RAG Engine - Basic Setup 🔍
- Post 2: Advanced RAG - Chunking, Hybrid Search, Reranking 🧠
- Post 3: AlloyDB AI / pgvector - SQL-Native Vector Search 💰
- Post 4: Auto-Sync Pipeline (you are here) ⚡
Your RAG corpus now stays fresh automatically. Upload a document to GCS, and within minutes it's chunked, embedded, and searchable through Gemini. Eventarc handles the plumbing, Cloud Functions does the work, and Terraform makes it repeatable. ⚡
Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬