IoT Infrastructure: Building the Connected Future
The Internet of Things (IoT) has revolutionized the way we interact with the physical world. From industrial sensors to smart home devices, IoT infrastructure demands a robust DevOps approach to guarantee scalability, security, and reliability.
IoT Infrastructure Architecture
Foundational Layers
A modern IoT infrastructure is made up of several interconnected layers:
# docker-compose.yml - Complete IoT stack
version: '3.8'

services:
  # MQTT broker for devices
  mosquitto:
    image: eclipse-mosquitto:2.0
    container_name: iot-mqtt-broker
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mqtt/config:/mosquitto/config
      - ./mqtt/data:/mosquitto/data
      - ./mqtt/log:/mosquitto/log
    restart: always

  # Time-series database
  influxdb:
    image: influxdb:2.7
    container_name: iot-influxdb
    ports:
      - "8086:8086"
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=secretpassword
      - DOCKER_INFLUXDB_INIT_ORG=iot-org
      - DOCKER_INFLUXDB_INIT_BUCKET=iot-data
    volumes:
      - influxdb-data:/var/lib/influxdb2
    restart: always

  # Grafana for visualization
  grafana:
    image: grafana/grafana:10.0.0
    container_name: iot-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
    restart: always

  # Node-RED for orchestration
  nodered:
    image: nodered/node-red:latest
    container_name: iot-nodered
    ports:
      - "1880:1880"
    volumes:
      - nodered-data:/data
    environment:
      - TZ=Europe/Madrid
    restart: always

volumes:
  influxdb-data:
  grafana-data:
  nodered-data:
IoT Gateway with Docker
# Dockerfile.iot-gateway
FROM python:3.11-slim
WORKDIR /app
# Note: asyncio ships with the standard library; the PyPI "asyncio" backport
# targets Python 2 and can break a Python 3 install, so it is omitted here.
RUN pip install paho-mqtt influxdb-client aiohttp
COPY gateway/ .
CMD ["python", "iot_gateway.py"]
# gateway/iot_gateway.py - Intelligent IoT gateway
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Any

import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS


class IoTGateway:
    def __init__(self):
        self.mqtt_client = mqtt.Client()
        self.influx_client = InfluxDBClient(
            url="http://influxdb:8086",
            token="your-influxdb-token",
            org="iot-org"
        )
        self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)
        self.device_registry = {}

        # Set up MQTT callbacks
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_message = self.on_message

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def on_connect(self, client, userdata, flags, rc):
        self.logger.info(f"Connected to MQTT broker with result code: {rc}")
        # Subscribe to all device topics
        client.subscribe("devices/+/+")
        client.subscribe("sensors/+/+")

    def on_message(self, client, userdata, msg):
        try:
            topic_parts = msg.topic.split('/')
            device_type = topic_parts[0]
            device_id = topic_parts[1]
            metric = topic_parts[2] if len(topic_parts) > 2 else 'data'

            payload = json.loads(msg.payload.decode())

            # Process and store the data. This callback runs in paho's network
            # thread, which has no running event loop, so asyncio.create_task
            # would raise; run the coroutine to completion instead.
            asyncio.run(self.process_device_data(
                device_type, device_id, metric, payload
            ))
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    async def process_device_data(self, device_type: str, device_id: str,
                                  metric: str, payload: Dict[str, Any]):
        try:
            # Register the device if it is new
            device_key = f"{device_type}_{device_id}"
            if device_key not in self.device_registry:
                self.device_registry[device_key] = {
                    'first_seen': datetime.utcnow(),
                    'last_seen': datetime.utcnow(),
                    'message_count': 0
                }

            # Update the registry
            self.device_registry[device_key]['last_seen'] = datetime.utcnow()
            self.device_registry[device_key]['message_count'] += 1

            # Build the InfluxDB data point
            point = Point(metric) \
                .tag("device_type", device_type) \
                .tag("device_id", device_id) \
                .time(datetime.utcnow())

            # Add fields according to the data type
            for key, value in payload.items():
                if isinstance(value, (int, float)):
                    point = point.field(key, value)
                else:
                    point = point.tag(key, str(value))

            # Write to InfluxDB
            self.write_api.write(bucket="iot-data", record=point)

            # Apply business rules
            await self.apply_business_rules(device_type, device_id, payload)

            self.logger.info(f"Data processed: {device_key} - {metric}")
        except Exception as e:
            self.logger.error(f"Error storing data: {e}")

    async def apply_business_rules(self, device_type: str, device_id: str,
                                   payload: Dict[str, Any]):
        """Apply domain-specific business rules"""
        # Example: alert on high temperature
        if device_type == "sensors" and "temperature" in payload:
            temp = payload["temperature"]
            if temp > 40:  # Critical threshold
                await self.send_alert({
                    "device_id": device_id,
                    "alert_type": "HIGH_TEMPERATURE",
                    "value": temp,
                    "threshold": 40,
                    "timestamp": datetime.utcnow().isoformat()
                })

    async def send_alert(self, alert_data: Dict[str, Any]):
        """Send alerts to external systems"""
        self.logger.warning(f"ALERT: {alert_data}")
        # Hook in notification systems here:
        # Slack, email, webhooks, etc.

    def start(self):
        self.mqtt_client.connect("mosquitto", 1883, 60)
        self.mqtt_client.loop_forever()


if __name__ == "__main__":
    gateway = IoTGateway()
    gateway.start()
Device Management with Kubernetes
# k8s/iot-device-manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iot-device-manager
  labels:
    app: iot-device-manager
spec:
  replicas: 3
  selector:
    matchLabels:
      app: iot-device-manager
  template:
    metadata:
      labels:
        app: iot-device-manager
    spec:
      containers:
      - name: device-manager
        image: iot/device-manager:v1.0.0
        ports:
        - containerPort: 8080
        env:
        - name: MQTT_BROKER_URL
          value: "tcp://mqtt-service:1883"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: iot-secrets
              key: database-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: iot-device-manager-service
spec:
  selector:
    app: iot-device-manager
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: LoadBalancer
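The liveness and readiness probes above assume the device manager serves `/health` and `/ready` on port 8080. A standard-library-only sketch of what those endpoints might look like (illustrative, not the actual device-manager code):

```python
# Illustrative /health and /ready endpoints matching the probes above, using
# only the standard library; a real device manager would serve these from its
# own web framework. All names here are assumptions, not an existing API.
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

class ProbeHandler(BaseHTTPRequestHandler):
    # Flip to False until dependencies (MQTT broker, database) are reachable
    ready = True

    def do_GET(self):
        if self.path == "/health":
            # Liveness: the process is up and can answer requests
            self._respond(200, b"ok")
        elif self.path == "/ready":
            # Readiness: only accept traffic once dependencies are available
            if self.ready:
                self._respond(200, b"ready")
            else:
                self._respond(503, b"not ready")
        else:
            self._respond(404, b"not found")

    def _respond(self, status, body):
        self.send_response(status)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep probe traffic out of the access log

if __name__ == "__main__":
    # Quick self-check on an ephemeral port
    server = HTTPServer(("127.0.0.1", 0), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    port = server.server_address[1]
    print(urllib.request.urlopen(f"http://127.0.0.1:{port}/health").status)  # 200
    server.shutdown()
```

Keeping the readiness check separate from liveness matters: a pod that is alive but still connecting to the broker should be excluded from the Service without being restarted.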
Monitoring and Alerting
# monitoring/iot_monitor.py - IoT monitoring system
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Any

import aiohttp
from influxdb_client import InfluxDBClient


class IoTMonitor:
    def __init__(self):
        self.influx_client = InfluxDBClient(
            url="http://localhost:8086",
            token="your-influxdb-token",
            org="iot-org"
        )
        self.query_api = self.influx_client.query_api()
        self.alert_rules = self.load_alert_rules()

    def load_alert_rules(self) -> List[Dict[str, Any]]:
        """Load alert rules from configuration"""
        return [
            {
                "name": "device_offline",
                "query": """
                    from(bucket: "iot-data")
                      |> range(start: -10m)
                      |> filter(fn: (r) => r["_measurement"] == "heartbeat")
                      |> group(columns: ["device_id"])
                      |> last()
                """,
                "condition": "last_seen > 5m",
                "severity": "warning",
                "action": "notify_ops_team"
            },
            {
                "name": "high_cpu_usage",
                "query": """
                    from(bucket: "iot-data")
                      |> range(start: -5m)
                      |> filter(fn: (r) => r["_measurement"] == "system_metrics")
                      |> filter(fn: (r) => r["_field"] == "cpu_percent")
                      |> mean()
                """,
                "condition": "_value > 85",
                "severity": "critical",
                "action": "scale_resources"
            }
        ]

    async def check_device_health(self):
        """Check device health"""
        query = """
            from(bucket: "iot-data")
              |> range(start: -1h)
              |> filter(fn: (r) => r["_measurement"] == "heartbeat")
              |> group(columns: ["device_id", "device_type"])
              |> last()
        """
        try:
            tables = self.query_api.query(query)
            for table in tables:
                for record in table.records:
                    device_id = record.values.get("device_id")
                    last_seen = record.get_time()

                    # Flag the device as offline if its last heartbeat is stale
                    if datetime.now(last_seen.tzinfo) - last_seen > timedelta(minutes=10):
                        await self.trigger_alert({
                            "type": "device_offline",
                            "device_id": device_id,
                            "last_seen": last_seen.isoformat(),
                            "severity": "warning"
                        })
        except Exception as e:
            print(f"Error checking device health: {e}")

    async def trigger_alert(self, alert_data: Dict[str, Any]):
        """Fire an alert"""
        print(f"🚨 ALERT: {alert_data}")

        # Send to a Slack/Teams webhook
        webhook_url = "https://hooks.slack.com/your-webhook"
        slack_payload = {
            "text": f"IoT Alert: {alert_data['type']}",
            "attachments": [{
                "color": "danger" if alert_data['severity'] == "critical" else "warning",
                "fields": [{
                    "title": "Device ID",
                    "value": alert_data.get('device_id', 'N/A'),
                    "short": True
                }, {
                    "title": "Severity",
                    "value": alert_data['severity'],
                    "short": True
                }]
            }]
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(webhook_url, json=slack_payload) as resp:
                    if resp.status == 200:
                        print("Alert delivered successfully")
        except Exception as e:
            print(f"Error sending alert: {e}")

    async def run_monitoring_loop(self):
        """Main monitoring loop"""
        while True:
            try:
                await self.check_device_health()
                # Add further checks here
                await asyncio.sleep(60)  # Check every minute
            except Exception as e:
                print(f"Error in monitoring loop: {e}")
                await asyncio.sleep(10)


if __name__ == "__main__":
    monitor = IoTMonitor()
    asyncio.run(monitor.run_monitoring_loop())
Edge Computing and Local Processing
# edge/docker-compose.edge.yml
version: '3.8'

services:
  edge-processor:
    build: ./edge-processor
    container_name: iot-edge-processor
    volumes:
      - /dev:/dev
      - ./config:/app/config
    privileged: true
    network_mode: host
    restart: always
    environment:
      - EDGE_ID=edge-001
      - CLOUD_ENDPOINT=https://api.iot-platform.com
      - LOCAL_STORAGE=/app/data

  local-mqtt:
    image: eclipse-mosquitto:2.0
    container_name: edge-mqtt
    ports:
      - "1883:1883"
    volumes:
      - ./mqtt-config:/mosquitto/config
    restart: always

  redis-cache:
    image: redis:7-alpine
    container_name: edge-cache
    ports:
      - "6379:6379"
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    restart: always
# edge-processor/edge_main.py - Edge processor
import asyncio
import json
from datetime import datetime
from typing import Dict, Any

import redis


class EdgeProcessor:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.local_storage = []
        self.sync_interval = 300  # 5 minutes

    async def process_sensor_data(self, sensor_data: Dict[str, Any]):
        """Local processing of sensor data"""
        # Apply filters and transformations
        processed_data = await self.apply_edge_rules(sensor_data)

        # Store locally
        self.store_locally(processed_data)

        # Decide whether to ship to the cloud
        if self.should_sync_to_cloud(processed_data):
            await self.queue_for_cloud_sync(processed_data)

        return processed_data

    async def apply_edge_rules(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply processing rules at the edge"""
        # Example: noise filtering for temperature sensors
        if 'temperature' in data:
            temp = data['temperature']
            if abs(temp - self.get_last_temperature()) > 20:
                # Likely a sensor glitch; fall back to the last valid value
                data['temperature'] = self.get_last_temperature()
                data['anomaly_detected'] = True

        # Attach processing metadata
        data['processed_at'] = datetime.utcnow().isoformat()
        data['edge_id'] = 'edge-001'
        return data

    def should_sync_to_cloud(self, data: Dict[str, Any]) -> bool:
        """Decide whether the data should be sent to the cloud"""
        # Synchronization criteria
        if data.get('anomaly_detected'):
            return True
        if data.get('priority') == 'high':
            return True
        # Periodic synchronization for normal data
        return len(self.local_storage) > 100

    async def queue_for_cloud_sync(self, data: Dict[str, Any]):
        """Queue data for cloud synchronization"""
        self.redis_client.lpush('cloud_sync_queue', json.dumps(data))

    def store_locally(self, data: Dict[str, Any]):
        """Store data locally"""
        self.local_storage.append(data)
        # Keep only the most recent 1000 records
        if len(self.local_storage) > 1000:
            self.local_storage.pop(0)

    def get_last_temperature(self) -> float:
        """Return the last valid temperature reading"""
        for record in reversed(self.local_storage):
            if 'temperature' in record and not record.get('anomaly_detected'):
                return record['temperature']
        return 20.0  # Default value


if __name__ == "__main__":
    processor = EdgeProcessor()
    # Start processing with a sample reading
    result = asyncio.run(processor.process_sensor_data({"temperature": 21.5}))
    print(result)
IoT Security
TLS Certificate Setup
#!/bin/bash
# scripts/setup-iot-certificates.sh

# Create the IoT CA
openssl genrsa -out ca-key.pem 4096
openssl req -new -x509 -days 365 -key ca-key.pem -sha256 -out ca.pem \
  -subj "/C=ES/ST=Madrid/L=Madrid/O=IoT/CN=iot-ca"

# Create the MQTT broker certificate
openssl genrsa -out mqtt-server-key.pem 4096
openssl req -subj "/CN=mqtt-broker" -sha256 -new -key mqtt-server-key.pem \
  -out mqtt-server.csr
echo subjectAltName = DNS:mqtt-broker,IP:127.0.0.1 >> extfile.cnf
openssl x509 -req -days 365 -sha256 -in mqtt-server.csr -CA ca.pem \
  -CAkey ca-key.pem -out mqtt-server-cert.pem -extfile extfile.cnf -CAcreateserial

# Create device certificates
generate_device_cert() {
    local device_id=$1
    openssl genrsa -out ${device_id}-key.pem 4096
    openssl req -subj "/CN=${device_id}" -new -key ${device_id}-key.pem \
      -out ${device_id}.csr
    openssl x509 -req -days 365 -sha256 -in ${device_id}.csr -CA ca.pem \
      -CAkey ca-key.pem -CAcreateserial -out ${device_id}-cert.pem
}

# Generate certificates for sample devices
for device in sensor-001 sensor-002 gateway-001; do
    generate_device_cert $device
done
Secure MQTT Configuration
# mqtt/config/mosquitto.conf
# "port" is deprecated in Mosquitto 2.x; declare a listener instead
listener 8883
cafile /mosquitto/certs/ca.pem
certfile /mosquitto/certs/mqtt-server-cert.pem
keyfile /mosquitto/certs/mqtt-server-key.pem
require_certificate true
use_identity_as_username true

# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true

# Persistence
persistence true
persistence_location /mosquitto/data/

# Security
allow_anonymous false
password_file /mosquitto/config/passwd

# ACL - access control
acl_file /mosquitto/config/acl.conf
# mqtt/config/acl.conf - access control
# Devices may only read and write under their own topic
pattern read devices/%u/#
pattern write devices/%u/#

# Gateways may read from all devices
user gateway-001
topic read devices/+/#
topic write gateways/gateway-001/#

# Backend services
user iot-backend
topic read #
topic write commands/#
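In the ACL above, `%u` expands to the authenticated username (here the certificate CN, given `use_identity_as_username true`), `+` matches exactly one topic level, and `#` matches any number of trailing levels. A small pure-Python sketch of that matching logic, useful for unit-testing ACL expectations before deploying them; the helper functions are illustrative, not a Mosquitto API:

```python
# Sketch of how Mosquitto-style ACL patterns resolve (illustrative helpers,
# not a Mosquitto API): %u expands to the username, '+' matches exactly one
# topic level, and '#' matches any number of trailing levels.

def topic_matches(filter_: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic."""
    f_parts = filter_.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True  # multi-level wildcard swallows the rest
        if i >= len(t_parts):
            return False
        if f != "+" and f != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)

def acl_allows(pattern: str, username: str, topic: str) -> bool:
    """Expand %u and test the resulting filter against a topic."""
    return topic_matches(pattern.replace("%u", username), topic)

# sensor-001 may publish under its own subtree, but not another device's
print(acl_allows("devices/%u/#", "sensor-001", "devices/sensor-001/temperature"))  # True
print(acl_allows("devices/%u/#", "sensor-001", "devices/sensor-002/temperature"))  # False
```

This kind of check is cheap to run in CI whenever the ACL file changes, which catches permission regressions before a device in the field does.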
Terraform for IoT Infrastructure on AWS
# terraform/iot-infrastructure.tf
provider "aws" {
  region = var.aws_region
}

# IoT Core
resource "aws_iot_thing_type" "sensor_type" {
  name = "industrial-sensor"

  properties {
    description           = "Industrial sensor device type"
    searchable_attributes = ["location", "sensor_type"]
  }
}

# IoT policy
resource "aws_iot_policy" "device_policy" {
  name = "IoTDevicePolicy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "iot:Connect",
          "iot:Publish",
          "iot:Subscribe",
          "iot:Receive"
        ]
        Resource = "*"
        Condition = {
          StringEquals = {
            "iot:Connection.Thing.IsAttached" = "true"
          }
        }
      }
    ]
  })
}

# DynamoDB for device metadata
resource "aws_dynamodb_table" "device_registry" {
  name         = "iot-device-registry"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "device_id"

  attribute {
    name = "device_id"
    type = "S"
  }

  attribute {
    name = "device_type"
    type = "S"
  }

  global_secondary_index {
    name            = "device-type-index"
    hash_key        = "device_type"
    projection_type = "ALL"  # required in the GSI block
  }

  tags = {
    Name        = "IoT Device Registry"
    Environment = var.environment
  }
}

# Timestream for sensor data
resource "aws_timestreamwrite_database" "iot_database" {
  database_name = "iot-sensor-data"

  tags = {
    Name = "IoT Sensor Database"
  }
}

resource "aws_timestreamwrite_table" "sensor_data" {
  database_name = aws_timestreamwrite_database.iot_database.database_name
  table_name    = "sensor-readings"

  retention_properties {
    memory_store_retention_period_in_hours  = 24
    magnetic_store_retention_period_in_days = 365
  }
}

# Lambda for data processing
resource "aws_lambda_function" "data_processor" {
  filename         = "data_processor.zip"
  function_name    = "iot-data-processor"
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  source_code_hash = filebase64sha256("data_processor.zip")
  runtime          = "python3.9"
  timeout          = 60

  environment {
    variables = {
      TIMESTREAM_DATABASE = aws_timestreamwrite_database.iot_database.database_name
      TIMESTREAM_TABLE    = aws_timestreamwrite_table.sensor_data.table_name
    }
  }
}

# IoT rule to route data
# Note: an aws_lambda_permission resource granting iot.amazonaws.com invoke
# rights is also needed for the rule to call the function.
resource "aws_iot_topic_rule" "sensor_data_rule" {
  name        = "sensor_data_processing_rule"
  description = "Process sensor data from devices"
  enabled     = true
  sql         = "SELECT *, topic(2) as device_id FROM 'devices/+/data'"
  sql_version = "2016-03-23"

  lambda {
    function_arn = aws_lambda_function.data_processor.arn
  }
}

# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "iot_dashboard" {
  dashboard_name = "IoT-Infrastructure-Dashboard"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6
        properties = {
          metrics = [
            ["AWS/IoT", "PublishIn.Success"],
            [".", "PublishIn.Failure"],
            ["AWS/Lambda", "Invocations", "FunctionName", aws_lambda_function.data_processor.function_name],
            [".", "Errors", ".", "."]
          ]
          view    = "timeSeries"
          stacked = false
          region  = var.aws_region
          title   = "IoT Core Metrics"
          period  = 300
        }
      }
    ]
  })
}
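The Terraform above packages `data_processor.zip` with a `lambda_function.lambda_handler` entry point. One possible shape for that handler is sketched below; the record layout and field names are assumptions about the incoming payload, and the record building is kept separate from the boto3 call so it can be tested without AWS:

```python
# lambda_function.py - sketch of the handler referenced by the Terraform above.
# Assumes the IoT rule delivers the message payload plus the injected device_id;
# the single "temperature" measure is an illustrative simplification.
import os
import time

def build_record(event: dict) -> dict:
    """Turn an IoT rule event into a Timestream record (one record per
    numeric field would also be reasonable; kept to one for brevity)."""
    return {
        "Dimensions": [
            {"Name": "device_id", "Value": str(event.get("device_id", "unknown"))},
        ],
        "MeasureName": "temperature",
        "MeasureValue": str(event["temperature"]),
        "MeasureValueType": "DOUBLE",
        "Time": str(int(time.time() * 1000)),  # epoch milliseconds
    }

def lambda_handler(event, context):
    import boto3  # imported lazily so build_record stays testable offline
    client = boto3.client("timestream-write")
    client.write_records(
        DatabaseName=os.environ["TIMESTREAM_DATABASE"],
        TableName=os.environ["TIMESTREAM_TABLE"],
        Records=[build_record(event)],
    )
    return {"statusCode": 200}
```

Separating the pure transformation from the AWS call keeps the unit tests fast and lets the CI pipeline validate the record format without Timestream credentials.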
CI/CD for IoT Devices
# .github/workflows/iot-deployment.yml
name: IoT Device Deployment

on:
  push:
    branches: [ main ]
    paths:
      - 'firmware/**'
      - 'edge-software/**'

jobs:
  build-firmware:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup PlatformIO
        uses: enterprise-iot/platformio-action@v1
      - name: Build firmware
        run: |
          pio run -e esp32dev
      - name: Run tests
        run: |
          pio test -e esp32dev
      - name: Upload firmware artifacts
        uses: actions/upload-artifact@v3
        with:
          name: firmware
          path: .pio/build/esp32dev/firmware.bin

  build-edge-software:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r edge-software/requirements.txt
      - name: Run tests
        run: |
          pytest edge-software/tests/
      - name: Build Docker image
        run: |
          docker build -t iot-edge:${{ github.sha }} edge-software/
      - name: Push to registry
        run: |
          docker push iot-edge:${{ github.sha }}

  deploy-to-edge-devices:
    needs: [build-firmware, build-edge-software]
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to edge gateways
        run: |
          # Deployment script for edge devices
          ansible-playbook -i inventory/edge-devices deploy-edge.yml
      - name: Validate deployment
        run: |
          # Verify connectivity and functionality
          python scripts/validate-edge-deployment.py
Practical Use Cases
Industry 4.0 - Predictive Maintenance
# examples/predictive_maintenance.py
import numpy as np
import joblib
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler


class PredictiveMaintenanceSystem:
    def __init__(self):
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.scaler = StandardScaler()
        self.is_trained = False

    def train(self, historical_data):
        """Train the model on historical data (expects a pandas DataFrame)"""
        features = ['vibration', 'temperature', 'pressure', 'sound_level']
        X = historical_data[features]

        # Normalize the data
        X_scaled = self.scaler.fit_transform(X)

        # Train the model
        self.model.fit(X_scaled)
        self.is_trained = True

        # Persist the model
        joblib.dump(self.model, 'predictive_model.pkl')
        joblib.dump(self.scaler, 'scaler.pkl')

    def predict_anomaly(self, sensor_data):
        """Detect anomalies in real-time sensor data"""
        if not self.is_trained:
            return {"error": "Model not trained"}

        features = ['vibration', 'temperature', 'pressure', 'sound_level']
        X = np.array([sensor_data[f] for f in features]).reshape(1, -1)
        X_scaled = self.scaler.transform(X)

        anomaly_score = self.model.decision_function(X_scaled)[0]
        is_anomaly = self.model.predict(X_scaled)[0] == -1

        return {
            "anomaly_detected": is_anomaly,
            "anomaly_score": anomaly_score,
            # Lower (more negative) scores mean stronger anomalies
            "severity": "high" if anomaly_score < -0.5 else "medium" if anomaly_score < -0.2 else "low"
        }


# Using the system
maintenance_system = PredictiveMaintenanceSystem()

# Simulated real-time sensor reading
sensor_reading = {
    "vibration": 2.1,
    "temperature": 75.3,
    "pressure": 14.7,
    "sound_level": 68.2,
    "timestamp": "2025-05-02T10:30:00Z",
    "machine_id": "press-001"
}

result = maintenance_system.predict_anomaly(sensor_reading)
# Use .get() because an untrained model returns an error dict instead
if result.get("anomaly_detected"):
    print(f"⚠️ Anomaly detected on {sensor_reading['machine_id']}")
    print(f"Severity: {result['severity']}")
    # Generate a maintenance work order
Smart City - Intelligent Traffic Management
# examples/smart_traffic.py
import asyncio
from datetime import datetime


class SmartTrafficController:
    def __init__(self):
        self.intersections = {}
        self.traffic_patterns = {}
        self.emergency_mode = False

    async def process_traffic_data(self, intersection_id, sensor_data):
        """Process traffic data in real time"""
        # Update the intersection state
        self.intersections[intersection_id] = {
            "vehicle_count": sensor_data.get("vehicle_count", 0),
            "average_speed": sensor_data.get("average_speed", 0),
            "queue_length": sensor_data.get("queue_length", 0),
            "pedestrian_count": sensor_data.get("pedestrian_count", 0),
            "last_update": datetime.now()
        }

        # Optimize the traffic-light timing
        optimal_timing = await self.optimize_traffic_light(intersection_id)

        # Apply the changes
        await self.update_traffic_light(intersection_id, optimal_timing)
        return optimal_timing

    async def optimize_traffic_light(self, intersection_id):
        """Optimize light timing based on current traffic"""
        data = self.intersections[intersection_id]

        # Basic optimization heuristic
        base_green_time = 30  # seconds
        vehicle_factor = min(data["vehicle_count"] / 10, 2.0)  # 2x multiplier max
        queue_factor = min(data["queue_length"] / 5, 1.5)      # 1.5x multiplier max

        optimized_green_time = base_green_time * vehicle_factor * queue_factor

        # Account for pedestrians
        if data["pedestrian_count"] > 5:
            pedestrian_time = 15
        else:
            pedestrian_time = 8

        return {
            "green_time": min(optimized_green_time, 60),  # 60 seconds max
            "red_time": 5,
            "pedestrian_time": pedestrian_time,
            "yellow_time": 3
        }

    async def update_traffic_light(self, intersection_id, timing):
        """Send commands to the traffic light"""
        command = {
            "intersection_id": intersection_id,
            "command": "update_timing",
            "timing": timing,
            "timestamp": datetime.now().isoformat()
        }
        # Simulate sending the command over MQTT
        print(f"Sending command to traffic light {intersection_id}: {timing}")
        return command

    async def handle_emergency_vehicle(self, route_data):
        """Handle emergency vehicles"""
        self.emergency_mode = True
        affected_intersections = route_data["intersections"]

        for intersection_id in affected_intersections:
            # Give the emergency vehicle green-light priority
            emergency_timing = {
                "green_time": 45,
                "red_time": 3,
                "pedestrian_time": 5,
                "yellow_time": 2
            }
            await self.update_traffic_light(intersection_id, emergency_timing)

        # Restore normal operation after 5 minutes
        await asyncio.sleep(300)
        self.emergency_mode = False


# Example run
traffic_controller = SmartTrafficController()

# Sample traffic-sensor data
traffic_sensor_data = {
    "vehicle_count": 23,
    "average_speed": 35.2,
    "queue_length": 8,
    "pedestrian_count": 3,
    "timestamp": datetime.now().isoformat()
}

# Process the data and optimize the light
asyncio.run(traffic_controller.process_traffic_data("intersection_001", traffic_sensor_data))
Best Practices and Design Considerations
1. Layered Architecture
- Cleanly separate responsibilities between devices, edge, and cloud
- Implement Circuit Breaker patterns for resilience
- Use message queues to decouple services
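The Circuit Breaker mentioned above fits in a few lines of Python; a minimal illustration (class name and thresholds are arbitrary, not from any library):

```python
# Minimal circuit breaker sketch: after `max_failures` consecutive errors the
# circuit opens and calls fail fast until `reset_timeout` seconds have passed,
# at which point one trial call is allowed through (the "half-open" state).
import time

class CircuitBreaker:
    def __init__(self, max_failures=3, reset_timeout=30.0):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success closes the circuit again
        return result
```

Wrapping the gateway's InfluxDB writes or the edge processor's cloud sync in a breaker like this keeps one dead dependency from stalling every message behind it.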
2. Security by Design
- Implement a zero-trust architecture
- Rotate certificates automatically
- Continuously monitor for security anomalies
3. Horizontal Scalability
- Design for metric-driven auto-scaling
- Shard data by region or device type
- Use a CDN for global firmware distribution
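Sharding by region or device type can start as a stable hash of the device key; a toy sketch (shard count and key format are illustrative):

```python
# Toy sharding sketch: route each device to a stable shard so its data always
# lands on the same partition. hashlib gives a digest that is stable across
# processes, unlike Python's built-in hash(), which is salted per run.
import hashlib

def shard_for(device_id: str, region: str, num_shards: int = 8) -> int:
    """Map a (region, device) pair deterministically onto one of N shards."""
    key = f"{region}:{device_id}".encode()
    return int(hashlib.sha256(key).hexdigest(), 16) % num_shards

# The same device always maps to the same shard
print(shard_for("sensor-001", "eu-west") == shard_for("sensor-001", "eu-west"))  # True
```

A plain modulo scheme reshuffles most keys when `num_shards` changes; if shards will be added over time, consistent hashing is the usual next step.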
4. End-to-End Observability
- Implement distributed tracing
- Collect business metrics alongside technical metrics
- Build smart alerting that reduces false positives
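One simple way to cut alert noise is suppressing repeats of the same alert within a time window; a minimal deduplication sketch (class name and window length are illustrative):

```python
# Minimal alert deduplication: identical (device, type) alerts inside the
# suppression window are dropped, so operators see one notification per
# incident instead of one per sensor reading.
import time

class AlertDeduplicator:
    def __init__(self, window_seconds=300.0):
        self.window = window_seconds
        self.last_sent = {}  # (device_id, alert_type) -> last send time

    def should_send(self, device_id: str, alert_type: str, now=None) -> bool:
        """Return True only if this alert has not fired within the window."""
        now = time.monotonic() if now is None else now
        key = (device_id, alert_type)
        last = self.last_sent.get(key)
        if last is not None and now - last < self.window:
            return False  # duplicate inside the window: suppress
        self.last_sent[key] = now
        return True
```

Dropping this in front of `send_alert` or `trigger_alert` in the gateway and monitor above would be a natural first step; escalation rules (e.g. re-alert at higher severity after N suppressions) can be layered on later.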
Conclusion
Modern IoT infrastructure demands a holistic approach that combines smart devices, reliable communications, distributed processing, and end-to-end observability. With the tools and patterns presented here, you can build IoT systems that scale from prototypes to deployments of millions of devices.
The key is to design for evolution: requirements will change, technologies will advance, and devices will stay in the field for years. A flexible, well-instrumented architecture will let you continuously adapt and improve your IoT infrastructure.
Investing in solid infrastructure from the start pays dividends as your IoT ecosystem grows and delivers real value to your organization.