Google Cloud SQL and Database Services: Complete Guide
Google Cloud offers a comprehensive suite of database services for every use case, from traditional relational databases to globally distributed NoSQL solutions. This guide covers Cloud SQL, Cloud Spanner, Firestore, and Bigtable with best practices and advanced configurations.
Choosing the Right Database Service
Database Service Comparison
| Service | Type | Use Case | Scale | Consistency |
|---|---|---|---|---|
| Cloud SQL | Relational (MySQL, PostgreSQL, SQL Server) | Traditional applications, WordPress, e-commerce | Regional | Strong |
| Cloud Spanner | Relational (distributed) | Global applications, financial systems | Global | Strong |
| Firestore | Document NoSQL | Mobile/web apps, real-time sync | Global | Strong |
| Bigtable | Wide-column NoSQL | IoT, time-series, analytics | Regional to multi-region | Eventual across replicas |
| Memorystore | In-memory (Redis, Memcached) | Caching, session storage | Regional | Strong |
Cloud SQL Deep Dive
Creating High-Availability Cloud SQL Instances
# Create MySQL instance with HA
gcloud sql instances create production-mysql \
--database-version=MYSQL_8_0 \
--tier=db-n1-highmem-4 \
--region=us-central1 \
--network=projects/PROJECT_ID/global/networks/custom-vpc \
--no-assign-ip \
--availability-type=REGIONAL \
--backup-start-time=03:00 \
--backup-location=us \
--enable-bin-log \
--retained-backups-count=30 \
--retained-transaction-log-days=7 \
--maintenance-window-day=SUN \
--maintenance-window-hour=2 \
--maintenance-release-channel=production \
--enable-point-in-time-recovery \
--insights-config-query-insights-enabled \
--insights-config-query-string-length=1024 \
--insights-config-record-application-tags
# Create PostgreSQL instance
gcloud sql instances create production-postgres \
--database-version=POSTGRES_14 \
--tier=db-custom-4-16384 \
--region=us-central1 \
--availability-type=REGIONAL \
--enable-google-private-path \
--database-flags=shared_preload_libraries=pg_stat_statements,log_statement=all,log_min_duration_statement=1000
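With public IP disabled, applications usually reach the instance through the Cloud SQL Auth Proxy or the Cloud SQL Python Connector rather than a raw host and port. A minimal connection sketch, assuming the cloud-sql-python-connector and pg8000 packages and placeholder credentials and instance connection name:
# connect_private_ip.py - sketch only; instance name, user, and password are placeholders
import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes

INSTANCE_CONNECTION_NAME = "PROJECT_ID:us-central1:production-postgres"

connector = Connector()

def getconn():
    # ip_type=PRIVATE matches the --no-assign-ip / private-network setup above
    return connector.connect(
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user="app_user",          # placeholder; prefer IAM database auth where possible
        password="CHANGE_ME",     # in practice, read this from Secret Manager
        db="appdb",
        ip_type=IPTypes.PRIVATE,
    )

engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)
with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text("SELECT version()")).scalar())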
Advanced Python Management
# cloud_sql_manager.py
from google.cloud import sql_v1
from google.cloud import secretmanager
import sqlalchemy
import pymysql
import psycopg2
from contextlib import contextmanager
import time
class CloudSQLManager:
"""Manage Cloud SQL instances and connections."""
def __init__(self, project_id):
self.project_id = project_id
self.sql_client = sql_v1.SqlInstancesServiceClient()
self.backup_client = sql_v1.SqlBackupRunsServiceClient()
self.operations_client = sql_v1.SqlOperationsServiceClient()
self.secret_client = secretmanager.SecretManagerServiceClient()
def create_instance_with_replicas(self, instance_id, region, database_type='MYSQL'):
"""Create Cloud SQL instance with read replicas."""
instance = {
"name": instance_id,
"region": region,
"database_version": f"{database_type}_8_0",
"settings": {
"tier": "db-n1-highmem-4",
"activation_policy": "ALWAYS",
"availability_type": "REGIONAL",
"backup_configuration": {
"enabled": True,
"start_time": "03:00",
"location": region,
"point_in_time_recovery_enabled": True,
"transaction_log_retention_days": 7,
"retained_backups_count": 30
},
"ip_configuration": {
"ipv4_enabled": False,
"private_network": f"projects/{self.project_id}/global/networks/custom-vpc",
"enable_private_path_for_google_cloud_services": True
},
"database_flags": [
{"name": "slow_query_log", "value": "on"},
{"name": "long_query_time", "value": "2"},
{"name": "general_log", "value": "on"}
],
"insights_config": {
"query_insights_enabled": True,
"query_string_length": 1024,
"record_application_tags": True
}
}
}
# Create primary instance
operation = self.sql_client.insert(
project=self.project_id,
body=instance
)
# Wait for operation
self._wait_for_operation(operation.name)
# Create read replicas
replica_regions = ['us-east1', 'europe-west1']
for i, replica_region in enumerate(replica_regions):
replica = {
"name": f"{instance_id}-replica-{i+1}",
"region": replica_region,
"master_instance_name": f"projects/{self.project_id}/instances/{instance_id}",
"replica_configuration": {
"failover_target": False
}
}
replica_operation = self.sql_client.insert(
project=self.project_id,
body=replica
)
self._wait_for_operation(replica_operation.name)
return instance_id
@contextmanager
def get_connection(self, instance_name, database_name):
"""Get database connection with automatic resource management."""
# Get connection info
instance = self.sql_client.get(
project=self.project_id,
instance=instance_name
)
# Get credentials from Secret Manager
secret_name = f"projects/{self.project_id}/secrets/db-password/versions/latest"
response = self.secret_client.access_secret_version(name=secret_name)
password = response.payload.data.decode('UTF-8')
# Create connection
if 'MYSQL' in instance.database_version:
connection = pymysql.connect(
host=instance.ip_addresses[0].ip_address,
user='root',
password=password,
database=database_name,
cursorclass=pymysql.cursors.DictCursor
)
else: # PostgreSQL
connection = psycopg2.connect(
host=instance.ip_addresses[0].ip_address,
user='postgres',
password=password,
database=database_name
)
try:
yield connection
finally:
connection.close()
def setup_automatic_failover(self, instance_name):
"""Configure automatic failover for high availability."""
# Create failover replica
failover_replica = {
"name": f"{instance_name}-failover",
"master_instance_name": f"projects/{self.project_id}/instances/{instance_name}",
"replica_configuration": {
"failover_target": True
}
}
operation = self.sql_client.insert(
project=self.project_id,
body=failover_replica
)
self._wait_for_operation(operation.name)
print(f"Failover replica created for {instance_name}")
def perform_backup(self, instance_name, description="Manual backup"):
"""Perform on-demand backup."""
backup_run = {
"instance": instance_name,
"description": description,
"type": "ON_DEMAND"
}
operation = self.backup_client.insert(
project=self.project_id,
instance=instance_name,
body=backup_run
)
self._wait_for_operation(operation.name)
print(f"Backup completed for {instance_name}")
return operation
def clone_instance(self, source_instance, target_instance, point_in_time=None):
"""Clone Cloud SQL instance."""
clone_context = {
"destination_instance_name": target_instance,
"point_in_time": point_in_time # ISO format timestamp
}
operation = self.sql_client.clone(
project=self.project_id,
instance=source_instance,
body={"clone_context": clone_context}
)
self._wait_for_operation(operation.name)
print(f"Cloned {source_instance} to {target_instance}")
def _wait_for_operation(self, operation_name):
"""Wait for operation to complete."""
while True:
operation = self.operations_client.get(
project=self.project_id,
operation=operation_name.split('/')[-1]
)
if operation.status == "DONE":
if operation.error:
raise Exception(f"Operation failed: {operation.error}")
break
time.sleep(5)
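A short usage sketch for the manager above; the project, instance, and database names are placeholders:
# Example usage (placeholder project, instance, and database names)
manager = CloudSQLManager(project_id="my-project")

# Provision a primary instance with cross-region read replicas
manager.create_instance_with_replicas("orders-db", region="us-central1")

# Take an on-demand backup before a risky change
manager.perform_backup("orders-db", description="pre-migration backup")

# Run a query through the managed connection context
with manager.get_connection("orders-db", "orders") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM customers")
        print(cursor.fetchone())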
Database Migration
# database_migrator.py
import subprocess
from google.cloud import storage
import os
class DatabaseMigrator:
"""Migrate databases to Cloud SQL."""
def __init__(self, project_id):
self.project_id = project_id
self.storage_client = storage.Client(project=project_id)
def migrate_mysql_database(self, source_host, source_db, target_instance):
"""Migrate MySQL database to Cloud SQL."""
# Export from source
dump_file = f"{source_db}_dump.sql"
export_command = [
"mysqldump",
"-h", source_host,
"-u", "root",
"-p",
"--single-transaction",
"--routines",
"--triggers",
"--events",
"--set-gtid-purged=OFF",
source_db,
"-r", dump_file
]
subprocess.run(export_command, check=True)
# Upload to Cloud Storage
bucket_name = f"{self.project_id}-sql-migration"
bucket = self.storage_client.bucket(bucket_name)
blob = bucket.blob(f"dumps/{dump_file}")
blob.upload_from_filename(dump_file)
# Import to Cloud SQL
import_command = [
"gcloud", "sql", "import", "sql",
target_instance,
f"gs://{bucket_name}/dumps/{dump_file}",
"--database", source_db,
"--project", self.project_id
]
subprocess.run(import_command, check=True)
# Cleanup
os.remove(dump_file)
blob.delete()
print(f"Migration completed: {source_db} -> {target_instance}")
def setup_continuous_replication(self, source_config, target_instance):
"""Setup continuous replication from external database."""
# This would use Cloud SQL external replica feature
replica_config = {
"mysql_replica_configuration": {
"master_instance_name": source_config['master_host'],
"username": source_config['username'],
"password": source_config['password'],
"ssl_cipher": "TLS_AES_256_GCM_SHA384",
"ca_certificate": source_config['ca_cert'],
"client_certificate": source_config['client_cert'],
"client_key": source_config['client_key']
}
}
# Configure external master
subprocess.run([
"gcloud", "sql", "instances", "patch", target_instance,
"--project", self.project_id,
"--external-master-host", source_config['master_host'],
"--external-master-port", str(source_config.get('port', 3306)),
"--external-master-user", source_config['username'],
"--external-master-password", source_config['password']
], check=True)
print(f"Continuous replication configured for {target_instance}")
Cloud Spanner: Globally Distributed SQL
Setting Up Cloud Spanner
# spanner_manager.py
from google.cloud import spanner
from google.cloud.spanner_admin_database_v1 import DatabaseAdminClient
from google.cloud.spanner_admin_instance_v1 import InstanceAdminClient
from decimal import Decimal
import time
class SpannerManager:
"""Manage Cloud Spanner instances and databases."""
def __init__(self, project_id):
self.project_id = project_id
self.spanner_client = spanner.Client(project=project_id)
self.instance_admin = InstanceAdminClient()
self.database_admin = DatabaseAdminClient()
def create_multi_region_instance(self, instance_id, display_name):
"""Create multi-region Spanner instance."""
config_name = f"projects/{self.project_id}/instanceConfigs/nam-eur-asia1"
instance = self.spanner_client.instance(
instance_id,
configuration_name=config_name,
display_name=display_name,
node_count=3
)
operation = instance.create()
operation.result() # Wait for completion
print(f"Created multi-region instance: {instance_id}")
return instance
def create_database_with_schema(self, instance_id, database_id):
"""Create database with schema."""
instance = self.spanner_client.instance(instance_id)
database = instance.database(database_id)
# Define schema
ddl_statements = [
"""CREATE TABLE Users (
UserId STRING(36) NOT NULL,
Email STRING(255) NOT NULL,
Name STRING(100),
CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
UpdatedAt TIMESTAMP OPTIONS (allow_commit_timestamp=true),
Version INT64 NOT NULL DEFAULT (0),
) PRIMARY KEY (UserId)""",
"""CREATE UNIQUE INDEX UsersByEmail ON Users(Email)""",
"""CREATE TABLE Orders (
OrderId STRING(36) NOT NULL,
UserId STRING(36) NOT NULL,
Status STRING(20) NOT NULL,
TotalAmount NUMERIC NOT NULL,
Currency STRING(3) NOT NULL,
CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
CONSTRAINT FK_UserId FOREIGN KEY (UserId) REFERENCES Users (UserId),
) PRIMARY KEY (OrderId),
INTERLEAVE IN PARENT Users ON DELETE CASCADE""",
"""CREATE TABLE OrderItems (
OrderId STRING(36) NOT NULL,
ItemId STRING(36) NOT NULL,
ProductId STRING(36) NOT NULL,
Quantity INT64 NOT NULL,
Price NUMERIC NOT NULL,
CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
CONSTRAINT FK_OrderId FOREIGN KEY (OrderId) REFERENCES Orders (OrderId),
) PRIMARY KEY (OrderId, ItemId),
INTERLEAVE IN PARENT Orders ON DELETE CASCADE"""
]
operation = database.create(ddl_statements)
operation.result() # Wait for completion
print(f"Created database with schema: {database_id}")
return database
def perform_distributed_transaction(self, instance_id, database_id):
"""Perform distributed transaction across tables."""
instance = self.spanner_client.instance(instance_id)
database = instance.database(database_id)
def insert_order_with_items(transaction):
# Insert user if not exists
user_id = "user-123"
transaction.execute_update(
"""INSERT OR UPDATE Users (UserId, Email, Name, CreatedAt)
VALUES (@user_id, @email, @name, PENDING_COMMIT_TIMESTAMP())""",
params={"user_id": user_id, "email": "user@example.com", "name": "John Doe"},
param_types={
"user_id": spanner.param_types.STRING,
"email": spanner.param_types.STRING,
"name": spanner.param_types.STRING
}
)
# Insert order
order_id = "order-456"
transaction.execute_update(
"""INSERT INTO Orders (OrderId, UserId, Status, TotalAmount, Currency, CreatedAt)
VALUES (@order_id, @user_id, @status, @total, @currency, PENDING_COMMIT_TIMESTAMP())""",
params={
"order_id": order_id,
"user_id": user_id,
"status": "PENDING",
"total": 99.99,
"currency": "USD"
},
param_types={
"order_id": spanner.param_types.STRING,
"user_id": spanner.param_types.STRING,
"status": spanner.param_types.STRING,
"total": spanner.param_types.NUMERIC,
"currency": spanner.param_types.STRING
}
)
# Insert order items
items = [
{"item_id": "item-1", "product_id": "prod-1", "quantity": 2, "price": 29.99},
{"item_id": "item-2", "product_id": "prod-2", "quantity": 1, "price": 40.01}
]
for item in items:
transaction.execute_update(
"""INSERT INTO OrderItems (OrderId, ItemId, ProductId, Quantity, Price, CreatedAt)
VALUES (@order_id, @item_id, @product_id, @quantity, @price, PENDING_COMMIT_TIMESTAMP())""",
params={
"order_id": order_id,
"item_id": item["item_id"],
"product_id": item["product_id"],
"quantity": item["quantity"],
"price": item["price"]
},
param_types={
"order_id": spanner.param_types.STRING,
"item_id": spanner.param_types.STRING,
"product_id": spanner.param_types.STRING,
"quantity": spanner.param_types.INT64,
"price": spanner.param_types.NUMERIC
}
)
database.run_in_transaction(insert_order_with_items)
print("Distributed transaction completed successfully")
def setup_change_streams(self, instance_id, database_id):
"""Setup change streams for real-time data capture."""
instance = self.spanner_client.instance(instance_id)
database = instance.database(database_id)
change_stream_ddl = [
"""CREATE CHANGE STREAM UserOrderStream
FOR Users, Orders, OrderItems
OPTIONS (
retention_period = '7d',
value_capture_type = 'NEW_VALUES'
)"""
]
operation = database.update_ddl(change_stream_ddl)
operation.result()
print("Change stream created successfully")
Firestore: NoSQL Document Database
Advanced Firestore Operations
# firestore_advanced.py
from google.cloud import firestore
from google.cloud.firestore_v1 import aggregation
from google.cloud.firestore_v1.watch import Watch
import asyncio
import time
from datetime import datetime, timedelta
class FirestoreAdvanced:
"""Advanced Firestore operations and patterns."""
def __init__(self, project_id):
self.db = firestore.Client(project=project_id)
self.batch_size = 500
async def bulk_import_with_batching(self, collection_name, data):
"""Bulk import data with automatic batching."""
collection = self.db.collection(collection_name)
# Process in batches
for i in range(0, len(data), self.batch_size):
batch = self.db.batch()
batch_data = data[i:i + self.batch_size]
for item in batch_data:
doc_ref = collection.document(item.get('id') or None)
batch.set(doc_ref, item)
batch.commit()
print(f"Imported batch {i//self.batch_size + 1}")
# Rate limiting
await asyncio.sleep(0.1)
def setup_composite_indexes(self):
"""Define composite indexes for complex queries."""
# Note: Indexes are typically defined in firestore.indexes.json
indexes_config = {
"indexes": [
{
"collectionGroup": "orders",
"queryScope": "COLLECTION",
"fields": [
{"fieldPath": "userId", "order": "ASCENDING"},
{"fieldPath": "status", "order": "ASCENDING"},
{"fieldPath": "createdAt", "order": "DESCENDING"}
]
},
{
"collectionGroup": "products",
"queryScope": "COLLECTION",
"fields": [
{"fieldPath": "category", "arrayConfig": "CONTAINS"},
{"fieldPath": "price", "order": "ASCENDING"},
{"fieldPath": "rating", "order": "DESCENDING"}
]
}
],
"fieldOverrides": [
{
"collectionGroup": "users",
"fieldPath": "email",
"indexes": [
{"queryScope": "COLLECTION", "order": "ASCENDING"},
{"queryScope": "COLLECTION_GROUP", "order": "ASCENDING"}
]
}
]
}
return indexes_config
def implement_sharding_pattern(self, collection_name, document_id, data, shard_count=10):
"""Implement sharding for high-write collections."""
import random
# Choose random shard
shard_id = random.randint(0, shard_count - 1)
# Write to sharded subcollection
shard_ref = self.db.collection(collection_name).document(document_id)\
.collection('shards').document(str(shard_id))
shard_ref.set({
**data,
'shard_id': shard_id,
'timestamp': firestore.SERVER_TIMESTAMP
})
# Update aggregate in parent document
parent_ref = self.db.collection(collection_name).document(document_id)
parent_ref.set({
'shard_count': shard_count,
'last_updated': firestore.SERVER_TIMESTAMP
}, merge=True)
def distributed_counter(self, counter_path, increment_value=1):
"""Implement distributed counter pattern."""
counter_ref = self.db.document(counter_path)
shards_ref = counter_ref.collection('shards')
# Get or create counter document
counter_doc = counter_ref.get()
if not counter_doc.exists:
counter_ref.set({
'num_shards': 10,
'total_count': 0
})
# Initialize shards
batch = self.db.batch()
for i in range(10):
shard_ref = shards_ref.document(str(i))
batch.set(shard_ref, {'count': 0})
batch.commit()
# Increment random shard
import random
shard_id = random.randint(0, 9)
shard_ref = shards_ref.document(str(shard_id))
shard_ref.update({'count': firestore.Increment(increment_value)})
def get_distributed_count(self, counter_path):
"""Get total from distributed counter."""
counter_ref = self.db.document(counter_path)
shards_ref = counter_ref.collection('shards')
# Sum all shards
total = 0
for shard in shards_ref.stream():
total += shard.to_dict().get('count', 0)
# Update cached total
counter_ref.update({'total_count': total})
return total
def complex_query_with_pagination(self, collection_name, filters, order_by, page_size=20):
"""Perform complex query with cursor-based pagination."""
query = self.db.collection(collection_name)
# Apply filters
for field, operator, value in filters:
query = query.where(field, operator, value)
# Apply ordering
for field, direction in order_by:
query = query.order_by(field, direction=direction)
# Limit for pagination
query = query.limit(page_size)
# Generator for paginated results
def paginate():
last_doc = None
while True:
if last_doc:
paginated_query = query.start_after(last_doc)
else:
paginated_query = query
docs = list(paginated_query.stream())
if not docs:
break
yield docs
last_doc = docs[-1]
return paginate()
def setup_real_time_listener(self, collection_name, callback):
"""Setup real-time listener with error handling."""
def on_snapshot(col_snapshot, changes, read_time):
for change in changes:
if change.type.name == 'ADDED':
callback('added', change.document.id, change.document.to_dict())
elif change.type.name == 'MODIFIED':
callback('modified', change.document.id, change.document.to_dict())
elif change.type.name == 'REMOVED':
callback('removed', change.document.id, None)
# Create listener
col_ref = self.db.collection(collection_name)
col_watch = col_ref.on_snapshot(on_snapshot)
return col_watch # Return for later unsubscribe
def transaction_with_retry(self, transaction_func, max_attempts=3):
"""Execute transaction with automatic retry."""
@firestore.transactional
def update_in_transaction(transaction):
return transaction_func(transaction)
attempts = 0
while attempts < max_attempts:
try:
result = update_in_transaction(self.db.transaction())
return result
except Exception as e:
attempts += 1
if attempts >= max_attempts:
raise
time.sleep(2 ** attempts) # Exponential backoff
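A brief usage sketch for the counter and listener helpers above; the project ID and document paths are placeholders:
# Example usage (placeholder project and paths)
fs = FirestoreAdvanced(project_id="my-project")

# Distributed counter: spread a high write rate across shards
fs.distributed_counter("counters/page-views")
print("Total views:", fs.get_distributed_count("counters/page-views"))

# Real-time listener: log every change in the orders collection
def on_change(kind, doc_id, data):
    print(kind, doc_id, data)

watch = fs.setup_real_time_listener("orders", on_change)
# ... later, stop listening
watch.unsubscribe()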
Bigtable: Wide-Column NoSQL
Bigtable Implementation
# bigtable_manager.py
from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters
from google.cloud.bigtable.row_set import RowSet
from datetime import datetime, timedelta, timezone
import struct
import time
class BigtableManager:
"""Manage Bigtable instances and operations."""
def __init__(self, project_id):
self.project_id = project_id
self.client = bigtable.Client(project=project_id, admin=True)
def create_time_series_table(self, instance_id, table_id):
"""Create table optimized for time-series data."""
instance = self.client.instance(instance_id)
table = instance.table(table_id)
# Create table if not exists
if not table.exists():
table.create()
            # Add column families with garbage-collection rules at creation time
            cf_metrics = table.column_family(
                'metrics',
                gc_rule=column_family.GCRuleIntersection([
                    column_family.MaxVersionsGCRule(1),
                    column_family.MaxAgeGCRule(timedelta(days=30))
                ])
            )
            cf_metrics.create()
            cf_metadata = table.column_family('metadata')
            cf_metadata.create()
print(f"Created time-series table: {table_id}")
def write_time_series_data(self, instance_id, table_id, device_id, metrics):
"""Write time-series data with proper row key design."""
instance = self.client.instance(instance_id)
table = instance.table(table_id)
# Create row key: device_id#reverse_timestamp
        timestamp = int(time.time() * 1000)
        reverse_timestamp = 9999999999999 - timestamp
        # set_cell() expects a datetime for the cell timestamp
        cell_ts = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
row_key = f"{device_id}#{reverse_timestamp}".encode()
row = table.direct_row(row_key)
# Write metrics
for metric_name, value in metrics.items():
# Store as binary for efficiency
if isinstance(value, (int, float)):
value_bytes = struct.pack('d', float(value))
else:
value_bytes = str(value).encode()
row.set_cell(
'metrics',
metric_name.encode(),
value_bytes,
                timestamp=cell_ts  # cell-level timestamp
)
# Write metadata
row.set_cell(
'metadata',
b'device_id',
device_id.encode(),
            timestamp=cell_ts
)
row.commit()
def query_time_range(self, instance_id, table_id, device_id, start_time, end_time):
"""Query time-series data for a specific time range."""
instance = self.client.instance(instance_id)
table = instance.table(table_id)
# Convert timestamps to reverse format
start_reverse = 9999999999999 - int(end_time * 1000)
end_reverse = 9999999999999 - int(start_time * 1000)
# Create row key range
row_start = f"{device_id}#{start_reverse}".encode()
row_end = f"{device_id}#{end_reverse}".encode()
# Create row set
        row_set = RowSet()
row_set.add_row_range_from_keys(
start_key=row_start,
end_key=row_end
)
# Read rows
rows = table.read_rows(
row_set=row_set,
            filter_=row_filters.FamilyNameRegexFilter('metrics')
)
# Process results
results = []
        for row in rows:
            timestamp = 9999999999999 - int(row.row_key.decode().split('#')[1])
            metrics = {}
            # row.cells maps column family -> {qualifier: [Cell, ...]}, newest cell first
            for qualifier, cells in row.cells['metrics'].items():
                latest = cells[0]
                try:
                    value = struct.unpack('d', latest.value)[0]
                except struct.error:
                    value = latest.value.decode()
                metrics[qualifier.decode()] = value
            results.append({
                'timestamp': timestamp / 1000,
                'metrics': metrics
            })
return results
def implement_hot_spotting_prevention(self, instance_id, table_id):
"""Implement salting to prevent hot spotting."""
instance = self.client.instance(instance_id)
table = instance.table(table_id)
def salted_row_key(original_key, salt_buckets=10):
"""Add salt prefix to distribute writes."""
import hashlib
# Calculate salt based on key hash
key_hash = hashlib.md5(original_key.encode()).hexdigest()
salt = int(key_hash[:2], 16) % salt_buckets
return f"{salt:02d}#{original_key}".encode()
return salted_row_key
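A usage sketch tying the write and query paths together; the instance, table, and device IDs are placeholders:
# Example usage (placeholder instance, table, and device IDs)
import time

bt = BigtableManager(project_id="my-project")
bt.create_time_series_table("iot-instance", "device-metrics")

# Write one reading
bt.write_time_series_data(
    "iot-instance", "device-metrics", "thermostat-42",
    {"temperature_c": 21.5, "humidity_pct": 40.2}
)

# Read the last hour of data for that device
now = time.time()
rows = bt.query_time_range("iot-instance", "device-metrics", "thermostat-42",
                           start_time=now - 3600, end_time=now)
print(rows)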
Memorystore: In-Memory Caching
Redis Implementation
# memorystore_manager.py
import redis
from google.cloud import redis_v1
import json
import pickle
from datetime import timedelta
class MemorystoreManager:
"""Manage Memorystore Redis instances."""
def __init__(self, project_id):
self.project_id = project_id
self.redis_admin = redis_v1.CloudRedisClient()
self.redis_clients = {}
def create_redis_instance(self, instance_id, region, tier='STANDARD_HA'):
"""Create Memorystore Redis instance."""
parent = f"projects/{self.project_id}/locations/{region}"
instance = {
"name": f"{parent}/instances/{instance_id}",
"tier": tier,
"memory_size_gb": 5,
"redis_version": "REDIS_6_X",
"display_name": instance_id,
"auth_enabled": True,
"transit_encryption_mode": "SERVER_AUTHENTICATION",
"persistence_config": {
"persistence_mode": "RDB",
"rdb_snapshot_period": "ONE_HOUR"
}
}
operation = self.redis_admin.create_instance(
parent=parent,
instance_id=instance_id,
instance=instance
)
result = operation.result() # Wait for completion
print(f"Created Redis instance: {instance_id}")
return result
def get_redis_client(self, instance_name):
"""Get Redis client for instance."""
if instance_name not in self.redis_clients:
            # Get instance details
            instance = self.redis_admin.get_instance(name=instance_name)
            # The AUTH string is returned by a separate RPC, not on the instance resource
            auth_string = None
            if instance.auth_enabled:
                auth_string = self.redis_admin.get_instance_auth_string(
                    name=instance_name
                ).auth_string
            # Create Redis client
            self.redis_clients[instance_name] = redis.Redis(
                host=instance.host,
                port=instance.port,
                password=auth_string,
ssl=True,
ssl_cert_reqs='required',
decode_responses=True
)
return self.redis_clients[instance_name]
def implement_cache_patterns(self, redis_client):
"""Implement common caching patterns."""
# Cache-aside pattern
def cache_aside_get(key, fetch_func, ttl=300):
# Try to get from cache
cached = redis_client.get(key)
if cached:
return json.loads(cached)
# Fetch from source
data = fetch_func()
# Store in cache
redis_client.setex(key, ttl, json.dumps(data))
return data
# Write-through pattern
def write_through_set(key, value, store_func, ttl=300):
# Store in primary storage
store_func(value)
# Update cache
redis_client.setex(key, ttl, json.dumps(value))
# Cache warming
def warm_cache(keys_data, ttl=300):
pipeline = redis_client.pipeline()
for key, data in keys_data.items():
pipeline.setex(key, ttl, json.dumps(data))
pipeline.execute()
return {
'cache_aside_get': cache_aside_get,
'write_through_set': write_through_set,
'warm_cache': warm_cache
}
def setup_session_store(self, redis_client):
"""Setup Redis as session store."""
class SessionStore:
def __init__(self, redis_client, prefix='session:', ttl=3600):
self.redis = redis_client
self.prefix = prefix
self.ttl = ttl
def save(self, session_id, data):
key = f"{self.prefix}{session_id}"
self.redis.setex(key, self.ttl, pickle.dumps(data))
def load(self, session_id):
key = f"{self.prefix}{session_id}"
data = self.redis.get(key)
return pickle.loads(data) if data else None
def delete(self, session_id):
key = f"{self.prefix}{session_id}"
self.redis.delete(key)
def extend(self, session_id):
key = f"{self.prefix}{session_id}"
self.redis.expire(key, self.ttl)
return SessionStore(redis_client)
def implement_rate_limiting(self, redis_client):
"""Implement rate limiting with Redis."""
def rate_limit(identifier, max_requests=100, window_seconds=60):
key = f"rate_limit:{identifier}"
try:
current = redis_client.incr(key)
if current == 1:
redis_client.expire(key, window_seconds)
if current > max_requests:
return False, current
return True, current
except redis.RedisError:
# Allow on Redis error (fail open)
return True, 0
return rate_limit
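A short sketch wiring the cache and rate-limiting helpers into application code; the instance path and client IP are placeholders:
# Example usage (placeholder instance path)
mm = MemorystoreManager(project_id="my-project")
client = mm.get_redis_client(
    "projects/my-project/locations/us-central1/instances/app-cache"
)

# Cache-aside read
patterns = mm.implement_cache_patterns(client)
profile = patterns['cache_aside_get'](
    "user:123:profile",
    fetch_func=lambda: {"name": "Ada", "plan": "pro"},  # stand-in for a database query
    ttl=600
)

# Rate limiting: 100 requests per minute per client IP
rate_limit = mm.implement_rate_limiting(client)
allowed, count = rate_limit("203.0.113.7", max_requests=100, window_seconds=60)
if not allowed:
    print("Too many requests")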
Performance Optimization
Connection Pooling and Query Optimization
# database_optimizer.py
from sqlalchemy import create_engine, pool, text
from sqlalchemy.orm import sessionmaker
import json
import threading
class DatabaseOptimizer:
"""Optimize database performance across services."""
def __init__(self, project_id):
self.project_id = project_id
self.connection_pools = {}
self.pool_lock = threading.Lock()
def get_cloud_sql_pool(self, instance_connection_name, database):
"""Get connection pool for Cloud SQL."""
pool_key = f"{instance_connection_name}:{database}"
with self.pool_lock:
if pool_key not in self.connection_pools:
# Create SQLAlchemy engine with connection pooling
engine = create_engine(
f"mysql+pymysql://root:password@/{database}",
connect_args={
"unix_socket": f"/cloudsql/{instance_connection_name}"
},
poolclass=pool.QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800, # Recycle connections after 30 minutes
pool_pre_ping=True # Verify connections before use
)
self.connection_pools[pool_key] = engine
return self.connection_pools[pool_key]
def optimize_queries(self):
"""Query optimization strategies."""
# Example: Batch operations
        def batch_insert(engine, table_name, records, batch_size=1000):
            """Insert a list of dict records in batches via executemany."""
            if not records:
                return
            columns = ", ".join(records[0].keys())
            placeholders = ", ".join(f":{col}" for col in records[0].keys())
            stmt = text(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})")
            with engine.begin() as conn:
                for i in range(0, len(records), batch_size):
                    conn.execute(stmt, records[i:i + batch_size])
# Example: Query result caching
class QueryCache:
def __init__(self, redis_client):
self.redis = redis_client
self.ttl = 300 # 5 minutes
def cached_query(self, query_key, query_func):
# Check cache
cached = self.redis.get(f"query:{query_key}")
if cached:
return json.loads(cached)
# Execute query
result = query_func()
# Cache result
self.redis.setex(
f"query:{query_key}",
self.ttl,
json.dumps(result, default=str)
)
return result
return {
'batch_insert': batch_insert,
'QueryCache': QueryCache
}
def implement_read_write_splitting(self, primary_engine, replica_engines):
"""Implement read/write splitting."""
class ReadWriteSplitter:
def __init__(self, primary, replicas):
self.primary = primary
self.replicas = replicas
self.current_replica = 0
def execute_read(self, query):
# Round-robin across replicas
replica = self.replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.replicas)
with replica.connect() as conn:
return conn.execute(query).fetchall()
def execute_write(self, query):
# All writes go to primary
with self.primary.connect() as conn:
return conn.execute(query)
return ReadWriteSplitter(primary_engine, replica_engines)
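A usage sketch for the pool and read/write splitter; the instance connection names are placeholders:
# Example usage (placeholder instance connection names)
from sqlalchemy import text

optimizer = DatabaseOptimizer(project_id="my-project")

primary = optimizer.get_cloud_sql_pool("my-project:us-central1:production-mysql", "orders")
replicas = [
    optimizer.get_cloud_sql_pool("my-project:us-east1:production-mysql-replica-1", "orders"),
    optimizer.get_cloud_sql_pool("my-project:europe-west1:production-mysql-replica-2", "orders"),
]

splitter = optimizer.implement_read_write_splitting(primary, replicas)
recent_orders = splitter.execute_read(text("SELECT id, status FROM orders LIMIT 10"))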
Security Best Practices
Database Security Implementation
# database_security.py
from google.cloud import secretmanager
from google.cloud import kms
import secrets
import hashlib
class DatabaseSecurity:
"""Implement database security best practices."""
def __init__(self, project_id):
self.project_id = project_id
self.secret_client = secretmanager.SecretManagerServiceClient()
self.kms_client = kms.KeyManagementServiceClient()
def setup_cloud_sql_security(self, instance_name):
"""Setup comprehensive Cloud SQL security."""
security_config = {
# Require SSL
"require_ssl": True,
# Database flags for security
"database_flags": {
# MySQL flags
"local_infile": "off",
"skip_show_database": "on",
"sql_mode": "TRADITIONAL",
# Audit logging
"cloudsql_mysql_audit_log": "on",
"cloudsql_mysql_audit_log_max_file_size": "100",
"cloudsql_mysql_audit_log_rotate": "on",
# Performance schema for monitoring
"performance_schema": "on"
},
# Backup encryption
"backup_configuration": {
"enabled": True,
"point_in_time_recovery_enabled": True,
"transaction_log_retention_days": 7,
"location": "us",
"backup_encryption_key_name": f"projects/{self.project_id}/locations/us/keyRings/database-keys/cryptoKeys/backup-key"
},
# Authorized networks (use Private IP instead)
"authorized_networks": [],
# Maintenance window
"maintenance_window": {
"hour": 3,
"day": 0, # Sunday
"update_track": "stable"
}
}
return security_config
def rotate_database_credentials(self, instance_name, database_name):
"""Rotate database credentials securely."""
# Generate new password
new_password = secrets.token_urlsafe(32)
# Store in Secret Manager
secret_id = f"{instance_name}-{database_name}-password"
parent = f"projects/{self.project_id}"
# Create or update secret
try:
secret = self.secret_client.create_secret(
parent=parent,
secret_id=secret_id,
secret={
"replication": {"automatic": {}}
}
)
        except Exception:
            # Secret already exists; fetch it instead
            secret_name = f"{parent}/secrets/{secret_id}"
            secret = self.secret_client.get_secret(name=secret_name)
# Add new version
self.secret_client.add_secret_version(
parent=secret.name,
payload={"data": new_password.encode()}
)
# Update database password
# This would be done through Cloud SQL Admin API
return secret_id
def implement_data_encryption(self):
"""Implement application-level data encryption."""
class FieldEncryption:
def __init__(self, kms_key_name):
self.kms_key = kms_key_name
def encrypt_field(self, plaintext):
# Generate DEK (Data Encryption Key)
dek = secrets.token_bytes(32)
# Encrypt DEK with KEK (Key Encryption Key) from KMS
# This would use KMS API to encrypt the DEK
# Encrypt data with DEK
from cryptography.fernet import Fernet
import base64
key = base64.urlsafe_b64encode(dek)
f = Fernet(key)
ciphertext = f.encrypt(plaintext.encode())
return {
'ciphertext': ciphertext,
'encrypted_dek': 'encrypted_dek_here'
}
def decrypt_field(self, encrypted_data):
# Decrypt DEK using KMS
# Decrypt data using DEK
pass
return FieldEncryption
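The KMS steps left as comments above amount to envelope encryption: wrap a per-field data key with a key held in Cloud KMS. A sketch, assuming a placeholder key ring and key name:
# Envelope encryption sketch with Cloud KMS (placeholder key names)
import base64
import secrets
from google.cloud import kms
from cryptography.fernet import Fernet

kms_client = kms.KeyManagementServiceClient()
key_name = kms_client.crypto_key_path("my-project", "us", "database-keys", "field-key")

def encrypt_field(plaintext):
    dek = secrets.token_bytes(32)                                   # data encryption key
    wrapped = kms_client.encrypt(request={"name": key_name, "plaintext": dek})
    f = Fernet(base64.urlsafe_b64encode(dek))
    return {
        "ciphertext": f.encrypt(plaintext.encode()),
        "encrypted_dek": wrapped.ciphertext,                        # store next to the row
    }

def decrypt_field(record):
    unwrapped = kms_client.decrypt(
        request={"name": key_name, "ciphertext": record["encrypted_dek"]}
    )
    f = Fernet(base64.urlsafe_b64encode(unwrapped.plaintext))
    return f.decrypt(record["ciphertext"]).decode()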
Monitoring and Maintenance
Database Monitoring Setup
# database_monitoring.py
from google.cloud import monitoring_v3
import time
class DatabaseMonitoring:
"""Monitor database performance and health."""
def __init__(self, project_id):
self.project_id = project_id
self.monitoring_client = monitoring_v3.MetricServiceClient()
self.alert_client = monitoring_v3.AlertPolicyServiceClient()
def setup_cloud_sql_monitoring(self, instance_name):
"""Setup comprehensive Cloud SQL monitoring."""
project_name = f"projects/{self.project_id}"
# Create alert policies
alert_policies = [
{
"display_name": "Cloud SQL CPU High",
"conditions": [{
"display_name": "CPU utilization above 80%",
"condition_threshold": {
"filter": f'resource.type="cloudsql_database" '
f'AND resource.label.database_id="{self.project_id}:{instance_name}" '
f'AND metric.type="cloudsql.googleapis.com/database/cpu/utilization"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 0.8,
"duration": {"seconds": 300},
"aggregations": [{
"alignment_period": {"seconds": 60},
"per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MEAN
}]
}
}]
},
{
"display_name": "Cloud SQL Storage Near Limit",
"conditions": [{
"display_name": "Storage usage above 90%",
"condition_threshold": {
"filter": f'resource.type="cloudsql_database" '
f'AND resource.label.database_id="{self.project_id}:{instance_name}" '
f'AND metric.type="cloudsql.googleapis.com/database/disk/utilization"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 0.9,
"duration": {"seconds": 60}
}
}]
},
{
"display_name": "Cloud SQL Replication Lag",
"conditions": [{
"display_name": "Replication lag above 30 seconds",
"condition_threshold": {
"filter": f'resource.type="cloudsql_database" '
f'AND metric.type="cloudsql.googleapis.com/database/replication/replica_lag"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 30,
"duration": {"seconds": 300}
}
}]
}
]
for policy_config in alert_policies:
policy = monitoring_v3.AlertPolicy(policy_config)
self.alert_client.create_alert_policy(
name=project_name,
alert_policy=policy
)
print(f"Monitoring alerts created for {instance_name}")
def create_custom_dashboard(self):
"""Create custom database monitoring dashboard."""
dashboard_config = {
"displayName": "Database Performance Dashboard",
"gridLayout": {
"widgets": [
{
"title": "Cloud SQL CPU Usage",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="cloudsql_database" '
'AND metric.type="cloudsql.googleapis.com/database/cpu/utilization"'
}
}
}]
}
},
{
"title": "Query Performance",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="cloudsql_database" '
'AND metric.type="cloudsql.googleapis.com/database/mysql/queries"'
}
}
}]
}
}
]
}
}
return dashboard_config
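The returned config can be written to a file and deployed with gcloud; a sketch with an arbitrary file name:
# Deploy the dashboard config with gcloud (sketch; file name is arbitrary)
import json
import subprocess

monitor = DatabaseMonitoring(project_id="my-project")
with open("database-dashboard.json", "w") as f:
    json.dump(monitor.create_custom_dashboard(), f)

subprocess.run([
    "gcloud", "monitoring", "dashboards", "create",
    "--config-from-file=database-dashboard.json",
    "--project", "my-project"
], check=True)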
Best Practices Summary
Database Selection Matrix
def select_database_service(requirements):
"""Select appropriate database service based on requirements."""
if requirements.get('global_consistency') and requirements.get('sql'):
return 'Cloud Spanner'
elif requirements.get('traditional_sql'):
return 'Cloud SQL'
elif requirements.get('document_store') and requirements.get('real_time'):
return 'Firestore'
elif requirements.get('time_series') or requirements.get('iot'):
return 'Bigtable'
elif requirements.get('caching'):
return 'Memorystore'
else:
return 'Evaluate requirements further'
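For example:
# Example: a high-throughput IoT telemetry workload
requirements = {"time_series": True, "iot": True}
print(select_database_service(requirements))  # -> 'Bigtable'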
Conclusion
Google Cloud's database portfolio provides solutions for every use case. Key considerations:
- Cloud SQL: Perfect for lift-and-shift migrations and traditional applications
- Cloud Spanner: When you need global scale with SQL consistency
- Firestore: Ideal for mobile and web applications with real-time requirements
- Bigtable: Best for high-throughput analytical and time-series workloads
- Memorystore: Essential for caching and session management
Next Steps
- Implement proof-of-concepts for your use cases
- Study database migration strategies
- Learn about cross-service integration patterns
- Explore advanced features like Change Data Capture
- Get certified as a Google Cloud Database Engineer
Remember: Choose the right database for your workload, not the most popular one. Google Cloud's database services are designed to work together for comprehensive solutions.