Google Cloud Run: Serverless Container Deployment Guide

Tyler Maginnis | February 06, 2024


Google Cloud Run is a fully managed compute platform that automatically scales your stateless containers. This guide covers everything from basic deployments to advanced patterns for building production-ready serverless applications.

Why Cloud Run?

Cloud Run excels with:

- Any Language, Any Library: Deploy containers with your favorite stack
- Automatic Scaling: Scale to zero or thousands of instances
- Pay Per Use: Billed only for actual request time
- Fully Managed: No infrastructure to manage
- Portable: Built on Knative, runs anywhere

Getting Started with Cloud Run

Your First Deployment

# Dockerfile
FROM node:18-alpine

WORKDIR /app

# Copy package files
COPY package*.json ./
# --omit=dev replaces the deprecated --only=production flag
RUN npm ci --omit=dev

# Copy application files
COPY . .

# Create non-root user
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nodejs -u 1001
USER nodejs

# Expose port (Cloud Run sets PORT env variable)
EXPOSE 8080

# Start the application
CMD ["node", "server.js"]

// server.js
const express = require('express');
const app = express();

// Cloud Run sets the PORT environment variable
const port = process.env.PORT || 8080;

// Health check endpoint
app.get('/', (req, res) => {
  res.json({
    message: 'Hello from Cloud Run!',
    revision: process.env.K_REVISION || 'unknown',
    service: process.env.K_SERVICE || 'unknown'
  });
});

// Graceful shutdown
process.on('SIGTERM', () => {
  console.log('SIGTERM signal received: closing HTTP server');
  server.close(() => {
    console.log('HTTP server closed');
  });
});

const server = app.listen(port, () => {
  console.log(`Server listening on port ${port}`);
});

Deploy the service:

# Build and push container
gcloud builds submit --tag gcr.io/PROJECT_ID/hello-cloudrun

# Deploy to Cloud Run
gcloud run deploy hello-cloudrun \
  --image gcr.io/PROJECT_ID/hello-cloudrun \
  --platform managed \
  --region us-central1 \
  --allow-unauthenticated \
  --memory 512Mi \
  --cpu 1 \
  --timeout 60 \
  --concurrency 80 \
  --max-instances 100
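
The --concurrency and --max-instances flags above interact with request rate and latency. As a rough sizing sketch (this applies Little's law and is not an official Cloud Run formula; the function name and the sample load are illustrative assumptions), the number of busy instances can be estimated like this:

```python
import math


def estimate_instances(requests_per_second: float,
                       avg_latency_seconds: float,
                       concurrency: int,
                       max_instances: int) -> int:
    """Estimate busy instances: in-flight requests / per-instance concurrency."""
    if concurrency < 1 or max_instances < 1:
        raise ValueError("concurrency and max_instances must be at least 1")
    # Little's law: average in-flight requests = arrival rate * time in system.
    in_flight = requests_per_second * avg_latency_seconds
    needed = math.ceil(in_flight / concurrency)
    # The service will not scale past --max-instances.
    return min(needed, max_instances)


if __name__ == "__main__":
    # Hypothetical load: 1,000 requests/s at 500 ms average latency.
    print(estimate_instances(1000, 0.5, concurrency=80, max_instances=100))
```

If the estimate reaches the --max-instances cap, additional requests queue or are rejected, so the cap is worth revisiting whenever expected traffic or latency changes.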

Advanced Container Patterns

Multi-Stage Builds

# Multi-stage Dockerfile for Go application
FROM golang:1.19-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .

# Final stage
FROM alpine:latest

RUN apk --no-cache add ca-certificates
# Use a directory the non-root user can read; /root is not accessible to it
WORKDIR /app

# Copy binary from builder
COPY --from=builder /app/main .

# Run as non-root user
RUN addgroup -g 1001 -S appuser && adduser -S appuser -u 1001
USER appuser

EXPOSE 8080
CMD ["./main"]

Python Application with Dependencies

# Dockerfile for Python application
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    python3-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1001 appuser
USER appuser

# Gunicorn for production
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

# main.py
import os
import logging
# Async views such as process_data() require the flask[async] extra
from flask import Flask, request, jsonify
from google.cloud import firestore
from google.cloud import storage
import aiohttp

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# Initialize clients
db = firestore.Client()
storage_client = storage.Client()

@app.route('/', methods=['GET'])
def hello():
    """Health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'service': os.environ.get('K_SERVICE', 'unknown'),
        'revision': os.environ.get('K_REVISION', 'unknown'),
        'project': os.environ.get('GOOGLE_CLOUD_PROJECT', 'unknown')
    })

@app.route('/process', methods=['POST'])
async def process_data():
    """Process incoming data asynchronously."""
    try:
        data = request.get_json()

        # Validate input
        if not data or 'url' not in data:
            return jsonify({'error': 'Missing required field: url'}), 400

        # Process asynchronously
        result = await process_url(data['url'])

        # Store result in Firestore
        doc_ref = db.collection('results').document()
        doc_ref.set({
            'url': data['url'],
            'result': result,
            'timestamp': firestore.SERVER_TIMESTAMP
        })

        return jsonify({
            'id': doc_ref.id,
            'status': 'processed',
            'result': result
        })

    except Exception as e:
        logging.error(f"Processing error: {e}")
        return jsonify({'error': str(e)}), 500

async def process_url(url):
    """Process URL with async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return {
                'status': response.status,
                'content_length': len(content),
                'headers': dict(response.headers)
            }

@app.route('/upload', methods=['POST'])
def handle_upload():
    """Handle file uploads to Cloud Storage."""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    # Upload to Cloud Storage
    bucket_name = os.environ.get('STORAGE_BUCKET', 'my-uploads')
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f"uploads/{file.filename}")

    blob.upload_from_string(
        file.read(),
        content_type=file.content_type
    )

    return jsonify({
        'filename': file.filename,
        'url': blob.public_url,
        'size': blob.size
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

Cloud Run Configuration

Advanced Service Configuration

# service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: advanced-service
  annotations:
    run.googleapis.com/launch-stage: GA
spec:
  template:
    metadata:
      annotations:
        # Autoscaling configuration
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "1000"

        # CPU allocation
        run.googleapis.com/cpu-throttling: "false"
        run.googleapis.com/execution-environment: "gen2"

        # VPC connector for private resources
        run.googleapis.com/vpc-access-connector: projects/PROJECT_ID/locations/us-central1/connectors/my-connector
        run.googleapis.com/vpc-access-egress: private-ranges-only

        # Session affinity
        run.googleapis.com/sessionAffinity: "true"
    spec:
      containerConcurrency: 1000
      timeoutSeconds: 900
      serviceAccountName: cloudrun-sa@PROJECT_ID.iam.gserviceaccount.com
      containers:
      - image: gcr.io/PROJECT_ID/advanced-service
        ports:
        - name: http1
          containerPort: 8080
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: DB_CONNECTION
          valueFrom:
            secretKeyRef:
              name: db-connection
              key: latest
        resources:
          limits:
            cpu: "4"
            memory: "8Gi"
        livenessProbe:
          httpGet:
            path: /health
          initialDelaySeconds: 10
          periodSeconds: 10
        startupProbe:
          httpGet:
            path: /ready
          initialDelaySeconds: 0
          periodSeconds: 1
          failureThreshold: 30

Deploy with configuration:

gcloud run services replace service.yaml --region us-central1

Environment-Specific Configurations

# config_manager.py
import os
from google.cloud import secretmanager

class ConfigManager:
    """Manage Cloud Run configurations."""

    def __init__(self):
        self.project_id = os.environ.get('GOOGLE_CLOUD_PROJECT')
        self.secret_client = secretmanager.SecretManagerServiceClient()
        self._cache = {}

    def get_config(self):
        """Get environment-specific configuration."""
        env = os.environ.get('ENVIRONMENT', 'development')

        base_config = {
            'app_name': 'cloud-run-app',
            'log_level': 'INFO',
            'max_workers': 4,
            'cache_ttl': 300
        }

        # Build settings lazily so that only the selected environment's
        # secrets are fetched (development never touches Secret Manager).
        env_configs = {
            'development': lambda: {
                'debug': True,
                'database_url': 'sqlite:///dev.db',
                'cache_enabled': False
            },
            'staging': lambda: {
                'debug': False,
                'database_url': self.get_secret('staging-db-url'),
                'cache_enabled': True,
                'redis_url': self.get_secret('staging-redis-url')
            },
            'production': lambda: {
                'debug': False,
                'database_url': self.get_secret('prod-db-url'),
                'cache_enabled': True,
                'redis_url': self.get_secret('prod-redis-url'),
                'cdn_url': 'https://cdn.example.com'
            }
        }

        config = base_config.copy()
        # Unknown environments fall back to the base configuration only
        config.update(env_configs.get(env, dict)())

        return config

    def get_secret(self, secret_id, version='latest'):
        """Get secret from Secret Manager with caching."""
        cache_key = f"{secret_id}:{version}"

        if cache_key in self._cache:
            return self._cache[cache_key]

        name = f"projects/{self.project_id}/secrets/{secret_id}/versions/{version}"
        response = self.secret_client.access_secret_version(request={"name": name})
        secret_value = response.payload.data.decode('UTF-8')

        self._cache[cache_key] = secret_value
        return secret_value
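
get_secret above caches each value for the lifetime of the instance, so a rotated "latest" version is not picked up until the instance is replaced. One possible refinement is a time-bounded cache. The sketch below is an assumption-laden illustration: TTLSecretCache is a hypothetical helper, and the fetch callable stands in for the Secret Manager lookup so the example runs without Google Cloud credentials.

```python
import time
from typing import Callable, Dict, Tuple


class TTLSecretCache:
    """Cache secret values for a limited time (hypothetical helper)."""

    def __init__(self, fetch: Callable[[str], str], ttl_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch      # stand-in for the Secret Manager lookup
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so tests can control time
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, secret_id: str) -> str:
        now = self._clock()
        entry = self._entries.get(secret_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]      # cached value is still fresh
        value = self._fetch(secret_id)
        self._entries[secret_id] = (now, value)
        return value
```

In ConfigManager, the fetch argument could be a small function wrapping access_secret_version; the TTL then bounds how long a rotated secret stays stale.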

Traffic Management

Blue-Green Deployments

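#!/bin/bash
# blue_green_deploy.sh
# Usage: ./blue_green_deploy.sh IMAGE_TAG

SERVICE_NAME="my-service"
REGION="us-central1"
IMAGE="gcr.io/PROJECT_ID/my-service:${1:?usage: $0 IMAGE_TAG}"

# Deploy new revision with no traffic
gcloud run deploy "$SERVICE_NAME" \
  --image "$IMAGE" \
  --region "$REGION" \
  --no-traffic \
  --tag green

# Record the new revision's name so it can be cleaned up if needed
GREEN_REVISION=$(gcloud run services describe "$SERVICE_NAME" \
  --region "$REGION" \
  --format 'value(status.latestCreatedRevisionName)')

# Test the green deployment through its tag URL. The main service URL
# (status.url) still serves the current revision. Requires jq.
GREEN_URL=$(gcloud run services describe "$SERVICE_NAME" \
  --region "$REGION" \
  --format json | jq -r '.status.traffic[] | select(.tag == "green") | .url')

echo "Testing green deployment at: $GREEN_URL"
curl -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  "$GREEN_URL"

# If tests pass, switch traffic
read -p "Switch traffic to green? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]
then
  gcloud run services update-traffic "$SERVICE_NAME" \
    --region "$REGION" \
    --to-tags green=100

  echo "Traffic switched to green deployment"
else
  echo "Deployment cancelled, removing green revision"
  # "green" is a traffic tag, not a revision name: drop the tag first,
  # then delete the revision it pointed to.
  gcloud run services update-traffic "$SERVICE_NAME" \
    --region "$REGION" \
    --remove-tags green
  gcloud run revisions delete "$GREEN_REVISION" --region "$REGION" --quiet
fi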

Canary Deployments

# canary_manager.py
from google.cloud import run_v2
import time

class CanaryManager:
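    """Manage canary deployments for Cloud Run.

    The helpers get_stable_revision, split_traffic, get_revision_metrics, and
    rollback_canary are called below but not shown in this guide; implement
    them for your own service before using this class.
    """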

    def __init__(self, project_id, region):
        self.project_id = project_id
        self.region = region
        self.services_client = run_v2.ServicesClient()
        self.revisions_client = run_v2.RevisionsClient()

    def deploy_canary(self, service_name, new_image, canary_percentage=10):
        """Deploy new revision as canary."""
        service_path = f"projects/{self.project_id}/locations/{self.region}/services/{service_name}"

        # Get current service
        service = self.services_client.get_service(name=service_path)

        # Update with new image
        service.template.containers[0].image = new_image
        service.template.revision = f"{service_name}-canary-{int(time.time())}"

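        # Record the revision currently serving traffic before creating a new one
        stable_revision = self.get_stable_revision(service_name)

        # update_service returns a long-running operation; wait for it to finish.
        # Whether the new revision receives traffic immediately depends on the
        # service's existing traffic configuration.
        operation = self.services_client.update_service(
            request=run_v2.UpdateServiceRequest(service=service)
        )
        updated_service = operation.result()

        # Name of the revision just created
        canary_revision = updated_service.template.revision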

        # Split traffic
        self.split_traffic(
            service_name,
            {
                stable_revision: 100 - canary_percentage,
                canary_revision: canary_percentage
            }
        )

        return canary_revision

    def monitor_canary(self, service_name, canary_revision, metrics_threshold):
        """Monitor canary deployment metrics."""
        # This would integrate with Cloud Monitoring
        # to check error rates, latency, etc.

        metrics = self.get_revision_metrics(canary_revision)

        if metrics['error_rate'] > metrics_threshold['max_error_rate']:
            print(f"Canary failed: error rate {metrics['error_rate']}%")
            self.rollback_canary(service_name, canary_revision)
            return False

        if metrics['p99_latency'] > metrics_threshold['max_p99_latency']:
            print(f"Canary failed: P99 latency {metrics['p99_latency']}ms")
            self.rollback_canary(service_name, canary_revision)
            return False

        return True

    def promote_canary(self, service_name, canary_revision):
        """Promote canary to receive all traffic."""
        self.split_traffic(service_name, {canary_revision: 100})
        print(f"Canary {canary_revision} promoted to 100% traffic")

    def progressive_rollout(self, service_name, canary_revision, stages, metrics_threshold):
        """Progressive traffic rollout through increasing percentages."""
        for percentage in stages:
            stable_revision = self.get_stable_revision(service_name)

            self.split_traffic(
                service_name,
                {
                    stable_revision: 100 - percentage,
                    canary_revision: percentage
                }
            )

            print(f"Canary at {percentage}% traffic")
            time.sleep(300)  # Wait 5 minutes between stages

            # Pass real thresholds: monitor_canary reads max_error_rate and
            # max_p99_latency, so an empty dict would raise KeyError.
            if not self.monitor_canary(service_name, canary_revision, metrics_threshold):
                return False

        return True
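
The split and threshold logic in CanaryManager can be tested without any Cloud Run API calls if it is factored into pure functions. The following is a minimal sketch under that assumption; traffic_split and canary_violations are hypothetical names, and the metric keys mirror the ones used in monitor_canary above.

```python
from typing import Dict, List


def traffic_split(stable: str, canary: str, canary_percent: int) -> Dict[str, int]:
    """Return a revision -> percent mapping that always sums to 100."""
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    return {stable: 100 - canary_percent, canary: canary_percent}


def canary_violations(metrics: Dict[str, float],
                      thresholds: Dict[str, float]) -> List[str]:
    """List every threshold the canary breaks; an empty list means healthy."""
    checks = [
        ("error_rate", "max_error_rate"),
        ("p99_latency", "max_p99_latency"),
    ]
    violations = []
    for metric_key, limit_key in checks:
        limit = thresholds.get(limit_key)
        if limit is None:
            continue  # no limit configured for this metric
        value = metrics.get(metric_key)
        # Missing data counts as a failure, a deliberately cautious choice.
        if value is None or value > limit:
            violations.append(f"{metric_key}={value} (limit {limit})")
    return violations
```

Returning the full list of violations, rather than stopping at the first one, makes the rollback log more useful when both error rate and latency regress together.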

Authentication and Security

Service-to-Service Authentication

# auth_client.py
import google.auth
import google.auth.transport.requests
import google.oauth2.id_token  # provides fetch_id_token, used in call_service
import requests

class CloudRunAuthClient:
    """Handle authentication for Cloud Run services."""

    def __init__(self, target_audience=None):
        self.target_audience = target_audience
        self._auth_session = None

    def get_auth_session(self):
        """Get authenticated session for service-to-service calls."""
        if self._auth_session:
            return self._auth_session

        # Get credentials
        credentials, project = google.auth.default()

        # Create authenticated session
        auth_req = google.auth.transport.requests.Request()
        credentials.refresh(auth_req)

        self._auth_session = requests.Session()
        self._auth_session.headers.update({
            'Authorization': f'Bearer {credentials.token}'
        })

        return self._auth_session

    def call_service(self, service_url, method='GET', **kwargs):
        """Call another Cloud Run service with authentication."""
        session = self.get_auth_session()

        # Get ID token for the target service
        auth_req = google.auth.transport.requests.Request()
        id_token = google.oauth2.id_token.fetch_id_token(
            auth_req,
            self.target_audience or service_url
        )

        # Make authenticated request
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {id_token}'
        kwargs['headers'] = headers

        response = session.request(method, service_url, **kwargs)
        response.raise_for_status()

        return response

    def verify_jwt_token(self, token):
        """Verify JWT token from incoming request."""
        import jwt
        from jwt import PyJWKClient

        # Google's public key URL
        jwks_url = "https://www.googleapis.com/oauth2/v3/certs"
        jwks_client = PyJWKClient(jwks_url)

        try:
            # Decode and verify token
            signing_key = jwks_client.get_signing_key_from_jwt(token)
            decoded = jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256"],
                audience=self.target_audience,
                issuer="https://accounts.google.com"
            )

            return decoded

        except jwt.InvalidTokenError as e:
            raise ValueError(f"Invalid token: {e}")
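
verify_jwt_token expects the raw token, while incoming requests carry it inside an Authorization header. A minimal sketch of pulling the token out of that header is shown here; extract_bearer_token is a hypothetical helper name and is not part of any Google library.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if the header is unusable."""
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.strip().partition(" ")
    token = token.strip()
    # The scheme name is case-insensitive; anything other than Bearer is rejected.
    if scheme.lower() != "bearer" or not token:
        return None
    return token
```

A request handler could call this on the incoming Authorization header and respond with 401 when it returns None, before handing the token to verify_jwt_token.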

API Gateway Integration

# api_gateway_config.yaml
swagger: "2.0"
info:
  title: "Cloud Run API Gateway"
  description: "API Gateway for Cloud Run services"
  version: "1.0.0"
schemes:
  - "https"
produces:
  - "application/json"
x-google-backend:
  address: https://hello-cloudrun-xxx-uc.a.run.app
  protocol: h2

paths:
  /public/hello:
    get:
      summary: "Public endpoint"
      operationId: "getPublicHello"
      responses:
        '200':
          description: "Success"

  /private/data:
    get:
      summary: "Private endpoint requiring authentication"
      operationId: "getPrivateData"
      security:
        - google_id_token: []
      responses:
        '200':
          description: "Success"
        '401':
          description: "Unauthorized"

  /api/process:
    post:
      summary: "Process data with rate limiting"
      operationId: "processData"
      x-google-quota:
        metricCosts:
          "process-requests": 1
      parameters:
        - in: body
          name: body
          required: true
          schema:
            type: object
            properties:
              data:
                type: string
      responses:
        '200':
          description: "Success"

securityDefinitions:
  google_id_token:
    authorizationUrl: ""
    flow: "implicit"
    type: "oauth2"
    x-google-issuer: "https://accounts.google.com"
    x-google-audiences: "YOUR-CLIENT-ID"

x-google-management:
  metrics:
    - name: "process-requests"
      displayName: "Process API requests"
      valueType: INT64
      metricKind: DELTA
  quota:
    limits:
      - name: "process-limit"
        metric: "process-requests"
        unit: "1/min/{project}"
        values:
          STANDARD: 100

Performance Optimization

Cold Start Optimization

# optimized_app.py
import os
import time
from flask import Flask, g, request
import redis
from google.cloud import firestore

# Global initialization - happens once per container
print(f"Cold start initialization at {time.time()}")

# Initialize expensive resources globally
app = Flask(__name__)
db = None
cache = None

def get_db():
    """Lazy load Firestore client."""
    global db
    if db is None:
        db = firestore.Client()
    return db

def get_cache():
    """Lazy load Redis client."""
    global cache
    if cache is None and os.environ.get('REDIS_URL'):
        cache = redis.from_url(os.environ['REDIS_URL'])
    return cache

# Minimize imports in request handlers
@app.before_request
def before_request():
    """Set up request context."""
    g.start_time = time.time()
    g.request_id = request.headers.get('X-Cloud-Trace-Context', 'unknown')

@app.after_request
def after_request(response):
    """Log request duration."""
    if hasattr(g, 'start_time'):
        duration = (time.time() - g.start_time) * 1000
        print(f"Request {g.request_id} took {duration:.2f}ms")
    return response

@app.route('/optimized', methods=['GET'])
def optimized_handler():
    """Optimized request handler."""
    # Use lazy-loaded resources
    db_client = get_db()
    cache_client = get_cache()

    # Check cache first
    if cache_client:
        cached = cache_client.get('result')
        if cached:
            return {'result': cached.decode(), 'cached': True}

    # Perform operation
    result = perform_operation(db_client)

    # Cache result
    if cache_client:
        cache_client.setex('result', 300, result)

    return {'result': result, 'cached': False}

# Use connection pooling for external services
from urllib3 import PoolManager
http = PoolManager(maxsize=10)

@app.route('/external', methods=['GET'])
def call_external():
    """Call external service with connection pooling."""
    response = http.request('GET', 'https://api.example.com/data')
    return {'data': response.data.decode()}

# Implement graceful shutdown
import signal
import sys

def signal_handler(sig, frame):
    print('Graceful shutdown initiated')
    # Clean up resources
    if cache:
        cache.close()
    sys.exit(0)

signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)
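
The Gunicorn command earlier in this guide runs with --threads 8, so on a cold instance two requests can reach get_db() at the same moment and both create a client. That is usually harmless but wasteful. A lock-guarded variant is sketched below; LazyResource is a hypothetical helper, and the factory argument stands in for firestore.Client or redis.from_url.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyResource(Generic[T]):
    """Create a resource on first use, at most once, even across threads."""

    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._lock = threading.Lock()
        self._value: Optional[T] = None
        self._ready = False

    def get(self) -> T:
        if not self._ready:              # fast path once initialized
            with self._lock:
                if not self._ready:      # re-check while holding the lock
                    self._value = self._factory()
                    self._ready = True
        return self._value  # type: ignore[return-value]
```

With this in place, a module-level `db = LazyResource(firestore.Client)` keeps the import cheap while still guaranteeing a single client per instance.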

Concurrency Optimization

// main.go - High-concurrency Go service
package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"
    "sync"
    "syscall"
    "time"

    "cloud.google.com/go/firestore"
    "github.com/go-redis/redis/v8"
)

var (
    firestoreClient *firestore.Client
    redisClient     *redis.Client
    clientPool      sync.Pool
)

func init() {
    // Set GOMAXPROCS to match container CPU
    runtime.GOMAXPROCS(runtime.NumCPU())

    // Initialize Firestore
    ctx := context.Background()
    var err error
    firestoreClient, err = firestore.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
    if err != nil {
        log.Fatalf("Failed to create Firestore client: %v", err)
    }

    // Initialize Redis with connection pool
    redisClient = redis.NewClient(&redis.Options{
        Addr:         os.Getenv("REDIS_URL"),
        PoolSize:     10 * runtime.NumCPU(),
        MinIdleConns: 5,
        MaxRetries:   3,
    })

    // Initialize object pool for request processing
    clientPool = sync.Pool{
        New: func() interface{} {
            return &RequestProcessor{}
        },
    }
}

type RequestProcessor struct {
    Buffer []byte
}

func (p *RequestProcessor) Reset() {
    p.Buffer = p.Buffer[:0]
}

func main() {
    mux := http.NewServeMux()

    // Endpoints
    mux.HandleFunc("/", healthCheck)
    mux.HandleFunc("/process", handleProcess)
    mux.HandleFunc("/batch", handleBatch)

    // Server configuration
    srv := &http.Server{
        Addr:         ":" + os.Getenv("PORT"),
        Handler:      mux,
        ReadTimeout:  10 * time.Second,
        WriteTimeout: 10 * time.Second,
        IdleTimeout:  120 * time.Second,
    }

    // Start server
    go func() {
        log.Printf("Server starting on port %s", os.Getenv("PORT"))
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatalf("ListenAndServe: %v", err)
        }
    }()

    // Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }

    log.Println("Server exited")
}

func handleProcess(w http.ResponseWriter, r *http.Request) {
    // Get processor from pool
    processor := clientPool.Get().(*RequestProcessor)
    defer func() {
        processor.Reset()
        clientPool.Put(processor)
    }()

    // Process request concurrently
    ctx := r.Context()
    resultChan := make(chan interface{}, 2)
    errChan := make(chan error, 2)

    // Parallel operations
    go func() {
        // Check cache
        val, err := redisClient.Get(ctx, "key").Result()
        if err == nil {
            resultChan <- val
        } else {
            errChan <- err
        }
    }()

    go func() {
        // Query Firestore
        doc, err := firestoreClient.Collection("data").Doc("doc").Get(ctx)
        if err == nil {
            resultChan <- doc.Data()
        } else {
            errChan <- err
        }
    }()

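    // Wait for the first successful result; fail only if both lookups fail
    // (a cache miss alone should not produce a 500).
    var lastErr error
    for pending := 2; pending > 0; pending-- {
        select {
        case result := <-resultChan:
            json.NewEncoder(w).Encode(map[string]interface{}{
                "result": result,
            })
            return
        case lastErr = <-errChan:
        case <-ctx.Done():
            http.Error(w, "Request timeout", http.StatusRequestTimeout)
            return
        }
    }
    http.Error(w, lastErr.Error(), http.StatusInternalServerError)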
}

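// BatchRequest, BatchResult, processBatchItem, and healthCheck are
// application-specific and are not shown in this guide.
func handleBatch(w http.ResponseWriter, r *http.Request) {
    // Handle batch processing with worker pool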
    var requests []BatchRequest
    if err := json.NewDecoder(r.Body).Decode(&requests); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Process in parallel with limited concurrency
    results := make([]BatchResult, len(requests))
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, 10) // Limit to 10 concurrent operations

    for i, req := range requests {
        wg.Add(1)
        go func(idx int, request BatchRequest) {
            defer wg.Done()

            semaphore <- struct{}{}        // Acquire
            defer func() { <-semaphore }() // Release

            result := processBatchItem(request)
            results[idx] = result
        }(i, req)
    }

    wg.Wait()

    json.NewEncoder(w).Encode(results)
}

Observability and Monitoring

Structured Logging

# logging_config.py
import os
import json
import logging
import traceback
from flask import request
from pythonjsonlogger import jsonlogger
from google.cloud import logging as cloud_logging

class CloudRunLogger:
    """Configure structured logging for Cloud Run."""

    def __init__(self, service_name=None):
        self.service_name = service_name or os.environ.get('K_SERVICE', 'unknown')
        self.revision = os.environ.get('K_REVISION', 'unknown')
        self.setup_logging()

    def setup_logging(self):
        """Setup structured logging."""
        # Use Cloud Logging in production
        if os.environ.get('K_SERVICE'):
            client = cloud_logging.Client()
            client.setup_logging()

        # Configure JSON formatter
        logHandler = logging.StreamHandler()
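        # asctime and levelname are standard LogRecord attributes; they are
        # renamed to the field names Cloud Logging recognizes.
        formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            rename_fields={
                'asctime': 'time',
                'levelname': 'severity'
            }
        )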
        logHandler.setFormatter(formatter)

        # Set handler
        logger = logging.getLogger()
        logger.handlers = [logHandler]
        logger.setLevel(logging.INFO)

        # Add Cloud Run metadata to all logs
        from flask import has_request_context, request  # local import keeps this method self-contained
        old_factory = logging.getLogRecordFactory()

        def record_factory(*args, **kwargs):
            record = old_factory(*args, **kwargs)
            record.service = self.service_name
            record.revision = self.revision

            # Add trace context only while handling a request; logging at
            # startup or from background threads has no request to read.
            if has_request_context():
                trace_header = request.headers.get('X-Cloud-Trace-Context')
                if trace_header:
                    trace = trace_header.split('/')[0]
                    record.trace = f"projects/{os.environ.get('GOOGLE_CLOUD_PROJECT')}/traces/{trace}"

            return record

        logging.setLogRecordFactory(record_factory)

    def log_request(self, request, response, duration):
        """Log HTTP request with metadata."""
        logging.info(
            "HTTP Request",
            extra={
                'httpRequest': {
                    'requestMethod': request.method,
                    'requestUrl': request.url,
                    'status': response.status_code,
                    'userAgent': request.headers.get('User-Agent'),
                    'remoteIp': request.remote_addr,
                    'latency': f"{duration}s"
                }
            }
        )

    def log_error(self, error, context=None):
        """Log error with context."""
        logging.error(
            f"Error: {str(error)}",
            extra={
                'error': {
                    'type': type(error).__name__,
                    'message': str(error),
                    'stacktrace': traceback.format_exc()
                },
                'context': context or {}
            },
            exc_info=True
        )
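
For services that do not want extra logging dependencies, Cloud Run also picks up JSON lines written to stdout as structured log entries. The sketch below shows that approach with only the standard library. The helper names and the sample project ID are assumptions; the header layout (TRACE_ID/SPAN_ID;o=OPTIONS) and the logging.googleapis.com/trace field follow Google's documented conventions.

```python
import json
from typing import Optional


def trace_resource(header: Optional[str], project_id: str) -> Optional[str]:
    """Turn an X-Cloud-Trace-Context header into a Cloud Logging trace name."""
    if not header:
        return None
    trace_id = header.split("/", 1)[0].strip()
    if not trace_id:
        return None
    return f"projects/{project_id}/traces/{trace_id}"


def structured_log(message: str, severity: str = "INFO",
                   trace: Optional[str] = None, **fields) -> str:
    """Build one JSON log line suitable for printing to stdout."""
    entry = {"severity": severity, "message": message, **fields}
    if trace:
        entry["logging.googleapis.com/trace"] = trace
    return json.dumps(entry)


if __name__ == "__main__":
    # Hypothetical header value and project ID.
    trace = trace_resource("105445aa7843bc8bf206b12000100000/1;o=1", "my-project")
    print(structured_log("request handled", trace=trace, route="/process"))
```

Attaching the trace name lets the log viewer group all entries that belong to one request.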

Custom Metrics

# metrics.py
import os
import time
from functools import wraps
from google.cloud import monitoring_v3

class CloudRunMetrics:
    """Custom metrics for Cloud Run services."""

    def __init__(self, project_id):
        self.client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"
        self.resource = {
            "type": "cloud_run_revision",
            "labels": {
                "service_name": os.environ.get('K_SERVICE', 'unknown'),
                "revision_name": os.environ.get('K_REVISION', 'unknown'),
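                # K_REGION is not among the variables Cloud Run sets by default
                # (K_SERVICE, K_REVISION, K_CONFIGURATION, PORT); set it yourself.
                "location": os.environ.get('K_REGION', 'unknown')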
            }
        }

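    def record_metric(self, metric_name, value, metric_type="custom.googleapis.com", labels=None):
        """Record a custom metric, optionally with metric labels."""
        series = monitoring_v3.TimeSeries()
        series.metric.type = f"{metric_type}/{metric_name}"
        if labels:
            # count_by_status passes labels such as {"status_code": "200"}
            series.metric.labels.update(labels)
        series.resource.type = self.resource["type"]
        series.resource.labels.update(self.resource["labels"])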

        now = time.time()
        seconds = int(now)
        nanos = int((now - seconds) * 10 ** 9)
        interval = monitoring_v3.TimeInterval(
            {"end_time": {"seconds": seconds, "nanos": nanos}}
        )
        point = monitoring_v3.Point({
            "interval": interval,
            "value": {"double_value": value}
        })
        series.points = [point]

        self.client.create_time_series(
            name=self.project_name,
            time_series=[series]
        )

    def timer(self, metric_name):
        """Decorator to time function execution."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    duration = (time.time() - start_time) * 1000  # ms
                    self.record_metric(f"{metric_name}_duration_ms", duration)
                    self.record_metric(f"{metric_name}_success_count", 1)
                    return result
                except Exception as e:
                    duration = (time.time() - start_time) * 1000
                    self.record_metric(f"{metric_name}_duration_ms", duration)
                    self.record_metric(f"{metric_name}_error_count", 1)
                    raise
            return wrapper
        return decorator

    def count_by_status(self, metric_name, status_code):
        """Count requests by status code."""
        self.record_metric(
            f"{metric_name}_by_status",
            1,
            labels={"status_code": str(status_code)}
        )
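
record_metric issues one API call per data point, and the timer decorator writes two points per invocation, which can become expensive on a busy service and may run into Cloud Monitoring's write-frequency limits (check the current quotas). One way to reduce the call volume is to accumulate counters in memory and flush them periodically. This is a sketch under those assumptions; CounterBuffer is a hypothetical helper, and flush_fn stands in for code that forwards the totals to record_metric.

```python
import threading
from collections import defaultdict
from typing import Callable, Dict


class CounterBuffer:
    """Accumulate counter increments in memory and flush them in one batch."""

    def __init__(self, flush_fn: Callable[[Dict[str, float]], None]):
        self._flush_fn = flush_fn    # e.g. a wrapper around record_metric
        self._lock = threading.Lock()
        self._counts: Dict[str, float] = defaultdict(float)

    def increment(self, name: str, amount: float = 1.0) -> None:
        with self._lock:
            self._counts[name] += amount

    def flush(self) -> None:
        # Swap the buffer under the lock, then call out without holding it.
        with self._lock:
            pending, self._counts = dict(self._counts), defaultdict(float)
        if pending:
            self._flush_fn(pending)
```

Calling flush() at a fixed interval and once more from the SIGTERM handler keeps data loss on shutdown small.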

CI/CD Integration

Cloud Build Pipeline

# cloudbuild.yaml
steps:
  # Run tests
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'test', '-f', 'Dockerfile.test', '.']

  - name: 'test'
    args: ['npm', 'test']

  # Build production image
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:${SHORT_SHA}',
      '-t', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:latest',
      # BUILD_DATE omitted: list-form args are not run through a shell, so a
      # command substitution would reach docker as a literal string.
      '--build-arg', 'VCS_REF=${SHORT_SHA}',
      '--build-arg', 'VERSION=${TAG_NAME}',
      '.'
    ]

  # Push to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '--all-tags', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}']

  # Deploy to Cloud Run (staging)
  - name: 'gcr.io/cloud-builders/gcloud'
    args: [
      'run', 'deploy', '${_SERVICE_NAME}-staging',
      '--image', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:${SHORT_SHA}',
      '--region', '${_REGION}',
      '--platform', 'managed',
      '--no-traffic',
      '--tag', 'pr-${_PR_NUMBER}'
    ]

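  # Run integration tests
  # Resolve the staging URL with gcloud and pass it to the next step via /workspace.
  # Note: status.url serves the revisions that currently receive traffic; a revision
  # deployed with --no-traffic is only reachable through its tag URL.
  - name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        gcloud run services describe ${_SERVICE_NAME}-staging \
          --region=${_REGION} \
          --format='value(status.url)' > /workspace/service_url.txt

  # The gcloud builder image does not ship npm, so the tests run in a Node image
  - name: 'node:18'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        npm ci
        npm run integration-test -- --url="$$(cat /workspace/service_url.txt)"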

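  # Deploy to production with canary
  # Create the production revision under a predictable name without shifting traffic.
  # Without --revision-suffix, Cloud Run generates the revision name itself.
  - name: 'gcr.io/cloud-builders/gcloud'
    args: [
      'run', 'deploy', '${_SERVICE_NAME}',
      '--image', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:${SHORT_SHA}',
      '--region', '${_REGION}',
      '--revision-suffix', '${SHORT_SHA}',
      '--no-traffic'
    ]

  # Send 10% of production traffic to the new revision
  - name: 'gcr.io/cloud-builders/gcloud'
    args: [
      'run', 'services', 'update-traffic',
      '${_SERVICE_NAME}',
      '--region', '${_REGION}',
      '--to-revisions', '${_SERVICE_NAME}-${SHORT_SHA}=10'
    ]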

substitutions:
  _SERVICE_NAME: my-service
  _REGION: us-central1
  _PR_NUMBER: ${BRANCH_NAME}

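options:
  logging: CLOUD_LOGGING_ONLY
  # Lets a user-defined substitution reference another substitution,
  # as _PR_NUMBER does with BRANCH_NAME
  dynamicSubstitutions: true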

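# Trigger configuration
# Keep this in a separate file (for example trigger.yaml, imported with
# `gcloud builds triggers import`); the cloudbuild.yaml schema has no trigger field.
# The trigger name below is a placeholder.
name: deploy-on-main
github:
  owner: 'your-org'
  name: 'your-repo'
  push:
    branch: '^main$'
filename: cloudbuild.yaml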
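
The pipeline above stops at a 10% canary. As a rough sketch of how the remaining rollout could be scripted, the helper below builds one `gcloud run services update-traffic` command per step. The function name `canary_commands`, the default step schedule, and the example revision name are assumptions made for illustration; only the gcloud flags come from the pipeline itself.

```python
from typing import List, Sequence


def canary_commands(service: str, region: str, revision: str,
                    steps: Sequence[int] = (10, 50, 100)) -> List[List[str]]:
    """Build one update-traffic command per rollout step (hypothetical helper)."""
    commands = []
    previous = 0
    for percent in steps:
        # Each step must increase the traffic share and stay within 100%.
        if not previous < percent <= 100:
            raise ValueError(f"invalid rollout step: {percent}")
        commands.append([
            "gcloud", "run", "services", "update-traffic", service,
            "--region", region,
            "--to-revisions", f"{revision}={percent}",
        ])
        previous = percent
    return commands


if __name__ == "__main__":
    # Print the commands instead of running them, so each step can be gated
    # on monitoring checks before traffic is increased further.
    for cmd in canary_commands("my-service", "us-central1", "my-service-abc1234"):
        print(" ".join(cmd))
```

Printing the commands rather than executing them keeps the promotion decision with whatever gate you use between steps, such as an error-rate check or a manual approval.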

GitOps with Cloud Deploy

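# skaffold.yaml
# The cloudrun deployer requires a Skaffold v3 or later schema. It deploys the
# Cloud Run service manifests listed under manifests.rawYaml; per-service
# settings such as CPU and memory live in those manifests.
apiVersion: skaffold/v4beta7
kind: Config
metadata:
  name: cloud-run-app
build:
  artifacts:
  - image: app
    docker:
      dockerfile: Dockerfile
  tagPolicy:
    gitCommit: {}
manifests:
  rawYaml:
  - service.yaml
deploy:
  cloudrun:
    projectid: PROJECT_ID
    region: us-central1
profiles:
- name: dev
  manifests:
    rawYaml:
    - service-dev.yaml

# service.yaml
# service-dev.yaml is the same with name my-service-dev, cpu '1',
# memory 512Mi, maxScale '10', and ENVIRONMENT set to development.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-service
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/maxScale: '100'
    spec:
      containers:
      - image: app
        resources:
          limits:
            cpu: '2'
            memory: 2Gi
        env:
        - name: ENVIRONMENT
          value: production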

Best Practices

Production Checklist

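# health_checks.py
import gc
import os

import psutil
import requests
from flask import Flask, jsonify

# Assumption: db (for example a Firestore client) and cache (for example a
# Redis client) are created elsewhere in the application and imported here.

app = Flask(__name__)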

@app.route('/health')
def health_check():
    """Basic health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'service': os.environ.get('K_SERVICE'),
        'revision': os.environ.get('K_REVISION')
    })

@app.route('/ready')
def readiness_check():
    """Readiness check with dependency validation."""
    checks = {
        'database': check_database(),
        'cache': check_cache(),
        'external_api': check_external_api()
    }

    all_ready = all(checks.values())
    status_code = 200 if all_ready else 503

    return jsonify({
        'ready': all_ready,
        'checks': checks
    }), status_code

@app.route('/debug/memory')
def memory_stats():
    """Memory usage statistics."""
    memory = psutil.Process().memory_info()
    gc_stats = gc.get_stats()

    return jsonify({
        'memory': {
            'rss_mb': memory.rss / 1024 / 1024,
            'vms_mb': memory.vms / 1024 / 1024
        },
        'gc': gc_stats[0] if gc_stats else {}
    })

def check_database():
    """Check database connectivity."""
    try:
        # Perform a simple query
        db.collection('health').document('check').get()
        return True
    except Exception:
        return False

def check_cache():
    """Check cache connectivity."""
    try:
        cache.ping()
        return True
    except Exception:
        return False

def check_external_api():
    """Check external API availability."""
    try:
        response = requests.get(
            'https://api.example.com/health',
            timeout=2
        )
        return response.status_code == 200
    except Exception:
        return False

Security Hardening

# Secure Dockerfile
FROM python:3.11-slim-bullseye AS builder

# Install security updates
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Final stage
FROM python:3.11-slim-bullseye

# Install security updates
RUN apt-get update && apt-get upgrade -y && \
    rm -rf /var/lib/apt/lists/*

# Create non-root user
RUN groupadd -g 1001 appuser && \
    useradd -r -u 1001 -g appuser appuser

# Copy Python dependencies and hand ownership to the non-root user
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local

# Set working directory
WORKDIR /app

# Copy application
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Add Python user packages to PATH
ENV PATH=/home/appuser/.local/bin:$PATH

# Python and pip runtime settings
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

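# Health check for local Docker runs. Cloud Run ignores HEALTHCHECK, so
# configure startup and liveness probes on the service as well.
# urlopen raises on connection failures and on HTTP error statuses, so the
# command exits non-zero when /health is unhealthy.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=2)"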

# Run application
CMD ["gunicorn", "--bind", ":8080", "--workers", "2", "--threads", "4", \
     "--worker-class", "gthread", "--access-logfile", "-", \
     "--error-logfile", "-", "app:app"]
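
The CMD above hard-codes port 8080, while Cloud Run passes the port to listen on through the PORT environment variable. One possible way to honor it is a Gunicorn configuration file, sketched below. The file name `gunicorn.conf.py` and the helper `resolve_bind` are assumptions for illustration; the worker settings mirror the flags in the Dockerfile.

```python
# gunicorn.conf.py (illustrative; load with `gunicorn -c gunicorn.conf.py app:app`)
import os
from typing import Mapping


def resolve_bind(environ: Mapping[str, str]) -> str:
    """Return the bind address, falling back to port 8080 when PORT is unset."""
    port = environ.get("PORT", "8080")
    if not port.isdigit():
        raise ValueError(f"PORT must be numeric, got {port!r}")
    return f"0.0.0.0:{port}"


# Standard Gunicorn settings, matching the command-line flags in the CMD.
bind = resolve_bind(os.environ)
workers = 2
threads = 4
worker_class = "gthread"
accesslog = "-"   # access log to stdout
errorlog = "-"    # error log to stderr
```

With a file like this in place, the container command could shrink to `gunicorn -c gunicorn.conf.py app:app`, and the same image would work unchanged if the port is ever configured to something other than 8080.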

Conclusion

Google Cloud Run provides a powerful platform for deploying containerized applications without managing infrastructure. Key benefits:

  1. True Serverless: Scale to zero, pay only for what you use
  2. Container Flexibility: Use any language or framework
  3. Fully Managed: No infrastructure management
  4. Advanced Features: Traffic splitting, authentication, VPC connectivity
  5. Developer Experience: Great CLI, console, and API support

Best Practices Summary

  • Optimize for Cold Starts: Minimize dependencies and startup time
  • Implement Health Checks: Configure startup and liveness probes
  • Structure Logs: Use JSON structured logging
  • Monitor Everything: Custom metrics and traces
  • Secure by Default: Run as non-root, use least privilege
  • Automate Deployment: CI/CD with testing and canary releases
  • Handle Graceful Shutdown: Listen for SIGTERM signals
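
As one illustration of the structured logging item, the sketch below emits one JSON object per log line using only the Python standard library. Cloud Run forwards stdout to Cloud Logging, which reads the `severity` and `message` fields from JSON lines. The names `JsonFormatter` and `build_logger` are hypothetical, and the extra `logger` field is simply an example of additional structured data.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record):
        entry = {
            "severity": record.levelname,   # e.g. INFO, WARNING, ERROR
            "message": record.getMessage(),
            "logger": record.name,
        }
        if record.exc_info:
            # Attach the formatted traceback when logging an exception.
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


def build_logger(name="app", stream=sys.stdout):
    """Create a logger that writes JSON lines to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]     # replace handlers to avoid duplicate lines
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = build_logger()
    log.info("service started")
    log.warning("cache miss ratio at %d%%", 42)
```

Writing to stdout keeps the application free of logging client libraries; the trade-off is that fields beyond the recognized ones end up in the JSON payload of the log entry rather than in dedicated columns.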

Next Steps

  • Explore Cloud Run for Anthos for hybrid deployments
  • Learn about Cloud Run Jobs for batch processing
  • Study Eventarc for event-driven architectures
  • Implement service mesh with Cloud Run and Istio
  • Get certified as a Google Cloud Developer

Remember: Cloud Run makes it easy to deploy containers, but following best practices ensures your services are production-ready, secure, and cost-effective.