Limites Padrão

Limite Global

  • 180 requisições por minuto por API Key
  • Janela deslizante de 60 segundos
  • Reset automático a cada minuto

Limites por Endpoint

EndpointLimite EspecíficoObservações
POST /requests180/minCriação de novas consultas
GET /requests180/minListagem de requisições
GET /responses180/minConsulta de resultados
POST /tracking180/minCriação de monitoramentos
GET /tracking180/minListagem de monitoramentos

Monitoramento de Rate Limit

import requests
import time
import os

api_key = os.getenv('JUDIT_API_KEY')
headers = {'api-key': api_key}

def check_rate_limit(response):
    """Verifica status do rate limit"""
    limit = int(response.headers.get('X-RateLimit-Limit', 0))
    remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
    reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
    
    usage_percent = ((limit - remaining) / limit) * 100 if limit > 0 else 0
    
    print(f"Rate Limit: {remaining}/{limit} ({usage_percent:.1f}% usado)")
    
    # Alerta quando próximo do limite
    if usage_percent > 80:
        reset_in = reset_time - int(time.time())
        print(f"⚠️ Próximo do limite! Reset em {reset_in}s")
        
    return {
        'limit': limit,
        'remaining': remaining,
        'reset_time': reset_time,
        'usage_percent': usage_percent
    }

# Exemplo de uso
response = requests.get(
    'https://requests.prod.judit.io/requests',
    headers=headers
)

rate_info = check_rate_limit(response)

Estratégias para Lidar com Rate Limits

1. Backoff Exponencial

import time
import random

def exponential_backoff(attempt, base_delay=1, max_delay=60):
    """Calcula delay com backoff exponencial"""
    delay = min(base_delay * (2 ** attempt), max_delay)
    # Adicionar jitter para evitar thundering herd
    jitter = random.uniform(0, delay * 0.1)
    return delay + jitter

def make_request_with_backoff(url, headers, max_retries=5):
    """Faz requisição com retry e backoff exponencial"""
    
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers)
            
            if response.status_code == 429:  # Rate limit
                if attempt == max_retries - 1:
                    raise Exception("Rate limit - máximo de tentativas atingido")
                
                # Usar Retry-After se disponível
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    delay = exponential_backoff(attempt)
                
                print(f"Rate limit atingido. Aguardando {delay:.1f}s...")
                time.sleep(delay)
                continue
                
            return response
            
        except requests.RequestException as e:
            if attempt == max_retries - 1:
                raise e
            
            delay = exponential_backoff(attempt)
            time.sleep(delay)
    
    raise Exception("Máximo de tentativas atingido")

2. Pool de Requisições com Controle

import threading
import queue
import time

class RateLimitedRequester:
    def __init__(self, api_key, requests_per_minute=450):
        self.api_key = api_key
        self.headers = {'api-key': api_key}
        self.requests_per_minute = requests_per_minute
        self.request_queue = queue.Queue()
        self.results = {}
        self.running = False
        
        # Controle de rate limit
        self.request_times = []
        self.lock = threading.Lock()
    
    def can_make_request(self):
        """Verifica se pode fazer requisição baseado no rate limit"""
        now = time.time()
        
        with self.lock:
            # Remove requisições antigas (mais de 1 minuto)
            self.request_times = [
                req_time for req_time in self.request_times 
                if now - req_time < 60
            ]
            
            return len(self.request_times) < self.requests_per_minute
    
    def add_request_time(self):
        """Adiciona timestamp da requisição atual"""
        with self.lock:
            self.request_times.append(time.time())
    
    def add_request(self, url, request_id=None):
        """Adiciona requisição à fila"""
        if request_id is None:
            request_id = f"req_{int(time.time())}"
        
        self.request_queue.put((url, request_id))
        return request_id
    
    def worker(self):
        """Worker thread para processar requisições"""
        while self.running:
            try:
                url, request_id = self.request_queue.get(timeout=1)
                
                # Aguardar se necessário
                while not self.can_make_request():
                    time.sleep(1)
                
                # Fazer requisição
                self.add_request_time()
                response = requests.get(url, headers=self.headers)
                
                # Armazenar resultado
                self.results[request_id] = {
                    'response': response,
                    'timestamp': time.time()
                }
                
                self.request_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Erro na requisição {request_id}: {e}")
                self.request_queue.task_done()
    
    def start(self, num_workers=3):
        """Inicia workers"""
        self.running = True
        self.workers = []
        
        for _ in range(num_workers):
            worker = threading.Thread(target=self.worker)
            worker.start()
            self.workers.append(worker)
    
    def stop(self):
        """Para workers"""
        self.running = False
        for worker in self.workers:
            worker.join()
    
    def get_result(self, request_id, timeout=30):
        """Obtém resultado de uma requisição"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            if request_id in self.results:
                return self.results.pop(request_id)
            time.sleep(0.1)
        
        return None

# Uso
requester = RateLimitedRequester(os.getenv('JUDIT_API_KEY'))
requester.start()

# Adicionar múltiplas requisições
req_ids = []
for i in range(10):
    req_id = requester.add_request(f'https://requests.prod.judit.io/requests?page={i+1}')
    req_ids.append(req_id)

# Coletar resultados
results = []
for req_id in req_ids:
    result = requester.get_result(req_id)
    if result:
        results.append(result['response'].json())

requester.stop()

3. Cache Inteligente

import hashlib
import json
from functools import wraps

class APICache:
    def __init__(self, ttl_seconds=300):  # 5 minutos padrão
        self.cache = {}
        self.ttl = ttl_seconds
    
    def _generate_key(self, url, params=None):
        """Gera chave única para cache"""
        cache_data = {'url': url, 'params': params or {}}
        cache_string = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(cache_string.encode()).hexdigest()
    
    def get(self, url, params=None):
        """Obtém item do cache"""
        key = self._generate_key(url, params)
        
        if key in self.cache:
            data, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            else:
                del self.cache[key]
        
        return None
    
    def set(self, url, data, params=None):
        """Armazena item no cache"""
        key = self._generate_key(url, params)
        self.cache[key] = (data, time.time())
    
    def clear_expired(self):
        """Remove itens expirados do cache"""
        now = time.time()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if now - timestamp >= self.ttl
        ]
        
        for key in expired_keys:
            del self.cache[key]

# Decorator para cache automático
def cached_request(cache_instance, ttl=300):
    def decorator(func):
        @wraps(func)
        def wrapper(url, params=None, **kwargs):
            # Tentar cache primeiro
            cached_result = cache_instance.get(url, params)
            if cached_result:
                print(f"Cache hit para {url}")
                return cached_result
            
            # Fazer requisição
            result = func(url, params=params, **kwargs)
            
            # Armazenar no cache
            if result and hasattr(result, 'status_code') and result.status_code == 200:
                cache_instance.set(url, result, params)
            
            return result
        
        return wrapper
    return decorator

# Uso
api_cache = APICache(ttl_seconds=600)  # 10 minutos

@cached_request(api_cache)
def get_requests(url, params=None):
    return requests.get(url, headers=headers, params=params)

# Requisições serão cacheadas automaticamente
response1 = get_requests('https://requests.prod.judit.io/requests', {'page': 1})
response2 = get_requests('https://requests.prod.judit.io/requests', {'page': 1})  # Cache hit

Tratamento de Erro 429

Resposta de Rate Limit Excedido

{
  "error": "Rate limit exceeded",
  "message": "Muitas requisições. Tente novamente em 60 segundos.",
  "code": "RATE_LIMIT_EXCEEDED",
  "retry_after": 60
}

Tratamento Robusto

def handle_rate_limit_error(response):
    """Trata erro de rate limit de forma robusta"""
    
    if response.status_code == 429:
        error_data = response.json()
        retry_after = response.headers.get('Retry-After', '60')
        
        print(f"Rate limit excedido: {error_data.get('message')}")
        print(f"Retry após: {retry_after} segundos")
        
        return int(retry_after)
    
    return 0

def robust_api_call(url, max_retries=3):
    """Chamada de API com tratamento robusto de rate limit"""
    
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            return response.json()
        
        elif response.status_code == 429:
            retry_after = handle_rate_limit_error(response)
            
            if attempt < max_retries - 1:
                print(f"Tentativa {attempt + 1}/{max_retries}. Aguardando...")
                time.sleep(retry_after)
                continue
            else:
                raise Exception("Rate limit - máximo de tentativas atingido")
        
        else:
            response.raise_for_status()
    
    raise Exception("Máximo de tentativas atingido")

Otimização de Uso

1. Batch Requests

# Em vez de múltiplas requisições individuais
# for cpf in cpf_list:
#     create_request(cpf)

# Use consulta agrupada quando possível
grouped_request = {
    "search": {
        "search_type": "custom",
        "search_key": "batch_query",
        "response_type": "lawsuits"
    }
}

2. Cache com TTL Inteligente

# Configure TTL baseado na frequência de mudança dos dados
cache_configs = {
    'requests': 300,      # 5 minutos - dados dinâmicos
    'responses': 1800,    # 30 minutos - dados mais estáveis
    'tracking': 600,      # 10 minutos - monitoramento
}

3. Priorização de Requisições

class PriorityRequester:
    def __init__(self):
        self.high_priority = queue.PriorityQueue()
        self.normal_priority = queue.Queue()
    
    def add_request(self, url, priority='normal'):
        if priority == 'high':
            self.high_priority.put((0, url))  # 0 = alta prioridade
        else:
            self.normal_priority.put(url)

Monitoramento e Alertas

Dashboard de Rate Limit

class RateLimitDashboard:
    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'rate_limited': 0,
            'avg_usage': 0,
            'peak_usage': 0
        }
    
    def update_metrics(self, rate_info):
        self.metrics['total_requests'] += 1
        
        if rate_info['remaining'] == 0:
            self.metrics['rate_limited'] += 1
        
        usage = rate_info['usage_percent']
        self.metrics['avg_usage'] = (
            (self.metrics['avg_usage'] * (self.metrics['total_requests'] - 1) + usage) 
            / self.metrics['total_requests']
        )
        
        if usage > self.metrics['peak_usage']:
            self.metrics['peak_usage'] = usage
    
    def get_report(self):
        return {
            'total_requests': self.metrics['total_requests'],
            'rate_limited_percent': (self.metrics['rate_limited'] / max(self.metrics['total_requests'], 1)) * 100,
            'avg_usage_percent': self.metrics['avg_usage'],
            'peak_usage_percent': self.metrics['peak_usage']
        }

Próximos Passos

Importante: Para aplicações com alto volume, entre em contato para discutir limites personalizados.