Google Cloud Anthos: Complete Hybrid and Multi-Cloud Guide
Google Cloud Anthos is an application modernization platform for building, deploying, and managing applications consistently across on-premises data centers, Google Cloud, and other public clouds. This guide covers implementing enterprise-grade hybrid and multi-cloud architectures on it.
Anthos Architecture Overview
Core Components and Setup
# anthos-platform-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: anthos-platform-config
namespace: anthos-system
data:
platform: |
# Anthos Platform Configuration
components:
- name: "Anthos Config Management"
enabled: true
version: "1.14.0"
- name: "Anthos Service Mesh"
enabled: true
version: "1.16.0"
- name: "Anthos Identity Service"
enabled: true
version: "0.5.0"
- name: "Binary Authorization"
enabled: true
- name: "Policy Controller"
enabled: true
clusters:
- name: "gke-production"
type: "gke"
location: "us-central1"
tier: "production"
- name: "anthos-onprem-prod"
type: "anthos-bare-metal"
location: "datacenter-1"
tier: "production"
- name: "eks-production"
type: "attached-eks"
location: "us-east-1"
tier: "production"
Anthos Cluster Management
# anthos_cluster_manager.py
from google.cloud import gkehub_v1
from google.cloud import container_v1
import yaml
import time
from typing import Dict, List, Any, Optional
import subprocess
import os
class AnthosClusterManager:
"""Manage Anthos clusters across environments"""
def __init__(self, project_id: str):
self.project_id = project_id
self.hub_client = gkehub_v1.GkeHubClient()
self.container_client = container_v1.ClusterManagerClient()
self.project_path = f"projects/{project_id}"
def register_gke_cluster(self, cluster_name: str,
location: str,
membership_name: Optional[str] = None) -> str:
"""Register GKE cluster with Anthos"""
membership_name = membership_name or f"{cluster_name}-membership"
        # Full resource link, as required by the GKE Hub membership endpoint
        cluster_path = (
            f"//container.googleapis.com/projects/{self.project_id}"
            f"/locations/{location}/clusters/{cluster_name}"
        )
# Create membership
membership = gkehub_v1.Membership(
name=f"{self.project_path}/locations/global/memberships/{membership_name}",
description=f"GKE cluster {cluster_name} in {location}",
endpoint=gkehub_v1.MembershipEndpoint(
gke_cluster=gkehub_v1.GkeCluster(
resource_link=cluster_path
)
),
external_id=f"{cluster_name}-{location}"
)
operation = self.hub_client.create_membership(
parent=f"{self.project_path}/locations/global",
membership_id=membership_name,
membership=membership
)
result = operation.result()
print(f"Registered GKE cluster: {result.name}")
return result.name
    def attach_external_cluster(self, cluster_name: str,
                                platform: str,  # 'eks', 'aks', 'bare-metal'
                                kubeconfig_path: str) -> str:
        """Attach an external cluster to the Anthos fleet"""
        membership_name = f"{cluster_name}-attached"
        # Register the membership first; the gateway RBAC manifest
        # below is generated against an existing membership
        membership = gkehub_v1.Membership(
            name=f"{self.project_path}/locations/global/memberships/{membership_name}",
            description=f"Attached {platform} cluster: {cluster_name}",
            external_id=f"{cluster_name}-{platform}"
        )
        operation = self.hub_client.create_membership(
            parent=f"{self.project_path}/locations/global",
            membership_id=membership_name,
            membership=membership
        )
        result = operation.result()
        # Generate the gateway RBAC manifest and apply it to the
        # external cluster
        manifest_yaml = self._generate_connect_manifest(
            membership_name, platform
        )
        subprocess.run([
            'kubectl', '--kubeconfig', kubeconfig_path,
            'apply', '-f', '-'
        ], input=manifest_yaml.encode(), check=True)
        # Give the Connect agent time to come up
        time.sleep(30)
        print(f"Attached external cluster: {result.name}")
        return result.name
def setup_anthos_bare_metal(self, cluster_config: Dict[str, Any]) -> str:
"""Setup Anthos on bare metal"""
# Create admin cluster configuration
admin_config = {
'apiVersion': 'baremetal.cluster.gke.io/v1',
'kind': 'Cluster',
'metadata': {
'name': cluster_config['name'],
'namespace': 'cluster-' + cluster_config['name']
},
'spec': {
'type': 'admin',
'anthosBareMetalVersion': cluster_config.get('version', '1.14.0'),
'networkConfig': {
'islandModeCIDR': cluster_config.get('pod_cidr', '10.200.0.0/16'),
'servicesCIDR': cluster_config.get('service_cidr', '10.96.0.0/20'),
'loadBalancer': {
'mode': 'bundled',
'type': 'layer2',
'addressPools': cluster_config['load_balancer_pools']
}
},
'storage': {
'lvpNodeMounts': {
'path': '/mnt/localpv-disk',
'storageClassName': 'local-disks'
},
'lvpShare': {
'path': '/mnt/localpv-share',
'storageClassName': 'local-shared',
'sharedPathPVCount': 5
}
},
'nodeConfig': {
'podDensity': {
'maxPodsPerNode': 250
}
},
'clusterSecurity': {
'authorization': {
'clusterAdmin': {
'gcpAccounts': cluster_config['admin_users']
}
}
}
}
}
        # bmctl reads the cluster config from
        # bmctl-workspace/<cluster-name>/<cluster-name>.yaml
        workspace = f'bmctl-workspace/{cluster_config["name"]}'
        os.makedirs(workspace, exist_ok=True)
        config_file = f'{workspace}/{cluster_config["name"]}.yaml'
        with open(config_file, 'w') as f:
            yaml.dump(admin_config, f)
        # Create the cluster from the workspace config
        subprocess.run([
            'bmctl', 'create', 'cluster',
            '-c', cluster_config['name'],
            '--kubeconfig', cluster_config.get('kubeconfig', '~/.kube/config')
        ], check=True)
return cluster_config['name']
    def _generate_connect_manifest(self, membership_name: str,
                                   platform: str) -> str:
        """Generate the Connect gateway RBAC manifest for a membership"""
        # Note: this produces gateway RBAC for an existing membership;
        # 'gcloud container fleet' supersedes the older 'hub' command group
        connect_config = subprocess.run([
            'gcloud', 'container', 'fleet', 'memberships',
            'generate-gateway-rbac',
            '--membership', membership_name,
            '--role', 'clusterrole/cluster-admin',
            '--users', f'serviceAccount:{self.project_id}.svc.id.goog[gke-connect/connect-agent-sa]',
            '--project', self.project_id,
            '--format', 'yaml'
        ], capture_output=True, text=True, check=True)
        return connect_config.stdout
# Multi-cluster configuration
class MultiClusterConfigurator:
"""Configure multi-cluster features"""
def __init__(self, project_id: str):
self.project_id = project_id
def enable_multi_cluster_ingress(self, config_cluster: str,
member_clusters: List[str]):
"""Enable Multi-cluster Ingress"""
# Create MultiClusterIngress configuration
mci_config = {
'apiVersion': 'networking.gke.io/v1',
'kind': 'MultiClusterIngress',
'metadata': {
'name': 'app-ingress',
'namespace': 'default'
},
'spec': {
'template': {
'spec': {
'backend': {
'serviceName': 'app-service',
'servicePort': 80
},
'rules': [{
'host': 'app.example.com',
'http': {
'paths': [{
'backend': {
'serviceName': 'app-service',
'servicePort': 80
}
}]
}
}]
}
}
}
}
# Create MultiClusterService
mcs_config = {
'apiVersion': 'networking.gke.io/v1',
'kind': 'MultiClusterService',
'metadata': {
'name': 'app-service',
'namespace': 'default'
},
'spec': {
'template': {
'spec': {
'selector': {
'app': 'myapp'
},
'ports': [{
'name': 'http',
'protocol': 'TCP',
'port': 80,
'targetPort': 8080
}]
}
},
'clusters': member_clusters
}
}
# Apply configurations
self._apply_to_config_cluster(config_cluster, [mci_config, mcs_config])
def setup_multi_cluster_mesh(self, clusters: List[Dict[str, str]]):
"""Setup multi-cluster service mesh"""
# Install ASM on each cluster
for cluster in clusters:
self._install_asm(cluster)
# Create cross-cluster connectivity
for i, cluster1 in enumerate(clusters):
for cluster2 in clusters[i+1:]:
self._create_cluster_peer(cluster1, cluster2)
def _install_asm(self, cluster: Dict[str, str]):
"""Install Anthos Service Mesh on cluster"""
install_script = f'''
curl https://storage.googleapis.com/csm-artifacts/asm/asmcli_{cluster.get("asm_version", "1.16")}_linux_amd64 > asmcli
chmod +x asmcli
./asmcli install \
--project_id {self.project_id} \
--cluster_name {cluster["name"]} \
--cluster_location {cluster["location"]} \
--enable_all \
--ca mesh_ca
'''
subprocess.run(['bash', '-c', install_script], check=True)
    def _create_cluster_peer(self, cluster1: Dict[str, str],
                             cluster2: Dict[str, str]):
        """Create peering between clusters (simplified sketch)"""
        # Read the mesh root certificates from cluster1; a full
        # implementation would install them into cluster2 and create
        # remote secrets for cross-cluster endpoint discovery
        subprocess.run([
            'kubectl', '--context', cluster1['context'],
            'get', 'secret', 'cacerts', '-n', 'istio-system',
            '-o', 'yaml'
        ], capture_output=True, check=True)
        print(f"Created peering between {cluster1['name']} and {cluster2['name']}")
def _apply_to_config_cluster(self, cluster: str, configs: List[Dict]):
"""Apply configurations to config cluster"""
for config in configs:
yaml_content = yaml.dump(config)
subprocess.run([
'kubectl', '--context', cluster,
'apply', '-f', '-'
], input=yaml_content.encode(), check=True)
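A minimal driver for the two classes above might look like the following; the project ID, cluster names, and kubeconfig path are placeholder values:

# Example usage (illustrative values only)
if __name__ == '__main__':
    manager = AnthosClusterManager(project_id='my-project')
    # Register an existing GKE cluster with the fleet
    manager.register_gke_cluster('gke-production', 'us-central1')
    # Attach an EKS cluster via its local kubeconfig
    manager.attach_external_cluster('eks-production', 'eks', '/path/to/eks-kubeconfig')

    configurator = MultiClusterConfigurator(project_id='my-project')
    configurator.enable_multi_cluster_ingress(
        config_cluster='gke-production',
        member_clusters=['gke-production', 'eks-production']
    )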
Anthos Config Management
GitOps Implementation
# config-management-operator.yaml
apiVersion: configmanagement.gke.io/v1
kind: ConfigManagement
metadata:
name: config-management
spec:
# Enable multi-repo mode
enableMultiRepo: true
# Policy Controller configuration
policyController:
enabled: true
templateLibraryInstalled: true
logDeniesEnabled: true
referentialRulesEnabled: true
# Metrics configuration
metrics:
enabled: true
---
# Root repository configuration
apiVersion: configmanagement.gke.io/v1
kind: RootSync
metadata:
name: root-sync
namespace: config-management-system
spec:
sourceFormat: unstructured
git:
repo: https://github.com/example/anthos-config-root
branch: main
dir: "clusters/production"
auth: token
secretRef:
name: git-creds
---
# Namespace repository
apiVersion: configmanagement.gke.io/v1
kind: RepoSync
metadata:
name: repo-sync
namespace: app-team
spec:
sourceFormat: unstructured
git:
repo: https://github.com/example/app-team-config
branch: main
dir: "namespaces/app-team"
auth: token
secretRef:
name: git-creds
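Both RootSync and RepoSync reference a git-creds secret that must exist before syncing starts; the RootSync secret lives in config-management-system, while a RepoSync secret lives in its own namespace. A sketch of the bootstrap steps (the context name and credentials are placeholders):

# bootstrap_config_sync.py -- sketch; credentials and contexts are placeholders
import subprocess

def create_git_creds(username: str, token: str, context: str,
                     namespace: str = 'config-management-system'):
    """Create the git-creds secret referenced by RootSync/RepoSync."""
    subprocess.run([
        'kubectl', '--context', context,
        'create', 'secret', 'generic', 'git-creds',
        '--namespace', namespace,
        f'--from-literal=username={username}',
        f'--from-literal=token={token}',
    ], check=True)

def check_sync_status(context: str):
    """Print Config Sync status using the nomos CLI."""
    subprocess.run(['nomos', 'status', '--contexts', context], check=True)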
Policy Management
# policy_management.py
from typing import Dict, List, Any
import yaml
import os
class AnthosPolicyManager:
"""Manage Anthos policies across clusters"""
def __init__(self, config_repo_path: str):
self.config_repo_path = config_repo_path
def create_security_policies(self):
"""Create security policies for Anthos clusters"""
        # Pod Security Policy (PSP was removed in Kubernetes 1.25+;
        # shown for older clusters -- prefer Policy Controller constraints)
        psp_policy = {
            'apiVersion': 'policy/v1beta1',
'kind': 'PodSecurityPolicy',
'metadata': {
'name': 'restricted',
'annotations': {
'configmanagement.gke.io/cluster-selector': 'env-production'
}
},
'spec': {
'privileged': False,
'allowPrivilegeEscalation': False,
'requiredDropCapabilities': ['ALL'],
'volumes': ['configMap', 'emptyDir', 'projected', 'secret', 'downwardAPI', 'persistentVolumeClaim'],
'hostNetwork': False,
'hostIPC': False,
'hostPID': False,
'runAsUser': {
'rule': 'MustRunAsNonRoot'
},
'seLinux': {
'rule': 'RunAsAny'
},
'supplementalGroups': {
'rule': 'RunAsAny'
},
'fsGroup': {
'rule': 'RunAsAny'
},
'readOnlyRootFilesystem': False
}
}
# Network Policy
network_policy = {
'apiVersion': 'networking.k8s.io/v1',
'kind': 'NetworkPolicy',
'metadata': {
'name': 'default-deny-ingress',
'namespace': 'production',
'annotations': {
'configmanagement.gke.io/cluster-selector': 'env-production'
}
},
'spec': {
'podSelector': {},
'policyTypes': ['Ingress']
}
}
# Resource Quotas
resource_quota = {
'apiVersion': 'v1',
'kind': 'ResourceQuota',
'metadata': {
'name': 'compute-quota',
'namespace': 'production',
'annotations': {
'configmanagement.gke.io/cluster-selector': 'env-production'
}
},
'spec': {
'hard': {
'requests.cpu': '1000',
'requests.memory': '200Gi',
'persistentvolumeclaims': '10',
'services.loadbalancers': '2'
}
}
}
# Save policies to config repo
policies = [psp_policy, network_policy, resource_quota]
for policy in policies:
self._save_policy(policy)
def create_opa_constraints(self):
"""Create OPA Gatekeeper constraints"""
# Constraint template for required labels
required_labels_template = {
'apiVersion': 'templates.gatekeeper.sh/v1beta1',
'kind': 'ConstraintTemplate',
'metadata': {
'name': 'k8srequiredlabels'
},
'spec': {
'crd': {
'spec': {
'names': {
'kind': 'K8sRequiredLabels'
},
'validation': {
'openAPIV3Schema': {
'type': 'object',
'properties': {
'labels': {
'type': 'array',
'items': {
'type': 'string'
}
}
}
}
}
}
},
'targets': [{
'target': 'admission.k8s.gatekeeper.sh',
'rego': '''
package k8srequiredlabels
violation[{"msg": msg, "details": {"missing_labels": missing}}] {
required := input.parameters.labels
provided := input.review.object.metadata.labels
missing := required[_]
not provided[missing]
msg := sprintf("You must provide labels: %v", [missing])
}
'''
}]
}
}
# Constraint instance
required_labels_constraint = {
'apiVersion': 'constraints.gatekeeper.sh/v1beta1',
'kind': 'K8sRequiredLabels',
'metadata': {
'name': 'must-have-environment'
},
'spec': {
'match': {
'kinds': [{
'apiGroups': ['apps'],
'kinds': ['Deployment', 'StatefulSet']
}],
'namespaces': ['production', 'staging']
},
'parameters': {
'labels': ['environment', 'team', 'app']
}
}
}
# Container image policy
allowed_repos_template = {
'apiVersion': 'templates.gatekeeper.sh/v1beta1',
'kind': 'ConstraintTemplate',
'metadata': {
'name': 'k8sallowedrepos'
},
'spec': {
'crd': {
'spec': {
'names': {
'kind': 'K8sAllowedRepos'
},
'validation': {
'openAPIV3Schema': {
'type': 'object',
'properties': {
'repos': {
'type': 'array',
'items': {
'type': 'string'
}
}
}
}
}
}
},
'targets': [{
'target': 'admission.k8s.gatekeeper.sh',
                'rego': '''
package k8sallowedrepos
violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  not any_prefix_match(container.image, input.parameters.repos)
  msg := sprintf("Container image %v is not from allowed repositories", [container.image])
}
# Helper rule: true if the image starts with any allowed repo prefix
any_prefix_match(image, repos) {
  repo := repos[_]
  startswith(image, repo)
}
'''
}]
}
}
allowed_repos_constraint = {
'apiVersion': 'constraints.gatekeeper.sh/v1beta1',
'kind': 'K8sAllowedRepos',
'metadata': {
'name': 'prod-repo-whitelist'
},
'spec': {
'match': {
'kinds': [{
'apiGroups': ['apps', ''],
'kinds': ['Deployment', 'StatefulSet', 'Pod']
}],
'namespaces': ['production']
},
'parameters': {
'repos': [
'gcr.io/my-project/',
'us-docker.pkg.dev/my-project/'
]
}
}
}
# Save constraints
constraints = [
required_labels_template, required_labels_constraint,
allowed_repos_template, allowed_repos_constraint
]
for constraint in constraints:
self._save_policy(constraint)
def create_hierarchical_namespaces(self):
"""Create hierarchical namespace configuration"""
# Parent namespace
parent_namespace = {
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {
'name': 'team-platform',
'labels': {
'team': 'platform'
}
}
}
# Subnamespace
subnamespace = {
'apiVersion': 'hnc.x-k8s.io/v1alpha2',
'kind': 'SubnamespaceAnchor',
'metadata': {
'name': 'team-platform-dev',
'namespace': 'team-platform'
}
}
# Hierarchical resource quota
hrq = {
'apiVersion': 'v1',
'kind': 'ResourceQuota',
'metadata': {
'name': 'team-quota',
'namespace': 'team-platform',
'annotations': {
'hnc.x-k8s.io/inheritable': 'true'
}
},
'spec': {
'hard': {
'requests.cpu': '100',
'requests.memory': '100Gi'
}
}
}
configs = [parent_namespace, subnamespace, hrq]
for config in configs:
self._save_policy(config)
def _save_policy(self, policy: Dict[str, Any]):
"""Save policy to config repository"""
# Determine file path based on policy type
kind = policy['kind'].lower()
namespace = policy['metadata'].get('namespace', 'cluster')
name = policy['metadata']['name']
if namespace == 'cluster':
file_path = f"{self.config_repo_path}/cluster/{kind}-{name}.yaml"
else:
file_path = f"{self.config_repo_path}/namespaces/{namespace}/{kind}-{name}.yaml"
# Create directory if needed
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Write policy
with open(file_path, 'w') as f:
yaml.dump(policy, f, default_flow_style=False)
print(f"Saved policy: {file_path}")
Anthos Service Mesh
Service Mesh Configuration
# asm-control-plane.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
name: asm-control-plane
spec:
profile: asm-gcp
hub: gcr.io/gke-release/asm
tag: 1.16.0-asm.0
meshConfig:
accessLogFile: /dev/stdout
defaultConfig:
proxyStatsMatcher:
inclusionRegexps:
- ".*outlier_detection.*"
- ".*osconfig.*"
inclusionSuffixes:
- "upstream_rq_retry"
- "upstream_rq_pending"
- "downstream_rq_time"
values:
# Enable telemetry v2
telemetry:
v2:
prometheus:
configOverride:
inboundSidecar:
disable_host_header_fallback: true
outboundSidecar:
disable_host_header_fallback: true
gateway:
disable_host_header_fallback: true
# Multi-cluster configuration
  pilot:
    env:
      PILOT_ENABLE_WORKLOAD_ENTRY_AUTOREGISTRATION: "true"
      PILOT_ENABLE_CROSS_CLUSTER_WORKLOAD_ENTRY: "true"
# CNI configuration
cni:
enabled: true
# Security configuration
global:
mtls:
auto: true
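One way to apply an IstioOperator spec like the one above is with istioctl (for managed ASM installs, asmcli as shown earlier is the usual route). A sketch, with a placeholder kube-context name:

# apply_asm_config.py -- sketch; assumes istioctl is on PATH
import subprocess

def install_control_plane(context: str,
                          operator_file: str = 'asm-control-plane.yaml'):
    """Apply the IstioOperator spec with istioctl."""
    subprocess.run([
        'istioctl', 'install',
        '--context', context,
        '-f', operator_file,
        '--skip-confirmation',
    ], check=True)

install_control_plane('gke_my-project_us-central1_gke-production')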
Advanced Traffic Management
# traffic_management.py
import yaml
from typing import Dict, List, Any, Optional
class ServiceMeshTrafficManager:
"""Manage traffic policies in Anthos Service Mesh"""
def __init__(self, cluster_contexts: List[str]):
self.cluster_contexts = cluster_contexts
def create_multi_cluster_service(self, service_name: str,
namespace: str,
clusters: List[Dict[str, Any]]):
"""Create multi-cluster service configuration"""
# ServiceEntry for cross-cluster communication
service_entry = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'ServiceEntry',
'metadata': {
'name': f'{service_name}-cross-cluster',
'namespace': namespace
},
'spec': {
'hosts': [f'{service_name}.{namespace}.global'],
                'location': 'MESH_INTERNAL',  # endpoints are inside the mesh, just in other clusters
'ports': [{
'number': 8080,
'name': 'http',
'protocol': 'HTTP'
}],
'resolution': 'DNS',
'endpoints': []
}
}
# Add endpoints from each cluster
for cluster in clusters:
service_entry['spec']['endpoints'].append({
'address': cluster['endpoint'],
'priority': cluster.get('priority', 0),
'weight': cluster.get('weight', 100),
'labels': {
'cluster': cluster['name'],
'region': cluster['region']
}
})
# DestinationRule for load balancing
destination_rule = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'DestinationRule',
'metadata': {
'name': f'{service_name}-cross-cluster',
'namespace': namespace
},
'spec': {
'host': f'{service_name}.{namespace}.global',
'trafficPolicy': {
'connectionPool': {
'tcp': {
'maxConnections': 100
},
'http': {
'http1MaxPendingRequests': 100,
'http2MaxRequests': 100,
'maxRequestsPerConnection': 1
}
},
'loadBalancer': {
'simple': 'LEAST_REQUEST'
},
'outlierDetection': {
'consecutiveErrors': 5,
'interval': '30s',
'baseEjectionTime': '30s',
'maxEjectionPercent': 50,
'minHealthPercent': 30
}
},
'subsets': []
}
}
# Add subset for each cluster
for cluster in clusters:
destination_rule['spec']['subsets'].append({
'name': cluster['name'],
'labels': {
'cluster': cluster['name']
},
'trafficPolicy': {
'portLevelSettings': [{
'port': {
'number': 8080
},
'loadBalancer': {
'consistentHash': {
'httpHeaderName': 'x-session-id'
}
}
}]
}
})
return service_entry, destination_rule
def create_canary_deployment(self, service_name: str,
namespace: str,
stable_version: str,
canary_version: str,
canary_weight: int = 10):
"""Create canary deployment configuration"""
# VirtualService for traffic splitting
virtual_service = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'VirtualService',
'metadata': {
'name': service_name,
'namespace': namespace
},
'spec': {
'hosts': [service_name],
'http': [{
'match': [{
'headers': {
'x-canary': {
'exact': 'true'
}
}
}],
'route': [{
'destination': {
'host': service_name,
'subset': canary_version
}
}]
}, {
'route': [{
'destination': {
'host': service_name,
'subset': stable_version
},
'weight': 100 - canary_weight
}, {
'destination': {
'host': service_name,
'subset': canary_version
},
'weight': canary_weight
}]
}]
}
}
# DestinationRule for subsets
destination_rule = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'DestinationRule',
'metadata': {
'name': service_name,
'namespace': namespace
},
'spec': {
'host': service_name,
'subsets': [{
'name': stable_version,
'labels': {
'version': stable_version
}
}, {
'name': canary_version,
'labels': {
'version': canary_version
}
}]
}
}
return virtual_service, destination_rule
def create_circuit_breaker(self, service_name: str,
namespace: str,
max_requests: int = 100,
max_pending: int = 100,
consecutive_errors: int = 5):
"""Create circuit breaker configuration"""
destination_rule = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'DestinationRule',
'metadata': {
'name': f'{service_name}-circuit-breaker',
'namespace': namespace
},
'spec': {
'host': service_name,
'trafficPolicy': {
'connectionPool': {
'tcp': {
'maxConnections': max_requests
},
'http': {
'http1MaxPendingRequests': max_pending,
'http2MaxRequests': max_requests,
'maxRequestsPerConnection': 1,
'h2UpgradePolicy': 'UPGRADE'
}
},
'outlierDetection': {
'consecutiveErrors': consecutive_errors,
'interval': '30s',
'baseEjectionTime': '30s',
'maxEjectionPercent': 100,
'minHealthPercent': 0,
'splitExternalLocalOriginErrors': True
}
}
}
}
return destination_rule
def create_retry_policy(self, service_name: str,
namespace: str,
attempts: int = 3,
per_try_timeout: str = '2s'):
"""Create retry policy"""
virtual_service = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'VirtualService',
'metadata': {
'name': f'{service_name}-retry',
'namespace': namespace
},
'spec': {
'hosts': [service_name],
'http': [{
'route': [{
'destination': {
'host': service_name
}
}],
'retries': {
'attempts': attempts,
'perTryTimeout': per_try_timeout,
'retryOn': 'gateway-error,connect-failure,refused-stream'
},
'timeout': '10s'
}]
}
}
return virtual_service
# Service mesh security
class ServiceMeshSecurity:
"""Manage service mesh security policies"""
def create_authorization_policy(self, namespace: str,
service_name: str,
allowed_namespaces: List[str]):
"""Create authorization policy"""
auth_policy = {
'apiVersion': 'security.istio.io/v1beta1',
'kind': 'AuthorizationPolicy',
'metadata': {
'name': f'{service_name}-authz',
'namespace': namespace
},
'spec': {
'selector': {
'matchLabels': {
'app': service_name
}
},
'action': 'ALLOW',
'rules': [{
'from': [{
'source': {
'namespaces': allowed_namespaces
}
}],
'to': [{
'operation': {
'methods': ['GET', 'POST']
}
}]
}]
}
}
return auth_policy
def create_peer_authentication(self, namespace: str,
mode: str = 'STRICT'):
"""Create peer authentication policy"""
peer_auth = {
'apiVersion': 'security.istio.io/v1beta1',
'kind': 'PeerAuthentication',
'metadata': {
'name': 'default',
'namespace': namespace
},
'spec': {
'mtls': {
'mode': mode
}
}
}
return peer_auth
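Since the traffic and security helpers above only build dictionaries, they still need to be serialized and applied. An illustrative rollout of a 10% canary with a circuit breaker (service, namespace, and context names are placeholders):

# Example: roll out a canary plus circuit breaker (illustrative values)
import subprocess
import yaml

manager = ServiceMeshTrafficManager(cluster_contexts=['gke-production'])
vs, dr = manager.create_canary_deployment(
    'checkout', 'production', stable_version='v1',
    canary_version='v2', canary_weight=10)
cb = manager.create_circuit_breaker('checkout', 'production')

for resource in (vs, dr, cb):
    subprocess.run(
        ['kubectl', '--context', 'gke-production', 'apply', '-f', '-'],
        input=yaml.dump(resource).encode(), check=True)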
Identity and Access Management
Anthos Identity Service
# identity_management.py
from google.cloud import secretmanager
import ldap3
import yaml
from typing import Dict, List, Any
import subprocess

class AnthosIdentityManager:
    """Manage identity and access in Anthos"""
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.secret_client = secretmanager.SecretManagerServiceClient()
def setup_oidc_provider(self, cluster_name: str,
provider_config: Dict[str, Any]):
"""Setup OIDC provider for cluster"""
# Create OIDC configuration
oidc_config = {
'apiVersion': 'authentication.gke.io/v1alpha1',
'kind': 'ClientConfig',
'metadata': {
'name': 'default',
'namespace': 'kube-public'
},
'spec': {
'authentication': [{
'name': provider_config['name'],
'oidc': {
'clientID': provider_config['client_id'],
'clientSecret': provider_config['client_secret'],
'extraParams': 'prompt=consent,access_type=offline',
'issuerURI': provider_config['issuer_uri'],
'kubectlRedirectURI': 'http://localhost:8000/callback',
'scopes': 'email,profile,openid',
'userClaim': provider_config.get('user_claim', 'email'),
'groupsClaim': provider_config.get('groups_claim', 'groups')
}
}],
'certificateAuthorityData': provider_config.get('ca_data', '')
}
}
# Apply configuration to cluster
self._apply_to_cluster(cluster_name, oidc_config)
# Create RBAC bindings for groups
if 'group_bindings' in provider_config:
self._create_group_rbac_bindings(
cluster_name,
provider_config['group_bindings']
)
def setup_ldap_sync(self, cluster_name: str,
ldap_config: Dict[str, Any]):
"""Setup LDAP synchronization"""
# Connect to LDAP
server = ldap3.Server(
ldap_config['host'],
port=ldap_config.get('port', 389),
use_ssl=ldap_config.get('use_ssl', False)
)
conn = ldap3.Connection(
server,
ldap_config['bind_dn'],
ldap_config['bind_password'],
auto_bind=True
)
# Search for users
conn.search(
ldap_config['user_search_base'],
ldap_config['user_search_filter'],
attributes=['uid', 'mail', 'memberOf']
)
users = []
groups = {}
for entry in conn.entries:
user = {
'username': str(entry.uid),
'email': str(entry.mail),
'groups': []
}
# Process group memberships
if hasattr(entry, 'memberOf'):
for group_dn in entry.memberOf:
group_name = self._extract_group_name(str(group_dn))
user['groups'].append(group_name)
if group_name not in groups:
groups[group_name] = []
groups[group_name].append(user['username'])
users.append(user)
# Create Kubernetes users and groups
self._sync_users_to_cluster(cluster_name, users, groups)
def create_workload_identity_binding(self, namespace: str,
service_account: str,
gcp_service_account: str):
"""Create Workload Identity binding"""
# Create Kubernetes service account
k8s_sa = {
'apiVersion': 'v1',
'kind': 'ServiceAccount',
'metadata': {
'name': service_account,
'namespace': namespace,
'annotations': {
'iam.gke.io/gcp-service-account': f'{gcp_service_account}@{self.project_id}.iam.gserviceaccount.com'
}
}
}
# Create IAM policy binding
policy_binding = {
'role': 'roles/iam.workloadIdentityUser',
'members': [
f'serviceAccount:{self.project_id}.svc.id.goog[{namespace}/{service_account}]'
]
}
# Apply configurations
subprocess.run([
'gcloud', 'iam', 'service-accounts', 'add-iam-policy-binding',
f'{gcp_service_account}@{self.project_id}.iam.gserviceaccount.com',
'--role', policy_binding['role'],
'--member', policy_binding['members'][0]
], check=True)
return k8s_sa
def create_rbac_policies(self, policies: List[Dict[str, Any]]):
"""Create RBAC policies"""
rbac_configs = []
for policy in policies:
# Create Role
role = {
'apiVersion': 'rbac.authorization.k8s.io/v1',
'kind': 'Role' if policy.get('namespace') else 'ClusterRole',
'metadata': {
'name': policy['name']
},
'rules': policy['rules']
}
if policy.get('namespace'):
role['metadata']['namespace'] = policy['namespace']
rbac_configs.append(role)
# Create RoleBinding
binding = {
'apiVersion': 'rbac.authorization.k8s.io/v1',
'kind': 'RoleBinding' if policy.get('namespace') else 'ClusterRoleBinding',
'metadata': {
'name': f"{policy['name']}-binding"
},
'roleRef': {
'apiGroup': 'rbac.authorization.k8s.io',
'kind': 'Role' if policy.get('namespace') else 'ClusterRole',
'name': policy['name']
},
'subjects': policy['subjects']
}
if policy.get('namespace'):
binding['metadata']['namespace'] = policy['namespace']
rbac_configs.append(binding)
return rbac_configs
def _create_group_rbac_bindings(self, cluster_name: str,
group_bindings: Dict[str, str]):
"""Create RBAC bindings for OIDC groups"""
for group, role in group_bindings.items():
binding = {
'apiVersion': 'rbac.authorization.k8s.io/v1',
'kind': 'ClusterRoleBinding',
'metadata': {
'name': f'oidc-{group}-{role}'
},
'roleRef': {
'apiGroup': 'rbac.authorization.k8s.io',
'kind': 'ClusterRole',
'name': role
},
'subjects': [{
'kind': 'Group',
'name': group,
'apiGroup': 'rbac.authorization.k8s.io'
}]
}
self._apply_to_cluster(cluster_name, binding)
def _extract_group_name(self, group_dn: str) -> str:
"""Extract group name from LDAP DN"""
# Extract CN from DN
parts = group_dn.split(',')
for part in parts:
if part.startswith('CN='):
return part[3:]
return group_dn
def _sync_users_to_cluster(self, cluster_name: str,
users: List[Dict[str, Any]],
groups: Dict[str, List[str]]):
"""Sync users and groups to cluster"""
# This is a simplified example
# In production, you would use a proper identity provider
print(f"Syncing {len(users)} users and {len(groups)} groups to {cluster_name}")
def _apply_to_cluster(self, cluster_name: str, config: Dict[str, Any]):
"""Apply configuration to cluster"""
yaml_content = yaml.dump(config)
subprocess.run([
'kubectl', '--context', cluster_name,
'apply', '-f', '-'
], input=yaml_content.encode(), check=True)
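As a usage sketch (project, namespace, and account names are placeholders), note that create_workload_identity_binding only returns the ServiceAccount manifest, so it still has to be applied:

# Example usage (illustrative values only)
if __name__ == '__main__':
    idm = AnthosIdentityManager(project_id='my-project')
    k8s_sa = idm.create_workload_identity_binding(
        namespace='production',
        service_account='app-sa',
        gcp_service_account='app-workload')
    # Apply the returned ServiceAccount manifest to the cluster
    idm._apply_to_cluster('gke-production', k8s_sa)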
Observability and Monitoring
Unified Monitoring Setup
# monitoring-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: anthos-monitoring
data:
prometheus.yml: |
global:
scrape_interval: 30s
evaluation_interval: 30s
external_labels:
cluster: '${CLUSTER_NAME}'
project: '${PROJECT_ID}'
# Remote write to Cloud Monitoring
remote_write:
- url: https://monitoring.googleapis.com/v1/projects/${PROJECT_ID}/location/global/prometheus/api/v1/write
queue_config:
capacity: 10000
max_shards: 200
min_shards: 1
max_samples_per_send: 10000
batch_send_deadline: 5s
min_backoff: 30ms
max_backoff: 100ms
write_relabel_configs:
- source_labels: [__name__]
regex: 'up|container_.*|node_.*|kube_.*|istio_.*'
action: keep
scrape_configs:
# Kubernetes API server
- job_name: 'kubernetes-apiservers'
kubernetes_sd_configs:
- role: endpoints
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
action: keep
regex: default;kubernetes;https
# Kubelet metrics
- job_name: 'kubernetes-nodes'
kubernetes_sd_configs:
- role: node
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
# Istio metrics
- job_name: 'istio-mesh'
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- istio-system
- istio-gateway
relabel_configs:
- source_labels: [__meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
action: keep
regex: istio-telemetry;prometheus
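kubectl does not expand the ${CLUSTER_NAME} and ${PROJECT_ID} placeholders above, so the manifest has to be rendered per cluster before applying. A small sketch using Python's string.Template (file name and context are placeholders):

# render_monitoring_config.py -- sketch; values are placeholders
import string
import subprocess

def render_and_apply(template_path: str, context: str,
                     cluster: str, project: str):
    """Substitute ${...} placeholders, then pipe the result to kubectl."""
    with open(template_path) as f:
        rendered = string.Template(f.read()).safe_substitute(
            CLUSTER_NAME=cluster, PROJECT_ID=project)
    subprocess.run(['kubectl', '--context', context, 'apply', '-f', '-'],
                   input=rendered.encode(), check=True)

render_and_apply('monitoring-config.yaml', 'gke-production',
                 'gke-production', 'my-project')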
Observability Implementation
# observability.py
from google.cloud import monitoring_v3
from google.cloud import logging_v2
from google.cloud import trace_v2
from google.protobuf import duration_pb2
import subprocess
import yaml
from typing import Dict, List, Any

class AnthosObservability:
    """Implement observability for Anthos clusters"""
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        self.logging_client = logging_v2.Client(project=project_id)
        self.trace_client = trace_v2.TraceServiceClient()
        self.project_name = f"projects/{project_id}"
def create_slo_monitoring(self, service_name: str,
slo_config: Dict[str, Any]):
"""Create SLO monitoring for services"""
# Create custom metric for SLI
sli_metric = monitoring_v3.MetricDescriptor(
type=f"custom.googleapis.com/{service_name}/availability",
metric_kind=monitoring_v3.MetricDescriptor.MetricKind.GAUGE,
value_type=monitoring_v3.MetricDescriptor.ValueType.DOUBLE,
display_name=f"{service_name} Availability",
description="Service availability as measured by successful requests"
)
self.monitoring_client.create_metric_descriptor(
name=self.project_name,
metric_descriptor=sli_metric
)
# Create SLO alert policy
alert_policy = monitoring_v3.AlertPolicy(
display_name=f"{service_name} SLO Violation",
conditions=[
monitoring_v3.AlertPolicy.Condition(
display_name="SLO burn rate too high",
condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
filter=f'''
resource.type="k8s_service"
AND resource.labels.service_name="{service_name}"
AND metric.type="custom.googleapis.com/{service_name}/error_rate"
''',
comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
threshold_value=slo_config['error_budget_burn_rate'],
                        duration=duration_pb2.Duration(seconds=300),
aggregations=[
monitoring_v3.Aggregation(
                                alignment_period=duration_pb2.Duration(seconds=60),
per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_RATE
)
]
)
)
],
notification_channels=slo_config.get('notification_channels', [])
)
self.monitoring_client.create_alert_policy(
name=self.project_name,
alert_policy=alert_policy
)
def create_multi_cluster_dashboard(self, clusters: List[str]):
"""Create multi-cluster monitoring dashboard"""
from google.cloud import monitoring_dashboard_v1
dashboard_client = monitoring_dashboard_v1.DashboardsServiceClient()
# Dashboard configuration
dashboard = {
"displayName": "Anthos Multi-Cluster Overview",
"mosaicLayout": {
"columns": 12,
"tiles": [
# Cluster health overview
{
"width": 6,
"height": 4,
"widget": {
"title": "Cluster Health",
"scorecard": {
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="k8s_cluster" metric.type="kubernetes.io/node/cpu/allocatable_utilization"',
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["resource.cluster_name"]
}
}
},
"sparkChartView": {
"sparkChartType": "SPARK_LINE"
}
}
}
},
# Service mesh traffic
{
"xPos": 6,
"width": 6,
"height": 4,
"widget": {
"title": "Service Mesh Traffic",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="k8s_container" metric.type="istio.io/service/server/request_count"',
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_RATE",
"crossSeriesReducer": "REDUCE_SUM",
"groupByFields": ["metric.destination_service_name"]
}
}
}
}]
}
}
},
# Cross-cluster latency
{
"yPos": 4,
"width": 12,
"height": 4,
"widget": {
"title": "Cross-Cluster Latency",
"xyChart": {
"dataSets": [{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": 'resource.type="k8s_container" metric.type="istio.io/service/server/response_latencies"',
"aggregation": {
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_DELTA",
"crossSeriesReducer": "REDUCE_PERCENTILE_95",
"groupByFields": ["metric.source_workload", "metric.destination_workload"]
}
}
}
}]
}
}
}
]
}
}
response = dashboard_client.create_dashboard(
parent=self.project_name,
dashboard=monitoring_dashboard_v1.Dashboard(dashboard)
)
return response
def setup_distributed_tracing(self, clusters: List[str]):
"""Setup distributed tracing across clusters"""
# Configure OpenTelemetry collector for each cluster
otel_config = {
'apiVersion': 'v1',
'kind': 'ConfigMap',
'metadata': {
'name': 'otel-collector-config',
'namespace': 'istio-system'
},
'data': {
'otel-collector-config.yaml': '''
receivers:
zipkin:
endpoint: 0.0.0.0:9411
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 10s
send_batch_size: 1024
resource:
attributes:
- key: cluster.name
value: ${CLUSTER_NAME}
action: insert
- key: project.id
value: ${PROJECT_ID}
action: insert
exporters:
googlecloud:
project: ${PROJECT_ID}
service:
pipelines:
traces:
receivers: [zipkin, otlp]
processors: [batch, resource]
exporters: [googlecloud]
'''
}
}
# Deploy to each cluster
for cluster in clusters:
subprocess.run([
'kubectl', '--context', cluster,
'apply', '-f', '-'
], input=yaml.dump(otel_config).encode(), check=True)
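A driver for the observability helpers might look like the following; the service name, burn-rate threshold, and cluster contexts are placeholders:

# Example usage (illustrative values only)
if __name__ == '__main__':
    obs = AnthosObservability(project_id='my-project')
    obs.create_slo_monitoring('checkout', {
        'error_budget_burn_rate': 2.0,
        'notification_channels': [],
    })
    obs.create_multi_cluster_dashboard(['gke-production', 'eks-production'])
    obs.setup_distributed_tracing(['gke-production', 'eks-production'])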
Migration and Modernization
Application Migration Strategy
# migration_strategy.py
import subprocess
import yaml
from typing import Dict, List, Any, Optional
import docker
import os
class AnthosMigrationManager:
"""Manage application migration to Anthos"""
def __init__(self, project_id: str):
self.project_id = project_id
self.docker_client = docker.from_env()
def analyze_application(self, app_path: str) -> Dict[str, Any]:
"""Analyze application for migration readiness"""
analysis = {
'containerizable': True,
'dependencies': [],
'recommendations': [],
'estimated_effort': 'medium'
}
# Check for Dockerfile
if os.path.exists(os.path.join(app_path, 'Dockerfile')):
analysis['has_dockerfile'] = True
else:
analysis['has_dockerfile'] = False
analysis['recommendations'].append('Create Dockerfile for containerization')
# Check for common dependencies
if os.path.exists(os.path.join(app_path, 'pom.xml')):
analysis['framework'] = 'java-maven'
analysis['dependencies'].extend(self._analyze_maven_deps(app_path))
elif os.path.exists(os.path.join(app_path, 'package.json')):
analysis['framework'] = 'nodejs'
analysis['dependencies'].extend(self._analyze_npm_deps(app_path))
elif os.path.exists(os.path.join(app_path, 'requirements.txt')):
analysis['framework'] = 'python'
analysis['dependencies'].extend(self._analyze_python_deps(app_path))
# Check for stateful components
stateful_indicators = ['database', 'mysql', 'postgres', 'mongodb', 'redis']
for indicator in stateful_indicators:
if self._check_for_pattern(app_path, indicator):
analysis['has_state'] = True
analysis['recommendations'].append(
f'Consider using Cloud SQL or managed database for {indicator}'
)
# Check for configuration
if os.path.exists(os.path.join(app_path, 'config')) or \
os.path.exists(os.path.join(app_path, '.env')):
analysis['has_config'] = True
analysis['recommendations'].append('Use ConfigMaps and Secrets for configuration')
return analysis
def generate_migration_artifacts(self, app_name: str,
app_analysis: Dict[str, Any]) -> Dict[str, str]:
"""Generate Kubernetes artifacts for migration"""
artifacts = {}
# Generate Dockerfile if needed
if not app_analysis.get('has_dockerfile', False):
dockerfile = self._generate_dockerfile(app_analysis['framework'])
artifacts['Dockerfile'] = dockerfile
# Generate Kubernetes manifests
deployment = {
'apiVersion': 'apps/v1',
'kind': 'Deployment',
'metadata': {
'name': app_name,
'labels': {
'app': app_name,
'version': 'v1'
}
},
'spec': {
'replicas': 3,
'selector': {
'matchLabels': {
'app': app_name
}
},
'template': {
'metadata': {
'labels': {
'app': app_name,
'version': 'v1'
},
'annotations': {
'sidecar.istio.io/inject': 'true'
}
},
'spec': {
'containers': [{
'name': app_name,
'image': f'gcr.io/{self.project_id}/{app_name}:latest',
'ports': [{
'containerPort': 8080,
'name': 'http'
}],
'env': [{
'name': 'PORT',
'value': '8080'
}],
'resources': {
'requests': {
'cpu': '100m',
'memory': '128Mi'
},
'limits': {
'cpu': '500m',
'memory': '512Mi'
}
},
'livenessProbe': {
'httpGet': {
'path': '/health',
'port': 8080
},
'initialDelaySeconds': 30,
'periodSeconds': 10
},
'readinessProbe': {
'httpGet': {
'path': '/ready',
'port': 8080
},
'initialDelaySeconds': 5,
'periodSeconds': 5
}
}]
}
}
}
}
service = {
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {
'name': app_name,
'labels': {
'app': app_name
}
},
'spec': {
'selector': {
'app': app_name
},
'ports': [{
'name': 'http',
'port': 80,
'targetPort': 8080
}]
}
}
# Add stateful components if needed
if app_analysis.get('has_state', False):
statefulset = self._generate_statefulset(app_name)
artifacts['statefulset.yaml'] = yaml.dump(statefulset)
artifacts['deployment.yaml'] = yaml.dump(deployment)
artifacts['service.yaml'] = yaml.dump(service)
# Generate migration script
migration_script = self._generate_migration_script(app_name, artifacts.keys())
artifacts['migrate.sh'] = migration_script
return artifacts
def _generate_dockerfile(self, framework: str) -> str:
"""Generate Dockerfile based on framework"""
dockerfiles = {
'java-maven': '''
FROM maven:3.8-openjdk-11 AS build
WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline
COPY src ./src
RUN mvn package -DskipTests
FROM openjdk:11-jre-slim
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
EXPOSE 8080
CMD ["java", "-jar", "app.jar"]
''',
'nodejs': '''
FROM node:16-alpine AS build
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
FROM node:16-alpine
WORKDIR /app
COPY --from=build /app/node_modules ./node_modules
COPY . .
EXPOSE 8080
CMD ["node", "server.js"]
''',
'python': '''
FROM python:3.9-slim AS build
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.9-slim
WORKDIR /app
COPY --from=build /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY . .
EXPOSE 8080
CMD ["python", "app.py"]
'''
}
return dockerfiles.get(framework, dockerfiles['nodejs'])
def _generate_statefulset(self, app_name: str) -> Dict[str, Any]:
"""Generate StatefulSet for stateful applications"""
return {
'apiVersion': 'apps/v1',
'kind': 'StatefulSet',
'metadata': {
'name': f'{app_name}-db'
},
'spec': {
'serviceName': f'{app_name}-db',
'replicas': 3,
'selector': {
'matchLabels': {
'app': f'{app_name}-db'
}
},
'template': {
'metadata': {
'labels': {
'app': f'{app_name}-db'
}
},
'spec': {
'containers': [{
'name': 'database',
'image': 'postgres:13',
'ports': [{
'containerPort': 5432,
'name': 'postgres'
}],
'volumeMounts': [{
'name': 'data',
'mountPath': '/var/lib/postgresql/data'
}],
'env': [{
'name': 'POSTGRES_DB',
'value': app_name
}, {
'name': 'POSTGRES_PASSWORD',
'valueFrom': {
'secretKeyRef': {
'name': f'{app_name}-db-secret',
'key': 'password'
}
}
}]
}]
}
},
'volumeClaimTemplates': [{
'metadata': {
'name': 'data'
},
'spec': {
'accessModes': ['ReadWriteOnce'],
'resources': {
'requests': {
'storage': '10Gi'
}
}
}
}]
}
}
def _generate_migration_script(self, app_name: str,
artifact_files: List[str]) -> str:
"""Generate migration script"""
script = f'''#!/bin/bash
set -e
echo "Starting migration of {app_name} to Anthos..."
# Build and push container image
echo "Building container image..."
docker build -t gcr.io/{self.project_id}/{app_name}:latest .
docker push gcr.io/{self.project_id}/{app_name}:latest
# Apply Kubernetes manifests
echo "Deploying to Kubernetes..."
'''
for file in artifact_files:
if file.endswith('.yaml'):
script += f'kubectl apply -f {file}\n'
script += '''
# Wait for deployment
echo "Waiting for deployment to be ready..."
kubectl wait --for=condition=available --timeout=300s deployment/''' + app_name + '''
echo "Migration completed successfully!"
echo "Application URL: $(kubectl get service ''' + app_name + ''' -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
'''
return script
def _analyze_maven_deps(self, app_path: str) -> List[str]:
"""Analyze Maven dependencies"""
# Simplified - would parse pom.xml in production
return ['spring-boot', 'mysql-connector']
def _analyze_npm_deps(self, app_path: str) -> List[str]:
"""Analyze NPM dependencies"""
# Simplified - would parse package.json in production
return ['express', 'mongoose']
def _analyze_python_deps(self, app_path: str) -> List[str]:
"""Analyze Python dependencies"""
# Simplified - would parse requirements.txt in production
return ['flask', 'sqlalchemy']
def _check_for_pattern(self, app_path: str, pattern: str) -> bool:
"""Check if pattern exists in application files"""
# Simplified - would do actual file scanning in production
return False
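End to end, the migration manager can analyze a source tree and write out the generated artifacts for review before anything is built or deployed; the application path and name below are placeholders:

# Example usage (illustrative values only)
if __name__ == '__main__':
    migrator = AnthosMigrationManager(project_id='my-project')
    analysis = migrator.analyze_application('/path/to/legacy-app')
    print(f"Framework: {analysis.get('framework', 'unknown')}")
    for rec in analysis['recommendations']:
        print(f"- {rec}")
    artifacts = migrator.generate_migration_artifacts('legacy-app', analysis)
    for filename, content in artifacts.items():
        with open(filename, 'w') as f:
            f.write(content)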
Best Practices and Patterns
1. Cluster Management
- Use Fleet management for consistent configuration
- Implement proper cluster naming conventions
- Enable workload identity for all clusters
- Use separate clusters for different environments
2. Security
- Enable Binary Authorization for container images
- Enforce pod security standards (Policy Controller constraints or Pod Security admission)
- Use service mesh for mTLS
- Regular security scanning and updates
3. Networking
- Design for multi-region deployments
- Implement proper network segmentation
- Use Multi-cluster Ingress for global load balancing
- Monitor cross-cluster latency
4. Observability
- Implement unified logging across clusters
- Use distributed tracing for service mesh
- Create SLOs for critical services
- Set up proper alerting
5. Cost Optimization
- Right-size cluster nodes
- Use autoscaling appropriately
- Implement resource quotas
- Monitor and optimize cross-region traffic
Next Steps
- Explore Anthos Config Management best practices
- Learn about Anthos Service Mesh advanced features
- Study Binary Authorization for supply chain security
- Implement Anthos Policy Controller
Anthos provides a comprehensive platform for modernizing applications and managing them consistently across hybrid and multi-cloud environments. Implemented carefully, it delivers the portability, consistency, and enterprise-grade security these architectures demand.