Google Vertex AI: Complete Machine Learning Platform Guide
Vertex AI is Google Cloud's unified machine learning platform that brings together all ML services under one roof. This comprehensive guide covers everything from AutoML to custom model training, deployment, and MLOps practices.
Why Vertex AI?
Vertex AI provides:
- Unified Platform: All ML tools in one place
- AutoML and Custom Training: Flexibility for any skill level
- MLOps Integration: End-to-end ML lifecycle management
- Pre-trained APIs: Vision, Language, Video, and more
- Scalable Infrastructure: Leverage Google's compute power
Getting Started with Vertex AI
Setting Up Your Environment
# setup_vertex_ai.py
from google.cloud import aiplatform
from google.cloud import storage
import os
class VertexAISetup:
"""Initialize Vertex AI environment."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
# Initialize Vertex AI
aiplatform.init(
project=project_id,
location=location,
staging_bucket=f'gs://{project_id}-vertex-staging'
)
# Create necessary buckets
self._create_buckets()
def _create_buckets(self):
"""Create required storage buckets."""
storage_client = storage.Client(project=self.project_id)
buckets = [
f'{self.project_id}-vertex-staging',
f'{self.project_id}-vertex-data',
f'{self.project_id}-vertex-models',
f'{self.project_id}-vertex-pipelines'
]
for bucket_name in buckets:
try:
bucket = storage_client.bucket(bucket_name)
bucket.create(location=self.location)
print(f"Created bucket: {bucket_name}")
except Exception as e:
print(f"Bucket {bucket_name} already exists or error: {e}")
def enable_apis(self):
"""Enable required APIs."""
apis = [
'aiplatform.googleapis.com',
'compute.googleapis.com',
'storage.googleapis.com',
'bigquery.googleapis.com',
'cloudbuild.googleapis.com',
'artifactregistry.googleapis.com'
]
for api in apis:
os.system(f'gcloud services enable {api} --project={self.project_id}')
print("APIs enabled successfully")
def create_service_account(self):
"""Create service account for Vertex AI."""
sa_name = 'vertex-ai-sa'
sa_email = f'{sa_name}@{self.project_id}.iam.gserviceaccount.com'
# Create service account
os.system(f'''
gcloud iam service-accounts create {sa_name} \
--display-name="Vertex AI Service Account" \
--project={self.project_id}
''')
# Grant necessary roles
roles = [
'roles/aiplatform.user',
'roles/bigquery.user',
'roles/storage.objectAdmin',
'roles/compute.admin'
]
for role in roles:
os.system(f'''
gcloud projects add-iam-policy-binding {self.project_id} \
--member="serviceAccount:{sa_email}" \
--role="{role}"
''')
return sa_email
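To tie the setup steps together, here is a minimal usage sketch. It assumes the class above is saved as setup_vertex_ai.py, that the gcloud CLI is installed and authenticated, and that your-project-id is a placeholder for a real project with billing enabled.
# run_setup.py -- example driver for the setup class above (placeholder project ID)
from setup_vertex_ai import VertexAISetup

# Instantiating the class initializes Vertex AI and creates the staging buckets
setup = VertexAISetup(project_id='your-project-id', location='us-central1')
setup.enable_apis()                        # enables the gcloud services listed above
sa_email = setup.create_service_account()  # creates vertex-ai-sa and grants roles
print(f"Service account ready: {sa_email}")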
AutoML with Vertex AI
Image Classification with AutoML
# automl_image_classification.py
from google.cloud import aiplatform
from google.cloud import storage
import pandas as pd
import base64
import json
class AutoMLImageClassifier:
"""Train image classification models with AutoML."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
aiplatform.init(project=project_id, location=location)
def prepare_dataset(self, image_folder, labels_csv):
"""Prepare dataset for AutoML training."""
# Read labels
df = pd.read_csv(labels_csv)
# Create import file for Vertex AI
import_data = []
for _, row in df.iterrows():
import_data.append({
'imageGcsUri': f"gs://{self.project_id}-vertex-data/{image_folder}/{row['filename']}",
'classificationAnnotation': {
'displayName': row['label']
}
})
# Save import file
import_file = 'image_classification_import.jsonl'
with open(import_file, 'w') as f:
for item in import_data:
f.write(json.dumps(item) + '\n')
# Upload to GCS
storage_client = storage.Client()
bucket = storage_client.bucket(f'{self.project_id}-vertex-data')
blob = bucket.blob('imports/image_classification_import.jsonl')
blob.upload_from_filename(import_file)
return f"gs://{self.project_id}-vertex-data/imports/image_classification_import.jsonl"
def create_dataset(self, display_name, gcs_source):
"""Create image dataset in Vertex AI."""
dataset = aiplatform.ImageDataset.create(
display_name=display_name,
gcs_source=gcs_source,
import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
)
print(f"Created dataset: {dataset.resource_name}")
return dataset
def train_model(self, dataset, model_display_name, budget_hours=8):
"""Train AutoML image classification model."""
job = aiplatform.AutoMLImageTrainingJob(
display_name=f"{model_display_name}_training",
prediction_type="classification",
multi_label=False,
model_type="CLOUD",
budget_milli_node_hours=budget_hours * 1000,
)
model = job.run(
dataset=dataset,
model_display_name=model_display_name,
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
)
print(f"Model trained: {model.resource_name}")
return model
def evaluate_model(self, model):
"""Evaluate model performance."""
evaluations = model.list_model_evaluations()
for evaluation in evaluations:
metrics = evaluation.metrics
print(f"Model Evaluation Metrics:")
print(f" Precision: {metrics.get('auPrc', 'N/A')}")
print(f" Recall: {metrics.get('auRoc', 'N/A')}")
print(f" Confidence Threshold: {metrics.get('confidenceThreshold', 'N/A')}")
# Confusion matrix
if 'confusionMatrix' in metrics:
print("\nConfusion Matrix:")
cm = metrics['confusionMatrix']
for i, row in enumerate(cm['rows']):
print(f" {cm['annotationSpecs'][i]['displayName']}: {row}")
def deploy_model(self, model, endpoint_display_name):
"""Deploy model to endpoint."""
endpoint = aiplatform.Endpoint.create(
display_name=endpoint_display_name,
project=self.project_id,
location=self.location,
)
model.deploy(
endpoint=endpoint,
deployed_model_display_name=model.display_name,
machine_type="n1-standard-4",
min_replica_count=1,
max_replica_count=3,
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=1,
)
print(f"Model deployed to endpoint: {endpoint.resource_name}")
return endpoint
def predict(self, endpoint, image_path):
"""Make prediction using deployed model."""
with open(image_path, "rb") as f:
image_bytes = f.read()
# Encode image
encoded_content = base64.b64encode(image_bytes).decode("utf-8")
instance = {
"content": encoded_content
}
prediction = endpoint.predict(instances=[instance])
# Parse results
results = []
for i, pred in enumerate(prediction.predictions[0]['confidences']):
results.append({
'class': prediction.predictions[0]['displayNames'][i],
'confidence': pred
})
return sorted(results, key=lambda x: x['confidence'], reverse=True)
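Putting the pieces together, the following is a hedged end-to-end sketch of the AutoML image workflow above. It assumes the class is saved as automl_image_classification.py; the labels CSV, image folder, and test image path are placeholders you would replace with your own assets.
# run_automl_image.py -- end-to-end AutoML image classification
from automl_image_classification import AutoMLImageClassifier

clf = AutoMLImageClassifier(project_id='your-project-id')

# 1. Build the JSONL import file from a labels CSV and upload it to GCS
import_uri = clf.prepare_dataset(image_folder='flowers', labels_csv='labels.csv')

# 2. Create the managed dataset and train with an 8 node-hour budget
dataset = clf.create_dataset('flowers_dataset', import_uri)
model = clf.train_model(dataset, 'flowers_classifier', budget_hours=8)

# 3. Review metrics, deploy, and classify a local test image
clf.evaluate_model(model)
endpoint = clf.deploy_model(model, 'flowers_endpoint')
print(clf.predict(endpoint, 'test_image.jpg')[:3])   # top-3 classes by confidence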
Tabular Data with AutoML
# automl_tabular.py
from google.cloud import aiplatform
from google.cloud import bigquery
import pandas as pd
class AutoMLTabular:
"""Train tabular models with AutoML."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
self.bq_client = bigquery.Client(project=project_id)
aiplatform.init(project=project_id, location=location)
def prepare_bigquery_dataset(self, query, dataset_id, table_id):
"""Prepare data in BigQuery."""
# Create the dataset if it does not already exist
dataset_ref = f"{self.project_id}.{dataset_id}"
self.bq_client.create_dataset(dataset_ref, exists_ok=True)
# Run query and save results
job_config = bigquery.QueryJobConfig(
destination=f"{dataset_ref}.{table_id}",
write_disposition="WRITE_TRUNCATE",
)
query_job = self.bq_client.query(query, job_config=job_config)
query_job.result()
print(f"Data prepared in BigQuery: {dataset_ref}.{table_id}")
return f"bq://{dataset_ref}.{table_id}"
def create_tabular_dataset(self, display_name, bq_source):
"""Create tabular dataset from BigQuery."""
dataset = aiplatform.TabularDataset.create(
display_name=display_name,
bq_source=bq_source,
)
print(f"Created dataset: {dataset.resource_name}")
return dataset
def train_classification_model(self, dataset, target_column, model_display_name):
"""Train tabular classification model."""
# Define column transformations (example names; these must match columns in
# your dataset, and the target column is passed separately to run() below)
column_specs = {
    "numeric_column": "numeric",
    "categorical_column": "categorical",
    "text_column": "text",
}
job = aiplatform.AutoMLTabularTrainingJob(
    display_name=f"{model_display_name}_training",
    optimization_prediction_type="classification",
    optimization_objective="minimize-log-loss",
    column_specs=column_specs,
)
model = job.run(
dataset=dataset,
target_column=target_column,
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
predefined_split_column_name=None,
timestamp_split_column_name=None,
weight_column=None,
budget_milli_node_hours=1000,
model_display_name=model_display_name,
disable_early_stopping=False,
)
return model
def train_regression_model(self, dataset, target_column, model_display_name):
"""Train tabular regression model."""
job = aiplatform.AutoMLTabularTrainingJob(
display_name=f"{model_display_name}_training",
optimization_prediction_type="regression",
optimization_objective="minimize-rmse",
)
model = job.run(
dataset=dataset,
target_column=target_column,
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
budget_milli_node_hours=1000,
model_display_name=model_display_name,
)
return model
def batch_predict(self, model, input_source, output_uri):
"""Run batch prediction."""
batch_prediction_job = model.batch_predict(
job_display_name=f"{model.display_name}_batch_prediction",
instances_format="bigquery",
predictions_format="bigquery",
bigquery_source=input_source,
bigquery_destination_prefix=output_uri,
machine_type="n1-standard-4",
max_replica_count=5,
)
batch_prediction_job.wait()
print(f"Batch prediction completed: {batch_prediction_job.resource_name}")
return batch_prediction_job
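A usage sketch for the tabular workflow, assuming the class above is saved as automl_tabular.py; the BigQuery query, dataset and table names, and target column are placeholders.
# run_automl_tabular.py -- AutoML on tabular data stored in BigQuery
from automl_tabular import AutoMLTabular

tab = AutoMLTabular(project_id='your-project-id')

# Materialize the training table (query and names are placeholders)
bq_uri = tab.prepare_bigquery_dataset(
    query="SELECT * FROM `your-project-id.raw.customers`",
    dataset_id='ml_datasets',
    table_id='churn_training',
)

dataset = tab.create_tabular_dataset('churn_dataset', bq_uri)
model = tab.train_classification_model(
    dataset, target_column='churned', model_display_name='churn_model'
)

# Score a separate BigQuery table and write predictions back to BigQuery
tab.batch_predict(
    model,
    input_source='bq://your-project-id.ml_datasets.churn_scoring',
    output_uri='bq://your-project-id.ml_datasets',
)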
Custom Model Training
Custom Training with TensorFlow
# custom_training_tensorflow.py
import tensorflow as tf
from google.cloud import aiplatform
import os
class CustomTensorFlowTrainer:
"""Custom TensorFlow model training on Vertex AI."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
aiplatform.init(project=project_id, location=location)
def create_training_script(self):
"""Create TensorFlow training script."""
training_script = '''
import tensorflow as tf
import argparse
import os
import json
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--batch-size', type=int, default=32)
parser.add_argument('--learning-rate', type=float, default=0.001)
parser.add_argument('--model-dir', type=str, required=True)
return parser.parse_args()
def create_model(input_shape, num_classes):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=input_shape),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(128, 3, activation='relu'),
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
return model
def train_model():
args = get_args()
# Load data (example with CIFAR-10)
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
# Create model
model = create_model((32, 32, 3), 10)
# Compile model
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=args.learning_rate),
loss='categorical_crossentropy',
metrics=['accuracy']
)
# Setup callbacks
callbacks = [
tf.keras.callbacks.ModelCheckpoint(
os.path.join(args.model_dir, 'checkpoint-{epoch:02d}'),
save_best_only=True
),
tf.keras.callbacks.TensorBoard(
log_dir=os.path.join(args.model_dir, 'logs')
),
tf.keras.callbacks.EarlyStopping(
patience=3,
restore_best_weights=True
)
]
# Train model
history = model.fit(
x_train, y_train,
batch_size=args.batch_size,
epochs=args.epochs,
validation_split=0.1,
callbacks=callbacks
)
# Evaluate model
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f'Test accuracy: {test_acc}')
# Save model
model.save(os.path.join(args.model_dir, 'model'))
# Save metrics
metrics = {
'accuracy': float(test_acc),
'loss': float(test_loss),
'history': history.history
}
with open(os.path.join(args.model_dir, 'metrics.json'), 'w') as f:
json.dump(metrics, f)
if __name__ == '__main__':
train_model()
'''
with open('trainer.py', 'w') as f:
f.write(training_script)
return 'trainer.py'
def create_custom_job(self, display_name, training_script_path):
"""Create custom training job."""
job = aiplatform.CustomJob(
display_name=display_name,
worker_pool_specs=[
{
"machine_spec": {
"machine_type": "n1-standard-4",
"accelerator_type": "NVIDIA_TESLA_K80",
"accelerator_count": 1,
},
"replica_count": 1,
"container_spec": {
"image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
"command": ["python"],
"args": [
training_script_path,
"--epochs=20",
"--batch-size=64",
"--learning-rate=0.001",
"--model-dir=/gcs/model"
],
},
}
],
staging_bucket=f"gs://{self.project_id}-vertex-staging",
)
return job
def run_distributed_training(self, display_name):
"""Run distributed training across multiple machines."""
job = aiplatform.CustomJob(
display_name=display_name,
# Note: Vertex AI sets TF_CONFIG automatically for multi-replica custom jobs,
# so the hard-coded TF_CONFIG values below are illustrative placeholders only
worker_pool_specs=[
# Chief worker
{
"machine_spec": {
"machine_type": "n1-standard-8",
"accelerator_type": "NVIDIA_TESLA_V100",
"accelerator_count": 2,
},
"replica_count": 1,
"container_spec": {
"image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
"command": ["python"],
"args": ["trainer.py"],
"env": [
{"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "chief", "index": 0}}'}
]
},
},
# Workers
{
"machine_spec": {
"machine_type": "n1-standard-8",
"accelerator_type": "NVIDIA_TESLA_V100",
"accelerator_count": 2,
},
"replica_count": 3,
"container_spec": {
"image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
"command": ["python"],
"args": ["trainer.py"],
"env": [
{"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "worker", "index": 0}}'}
]
},
},
# Parameter servers
{
"machine_spec": {
"machine_type": "n1-standard-4",
},
"replica_count": 2,
"container_spec": {
"image_uri": "gcr.io/deeplearning-platform-release/tf2-cpu.2-8:latest",
"command": ["python"],
"args": ["trainer.py"],
"env": [
{"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "ps", "index": 0}}'}
]
},
},
],
)
job.run(sync=True)
return job
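A short sketch of submitting the single-node job defined above. Note that in practice the training script must also be available inside the training container (for example baked into the image or distributed as a Python package); the local path used here is illustrative, and the project ID is a placeholder.
# run_custom_tf.py -- submit the single-node CustomJob defined above
from custom_training_tensorflow import CustomTensorFlowTrainer

trainer = CustomTensorFlowTrainer(project_id='your-project-id')
script_path = trainer.create_training_script()   # writes trainer.py locally

job = trainer.create_custom_job('cifar10-custom-training', script_path)
job.run(sync=True)    # blocks until the training job reaches a terminal state
print(f"Job state: {job.state}")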
Custom Container Training
# custom_container_training.py
from google.cloud import aiplatform
import docker
import os
class CustomContainerTrainer:
"""Train models using custom containers."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
self.docker_client = docker.from_env()
aiplatform.init(project=project_id, location=location)
def create_dockerfile(self):
"""Create Dockerfile for custom training."""
dockerfile_content = '''
FROM pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime
# Install additional dependencies
RUN pip install --no-cache-dir \
google-cloud-storage \
google-cloud-aiplatform \
tensorboard \
scikit-learn \
pandas \
numpy
# Copy training code
WORKDIR /app
COPY . /app
# Set environment variables
ENV PYTHONUNBUFFERED=1
# Run training script
ENTRYPOINT ["python", "train.py"]
'''
with open('Dockerfile', 'w') as f:
f.write(dockerfile_content)
return 'Dockerfile'
def create_training_script(self):
"""Create PyTorch training script."""
training_script = '''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import argparse
import os
import json
from google.cloud import storage
class SimpleNN(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super(SimpleNN, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, num_classes)
def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out
def train_model(args):
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Create dummy data for example
X_train = torch.randn(1000, 20)
y_train = torch.randint(0, 3, (1000,))
X_test = torch.randn(200, 20)
y_test = torch.randint(0, 3, (200,))
# Create data loaders
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
# Initialize model
model = SimpleNN(20, args.hidden_size, 3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)
# Training loop
for epoch in range(args.epochs):
model.train()
running_loss = 0.0
for i, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
avg_loss = running_loss / len(train_loader)
print(f'Epoch [{epoch+1}/{args.epochs}], Loss: {avg_loss:.4f}')
# Evaluate model
model.eval()
with torch.no_grad():
X_test, y_test = X_test.to(device), y_test.to(device)
outputs = model(X_test)
_, predicted = torch.max(outputs.data, 1)
accuracy = (predicted == y_test).sum().item() / y_test.size(0)
print(f'Test Accuracy: {accuracy:.4f}')
# Save model locally first, then copy it to GCS if a gs:// path was given
local_model_path = 'model.pth'
torch.save(model.state_dict(), local_model_path)
if args.model_dir.startswith('gs://'):
    upload_to_gcs(local_model_path, args.model_dir + '/model.pth')
else:
    os.makedirs(args.model_dir, exist_ok=True)
    os.replace(local_model_path, os.path.join(args.model_dir, 'model.pth'))
# Save metrics
metrics = {
'final_loss': avg_loss,
'accuracy': accuracy,
'epochs': args.epochs
}
with open('metrics.json', 'w') as f:
json.dump(metrics, f)
def upload_to_gcs(local_path, gcs_path):
"""Upload file to Google Cloud Storage."""
storage_client = storage.Client()
bucket_name = gcs_path.split('/')[2]
blob_name = '/'.join(gcs_path.split('/')[3:])
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_filename(local_path)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--batch-size', type=int, default=32)
parser.add_argument('--learning-rate', type=float, default=0.001)
parser.add_argument('--hidden-size', type=int, default=64)
parser.add_argument('--model-dir', type=str, default='./model')
args = parser.parse_args()
train_model(args)
'''
with open('train.py', 'w') as f:
f.write(training_script)
return 'train.py'
def build_and_push_image(self, image_name):
"""Build and push Docker image to Artifact Registry."""
# Build image
image_tag = f"us-central1-docker.pkg.dev/{self.project_id}/vertex-ai/{image_name}:latest"
print("Building Docker image...")
self.docker_client.images.build(
path=".",
tag=image_tag,
rm=True
)
# Push to Artifact Registry
print("Pushing image to Artifact Registry...")
self.docker_client.images.push(image_tag)
return image_tag
def submit_custom_container_job(self, display_name, image_uri):
"""Submit custom container training job."""
job = aiplatform.CustomContainerTrainingJob(
display_name=display_name,
container_uri=image_uri,
model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.1-11:latest",
)
model = job.run(
model_display_name=f"{display_name}_model",
args=[
"--epochs=20",
"--batch-size=64",
"--learning-rate=0.001",
"--model-dir=gs://my-bucket/models"
],
machine_type="n1-standard-4",
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=1,
)
return model
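A usage sketch for the custom-container flow. It assumes an Artifact Registry Docker repository named vertex-ai already exists in the project and that Docker is authenticated against us-central1-docker.pkg.dev (for example via gcloud auth configure-docker); the project ID is a placeholder.
# run_custom_container.py -- build, push, and train with the custom image
from custom_container_training import CustomContainerTrainer

trainer = CustomContainerTrainer(project_id='your-project-id')
trainer.create_dockerfile()
trainer.create_training_script()

# Build locally and push to Artifact Registry, then launch the training job
image_uri = trainer.build_and_push_image('pytorch-trainer')
model = trainer.submit_custom_container_job('pytorch-custom-training', image_uri)
print(f"Trained model: {model.resource_name}")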
Vertex AI Pipelines
Building ML Pipelines
# vertex_pipelines.py
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics
from google.cloud import aiplatform
import json
@component(
packages_to_install=["pandas", "scikit-learn"],
base_image="python:3.9"
)
def load_data(
data_path: str,
output_dataset: Output[Dataset]
):
"""Load and preprocess data."""
import pandas as pd
# Load data
df = pd.read_csv(data_path)
# Basic preprocessing
df = df.dropna()
df = df.drop_duplicates()
# Save processed data
df.to_csv(output_dataset.path, index=False)
# Log metadata
output_dataset.metadata["num_rows"] = len(df)
output_dataset.metadata["num_columns"] = len(df.columns)
@component(
packages_to_install=["pandas", "scikit-learn"],
base_image="python:3.9"
)
def train_model(
dataset: Input[Dataset],
model: Output[Model],
metrics: Output[Metrics]
):
"""Train ML model."""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
import joblib
# Load data
df = pd.read_csv(dataset.path)
# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Train model
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
# Evaluate model
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
# Save model
joblib.dump(clf, model.path)
# Log metrics
metrics.log_metric("accuracy", accuracy)
metrics.log_metric("precision", precision)
metrics.log_metric("recall", recall)
@component(
packages_to_install=["google-cloud-aiplatform"],
base_image="python:3.9"
)
def deploy_model(
model: Input[Model],
project_id: str,
location: str,
endpoint_name: str
) -> str:
"""Deploy model to Vertex AI endpoint."""
from google.cloud import aiplatform
aiplatform.init(project=project_id, location=location)
# Upload model
vertex_model = aiplatform.Model.upload(
display_name=f"{endpoint_name}_model",
artifact_uri=model.uri,
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
)
# Create endpoint
endpoint = aiplatform.Endpoint.create(display_name=endpoint_name)
# Deploy model
endpoint.deploy(
model=vertex_model,
deployed_model_display_name=vertex_model.display_name,
machine_type="n1-standard-4",
min_replica_count=1,
max_replica_count=3
)
return endpoint.resource_name
@pipeline(
name="ml-training-pipeline",
description="End-to-end ML training pipeline"
)
def ml_pipeline(
data_path: str,
project_id: str,
location: str = "us-central1"
):
"""Complete ML pipeline."""
# Load and preprocess data
data_task = load_data(data_path=data_path)
# Train model
train_task = train_model(dataset=data_task.outputs["output_dataset"])
# Deploy model
deploy_task = deploy_model(
model=train_task.outputs["model"],
project_id=project_id,
location=location,
endpoint_name="ml-pipeline-endpoint"
)
class PipelineManager:
"""Manage Vertex AI Pipelines."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
aiplatform.init(project=project_id, location=location)
def compile_pipeline(self, pipeline_func, output_path):
"""Compile pipeline to JSON."""
compiler.Compiler().compile(
pipeline_func=pipeline_func,
package_path=output_path
)
def submit_pipeline(self, pipeline_path, display_name, parameters):
"""Submit pipeline for execution."""
job = aiplatform.PipelineJob(
display_name=display_name,
template_path=pipeline_path,
pipeline_root=f"gs://{self.project_id}-vertex-pipelines",
parameter_values=parameters,
)
job.submit()
return job
def create_scheduled_pipeline(self, pipeline_path, schedule):
"""Create scheduled pipeline execution."""
from google.cloud import scheduler_v1
scheduler_client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{self.project_id}/locations/{self.location}"
job = {
"name": f"{parent}/jobs/ml-pipeline-schedule",
"description": "Scheduled ML pipeline execution",
"schedule": schedule, # e.g., "0 2 * * *" for daily at 2 AM
"http_target": {
"uri": f"https://{self.location}-aiplatform.googleapis.com/v1/projects/{self.project_id}/locations/{self.location}/pipelineJobs",
"http_method": "POST",
"headers": {
"Content-Type": "application/json",
},
"body": json.dumps({
"displayName": "scheduled-pipeline-run",
"templatePath": pipeline_path,
"pipelineRoot": f"gs://{self.project_id}-vertex-pipelines",
}).encode()
}
}
scheduler_client.create_job(parent=parent, job=job)
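To run the pipeline, compile it to JSON and submit it with the manager above. This sketch assumes the components are saved as vertex_pipelines.py and that the GCS data path exists; both the path and the project ID are placeholders.
# run_pipeline.py -- compile the pipeline above and submit it to Vertex AI
from vertex_pipelines import ml_pipeline, PipelineManager

manager = PipelineManager(project_id='your-project-id')

# Compile the @pipeline function into a pipeline spec
manager.compile_pipeline(ml_pipeline, 'ml_pipeline.json')

# Submit a run; the CSV path and project ID are placeholders
job = manager.submit_pipeline(
    pipeline_path='ml_pipeline.json',
    display_name='ml-training-pipeline-run',
    parameters={
        'data_path': 'gs://your-project-id-vertex-data/training/data.csv',
        'project_id': 'your-project-id',
    },
)
print(f"Submitted pipeline job: {job.resource_name}")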
Model Deployment and Serving
Advanced Model Deployment
# model_deployment.py
from google.cloud import aiplatform
from google.cloud import monitoring_v3
import numpy as np
class ModelDeploymentManager:
"""Manage model deployment and serving."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
self.monitoring_client = monitoring_v3.MetricServiceClient()
aiplatform.init(project=project_id, location=location)
def deploy_with_traffic_split(self, models, endpoint_name, traffic_split):
"""Deploy multiple models with traffic splitting."""
# Create endpoint
endpoint = aiplatform.Endpoint.create(
display_name=endpoint_name,
description="Multi-model endpoint with traffic split"
)
# Deploy models with traffic split
deployed_models = []
for i, (model, traffic) in enumerate(zip(models, traffic_split)):
deployed_model = model.deploy(
endpoint=endpoint,
deployed_model_display_name=f"{model.display_name}_v{i}",
machine_type="n1-standard-4",
min_replica_count=1,
max_replica_count=5,
traffic_percentage=traffic,
sync=False
)
deployed_models.append(deployed_model)
# Wait for all deployments
for dm in deployed_models:
dm.wait()
return endpoint
def create_batch_prediction_job(self, model, input_source, output_destination):
"""Create batch prediction job."""
batch_prediction_job = model.batch_predict(
job_display_name=f"{model.display_name}_batch_prediction",
gcs_source=input_source,
gcs_destination_prefix=output_destination,
machine_type="n1-standard-4",
max_replica_count=10,
accelerator_type="NVIDIA_TESLA_K80",
accelerator_count=1,
)
return batch_prediction_job
def setup_model_monitoring(self, endpoint, model_name):
"""Setup model monitoring for drift detection."""
# Create model monitoring job
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
display_name=f"{model_name}_monitoring",
endpoint=endpoint,
logging_sampling_strategy={"sample_rate": 0.1},
predict_instance_schema_uri="gs://my-bucket/schemas/predict.yaml",
analysis_instance_schema_uri="gs://my-bucket/schemas/analysis.yaml",
objective_configs=[
{
"deployed_model_id": model_name,
"objective_config": {
"training_dataset": {
"dataset": "bq://project.dataset.training_data",
"logging_sampling_strategy": {"sample_rate": 1.0},
},
"training_prediction_skew_detection_config": {
"skew_thresholds": {
"feature_1": {"value": 0.001},
"feature_2": {"value": 0.001},
}
},
"prediction_drift_detection_config": {
"drift_thresholds": {
"feature_1": {"value": 0.001},
"feature_2": {"value": 0.001},
}
}
}
}
],
alert_config={
"user_emails": ["ml-team@company.com"],
"enable_logging": True
},
schedule_config={
"monitor_interval": {"seconds": 3600} # Monitor every hour
}
)
return monitoring_job
def create_explanation_metadata(self):
"""Create explanation metadata for model interpretability."""
explanation_metadata = {
"inputs": {
"feature_1": {
"input_tensor_name": "dense_input",
"encoding": "IDENTITY",
"modality": "numeric"
},
"feature_2": {
"input_tensor_name": "dense_input",
"encoding": "IDENTITY",
"modality": "numeric"
}
},
"outputs": {
"prediction": {
"output_tensor_name": "dense_2"
}
}
}
explanation_parameters = {
"sampled_shapley_attribution": {
"path_count": 10
}
}
return explanation_metadata, explanation_parameters
def online_predict_with_explanation(self, endpoint, instances):
"""Make predictions with explanations."""
# endpoint.explain() returns predictions together with feature attributions
response = endpoint.explain(instances=instances)
# Parse results
results = []
for i, (pred, expl) in enumerate(zip(
response.predictions,
response.explanations
)):
results.append({
"instance": instances[i],
"prediction": pred,
"feature_attributions": expl.attributions
})
return results
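A sketch of a canary-style rollout using the manager above. The two model resource names are placeholders for versions you have already uploaded to Vertex AI, and the project ID is a placeholder.
# run_traffic_split.py -- canary-style rollout across two model versions
from google.cloud import aiplatform
from model_deployment import ModelDeploymentManager

manager = ModelDeploymentManager(project_id='your-project-id')

# Resource names of two previously uploaded model versions (placeholders)
model_v1 = aiplatform.Model('projects/your-project-id/locations/us-central1/models/1111111111')
model_v2 = aiplatform.Model('projects/your-project-id/locations/us-central1/models/2222222222')

# Send 90% of traffic to the current model and 10% to the candidate
endpoint = manager.deploy_with_traffic_split(
    models=[model_v1, model_v2],
    endpoint_name='fraud-detection-endpoint',
    traffic_split=[90, 10],
)
print(f"Endpoint ready: {endpoint.resource_name}")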
Custom Prediction Routine
# custom_predictor.py
from google.cloud import aiplatform
from typing import Dict, List, Any
import numpy as np
import joblib
class CustomPredictor:
"""Custom prediction routine for Vertex AI."""
def __init__(self):
self._model = None
self._preprocessor = None
def load(self, artifacts_path: str):
"""Load model artifacts."""
self._model = joblib.load(f"{artifacts_path}/model.pkl")
self._preprocessor = joblib.load(f"{artifacts_path}/preprocessor.pkl")
def preprocess(self, instances: List[Dict[str, Any]]) -> np.ndarray:
"""Preprocess input data."""
# Convert to numpy array
data = np.array([[
instance['feature1'],
instance['feature2'],
instance['feature3']
] for instance in instances])
# Apply preprocessing
return self._preprocessor.transform(data)
def predict(self, instances: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Make predictions."""
# Preprocess
preprocessed = self.preprocess(instances)
# Predict
predictions = self._model.predict(preprocessed)
probabilities = self._model.predict_proba(preprocessed)
# Format results
results = []
for pred, probs in zip(predictions, probabilities):
results.append({
"prediction": int(pred),
"probabilities": probs.tolist(),
"confidence": float(np.max(probs))
})
return results
def postprocess(self, predictions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Postprocess predictions."""
# Add business logic
for pred in predictions:
if pred['confidence'] < 0.7:
pred['recommendation'] = 'Manual review recommended'
else:
pred['recommendation'] = 'Automated action approved'
return predictions
# Deployment configuration
def deploy_custom_predictor(model_path, endpoint_name):
"""Deploy model with custom prediction routine."""
# Package the custom prediction code (in practice this setup() call lives in a
# separate setup.py shipped alongside the predictor module)
from setuptools import setup, find_packages
setup(
name="custom_predictor",
version="1.0",
packages=find_packages(),
install_requires=[
"numpy",
"scikit-learn",
"joblib"
],
python_requires=">=3.7",
)
# Upload model
model = aiplatform.Model.upload(
display_name=f"{endpoint_name}_model",
artifact_uri=model_path,
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
serving_container_predict_route="/predict",
serving_container_health_route="/health",
serving_container_environment_variables={
"MODEL_NAME": "custom_model"
},
serving_container_command=["python", "-m", "custom_predictor"],
)
return model
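Before packaging the predictor, it is worth exercising it locally. This sketch assumes an artifacts directory containing the pickled model.pkl and preprocessor.pkl files referenced by load(); the feature values are placeholders.
# test_custom_predictor.py -- exercise the predictor locally before deployment
from custom_predictor import CustomPredictor

predictor = CustomPredictor()
predictor.load('./artifacts')   # directory holding model.pkl and preprocessor.pkl

instances = [
    {'feature1': 1.2, 'feature2': 0.4, 'feature3': -0.7},
    {'feature1': 0.1, 'feature2': 2.3, 'feature3': 0.9},
]
predictions = predictor.predict(instances)
print(predictor.postprocess(predictions))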
MLOps Best Practices
Experiment Tracking
# experiment_tracking.py
from google.cloud import aiplatform
import pandas as pd
from datetime import datetime
class ExperimentTracker:
"""Track ML experiments with Vertex AI."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
aiplatform.init(project=project_id, location=location)
# Vertex AI Experiments (used below) is the system of record here; MLflow-style
# autologging can optionally be enabled with aiplatform.autolog(), which
# requires the google-cloud-aiplatform[autologging] extra.
def track_experiment(self, experiment_name, params, metrics, artifacts):
"""Track experiment details."""
experiment = aiplatform.Experiment.create(
experiment_name=experiment_name,
description=f"Experiment run on {datetime.now()}",
)
with experiment.start_run(run_name=f"run-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# Log parameters
for key, value in params.items():
run.log_params({key: value})
# Log metrics
for key, value in metrics.items():
run.log_metrics({key: value})
# Log artifacts (artifacts is a dict mapping names to paths/URIs)
for artifact_path in artifacts.values():
run.log_artifact(artifact_path)
# Log model
if 'model_path' in artifacts:
run.log_model(artifacts['model_path'], "model")
return experiment
def compare_experiments(self, experiment_names):
"""Compare multiple experiments."""
experiments = []
for name in experiment_names:
exp = aiplatform.Experiment(experiment_name=name)
experiments.append(exp)
# Get all runs
all_runs = []
for exp in experiments:
runs = exp.list_runs()
all_runs.extend(runs)
# Compare metrics
comparison = []
for run in all_runs:
comparison.append({
'experiment': run.experiment_name,
'run_name': run.name,
'metrics': run.get_metrics(),
'params': run.get_params()
})
return pd.DataFrame(comparison)
def register_best_model(self, experiment_name, metric_name='accuracy'):
"""Register best model from experiment."""
experiment = aiplatform.Experiment(experiment_name=experiment_name)
# Find best run
best_run = None
best_metric = -float('inf')
for run in experiment.list_runs():
metrics = run.get_metrics()
if metric_name in metrics and metrics[metric_name] > best_metric:
best_metric = metrics[metric_name]
best_run = run
if best_run:
# Register model
model = aiplatform.Model.upload(
display_name=f"{experiment_name}_best_model",
artifact_uri=best_run.get_artifact_uri("model"),
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)
# Add metadata
model.update(
labels={
"experiment": experiment_name,
"metric": metric_name,
"value": str(best_metric)
}
)
return model
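A usage sketch for the tracker above; the experiment name, parameter values, metrics, and artifact URI are placeholders.
# run_experiments.py -- track runs and promote the best one
from experiment_tracking import ExperimentTracker

tracker = ExperimentTracker(project_id='your-project-id')

# Log a single run; parameter values, metrics, and the artifact URI are placeholders
tracker.track_experiment(
    experiment_name='churn-model-tuning',
    params={'learning_rate': 0.001, 'batch_size': 64},
    metrics={'accuracy': 0.91, 'loss': 0.27},
    artifacts={'model_path': 'gs://your-project-id-vertex-models/churn/run-001'},
)

# Compare runs across experiments and register the best model by accuracy
comparison_df = tracker.compare_experiments(['churn-model-tuning'])
print(comparison_df[['experiment', 'run_name', 'metrics']])
best_model = tracker.register_best_model('churn-model-tuning', metric_name='accuracy')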
Model Registry and Versioning
# model_registry.py
from google.cloud import aiplatform
from typing import Dict, List
from datetime import datetime
import hashlib
class ModelRegistry:
"""Manage model versions and registry."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
aiplatform.init(project=project_id, location=location)
def register_model(self, model_name, artifact_uri, framework, version=None):
"""Register new model version."""
# Generate version if not provided
if not version:
version = self._generate_version(artifact_uri)
# Upload model
model = aiplatform.Model.upload(
display_name=f"{model_name}_v{version}",
artifact_uri=artifact_uri,
serving_container_image_uri=self._get_container_image(framework),
labels={
"model_name": model_name,
"version": version,
"framework": framework,
"stage": "staging"
}
)
return model
def promote_model(self, model_id, stage='production'):
"""Promote model to different stage."""
model = aiplatform.Model(model_id)
# Update labels
labels = model.labels.copy()
labels['stage'] = stage
labels['promoted_at'] = datetime.now().strftime('%Y%m%d-%H%M%S')  # label values cannot contain ':'
model.update(labels=labels)
# If promoting to production, update the production endpoint
if stage == 'production':
self._update_production_endpoint(model)
return model
def get_model_lineage(self, model_name):
"""Get model lineage and versions."""
# List all models with the name
models = aiplatform.Model.list(
filter=f'labels.model_name="{model_name}"',
order_by="create_time desc"
)
lineage = []
for model in models:
lineage.append({
'model_id': model.resource_name,
'version': model.labels.get('version', 'unknown'),
'stage': model.labels.get('stage', 'none'),
'created': model.create_time,
'metrics': self._get_model_metrics(model)
})
return lineage
def _get_model_metrics(self, model):
"""Return evaluation metrics attached to the model, if any."""
evaluations = model.list_model_evaluations()
return evaluations[0].metrics if evaluations else {}
def _generate_version(self, artifact_uri):
"""Generate version hash from artifact."""
return hashlib.sha256(artifact_uri.encode()).hexdigest()[:8]
def _get_container_image(self, framework):
"""Get appropriate serving container image."""
container_images = {
'tensorflow': 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest',
'pytorch': 'us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.1-11:latest',
'sklearn': 'us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest',
'xgboost': 'us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest',
}
return container_images.get(framework, container_images['sklearn'])
def _update_production_endpoint(self, model):
"""Update production endpoint with new model."""
# Find production endpoint
endpoints = aiplatform.Endpoint.list(
filter='labels.environment="production"'
)
if endpoints:
endpoint = endpoints[0]
# Undeploy old models
for deployed_model in endpoint.list_models():
if deployed_model.display_name != model.display_name:
endpoint.undeploy(deployed_model_id=deployed_model.id)
# Deploy new model
model.deploy(
endpoint=endpoint,
deployed_model_display_name=model.display_name,
machine_type="n1-standard-4",
min_replica_count=2,
max_replica_count=10,
traffic_percentage=100
)
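A sketch of the registry in use, assuming the class above is saved as model_registry.py; the model name, artifact URI, and project ID are placeholders.
# run_registry.py -- register, inspect, and promote a model version
from model_registry import ModelRegistry

registry = ModelRegistry(project_id='your-project-id')

# Register a new version (artifact URI is a placeholder for a trained model)
model = registry.register_model(
    model_name='churn_model',
    artifact_uri='gs://your-project-id-vertex-models/churn/v1',
    framework='sklearn',
)

# Inspect the version history
for entry in registry.get_model_lineage('churn_model'):
    print(entry['version'], entry['stage'], entry['created'])

# Promote the new version once it has passed validation
registry.promote_model(model.resource_name, stage='production')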
Integration with Other Services
BigQuery ML Integration
# bigquery_ml_integration.py
from google.cloud import bigquery
from google.cloud import aiplatform
class BigQueryMLIntegration:
"""Integrate BigQuery ML with Vertex AI."""
def __init__(self, project_id):
self.project_id = project_id
self.bq_client = bigquery.Client(project=project_id)
aiplatform.init(project=project_id)
def export_bqml_model_to_vertex(self, bq_model_ref, vertex_model_name):
"""Export BigQuery ML model to Vertex AI."""
# Export model from BigQuery
export_job = self.bq_client.extract_model(
bq_model_ref,
f"gs://{self.project_id}-vertex-models/{vertex_model_name}",
)
export_job.result() # Wait for export
# Import to Vertex AI
model = aiplatform.Model.upload(
display_name=vertex_model_name,
artifact_uri=f"gs://{self.project_id}-vertex-models/{vertex_model_name}",
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)
return model
def create_feature_engineering_pipeline(self, query, feature_table):
"""Create feature engineering pipeline."""
# Create feature engineering query
feature_query = f"""
CREATE OR REPLACE TABLE `{self.project_id}.{feature_table}` AS
{query}
"""
# Execute query
query_job = self.bq_client.query(feature_query)
query_job.result()
# Create Vertex AI dataset from BigQuery
dataset = aiplatform.TabularDataset.create(
display_name=f"{feature_table}_dataset",
bq_source=f"bq://{self.project_id}.{feature_table}",
)
return dataset
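A usage sketch for the BigQuery ML bridge, assuming the exported model type supports export to a TensorFlow SavedModel; the model reference, feature query, and table names are placeholders.
# run_bqml_export.py -- move a BigQuery ML model into Vertex AI
from bigquery_ml_integration import BigQueryMLIntegration

bqml = BigQueryMLIntegration(project_id='your-project-id')

# Export a BQML model (reference is a placeholder) and register it in Vertex AI
vertex_model = bqml.export_bqml_model_to_vertex(
    bq_model_ref='your-project-id.ml_models.churn_bqml',
    vertex_model_name='churn-bqml-export',
)
print(f"Imported model: {vertex_model.resource_name}")

# Materialize engineered features and wrap them as a Vertex AI tabular dataset
feature_dataset = bqml.create_feature_engineering_pipeline(
    query=(
        "SELECT customer_id, AVG(order_value) AS avg_order_value "
        "FROM `your-project-id.raw.orders` GROUP BY customer_id"
    ),
    feature_table='features.customer_features',
)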
Best Practices and Production Tips
Production ML Pipeline
# production_ml_pipeline.py
class ProductionMLPipeline:
"""Production-ready ML pipeline with best practices."""
def __init__(self, project_id, location='us-central1'):
self.project_id = project_id
self.location = location
self.setup_monitoring()
def setup_monitoring(self):
"""Setup comprehensive monitoring."""
# Create monitoring dashboard
dashboard_config = {
"displayName": "ML Pipeline Dashboard",
"mosaicLayout": {
"tiles": [
{
"widget": {
"title": "Model Predictions",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="aiplatform.googleapis.com/Endpoint"'
}
}
}]
}
}
},
{
"widget": {
"title": "Model Latency",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'metric.type="aiplatform.googleapis.com/prediction/latency"'
}
}
}]
}
}
}
]
}
}
return dashboard_config
def implement_model_governance(self):
"""Implement model governance policies."""
governance_policies = {
"model_approval": {
"required_metrics": ["accuracy", "precision", "recall"],
"minimum_thresholds": {
"accuracy": 0.85,
"precision": 0.80,
"recall": 0.80
},
"required_tests": [
"bias_detection",
"fairness_check",
"performance_validation"
]
},
"deployment_gates": {
"stages": ["dev", "staging", "production"],
"approval_required": ["staging", "production"],
"automated_tests": True
},
"monitoring_requirements": {
"drift_detection": True,
"performance_monitoring": True,
"data_quality_checks": True
}
}
return governance_policies
def create_ci_cd_pipeline(self):
"""Create CI/CD pipeline for ML."""
cloudbuild_config = """
steps:
# Run unit tests
- name: 'python:3.9'
entrypoint: 'pytest'
args: ['tests/']
# Validate data
- name: 'gcr.io/$PROJECT_ID/ml-validator'
args: ['--data-path', 'gs://$PROJECT_ID-data/']
# Train model
- name: 'gcr.io/deeplearning-platform-release/tf2-gpu.2-8'
args: ['python', 'train.py']
# Evaluate model
- name: 'gcr.io/$PROJECT_ID/ml-evaluator'
args: ['--model-path', 'gs://$PROJECT_ID-models/']
# Deploy if tests pass
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: 'gcloud'
args: [
'ai', 'models', 'upload',
'--region', 'us-central1',
'--display-name', 'production-model',
'--artifact-uri', 'gs://$PROJECT_ID-models/'
]
timeout: '3600s'
options:
machineType: 'E2_HIGHCPU_8'
"""
return cloudbuild_config
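To make the production scaffolding concrete, here is a short sketch that materializes the governance policy and writes the Cloud Build definition to disk; the module name and project ID are placeholders.
# run_production_setup.py -- materialize the governance and CI/CD assets above
from production_ml_pipeline import ProductionMLPipeline

pipeline = ProductionMLPipeline(project_id='your-project-id')

# Governance thresholds that a candidate model must clear before promotion
policies = pipeline.implement_model_governance()
print(policies['model_approval']['minimum_thresholds'])

# Write the Cloud Build definition so it can be committed next to the code
with open('cloudbuild.yaml', 'w') as f:
    f.write(pipeline.create_ci_cd_pipeline())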
Conclusion
Vertex AI provides a comprehensive platform for building, deploying, and managing machine learning models at scale. Key benefits:
- Unified Platform: All ML tools and services in one place
- Flexibility: Support for AutoML and custom training
- Scalability: Leverage Google's infrastructure
- MLOps Integration: End-to-end ML lifecycle management
- Pre-trained Models: Access to Google's AI capabilities
Best Practices Summary
- Start with AutoML: For quick prototyping and baseline models
- Use Pipelines: Automate and reproduce ML workflows
- Implement Monitoring: Track model performance and drift
- Version Everything: Models, data, and code
- Security First: Use IAM, VPC, and encryption
- Cost Optimization: Use preemptible instances for training
- Documentation: Document models, features, and decisions
Next Steps
- Explore Vertex AI Feature Store for feature management
- Learn about Vertex AI Matching Engine for similarity search
- Study Vertex AI Model Monitoring for production deployments
- Implement Vertex AI Experiments for systematic experimentation
- Get certified as a Google Cloud ML Engineer
Remember: Vertex AI simplifies ML development, but following MLOps best practices ensures your models are production-ready, maintainable, and deliver business value.