Google Cloud Security and IAM: Enterprise Security Guide
Security is paramount in cloud computing. This comprehensive guide covers Google Cloud's security features, Identity and Access Management (IAM), and best practices for building secure, compliant cloud environments.
Security Foundation
Shared Responsibility Model
Google Cloud operates on a shared responsibility model: - Google's Responsibility: Physical security, infrastructure, hypervisor, network - Customer's Responsibility: Data, identity, access management, application security, network controls
Identity and Access Management (IAM)
IAM Hierarchy and Inheritance
# iam_manager.py
from google.cloud import resourcemanager_v3
from google.cloud import iam_admin_v1
from google.cloud import service_usage_v1
import json
class IAMManager:
"""Comprehensive IAM management for Google Cloud."""
def __init__(self, project_id):
self.project_id = project_id
self.resource_client = resourcemanager_v3.ProjectsClient()
self.iam_client = iam_admin_v1.IAMClient()
self.service_usage_client = service_usage_v1.ServiceUsageClient()
def setup_organizational_hierarchy(self, org_id):
"""Setup organizational hierarchy with folders."""
folders = {
"production": {
"display_name": "Production",
"iam_bindings": [
{
"role": "roles/viewer",
"members": ["group:production-viewers@company.com"]
}
]
},
"development": {
"display_name": "Development",
"iam_bindings": [
{
"role": "roles/editor",
"members": ["group:developers@company.com"]
}
]
},
"shared-services": {
"display_name": "Shared Services",
"iam_bindings": [
{
"role": "roles/viewer",
"members": ["group:all-users@company.com"]
}
]
}
}
created_folders = {}
for folder_id, config in folders.items():
# Create folder
folder = self.create_folder(org_id, config['display_name'])
created_folders[folder_id] = folder
# Set IAM policy
self.set_folder_iam_policy(folder.name, config['iam_bindings'])
return created_folders
def implement_least_privilege(self, resource_name, principal, required_permissions):
"""Implement least privilege access."""
# Find minimal predefined role
minimal_role = self.find_minimal_role(required_permissions)
if not minimal_role:
# Create custom role if no suitable predefined role
custom_role = self.create_custom_role(
role_id=f"custom_{resource_name.replace('/', '_')}",
title=f"Custom role for {resource_name}",
permissions=required_permissions
)
minimal_role = custom_role.name
# Grant role to principal
self.add_iam_binding(resource_name, principal, minimal_role)
return minimal_role
def create_custom_role(self, role_id, title, permissions, description=""):
"""Create custom IAM role."""
parent = f"projects/{self.project_id}"
role = {
"role_id": role_id,
"role": {
"title": title,
"description": description,
"included_permissions": permissions,
"stage": "GA"
}
}
created_role = self.iam_client.create_role(
parent=parent,
role_id=role_id,
role=role["role"]
)
return created_role
def setup_workload_identity(self, namespace, service_account_name):
"""Setup Workload Identity for GKE."""
# Create Google Service Account
gsa_email = f"{service_account_name}@{self.project_id}.iam.gserviceaccount.com"
service_account = {
"account_id": service_account_name,
"service_account": {
"display_name": f"Workload Identity SA for {namespace}",
"description": "Service account for Workload Identity"
}
}
# Create service account
sa = self.iam_client.create_service_account(
name=f"projects/{self.project_id}",
account_id=service_account["account_id"],
service_account=service_account["service_account"]
)
# Allow Kubernetes SA to impersonate Google SA
policy_binding = {
"role": "roles/iam.workloadIdentityUser",
"members": [
f"serviceAccount:{self.project_id}.svc.id.goog[{namespace}/{service_account_name}]"
]
}
self.add_iam_binding(
f"projects/{self.project_id}/serviceAccounts/{gsa_email}",
policy_binding["members"][0],
policy_binding["role"]
)
return gsa_email
def implement_separation_of_duties(self):
"""Implement separation of duties with custom roles."""
roles = {
"security-admin": {
"title": "Security Administrator",
"description": "Manages security policies and IAM",
"permissions": [
"iam.roles.create",
"iam.roles.delete",
"iam.roles.update",
"iam.serviceAccounts.create",
"iam.serviceAccounts.delete",
"resourcemanager.projects.setIamPolicy",
"logging.logMetrics.create",
"monitoring.alertPolicies.create"
]
},
"network-admin": {
"title": "Network Administrator",
"description": "Manages network resources",
"permissions": [
"compute.networks.create",
"compute.networks.delete",
"compute.firewalls.create",
"compute.firewalls.delete",
"compute.routers.create",
"compute.vpnTunnels.create"
]
},
"developer": {
"title": "Developer",
"description": "Deploy and manage applications",
"permissions": [
"compute.instances.create",
"compute.instances.delete",
"container.clusters.get",
"container.pods.create",
"storage.objects.create",
"storage.objects.get"
]
},
"auditor": {
"title": "Auditor",
"description": "Read-only access for compliance",
"permissions": [
"compute.instances.list",
"iam.serviceAccounts.list",
"logging.logs.list",
"monitoring.timeSeries.list",
"resourcemanager.projects.get"
]
}
}
created_roles = {}
for role_id, config in roles.items():
role = self.create_custom_role(
role_id=role_id,
title=config["title"],
permissions=config["permissions"],
description=config["description"]
)
created_roles[role_id] = role
return created_roles
def setup_conditional_iam(self, resource, member, role, conditions):
"""Setup conditional IAM bindings."""
# Example conditions
condition_examples = {
"time_based": {
"expression": 'request.time < timestamp("2024-12-31T23:59:59.999Z")',
"title": "Expires end of 2024",
"description": "Access expires at the end of 2024"
},
"ip_based": {
"expression": 'origin.ip in ["203.0.113.0/24", "198.51.100.0/24"]',
"title": "Corporate network only",
"description": "Access only from corporate IP ranges"
},
"resource_based": {
"expression": 'resource.name.startsWith("projects/my-project/zones/us-central1-a")',
"title": "US Central only",
"description": "Access limited to us-central1-a resources"
},
"combined": {
"expression": '''
request.time < timestamp("2024-12-31T23:59:59.999Z") &&
origin.ip in ["203.0.113.0/24"] &&
resource.service == "compute.googleapis.com"
''',
"title": "Temporary compute access",
"description": "Temporary access to compute resources from corporate network"
}
}
# Get current IAM policy
policy = self.get_iam_policy(resource)
# Add conditional binding
binding = {
"role": role,
"members": [member],
"condition": conditions
}
policy.bindings.append(binding)
# Update policy
self.set_iam_policy(resource, policy)
return binding
Service Account Security
# service_account_security.py
from google.cloud import iam_admin_v1
from google.oauth2 import service_account
import json
import time
class ServiceAccountSecurity:
"""Secure service account management."""
def __init__(self, project_id):
self.project_id = project_id
self.iam_client = iam_admin_v1.IAMClient()
def create_least_privilege_service_account(self, name, roles, description=""):
"""Create service account with minimal permissions."""
# Create service account
service_account = self.iam_client.create_service_account(
name=f"projects/{self.project_id}",
account_id=name,
service_account={
"display_name": name,
"description": description
}
)
# Grant minimal roles
for role in roles:
self.grant_role_to_service_account(service_account.email, role)
# Enable audit logging for this service account
self.enable_service_account_audit_logging(service_account.email)
return service_account
def implement_key_rotation(self, service_account_email):
"""Implement automatic key rotation."""
# List existing keys
keys = self.iam_client.list_service_account_keys(
name=f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
)
# Delete keys older than 90 days
for key in keys.keys:
if self.is_key_expired(key.valid_after_time, days=90):
self.iam_client.delete_service_account_key(name=key.name)
print(f"Deleted expired key: {key.name}")
# Create new key
new_key = self.iam_client.create_service_account_key(
name=f"projects/{self.project_id}/serviceAccounts/{service_account_email}",
private_key_type="TYPE_GOOGLE_CREDENTIALS_FILE"
)
# Store in Secret Manager (implementation would go here)
self.store_key_in_secret_manager(new_key)
return new_key
def implement_impersonation_chain(self, target_sa, impersonator_sa):
"""Setup service account impersonation chain."""
# Grant impersonation permission
policy_binding = {
"role": "roles/iam.serviceAccountTokenCreator",
"members": [f"serviceAccount:{impersonator_sa}"]
}
self.add_iam_binding(
f"projects/{self.project_id}/serviceAccounts/{target_sa}",
policy_binding["members"][0],
policy_binding["role"]
)
# Log impersonation setup
print(f"Allowed {impersonator_sa} to impersonate {target_sa}")
return True
def setup_short_lived_credentials(self, service_account_email, lifetime_seconds=3600):
"""Generate short-lived credentials."""
# This would typically use STS API
credentials_config = {
"type": "service_account",
"audience": f"https://{self.project_id}.iam.gserviceaccount.com",
"subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
"token_lifetime_seconds": lifetime_seconds,
"service_account": service_account_email
}
# Generate short-lived token
# Implementation would use Google STS API
return credentials_config
VPC Security
VPC Service Controls
# vpc_security.py
from google.cloud import accesscontextmanager_v1
from google.cloud import compute_v1
import ipaddress
class VPCSecurityManager:
"""Manage VPC security controls."""
def __init__(self, project_id):
self.project_id = project_id
self.access_client = accesscontextmanager_v1.AccessContextManagerClient()
self.compute_client = compute_v1.NetworksClient()
def create_vpc_service_perimeter(self, perimeter_name, protected_projects,
restricted_services):
"""Create VPC Service Controls perimeter."""
# Create access policy if not exists
policy_name = f"organizations/{self.get_org_id()}/accessPolicies/default"
# Define perimeter
service_perimeter = {
"name": f"{policy_name}/servicePerimeters/{perimeter_name}",
"title": perimeter_name,
"perimeter_type": "PERIMETER_TYPE_REGULAR",
"status": {
"resources": [f"projects/{project}" for project in protected_projects],
"restricted_services": restricted_services,
"vpc_accessible_services": {
"enable_restriction": True,
"allowed_services": ["storage.googleapis.com", "bigquery.googleapis.com"]
}
}
}
# Create perimeter
operation = self.access_client.create_service_perimeter(
parent=policy_name,
service_perimeter=service_perimeter
)
return operation
def setup_private_google_access(self, network_name, subnet_name, region):
"""Enable Private Google Access for subnet."""
subnet_client = compute_v1.SubnetworksClient()
# Get subnet
subnet = subnet_client.get(
project=self.project_id,
region=region,
subnetwork=subnet_name
)
# Enable Private Google Access
subnet.private_ip_google_access = True
# Update subnet
operation = subnet_client.patch(
project=self.project_id,
region=region,
subnetwork=subnet_name,
subnetwork_resource=subnet
)
return operation
def create_cloud_nat(self, router_name, region):
"""Create Cloud NAT for secure outbound connectivity."""
router_client = compute_v1.RoutersClient()
nat_config = {
"name": f"{router_name}-nat",
"nat_ip_allocate_option": "AUTO_ONLY",
"source_subnetwork_ip_ranges_to_nat": "ALL_SUBNETWORKS_ALL_IP_RANGES",
"log_config": {
"enable": True,
"filter": "ERRORS_ONLY"
},
"min_ports_per_vm": 64,
"max_ports_per_vm": 2048,
"enable_endpoint_independent_mapping": True
}
# Add NAT to router
router = router_client.get(
project=self.project_id,
region=region,
router=router_name
)
router.nats = [nat_config]
operation = router_client.update(
project=self.project_id,
region=region,
router=router_name,
router_resource=router
)
return operation
def implement_firewall_rules(self, network_name):
"""Implement comprehensive firewall rules."""
firewall_client = compute_v1.FirewallsClient()
# Define security rules
firewall_rules = [
{
"name": f"{network_name}-deny-all-ingress",
"direction": "INGRESS",
"priority": 65534,
"source_ranges": ["0.0.0.0/0"],
"denied": [{"IP_protocol": "all"}],
"description": "Deny all ingress traffic by default"
},
{
"name": f"{network_name}-allow-internal",
"direction": "INGRESS",
"priority": 1000,
"source_ranges": ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
"allowed": [{"IP_protocol": "tcp"}, {"IP_protocol": "udp"}, {"IP_protocol": "icmp"}],
"description": "Allow internal RFC1918 traffic"
},
{
"name": f"{network_name}-allow-ssh-from-iap",
"direction": "INGRESS",
"priority": 1001,
"source_ranges": ["35.235.240.0/20"], # IAP range
"allowed": [{"IP_protocol": "tcp", "ports": ["22"]}],
"target_tags": ["ssh-enabled"],
"description": "Allow SSH from Identity-Aware Proxy"
},
{
"name": f"{network_name}-allow-health-checks",
"direction": "INGRESS",
"priority": 1002,
"source_ranges": ["35.191.0.0/16", "130.211.0.0/22"], # Google health check ranges
"allowed": [{"IP_protocol": "tcp", "ports": ["80", "443"]}],
"target_tags": ["http-server", "https-server"],
"description": "Allow Google Cloud health checks"
}
]
created_rules = []
for rule_config in firewall_rules:
rule_config["network"] = f"projects/{self.project_id}/global/networks/{network_name}"
operation = firewall_client.insert(
project=self.project_id,
firewall_resource=rule_config
)
created_rules.append(operation)
return created_rules
Data Security
Data Loss Prevention (DLP)
# dlp_security.py
from google.cloud import dlp_v2
from google.cloud import storage
import json
class DLPSecurityManager:
"""Implement Data Loss Prevention controls."""
def __init__(self, project_id):
self.project_id = project_id
self.dlp_client = dlp_v2.DlpServiceClient()
self.parent = f"projects/{project_id}/locations/global"
def create_dlp_inspection_templates(self):
"""Create DLP inspection templates for common use cases."""
templates = {
"pii_detection": {
"display_name": "PII Detection Template",
"description": "Detect personally identifiable information",
"info_types": [
{"name": "EMAIL_ADDRESS"},
{"name": "PHONE_NUMBER"},
{"name": "CREDIT_CARD_NUMBER"},
{"name": "US_SOCIAL_SECURITY_NUMBER"},
{"name": "PASSPORT"},
{"name": "DATE_OF_BIRTH"}
],
"min_likelihood": "LIKELY",
"include_quote": True
},
"financial_data": {
"display_name": "Financial Data Template",
"description": "Detect financial information",
"info_types": [
{"name": "CREDIT_CARD_NUMBER"},
{"name": "IBAN_CODE"},
{"name": "SWIFT_CODE"},
{"name": "US_BANK_ROUTING_MICR"},
{"name": "CRYPTO_WALLET"}
],
"min_likelihood": "POSSIBLE",
"include_quote": True
},
"healthcare_data": {
"display_name": "Healthcare Data Template",
"description": "Detect healthcare information",
"info_types": [
{"name": "US_HEALTHCARE_NPI"},
{"name": "MEDICAL_RECORD_NUMBER"},
{"name": "FDA_CODE"},
{"name": "ICD9_CODE"},
{"name": "ICD10_CODE"}
],
"min_likelihood": "LIKELY",
"include_quote": True
}
}
created_templates = {}
for template_id, config in templates.items():
inspect_config = {
"info_types": config["info_types"],
"min_likelihood": config["min_likelihood"],
"include_quote": config["include_quote"],
"limits": {
"max_findings_per_request": 100
}
}
template = self.dlp_client.create_inspect_template(
parent=self.parent,
inspect_template={
"display_name": config["display_name"],
"description": config["description"],
"inspect_config": inspect_config
}
)
created_templates[template_id] = template
return created_templates
def create_deidentify_template(self):
"""Create de-identification template."""
deidentify_config = {
"info_type_transformations": {
"transformations": [
{
"info_types": [{"name": "EMAIL_ADDRESS"}],
"primitive_transformation": {
"replace_config": {
"new_value": {"string_value": "[EMAIL_REDACTED]"}
}
}
},
{
"info_types": [{"name": "PHONE_NUMBER"}],
"primitive_transformation": {
"character_mask_config": {
"masking_character": "*",
"number_to_mask": 6
}
}
},
{
"info_types": [{"name": "CREDIT_CARD_NUMBER"}],
"primitive_transformation": {
"crypto_replace_ffx_fpe_config": {
"crypto_key": {
"kms_wrapped": {
"wrapped_key": "base64_encoded_key_here",
"crypto_key_name": f"projects/{self.project_id}/locations/global/keyRings/dlp-keyring/cryptoKeys/dlp-key"
}
},
"surrogate_info_type": {"name": "ENCRYPTED_CREDIT_CARD"}
}
}
}
]
}
}
template = self.dlp_client.create_deidentify_template(
parent=self.parent,
deidentify_template={
"display_name": "Standard De-identification Template",
"description": "De-identify common PII types",
"deidentify_config": deidentify_config
}
)
return template
def scan_storage_bucket(self, bucket_name, inspect_template_name):
"""Scan Cloud Storage bucket for sensitive data."""
storage_config = {
"cloud_storage_options": {
"file_set": {
"url": f"gs://{bucket_name}/**"
},
"file_types": ["TEXT_FILE", "CSV", "JSON"],
"sample_method": "TOP",
"files_limit_percent": 10
}
}
inspect_job = {
"inspect_template_name": inspect_template_name,
"storage_config": storage_config,
"actions": [
{
"save_findings": {
"output_config": {
"table": {
"project_id": self.project_id,
"dataset_id": "dlp_findings",
"table_id": f"scan_{bucket_name}"
}
}
}
},
{
"pub_sub": {
"topic": f"projects/{self.project_id}/topics/dlp-findings"
}
}
]
}
job = self.dlp_client.create_dlp_job(
parent=self.parent,
inspect_job=inspect_job
)
return job
Encryption and Key Management
Cloud KMS Implementation
# kms_security.py
from google.cloud import kms
from cryptography.fernet import Fernet
import base64
import os
class KMSSecurityManager:
"""Manage encryption keys with Cloud KMS."""
def __init__(self, project_id):
self.project_id = project_id
self.kms_client = kms.KeyManagementServiceClient()
def setup_key_hierarchy(self):
"""Setup comprehensive key hierarchy."""
key_rings = {
"application-keys": {
"location": "global",
"keys": {
"database-encryption": {
"purpose": "ENCRYPT_DECRYPT",
"rotation_period": "7776000s", # 90 days
"algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
},
"api-signing": {
"purpose": "ASYMMETRIC_SIGN",
"algorithm": "RSA_SIGN_PSS_2048_SHA256"
},
"config-encryption": {
"purpose": "ENCRYPT_DECRYPT",
"rotation_period": "2592000s", # 30 days
"algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
}
}
},
"infrastructure-keys": {
"location": "us-central1",
"keys": {
"disk-encryption": {
"purpose": "ENCRYPT_DECRYPT",
"rotation_period": "31536000s", # 1 year
"algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
},
"backup-encryption": {
"purpose": "ENCRYPT_DECRYPT",
"rotation_period": "15552000s", # 180 days
"algorithm": "GOOGLE_SYMMETRIC_ENCRYPTION"
}
}
}
}
created_resources = {}
for ring_name, ring_config in key_rings.items():
# Create key ring
ring_parent = f"projects/{self.project_id}/locations/{ring_config['location']}"
key_ring = self.kms_client.create_key_ring(
parent=ring_parent,
key_ring_id=ring_name,
key_ring={}
)
created_resources[ring_name] = {"ring": key_ring, "keys": {}}
# Create crypto keys
for key_name, key_config in ring_config['keys'].items():
crypto_key = {
"purpose": key_config["purpose"],
"version_template": {
"algorithm": key_config["algorithm"]
}
}
if "rotation_period" in key_config:
crypto_key["rotation_period"] = key_config["rotation_period"]
crypto_key["next_rotation_time"] = {"seconds": int(time.time()) + 86400}
created_key = self.kms_client.create_crypto_key(
parent=key_ring.name,
crypto_key_id=key_name,
crypto_key=crypto_key
)
created_resources[ring_name]["keys"][key_name] = created_key
return created_resources
def implement_envelope_encryption(self, plaintext_data, kms_key_name):
"""Implement envelope encryption pattern."""
# Generate DEK (Data Encryption Key)
dek = Fernet.generate_key()
# Encrypt data with DEK
f = Fernet(dek)
encrypted_data = f.encrypt(plaintext_data.encode())
# Encrypt DEK with KEK (Key Encryption Key) from KMS
encrypt_response = self.kms_client.encrypt(
name=kms_key_name,
plaintext=dek
)
# Return encrypted data and encrypted DEK
return {
"encrypted_data": base64.b64encode(encrypted_data).decode(),
"encrypted_dek": base64.b64encode(encrypt_response.ciphertext).decode(),
"kms_key_name": kms_key_name
}
def decrypt_envelope_encryption(self, encrypted_package):
"""Decrypt envelope encrypted data."""
# Decrypt DEK using KMS
decrypt_response = self.kms_client.decrypt(
name=encrypted_package["kms_key_name"],
ciphertext=base64.b64decode(encrypted_package["encrypted_dek"])
)
# Decrypt data using DEK
f = Fernet(decrypt_response.plaintext)
decrypted_data = f.decrypt(base64.b64decode(encrypted_package["encrypted_data"]))
return decrypted_data.decode()
def setup_cmek_for_services(self):
"""Setup Customer-Managed Encryption Keys for GCP services."""
services_config = {
"compute": {
"key": "disk-encryption",
"policy": {
"default_kms_key_name": f"projects/{self.project_id}/locations/us-central1/keyRings/infrastructure-keys/cryptoKeys/disk-encryption"
}
},
"storage": {
"key": "backup-encryption",
"bucket_config": {
"encryption": {
"default_kms_key_name": f"projects/{self.project_id}/locations/us-central1/keyRings/infrastructure-keys/cryptoKeys/backup-encryption"
}
}
},
"bigquery": {
"key": "database-encryption",
"dataset_config": {
"default_encryption_configuration": {
"kms_key_name": f"projects/{self.project_id}/locations/global/keyRings/application-keys/cryptoKeys/database-encryption"
}
}
}
}
return services_config
Security Monitoring and Compliance
Security Command Center
# security_monitoring.py
from google.cloud import securitycenter_v1
from google.cloud import logging_v2
from google.cloud import monitoring_v3
import time
class SecurityMonitoring:
"""Comprehensive security monitoring and compliance."""
def __init__(self, project_id, organization_id):
self.project_id = project_id
self.organization_id = organization_id
self.scc_client = securitycenter_v1.SecurityCenterClient()
self.logging_client = logging_v2.Client(project=project_id)
self.monitoring_client = monitoring_v3.AlertPolicyServiceClient()
def setup_security_monitoring(self):
"""Setup comprehensive security monitoring."""
# Enable audit logs
audit_configs = [
{
"service": "allServices",
"audit_log_configs": [
{"log_type": "ADMIN_READ"},
{"log_type": "DATA_READ"},
{"log_type": "DATA_WRITE"}
]
}
]
# Create log sinks for security events
security_sinks = {
"admin-activity": {
"filter": 'protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog" '
'AND protoPayload.methodName=~".*Admin.*"',
"destination": f"bigquery.googleapis.com/projects/{self.project_id}/datasets/security_logs"
},
"suspicious-activity": {
"filter": 'protoPayload.authenticationInfo.principalEmail!~".*@company.com$" '
'OR protoPayload.requestMetadata.callerIp!~"^10\.|^172\.(1[6-9]|2[0-9]|3[01])\.|^192\.168\."',
"destination": f"pubsub.googleapis.com/projects/{self.project_id}/topics/security-alerts"
},
"data-access": {
"filter": 'protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog" '
'AND (protoPayload.serviceName="storage.googleapis.com" '
'OR protoPayload.serviceName="bigquery.googleapis.com")',
"destination": f"storage.googleapis.com/{self.project_id}-security-logs"
}
}
created_sinks = {}
for sink_name, config in security_sinks.items():
sink = self.logging_client.sink(sink_name)
sink.filter_ = config["filter"]
sink.destination = config["destination"]
sink.create()
created_sinks[sink_name] = sink
return created_sinks
def create_security_alerts(self):
"""Create security monitoring alerts."""
project_name = f"projects/{self.project_id}"
alert_policies = [
{
"display_name": "Suspicious IAM Activity",
"conditions": [{
"display_name": "IAM policy changes",
"condition_threshold": {
"filter": 'resource.type="project" '
'AND metric.type="logging.googleapis.com/user/iam_policy_changes"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 5,
"duration": {"seconds": 300}
}
}],
"notification_channels": [],
"alert_strategy": {
"auto_close": {"seconds": 86400}
}
},
{
"display_name": "Failed Authentication Attempts",
"conditions": [{
"display_name": "Multiple failed logins",
"condition_threshold": {
"filter": 'resource.type="global" '
'AND metric.type="logging.googleapis.com/user/failed_login_attempts"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 10,
"duration": {"seconds": 600}
}
}]
},
{
"display_name": "Data Exfiltration Detection",
"conditions": [{
"display_name": "Unusual data transfer",
"condition_threshold": {
"filter": 'resource.type="gcs_bucket" '
'AND metric.type="storage.googleapis.com/network/sent_bytes_count"',
"comparison": monitoring_v3.ComparisonType.COMPARISON_GT,
"threshold_value": 10737418240, # 10 GB
"duration": {"seconds": 3600}
}
}]
}
]
created_policies = []
for policy_config in alert_policies:
policy = monitoring_v3.AlertPolicy(policy_config)
created_policy = self.monitoring_client.create_alert_policy(
name=project_name,
alert_policy=policy
)
created_policies.append(created_policy)
return created_policies
def setup_continuous_compliance(self):
"""Setup continuous compliance monitoring."""
# Define compliance standards
compliance_checks = {
"cis-benchmark": {
"checks": [
"ensure-audit-logging-enabled",
"ensure-oslogin-enabled",
"ensure-default-network-deleted",
"ensure-uniform-bucket-access",
"ensure-cmek-encryption"
]
},
"pci-dss": {
"checks": [
"ensure-data-encryption-at-rest",
"ensure-network-segmentation",
"ensure-access-logging",
"ensure-key-rotation",
"ensure-vulnerability-scanning"
]
},
"hipaa": {
"checks": [
"ensure-phi-encryption",
"ensure-audit-trail",
"ensure-access-controls",
"ensure-data-backup",
"ensure-incident-response"
]
}
}
# Create custom finding sources in Security Command Center
org_name = f"organizations/{self.organization_id}"
for standard, config in compliance_checks.items():
source = self.scc_client.create_source(
parent=org_name,
source={
"display_name": f"{standard.upper()} Compliance Checks",
"description": f"Automated compliance checks for {standard.upper()}"
}
)
# Schedule compliance scans
self.schedule_compliance_scan(source.name, config["checks"])
return compliance_checks
Binary Authorization
# binary_authorization.py
from google.cloud import binaryauthorization_v1
from google.cloud import containeranalysis_v1
import hashlib
import base64
class BinaryAuthorizationManager:
"""Implement Binary Authorization for container security."""
def __init__(self, project_id):
self.project_id = project_id
self.binauthz_client = binaryauthorization_v1.BinauthzManagementServiceV1Client()
self.grafeas_client = containeranalysis_v1.GrafeasClient()
def create_attestor(self, attestor_id, note_id):
"""Create Binary Authorization attestor."""
parent = f"projects/{self.project_id}"
# Create note for attestations
note = {
"kind": "ATTESTATION",
"attestation": {
"hint": {
"human_readable_name": f"Attestation for {attestor_id}"
}
}
}
note_name = f"projects/{self.project_id}/notes/{note_id}"
self.grafeas_client.create_note(
parent=parent,
note_id=note_id,
note=note
)
# Create attestor
attestor = {
"name": f"{parent}/attestors/{attestor_id}",
"user_owned_grafeas_note": {
"note_reference": note_name,
"public_keys": [
{
"comment": "Production attestor key",
"id": "prod-key-1",
"pkix_public_key": {
"public_key_pem": self.generate_public_key(),
"signature_algorithm": "RSA_PSS_2048_SHA256"
}
}
]
}
}
created_attestor = self.binauthz_client.create_attestor(
parent=parent,
attestor_id=attestor_id,
attestor=attestor
)
return created_attestor
def create_policy(self):
"""Create Binary Authorization policy."""
policy = {
"name": f"projects/{self.project_id}/policy",
"global_policy_evaluation_mode": "ENABLE",
"admission_whitelist_patterns": [
{
"name_pattern": "gcr.io/my-project/break-glass/*"
}
],
"default_admission_rule": {
"evaluation_mode": "REQUIRE_ATTESTATION",
"enforcement_mode": "ENFORCED_BLOCK_AND_AUDIT_LOG",
"require_attestations_by": [
f"projects/{self.project_id}/attestors/prod-attestor"
]
},
"cluster_admission_rules": {
"us-central1-a.prod-cluster": {
"evaluation_mode": "REQUIRE_ATTESTATION",
"enforcement_mode": "ENFORCED_BLOCK_AND_AUDIT_LOG",
"require_attestations_by": [
f"projects/{self.project_id}/attestors/prod-attestor"
]
},
"us-central1-a.dev-cluster": {
"evaluation_mode": "ALWAYS_ALLOW",
"enforcement_mode": "DRYRUN_AUDIT_LOG_ONLY"
}
}
}
updated_policy = self.binauthz_client.update_policy(
policy=policy
)
return updated_policy
def create_attestation(self, image_url, attestor_name):
"""Create attestation for container image."""
# Get image digest
image_digest = self.get_image_digest(image_url)
# Create occurrence (attestation)
occurrence = {
"resource_uri": image_url,
"note_name": f"projects/{self.project_id}/notes/prod-attestation",
"kind": "ATTESTATION",
"attestation": {
"serialized_payload": base64.b64encode(
json.dumps({
"critical": {
"identity": {
"docker-reference": image_url
},
"image": {
"docker-manifest-digest": image_digest
},
"type": "Google cloud binauthz container signature"
}
}).encode()
).decode(),
"signatures": [
{
"public_key_id": "prod-key-1",
"signature": self.sign_payload(image_digest)
}
]
}
}
created_occurrence = self.grafeas_client.create_occurrence(
parent=f"projects/{self.project_id}",
occurrence=occurrence
)
return created_occurrence
Security Automation
Automated Security Response
# security_automation.py
from google.cloud import functions_v1
from google.cloud import workflows_v1
import yaml
class SecurityAutomation:
"""Automate security responses and remediation."""
def __init__(self, project_id):
self.project_id = project_id
self.functions_client = functions_v1.CloudFunctionsServiceClient()
self.workflows_client = workflows_v1.WorkflowsServiceClient()
def create_security_response_workflow(self):
"""Create automated security response workflow."""
workflow_definition = """
main:
params: [event]
steps:
- init:
assign:
- severity: ${event.severity}
- finding_type: ${event.finding.category}
- resource: ${event.finding.resourceName}
- evaluate_severity:
switch:
- condition: ${severity == "CRITICAL"}
next: critical_response
- condition: ${severity == "HIGH"}
next: high_response
- condition: ${severity == "MEDIUM"}
next: medium_response
- condition: ${severity == "LOW"}
next: low_response
- critical_response:
parallel:
branches:
- isolate_resource:
call: isolate_compromised_resource
args:
resource: ${resource}
- notify_security:
call: send_security_alert
args:
severity: CRITICAL
details: ${event}
- create_incident:
call: create_security_incident
args:
severity: CRITICAL
finding: ${event.finding}
- high_response:
steps:
- apply_remediation:
call: auto_remediate
args:
finding_type: ${finding_type}
resource: ${resource}
- notify_team:
call: send_notification
args:
channel: security-alerts
message: ${event.finding.description}
- medium_response:
steps:
- create_ticket:
call: create_jira_ticket
args:
priority: Medium
summary: ${event.finding.category}
description: ${event.finding.description}
- low_response:
steps:
- log_finding:
call: log_security_finding
args:
finding: ${event.finding}
action: "Logged for review"
"""
workflow = {
"name": f"projects/{self.project_id}/locations/us-central1/workflows/security-response",
"description": "Automated security incident response workflow",
"source_contents": workflow_definition
}
operation = self.workflows_client.create_workflow(
parent=f"projects/{self.project_id}/locations/us-central1",
workflow=workflow,
workflow_id="security-response"
)
return operation
def create_remediation_functions(self):
"""Create Cloud Functions for automated remediation."""
remediation_functions = {
"isolate_instance": '''
import googleapiclient.discovery
def isolate_instance(request):
"""Isolate compromised compute instance."""
compute = googleapiclient.discovery.build('compute', 'v1')
# Remove all network tags
instance_data = request.get_json()
project = instance_data['project']
zone = instance_data['zone']
instance = instance_data['instance']
# Apply isolation tag
compute.instances().setTags(
project=project,
zone=zone,
instance=instance,
body={'items': ['isolated'], 'fingerprint': instance_data['fingerprint']}
).execute()
# Create snapshot for forensics
compute.disks().createSnapshot(
project=project,
zone=zone,
disk=instance,
body={'name': f'forensics-{instance}-{int(time.time())}'}
).execute()
return {'status': 'isolated', 'instance': instance}
''',
"revoke_iam_permissions": '''
from google.cloud import resourcemanager_v3
def revoke_permissions(request):
"""Revoke IAM permissions for compromised account."""
client = resourcemanager_v3.ProjectsClient()
data = request.get_json()
project_id = data['project_id']
member = data['member']
# Get current policy
policy = client.get_iam_policy(
resource=f"projects/{project_id}"
)
# Remove member from all bindings
for binding in policy.bindings:
if member in binding.members:
binding.members.remove(member)
# Update policy
client.set_iam_policy(
resource=f"projects/{project_id}",
policy=policy
)
return {'status': 'revoked', 'member': member}
''',
"block_suspicious_ip": '''
from google.cloud import compute_v1
def block_ip(request):
"""Block suspicious IP in firewall."""
firewall_client = compute_v1.FirewallsClient()
data = request.get_json()
project_id = data['project_id']
ip_address = data['ip_address']
# Create deny rule
firewall_rule = {
'name': f'block-suspicious-ip-{ip_address.replace(".", "-")}',
'network': f'projects/{project_id}/global/networks/default',
'priority': 100,
'source_ranges': [ip_address],
'denied': [{'IPProtocol': 'all'}],
'direction': 'INGRESS',
'description': f'Block suspicious IP {ip_address}'
}
firewall_client.insert(
project=project_id,
firewall_resource=firewall_rule
)
return {'status': 'blocked', 'ip': ip_address}
'''
}
deployed_functions = {}
for func_name, source_code in remediation_functions.items():
function = {
"name": f"projects/{self.project_id}/locations/us-central1/functions/{func_name}",
"source_code": {
"inline_code": source_code
},
"entry_point": func_name,
"trigger": {
"event_trigger": {
"event_type": "providers/cloud.pubsub/eventTypes/topic.publish",
"resource": f"projects/{self.project_id}/topics/security-remediation"
}
},
"runtime": "python39"
}
# Deploy function (simplified - actual deployment would be more complex)
deployed_functions[func_name] = function
return deployed_functions
Compliance and Audit
Compliance Automation
# compliance_automation.py
from google.cloud import asset_v1
from google.cloud import cloudresourcemanager_v3
import pandas as pd
class ComplianceAutomation:
"""Automate compliance checks and reporting."""
def __init__(self, project_id, organization_id):
self.project_id = project_id
self.organization_id = organization_id
self.asset_client = asset_v1.AssetServiceClient()
def generate_compliance_report(self, compliance_standard="cis"):
"""Generate comprehensive compliance report."""
report_data = {
"summary": {
"organization": self.organization_id,
"standard": compliance_standard,
"scan_date": datetime.now().isoformat(),
"total_checks": 0,
"passed": 0,
"failed": 0,
"warnings": 0
},
"findings": []
}
# Run compliance checks
if compliance_standard == "cis":
checks = self.run_cis_benchmark_checks()
elif compliance_standard == "pci":
checks = self.run_pci_dss_checks()
elif compliance_standard == "hipaa":
checks = self.run_hipaa_checks()
# Process results
for check in checks:
report_data["summary"]["total_checks"] += 1
if check["status"] == "PASS":
report_data["summary"]["passed"] += 1
elif check["status"] == "FAIL":
report_data["summary"]["failed"] += 1
else:
report_data["summary"]["warnings"] += 1
report_data["findings"].append(check)
# Generate report
self.create_compliance_dashboard(report_data)
return report_data
def run_cis_benchmark_checks(self):
"""Run CIS Google Cloud Benchmark checks."""
checks = []
# Check 1.1 - Ensure that corporate login credentials are used
check_result = {
"check_id": "1.1",
"title": "Ensure that corporate login credentials are used",
"status": "PASS",
"details": "",
"remediation": ""
}
# Query IAM members
policy = self.get_organization_iam_policy()
non_corporate_users = []
for binding in policy.bindings:
for member in binding.members:
if member.startswith("user:") and not member.endswith("@company.com"):
non_corporate_users.append(member)
if non_corporate_users:
check_result["status"] = "FAIL"
check_result["details"] = f"Non-corporate users found: {non_corporate_users}"
check_result["remediation"] = "Remove non-corporate users and enforce SSO"
checks.append(check_result)
# Check 1.2 - Ensure that multi-factor authentication is enabled
mfa_check = {
"check_id": "1.2",
"title": "Ensure that multi-factor authentication is enabled",
"status": "PASS",
"details": "",
"remediation": ""
}
# Check organization policy for MFA enforcement
# This would query the organization's security policies
checks.append(mfa_check)
# Continue with other CIS checks...
return checks
def create_automated_remediation(self, finding):
"""Create automated remediation for compliance findings."""
remediation_actions = {
"1.1": self.remediate_non_corporate_users,
"1.2": self.enforce_mfa,
"1.3": self.enable_audit_logging,
"2.1": self.remove_default_network,
"3.1": self.enable_uniform_bucket_access
}
if finding["check_id"] in remediation_actions:
action = remediation_actions[finding["check_id"]]
result = action(finding)
return result
return {"status": "No automated remediation available"}
Best Practices Implementation
Security Best Practices Class
# security_best_practices.py
class SecurityBestPractices:
"""Implement Google Cloud security best practices."""
def __init__(self, project_id, organization_id):
self.project_id = project_id
self.organization_id = organization_id
def implement_zero_trust_architecture(self):
"""Implement Zero Trust security model."""
zero_trust_components = {
"identity_aware_proxy": {
"enabled": True,
"oauth_client_id": "YOUR_OAUTH_CLIENT_ID",
"allowed_domains": ["company.com"]
},
"beyondcorp_enterprise": {
"enabled": True,
"access_levels": [
{
"name": "trusted_network",
"conditions": {
"ip_subnetworks": ["10.0.0.0/8"],
"device_policy": {
"require_corp_owned": True,
"require_screen_lock": True
}
}
}
]
},
"context_aware_access": {
"policies": [
{
"name": "require_trusted_device",
"target": "apps/gmail",
"access_levels": ["trusted_network"]
}
]
}
}
return zero_trust_components
def security_checklist(self):
"""Comprehensive security checklist."""
checklist = {
"Identity and Access": [
"Enable 2FA for all users",
"Use service accounts sparingly",
"Implement least privilege principle",
"Regular IAM audit",
"Use Workload Identity for GKE"
],
"Network Security": [
"Delete default VPC network",
"Use Private Google Access",
"Implement Cloud Armor",
"Enable VPC Flow Logs",
"Use Cloud NAT for outbound"
],
"Data Protection": [
"Enable CMEK for all services",
"Implement DLP scanning",
"Use Secret Manager",
"Enable audit logging",
"Regular backup testing"
],
"Compliance": [
"Regular compliance scans",
"Automated remediation",
"Continuous monitoring",
"Incident response plan",
"Security training"
]
}
return checklist
Conclusion
Google Cloud provides comprehensive security features that, when properly implemented, create a robust security posture. Key takeaways:
- Defense in Depth: Layer security controls at every level
- Least Privilege: Grant minimal required permissions
- Automation: Automate security responses and compliance
- Monitoring: Continuous monitoring and alerting
- Encryption: Encrypt data at rest and in transit
Next Steps
- Implement Security Command Center for unified visibility
- Deploy Cloud Armor for DDoS protection
- Study Chronicle for security analytics
- Explore BeyondCorp Enterprise for Zero Trust
- Get certified as a Google Cloud Security Engineer
Remember: Security is not a destination but a continuous journey. Stay informed about new threats and continuously improve your security posture.