Google Vertex AI: Complete Machine Learning Platform Guide

Tyler Maginnis | February 07, 2024

Google Cloud, Vertex AI, machine learning, MLOps, AI Platform

Need Professional Google Cloud Services?

Get expert assistance with your Google Cloud services implementation and management. Tyler on Tech Louisville provides priority support for Louisville businesses.

Same-day service available for Louisville area

Vertex AI is Google Cloud's unified machine learning platform that brings together all ML services under one roof. This comprehensive guide covers everything from AutoML to custom model training, deployment, and MLOps practices.

Why Vertex AI?

Vertex AI provides:

  • Unified Platform: All ML tools in one place
  • AutoML and Custom Training: Flexibility for any skill level
  • MLOps Integration: End-to-end ML lifecycle management
  • Pre-trained APIs: Vision, Language, Video, and more
  • Scalable Infrastructure: Leverage Google's compute power

Getting Started with Vertex AI

Setting Up Your Environment

# setup_vertex_ai.py
from google.cloud import aiplatform
from google.cloud import storage
import os

class VertexAISetup:
    """Initialize Vertex AI environment."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location

        # Initialize Vertex AI
        aiplatform.init(
            project=project_id,
            location=location,
            staging_bucket=f'gs://{project_id}-vertex-staging'
        )

        # Create necessary buckets
        self._create_buckets()

    def _create_buckets(self):
        """Create required storage buckets."""
        storage_client = storage.Client(project=self.project_id)

        buckets = [
            f'{self.project_id}-vertex-staging',
            f'{self.project_id}-vertex-data',
            f'{self.project_id}-vertex-models',
            f'{self.project_id}-vertex-pipelines'
        ]

        for bucket_name in buckets:
            try:
                bucket = storage_client.bucket(bucket_name)
                bucket.create(location=self.location)
                print(f"Created bucket: {bucket_name}")
            except Exception as e:
                print(f"Bucket {bucket_name} already exists or error: {e}")

    def enable_apis(self):
        """Enable required APIs."""
        apis = [
            'aiplatform.googleapis.com',
            'compute.googleapis.com',
            'storage.googleapis.com',
            'bigquery.googleapis.com',
            'cloudbuild.googleapis.com',
            'artifactregistry.googleapis.com'
        ]

        for api in apis:
            os.system(f'gcloud services enable {api} --project={self.project_id}')

        print("APIs enabled successfully")

    def create_service_account(self):
        """Create service account for Vertex AI."""
        sa_name = 'vertex-ai-sa'
        sa_email = f'{sa_name}@{self.project_id}.iam.gserviceaccount.com'

        # Create service account
        os.system(f'''
            gcloud iam service-accounts create {sa_name} \
                --display-name="Vertex AI Service Account" \
                --project={self.project_id}
        ''')

        # Grant necessary roles
        roles = [
            'roles/aiplatform.user',
            'roles/bigquery.user',
            'roles/storage.objectAdmin',
            'roles/compute.admin'
        ]

        for role in roles:
            os.system(f'''
                gcloud projects add-iam-policy-binding {self.project_id} \
                    --member="serviceAccount:{sa_email}" \
                    --role="{role}"
            ''')

        return sa_email
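
A minimal usage sketch, assuming the class above is saved as setup_vertex_ai.py and 'my-gcp-project' is replaced with a real project ID:

# run_setup.py -- example values, adjust for your project
from setup_vertex_ai import VertexAISetup

setup = VertexAISetup(project_id='my-gcp-project', location='us-central1')
setup.enable_apis()
sa_email = setup.create_service_account()
print(f"Vertex AI service account: {sa_email}")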

AutoML with Vertex AI

Image Classification with AutoML

# automl_image_classification.py
from google.cloud import aiplatform
from google.cloud import storage
import pandas as pd
import base64
import json

class AutoMLImageClassifier:
    """Train image classification models with AutoML."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        aiplatform.init(project=project_id, location=location)

    def prepare_dataset(self, image_folder, labels_csv):
        """Prepare dataset for AutoML training."""
        # Read labels
        df = pd.read_csv(labels_csv)

        # Create import file for Vertex AI
        import_data = []
        for _, row in df.iterrows():
            import_data.append({
                'imageGcsUri': f"gs://{self.project_id}-vertex-data/{image_folder}/{row['filename']}",
                'classificationAnnotation': {
                    'displayName': row['label']
                }
            })

        # Save import file
        import_file = 'image_classification_import.jsonl'
        with open(import_file, 'w') as f:
            for item in import_data:
                f.write(json.dumps(item) + '\n')

        # Upload to GCS
        storage_client = storage.Client()
        bucket = storage_client.bucket(f'{self.project_id}-vertex-data')
        blob = bucket.blob('imports/image_classification_import.jsonl')
        blob.upload_from_filename(import_file)

        return f"gs://{self.project_id}-vertex-data/imports/image_classification_import.jsonl"

    def create_dataset(self, display_name, gcs_source):
        """Create image dataset in Vertex AI."""
        dataset = aiplatform.ImageDataset.create(
            display_name=display_name,
            gcs_source=gcs_source,
            import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
        )

        print(f"Created dataset: {dataset.resource_name}")
        return dataset

    def train_model(self, dataset, model_display_name, budget_hours=8):
        """Train AutoML image classification model."""
        job = aiplatform.AutoMLImageTrainingJob(
            display_name=f"{model_display_name}_training",
            prediction_type="classification",
            multi_label=False,
            model_type="CLOUD",
            budget_milli_node_hours=budget_hours * 1000,
        )

        model = job.run(
            dataset=dataset,
            model_display_name=model_display_name,
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
        )

        print(f"Model trained: {model.resource_name}")
        return model

    def evaluate_model(self, model):
        """Evaluate model performance."""
        evaluations = model.list_model_evaluations()

        for evaluation in evaluations:
            metrics = evaluation.metrics
            print(f"Model Evaluation Metrics:")
            print(f"  Precision: {metrics.get('auPrc', 'N/A')}")
            print(f"  Recall: {metrics.get('auRoc', 'N/A')}")
            print(f"  Confidence Threshold: {metrics.get('confidenceThreshold', 'N/A')}")

            # Confusion matrix
            if 'confusionMatrix' in metrics:
                print("\nConfusion Matrix:")
                cm = metrics['confusionMatrix']
                for i, row in enumerate(cm['rows']):
                    print(f"  {cm['annotationSpecs'][i]['displayName']}: {row}")

    def deploy_model(self, model, endpoint_display_name):
        """Deploy model to endpoint."""
        endpoint = aiplatform.Endpoint.create(
            display_name=endpoint_display_name,
            project=self.project_id,
            location=self.location,
        )

        model.deploy(
            endpoint=endpoint,
            deployed_model_display_name=model.display_name,
            machine_type="n1-standard-4",
            min_replica_count=1,
            max_replica_count=3,
            accelerator_type="NVIDIA_TESLA_K80",
            accelerator_count=1,
        )

        print(f"Model deployed to endpoint: {endpoint.resource_name}")
        return endpoint

    def predict(self, endpoint, image_path):
        """Make prediction using deployed model."""
        with open(image_path, "rb") as f:
            image_bytes = f.read()

        # Encode image
        encoded_content = base64.b64encode(image_bytes).decode("utf-8")

        instance = {
            "content": encoded_content
        }

        prediction = endpoint.predict(instances=[instance])

        # Parse results
        results = []
        for i, pred in enumerate(prediction.predictions[0]['confidences']):
            results.append({
                'class': prediction.predictions[0]['displayNames'][i],
                'confidence': pred
            })

        return sorted(results, key=lambda x: x['confidence'], reverse=True)
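
Chained together, the class above covers the full AutoML workflow. A sketch with placeholder names, assuming labels.csv has filename and label columns as prepare_dataset expects:

# run_image_automl.py -- placeholder project, paths, and names
classifier = AutoMLImageClassifier(project_id='my-gcp-project')

import_uri = classifier.prepare_dataset('training-images', 'labels.csv')
dataset = classifier.create_dataset('product-images', import_uri)
model = classifier.train_model(dataset, 'product-classifier', budget_hours=8)
classifier.evaluate_model(model)

endpoint = classifier.deploy_model(model, 'product-classifier-endpoint')
results = classifier.predict(endpoint, 'test_image.jpg')
print(results[:3])  # top three classes by confidence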

Tabular Data with AutoML

# automl_tabular.py
from google.cloud import aiplatform
from google.cloud import bigquery
import pandas as pd

class AutoMLTabular:
    """Train tabular models with AutoML."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        self.bq_client = bigquery.Client(project=project_id)
        aiplatform.init(project=project_id, location=location)

    def prepare_bigquery_dataset(self, query, dataset_id, table_id):
        """Prepare data in BigQuery."""
        # Create dataset if not exists
        dataset_ref = f"{self.project_id}.{dataset_id}"
        self.bq_client.create_dataset(dataset_ref, exists_ok=True)

        # Run query and save results
        job_config = bigquery.QueryJobConfig(
            destination=f"{dataset_ref}.{table_id}",
            write_disposition="WRITE_TRUNCATE",
        )

        query_job = self.bq_client.query(query, job_config=job_config)
        query_job.result()

        print(f"Data prepared in BigQuery: {dataset_ref}.{table_id}")
        return f"bq://{dataset_ref}.{table_id}"

    def create_tabular_dataset(self, display_name, bq_source):
        """Create tabular dataset from BigQuery."""
        dataset = aiplatform.TabularDataset.create(
            display_name=display_name,
            bq_source=bq_source,
        )

        print(f"Created dataset: {dataset.resource_name}")
        return dataset

    def train_classification_model(self, dataset, target_column, model_display_name):
        """Train tabular classification model."""
        # Column transformations (placeholder column names -- replace with
        # your own). The target column must not be listed here.
        column_specs = {
            "numeric_column": "numeric",
            "categorical_column": "categorical",
            "text_column": "text",
        }

        job = aiplatform.AutoMLTabularTrainingJob(
            display_name=f"{model_display_name}_training",
            optimization_prediction_type="classification",
            optimization_objective="minimize-log-loss",
            column_specs=column_specs,
        )

        model = job.run(
            dataset=dataset,
            target_column=target_column,
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            predefined_split_column_name=None,
            timestamp_split_column_name=None,
            weight_column=None,
            budget_milli_node_hours=1000,
            model_display_name=model_display_name,
            disable_early_stopping=False,
        )

        return model

    def train_regression_model(self, dataset, target_column, model_display_name):
        """Train tabular regression model."""
        job = aiplatform.AutoMLTabularTrainingJob(
            display_name=f"{model_display_name}_training",
            optimization_prediction_type="regression",
            optimization_objective="minimize-rmse",
        )

        model = job.run(
            dataset=dataset,
            target_column=target_column,
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            budget_milli_node_hours=1000,
            model_display_name=model_display_name,
        )

        return model

    def batch_predict(self, model, input_source, output_uri):
        """Run batch prediction."""
        batch_prediction_job = model.batch_predict(
            job_display_name=f"{model.display_name}_batch_prediction",
            instances_format="bigquery",
            predictions_format="bigquery",
            bigquery_source=input_source,
            bigquery_destination_prefix=output_uri,
            machine_type="n1-standard-4",
            max_replica_count=5,
        )

        batch_prediction_job.wait()

        print(f"Batch prediction completed: {batch_prediction_job.resource_name}")
        return batch_prediction_job
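
The tabular workflow follows the same pattern. A sketch with placeholder BigQuery references:

# run_tabular_automl.py -- placeholder query, table, and column names
trainer = AutoMLTabular(project_id='my-gcp-project')

bq_uri = trainer.prepare_bigquery_dataset(
    query="SELECT * FROM `my-gcp-project.raw.customers`",
    dataset_id='ml_datasets',
    table_id='churn_features'
)

dataset = trainer.create_tabular_dataset('churn-dataset', bq_uri)
model = trainer.train_classification_model(dataset, 'churned', 'churn-model')

trainer.batch_predict(
    model,
    input_source='bq://my-gcp-project.ml_datasets.scoring_input',
    output_uri='bq://my-gcp-project.ml_datasets'
)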

Custom Model Training

Custom Training with TensorFlow

# custom_training_tensorflow.py
import tensorflow as tf
from google.cloud import aiplatform
import os

class CustomTensorFlowTrainer:
    """Custom TensorFlow model training on Vertex AI."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        aiplatform.init(project=project_id, location=location)

    def create_training_script(self):
        """Create TensorFlow training script."""
        training_script = '''
import tensorflow as tf
import argparse
import os
import json

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--model-dir', type=str, required=True)
    return parser.parse_args()

def create_model(input_shape, num_classes):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=input_shape),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(128, 3, activation='relu'),
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])
    return model

def train_model():
    args = get_args()

    # Load data (example with CIFAR-10)
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = x_train.astype('float32') / 255.0
    x_test = x_test.astype('float32') / 255.0
    y_train = tf.keras.utils.to_categorical(y_train, 10)
    y_test = tf.keras.utils.to_categorical(y_test, 10)

    # Create model
    model = create_model((32, 32, 3), 10)

    # Compile model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=args.learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Setup callbacks
    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(
            os.path.join(args.model_dir, 'checkpoint-{epoch:02d}'),
            save_best_only=True
        ),
        tf.keras.callbacks.TensorBoard(
            log_dir=os.path.join(args.model_dir, 'logs')
        ),
        tf.keras.callbacks.EarlyStopping(
            patience=3,
            restore_best_weights=True
        )
    ]

    # Train model
    history = model.fit(
        x_train, y_train,
        batch_size=args.batch_size,
        epochs=args.epochs,
        validation_split=0.1,
        callbacks=callbacks
    )

    # Evaluate model
    test_loss, test_acc = model.evaluate(x_test, y_test)
    print(f'Test accuracy: {test_acc}')

    # Save model
    model.save(os.path.join(args.model_dir, 'model'))

    # Save metrics (convert numpy floats so json can serialize them)
    metrics = {
        'accuracy': float(test_acc),
        'loss': float(test_loss),
        'history': {k: [float(v) for v in vals] for k, vals in history.history.items()}
    }

    with open(os.path.join(args.model_dir, 'metrics.json'), 'w') as f:
        json.dump(metrics, f)

if __name__ == '__main__':
    train_model()
'''

        with open('trainer.py', 'w') as f:
            f.write(training_script)

        return 'trainer.py'

    def create_custom_job(self, display_name, training_script_path):
        """Create custom training job."""
        job = aiplatform.CustomJob(
            display_name=display_name,
            worker_pool_specs=[
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                        "accelerator_type": "NVIDIA_TESLA_K80",
                        "accelerator_count": 1,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
                        "command": ["python"],
                        "args": [
                            training_script_path,
                            "--epochs=20",
                            "--batch-size=64",
                            "--learning-rate=0.001",
                            "--model-dir=/gcs/model"
                        ],
                    },
                }
            ],
            staging_bucket=f"gs://{self.project_id}-vertex-staging",
        )

        return job

    def run_distributed_training(self, display_name):
        """Run distributed training across multiple machines."""
        job = aiplatform.CustomJob(
            display_name=display_name,
            worker_pool_specs=[
                # Chief worker
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-8",
                        "accelerator_type": "NVIDIA_TESLA_V100",
                        "accelerator_count": 2,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
                        "command": ["python"],
                        "args": ["trainer.py"],
                        "env": [
                            {"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "chief", "index": 0}}'}
                        ]
                    },
                },
                # Workers
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-8",
                        "accelerator_type": "NVIDIA_TESLA_V100",
                        "accelerator_count": 2,
                    },
                    "replica_count": 3,
                    "container_spec": {
                        "image_uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-8:latest",
                        "command": ["python"],
                        "args": ["trainer.py"],
                        "env": [
                            {"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "worker", "index": 0}}'}
                        ]
                    },
                },
                # Parameter servers
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                    },
                    "replica_count": 2,
                    "container_spec": {
                        "image_uri": "gcr.io/deeplearning-platform-release/tf2-cpu.2-8:latest",
                        "command": ["python"],
                        "args": ["trainer.py"],
                        "env": [
                            {"name": "TF_CONFIG", "value": '{"cluster": {...}, "task": {"type": "ps", "index": 0}}'}
                        ]
                    },
                },
            ],
        )

        job.run(sync=True)
        return job
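
Note that create_custom_job only builds the job object; nothing runs until you call run(). A usage sketch with a placeholder project ID:

# run_custom_training.py -- assumes the trainer class above
tf_trainer = CustomTensorFlowTrainer(project_id='my-gcp-project')

script_path = tf_trainer.create_training_script()
job = tf_trainer.create_custom_job('cifar10-training', script_path)

# Blocks until the job finishes; logs stream to Cloud Logging
job.run(sync=True)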

Custom Container Training

# custom_container_training.py
from google.cloud import aiplatform
import docker
import os

class CustomContainerTrainer:
    """Train models using custom containers."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        self.docker_client = docker.from_env()
        aiplatform.init(project=project_id, location=location)

    def create_dockerfile(self):
        """Create Dockerfile for custom training."""
        dockerfile_content = '''
FROM pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime

# Install additional dependencies
RUN pip install --no-cache-dir \
    google-cloud-storage \
    google-cloud-aiplatform \
    tensorboard \
    scikit-learn \
    pandas \
    numpy

# Copy training code
WORKDIR /app
COPY . /app

# Set environment variables
ENV PYTHONUNBUFFERED=1

# Run training script
ENTRYPOINT ["python", "train.py"]
'''

        with open('Dockerfile', 'w') as f:
            f.write(dockerfile_content)

        return 'Dockerfile'

    def create_training_script(self):
        """Create PyTorch training script."""
        training_script = '''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import argparse
import os
import json
from google.cloud import storage

class SimpleNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

def train_model(args):
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Create dummy data for example
    X_train = torch.randn(1000, 20)
    y_train = torch.randint(0, 3, (1000,))
    X_test = torch.randn(200, 20)
    y_test = torch.randint(0, 3, (200,))

    # Create data loaders
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)

    # Initialize model
    model = SimpleNN(20, args.hidden_size, 3).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)

    # Training loop
    for epoch in range(args.epochs):
        model.train()
        running_loss = 0.0

        for i, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{args.epochs}], Loss: {avg_loss:.4f}')

    # Evaluate model
    model.eval()
    with torch.no_grad():
        X_test, y_test = X_test.to(device), y_test.to(device)
        outputs = model(X_test)
        _, predicted = torch.max(outputs.data, 1)
        accuracy = (predicted == y_test).sum().item() / y_test.size(0)
        print(f'Test Accuracy: {accuracy:.4f}')

    # Save model locally first (torch.save cannot write to gs:// paths),
    # then copy to GCS if a gs:// destination was given
    local_model_path = 'model.pth'
    torch.save(model.state_dict(), local_model_path)

    if args.model_dir.startswith('gs://'):
        upload_to_gcs(local_model_path, f"{args.model_dir.rstrip('/')}/model.pth")
    else:
        os.makedirs(args.model_dir, exist_ok=True)
        os.replace(local_model_path, os.path.join(args.model_dir, 'model.pth'))

    # Save metrics
    metrics = {
        'final_loss': avg_loss,
        'accuracy': accuracy,
        'epochs': args.epochs
    }

    with open('metrics.json', 'w') as f:
        json.dump(metrics, f)

def upload_to_gcs(local_path, gcs_path):
    """Upload file to Google Cloud Storage."""
    storage_client = storage.Client()
    bucket_name = gcs_path.split('/')[2]
    blob_name = '/'.join(gcs_path.split('/')[3:])

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(local_path)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--hidden-size', type=int, default=64)
    parser.add_argument('--model-dir', type=str, default='./model')

    args = parser.parse_args()
    train_model(args)
'''

        with open('train.py', 'w') as f:
            f.write(training_script)

        return 'train.py'

    def build_and_push_image(self, image_name):
        """Build and push Docker image to Artifact Registry."""
        # Build image
        image_tag = f"us-central1-docker.pkg.dev/{self.project_id}/vertex-ai/{image_name}:latest"

        print("Building Docker image...")
        self.docker_client.images.build(
            path=".",
            tag=image_tag,
            rm=True
        )

        # Push to Artifact Registry
        print("Pushing image to Artifact Registry...")
        self.docker_client.images.push(image_tag)

        return image_tag

    def submit_custom_container_job(self, display_name, image_uri):
        """Submit custom container training job."""
        job = aiplatform.CustomContainerTrainingJob(
            display_name=display_name,
            container_uri=image_uri,
            model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.1-11:latest",
        )

        model = job.run(
            model_display_name=f"{display_name}_model",
            args=[
                "--epochs=20",
                "--batch-size=64",
                "--learning-rate=0.001",
                "--model-dir=gs://my-bucket/models"
            ],
            machine_type="n1-standard-4",
            accelerator_type="NVIDIA_TESLA_K80",
            accelerator_count=1,
        )

        return model
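
Putting the pieces together, a usage sketch with placeholder names (pushing to Artifact Registry assumes Docker is authenticated for the target region):

# run_container_training.py -- placeholder project and image names
trainer = CustomContainerTrainer(project_id='my-gcp-project')

trainer.create_dockerfile()
trainer.create_training_script()

# First authenticate Docker, e.g.:
# gcloud auth configure-docker us-central1-docker.pkg.dev
image_uri = trainer.build_and_push_image('pytorch-trainer')
model = trainer.submit_custom_container_job('pytorch-training', image_uri)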

Vertex AI Pipelines

Building ML Pipelines

# vertex_pipelines.py
import json

from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics
from google.cloud import aiplatform

@component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.9"
)
def load_data(
    data_path: str,
    output_dataset: Output[Dataset]
):
    """Load and preprocess data."""
    import pandas as pd

    # Load data
    df = pd.read_csv(data_path)

    # Basic preprocessing
    df = df.dropna()
    df = df.drop_duplicates()

    # Save processed data
    df.to_csv(output_dataset.path, index=False)

    # Log metadata
    output_dataset.metadata["num_rows"] = len(df)
    output_dataset.metadata["num_columns"] = len(df.columns)

@component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.9"
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics]
):
    """Train ML model."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, precision_score, recall_score
    import joblib

    # Load data
    df = pd.read_csv(dataset.path)

    # Prepare features and target
    X = df.drop('target', axis=1)
    y = df['target']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train model
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)

    # Evaluate model
    y_pred = clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')

    # Save model
    joblib.dump(clf, model.path)

    # Log metrics
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("precision", precision)
    metrics.log_metric("recall", recall)

@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.9"
)
def deploy_model(
    model: Input[Model],
    project_id: str,
    location: str,
    endpoint_name: str
) -> str:
    """Deploy model to Vertex AI endpoint."""
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    # Upload model
    vertex_model = aiplatform.Model.upload(
        display_name=f"{endpoint_name}_model",
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
    )

    # Create endpoint
    endpoint = aiplatform.Endpoint.create(display_name=endpoint_name)

    # Deploy model
    endpoint.deploy(
        model=vertex_model,
        deployed_model_display_name=vertex_model.display_name,
        machine_type="n1-standard-4",
        min_replica_count=1,
        max_replica_count=3
    )

    return endpoint.resource_name

@pipeline(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str,
    project_id: str,
    location: str = "us-central1"
):
    """Complete ML pipeline."""
    # Load and preprocess data
    data_task = load_data(data_path=data_path)

    # Train model
    train_task = train_model(dataset=data_task.outputs["output_dataset"])

    # Deploy model
    deploy_task = deploy_model(
        model=train_task.outputs["model"],
        project_id=project_id,
        location=location,
        endpoint_name="ml-pipeline-endpoint"
    )

class PipelineManager:
    """Manage Vertex AI Pipelines."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        aiplatform.init(project=project_id, location=location)

    def compile_pipeline(self, pipeline_func, output_path):
        """Compile pipeline to JSON."""
        compiler.Compiler().compile(
            pipeline_func=pipeline_func,
            package_path=output_path
        )

    def submit_pipeline(self, pipeline_path, display_name, parameters):
        """Submit pipeline for execution."""
        job = aiplatform.PipelineJob(
            display_name=display_name,
            template_path=pipeline_path,
            pipeline_root=f"gs://{self.project_id}-vertex-pipelines",
            parameter_values=parameters,
        )

        job.submit()
        return job

    def create_scheduled_pipeline(self, pipeline_path, schedule):
        """Create scheduled pipeline execution."""
        from google.cloud import scheduler_v1

        scheduler_client = scheduler_v1.CloudSchedulerClient()
        parent = f"projects/{self.project_id}/locations/{self.location}"

        job = {
            "name": f"{parent}/jobs/ml-pipeline-schedule",
            "description": "Scheduled ML pipeline execution",
            "schedule": schedule,  # e.g., "0 2 * * *" for daily at 2 AM
            "http_target": {
                "uri": f"https://{self.location}-aiplatform.googleapis.com/v1/projects/{self.project_id}/locations/{self.location}/pipelineJobs",
                "http_method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                },
                "body": json.dumps({
                    "displayName": "scheduled-pipeline-run",
                    "templatePath": pipeline_path,
                    "pipelineRoot": f"gs://{self.project_id}-vertex-pipelines",
                }).encode(),
                # OAuth token lets Cloud Scheduler authenticate the API call
                "oauth_token": {
                    "service_account_email": f"vertex-ai-sa@{self.project_id}.iam.gserviceaccount.com"
                }
            }
        }

        scheduler_client.create_job(parent=parent, job=job)
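
Compiling and submitting the pipeline above is a two-step call. A sketch with placeholder values (data_path and project_id are the pipeline parameters defined in ml_pipeline):

# run_pipeline.py -- placeholder parameter values
manager = PipelineManager(project_id='my-gcp-project')

manager.compile_pipeline(ml_pipeline, 'ml_pipeline.json')
job = manager.submit_pipeline(
    pipeline_path='ml_pipeline.json',
    display_name='ml-training-run',
    parameters={
        'data_path': 'gs://my-gcp-project-vertex-data/training.csv',
        'project_id': 'my-gcp-project',
    }
)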

Model Deployment and Serving

Advanced Model Deployment

# model_deployment.py
from google.cloud import aiplatform
from google.cloud import monitoring_v3
import numpy as np

class ModelDeploymentManager:
    """Manage model deployment and serving."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        aiplatform.init(project=project_id, location=location)

    def deploy_with_traffic_split(self, models, endpoint_name, traffic_split):
        """Deploy multiple models with traffic splitting."""
        # Create endpoint
        endpoint = aiplatform.Endpoint.create(
            display_name=endpoint_name,
            description="Multi-model endpoint with traffic split"
        )

        # Deploy models with traffic split
        deployed_models = []
        for i, (model, traffic) in enumerate(zip(models, traffic_split)):
            deployed_model = model.deploy(
                endpoint=endpoint,
                deployed_model_display_name=f"{model.display_name}_v{i}",
                machine_type="n1-standard-4",
                min_replica_count=1,
                max_replica_count=5,
                traffic_percentage=traffic,
                sync=False
            )
            deployed_models.append(deployed_model)

        # Wait for all deployments
        for dm in deployed_models:
            dm.wait()

        return endpoint

    def create_batch_prediction_job(self, model, input_source, output_destination):
        """Create batch prediction job."""
        batch_prediction_job = model.batch_predict(
            job_display_name=f"{model.display_name}_batch_prediction",
            gcs_source=input_source,
            gcs_destination_prefix=output_destination,
            machine_type="n1-standard-4",
            max_replica_count=10,
            accelerator_type="NVIDIA_TESLA_K80",
            accelerator_count=1,
        )

        return batch_prediction_job

    def setup_model_monitoring(self, endpoint, model_name):
        """Setup model monitoring for drift detection."""
        # Create model monitoring job
        monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
            display_name=f"{model_name}_monitoring",
            endpoint=endpoint,
            logging_sampling_strategy={"sample_rate": 0.1},
            predict_instance_schema_uri="gs://my-bucket/schemas/predict.yaml",
            analysis_instance_schema_uri="gs://my-bucket/schemas/analysis.yaml",
            objective_configs=[
                {
                    "deployed_model_id": model_name,
                    "objective_config": {
                        "training_dataset": {
                            "dataset": "bq://project.dataset.training_data",
                            "logging_sampling_strategy": {"sample_rate": 1.0},
                        },
                        "training_prediction_skew_detection_config": {
                            "skew_thresholds": {
                                "feature_1": {"value": 0.001},
                                "feature_2": {"value": 0.001},
                            }
                        },
                        "prediction_drift_detection_config": {
                            "drift_thresholds": {
                                "feature_1": {"value": 0.001},
                                "feature_2": {"value": 0.001},
                            }
                        }
                    }
                }
            ],
            alert_config={
                "user_emails": ["ml-team@company.com"],
                "enable_logging": True
            },
            schedule_config={
                "monitor_interval": {"seconds": 3600}  # Monitor every hour
            }
        )

        return monitoring_job

    def create_explanation_metadata(self):
        """Create explanation metadata for model interpretability."""
        explanation_metadata = {
            "inputs": {
                "feature_1": {
                    "input_tensor_name": "dense_input",
                    "encoding": "IDENTITY",
                    "modality": "numeric"
                },
                "feature_2": {
                    "input_tensor_name": "dense_input",
                    "encoding": "IDENTITY",
                    "modality": "numeric"
                }
            },
            "outputs": {
                "prediction": {
                    "output_tensor_name": "dense_2"
                }
            }
        }

        explanation_parameters = {
            "sampled_shapley_attribution": {
                "path_count": 10
            }
        }

        return explanation_metadata, explanation_parameters

    def online_predict_with_explanation(self, endpoint, instances):
        """Make predictions with explanations."""
        # endpoint.explain() returns predictions plus feature attributions
        predictions = endpoint.explain(instances=instances)

        # Parse results
        results = []
        for i, (pred, expl) in enumerate(zip(
            predictions.predictions,
            predictions.explanations
        )):
            results.append({
                "instance": instances[i],
                "prediction": pred,
                "feature_attributions": expl.attributions
            })

        return results
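
Traffic splitting supports canary rollouts: route most traffic to the stable model and a slice to the candidate. A sketch with placeholder model resource names:

# run_canary_deploy.py -- placeholder model resource names
from google.cloud import aiplatform

manager = ModelDeploymentManager(project_id='my-gcp-project')

stable = aiplatform.Model('projects/123/locations/us-central1/models/111')
candidate = aiplatform.Model('projects/123/locations/us-central1/models/222')

# 90% of traffic to the stable model, 10% to the candidate
endpoint = manager.deploy_with_traffic_split(
    models=[stable, candidate],
    endpoint_name='churn-endpoint',
    traffic_split=[90, 10]
)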

Custom Prediction Routine

# custom_predictor.py
from google.cloud import aiplatform
from typing import Dict, List, Any
import numpy as np
import joblib

class CustomPredictor:
    """Custom prediction routine for Vertex AI."""

    def __init__(self):
        self._model = None
        self._preprocessor = None

    def load(self, artifacts_path: str):
        """Load model artifacts."""
        self._model = joblib.load(f"{artifacts_path}/model.pkl")
        self._preprocessor = joblib.load(f"{artifacts_path}/preprocessor.pkl")

    def preprocess(self, instances: List[Dict[str, Any]]) -> np.ndarray:
        """Preprocess input data."""
        # Convert to numpy array
        data = np.array([[
            instance['feature1'],
            instance['feature2'],
            instance['feature3']
        ] for instance in instances])

        # Apply preprocessing
        return self._preprocessor.transform(data)

    def predict(self, instances: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Make predictions."""
        # Preprocess
        preprocessed = self.preprocess(instances)

        # Predict
        predictions = self._model.predict(preprocessed)
        probabilities = self._model.predict_proba(preprocessed)

        # Format results
        results = []
        for pred, probs in zip(predictions, probabilities):
            results.append({
                "prediction": int(pred),
                "probabilities": probs.tolist(),
                "confidence": float(np.max(probs))
            })

        return results

    def postprocess(self, predictions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Postprocess predictions."""
        # Add business logic
        for pred in predictions:
            if pred['confidence'] < 0.7:
                pred['recommendation'] = 'Manual review recommended'
            else:
                pred['recommendation'] = 'Automated action approved'

        return predictions

# Deployment configuration
def deploy_custom_predictor(model_path, endpoint_name):
    """Deploy model with custom prediction routine."""
    # Package the custom code in a separate setup.py (calling setup()
    # inline here would try to parse this script's argv), for example:
    #
    #   from setuptools import setup, find_packages
    #
    #   setup(
    #       name="custom_predictor",
    #       version="1.0",
    #       packages=find_packages(),
    #       install_requires=["numpy", "scikit-learn", "joblib"],
    #       python_requires=">=3.7",
    #   )

    # Upload model
    model = aiplatform.Model.upload(
        display_name=f"{endpoint_name}_model",
        artifact_uri=model_path,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
        serving_container_predict_route="/predict",
        serving_container_health_route="/health",
        serving_container_environment_variables={
            "MODEL_NAME": "custom_model"
        },
        serving_container_command=["python", "-m", "custom_predictor"],
    )

    return model
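
Before packaging, the routine can be exercised locally. A sketch assuming model.pkl and preprocessor.pkl were written to ./artifacts by your training code:

# test_predictor_locally.py -- example feature values
predictor = CustomPredictor()
predictor.load('./artifacts')

instances = [
    {'feature1': 1.2, 'feature2': 0.4, 'feature3': 7.0},
]

predictions = predictor.predict(instances)
results = predictor.postprocess(predictions)
print(results)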

MLOps Best Practices

Experiment Tracking

# experiment_tracking.py
from google.cloud import aiplatform
import pandas as pd
from datetime import datetime

class ExperimentTracker:
    """Track ML experiments with Vertex AI Experiments."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        aiplatform.init(project=project_id, location=location)

    def track_experiment(self, experiment_name, params, metrics):
        """Track an experiment run with Vertex AI Experiments."""
        # Set the active experiment, then log a run against it
        aiplatform.init(
            project=self.project_id,
            location=self.location,
            experiment=experiment_name,
        )

        run_name = f"run-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        aiplatform.start_run(run_name)

        # Log parameters and metrics for this run
        aiplatform.log_params(params)
        aiplatform.log_metrics(metrics)

        aiplatform.end_run()
        return run_name

    def compare_experiments(self, experiment_names):
        """Compare runs across multiple experiments."""
        # get_experiment_df returns one row per run, with parameter columns
        # prefixed "param." and metric columns prefixed "metric."
        frames = []
        for name in experiment_names:
            df = aiplatform.get_experiment_df(name)
            df['experiment'] = name
            frames.append(df)

        return pd.concat(frames, ignore_index=True)

    def register_best_model(self, experiment_name, artifact_uri, metric_name='accuracy'):
        """Register the best model from an experiment.

        Vertex Experiments tracks parameters and metrics, not model
        binaries, so the caller supplies the GCS artifact_uri where the
        best run's saved model lives.
        """
        df = aiplatform.get_experiment_df(experiment_name)
        metric_col = f"metric.{metric_name}"

        if df.empty or metric_col not in df.columns:
            return None

        best_metric = df[metric_col].max()

        # Register model
        model = aiplatform.Model.upload(
            display_name=f"{experiment_name}_best_model",
            artifact_uri=artifact_uri,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
        )

        # Add metadata (label values may not contain dots)
        model.update(
            labels={
                "experiment": experiment_name,
                "metric": metric_name,
                "value": str(best_metric).replace('.', '-')
            }
        )

        return model
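
A usage sketch with example parameter and metric values (compare_experiments returns get_experiment_df columns, so metrics appear as "metric.<name>"):

# run_experiment.py -- example values
tracker = ExperimentTracker(project_id='my-gcp-project')

tracker.track_experiment(
    experiment_name='churn-model-tuning',
    params={'learning_rate': 0.001, 'batch_size': 64},
    metrics={'accuracy': 0.91, 'loss': 0.27}
)

comparison = tracker.compare_experiments(['churn-model-tuning'])
print(comparison[['run_name', 'metric.accuracy']])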

Model Registry and Versioning

# model_registry.py
from google.cloud import aiplatform
from datetime import datetime
from typing import Dict, List
import hashlib

class ModelRegistry:
    """Manage model versions and registry."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        aiplatform.init(project=project_id, location=location)

    def register_model(self, model_name, artifact_uri, framework, version=None):
        """Register new model version."""
        # Generate version if not provided
        if not version:
            version = self._generate_version(artifact_uri)

        # Upload model
        model = aiplatform.Model.upload(
            display_name=f"{model_name}_v{version}",
            artifact_uri=artifact_uri,
            serving_container_image_uri=self._get_container_image(framework),
            labels={
                "model_name": model_name,
                "version": version,
                "framework": framework,
                "stage": "staging"
            }
        )

        return model

    def promote_model(self, model_id, stage='production'):
        """Promote model to different stage."""
        model = aiplatform.Model(model_id)

        # Update labels (values may only contain lowercase letters,
        # digits, dashes, and underscores)
        labels = model.labels.copy()
        labels['stage'] = stage
        labels['promoted_at'] = datetime.now().strftime('%Y%m%d-%H%M%S')

        model.update(labels=labels)

        # If promoting to production, update the production endpoint
        if stage == 'production':
            self._update_production_endpoint(model)

        return model

    def get_model_lineage(self, model_name):
        """Get model lineage and versions."""
        # List all models with the name
        models = aiplatform.Model.list(
            filter=f'labels.model_name="{model_name}"',
            order_by="create_time desc"
        )

        lineage = []
        for model in models:
            lineage.append({
                'model_id': model.resource_name,
                'version': model.labels.get('version', 'unknown'),
                'stage': model.labels.get('stage', 'none'),
                'created': model.create_time,
                'metrics': self._get_model_metrics(model)
            })

        return lineage

    def _generate_version(self, artifact_uri):
        """Generate version hash from artifact."""
        return hashlib.sha256(artifact_uri.encode()).hexdigest()[:8]

    def _get_model_metrics(self, model):
        """Fetch metrics from the model's evaluations, if any exist."""
        try:
            return [evaluation.metrics for evaluation in model.list_model_evaluations()]
        except Exception:
            return []

    def _get_container_image(self, framework):
        """Get appropriate serving container image."""
        container_images = {
            'tensorflow': 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest',
            'pytorch': 'us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.1-11:latest',
            'sklearn': 'us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest',
            'xgboost': 'us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest',
        }

        return container_images.get(framework, container_images['sklearn'])

    def _update_production_endpoint(self, model):
        """Update production endpoint with new model."""
        # Find production endpoint
        endpoints = aiplatform.Endpoint.list(
            filter='labels.environment="production"'
        )

        if endpoints:
            endpoint = endpoints[0]

            # Undeploy old models
            for deployed_model in endpoint.list_models():
                if deployed_model.display_name != model.display_name:
                    endpoint.undeploy(deployed_model_id=deployed_model.id)

            # Deploy new model
            model.deploy(
                endpoint=endpoint,
                deployed_model_display_name=model.display_name,
                machine_type="n1-standard-4",
                min_replica_count=2,
                max_replica_count=10,
                traffic_percentage=100
            )
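
A usage sketch with placeholder artifact locations, walking a model from registration through promotion:

# run_registry.py -- placeholder model name and artifact path
registry = ModelRegistry(project_id='my-gcp-project')

model = registry.register_model(
    model_name='churn-model',
    artifact_uri='gs://my-gcp-project-vertex-models/churn/',
    framework='sklearn'
)

registry.promote_model(model.resource_name, stage='production')

for entry in registry.get_model_lineage('churn-model'):
    print(entry['version'], entry['stage'], entry['created'])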

Integration with Other Services

BigQuery ML Integration

# bigquery_ml_integration.py
from google.cloud import bigquery
from google.cloud import aiplatform

class BigQueryMLIntegration:
    """Integrate BigQuery ML with Vertex AI."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        aiplatform.init(project=project_id)

    def export_bqml_model_to_vertex(self, bq_model_ref, vertex_model_name):
        """Export BigQuery ML model to Vertex AI."""
        # Export model from BigQuery
        export_job = self.bq_client.extract_model(
            bq_model_ref,
            f"gs://{self.project_id}-vertex-models/{vertex_model_name}",
        )

        export_job.result()  # Wait for export

        # Import to Vertex AI
        model = aiplatform.Model.upload(
            display_name=vertex_model_name,
            artifact_uri=f"gs://{self.project_id}-vertex-models/{vertex_model_name}",
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
        )

        return model

    def create_feature_engineering_pipeline(self, query, feature_table):
        """Create feature engineering pipeline."""
        # Create feature engineering query
        feature_query = f"""
        CREATE OR REPLACE TABLE `{self.project_id}.{feature_table}` AS
        {query}
        """

        # Execute query
        query_job = self.bq_client.query(feature_query)
        query_job.result()

        # Create Vertex AI dataset from BigQuery
        dataset = aiplatform.TabularDataset.create(
            display_name=f"{feature_table}_dataset",
            bq_source=f"bq://{self.project_id}.{feature_table}",
        )

        return dataset
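
A usage sketch with placeholder references. Note that extract_model only works for exportable BigQuery ML model types:

# run_bqml_export.py -- placeholder model and table references
integration = BigQueryMLIntegration(project_id='my-gcp-project')

vertex_model = integration.export_bqml_model_to_vertex(
    bq_model_ref='my-gcp-project.ml_models.churn_bqml',
    vertex_model_name='churn-bqml-model'
)

dataset = integration.create_feature_engineering_pipeline(
    query="SELECT customer_id, SUM(amount) AS total_spend FROM `my-gcp-project.raw.orders` GROUP BY customer_id",
    feature_table='ml_datasets.customer_features'
)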

Best Practices and Production Tips

Production ML Pipeline

# production_ml_pipeline.py
class ProductionMLPipeline:
    """Production-ready ML pipeline with best practices."""

    def __init__(self, project_id, location='us-central1'):
        self.project_id = project_id
        self.location = location
        self.setup_monitoring()

    def setup_monitoring(self):
        """Setup comprehensive monitoring."""
        # Create monitoring dashboard
        dashboard_config = {
            "displayName": "ML Pipeline Dashboard",
            "mosaicLayout": {
                "tiles": [
                    {
                        "widget": {
                            "title": "Model Predictions",
                            "xyChart": {
                                "dataSets": [{
                                    "timeSeriesQuery": {
                                        "timeSeriesFilter": {
                                            "filter": 'resource.type="aiplatform.googleapis.com/Endpoint"'
                                        }
                                    }
                                }]
                            }
                        }
                    },
                    {
                        "widget": {
                            "title": "Model Latency",
                            "xyChart": {
                                "dataSets": [{
                                    "timeSeriesQuery": {
                                        "timeSeriesFilter": {
                                            "filter": 'metric.type="aiplatform.googleapis.com/prediction/latency"'
                                        }
                                    }
                                }]
                            }
                        }
                    }
                ]
            }
        }

        return dashboard_config

    def implement_model_governance(self):
        """Implement model governance policies."""
        governance_policies = {
            "model_approval": {
                "required_metrics": ["accuracy", "precision", "recall"],
                "minimum_thresholds": {
                    "accuracy": 0.85,
                    "precision": 0.80,
                    "recall": 0.80
                },
                "required_tests": [
                    "bias_detection",
                    "fairness_check",
                    "performance_validation"
                ]
            },
            "deployment_gates": {
                "stages": ["dev", "staging", "production"],
                "approval_required": ["staging", "production"],
                "automated_tests": True
            },
            "monitoring_requirements": {
                "drift_detection": True,
                "performance_monitoring": True,
                "data_quality_checks": True
            }
        }

        return governance_policies

    def create_ci_cd_pipeline(self):
        """Create CI/CD pipeline for ML."""
        cloudbuild_config = """
steps:
  # Run unit tests
  - name: 'python:3.9'
    entrypoint: 'pytest'
    args: ['tests/']

  # Validate data
  - name: 'gcr.io/$PROJECT_ID/ml-validator'
    args: ['--data-path', 'gs://$PROJECT_ID-data/']

  # Train model
  - name: 'gcr.io/deeplearning-platform-release/tf2-gpu.2-8'
    args: ['python', 'train.py']

  # Evaluate model
  - name: 'gcr.io/$PROJECT_ID/ml-evaluator'
    args: ['--model-path', 'gs://$PROJECT_ID-models/']

  # Deploy if tests pass
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: 'gcloud'
    args: [
      'ai', 'models', 'upload',
      '--region', 'us-central1',
      '--display-name', 'production-model',
      '--artifact-uri', 'gs://$PROJECT_ID-models/'
    ]

timeout: '3600s'
options:
  machineType: 'E2_HIGHCPU_8'
"""

        return cloudbuild_config

Conclusion

Vertex AI provides a comprehensive platform for building, deploying, and managing machine learning models at scale. Key benefits:

  1. Unified Platform: All ML tools and services in one place
  2. Flexibility: Support for AutoML and custom training
  3. Scalability: Leverage Google's infrastructure
  4. MLOps Integration: End-to-end ML lifecycle management
  5. Pre-trained Models: Access to Google's AI capabilities

Best Practices Summary

  • Start with AutoML: For quick prototyping and baseline models
  • Use Pipelines: Automate and reproduce ML workflows
  • Implement Monitoring: Track model performance and drift
  • Version Everything: Models, data, and code
  • Security First: Use IAM, VPC, and encryption
  • Cost Optimization: Use preemptible instances for training
  • Documentation: Document models, features, and decisions

Next Steps

  • Explore Vertex AI Feature Store for feature management
  • Learn about Vertex AI Matching Engine for similarity search
  • Study Vertex AI Model Monitoring for production deployments
  • Implement Vertex AI Experiments for systematic experimentation
  • Get certified as a Google Cloud ML Engineer

Remember: Vertex AI simplifies ML development, but following MLOps best practices ensures your models are production-ready, maintainable, and deliver business value.