Integração em Produção
Checklist para Produção
Antes de fazer o deploy em produção, confirme:
Tratamento de Erros
Hierarquia de Tratamento
| Python |
|---|
| import requests
import time
import logging
logger = logging.getLogger(__name__)
def requisicao_robusta(url: str, headers: dict, method: str = "GET",
payload: dict = None, max_retries: int = 3) -> dict:
"""Requisição com tratamento completo de erros"""
for tentativa in range(1, max_retries + 1):
try:
response = requests.request(
method, url,
json=payload,
headers=headers,
timeout=30 # Timeout de 30s
)
if response.status_code == 401:
logger.warning("Token expirado — renovando...")
raise TokenExpiredError()
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"Rate limit atingido. Aguardando {retry_after}s")
time.sleep(retry_after)
continue
if response.status_code >= 500:
if tentativa < max_retries:
wait = 2 ** tentativa
logger.warning(f"Erro {response.status_code}. Retry {tentativa}/{max_retries} em {wait}s")
time.sleep(wait)
continue
response.raise_for_status()
return response.json()
except requests.Timeout:
logger.error(f"Timeout na tentativa {tentativa}")
if tentativa == max_retries:
raise
time.sleep(2 ** tentativa)
raise Exception(f"Falha após {max_retries} tentativas")
|
Retry e Backoff Exponencial
| Python |
|---|
| import time
import random
def backoff_exponencial(tentativa: int, base: float = 1.0,
max_wait: float = 60.0, jitter: bool = True) -> float:
"""Calcula tempo de espera com backoff exponencial + jitter"""
wait = min(base * (2 ** tentativa), max_wait)
if jitter:
wait *= (0.5 + random.random() * 0.5) # Jitter de 50-100%
return wait
# Uso
for tentativa in range(5):
try:
resultado = fazer_requisicao(url, headers)
break
except APIError as e:
if e.status_code in (429, 500, 502, 503, 504):
wait = backoff_exponencial(tentativa)
print(f"⏳ Aguardando {wait:.1f}s antes do retry...")
time.sleep(wait)
else:
raise # Não fazer retry para erros 4xx (exceto 429)
|
Controle de Rate Limit
| Python |
|---|
| def monitorar_rate_limit(response: requests.Response):
"""Monitora e alerta sobre uso do rate limit"""
limit = int(response.headers.get("X-RateLimit-Limit", 0))
remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
reset = response.headers.get("X-RateLimit-Reset", "")
if limit > 0:
percentual = (remaining / limit) * 100
if percentual < 10:
logger.warning(f"⚠️ Rate limit crítico: {remaining}/{limit} restantes ({percentual:.0f}%)")
elif percentual < 20:
logger.info(f"Rate limit baixo: {remaining}/{limit} restantes")
|
Rate Limiter Preventivo
| Python |
|---|
| from threading import Semaphore
import time
class RateLimiter:
"""Limita requisições para não exceder o rate limit"""
def __init__(self, max_per_minute: int = 40):
self.max_per_minute = max_per_minute
self.semaphore = Semaphore(max_per_minute)
self.requests_times = []
def acquire(self):
now = time.time()
# Remover timestamps > 1 minuto atrás
self.requests_times = [t for t in self.requests_times if now - t < 60]
if len(self.requests_times) >= self.max_per_minute:
wait = 60 - (now - self.requests_times[0])
time.sleep(wait)
self.requests_times.append(time.time())
limiter = RateLimiter(max_per_minute=40) # 80% do limite de produção
|
Logs e Auditoria
| Python |
|---|
| import logging
import json
from datetime import datetime
# Configurar logger estruturado
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s'
)
def log_requisicao(method: str, url: str, status: int,
duration_ms: float, payload: dict = None):
"""Log estruturado de requisições para auditoria"""
logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"method": method,
"url": url,
"status": status,
"duration_ms": round(duration_ms, 2),
"payload_keys": list(payload.keys()) if payload else []
}))
|
Monitoramento e Observabilidade
Métricas Recomendadas
| Métrica | Alerta em |
| Taxa de erros 4xx/5xx | > 1% das requisições |
| Rate limit utilizado | > 80% |
| Tempo de resposta (p95) | > 3 segundos |
| Syncs com falha | > 0 em 30 min |
Healthcheck de Integração
| Python |
|---|
| def healthcheck_api(auth) -> bool:
"""Verifica se a API está acessível"""
try:
headers = auth.get_headers()
response = requests.get(
"https://apis.pontotel.com.br/pontotel/api/v4/empregadores/",
headers=headers,
params={"page_size": 1},
timeout=10
)
return response.status_code == 200
except Exception:
return False
|
Próximos Passos