Automatizando Migraciones de Datos con Python
En el día a día de un desarrollador, las migraciones de datos entre sistemas suelen ser tareas comunes pero delicadas. Hoy exploraremos cómo construir un script en Python para automatizar un flujo de trabajo que involucra tres fuentes de datos distintas y una API externa, asegurando eficiencia y respeto por los límites de tráfico (Rate Limiting).
El Desafío
Necesitábamos registrar productos en un nuevo sistema (Shopify) basándonos en una lista de IDs antiguos. El flujo requería:
- Leer una lista de IDs desde un CSV.
- Traducir esos IDs a SKUs usando un archivo JSON con estructura compleja (formato tipo DynamoDB).
- Verificar si el SKU ya existe en un segundo CSV (export de productos actuales) para evitar duplicados.
- Si el producto no existe, enviarlo a una API REST.
- Implementar una pausa entre peticiones para no saturar el servidor.
Herramientas utilizadas
- Python 3
- Librería
requests: para las peticiones HTTP. - Librerías
csvyjson: integradas en Python para el manejo de archivos.
El Script: Paso a Paso
1. Optimización de búsqueda con set
Uno de los errores comunes es buscar dentro de una lista en cada iteración. Para verificar si un SKU ya existe en un archivo de miles de filas, lo mejor es cargar esos datos en un set. Esto reduce la complejidad de búsqueda de O(n) a O(1).
existing_skus = set()
with open('productos_actuales.csv', mode='r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
for row in reader:
sku = row.get('Variant SKU')
if sku:
existing_skus.add(sku.strip())
2. Procesando el mapeo JSON
A veces los datos no vienen en formatos planos. En este caso, el JSON tenía una estructura anidada donde los valores están envueltos en llaves de tipo (ej. {"S": "valor"}).
sku_mapping = {}
with open('mapeo.json', 'r') as f:
data = json.load(f)
for item in data.get("Items", []):
ml_id = item.get("Id", {}).get("S")
sku_val = item.get("SKU", {}).get("S")
if ml_id and sku_val:
sku_mapping[ml_id] = sku_val
3. El ciclo de migración y Rate Limiting
Para ser buenos ciudadanos con la API que recibe los datos, es vital no enviar cientos de peticiones por segundo.
La función time.sleep(10) nos permite cumplir con los límites de tasa del servidor.
import time
import requests
def procesar_migracion():
# ... (carga de archivos)
for id_producto in lista_ids:
sku = sku_mapping.get(id_producto)
if sku in existing_skus:
print(f"Saltando {sku}: ya registrado.")
continue
# Petición POST
payload = {"action": "migrate", "sku": sku}
headers = {"Authorization": "TU_TOKEN_AQUI"}
response = requests.post(API_URL, json=payload, headers=headers)
if response.status_code == 200:
print(f"Registro exitoso: {sku}")
# Pausa de seguridad de 10 segundos
time.sleep(10)
else:
print(f"Error al registrar {sku}")
Conclusión
Automatizar este tipo de procesos con Python no solo ahorra horas de trabajo manual, sino que reduce drásticamente el error humano. Los puntos clave para recordar en cualquier script de integración son:
- Limpiar los datos: usa
.strip()para evitar errores por espacios invisibles. - Eficiencia: utiliza
seto diccionarios para búsquedas rápidas. - Respeto al servidor: si vas a hacer cientos de peticiones, implementa siempre un
sleep.
🚀 ¡Feliz codificación!