Google Cloud Security and IAM: Enterprise Security Guide

Tyler Maginnis | February 05, 2024

Google Cloud, security, IAM, compliance, best practices

Need Professional Google Cloud Services?

Get expert assistance with your Google Cloud services implementation and management. Tyler on Tech Louisville provides priority support for Louisville businesses.

Same-day service available for Louisville area

Security is paramount in cloud computing. This comprehensive guide covers Google Cloud's security features, Identity and Access Management (IAM), and best practices for building secure, compliant cloud environments.

Security Foundation

Shared Responsibility Model

Google Cloud operates on a shared responsibility model:

  • Google's Responsibility: Physical security, infrastructure, hypervisor, and network
  • Customer's Responsibility: Data, identity, access management, application security, and network controls (see the sketch below)
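
In practice, the customer-side duties begin with knowing who can access what. The following minimal sketch, assuming a placeholder project ID of "my-project", uses the Resource Manager client (the same client used later in this guide) to review a project's IAM bindings, one of the controls that sits on the customer's side of the model.

# shared_responsibility_check.py
# Minimal sketch: review customer-managed IAM bindings on a project.
# "my-project" is a placeholder project ID.
from google.cloud import resourcemanager_v3

def list_project_bindings(project_id):
    """Print each role and its members from a project's IAM policy."""
    client = resourcemanager_v3.ProjectsClient()
    policy = client.get_iam_policy(resource=f"projects/{project_id}")
    for binding in policy.bindings:
        print(binding.role, list(binding.members))
    return policy

if __name__ == "__main__":
    list_project_bindings("my-project")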

Identity and Access Management (IAM)

IAM Hierarchy and Inheritance

# iam_manager.py
from google.cloud import resourcemanager_v3
from google.cloud import iam_admin_v1
from google.cloud import service_usage_v1
import json

class IAMManager:
    """Comprehensive IAM management for Google Cloud."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.resource_client = resourcemanager_v3.ProjectsClient()
        self.iam_client = iam_admin_v1.IAMClient()
        self.service_usage_client = service_usage_v1.ServiceUsageClient()

    def setup_organizational_hierarchy(self, org_id):
        """Setup organizational hierarchy with folders."""
        folders = {
            "production": {
                "display_name": "Production",
                "iam_bindings": [
                    {
                        "role": "roles/viewer",
                        "members": ["group:production-viewers@company.com"]
                    }
                ]
            },
            "development": {
                "display_name": "Development",
                "iam_bindings": [
                    {
                        "role": "roles/editor",
                        "members": ["group:developers@company.com"]
                    }
                ]
            },
            "shared-services": {
                "display_name": "Shared Services",
                "iam_bindings": [
                    {
                        "role": "roles/viewer",
                        "members": ["group:all-users@company.com"]
                    }
                ]
            }
        }

        created_folders = {}
        for folder_id, config in folders.items():
            # Create folder
            folder = self.create_folder(org_id, config['display_name'])
            created_folders[folder_id] = folder

            # Set IAM policy
            self.set_folder_iam_policy(folder.name, config['iam_bindings'])

        return created_folders

    def implement_least_privilege(self, resource_name, principal, required_permissions):
        """Implement least privilege access."""
        # Find minimal predefined role
        minimal_role = self.find_minimal_role(required_permissions)

        if not minimal_role:
            # Create custom role if no suitable predefined role
            custom_role = self.create_custom_role(
                role_id=f"custom_{resource_name.replace('/', '_')}",
                title=f"Custom role for {resource_name}",
                permissions=required_permissions
            )
            minimal_role = custom_role.name

        # Grant role to principal
        self.add_iam_binding(resource_name, principal, minimal_role)

        return minimal_role

    def create_custom_role(self, role_id, title, permissions, description=""):
        """Create custom IAM role."""
        parent = f"projects/{self.project_id}"

        role = {
            "role_id": role_id,
            "role": {
                "title": title,
                "description": description,
                "included_permissions": permissions,
                "stage": "GA"
            }
        }

        created_role = self.iam_client.create_role(
            parent=parent,
            role_id=role_id,
            role=role["role"]
        )

        return created_role

    def setup_workload_identity(self, namespace, service_account_name):
        """Setup Workload Identity for GKE."""
        # Create Google Service Account
        gsa_email = f"{service_account_name}@{self.project_id}.iam.gserviceaccount.com"

        service_account = {
            "account_id": service_account_name,
            "service_account": {
                "display_name": f"Workload Identity SA for {namespace}",
                "description": "Service account for Workload Identity"
            }
        }

        # Create service account
        sa = self.iam_client.create_service_account(
            name=f"projects/{self.project_id}",
            account_id=service_account["account_id"],
            service_account=service_account["service_account"]
        )

        # Allow Kubernetes SA to impersonate Google SA
        policy_binding = {
            "role": "roles/iam.workloadIdentityUser",
            "members": [
                f"serviceAccount:{self.project_id}.svc.id.goog[{namespace}/{service_account_name}]"
            ]
        }

        self.add_iam_binding(
            f"projects/{self.project_id}/serviceAccounts/{gsa_email}",
            policy_binding["members"][0],
            policy_binding["role"]
        )

        return gsa_email

    def implement_separation_of_duties(self):
        """Implement separation of duties with custom roles."""
        # Custom role IDs may only contain letters, digits, underscores, and periods
        roles = {
            "security_admin": {
                "title": "Security Administrator",
                "description": "Manages security policies and IAM",
                "permissions": [
                    "iam.roles.create",
                    "iam.roles.delete",
                    "iam.roles.update",
                    "iam.serviceAccounts.create",
                    "iam.serviceAccounts.delete",
                    "resourcemanager.projects.setIamPolicy",
                    "logging.logMetrics.create",
                    "monitoring.alertPolicies.create"
                ]
            },
            "network-admin": {
                "title": "Network Administrator",
                "description": "Manages network resources",
                "permissions": [
                    "compute.networks.create",
                    "compute.networks.delete",
                    "compute.firewalls.create",
                    "compute.firewalls.delete",
                    "compute.routers.create",
                    "compute.vpnTunnels.create"
                ]
            },
            "developer": {
                "title": "Developer",
                "description": "Deploy and manage applications",
                "permissions": [
                    "compute.instances.create",
                    "compute.instances.delete",
                    "container.clusters.get",
                    "container.pods.create",
                    "storage.objects.create",
                    "storage.objects.get"
                ]
            },
            "auditor": {
                "title": "Auditor",
                "description": "Read-only access for compliance",
                "permissions": [
                    "compute.instances.list",
                    "iam.serviceAccounts.list",
                    "logging.logs.list",
                    "monitoring.timeSeries.list",
                    "resourcemanager.projects.get"
                ]
            }
        }

        created_roles = {}
        for role_id, config in roles.items():
            role = self.create_custom_role(
                role_id=role_id,
                title=config["title"],
                permissions=config["permissions"],
                description=config["description"]
            )
            created_roles[role_id] = role

        return created_roles

    def setup_conditional_iam(self, resource, member, role, conditions):
        """Setup conditional IAM bindings."""
        # Example conditions
        condition_examples = {
            "time_based": {
                "expression": 'request.time < timestamp("2024-12-31T23:59:59.999Z")',
                "title": "Expires end of 2024",
                "description": "Access expires at the end of 2024"
            },
            "ip_based": {
                "expression": 'origin.ip in ["203.0.113.0/24", "198.51.100.0/24"]',
                "title": "Corporate network only",
                "description": "Access only from corporate IP ranges"
            },
            "resource_based": {
                "expression": 'resource.name.startsWith("projects/my-project/zones/us-central1-a")',
                "title": "US Central only",
                "description": "Access limited to us-central1-a resources"
            },
            "combined": {
                "expression": '''
                    request.time < timestamp("2024-12-31T23:59:59.999Z") &&
                    origin.ip in ["203.0.113.0/24"] &&
                    resource.service == "compute.googleapis.com"
                ''',
                "title": "Temporary compute access",
                "description": "Temporary access to compute resources from corporate network"
            }
        }

        # Get current IAM policy
        policy = self.get_iam_policy(resource)

        # Add conditional binding
        binding = {
            "role": role,
            "members": [member],
            "condition": conditions
        }

        policy.bindings.append(binding)

        # Update policy
        self.set_iam_policy(resource, policy)

        return binding

Service Account Security

# service_account_security.py
from google.cloud import iam_admin_v1
import json
import time

class ServiceAccountSecurity:
    """Secure service account management."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.iam_client = iam_admin_v1.IAMClient()

    def create_least_privilege_service_account(self, name, roles, description=""):
        """Create service account with minimal permissions."""
        # Create service account
        service_account = self.iam_client.create_service_account(
            name=f"projects/{self.project_id}",
            account_id=name,
            service_account={
                "display_name": name,
                "description": description
            }
        )

        # Grant minimal roles
        for role in roles:
            self.grant_role_to_service_account(service_account.email, role)

        # Enable audit logging for this service account
        self.enable_service_account_audit_logging(service_account.email)

        return service_account

    def implement_key_rotation(self, service_account_email):
        """Implement automatic key rotation."""
        # List existing keys
        keys = self.iam_client.list_service_account_keys(
            name=f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
        )

        # Delete keys older than 90 days
        for key in keys.keys:
            if self.is_key_expired(key.valid_after_time, days=90):
                self.iam_client.delete_service_account_key(name=key.name)
                print(f"Deleted expired key: {key.name}")

        # Create new key
        new_key = self.iam_client.create_service_account_key(
            name=f"projects/{self.project_id}/serviceAccounts/{service_account_email}",
            private_key_type="TYPE_GOOGLE_CREDENTIALS_FILE"
        )

        # Store in Secret Manager (implementation would go here)
        self.store_key_in_secret_manager(new_key)

        return new_key

    def implement_impersonation_chain(self, target_sa, impersonator_sa):
        """Setup service account impersonation chain."""
        # Grant impersonation permission
        policy_binding = {
            "role": "roles/iam.serviceAccountTokenCreator",
            "members": [f"serviceAccount:{impersonator_sa}"]
        }

        self.add_iam_binding(
            f"projects/{self.project_id}/serviceAccounts/{target_sa}",
            policy_binding["members"][0],
            policy_binding["role"]
        )

        # Log impersonation setup
        print(f"Allowed {impersonator_sa} to impersonate {target_sa}")

        return True

    def setup_short_lived_credentials(self, service_account_email, lifetime_seconds=3600):
        """Generate short-lived credentials."""
        # This would typically use STS API
        credentials_config = {
            "type": "service_account",
            "audience": f"https://{self.project_id}.iam.gserviceaccount.com",
            "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
            "token_lifetime_seconds": lifetime_seconds,
            "service_account": service_account_email
        }

        # Generate short-lived token
        # Implementation would use Google STS API

        return credentials_config
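
The method above returns only a configuration stub; as its comments note, a real implementation would exchange tokens through Google's credential APIs. One hedged way to do this today is service account impersonation via the google-auth library, sketched below. It assumes Application Default Credentials are configured and that the caller holds roles/iam.serviceAccountTokenCreator on the target service account.

# short_lived_credentials.py
# Sketch: mint short-lived credentials by impersonating a service account.
# Assumes ADC is configured and the caller can create tokens for the target SA.
import google.auth
from google.auth import impersonated_credentials

def get_short_lived_credentials(target_sa_email, lifetime_seconds=3600):
    """Return credentials that act as target_sa_email for a limited lifetime."""
    source_credentials, _ = google.auth.default()
    return impersonated_credentials.Credentials(
        source_credentials=source_credentials,
        target_principal=target_sa_email,
        target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
        lifetime=lifetime_seconds,
    )

The resulting credentials object can be passed to any Google Cloud client library through its credentials parameter and expires automatically, avoiding long-lived exported keys.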

VPC Security

VPC Service Controls

# vpc_security.py
from google.cloud import accesscontextmanager_v1
from google.cloud import compute_v1
import ipaddress

class VPCSecurityManager:
    """Manage VPC security controls."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.access_client = accesscontextmanager_v1.AccessContextManagerClient()
        self.compute_client = compute_v1.NetworksClient()

    def create_vpc_service_perimeter(self, perimeter_name, protected_projects, 
                                   restricted_services):
        """Create VPC Service Controls perimeter."""
        # Create access policy if not exists
        policy_name = f"organizations/{self.get_org_id()}/accessPolicies/default"

        # Define perimeter
        service_perimeter = {
            "name": f"{policy_name}/servicePerimeters/{perimeter_name}",
            "title": perimeter_name,
            "perimeter_type": "PERIMETER_TYPE_REGULAR",
            "status": {
                "resources": [f"projects/{project}" for project in protected_projects],
                "restricted_services": restricted_services,
                "vpc_accessible_services": {
                    "enable_restriction": True,
                    "allowed_services": ["storage.googleapis.com", "bigquery.googleapis.com"]
                }
            }
        }

        # Create perimeter
        operation = self.access_client.create_service_perimeter(
            parent=policy_name,
            service_perimeter=service_perimeter
        )

        return operation

    def setup_private_google_access(self, network_name, subnet_name, region):
        """Enable Private Google Access for subnet."""
        subnet_client = compute_v1.SubnetworksClient()

        # Get subnet
        subnet = subnet_client.get(
            project=self.project_id,
            region=region,
            subnetwork=subnet_name
        )

        # Enable Private Google Access
        subnet.private_ip_google_access = True

        # Update subnet
        operation = subnet_client.patch(
            project=self.project_id,
            region=region,
            subnetwork=subnet_name,
            subnetwork_resource=subnet
        )

        return operation

    def create_cloud_nat(self, router_name, region):
        """Create Cloud NAT for secure outbound connectivity."""
        router_client = compute_v1.RoutersClient()

        nat_config = {
            "name": f"{router_name}-nat",
            "nat_ip_allocate_option": "AUTO_ONLY",
            "source_subnetwork_ip_ranges_to_nat": "ALL_SUBNETWORKS_ALL_IP_RANGES",
            "log_config": {
                "enable": True,
                "filter": "ERRORS_ONLY"
            },
            "min_ports_per_vm": 64,
            "max_ports_per_vm": 2048,
            "enable_endpoint_independent_mapping": True
        }

        # Add NAT to router
        router = router_client.get(
            project=self.project_id,
            region=region,
            router=router_name
        )

        router.nats = [nat_config]

        operation = router_client.update(
            project=self.project_id,
            region=region,
            router=router_name,
            router_resource=router
        )

        return operation

    def implement_firewall_rules(self, network_name):
        """Implement comprehensive firewall rules."""
        firewall_client = compute_v1.FirewallsClient()

        # Define security rules
        firewall_rules = [
            {
                "name": f"{network_name}-deny-all-ingress",
                "direction": "INGRESS",
                "priority": 65534,
                "source_ranges": ["0.0.0.0/0"],
                "denied": [{"IP_protocol": "all"}],
                "description": "Deny all ingress traffic by default"
            },
            {
                "name": f"{network_name}-allow-internal",
                "direction": "INGRESS",
                "priority": 1000,
                "source_ranges": ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
                "allowed": [{"IP_protocol": "tcp"}, {"IP_protocol": "udp"}, {"IP_protocol": "icmp"}],
                "description": "Allow internal RFC1918 traffic"
            },
            {
                "name": f"{network_name}-allow-ssh-from-iap",
                "direction": "INGRESS",
                "priority": 1001,
                "source_ranges": ["35.235.240.0/20"],  # IAP range
                "allowed": [{"IP_protocol": "tcp", "ports": ["22"]}],
                "target_tags": ["ssh-enabled"],
                "description": "Allow SSH from Identity-Aware Proxy"
            },
            {
                "name": f"{network_name}-allow-health-checks",
                "direction": "INGRESS",
                "priority": 1002,
                "source_ranges": ["35.191.0.0/16", "130.211.0.0/22"],  # Google health check ranges
                "allowed": [{"IP_protocol": "tcp", "ports": ["80", "443"]}],
                "target_tags": ["http-server", "https-server"],
                "description": "Allow Google Cloud health checks"
            }
        ]

        created_rules = []
        for rule_config in firewall_rules:
            rule_config["network"] = f"projects/{self.project_id}/global/networks/{network_name}"

            operation = firewall_client.insert(
                project=self.project_id,
                firewall_resource=rule_config
            )
            created_rules.append(operation)

        return created_rules

Data Security

Data Loss Prevention (DLP)

# dlp_security.py
from google.cloud import dlp_v2
from google.cloud import storage
import json

class DLPSecurityManager:
    """Implement Data Loss Prevention controls."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.dlp_client = dlp_v2.DlpServiceClient()
        self.parent = f"projects/{project_id}/locations/global"

    def create_dlp_inspection_templates(self):
        """Create DLP inspection templates for common use cases."""
        templates = {
            "pii_detection": {
                "display_name": "PII Detection Template",
                "description": "Detect personally identifiable information",
                "info_types": [
                    {"name": "EMAIL_ADDRESS"},
                    {"name": "PHONE_NUMBER"},
                    {"name": "CREDIT_CARD_NUMBER"},
                    {"name": "US_SOCIAL_SECURITY_NUMBER"},
                    {"name": "PASSPORT"},
                    {"name": "DATE_OF_BIRTH"}
                ],
                "min_likelihood": "LIKELY",
                "include_quote": True
            },
            "financial_data": {
                "display_name": "Financial Data Template",
                "description": "Detect financial information",
                "info_types": [
                    {"name": "CREDIT_CARD_NUMBER"},
                    {"name": "IBAN_CODE"},
                    {"name": "SWIFT_CODE"},
                    {"name": "US_BANK_ROUTING_MICR"},
                    {"name": "CRYPTO_WALLET"}
                ],
                "min_likelihood": "POSSIBLE",
                "include_quote": True
            },
            "healthcare_data": {
                "display_name": "Healthcare Data Template",
                "description": "Detect healthcare information",
                "info_types": [
                    {"name": "US_HEALTHCARE_NPI"},
                    {"name": "MEDICAL_RECORD_NUMBER"},
                    {"name": "FDA_CODE"},
                    {"name": "ICD9_CODE"},
                    {"name": "ICD10_CODE"}
                ],
                "min_likelihood": "LIKELY",
                "include_quote": True
            }
        }

        created_templates = {}
        for template_id, config in templates.items():
            inspect_config = {
                "info_types": config["info_types"],
                "min_likelihood": config["min_likelihood"],
                "include_quote": config["include_quote"],
                "limits": {
                    "max_findings_per_request": 100
                }
            }

            template = self.dlp_client.create_inspect_template(
                parent=self.parent,
                inspect_template={
                    "display_name": config["display_name"],
                    "description": config["description"],
                    "inspect_config": inspect_config
                }
            )

            created_templates[template_id] = template

        return created_templates

    def create_deidentify_template(self):
        """Create de-identification template."""
        deidentify_config = {
            "info_type_transformations": {
                "transformations": [
                    {
                        "info_types": [{"name": "EMAIL_ADDRESS"}],
                        "primitive_transformation": {
                            "replace_config": {
                                "new_value": {"string_value": "[EMAIL_REDACTED]"}
                            }
                        }
                    },
                    {
                        "info_types": [{"name": "PHONE_NUMBER"}],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "*",
                                "number_to_mask": 6
                            }
                        }
                    },
                    {
                        "info_types": [{"name": "CREDIT_CARD_NUMBER"}],
                        "primitive_transformation": {
                            "crypto_replace_ffx_fpe_config": {
                                "crypto_key": {
                                    "kms_wrapped": {
                                        "wrapped_key": "base64_encoded_key_here",
                                        "crypto_key_name": f"projects/{self.project_id}/locations/global/keyRings/dlp-keyring/cryptoKeys/dlp-key"
                                    }
                                },
                                "surrogate_info_type": {"name": "ENCRYPTED_CREDIT_CARD"}
                            }
                        }
                    }
                ]
            }
        }

        template = self.dlp_client.create_deidentify_template(
            parent=self.parent,
            deidentify_template={
                "display_name": "Standard De-identification Template",
                "description": "De-identify common PII types",
                "deidentify_config": deidentify_config
            }
        )

        return template

    def scan_storage_bucket(self, bucket_name, inspect_template_name):
        """Scan Cloud Storage bucket for sensitive data."""
        storage_config = {
            "cloud_storage_options": {
                "file_set": {
                    "url": f"gs://{bucket_name}/**"
                },
                "file_types": ["TEXT_FILE", "CSV", "JSON"],
                "sample_method": "TOP",
                "files_limit_percent": 10
            }
        }

        inspect_job = {
            "inspect_template_name": inspect_template_name,
            "storage_config": storage_config,
            "actions": [
                {
                    "save_findings": {
                        "output_config": {
                            "table": {
                                "project_id": self.project_id,
                                "dataset_id": "dlp_findings",
                                "table_id": f"scan_{bucket_name}"
                            }
                        }
                    }
                },
                {
                    "pub_sub": {
                        "topic": f"projects/{self.project_id}/topics/dlp-findings"
                    }
                }
            ]
        }

        job = self.dlp_client.create_dlp_job(
            parent=self.parent,
            inspect_job=inspect_job
        )

        return job
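
For ad-hoc checks, content can also be inspected inline without creating a storage scan job. A minimal sketch follows, assuming the same project and a short text payload; the info types and likelihood threshold are illustrative choices, not fixed requirements.

# Inline inspection sketch (no storage job required).
from google.cloud import dlp_v2

def inspect_text(project_id, text):
    """Return DLP findings for a short text string."""
    dlp_client = dlp_v2.DlpServiceClient()
    response = dlp_client.inspect_content(
        request={
            "parent": f"projects/{project_id}/locations/global",
            "inspect_config": {
                "info_types": [{"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}],
                "min_likelihood": dlp_v2.Likelihood.POSSIBLE,
                "include_quote": True,
            },
            "item": {"value": text},
        }
    )
    return response.result.findings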

Encryption and Key Management

Cloud KMS Implementation

# kms_security.py
from google.cloud import kms
from cryptography.fernet import Fernet
import base64
import os
import time

class KMSSecurityManager:
    """Manage encryption keys with Cloud KMS."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.kms_client = kms.KeyManagementServiceClient()

    def setup_key_hierarchy(self):
        """Setup comprehensive key hierarchy."""
        key_rings = {
            "application-keys": {
                "location": "global",
                "keys": {
                    "database-encryption": {
                        "purpose": "ENCRYPT_DECRYPT",
                        "rotation_period": "7776000s",  # 90 days
                        "algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
                    },
                    "api-signing": {
                        "purpose": "ASYMMETRIC_SIGN",
                        "algorithm": "RSA_SIGN_PSS_2048_SHA256"
                    },
                    "config-encryption": {
                        "purpose": "ENCRYPT_DECRYPT",
                        "rotation_period": "2592000s",  # 30 days
                        "algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
                    }
                }
            },
            "infrastructure-keys": {
                "location": "us-central1",
                "keys": {
                    "disk-encryption": {
                        "purpose": "ENCRYPT_DECRYPT",
                        "rotation_period": "31536000s",  # 1 year
                        "algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
                    },
                    "backup-encryption": {
                        "purpose": "ENCRYPT_DECRYPT",
                        "rotation_period": "15552000s",  # 180 days
                        "algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
                    }
                }
            }
        }

        created_resources = {}

        for ring_name, ring_config in key_rings.items():
            # Create key ring
            ring_parent = f"projects/{self.project_id}/locations/{ring_config['location']}"
            key_ring = self.kms_client.create_key_ring(
                parent=ring_parent,
                key_ring_id=ring_name,
                key_ring={}
            )

            created_resources[ring_name] = {"ring": key_ring, "keys": {}}

            # Create crypto keys
            for key_name, key_config in ring_config['keys'].items():
                crypto_key = {
                    "purpose": key_config["purpose"],
                    "version_template": {
                        "algorithm": key_config["algorithm"]
                    }
                }

                if "rotation_period" in key_config:
                    crypto_key["rotation_period"] = key_config["rotation_period"]
                    crypto_key["next_rotation_time"] = {"seconds": int(time.time()) + 86400}

                created_key = self.kms_client.create_crypto_key(
                    parent=key_ring.name,
                    crypto_key_id=key_name,
                    crypto_key=crypto_key
                )

                created_resources[ring_name]["keys"][key_name] = created_key

        return created_resources

    def implement_envelope_encryption(self, plaintext_data, kms_key_name):
        """Implement envelope encryption pattern."""
        # Generate DEK (Data Encryption Key)
        dek = Fernet.generate_key()

        # Encrypt data with DEK
        f = Fernet(dek)
        encrypted_data = f.encrypt(plaintext_data.encode())

        # Encrypt DEK with KEK (Key Encryption Key) from KMS
        encrypt_response = self.kms_client.encrypt(
            name=kms_key_name,
            plaintext=dek
        )

        # Return encrypted data and encrypted DEK
        return {
            "encrypted_data": base64.b64encode(encrypted_data).decode(),
            "encrypted_dek": base64.b64encode(encrypt_response.ciphertext).decode(),
            "kms_key_name": kms_key_name
        }

    def decrypt_envelope_encryption(self, encrypted_package):
        """Decrypt envelope encrypted data."""
        # Decrypt DEK using KMS
        decrypt_response = self.kms_client.decrypt(
            name=encrypted_package["kms_key_name"],
            ciphertext=base64.b64decode(encrypted_package["encrypted_dek"])
        )

        # Decrypt data using DEK
        f = Fernet(decrypt_response.plaintext)
        decrypted_data = f.decrypt(base64.b64decode(encrypted_package["encrypted_data"]))

        return decrypted_data.decode()

    def setup_cmek_for_services(self):
        """Setup Customer-Managed Encryption Keys for GCP services."""
        services_config = {
            "compute": {
                "key": "disk-encryption",
                "policy": {
                    "default_kms_key_name": f"projects/{self.project_id}/locations/us-central1/keyRings/infrastructure-keys/cryptoKeys/disk-encryption"
                }
            },
            "storage": {
                "key": "backup-encryption",
                "bucket_config": {
                    "encryption": {
                        "default_kms_key_name": f"projects/{self.project_id}/locations/us-central1/keyRings/infrastructure-keys/cryptoKeys/backup-encryption"
                    }
                }
            },
            "bigquery": {
                "key": "database-encryption",
                "dataset_config": {
                    "default_encryption_configuration": {
                        "kms_key_name": f"projects/{self.project_id}/locations/global/keyRings/application-keys/cryptoKeys/database-encryption"
                    }
                }
            }
        }

        return services_config
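
The configuration above only records the intended key assignments. As one hedged example of actually applying a customer-managed key, the sketch below sets a default CMEK on a new Cloud Storage bucket; the bucket and key names are placeholders, the key must live in the same location as the bucket, and the project's Cloud Storage service agent needs roles/cloudkms.cryptoKeyEncrypterDecrypter on the key.

# cmek_bucket_sketch.py
# Sketch: create a bucket that encrypts new objects with a CMEK by default.
from google.cloud import storage

def create_cmek_bucket(project_id, bucket_name, kms_key_name, location="us-central1"):
    """Create a bucket whose default encryption key is the given KMS key."""
    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)
    bucket.default_kms_key_name = kms_key_name  # key and bucket locations must match
    return client.create_bucket(bucket, location=location)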

Security Monitoring and Compliance

Security Command Center

# security_monitoring.py
from google.cloud import securitycenter_v1
from google.cloud import logging_v2
from google.cloud import monitoring_v3
import time

class SecurityMonitoring:
    """Comprehensive security monitoring and compliance."""

    def __init__(self, project_id, organization_id):
        self.project_id = project_id
        self.organization_id = organization_id
        self.scc_client = securitycenter_v1.SecurityCenterClient()
        self.logging_client = logging_v2.Client(project=project_id)
        self.monitoring_client = monitoring_v3.AlertPolicyServiceClient()

    def setup_security_monitoring(self):
        """Setup comprehensive security monitoring."""
        # Enable audit logs
        audit_configs = [
            {
                "service": "allServices",
                "audit_log_configs": [
                    {"log_type": "ADMIN_READ"},
                    {"log_type": "DATA_READ"},
                    {"log_type": "DATA_WRITE"}
                ]
            }
        ]

        # Create log sinks for security events
        security_sinks = {
            "admin-activity": {
                "filter": 'protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog" '
                         'AND protoPayload.methodName=~".*Admin.*"',
                "destination": f"bigquery.googleapis.com/projects/{self.project_id}/datasets/security_logs"
            },
            "suspicious-activity": {
                "filter": 'protoPayload.authenticationInfo.principalEmail!~".*@company.com$" '
                         'OR protoPayload.requestMetadata.callerIp!~"^10\.|^172\.(1[6-9]|2[0-9]|3[01])\.|^192\.168\."',
                "destination": f"pubsub.googleapis.com/projects/{self.project_id}/topics/security-alerts"
            },
            "data-access": {
                "filter": 'protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog" '
                         'AND (protoPayload.serviceName="storage.googleapis.com" '
                         'OR protoPayload.serviceName="bigquery.googleapis.com")',
                "destination": f"storage.googleapis.com/{self.project_id}-security-logs"
            }
        }

        created_sinks = {}
        for sink_name, config in security_sinks.items():
            sink = self.logging_client.sink(sink_name)
            sink.filter_ = config["filter"]
            sink.destination = config["destination"]
            sink.create()
            created_sinks[sink_name] = sink

        return created_sinks

    def create_security_alerts(self):
        """Create security monitoring alerts."""
        project_name = f"projects/{self.project_id}"

        alert_policies = [
            {
                "display_name": "Suspicious IAM Activity",
                "conditions": [{
                    "display_name": "IAM policy changes",
                    "condition_threshold": {
                        "filter": 'resource.type="project" '
                                 'AND metric.type="logging.googleapis.com/user/iam_policy_changes"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 5,
                        "duration": {"seconds": 300}
                    }
                }],
                "notification_channels": [],
                "alert_strategy": {
                    "auto_close": {"seconds": 86400}
                }
            },
            {
                "display_name": "Failed Authentication Attempts",
                "conditions": [{
                    "display_name": "Multiple failed logins",
                    "condition_threshold": {
                        "filter": 'resource.type="global" '
                                 'AND metric.type="logging.googleapis.com/user/failed_login_attempts"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 10,
                        "duration": {"seconds": 600}
                    }
                }]
            },
            {
                "display_name": "Data Exfiltration Detection",
                "conditions": [{
                    "display_name": "Unusual data transfer",
                    "condition_threshold": {
                        "filter": 'resource.type="gcs_bucket" '
                                 'AND metric.type="storage.googleapis.com/network/sent_bytes_count"',
                        "comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
                        "threshold_value": 10737418240,  # 10 GB
                        "duration": {"seconds": 3600}
                    }
                }]
            }
        ]

        created_policies = []
        for policy_config in alert_policies:
            policy = monitoring_v3.AlertPolicy(policy_config)
            created_policy = self.monitoring_client.create_alert_policy(
                name=project_name,
                alert_policy=policy
            )
            created_policies.append(created_policy)

        return created_policies

    def setup_continuous_compliance(self):
        """Setup continuous compliance monitoring."""
        # Define compliance standards
        compliance_checks = {
            "cis-benchmark": {
                "checks": [
                    "ensure-audit-logging-enabled",
                    "ensure-oslogin-enabled",
                    "ensure-default-network-deleted",
                    "ensure-uniform-bucket-access",
                    "ensure-cmek-encryption"
                ]
            },
            "pci-dss": {
                "checks": [
                    "ensure-data-encryption-at-rest",
                    "ensure-network-segmentation",
                    "ensure-access-logging",
                    "ensure-key-rotation",
                    "ensure-vulnerability-scanning"
                ]
            },
            "hipaa": {
                "checks": [
                    "ensure-phi-encryption",
                    "ensure-audit-trail",
                    "ensure-access-controls",
                    "ensure-data-backup",
                    "ensure-incident-response"
                ]
            }
        }

        # Create custom finding sources in Security Command Center
        org_name = f"organizations/{self.organization_id}"

        for standard, config in compliance_checks.items():
            source = self.scc_client.create_source(
                parent=org_name,
                source={
                    "display_name": f"{standard.upper()} Compliance Checks",
                    "description": f"Automated compliance checks for {standard.upper()}"
                }
            )

            # Schedule compliance scans
            self.schedule_compliance_scan(source.name, config["checks"])

        return compliance_checks
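
Once sources and log sinks are in place, findings can be pulled back for triage. A minimal sketch using the Security Command Center client is shown below; "sources/-" requests findings across all sources, and the filter string is an illustrative example.

# Sketch: list active findings across all Security Command Center sources.
from google.cloud import securitycenter_v1

def list_active_findings(organization_id):
    """Yield active SCC findings for an organization."""
    client = securitycenter_v1.SecurityCenterClient()
    parent = f"organizations/{organization_id}/sources/-"
    results = client.list_findings(
        request={"parent": parent, "filter": 'state="ACTIVE"'}
    )
    for result in results:
        yield result.finding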

Binary Authorization

# binary_authorization.py
from google.cloud import binaryauthorization_v1
from google.cloud.devtools import containeranalysis_v1
import hashlib
import base64
import json

class BinaryAuthorizationManager:
    """Implement Binary Authorization for container security."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.binauthz_client = binaryauthorization_v1.BinauthzManagementServiceV1Client()
        self.grafeas_client = containeranalysis_v1.ContainerAnalysisClient().get_grafeas_client()

    def create_attestor(self, attestor_id, note_id):
        """Create Binary Authorization attestor."""
        parent = f"projects/{self.project_id}"

        # Create note for attestations
        note = {
            "kind": "ATTESTATION",
            "attestation": {
                "hint": {
                    "human_readable_name": f"Attestation for {attestor_id}"
                }
            }
        }

        note_name = f"projects/{self.project_id}/notes/{note_id}"
        self.grafeas_client.create_note(
            parent=parent,
            note_id=note_id,
            note=note
        )

        # Create attestor
        attestor = {
            "name": f"{parent}/attestors/{attestor_id}",
            "user_owned_grafeas_note": {
                "note_reference": note_name,
                "public_keys": [
                    {
                        "comment": "Production attestor key",
                        "id": "prod-key-1",
                        "pkix_public_key": {
                            "public_key_pem": self.generate_public_key(),
                            "signature_algorithm": "RSA_PSS_2048_SHA256"
                        }
                    }
                ]
            }
        }

        created_attestor = self.binauthz_client.create_attestor(
            parent=parent,
            attestor_id=attestor_id,
            attestor=attestor
        )

        return created_attestor

    def create_policy(self):
        """Create Binary Authorization policy."""
        policy = {
            "name": f"projects/{self.project_id}/policy",
            "global_policy_evaluation_mode": "ENABLE",
            "admission_whitelist_patterns": [
                {
                    "name_pattern": "gcr.io/my-project/break-glass/*"
                }
            ],
            "default_admission_rule": {
                "evaluation_mode": "REQUIRE_ATTESTATION",
                "enforcement_mode": "ENFORCED_BLOCK_AND_AUDIT_LOG",
                "require_attestations_by": [
                    f"projects/{self.project_id}/attestors/prod-attestor"
                ]
            },
            "cluster_admission_rules": {
                "us-central1-a.prod-cluster": {
                    "evaluation_mode": "REQUIRE_ATTESTATION",
                    "enforcement_mode": "ENFORCED_BLOCK_AND_AUDIT_LOG",
                    "require_attestations_by": [
                        f"projects/{self.project_id}/attestors/prod-attestor"
                    ]
                },
                "us-central1-a.dev-cluster": {
                    "evaluation_mode": "ALWAYS_ALLOW",
                    "enforcement_mode": "DRYRUN_AUDIT_LOG_ONLY"
                }
            }
        }

        updated_policy = self.binauthz_client.update_policy(
            policy=policy
        )

        return updated_policy

    def create_attestation(self, image_url, attestor_name):
        """Create attestation for container image."""
        # Get image digest
        image_digest = self.get_image_digest(image_url)

        # Create occurrence (attestation)
        occurrence = {
            "resource_uri": image_url,
            "note_name": f"projects/{self.project_id}/notes/prod-attestation",
            "kind": "ATTESTATION",
            "attestation": {
                "serialized_payload": base64.b64encode(
                    json.dumps({
                        "critical": {
                            "identity": {
                                "docker-reference": image_url
                            },
                            "image": {
                                "docker-manifest-digest": image_digest
                            },
                            "type": "Google cloud binauthz container signature"
                        }
                    }).encode()
                ).decode(),
                "signatures": [
                    {
                        "public_key_id": "prod-key-1",
                        "signature": self.sign_payload(image_digest)
                    }
                ]
            }
        }

        created_occurrence = self.grafeas_client.create_occurrence(
            parent=f"projects/{self.project_id}",
            occurrence=occurrence
        )

        return created_occurrence

Security Automation

Automated Security Response

# security_automation.py
from google.cloud import functions_v1
from google.cloud import workflows_v1
import yaml

class SecurityAutomation:
    """Automate security responses and remediation."""

    def __init__(self, project_id):
        self.project_id = project_id
        self.functions_client = functions_v1.CloudFunctionsServiceClient()
        self.workflows_client = workflows_v1.WorkflowsServiceClient()

    def create_security_response_workflow(self):
        """Create automated security response workflow."""
        workflow_definition = """
        main:
          params: [event]
          steps:
            - init:
                assign:
                  - severity: ${event.severity}
                  - finding_type: ${event.finding.category}
                  - resource: ${event.finding.resourceName}

            - evaluate_severity:
                switch:
                  - condition: ${severity == "CRITICAL"}
                    next: critical_response
                  - condition: ${severity == "HIGH"}
                    next: high_response
                  - condition: ${severity == "MEDIUM"}
                    next: medium_response
                  - condition: ${severity == "LOW"}
                    next: low_response

            - critical_response:
                parallel:
                  branches:
                    - isolate_resource:
                        call: isolate_compromised_resource
                        args:
                          resource: ${resource}
                    - notify_security:
                        call: send_security_alert
                        args:
                          severity: CRITICAL
                          details: ${event}
                    - create_incident:
                        call: create_security_incident
                        args:
                          severity: CRITICAL
                          finding: ${event.finding}

            - high_response:
                steps:
                  - apply_remediation:
                      call: auto_remediate
                      args:
                        finding_type: ${finding_type}
                        resource: ${resource}
                  - notify_team:
                      call: send_notification
                      args:
                        channel: security-alerts
                        message: ${event.finding.description}

            - medium_response:
                steps:
                  - create_ticket:
                      call: create_jira_ticket
                      args:
                        priority: Medium
                        summary: ${event.finding.category}
                        description: ${event.finding.description}

            - low_response:
                steps:
                  - log_finding:
                      call: log_security_finding
                      args:
                        finding: ${event.finding}
                        action: "Logged for review"
        """

        workflow = {
            "name": f"projects/{self.project_id}/locations/us-central1/workflows/security-response",
            "description": "Automated security incident response workflow",
            "source_contents": workflow_definition
        }

        operation = self.workflows_client.create_workflow(
            parent=f"projects/{self.project_id}/locations/us-central1",
            workflow=workflow,
            workflow_id="security-response"
        )

        return operation

    def create_remediation_functions(self):
        """Create Cloud Functions for automated remediation."""
        remediation_functions = {
            "isolate_instance": '''
                import time
                import googleapiclient.discovery

                def isolate_instance(request):
                    """Isolate compromised compute instance."""
                    compute = googleapiclient.discovery.build('compute', 'v1')

                    # Remove all network tags
                    instance_data = request.get_json()
                    project = instance_data['project']
                    zone = instance_data['zone']
                    instance = instance_data['instance']

                    # Apply isolation tag
                    compute.instances().setTags(
                        project=project,
                        zone=zone,
                        instance=instance,
                        body={'items': ['isolated'], 'fingerprint': instance_data['fingerprint']}
                    ).execute()

                    # Create snapshot for forensics
                    compute.disks().createSnapshot(
                        project=project,
                        zone=zone,
                        disk=instance,
                        body={'name': f'forensics-{instance}-{int(time.time())}'}
                    ).execute()

                    return {'status': 'isolated', 'instance': instance}
            ''',

            "revoke_iam_permissions": '''
                from google.cloud import resourcemanager_v3

                def revoke_permissions(request):
                    """Revoke IAM permissions for compromised account."""
                    client = resourcemanager_v3.ProjectsClient()

                    data = request.get_json()
                    project_id = data['project_id']
                    member = data['member']

                    # Get current policy
                    policy = client.get_iam_policy(
                        resource=f"projects/{project_id}"
                    )

                    # Remove member from all bindings
                    for binding in policy.bindings:
                        if member in binding.members:
                            binding.members.remove(member)

                    # Update policy
                    client.set_iam_policy(
                        resource=f"projects/{project_id}",
                        policy=policy
                    )

                    return {'status': 'revoked', 'member': member}
            ''',

            "block_suspicious_ip": '''
                from google.cloud import compute_v1

                def block_ip(request):
                    """Block suspicious IP in firewall."""
                    firewall_client = compute_v1.FirewallsClient()

                    data = request.get_json()
                    project_id = data['project_id']
                    ip_address = data['ip_address']

                    # Create deny rule
                    firewall_rule = {
                        'name': f'block-suspicious-ip-{ip_address.replace(".", "-")}',
                        'network': f'projects/{project_id}/global/networks/default',
                        'priority': 100,
                        'source_ranges': [ip_address],
                        'denied': [{'IPProtocol': 'all'}],
                        'direction': 'INGRESS',
                        'description': f'Block suspicious IP {ip_address}'
                    }

                    firewall_client.insert(
                        project=project_id,
                        firewall_resource=firewall_rule
                    )

                    return {'status': 'blocked', 'ip': ip_address}
            '''
        }

        deployed_functions = {}
        for func_name, source_code in remediation_functions.items():
            function = {
                "name": f"projects/{self.project_id}/locations/us-central1/functions/{func_name}",
                "source_code": {
                    "inline_code": source_code
                },
                "entry_point": func_name,
                "trigger": {
                    "event_trigger": {
                        "event_type": "providers/cloud.pubsub/eventTypes/topic.publish",
                        "resource": f"projects/{self.project_id}/topics/security-remediation"
                    }
                },
                "runtime": "python39"
            }

            # Deploy function (simplified - actual deployment would be more complex)
            deployed_functions[func_name] = function

        return deployed_functions

Compliance and Audit

Compliance Automation

# compliance_automation.py
from google.cloud import asset_v1
from google.cloud import resourcemanager_v3
from datetime import datetime
import pandas as pd

class ComplianceAutomation:
    """Automate compliance checks and reporting."""

    def __init__(self, project_id, organization_id):
        self.project_id = project_id
        self.organization_id = organization_id
        self.asset_client = asset_v1.AssetServiceClient()

    def generate_compliance_report(self, compliance_standard="cis"):
        """Generate comprehensive compliance report."""
        report_data = {
            "summary": {
                "organization": self.organization_id,
                "standard": compliance_standard,
                "scan_date": datetime.now().isoformat(),
                "total_checks": 0,
                "passed": 0,
                "failed": 0,
                "warnings": 0
            },
            "findings": []
        }

        # Run compliance checks
        if compliance_standard == "cis":
            checks = self.run_cis_benchmark_checks()
        elif compliance_standard == "pci":
            checks = self.run_pci_dss_checks()
        elif compliance_standard == "hipaa":
            checks = self.run_hipaa_checks()
        else:
            raise ValueError(f"Unsupported compliance standard: {compliance_standard}")

        # Process results
        for check in checks:
            report_data["summary"]["total_checks"] += 1

            if check["status"] == "PASS":
                report_data["summary"]["passed"] += 1
            elif check["status"] == "FAIL":
                report_data["summary"]["failed"] += 1
            else:
                report_data["summary"]["warnings"] += 1

            report_data["findings"].append(check)

        # Generate report
        self.create_compliance_dashboard(report_data)

        return report_data

    def run_cis_benchmark_checks(self):
        """Run CIS Google Cloud Benchmark checks."""
        checks = []

        # Check 1.1 - Ensure that corporate login credentials are used
        check_result = {
            "check_id": "1.1",
            "title": "Ensure that corporate login credentials are used",
            "status": "PASS",
            "details": "",
            "remediation": ""
        }

        # Query IAM members
        policy = self.get_organization_iam_policy()
        non_corporate_users = []

        for binding in policy.bindings:
            for member in binding.members:
                if member.startswith("user:") and not member.endswith("@company.com"):
                    non_corporate_users.append(member)

        if non_corporate_users:
            check_result["status"] = "FAIL"
            check_result["details"] = f"Non-corporate users found: {non_corporate_users}"
            check_result["remediation"] = "Remove non-corporate users and enforce SSO"

        checks.append(check_result)

        # Check 1.2 - Ensure that multi-factor authentication is enabled
        mfa_check = {
            "check_id": "1.2",
            "title": "Ensure that multi-factor authentication is enabled",
            "status": "PASS",
            "details": "",
            "remediation": ""
        }

        # Check organization policy for MFA enforcement
        # This would query the organization's security policies

        checks.append(mfa_check)

        # Continue with other CIS checks...

        return checks

    def create_automated_remediation(self, finding):
        """Create automated remediation for compliance findings."""
        remediation_actions = {
            "1.1": self.remediate_non_corporate_users,
            "1.2": self.enforce_mfa,
            "1.3": self.enable_audit_logging,
            "2.1": self.remove_default_network,
            "3.1": self.enable_uniform_bucket_access
        }

        if finding["check_id"] in remediation_actions:
            action = remediation_actions[finding["check_id"]]
            result = action(finding)
            return result

        return {"status": "No automated remediation available"}

Best Practices Implementation

Security Best Practices Class

# security_best_practices.py
class SecurityBestPractices:
    """Implement Google Cloud security best practices."""

    def __init__(self, project_id, organization_id):
        self.project_id = project_id
        self.organization_id = organization_id

    def implement_zero_trust_architecture(self):
        """Implement Zero Trust security model."""
        zero_trust_components = {
            "identity_aware_proxy": {
                "enabled": True,
                "oauth_client_id": "YOUR_OAUTH_CLIENT_ID",
                "allowed_domains": ["company.com"]
            },
            "beyondcorp_enterprise": {
                "enabled": True,
                "access_levels": [
                    {
                        "name": "trusted_network",
                        "conditions": {
                            "ip_subnetworks": ["10.0.0.0/8"],
                            "device_policy": {
                                "require_corp_owned": True,
                                "require_screen_lock": True
                            }
                        }
                    }
                ]
            },
            "context_aware_access": {
                "policies": [
                    {
                        "name": "require_trusted_device",
                        "target": "apps/gmail",
                        "access_levels": ["trusted_network"]
                    }
                ]
            }
        }

        return zero_trust_components

    def security_checklist(self):
        """Comprehensive security checklist."""
        checklist = {
            "Identity and Access": [
                "Enable 2FA for all users",
                "Use service accounts sparingly",
                "Implement least privilege principle",
                "Regular IAM audit",
                "Use Workload Identity for GKE"
            ],
            "Network Security": [
                "Delete default VPC network",
                "Use Private Google Access",
                "Implement Cloud Armor",
                "Enable VPC Flow Logs",
                "Use Cloud NAT for outbound"
            ],
            "Data Protection": [
                "Enable CMEK for all services",
                "Implement DLP scanning",
                "Use Secret Manager",
                "Enable audit logging",
                "Regular backup testing"
            ],
            "Compliance": [
                "Regular compliance scans",
                "Automated remediation",
                "Continuous monitoring",
                "Incident response plan",
                "Security training"
            ]
        }

        return checklist

Conclusion

Google Cloud provides comprehensive security features that, when properly implemented, create a robust security posture. Key takeaways:

  1. Defense in Depth: Layer security controls at every level
  2. Least Privilege: Grant minimal required permissions
  3. Automation: Automate security responses and compliance
  4. Monitoring: Continuous monitoring and alerting
  5. Encryption: Encrypt data at rest and in transit

Next Steps

  • Implement Security Command Center for unified visibility
  • Deploy Cloud Armor for DDoS protection
  • Study Chronicle for security analytics
  • Explore BeyondCorp Enterprise for Zero Trust
  • Get certified as a Google Cloud Security Engineer

Remember: Security is not a destination but a continuous journey. Stay informed about new threats and continuously improve your security posture.