Back to Blog
Jorge

Automatizando Migraciones de Datos con Python

En el día a día de un desarrollador, las migraciones de datos entre sistemas suelen ser tareas comunes pero delicadas. Hoy exploraremos cómo construir un script en Python para automatizar un flujo de trabajo que involucra tres fuentes de datos distintas y una API externa, asegurando eficiencia y respeto por los límites de tráfico (Rate Limiting).


El Desafío

Necesitábamos registrar productos en un nuevo sistema (Shopify) basándonos en una lista de IDs antiguos. El flujo requería:

  • Leer una lista de IDs desde un CSV.
  • Traducir esos IDs a SKUs usando un archivo JSON con estructura compleja (formato tipo DynamoDB).
  • Verificar si el SKU ya existe en un segundo CSV (export de productos actuales) para evitar duplicados.
  • Si el producto no existe, enviarlo a una API REST.
  • Implementar una pausa entre peticiones para no saturar el servidor.

Herramientas utilizadas

  • Python 3
  • Librería requests: para las peticiones HTTP.
  • Librerías csv y json: integradas en Python para el manejo de archivos.

El Script: Paso a Paso

1. Optimización de búsqueda con set

Uno de los errores comunes es buscar dentro de una lista en cada iteración. Para verificar si un SKU ya existe en un archivo de miles de filas, lo mejor es cargar esos datos en un set. Esto reduce la complejidad de búsqueda de O(n) a O(1).

existing_skus = set()
with open('productos_actuales.csv', mode='r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    for row in reader:
        sku = row.get('Variant SKU')
        if sku:
            existing_skus.add(sku.strip())

2. Procesando el mapeo JSON

A veces los datos no vienen en formatos planos. En este caso, el JSON tenía una estructura anidada donde los valores están envueltos en llaves de tipo (ej. {"S": "valor"}).

sku_mapping = {}
with open('mapeo.json', 'r') as f:
    data = json.load(f)
    for item in data.get("Items", []):
        ml_id = item.get("Id", {}).get("S")
        sku_val = item.get("SKU", {}).get("S")
        if ml_id and sku_val:
            sku_mapping[ml_id] = sku_val

3. El ciclo de migración y Rate Limiting

Para ser buenos ciudadanos con la API que recibe los datos, es vital no enviar cientos de peticiones por segundo. La función time.sleep(10) nos permite cumplir con los límites de tasa del servidor.

import time
import requests

def procesar_migracion():
    # ... (carga de archivos)
    
    for id_producto in lista_ids:
        sku = sku_mapping.get(id_producto)
        
        if sku in existing_skus:
            print(f"Saltando {sku}: ya registrado.")
            continue
            
        # Petición POST
        payload = {"action": "migrate", "sku": sku}
        headers = {"Authorization": "TU_TOKEN_AQUI"}
        
        response = requests.post(API_URL, json=payload, headers=headers)
        
        if response.status_code == 200:
            print(f"Registro exitoso: {sku}")
            # Pausa de seguridad de 10 segundos
            time.sleep(10)
        else:
            print(f"Error al registrar {sku}")

Conclusión

Automatizar este tipo de procesos con Python no solo ahorra horas de trabajo manual, sino que reduce drásticamente el error humano. Los puntos clave para recordar en cualquier script de integración son:

  • Limpiar los datos: usa .strip() para evitar errores por espacios invisibles.
  • Eficiencia: utiliza set o diccionarios para búsquedas rápidas.
  • Respeto al servidor: si vas a hacer cientos de peticiones, implementa siempre un sleep.

🚀 ¡Feliz codificación!