Google Cloud SQL and Database Services: Complete Guide

Tyler Maginnis | February 04, 2024

Tags: Google Cloud, Cloud SQL, databases, MySQL, PostgreSQL, SQL Server



Google Cloud offers a comprehensive suite of database services for every use case, from traditional relational databases to globally distributed NoSQL solutions. This guide covers Cloud SQL, Cloud Spanner, Firestore, Bigtable, and Memorystore with best practices and advanced configurations.

Choosing the Right Database Service

Database Service Comparison

| Service | Type | Use Case | Scale | Consistency |
|---|---|---|---|---|
| Cloud SQL | Relational (MySQL, PostgreSQL, SQL Server) | Traditional applications, WordPress, e-commerce | Regional | Strong |
| Cloud Spanner | Relational (distributed) | Global applications, financial systems | Global | Strong |
| Firestore | Document NoSQL | Mobile/web apps, real-time sync | Global | Strong |
| Bigtable | Wide-column NoSQL | IoT, time-series, analytics | Global | Eventual |
| Memorystore | In-memory (Redis, Memcached) | Caching, session storage | Regional | Strong |

Cloud SQL Deep Dive

Creating High-Availability Cloud SQL Instances

# Create MySQL instance with HA
gcloud sql instances create production-mysql \
    --database-version=MYSQL_8_0 \
    --tier=db-n1-highmem-4 \
    --region=us-central1 \
    --network=projects/PROJECT_ID/global/networks/custom-vpc \
    --no-assign-ip \
    --availability-type=REGIONAL \
    --backup-start-time=03:00 \
    --backup-location=us \
    --enable-bin-log \
    --retained-backups-count=30 \
    --retained-transaction-log-days=7 \
    --maintenance-window-day=SUN \
    --maintenance-window-hour=2 \
    --maintenance-release-channel=production \
    --insights-config-query-insights-enabled \
    --insights-config-query-string-length=1024 \
    --insights-config-record-application-tags

# Create PostgreSQL instance
gcloud sql instances create production-postgres \
    --database-version=POSTGRES_14 \
    --tier=db-custom-4-16384 \
    --region=us-central1 \
    --availability-type=REGIONAL \
    --enable-google-private-path \
    --database-flags=shared_preload_libraries=pg_stat_statements,log_statement=all,log_min_duration_statement=1000

Advanced Python Management

# cloud_sql_manager.py
from google.cloud import sql_v1
from google.cloud import secretmanager
import sqlalchemy
import pymysql
import psycopg2
from contextlib import contextmanager
import time

class CloudSQLManager:
    """Manage Cloud SQL instances and connections."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.sql_client = sql_v1.SqlInstancesServiceClient()
        self.backup_client = sql_v1.SqlBackupRunsServiceClient()
        self.operations_client = sql_v1.SqlOperationsServiceClient()
        self.secret_client = secretmanager.SecretManagerServiceClient()

    def create_instance_with_replicas(self, instance_id, region, database_type='MYSQL'):
        """Create Cloud SQL instance with read replicas."""
        instance = {
            "name": instance_id,
            "region": region,
            "database_version": f"{database_type}_8_0",
            "settings": {
                "tier": "db-n1-highmem-4",
                "activation_policy": "ALWAYS",
                "availability_type": "REGIONAL",
                "backup_configuration": {
                    "enabled": True,
                    "start_time": "03:00",
                    "location": region,
                    "point_in_time_recovery_enabled": True,
                    "transaction_log_retention_days": 7,
                    "retained_backups_count": 30
                },
                "ip_configuration": {
                    "ipv4_enabled": False,
                    "private_network": f"projects/{self.project_id}/global/networks/custom-vpc",
                    "enable_private_path_for_google_cloud_services": True
                },
                "database_flags": [
                    {"name": "slow_query_log", "value": "on"},
                    {"name": "long_query_time", "value": "2"},
                    {"name": "general_log", "value": "on"}
                ],
                "insights_config": {
                    "query_insights_enabled": True,
                    "query_string_length": 1024,
                    "record_application_tags": True
                }
            }
        }

        # Create primary instance
        operation = self.sql_client.insert(
            project=self.project_id,
            body=instance
        )

        # Wait for operation
        self._wait_for_operation(operation.name)

        # Create read replicas
        replica_regions = ['us-east1', 'europe-west1']
        for i, replica_region in enumerate(replica_regions):
            replica = {
                "name": f"{instance_id}-replica-{i+1}",
                "region": replica_region,
                "master_instance_name": f"projects/{self.project_id}/instances/{instance_id}",
                "replica_configuration": {
                    "failover_target": False
                }
            }

            replica_operation = self.sql_client.insert(
                project=self.project_id,
                body=replica
            )
            self._wait_for_operation(replica_operation.name)

        return instance_id

    @contextmanager
    def get_connection(self, instance_name, database_name):
        """Get database connection with automatic resource management."""
        # Get connection info
        instance = self.sql_client.get(
            project=self.project_id,
            instance=instance_name
        )

        # Get credentials from Secret Manager
        secret_name = f"projects/{self.project_id}/secrets/db-password/versions/latest"
        response = self.secret_client.access_secret_version(name=secret_name)
        password = response.payload.data.decode('UTF-8')

        # Create connection
        if 'MYSQL' in instance.database_version:
            connection = pymysql.connect(
                host=instance.ip_addresses[0].ip_address,
                user='root',
                password=password,
                database=database_name,
                cursorclass=pymysql.cursors.DictCursor
            )
        else:  # PostgreSQL
            connection = psycopg2.connect(
                host=instance.ip_addresses[0].ip_address,
                user='postgres',
                password=password,
                database=database_name
            )

        try:
            yield connection
        finally:
            connection.close()

    def setup_automatic_failover(self, instance_name):
        """Configure automatic failover for high availability."""
        # Create failover replica
        failover_replica = {
            "name": f"{instance_name}-failover",
            "master_instance_name": f"projects/{self.project_id}/instances/{instance_name}",
            "replica_configuration": {
                "failover_target": True
            }
        }

        operation = self.sql_client.insert(
            project=self.project_id,
            body=failover_replica
        )

        self._wait_for_operation(operation.name)
        print(f"Failover replica created for {instance_name}")

    def perform_backup(self, instance_name, description="Manual backup"):
        """Perform on-demand backup."""
        backup_run = {
            "instance": instance_name,
            "description": description,
            "type": "ON_DEMAND"
        }

        operation = self.backup_client.insert(
            project=self.project_id,
            instance=instance_name,
            body=backup_run
        )

        self._wait_for_operation(operation.name)
        print(f"Backup completed for {instance_name}")

        return operation

    def clone_instance(self, source_instance, target_instance, point_in_time=None):
        """Clone Cloud SQL instance."""
        clone_context = {
            "destination_instance_name": target_instance,
            "point_in_time": point_in_time  # ISO format timestamp
        }

        operation = self.sql_client.clone(
            project=self.project_id,
            instance=source_instance,
            body={"clone_context": clone_context}
        )

        self._wait_for_operation(operation.name)
        print(f"Cloned {source_instance} to {target_instance}")

    def _wait_for_operation(self, operation_name):
        """Wait for operation to complete."""
        while True:
            operation = self.operations_client.get(
                project=self.project_id,
                operation=operation_name.split('/')[-1]
            )

            if operation.status == "DONE":
                if operation.error:
                    raise Exception(f"Operation failed: {operation.error}")
                break

            time.sleep(5)
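
As a rough usage sketch of the class above, assuming a db-password secret already exists in Secret Manager and the instance and database names match your project:

# Example usage (illustrative)
manager = CloudSQLManager(project_id="my-project")   # hypothetical project ID

# On-demand backup before a risky change
manager.perform_backup("production-mysql", description="Pre-release backup")

# Run a quick query through the managed connection context
with manager.get_connection("production-mysql", "appdb") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS total FROM users")
        print(cursor.fetchone())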

Database Migration

# database_migrator.py
import subprocess
from google.cloud import storage
import os

class DatabaseMigrator:
    """Migrate databases to Cloud SQL."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.storage_client = storage.Client(project=project_id)

    def migrate_mysql_database(self, source_host, source_db, target_instance):
        """Migrate MySQL database to Cloud SQL."""
        # Export from source
        dump_file = f"{source_db}_dump.sql"

        export_command = [
            "mysqldump",
            "-h", source_host,
            "-u", "root",
            "-p",
            "--single-transaction",
            "--routines",
            "--triggers",
            "--events",
            "--set-gtid-purged=OFF",
            source_db,
            "-r", dump_file
        ]

        subprocess.run(export_command, check=True)

        # Upload to Cloud Storage
        bucket_name = f"{self.project_id}-sql-migration"
        bucket = self.storage_client.bucket(bucket_name)
        blob = bucket.blob(f"dumps/{dump_file}")
        blob.upload_from_filename(dump_file)

        # Import to Cloud SQL
        import_command = [
            "gcloud", "sql", "import", "sql",
            target_instance,
            f"gs://{bucket_name}/dumps/{dump_file}",
            "--database", source_db,
            "--project", self.project_id
        ]

        subprocess.run(import_command, check=True)

        # Cleanup
        os.remove(dump_file)
        blob.delete()

        print(f"Migration completed: {source_db} -> {target_instance}")

    def setup_continuous_replication(self, source_config, target_instance):
        """Sketch of continuous replication from an external MySQL primary."""
        # Replication from an external server is configured through the Cloud SQL
        # Admin API: a source representation instance describes the external
        # primary, and the replica carries a mysqlReplicaConfiguration like the
        # one below. For most migrations the Database Migration Service is the
        # recommended path.
        replica_config = {
            "mysql_replica_configuration": {
                "username": source_config['username'],
                "password": source_config['password'],
                "ssl_cipher": "TLS_AES_256_GCM_SHA384",
                "ca_certificate": source_config['ca_cert'],
                "client_certificate": source_config['client_cert'],
                "client_key": source_config['client_key']
            }
        }

        # The external primary's host and port are described on the source
        # representation instance; applying these settings to target_instance
        # is left to the Admin API call appropriate for your setup.
        print(f"Replica configuration prepared for {target_instance}")
        return replica_config

Cloud Spanner: Globally Distributed SQL

Setting Up Cloud Spanner

# spanner_manager.py
from decimal import Decimal

from google.cloud import spanner
from google.cloud.spanner_admin_database_v1 import DatabaseAdminClient
from google.cloud.spanner_admin_instance_v1 import InstanceAdminClient

class SpannerManager:
    """Manage Cloud Spanner instances and databases."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.spanner_client = spanner.Client(project=project_id)
        self.instance_admin = InstanceAdminClient()
        self.database_admin = DatabaseAdminClient()

    def create_multi_region_instance(self, instance_id, display_name):
        """Create multi-region Spanner instance."""
        config_name = f"projects/{self.project_id}/instanceConfigs/nam-eur-asia1"

        instance = self.spanner_client.instance(
            instance_id,
            configuration_name=config_name,
            display_name=display_name,
            node_count=3
        )

        operation = instance.create()
        operation.result()  # Wait for completion

        print(f"Created multi-region instance: {instance_id}")
        return instance

    def create_database_with_schema(self, instance_id, database_id):
        """Create database with schema."""
        instance = self.spanner_client.instance(instance_id)

        # Define schema
        ddl_statements = [
            """CREATE TABLE Users (
                UserId STRING(36) NOT NULL,
                Email STRING(255) NOT NULL,
                Name STRING(100),
                CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
                UpdatedAt TIMESTAMP OPTIONS (allow_commit_timestamp=true),
                Version INT64 NOT NULL DEFAULT (0),
            ) PRIMARY KEY (UserId)""",

            """CREATE UNIQUE INDEX UsersByEmail ON Users(Email)""",

            """CREATE TABLE Orders (
                OrderId STRING(36) NOT NULL,
                UserId STRING(36) NOT NULL,
                Status STRING(20) NOT NULL,
                TotalAmount NUMERIC NOT NULL,
                Currency STRING(3) NOT NULL,
                CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
                CONSTRAINT FK_UserId FOREIGN KEY (UserId) REFERENCES Users (UserId),
            ) PRIMARY KEY (OrderId),
            INTERLEAVE IN PARENT Users ON DELETE CASCADE""",

            """CREATE TABLE OrderItems (
                OrderId STRING(36) NOT NULL,
                ItemId STRING(36) NOT NULL,
                ProductId STRING(36) NOT NULL,
                Quantity INT64 NOT NULL,
                Price NUMERIC NOT NULL,
                CreatedAt TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
                CONSTRAINT FK_OrderId FOREIGN KEY (OrderId) REFERENCES Orders (OrderId),
            ) PRIMARY KEY (OrderId, ItemId),
            INTERLEAVE IN PARENT Orders ON DELETE CASCADE"""
        ]

        # DDL is attached to the database object and applied when create() runs
        database = instance.database(database_id, ddl_statements=ddl_statements)
        operation = database.create()
        operation.result()  # Wait for completion

        print(f"Created database with schema: {database_id}")
        return database

    def perform_distributed_transaction(self, instance_id, database_id):
        """Perform distributed transaction across tables."""
        instance = self.spanner_client.instance(instance_id)
        database = instance.database(database_id)

        def insert_order_with_items(transaction):
            # Insert user if not exists
            user_id = "user-123"
            transaction.execute_update(
                """INSERT OR UPDATE Users (UserId, Email, Name, CreatedAt)
                   VALUES (@user_id, @email, @name, PENDING_COMMIT_TIMESTAMP())""",
                params={"user_id": user_id, "email": "user@example.com", "name": "John Doe"},
                param_types={
                    "user_id": spanner.param_types.STRING,
                    "email": spanner.param_types.STRING,
                    "name": spanner.param_types.STRING
                }
            )

            # Insert order
            order_id = "order-456"
            transaction.execute_update(
                """INSERT INTO Orders (OrderId, UserId, Status, TotalAmount, Currency, CreatedAt)
                   VALUES (@order_id, @user_id, @status, @total, @currency, PENDING_COMMIT_TIMESTAMP())""",
                params={
                    "order_id": order_id,
                    "user_id": user_id,
                    "status": "PENDING",
                    "total": 99.99,
                    "currency": "USD"
                },
                param_types={
                    "order_id": spanner.param_types.STRING,
                    "user_id": spanner.param_types.STRING,
                    "status": spanner.param_types.STRING,
                    "total": spanner.param_types.NUMERIC,
                    "currency": spanner.param_types.STRING
                }
            )

            # Insert order items
            items = [
                {"item_id": "item-1", "product_id": "prod-1", "quantity": 2, "price": Decimal("29.99")},
                {"item_id": "item-2", "product_id": "prod-2", "quantity": 1, "price": Decimal("40.01")}
            ]

            for item in items:
                transaction.execute_update(
                    """INSERT INTO OrderItems (OrderId, ItemId, ProductId, Quantity, Price, CreatedAt)
                       VALUES (@order_id, @item_id, @product_id, @quantity, @price, PENDING_COMMIT_TIMESTAMP())""",
                    params={
                        "order_id": order_id,
                        "item_id": item["item_id"],
                        "product_id": item["product_id"],
                        "quantity": item["quantity"],
                        "price": item["price"]
                    },
                    param_types={
                        "order_id": spanner.param_types.STRING,
                        "item_id": spanner.param_types.STRING,
                        "product_id": spanner.param_types.STRING,
                        "quantity": spanner.param_types.INT64,
                        "price": spanner.param_types.NUMERIC
                    }
                )

        database.run_in_transaction(insert_order_with_items)
        print("Distributed transaction completed successfully")

    def setup_change_streams(self, instance_id, database_id):
        """Setup change streams for real-time data capture."""
        instance = self.spanner_client.instance(instance_id)
        database = instance.database(database_id)

        change_stream_ddl = [
            """CREATE CHANGE STREAM UserOrderStream
               FOR Users, Orders, OrderItems
               OPTIONS (
                   retention_period = '7d',
                   value_capture_type = 'NEW_VALUES'
               )"""
        ]

        operation = database.update_ddl(change_stream_ddl)
        operation.result()

        print("Change stream created successfully")

Firestore: NoSQL Document Database

Advanced Firestore Operations

# firestore_advanced.py
from google.cloud import firestore
import asyncio
import time

class FirestoreAdvanced:
    """Advanced Firestore operations and patterns."""

    def __init__(self, project_id):
        self.db = firestore.Client(project=project_id)
        self.batch_size = 500

    async def bulk_import_with_batching(self, collection_name, data):
        """Bulk import data with automatic batching."""
        collection = self.db.collection(collection_name)

        # Process in batches
        for i in range(0, len(data), self.batch_size):
            batch = self.db.batch()
            batch_data = data[i:i + self.batch_size]

            for item in batch_data:
                doc_ref = collection.document(item.get('id') or None)
                batch.set(doc_ref, item)

            batch.commit()
            print(f"Imported batch {i//self.batch_size + 1}")

            # Rate limiting
            await asyncio.sleep(0.1)

    def setup_composite_indexes(self):
        """Define composite indexes for complex queries."""
        # Note: Indexes are typically defined in firestore.indexes.json
        indexes_config = {
            "indexes": [
                {
                    "collectionGroup": "orders",
                    "queryScope": "COLLECTION",
                    "fields": [
                        {"fieldPath": "userId", "order": "ASCENDING"},
                        {"fieldPath": "status", "order": "ASCENDING"},
                        {"fieldPath": "createdAt", "order": "DESCENDING"}
                    ]
                },
                {
                    "collectionGroup": "products",
                    "queryScope": "COLLECTION",
                    "fields": [
                        {"fieldPath": "category", "arrayConfig": "CONTAINS"},
                        {"fieldPath": "price", "order": "ASCENDING"},
                        {"fieldPath": "rating", "order": "DESCENDING"}
                    ]
                }
            ],
            "fieldOverrides": [
                {
                    "collectionGroup": "users",
                    "fieldPath": "email",
                    "indexes": [
                        {"queryScope": "COLLECTION", "order": "ASCENDING"},
                        {"queryScope": "COLLECTION_GROUP", "order": "ASCENDING"}
                    ]
                }
            ]
        }

        return indexes_config

    def implement_sharding_pattern(self, collection_name, document_id, data, shard_count=10):
        """Implement sharding for high-write collections."""
        import random

        # Choose random shard
        shard_id = random.randint(0, shard_count - 1)

        # Write to sharded subcollection
        shard_ref = self.db.collection(collection_name).document(document_id)\
                          .collection('shards').document(str(shard_id))

        shard_ref.set({
            **data,
            'shard_id': shard_id,
            'timestamp': firestore.SERVER_TIMESTAMP
        })

        # Update aggregate in parent document
        parent_ref = self.db.collection(collection_name).document(document_id)
        parent_ref.set({
            'shard_count': shard_count,
            'last_updated': firestore.SERVER_TIMESTAMP
        }, merge=True)

    def distributed_counter(self, counter_path, increment_value=1):
        """Implement distributed counter pattern."""
        counter_ref = self.db.document(counter_path)
        shards_ref = counter_ref.collection('shards')

        # Get or create counter document
        counter_doc = counter_ref.get()
        if not counter_doc.exists:
            counter_ref.set({
                'num_shards': 10,
                'total_count': 0
            })

            # Initialize shards
            batch = self.db.batch()
            for i in range(10):
                shard_ref = shards_ref.document(str(i))
                batch.set(shard_ref, {'count': 0})
            batch.commit()

        # Increment random shard
        import random
        shard_id = random.randint(0, 9)
        shard_ref = shards_ref.document(str(shard_id))
        shard_ref.update({'count': firestore.Increment(increment_value)})

    def get_distributed_count(self, counter_path):
        """Get total from distributed counter."""
        counter_ref = self.db.document(counter_path)
        shards_ref = counter_ref.collection('shards')

        # Sum all shards
        total = 0
        for shard in shards_ref.stream():
            total += shard.to_dict().get('count', 0)

        # Update cached total
        counter_ref.update({'total_count': total})

        return total

    def complex_query_with_pagination(self, collection_name, filters, order_by, page_size=20):
        """Perform complex query with cursor-based pagination."""
        query = self.db.collection(collection_name)

        # Apply filters
        for field, operator, value in filters:
            query = query.where(field, operator, value)

        # Apply ordering
        for field, direction in order_by:
            query = query.order_by(field, direction=direction)

        # Limit for pagination
        query = query.limit(page_size)

        # Generator for paginated results
        def paginate():
            last_doc = None
            while True:
                if last_doc:
                    paginated_query = query.start_after(last_doc)
                else:
                    paginated_query = query

                docs = list(paginated_query.stream())
                if not docs:
                    break

                yield docs
                last_doc = docs[-1]

        return paginate()

    def setup_real_time_listener(self, collection_name, callback):
        """Setup real-time listener with error handling."""
        def on_snapshot(col_snapshot, changes, read_time):
            for change in changes:
                if change.type.name == 'ADDED':
                    callback('added', change.document.id, change.document.to_dict())
                elif change.type.name == 'MODIFIED':
                    callback('modified', change.document.id, change.document.to_dict())
                elif change.type.name == 'REMOVED':
                    callback('removed', change.document.id, None)

        # Create listener
        col_ref = self.db.collection(collection_name)
        col_watch = col_ref.on_snapshot(on_snapshot)

        return col_watch  # Return for later unsubscribe

    def transaction_with_retry(self, transaction_func, max_attempts=3):
        """Execute transaction with automatic retry."""
        @firestore.transactional
        def update_in_transaction(transaction):
            return transaction_func(transaction)

        attempts = 0
        while attempts < max_attempts:
            try:
                result = update_in_transaction(self.db.transaction())
                return result
            except Exception as e:
                attempts += 1
                if attempts >= max_attempts:
                    raise
                time.sleep(2 ** attempts)  # Exponential backoff
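
A short usage sketch of the counter and pagination helpers above; the collection, document, and field names are hypothetical:

# Example usage (illustrative)
from google.cloud import firestore

fs = FirestoreAdvanced(project_id="my-project")

# Count page views with the sharded counter
fs.distributed_counter("counters/page_views")
print("page views:", fs.get_distributed_count("counters/page_views"))

# Paginated query: recent paid orders for one user
pages = fs.complex_query_with_pagination(
    "orders",
    filters=[("userId", "==", "user-123"), ("status", "==", "PAID")],
    order_by=[("createdAt", firestore.Query.DESCENDING)],
    page_size=50,
)
for page in pages:
    for doc in page:
        print(doc.id, doc.to_dict())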

Bigtable: Wide-Column NoSQL

Bigtable Implementation

# bigtable_manager.py
from datetime import datetime, timedelta, timezone

from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters
from google.cloud.bigtable.row_set import RowSet
import struct

class BigtableManager:
    """Manage Bigtable instances and operations."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.client = bigtable.Client(project=project_id, admin=True)

    def create_time_series_table(self, instance_id, table_id):
        """Create table optimized for time-series data."""
        instance = self.client.instance(instance_id)
        table = instance.table(table_id)

        # Create table if not exists
        if not table.exists():
            table.create()

            # Metrics: keep only the latest version and expire cells after 30 days
            metrics_gc_rule = column_family.GCRuleUnion([
                column_family.MaxVersionsGCRule(1),
                column_family.MaxAgeGCRule(timedelta(days=30))
            ])
            cf_metrics = table.column_family('metrics', gc_rule=metrics_gc_rule)
            cf_metrics.create()

            cf_metadata = table.column_family('metadata')
            cf_metadata.create()

            print(f"Created time-series table: {table_id}")

    def write_time_series_data(self, instance_id, table_id, device_id, metrics):
        """Write time-series data with proper row key design."""
        instance = self.client.instance(instance_id)
        table = instance.table(table_id)

        # Row key: device_id#reverse_timestamp so the newest rows sort first
        now = datetime.now(timezone.utc)
        timestamp_ms = int(now.timestamp() * 1000)
        reverse_timestamp = 9999999999999 - timestamp_ms
        row_key = f"{device_id}#{reverse_timestamp}".encode()

        row = table.direct_row(row_key)

        # Write metrics
        for metric_name, value in metrics.items():
            # Store as binary for efficiency
            if isinstance(value, (int, float)):
                value_bytes = struct.pack('d', float(value))
            else:
                value_bytes = str(value).encode()

            row.set_cell(
                'metrics',
                metric_name.encode(),
                value_bytes,
                timestamp=now  # cell timestamps are datetime objects
            )

        # Write metadata
        row.set_cell(
            'metadata',
            b'device_id',
            device_id.encode(),
            timestamp=now
        )

        row.commit()

    def query_time_range(self, instance_id, table_id, device_id, start_time, end_time):
        """Query time-series data for a specific time range."""
        instance = self.client.instance(instance_id)
        table = instance.table(table_id)

        # Convert timestamps to reverse format
        start_reverse = 9999999999999 - int(end_time * 1000)
        end_reverse = 9999999999999 - int(start_time * 1000)

        # Create row key range
        row_start = f"{device_id}#{start_reverse}".encode()
        row_end = f"{device_id}#{end_reverse}".encode()

        # Create row set covering the key range
        row_set = RowSet()
        row_set.add_row_range_from_keys(
            start_key=row_start,
            end_key=row_end
        )

        # Read rows, restricted to the metrics column family
        rows = table.read_rows(
            row_set=row_set,
            filter_=row_filters.FamilyNameRegexFilter('metrics')
        )

        # Process results
        results = []
        for row in rows:
            timestamp = 9999999999999 - int(row.row_key.decode().split('#')[1])
            metrics = {}

            for qualifier, cells in row.cells['metrics'].items():
                metric_name = qualifier.decode()
                latest = cells[0]  # newest cell first
                try:
                    value = struct.unpack('d', latest.value)[0]
                except struct.error:
                    value = latest.value.decode()
                metrics[metric_name] = value

            results.append({
                'timestamp': timestamp / 1000,
                'metrics': metrics
            })

        return results

    def implement_hot_spotting_prevention(self, instance_id, table_id):
        """Implement salting to prevent hot spotting."""
        instance = self.client.instance(instance_id)
        table = instance.table(table_id)

        def salted_row_key(original_key, salt_buckets=10):
            """Add salt prefix to distribute writes."""
            import hashlib

            # Calculate salt based on key hash
            key_hash = hashlib.md5(original_key.encode()).hexdigest()
            salt = int(key_hash[:2], 16) % salt_buckets

            return f"{salt:02d}#{original_key}".encode()

        return salted_row_key
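
Putting the pieces together; the instance ID, table ID, and device data are placeholders:

# Example usage (illustrative)
import time

bt = BigtableManager(project_id="my-project")
bt.create_time_series_table("iot-instance", "device-metrics")

# Write one reading and read back the last hour for the same device
bt.write_time_series_data(
    "iot-instance", "device-metrics", "device-42",
    {"temperature": 21.5, "humidity": 0.63, "firmware": "1.4.2"},
)

now = time.time()
for point in bt.query_time_range("iot-instance", "device-metrics", "device-42",
                                 start_time=now - 3600, end_time=now):
    print(point["timestamp"], point["metrics"])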

Memorystore: In-Memory Caching

Redis Implementation

# memorystore_manager.py
import redis
from google.cloud import redis_v1
import json
import pickle
from datetime import timedelta

class MemorystoreManager:
    """Manage Memorystore Redis instances."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.redis_admin = redis_v1.CloudRedisClient()
        self.redis_clients = {}

    def create_redis_instance(self, instance_id, region, tier='STANDARD_HA'):
        """Create Memorystore Redis instance."""
        parent = f"projects/{self.project_id}/locations/{region}"

        instance = {
            "name": f"{parent}/instances/{instance_id}",
            "tier": tier,
            "memory_size_gb": 5,
            "redis_version": "REDIS_6_X",
            "display_name": instance_id,
            "auth_enabled": True,
            "transit_encryption_mode": "SERVER_AUTHENTICATION",
            "persistence_config": {
                "persistence_mode": "RDB",
                "rdb_snapshot_period": "ONE_HOUR"
            }
        }

        operation = self.redis_admin.create_instance(
            parent=parent,
            instance_id=instance_id,
            instance=instance
        )

        result = operation.result()  # Wait for completion
        print(f"Created Redis instance: {instance_id}")

        return result

    def get_redis_client(self, instance_name):
        """Get Redis client for instance."""
        if instance_name not in self.redis_clients:
            # Get instance details and, when AUTH is enabled, its AUTH string
            instance = self.redis_admin.get_instance(name=instance_name)
            auth = None
            if instance.auth_enabled:
                auth = self.redis_admin.get_instance_auth_string(
                    name=instance_name
                ).auth_string

            # Create Redis client; supply the instance's server CA certificate
            # via ssl_ca_certs when transit encryption is enabled
            self.redis_clients[instance_name] = redis.Redis(
                host=instance.host,
                port=instance.port,
                password=auth,
                ssl=True,
                decode_responses=True
            )

        return self.redis_clients[instance_name]

    def implement_cache_patterns(self, redis_client):
        """Implement common caching patterns."""

        # Cache-aside pattern
        def cache_aside_get(key, fetch_func, ttl=300):
            # Try to get from cache
            cached = redis_client.get(key)
            if cached:
                return json.loads(cached)

            # Fetch from source
            data = fetch_func()

            # Store in cache
            redis_client.setex(key, ttl, json.dumps(data))

            return data

        # Write-through pattern
        def write_through_set(key, value, store_func, ttl=300):
            # Store in primary storage
            store_func(value)

            # Update cache
            redis_client.setex(key, ttl, json.dumps(value))

        # Cache warming
        def warm_cache(keys_data, ttl=300):
            pipeline = redis_client.pipeline()

            for key, data in keys_data.items():
                pipeline.setex(key, ttl, json.dumps(data))

            pipeline.execute()

        return {
            'cache_aside_get': cache_aside_get,
            'write_through_set': write_through_set,
            'warm_cache': warm_cache
        }

    def setup_session_store(self, redis_client):
        """Setup Redis as session store."""

        class SessionStore:
            def __init__(self, redis_client, prefix='session:', ttl=3600):
                self.redis = redis_client
                self.prefix = prefix
                self.ttl = ttl

            def save(self, session_id, data):
                key = f"{self.prefix}{session_id}"
                self.redis.setex(key, self.ttl, pickle.dumps(data))

            def load(self, session_id):
                key = f"{self.prefix}{session_id}"
                data = self.redis.get(key)
                return pickle.loads(data) if data else None

            def delete(self, session_id):
                key = f"{self.prefix}{session_id}"
                self.redis.delete(key)

            def extend(self, session_id):
                key = f"{self.prefix}{session_id}"
                self.redis.expire(key, self.ttl)

        return SessionStore(redis_client)

    def implement_rate_limiting(self, redis_client):
        """Implement rate limiting with Redis."""

        def rate_limit(identifier, max_requests=100, window_seconds=60):
            key = f"rate_limit:{identifier}"

            try:
                current = redis_client.incr(key)

                if current == 1:
                    redis_client.expire(key, window_seconds)

                if current > max_requests:
                    return False, current

                return True, current

            except redis.RedisError:
                # Allow on Redis error (fail open)
                return True, 0

        return rate_limit
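
For example, the rate limiter and cache-aside helpers can be wired up as follows; the instance path, API key, and fetch function are hypothetical stand-ins:

# Example usage (illustrative)
mm = MemorystoreManager(project_id="my-project")
client = mm.get_redis_client(
    "projects/my-project/locations/us-central1/instances/app-cache"
)

# Rate limiting: 100 requests per minute per API key
rate_limit = mm.implement_rate_limiting(client)
allowed, current = rate_limit("api-key-abc123", max_requests=100, window_seconds=60)
if not allowed:
    print(f"Throttled: {current} requests in the current window")

# Cache-aside read of a product record
patterns = mm.implement_cache_patterns(client)
product = patterns['cache_aside_get'](
    "product:42",
    fetch_func=lambda: {"id": 42, "name": "Widget", "price": 19.99},  # stand-in for a DB query
    ttl=600,
)
print(product)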

Performance Optimization

Connection Pooling and Query Optimization

# database_optimizer.py
from sqlalchemy import create_engine, pool, text
import json
import threading

class DatabaseOptimizer:
    """Optimize database performance across services."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.connection_pools = {}
        self.pool_lock = threading.Lock()

    def get_cloud_sql_pool(self, instance_connection_name, database):
        """Get connection pool for Cloud SQL."""
        pool_key = f"{instance_connection_name}:{database}"

        with self.pool_lock:
            if pool_key not in self.connection_pools:
                # Create SQLAlchemy engine with connection pooling
                engine = create_engine(
                    # Placeholder credentials; pull real ones from Secret Manager
                    f"mysql+pymysql://root:password@/{database}",
                    connect_args={
                        "unix_socket": f"/cloudsql/{instance_connection_name}"
                    },
                    poolclass=pool.QueuePool,
                    pool_size=5,
                    max_overflow=10,
                    pool_timeout=30,
                    pool_recycle=1800,  # Recycle connections after 30 minutes
                    pool_pre_ping=True  # Verify connections before use
                )

                self.connection_pools[pool_key] = engine

        return self.connection_pools[pool_key]

    def optimize_queries(self):
        """Query optimization strategies."""

        # Example: batch operations (records is a list of dicts with identical keys)
        def batch_insert(engine, table_name, records, batch_size=1000):
            cols = ", ".join(records[0].keys())
            vals = ", ".join(f":{c}" for c in records[0])
            stmt = text(f"INSERT INTO {table_name} ({cols}) VALUES ({vals})")
            with engine.begin() as conn:
                for i in range(0, len(records), batch_size):
                    conn.execute(stmt, records[i:i + batch_size])

        # Example: Query result caching
        class QueryCache:
            def __init__(self, redis_client):
                self.redis = redis_client
                self.ttl = 300  # 5 minutes

            def cached_query(self, query_key, query_func):
                # Check cache
                cached = self.redis.get(f"query:{query_key}")
                if cached:
                    return json.loads(cached)

                # Execute query
                result = query_func()

                # Cache result
                self.redis.setex(
                    f"query:{query_key}",
                    self.ttl,
                    json.dumps(result, default=str)
                )

                return result

        return {
            'batch_insert': batch_insert,
            'QueryCache': QueryCache
        }

    def implement_read_write_splitting(self, primary_engine, replica_engines):
        """Implement read/write splitting."""

        class ReadWriteSplitter:
            def __init__(self, primary, replicas):
                self.primary = primary
                self.replicas = replicas
                self.current_replica = 0

            def execute_read(self, query):
                # Round-robin across replicas
                replica = self.replicas[self.current_replica]
                self.current_replica = (self.current_replica + 1) % len(self.replicas)

                with replica.connect() as conn:
                    return conn.execute(query).fetchall()

            def execute_write(self, query):
                # All writes go to the primary; begin() commits on success
                with self.primary.begin() as conn:
                    return conn.execute(query)

        return ReadWriteSplitter(primary_engine, replica_engines)
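
A brief sketch of how the splitter might be used, assuming one primary and two read-replica engines created with get_cloud_sql_pool; the instance connection names and SQL are placeholders:

# Example usage (illustrative)
from sqlalchemy import text

optimizer = DatabaseOptimizer(project_id="my-project")

primary = optimizer.get_cloud_sql_pool("my-project:us-central1:production-mysql", "appdb")
replicas = [
    optimizer.get_cloud_sql_pool("my-project:us-east1:production-mysql-replica-1", "appdb"),
    optimizer.get_cloud_sql_pool("my-project:europe-west1:production-mysql-replica-2", "appdb"),
]

splitter = optimizer.implement_read_write_splitting(primary, replicas)

# Reads round-robin across replicas; writes always hit the primary
rows = splitter.execute_read(text("SELECT id, email FROM users LIMIT 10"))
splitter.execute_write(text("UPDATE users SET last_seen = NOW() WHERE id = 1"))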

Security Best Practices

Database Security Implementation

# database_security.py
from google.api_core.exceptions import AlreadyExists
from google.cloud import secretmanager
from google.cloud import kms
import secrets

class DatabaseSecurity:
    """Implement database security best practices."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.secret_client = secretmanager.SecretManagerServiceClient()
        self.kms_client = kms.KeyManagementServiceClient()

    def setup_cloud_sql_security(self, instance_name):
        """Setup comprehensive Cloud SQL security."""
        security_config = {
            # Require SSL
            "require_ssl": True,

            # Database flags for security
            "database_flags": {
                # MySQL flags
                "local_infile": "off",
                "skip_show_database": "on",
                "sql_mode": "TRADITIONAL",

                # Audit logging
                "cloudsql_mysql_audit_log": "on",
                "cloudsql_mysql_audit_log_max_file_size": "100",
                "cloudsql_mysql_audit_log_rotate": "on",

                # Performance schema for monitoring
                "performance_schema": "on"
            },

            # Backup encryption
            "backup_configuration": {
                "enabled": True,
                "point_in_time_recovery_enabled": True,
                "transaction_log_retention_days": 7,
                "location": "us",
                "backup_encryption_key_name": f"projects/{self.project_id}/locations/us/keyRings/database-keys/cryptoKeys/backup-key"
            },

            # Authorized networks (use Private IP instead)
            "authorized_networks": [],

            # Maintenance window
            "maintenance_window": {
                "hour": 3,
                "day": 0,  # Sunday
                "update_track": "stable"
            }
        }

        return security_config

    def rotate_database_credentials(self, instance_name, database_name):
        """Rotate database credentials securely."""
        # Generate new password
        new_password = secrets.token_urlsafe(32)

        # Store in Secret Manager
        secret_id = f"{instance_name}-{database_name}-password"
        parent = f"projects/{self.project_id}"

        # Create the secret if it does not already exist
        try:
            secret = self.secret_client.create_secret(
                parent=parent,
                secret_id=secret_id,
                secret={
                    "replication": {"automatic": {}}
                }
            )
        except AlreadyExists:
            secret_name = f"{parent}/secrets/{secret_id}"
            secret = self.secret_client.get_secret(name=secret_name)

        # Add new version
        self.secret_client.add_secret_version(
            parent=secret.name,
            payload={"data": new_password.encode()}
        )

        # Update database password
        # This would be done through Cloud SQL Admin API

        return secret_id

    def implement_data_encryption(self):
        """Implement application-level data encryption."""

        class FieldEncryption:
            def __init__(self, kms_key_name):
                self.kms_key = kms_key_name

            def encrypt_field(self, plaintext):
                # Generate DEK (Data Encryption Key)
                dek = secrets.token_bytes(32)

                # Encrypt DEK with KEK (Key Encryption Key) from KMS
                # This would use KMS API to encrypt the DEK

                # Encrypt data with DEK
                from cryptography.fernet import Fernet
                import base64

                key = base64.urlsafe_b64encode(dek)
                f = Fernet(key)
                ciphertext = f.encrypt(plaintext.encode())

                return {
                    'ciphertext': ciphertext,
                    'encrypted_dek': 'encrypted_dek_here'
                }

            def decrypt_field(self, encrypted_data):
                # Decrypt DEK using KMS
                # Decrypt data using DEK
                pass

        return FieldEncryption
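
The KMS step that the sketch above stubs out looks roughly like this; the project, key ring, and key names are placeholders:

# Example: wrap/unwrap a data encryption key with Cloud KMS (illustrative)
from google.cloud import kms
import secrets

kms_client = kms.KeyManagementServiceClient()
key_name = kms_client.crypto_key_path(
    "my-project", "us", "database-keys", "field-encryption-key"   # hypothetical key
)

dek = secrets.token_bytes(32)

# Encrypt (wrap) the DEK with the KMS key
wrap_response = kms_client.encrypt(request={"name": key_name, "plaintext": dek})
encrypted_dek = wrap_response.ciphertext

# Later: decrypt (unwrap) the DEK before decrypting field data
unwrap_response = kms_client.decrypt(request={"name": key_name, "ciphertext": encrypted_dek})
assert unwrap_response.plaintext == dek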

Monitoring and Maintenance

Database Monitoring Setup

# database_monitoring.py
from google.cloud import monitoring_v3
import time

class DatabaseMonitoring:
    """Monitor database performance and health."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        self.alert_client = monitoring_v3.AlertPolicyServiceClient()

    def setup_cloud_sql_monitoring(self, instance_name):
        """Setup comprehensive Cloud SQL monitoring."""
        project_name = f"projects/{self.project_id}"

        # Create alert policies
        alert_policies = [
            {
                "display_name": "Cloud SQL CPU High",
                "conditions": [{
                    "display_name": "CPU utilization above 80%",
                    "condition_threshold": {
                        "filter": f'resource.type="cloudsql_database" '
                                 f'AND resource.label.database_id="{self.project_id}:{instance_name}" '
                                 f'AND metric.type="cloudsql.googleapis.com/database/cpu/utilization"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 0.8,
                        "duration": {"seconds": 300},
                        "aggregations": [{
                            "alignment_period": {"seconds": 60},
                            "per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MEAN
                        }]
                    }
                }]
            },
            {
                "display_name": "Cloud SQL Storage Near Limit",
                "conditions": [{
                    "display_name": "Storage usage above 90%",
                    "condition_threshold": {
                        "filter": f'resource.type="cloudsql_database" '
                                 f'AND resource.label.database_id="{self.project_id}:{instance_name}" '
                                 f'AND metric.type="cloudsql.googleapis.com/database/disk/utilization"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 0.9,
                        "duration": {"seconds": 60}
                    }
                }]
            },
            {
                "display_name": "Cloud SQL Replication Lag",
                "conditions": [{
                    "display_name": "Replication lag above 30 seconds",
                    "condition_threshold": {
                        "filter": f'resource.type="cloudsql_database" '
                                 f'AND metric.type="cloudsql.googleapis.com/database/replication/replica_lag"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 30,
                        "duration": {"seconds": 300}
                    }
                }]
            }
        ]

        for policy_config in alert_policies:
            policy = monitoring_v3.AlertPolicy(policy_config)
            self.alert_client.create_alert_policy(
                name=project_name,
                alert_policy=policy
            )

        print(f"Monitoring alerts created for {instance_name}")

    def create_custom_dashboard(self):
        """Create custom database monitoring dashboard."""
        dashboard_config = {
            "displayName": "Database Performance Dashboard",
            "gridLayout": {
                "widgets": [
                    {
                        "title": "Cloud SQL CPU Usage",
                        "xyChart": {
                            "dataSets": [{
                                "timeSeriesQuery": {
                                    "timeSeriesFilter": {
                                        "filter": 'resource.type="cloudsql_database" '
                                                 'AND metric.type="cloudsql.googleapis.com/database/cpu/utilization"'
                                    }
                                }
                            }]
                        }
                    },
                    {
                        "title": "Query Performance",
                        "xyChart": {
                            "dataSets": [{
                                "timeSeriesQuery": {
                                    "timeSeriesFilter": {
                                        "filter": 'resource.type="cloudsql_database" '
                                                 'AND metric.type="cloudsql.googleapis.com/database/mysql/queries"'
                                    }
                                }
                            }]
                        }
                    }
                ]
            }
        }

        return dashboard_config
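
To verify that metrics are actually flowing, you can read a recent time series back with the standard Cloud Monitoring client; a minimal sketch, with the project name as a placeholder:

# Example: read recent CPU utilization samples (illustrative)
import time
from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"                      # hypothetical project

now = int(time.time())
interval = monitoring_v3.TimeInterval(
    end_time={"seconds": now},
    start_time={"seconds": now - 3600},                   # last hour
)

series = client.list_time_series(
    request={
        "name": project_name,
        "filter": 'resource.type="cloudsql_database" '
                  'AND metric.type="cloudsql.googleapis.com/database/cpu/utilization"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)

for ts in series:
    for point in ts.points:
        print(point.interval.end_time, point.value.double_value)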

Best Practices Summary

Database Selection Matrix

def select_database_service(requirements):
    """Select appropriate database service based on requirements."""

    if requirements.get('global_consistency') and requirements.get('sql'):
        return 'Cloud Spanner'

    elif requirements.get('traditional_sql'):
        return 'Cloud SQL'

    elif requirements.get('document_store') and requirements.get('real_time'):
        return 'Firestore'

    elif requirements.get('time_series') or requirements.get('iot'):
        return 'Bigtable'

    elif requirements.get('caching'):
        return 'Memorystore'

    else:
        return 'Evaluate requirements further'
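
For example, a few quick calls show how the rules above resolve:

# Example: quick selection checks
print(select_database_service({'traditional_sql': True}))                      # Cloud SQL
print(select_database_service({'global_consistency': True, 'sql': True}))      # Cloud Spanner
print(select_database_service({'document_store': True, 'real_time': True}))    # Firestore
print(select_database_service({'time_series': True}))                          # Bigtable
print(select_database_service({'caching': True}))                              # Memorystore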

Conclusion

Google Cloud's database portfolio provides solutions for every use case. Key considerations:

  1. Cloud SQL: Perfect for lift-and-shift migrations and traditional applications
  2. Cloud Spanner: When you need global scale with SQL consistency
  3. Firestore: Ideal for mobile and web applications with real-time requirements
  4. Bigtable: Best for high-throughput analytical and time-series workloads
  5. Memorystore: Essential for caching and session management

Next Steps

  • Implement proof-of-concepts for your use cases
  • Study database migration strategies
  • Learn about cross-service integration patterns
  • Explore advanced features like Change Data Capture
  • Get certified as a Google Cloud Database Engineer

Remember: Choose the right database for your workload, not the most popular one. Google Cloud's database services are designed to work together for comprehensive solutions.