Google Cloud Run: Serverless Container Deployment Guide
Google Cloud Run is a fully managed compute platform that automatically scales your stateless containers. This guide covers everything from basic deployments to advanced patterns for building production-ready serverless applications.
Why Cloud Run?
Cloud Run excels with:
- Any Language, Any Library: Deploy containers with your favorite stack
- Automatic Scaling: Scale to zero or thousands of instances
- Pay Per Use: With request-based billing, you pay only while requests are being handled
- Fully Managed: No infrastructure to manage
- Portable: Implements the Knative Serving API, so services can move to other Knative-compatible platforms
Getting Started with Cloud Run
Your First Deployment
# Dockerfile
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY package*.json ./
RUN npm ci --omit=dev
# Copy application files
COPY . .
# Create non-root user
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nodejs -u 1001
USER nodejs
# Expose port (Cloud Run sets PORT env variable)
EXPOSE 8080
# Start the application
CMD ["node", "server.js"]
// server.js
const express = require('express');
const app = express();
// Cloud Run sets the PORT environment variable
const port = process.env.PORT || 8080;
// Health check endpoint
app.get('/', (req, res) => {
  res.json({
    message: 'Hello from Cloud Run!',
    revision: process.env.K_REVISION || 'unknown',
    service: process.env.K_SERVICE || 'unknown'
  });
});
// Graceful shutdown
process.on('SIGTERM', () => {
  console.log('SIGTERM signal received: closing HTTP server');
  server.close(() => {
    console.log('HTTP server closed');
  });
});
const server = app.listen(port, () => {
  console.log(`Server listening on port ${port}`);
});
Deploy the service:
# Build and push container
gcloud builds submit --tag gcr.io/PROJECT_ID/hello-cloudrun
# Deploy to Cloud Run
gcloud run deploy hello-cloudrun \
--image gcr.io/PROJECT_ID/hello-cloudrun \
--platform managed \
--region us-central1 \
--allow-unauthenticated \
--memory 512Mi \
--cpu 1 \
--timeout 60 \
--concurrency 80 \
--max-instances 100
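The `--concurrency` and `--max-instances` flags above together bound how many requests the service can hold at once. A rough sketch of that arithmetic follows; the helper names are hypothetical and not part of any Cloud Run tooling, and the numbers ignore the headroom the autoscaler keeps in practice.

```python
import math


def max_in_flight_requests(concurrency: int, max_instances: int) -> int:
    """Upper bound on simultaneous requests across all instances."""
    return concurrency * max_instances


def instances_needed(in_flight_requests: int, concurrency: int) -> int:
    """Minimum number of instances able to hold the given in-flight requests."""
    return math.ceil(in_flight_requests / concurrency)


if __name__ == "__main__":
    # Values taken from the deploy command: --concurrency 80 --max-instances 100
    print(max_in_flight_requests(80, 100))  # 8000
    print(instances_needed(1000, 80))       # 13
```

Lowering `--concurrency` makes each instance less crowded but raises the instance count (and cost) for the same load, so the two flags are best tuned together.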
Advanced Container Patterns
Multi-Stage Builds
# Multi-stage Dockerfile for Go application
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .
# Final stage
FROM alpine:latest
RUN apk --no-cache add ca-certificates
# Use a directory the non-root user can access (/root is not readable by other users)
WORKDIR /app
# Copy binary from builder
COPY --from=builder /app/main .
# Run as non-root user
RUN addgroup -g 1001 -S appuser && adduser -S appuser -u 1001
USER appuser
EXPOSE 8080
CMD ["./main"]
Python Application with Dependencies
# Dockerfile for Python application
FROM python:3.11-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1001 appuser
USER appuser
# Gunicorn for production
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
# main.py
import logging
import os

import aiohttp
from flask import Flask, jsonify, request
from google.cloud import firestore
from google.cloud import storage
from werkzeug.utils import secure_filename

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# Initialize clients once per container instance
db = firestore.Client()
storage_client = storage.Client()


@app.route('/', methods=['GET'])
def hello():
    """Health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'service': os.environ.get('K_SERVICE', 'unknown'),
        'revision': os.environ.get('K_REVISION', 'unknown'),
        'project': os.environ.get('GOOGLE_CLOUD_PROJECT', 'unknown')
    })


# Async views need Flask installed with the async extra: pip install "flask[async]"
@app.route('/process', methods=['POST'])
async def process_data():
    """Process incoming data asynchronously."""
    try:
        # silent=True returns None instead of raising on a missing or invalid JSON body
        data = request.get_json(silent=True)
        # Validate input
        if not data or 'url' not in data:
            return jsonify({'error': 'Missing required field: url'}), 400
        # Process asynchronously
        result = await process_url(data['url'])
        # Store result in Firestore
        doc_ref = db.collection('results').document()
        doc_ref.set({
            'url': data['url'],
            'result': result,
            'timestamp': firestore.SERVER_TIMESTAMP
        })
        return jsonify({
            'id': doc_ref.id,
            'status': 'processed',
            'result': result
        })
    except Exception as e:
        logging.exception("Processing error: %s", e)
        return jsonify({'error': str(e)}), 500


async def process_url(url):
    """Process URL with async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return {
                'status': response.status,
                'content_length': len(content),
                'headers': dict(response.headers)
            }


@app.route('/upload', methods=['POST'])
def handle_upload():
    """Handle file uploads to Cloud Storage."""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # Strip path separators and other unsafe characters from the client-supplied name
    filename = secure_filename(file.filename or '')
    if filename == '':
        return jsonify({'error': 'No file selected'}), 400
    # Upload to Cloud Storage
    bucket_name = os.environ.get('STORAGE_BUCKET', 'my-uploads')
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f"uploads/{filename}")
    blob.upload_from_string(
        file.read(),
        content_type=file.content_type
    )
    return jsonify({
        'filename': filename,
        # public_url is only reachable if the object is publicly readable
        'url': blob.public_url,
        'size': blob.size
    })


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
Cloud Run Configuration
Advanced Service Configuration
# service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: advanced-service
  annotations:
    run.googleapis.com/launch-stage: GA
spec:
  template:
    metadata:
      annotations:
        # Autoscaling configuration
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "1000"
        # CPU allocation
        run.googleapis.com/cpu-throttling: "false"
        run.googleapis.com/execution-environment: "gen2"
        # VPC connector for private resources
        run.googleapis.com/vpc-access-connector: projects/PROJECT_ID/locations/us-central1/connectors/my-connector
        run.googleapis.com/vpc-access-egress: private-ranges-only
        # Session affinity
        run.googleapis.com/sessionAffinity: "true"
    spec:
      containerConcurrency: 1000
      timeoutSeconds: 900
      serviceAccountName: cloudrun-sa@PROJECT_ID.iam.gserviceaccount.com
      containers:
        - image: gcr.io/PROJECT_ID/advanced-service
          ports:
            - name: http1
              containerPort: 8080
          env:
            - name: ENVIRONMENT
              value: "production"
            - name: DB_CONNECTION
              valueFrom:
                secretKeyRef:
                  name: db-connection
                  key: latest
          resources:
            limits:
              cpu: "4"
              memory: "8Gi"
          livenessProbe:
            httpGet:
              path: /health
            initialDelaySeconds: 10
            periodSeconds: 10
          startupProbe:
            httpGet:
              path: /ready
            initialDelaySeconds: 0
            periodSeconds: 1
            failureThreshold: 30
Deploy with configuration:
gcloud run services replace service.yaml --region us-central1
Environment-Specific Configurations
# config_manager.py
import os

from google.cloud import secretmanager


class ConfigManager:
    """Manage Cloud Run configurations."""

    def __init__(self):
        self.project_id = os.environ.get('GOOGLE_CLOUD_PROJECT')
        self.secret_client = secretmanager.SecretManagerServiceClient()
        self._cache = {}

    def get_config(self):
        """Get environment-specific configuration."""
        env = os.environ.get('ENVIRONMENT', 'development')
        base_config = {
            'app_name': 'cloud-run-app',
            'log_level': 'INFO',
            'max_workers': 4,
            'cache_ttl': 300
        }
        # Each entry is a factory, so only the active environment's secrets
        # are fetched from Secret Manager.
        env_configs = {
            'development': lambda: {
                'debug': True,
                'database_url': 'sqlite:///dev.db',
                'cache_enabled': False
            },
            'staging': lambda: {
                'debug': False,
                'database_url': self.get_secret('staging-db-url'),
                'cache_enabled': True,
                'redis_url': self.get_secret('staging-redis-url')
            },
            'production': lambda: {
                'debug': False,
                'database_url': self.get_secret('prod-db-url'),
                'cache_enabled': True,
                'redis_url': self.get_secret('prod-redis-url'),
                'cdn_url': 'https://cdn.example.com'
            }
        }
        config = base_config.copy()
        # Unknown environments fall back to the base configuration only
        config.update(env_configs.get(env, dict)())
        return config

    def get_secret(self, secret_id, version='latest'):
        """Get secret from Secret Manager with caching."""
        cache_key = f"{secret_id}:{version}"
        if cache_key in self._cache:
            return self._cache[cache_key]
        name = f"projects/{self.project_id}/secrets/{secret_id}/versions/{version}"
        response = self.secret_client.access_secret_version(request={"name": name})
        secret_value = response.payload.data.decode('UTF-8')
        self._cache[cache_key] = secret_value
        return secret_value
Traffic Management
Blue-Green Deployments
#!/bin/bash
# blue_green_deploy.sh
# Usage: ./blue_green_deploy.sh IMAGE_TAG
set -euo pipefail
SERVICE_NAME="my-service"
REGION="us-central1"
IMAGE="gcr.io/PROJECT_ID/my-service:$1"
# Deploy new revision with no traffic
gcloud run deploy "$SERVICE_NAME" \
  --image "$IMAGE" \
  --region "$REGION" \
  --no-traffic \
  --tag green
# Test the green deployment through its tagged URL.
# status.url is the main service URL (still serving the old revision); the
# tagged revision is served at the same host prefixed with "green---".
SERVICE_URL=$(gcloud run services describe "$SERVICE_NAME" \
  --region "$REGION" \
  --format 'value(status.url)')
GREEN_URL="https://green---${SERVICE_URL#https://}"
echo "Testing green deployment at: $GREEN_URL"
curl --fail -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  "$GREEN_URL"
# If tests pass, switch traffic
read -p "Switch traffic to green? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]
then
  gcloud run services update-traffic "$SERVICE_NAME" \
    --region "$REGION" \
    --to-tags green=100
  echo "Traffic switched to green deployment"
else
  # "green" is a traffic tag, not a revision name, so remove the tag
  echo "Deployment cancelled, removing green tag"
  gcloud run services update-traffic "$SERVICE_NAME" \
    --region "$REGION" \
    --remove-tags green
fi
Canary Deployments
# canary_manager.py
import time

from google.cloud import run_v2


class CanaryManager:
    """Manage canary deployments for Cloud Run.

    get_stable_revision, split_traffic, get_revision_metrics and
    rollback_canary are referenced below but not shown in this guide.
    """

    def __init__(self, project_id, region):
        self.project_id = project_id
        self.region = region
        self.services_client = run_v2.ServicesClient()
        self.revisions_client = run_v2.RevisionsClient()

    def deploy_canary(self, service_name, new_image, canary_percentage=10):
        """Deploy new revision as canary."""
        service_path = f"projects/{self.project_id}/locations/{self.region}/services/{service_name}"
        # Record the revision currently serving traffic before changing anything
        stable_revision = self.get_stable_revision(service_name)
        # Get current service
        service = self.services_client.get_service(name=service_path)
        # Update with new image
        service.template.containers[0].image = new_image
        service.template.revision = f"{service_name}-canary-{int(time.time())}"
        # update_service returns a long-running operation; result() blocks until
        # the new revision is deployed. If the service routes traffic to the
        # latest revision, the new revision receives traffic as soon as it is
        # ready, so pin traffic to the stable revision beforehand.
        operation = self.services_client.update_service(
            request=run_v2.UpdateServiceRequest(service=service, allow_missing=True)
        )
        updated_service = operation.result()
        canary_revision = updated_service.template.revision
        # Split traffic
        self.split_traffic(
            service_name,
            {
                stable_revision: 100 - canary_percentage,
                canary_revision: canary_percentage
            }
        )
        return canary_revision

    def monitor_canary(self, service_name, canary_revision, metrics_threshold):
        """Monitor canary deployment metrics."""
        # This would integrate with Cloud Monitoring
        # to check error rates, latency, etc.
        metrics = self.get_revision_metrics(canary_revision)
        if metrics['error_rate'] > metrics_threshold['max_error_rate']:
            print(f"Canary failed: error rate {metrics['error_rate']}%")
            self.rollback_canary(service_name, canary_revision)
            return False
        if metrics['p99_latency'] > metrics_threshold['max_p99_latency']:
            print(f"Canary failed: P99 latency {metrics['p99_latency']}ms")
            self.rollback_canary(service_name, canary_revision)
            return False
        return True

    def promote_canary(self, service_name, canary_revision):
        """Promote canary to receive all traffic."""
        self.split_traffic(service_name, {canary_revision: 100})
        print(f"Canary {canary_revision} promoted to 100% traffic")

    def progressive_rollout(self, service_name, canary_revision, stages,
                            metrics_threshold, stage_wait_seconds=300):
        """Progressive traffic rollout.

        metrics_threshold must contain 'max_error_rate' and 'max_p99_latency'.
        """
        for percentage in stages:
            stable_revision = self.get_stable_revision(service_name)
            self.split_traffic(
                service_name,
                {
                    stable_revision: 100 - percentage,
                    canary_revision: percentage
                }
            )
            print(f"Canary at {percentage}% traffic")
            time.sleep(stage_wait_seconds)  # Wait (5 minutes by default) between stages
            if not self.monitor_canary(service_name, canary_revision, metrics_threshold):
                return False
        return True
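The canary code above passes a revision-to-percentage mapping to `split_traffic`. As a minimal sketch of how such a mapping could be built and validated before it is sent anywhere, the following uses two hypothetical helpers (`build_traffic_split` and `rollout_plan`) that are assumptions for illustration, not part of the `run_v2` client.

```python
def build_traffic_split(stable_revision, canary_revision, canary_percentage):
    """Return {revision: percent} for a two-way split that sums to 100."""
    if not 0 <= canary_percentage <= 100:
        raise ValueError("canary_percentage must be between 0 and 100")
    if stable_revision == canary_revision:
        raise ValueError("stable and canary revisions must differ")
    split = {
        stable_revision: 100 - canary_percentage,
        canary_revision: canary_percentage,
    }
    # Drop revisions that would receive no traffic
    return {revision: pct for revision, pct in split.items() if pct > 0}


def rollout_plan(stable_revision, canary_revision, stages):
    """Expand a list of canary percentages into one split per stage."""
    return [
        build_traffic_split(stable_revision, canary_revision, pct)
        for pct in stages
    ]


if __name__ == "__main__":
    # Revision names here are made up for the example
    for split in rollout_plan("my-service-001", "my-service-002", [10, 50, 100]):
        print(split)
```

Validating the split up front keeps an out-of-range percentage from reaching the API halfway through a rollout.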
Authentication and Security
Service-to-Service Authentication
# auth_client.py
import google.auth
import google.auth.transport.requests
import google.oauth2.id_token
import requests


class CloudRunAuthClient:
    """Handle authentication for Cloud Run services."""

    def __init__(self, target_audience=None):
        self.target_audience = target_audience
        self._auth_session = None

    def get_auth_session(self):
        """Get authenticated session for service-to-service calls."""
        if self._auth_session:
            return self._auth_session
        # Get credentials
        credentials, project = google.auth.default()
        # Create authenticated session. The access token set here expires and
        # is not refreshed; call_service replaces it with an ID token per call.
        auth_req = google.auth.transport.requests.Request()
        credentials.refresh(auth_req)
        self._auth_session = requests.Session()
        self._auth_session.headers.update({
            'Authorization': f'Bearer {credentials.token}'
        })
        return self._auth_session

    def call_service(self, service_url, method='GET', **kwargs):
        """Call another Cloud Run service with authentication."""
        session = self.get_auth_session()
        # Get ID token for the target service
        auth_req = google.auth.transport.requests.Request()
        id_token = google.oauth2.id_token.fetch_id_token(
            auth_req,
            self.target_audience or service_url
        )
        # Make authenticated request
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {id_token}'
        kwargs['headers'] = headers
        response = session.request(method, service_url, **kwargs)
        response.raise_for_status()
        return response

    def verify_jwt_token(self, token):
        """Verify JWT token from incoming request."""
        import jwt
        from jwt import PyJWKClient
        # Google's public key URL
        jwks_url = "https://www.googleapis.com/oauth2/v3/certs"
        jwks_client = PyJWKClient(jwks_url)
        try:
            # Decode and verify token
            signing_key = jwks_client.get_signing_key_from_jwt(token)
            decoded = jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256"],
                audience=self.target_audience,
                issuer="https://accounts.google.com"
            )
            return decoded
        except jwt.InvalidTokenError as e:
            raise ValueError(f"Invalid token: {e}")
API Gateway Integration
# api_gateway_config.yaml
swagger: "2.0"
info:
  title: "Cloud Run API Gateway"
  description: "API Gateway for Cloud Run services"
  version: "1.0.0"
schemes:
  - "https"
produces:
  - "application/json"
x-google-backend:
  address: https://hello-cloudrun-xxx-uc.a.run.app
  protocol: h2
paths:
  /public/hello:
    get:
      summary: "Public endpoint"
      operationId: "getPublicHello"
      responses:
        '200':
          description: "Success"
  /private/data:
    get:
      summary: "Private endpoint requiring authentication"
      operationId: "getPrivateData"
      security:
        - google_id_token: []
      responses:
        '200':
          description: "Success"
        '401':
          description: "Unauthorized"
  /api/process:
    post:
      summary: "Process data with rate limiting"
      operationId: "processData"
      x-google-quota:
        metricCosts:
          "process-requests": 1
      parameters:
        - in: body
          name: body
          required: true
          schema:
            type: object
            properties:
              data:
                type: string
      responses:
        '200':
          description: "Success"
securityDefinitions:
  google_id_token:
    authorizationUrl: ""
    flow: "implicit"
    type: "oauth2"
    x-google-issuer: "https://accounts.google.com"
    x-google-jwks_uri: "https://www.googleapis.com/oauth2/v3/certs"
    x-google-audiences: "YOUR-CLIENT-ID"
x-google-management:
  metrics:
    - name: "process-requests"
      displayName: "Process API requests"
      valueType: INT64
      metricKind: DELTA
  quota:
    limits:
      - name: "process-limit"
        metric: "process-requests"
        unit: "1/min/{project}"
        values:
          STANDARD: 100
Performance Optimization
Cold Start Optimization
# optimized_app.py
import os
import signal
import sys
import time

import redis
from flask import Flask, g, request
from google.cloud import firestore
from urllib3 import PoolManager

# Module-level code runs once per container instance (at cold start)
print(f"Cold start initialization at {time.time()}")

app = Flask(__name__)
# Expensive clients are created lazily, on first use, to keep startup fast
db = None
cache = None
# Use connection pooling for external services
http = PoolManager(maxsize=10)


def get_db():
    """Lazy load Firestore client."""
    global db
    if db is None:
        db = firestore.Client()
    return db


def get_cache():
    """Lazy load Redis client."""
    global cache
    if cache is None and os.environ.get('REDIS_URL'):
        cache = redis.from_url(os.environ['REDIS_URL'])
    return cache


def perform_operation(db_client):
    """Placeholder for the application-specific work (not shown in this guide)."""
    raise NotImplementedError


# Minimize imports in request handlers
@app.before_request
def before_request():
    """Set up request context."""
    g.start_time = time.time()
    g.request_id = request.headers.get('X-Cloud-Trace-Context', 'unknown')


@app.after_request
def after_request(response):
    """Log request duration."""
    if hasattr(g, 'start_time'):
        duration = (time.time() - g.start_time) * 1000
        print(f"Request {g.request_id} took {duration:.2f}ms")
    return response


@app.route('/optimized', methods=['GET'])
def optimized_handler():
    """Optimized request handler."""
    # Use lazy-loaded resources
    db_client = get_db()
    cache_client = get_cache()
    # Check cache first
    if cache_client:
        cached = cache_client.get('result')
        if cached:
            return {'result': cached.decode(), 'cached': True}
    # Perform operation
    result = perform_operation(db_client)
    # Cache result for 300 seconds
    if cache_client:
        cache_client.setex('result', 300, result)
    return {'result': result, 'cached': False}


@app.route('/external', methods=['GET'])
def call_external():
    """Call external service with connection pooling."""
    response = http.request('GET', 'https://api.example.com/data')
    return {'data': response.data.decode()}


# Implement graceful shutdown
def signal_handler(sig, frame):
    print('Graceful shutdown initiated')
    # Clean up resources
    if cache:
        cache.close()
    sys.exit(0)


signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=port)
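The lazy-loading pattern above can also be written without module-level globals. The following is a minimal sketch using `functools.lru_cache` from the standard library; `ExpensiveClient` is a hypothetical stand-in for a client such as Firestore or Redis, not a real class from either library.

```python
import functools
import time


class ExpensiveClient:
    """Hypothetical stand-in for a client that is slow to construct."""

    instances_created = 0

    def __init__(self):
        time.sleep(0.01)  # Simulate connection setup cost
        ExpensiveClient.instances_created += 1


@functools.lru_cache(maxsize=None)
def get_client() -> ExpensiveClient:
    """Create the client on first call and return the same object afterwards."""
    return ExpensiveClient()


if __name__ == "__main__":
    first = get_client()
    second = get_client()
    print(first is second)                    # True
    print(ExpensiveClient.instances_created)  # 1
```

Nothing is constructed at import time, so the cold start only pays for clients a request actually uses. Under heavy concurrency the first call may race and construct the client more than once, so an explicit lock is worth considering if construction has side effects.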
Concurrency Optimization
// main.go - High-concurrency Go service
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"cloud.google.com/go/firestore"
"github.com/go-redis/redis/v8"
)
var (
firestoreClient *firestore.Client
redisClient *redis.Client
clientPool sync.Pool
)
func init() {
// Set GOMAXPROCS to match container CPU
runtime.GOMAXPROCS(runtime.NumCPU())
// Initialize Firestore
ctx := context.Background()
var err error
firestoreClient, err = firestore.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
if err != nil {
log.Fatalf("Failed to create Firestore client: %v", err)
}
// Initialize Redis with connection pool
redisClient = redis.NewClient(&redis.Options{
Addr: os.Getenv("REDIS_URL"),
PoolSize: 10 * runtime.NumCPU(),
MinIdleConns: 5,
MaxRetries: 3,
})
// Initialize object pool for request processing
clientPool = sync.Pool{
New: func() interface{} {
return &RequestProcessor{}
},
}
}
type RequestProcessor struct {
Buffer []byte
}
func (p *RequestProcessor) Reset() {
p.Buffer = p.Buffer[:0]
}
func main() {
mux := http.NewServeMux()
// Endpoints
mux.HandleFunc("/", healthCheck)
mux.HandleFunc("/process", handleProcess)
mux.HandleFunc("/batch", handleBatch)
// Server configuration
srv := &http.Server{
Addr: ":" + os.Getenv("PORT"),
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
}
// Start server
go func() {
log.Printf("Server starting on port %s", os.Getenv("PORT"))
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("ListenAndServe: %v", err)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown:", err)
}
log.Println("Server exited")
}
func handleProcess(w http.ResponseWriter, r *http.Request) {
// Get processor from pool
processor := clientPool.Get().(*RequestProcessor)
defer func() {
processor.Reset()
clientPool.Put(processor)
}()
// Process request concurrently
ctx := r.Context()
resultChan := make(chan interface{}, 2)
errChan := make(chan error, 2)
// Parallel operations
go func() {
// Check cache
val, err := redisClient.Get(ctx, "key").Result()
if err == nil {
resultChan <- val
} else {
errChan <- err
}
}()
go func() {
// Query Firestore
doc, err := firestoreClient.Collection("data").Doc("doc").Get(ctx)
if err == nil {
resultChan <- doc.Data()
} else {
errChan <- err
}
}()
// Wait for first result
select {
case result := <-resultChan:
json.NewEncoder(w).Encode(map[string]interface{}{
"result": result,
})
case err := <-errChan:
http.Error(w, err.Error(), http.StatusInternalServerError)
case <-ctx.Done():
http.Error(w, "Request timeout", http.StatusRequestTimeout)
}
}
func handleBatch(w http.ResponseWriter, r *http.Request) {
// Handle batch processing with worker pool
var requests []BatchRequest
if err := json.NewDecoder(r.Body).Decode(&requests); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Process in parallel with limited concurrency
results := make([]BatchResult, len(requests))
var wg sync.WaitGroup
semaphore := make(chan struct{}, 10) // Limit to 10 concurrent operations
for i, req := range requests {
wg.Add(1)
go func(idx int, request BatchRequest) {
defer wg.Done()
semaphore <- struct{}{} // Acquire
defer func() { <-semaphore }() // Release
result := processBatchItem(request)
results[idx] = result
}(i, req)
}
wg.Wait()
json.NewEncoder(w).Encode(results)
}
Observability and Monitoring
Structured Logging
# logging_config.py
import logging
import os
import traceback

from flask import has_request_context, request
from pythonjsonlogger import jsonlogger


class CloudRunLogger:
    """Configure structured logging for Cloud Run."""

    def __init__(self, service_name=None):
        self.service_name = service_name or os.environ.get('K_SERVICE', 'unknown')
        self.revision = os.environ.get('K_REVISION', 'unknown')
        self.setup_logging()

    def setup_logging(self):
        """Setup structured logging."""
        # Cloud Run forwards stdout/stderr to Cloud Logging and parses JSON
        # lines into structured entries, so a stream handler is sufficient.
        log_handler = logging.StreamHandler()
        # asctime and levelname are standard LogRecord attributes; they are
        # renamed to the field names Cloud Logging recognizes.
        formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            rename_fields={
                'asctime': 'time',
                'levelname': 'severity'
            }
        )
        log_handler.setFormatter(formatter)
        # Set handler
        logger = logging.getLogger()
        logger.handlers = [log_handler]
        logger.setLevel(logging.INFO)
        # Add Cloud Run metadata to all logs
        old_factory = logging.getLogRecordFactory()

        def record_factory(*args, **kwargs):
            record = old_factory(*args, **kwargs)
            record.service = self.service_name
            record.revision = self.revision
            # Add trace context if available; logs may be emitted outside a
            # request (for example at startup), where flask.request is unusable.
            if has_request_context():
                trace_header = request.headers.get('X-Cloud-Trace-Context')
                if trace_header:
                    trace = trace_header.split('/')[0]
                    project = os.environ.get('GOOGLE_CLOUD_PROJECT')
                    setattr(
                        record,
                        'logging.googleapis.com/trace',
                        f"projects/{project}/traces/{trace}"
                    )
            return record

        logging.setLogRecordFactory(record_factory)

    def log_request(self, request, response, duration):
        """Log HTTP request with metadata."""
        logging.info(
            "HTTP Request",
            extra={
                'httpRequest': {
                    'requestMethod': request.method,
                    'requestUrl': request.url,
                    'status': response.status_code,
                    'userAgent': request.headers.get('User-Agent'),
                    'remoteIp': request.remote_addr,
                    'latency': f"{duration}s"
                }
            }
        )

    def log_error(self, error, context=None):
        """Log error with context."""
        logging.error(
            f"Error: {str(error)}",
            extra={
                'error': {
                    'type': type(error).__name__,
                    'message': str(error),
                    'stacktrace': traceback.format_exc()
                },
                'context': context or {}
            },
            exc_info=True
        )
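To make the trace correlation in the logger above easier to follow, here is a standard-library-only sketch of the same idea. It assumes the `X-Cloud-Trace-Context` header has the form `TRACE_ID/SPAN_ID;o=OPTIONS` and that the log entry carries the trace under the `logging.googleapis.com/trace` key; the function names `trace_fields` and `log_line` are hypothetical.

```python
import json


def trace_fields(header_value, project_id):
    """Turn an X-Cloud-Trace-Context header into a structured-log trace field."""
    if not header_value or not project_id:
        return {}
    # The trace ID is everything before the first "/"
    trace_id = header_value.partition("/")[0]
    if not trace_id:
        return {}
    return {
        "logging.googleapis.com/trace": f"projects/{project_id}/traces/{trace_id}"
    }


def log_line(severity, message, header_value=None, project_id=None):
    """Build one JSON log line suitable for writing to stdout."""
    entry = {"severity": severity, "message": message}
    entry.update(trace_fields(header_value, project_id))
    return json.dumps(entry)


if __name__ == "__main__":
    # The header value and project ID below are made-up example inputs
    print(log_line("INFO", "request handled",
                   "105445aa7843bc8bf206b12000100000/1;o=1", "my-project"))
```

Keeping the parsing in a pure function makes it easy to unit test without a Flask request context.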
Custom Metrics
# metrics.py
import os
import time
from functools import wraps

from google.cloud import monitoring_v3


class CloudRunMetrics:
    """Custom metrics for Cloud Run services."""

    def __init__(self, project_id):
        self.client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"
        self.resource = {
            "type": "cloud_run_revision",
            "labels": {
                "service_name": os.environ.get('K_SERVICE', 'unknown'),
                "revision_name": os.environ.get('K_REVISION', 'unknown'),
                # K_REGION is not among the variables Cloud Run sets by
                # default; set it at deploy time (for example with --set-env-vars)
                "location": os.environ.get('K_REGION', 'unknown')
            }
        }

    def record_metric(self, metric_name, value,
                      metric_type="custom.googleapis.com", labels=None):
        """Record a custom metric, optionally with metric labels."""
        series = monitoring_v3.TimeSeries()
        series.metric.type = f"{metric_type}/{metric_name}"
        for key, label_value in (labels or {}).items():
            series.metric.labels[key] = label_value
        series.resource.type = self.resource["type"]
        series.resource.labels.update(self.resource["labels"])
        now = time.time()
        seconds = int(now)
        nanos = int((now - seconds) * 10 ** 9)
        interval = monitoring_v3.TimeInterval(
            {"end_time": {"seconds": seconds, "nanos": nanos}}
        )
        point = monitoring_v3.Point({
            "interval": interval,
            "value": {"double_value": value}
        })
        series.points = [point]
        self.client.create_time_series(
            name=self.project_name,
            time_series=[series]
        )

    def timer(self, metric_name):
        """Decorator to time function execution."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    duration = (time.time() - start_time) * 1000  # ms
                    self.record_metric(f"{metric_name}_duration_ms", duration)
                    self.record_metric(f"{metric_name}_success_count", 1)
                    return result
                except Exception:
                    duration = (time.time() - start_time) * 1000
                    self.record_metric(f"{metric_name}_duration_ms", duration)
                    self.record_metric(f"{metric_name}_error_count", 1)
                    raise
            return wrapper
        return decorator

    def count_by_status(self, metric_name, status_code):
        """Count requests by status code."""
        self.record_metric(
            f"{metric_name}_by_status",
            1,
            labels={"status_code": str(status_code)}
        )
CI/CD Integration
Cloud Build Pipeline
# cloudbuild.yaml
steps:
  # Run tests
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'test', '-f', 'Dockerfile.test', '.']
  - name: 'test'
    args: ['npm', 'test']
  # Build production image
  # (A BUILD_DATE build-arg is omitted: args are not passed through a shell,
  # so a command substitution such as "date -u" would be sent literally.)
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:${SHORT_SHA}',
      '-t', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:latest',
      '--build-arg', 'VCS_REF=${SHORT_SHA}',
      '--build-arg', 'VERSION=${TAG_NAME}',
      '.'
    ]
  # Push to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '--all-tags', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}']
  # Deploy to Cloud Run (staging)
  - name: 'gcr.io/cloud-builders/gcloud'
    args: [
      'run', 'deploy', '${_SERVICE_NAME}-staging',
      '--image', 'gcr.io/$PROJECT_ID/${_SERVICE_NAME}:${SHORT_SHA}',
      '--region', '${_REGION}',
      '--platform', 'managed',
      '--no-traffic',
      '--tag', 'pr-${_PR_NUMBER}'
    ]
  # Run integration tests
  - name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        SERVICE_URL=$(gcloud run services describe ${_SERVICE_NAME}-staging \
          --region=${_REGION} \
          --format='value(status.url)')
        # $$ escapes shell variables so Cloud Build does not treat them as substitutions
        npm run integration-test -- --url=$$SERVICE_URL
  # Deploy to production with canary
  - name: 'gcr.io/cloud-builders/gcloud'
    args: [
      'run', 'services', 'update-traffic',
      '${_SERVICE_NAME}',
      '--region', '${_REGION}',
      '--to-revisions', '${_SERVICE_NAME}-${SHORT_SHA}=10'
    ]
substitutions:
  _SERVICE_NAME: my-service
  _REGION: us-central1
  _PR_NUMBER: ${BRANCH_NAME}
options:
  logging: CLOUD_LOGGING_ONLY
# Trigger configuration is not part of cloudbuild.yaml. Create the trigger
# separately (Cloud Console or gcloud builds triggers) with these settings:
#   github owner: 'your-org'
#   github name: 'your-repo'
#   push branch: '^main$'
GitOps with Cloud Deploy
# skaffold.yaml
apiVersion: skaffold/v2beta28
kind: Config
metadata:
  name: cloud-run-app
build:
  artifacts:
    - image: app
      docker:
        dockerfile: Dockerfile
  tagPolicy:
    gitCommit: {}
  local:
    push: false
deploy:
  cloudrun:
    projectid: PROJECT_ID
    region: us-central1
    services:
      - name: my-service
        image: app
        cpu: 2
        memory: 2Gi
        maxInstances: 100
        env:
          - name: ENVIRONMENT
            value: production
profiles:
  - name: dev
    deploy:
      cloudrun:
        services:
          - name: my-service-dev
            image: app
            cpu: 1
            memory: 512Mi
            maxInstances: 10
            env:
              - name: ENVIRONMENT
                value: development
Best Practices
Production Checklist
# health_checks.py
import gc
import os

import psutil
import redis
import requests
from flask import Flask, jsonify
from google.cloud import firestore

app = Flask(__name__)
# Clients used by the dependency checks. REDIS_URL is assumed to be set,
# as in the cold start example.
db = firestore.Client()
cache = redis.from_url(os.environ['REDIS_URL'])


@app.route('/health')
def health_check():
    """Basic health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'service': os.environ.get('K_SERVICE'),
        'revision': os.environ.get('K_REVISION')
    })


@app.route('/ready')
def readiness_check():
    """Readiness check with dependency validation."""
    checks = {
        'database': check_database(),
        'cache': check_cache(),
        'external_api': check_external_api()
    }
    all_ready = all(checks.values())
    status_code = 200 if all_ready else 503
    return jsonify({
        'ready': all_ready,
        'checks': checks
    }), status_code


@app.route('/debug/memory')
def memory_stats():
    """Memory usage statistics."""
    memory = psutil.Process().memory_info()
    gc_stats = gc.get_stats()
    return jsonify({
        'memory': {
            'rss_mb': memory.rss / 1024 / 1024,
            'vms_mb': memory.vms / 1024 / 1024
        },
        'gc': gc_stats[0] if gc_stats else {}
    })


def check_database():
    """Check database connectivity."""
    try:
        # Perform a simple query
        db.collection('health').document('check').get()
        return True
    except Exception:
        return False


def check_cache():
    """Check cache connectivity."""
    try:
        cache.ping()
        return True
    except Exception:
        return False


def check_external_api():
    """Check external API availability."""
    try:
        response = requests.get(
            'https://api.example.com/health',
            timeout=2
        )
        return response.status_code == 200
    except Exception:
        return False
Security Hardening
# Secure Dockerfile
FROM python:3.11-slim-bullseye AS builder
# Install security updates
RUN apt-get update && apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Create app directory
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
# Final stage
FROM python:3.11-slim-bullseye
# Install security updates
RUN apt-get update && apt-get upgrade -y && \
rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN groupadd -g 1001 appuser && \
useradd -r -u 1001 -g appuser appuser
# Copy Python dependencies
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
# Set working directory
WORKDIR /app
# Copy application
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Add Python user packages to PATH
ENV PATH=/home/appuser/.local/bin:$PATH
# Python and pip runtime settings
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
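# Health check for local Docker runs; uses only the standard library and
# fails on error responses (on Cloud Run, configure probes on the service)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=2)"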
# Run application
CMD ["gunicorn", "--bind", ":8080", "--workers", "2", "--threads", "4", \
"--worker-class", "gthread", "--access-logfile", "-", \
"--error-logfile", "-", "app:app"]
Conclusion
Google Cloud Run provides a powerful platform for deploying containerized applications without managing infrastructure. Key benefits:
- True Serverless: Scale to zero, pay only for what you use
- Container Flexibility: Use any language or framework
- Fully Managed: No infrastructure management
- Advanced Features: Traffic splitting, authentication, VPC connectivity
- Developer Experience: Great CLI, console, and API support
Best Practices Summary
- Optimize for Cold Starts: Minimize dependencies and startup time
- Implement Health Checks: Use startup and liveness probes
- Structure Logs: Use JSON structured logging
- Monitor Everything: Custom metrics and traces
- Secure by Default: Run as non-root, use least privilege
- Automate Deployment: CI/CD with testing and canary releases
- Handle Graceful Shutdown: Listen for SIGTERM signals
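The graceful shutdown item above can be sketched with the standard library alone. This is a minimal illustration, assuming a worker loop that polls a shutdown flag; `run_worker`, `handle_sigterm` and `install_handler` are hypothetical names, and a real service would also close connections and flush logs before exiting.

```python
import signal
import threading

# Set when SIGTERM arrives; checked by the worker loop
shutdown_requested = threading.Event()


def handle_sigterm(signum, frame):
    """Record the shutdown request; cleanup happens in the main loop."""
    shutdown_requested.set()


def install_handler():
    """Register the handler (must be called from the main thread)."""
    signal.signal(signal.SIGTERM, handle_sigterm)


def run_worker(do_work, poll_seconds=0.1, max_iterations=None):
    """Call do_work() until shutdown is requested; return completed iterations."""
    completed = 0
    while not shutdown_requested.is_set():
        if max_iterations is not None and completed >= max_iterations:
            break
        do_work()
        completed += 1
        # wait() returns early if the flag is set during the pause
        shutdown_requested.wait(poll_seconds)
    return completed


if __name__ == "__main__":
    install_handler()
    done = run_worker(lambda: None, max_iterations=5)
    print(f"Completed {done} iterations before exit")
```

Keeping the handler to a single flag update avoids doing real work inside a signal handler, and the loop finishes its current unit of work before it stops.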
Next Steps
- Explore Cloud Run for Anthos for hybrid deployments
- Learn about Cloud Run Jobs for batch processing
- Study Eventarc for event-driven architectures
- Implement service mesh with Cloud Run and Istio
- Get certified as a Google Cloud Developer
Remember: Cloud Run makes it easy to deploy containers, but following best practices ensures your services are production-ready, secure, and cost-effective.