Pular para conteúdo

Integração em Produção

Checklist para Produção

Antes de fazer o deploy em produção, confirme:

  • ✅ Testou todos os fluxos no Sandbox
  • ✅ Implementou retry com backoff exponencial
  • ✅ Implementou renovação automática de token
  • ✅ Configurou monitoramento de erros 4xx/5xx
  • ✅ Configurou alertas de rate limit
  • ✅ Armazenou credenciais em variáveis de ambiente (não no código)
  • ✅ Implementou logs de auditoria
  • ✅ Testou cenários de falha (token expirado, rate limit, timeout)

Tratamento de Erros

Hierarquia de Tratamento

Python
import requests
import time
import logging

logger = logging.getLogger(__name__)

def requisicao_robusta(url: str, headers: dict, method: str = "GET", 
                        payload: dict = None, max_retries: int = 3) -> dict:
    """Requisição com tratamento completo de erros"""

    for tentativa in range(1, max_retries + 1):
        try:
            response = requests.request(
                method, url,
                json=payload,
                headers=headers,
                timeout=30  # Timeout de 30s
            )

            if response.status_code == 401:
                logger.warning("Token expirado — renovando...")
                raise TokenExpiredError()

            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 60))
                logger.warning(f"Rate limit atingido. Aguardando {retry_after}s")
                time.sleep(retry_after)
                continue

            if response.status_code >= 500:
                if tentativa < max_retries:
                    wait = 2 ** tentativa
                    logger.warning(f"Erro {response.status_code}. Retry {tentativa}/{max_retries} em {wait}s")
                    time.sleep(wait)
                    continue

            response.raise_for_status()
            return response.json()

        except requests.Timeout:
            logger.error(f"Timeout na tentativa {tentativa}")
            if tentativa == max_retries:
                raise
            time.sleep(2 ** tentativa)

    raise Exception(f"Falha após {max_retries} tentativas")

Retry e Backoff Exponencial

Python
import time
import random

def backoff_exponencial(tentativa: int, base: float = 1.0, 
                         max_wait: float = 60.0, jitter: bool = True) -> float:
    """Calcula tempo de espera com backoff exponencial + jitter"""
    wait = min(base * (2 ** tentativa), max_wait)
    if jitter:
        wait *= (0.5 + random.random() * 0.5)  # Jitter de 50-100%
    return wait

# Uso
for tentativa in range(5):
    try:
        resultado = fazer_requisicao(url, headers)
        break
    except APIError as e:
        if e.status_code in (429, 500, 502, 503, 504):
            wait = backoff_exponencial(tentativa)
            print(f"⏳ Aguardando {wait:.1f}s antes do retry...")
            time.sleep(wait)
        else:
            raise  # Não fazer retry para erros 4xx (exceto 429)

Controle de Rate Limit

Monitoramento de Headers

Python
def monitorar_rate_limit(response: requests.Response):
    """Monitora e alerta sobre uso do rate limit"""
    limit = int(response.headers.get("X-RateLimit-Limit", 0))
    remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
    reset = response.headers.get("X-RateLimit-Reset", "")

    if limit > 0:
        percentual = (remaining / limit) * 100

        if percentual < 10:
            logger.warning(f"⚠️ Rate limit crítico: {remaining}/{limit} restantes ({percentual:.0f}%)")
        elif percentual < 20:
            logger.info(f"Rate limit baixo: {remaining}/{limit} restantes")

Rate Limiter Preventivo

Python
from threading import Semaphore
import time

class RateLimiter:
    """Limita requisições para não exceder o rate limit"""

    def __init__(self, max_per_minute: int = 40):
        self.max_per_minute = max_per_minute
        self.semaphore = Semaphore(max_per_minute)
        self.requests_times = []

    def acquire(self):
        now = time.time()
        # Remover timestamps > 1 minuto atrás
        self.requests_times = [t for t in self.requests_times if now - t < 60]

        if len(self.requests_times) >= self.max_per_minute:
            wait = 60 - (now - self.requests_times[0])
            time.sleep(wait)

        self.requests_times.append(time.time())

limiter = RateLimiter(max_per_minute=40)  # 80% do limite de produção

Logs e Auditoria

Python
import logging
import json
from datetime import datetime

# Configurar logger estruturado
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

def log_requisicao(method: str, url: str, status: int, 
                    duration_ms: float, payload: dict = None):
    """Log estruturado de requisições para auditoria"""
    logger.info(json.dumps({
        "timestamp": datetime.utcnow().isoformat(),
        "method": method,
        "url": url,
        "status": status,
        "duration_ms": round(duration_ms, 2),
        "payload_keys": list(payload.keys()) if payload else []
    }))

Monitoramento e Observabilidade

Métricas Recomendadas

Métrica Alerta em
Taxa de erros 4xx/5xx > 1% das requisições
Rate limit utilizado > 80%
Tempo de resposta (p95) > 3 segundos
Syncs com falha > 0 em 30 min

Healthcheck de Integração

Python
def healthcheck_api(auth) -> bool:
    """Verifica se a API está acessível"""
    try:
        headers = auth.get_headers()
        response = requests.get(
            "https://apis.pontotel.com.br/pontotel/api/v4/empregadores/",
            headers=headers,
            params={"page_size": 1},
            timeout=10
        )
        return response.status_code == 200
    except Exception:
        return False

Próximos Passos