A Judit API implementa rate limiting para garantir uso justo dos recursos e manter a performance para todos os usuários.
Endpoint | Limite Específico | Observações |
---|---|---|
POST /requests | 180/min | Criação de novas consultas |
GET /requests | 180/min | Listagem de requisições |
GET /responses | 180/min | Consulta de resultados |
POST /tracking | 180/min | Criação de monitoramentos |
GET /tracking | 180/min | Listagem de monitoramentos |
import requests
import time
import os
# Read the API key from the environment and build the default auth header
# shared by all example requests below.
api_key = os.getenv('JUDIT_API_KEY')
headers = {'api-key': api_key}
def check_rate_limit(response):
    """Inspect the rate-limit headers of an API response and report usage.

    Reads the X-RateLimit-* headers, prints the current consumption and
    warns when more than 80% of the window has been used.

    Returns:
        dict with 'limit', 'remaining', 'reset_time' and 'usage_percent'.
    """
    hdrs = response.headers
    limit = int(hdrs.get('X-RateLimit-Limit', 0))
    remaining = int(hdrs.get('X-RateLimit-Remaining', 0))
    reset_time = int(hdrs.get('X-RateLimit-Reset', 0))
    if limit > 0:
        usage_percent = (limit - remaining) / limit * 100
    else:
        usage_percent = 0
    print(f"Rate Limit: {remaining}/{limit} ({usage_percent:.1f}% usado)")
    # Warn when we are close to exhausting the window
    if usage_percent > 80:
        reset_in = reset_time - int(time.time())
        print(f"⚠️ Próximo do limite! Reset em {reset_in}s")
    return {
        'limit': limit,
        'remaining': remaining,
        'reset_time': reset_time,
        'usage_percent': usage_percent
    }
# Usage example: fetch the request list and inspect rate-limit headers.
# NOTE(review): performs a live network call against the production API.
response = requests.get(
    'https://requests.prod.judit.io/requests',
    headers=headers
)
rate_info = check_rate_limit(response)
import time
import random
def exponential_backoff(attempt, base_delay=1, max_delay=60):
    """Return the wait time in seconds for a given retry attempt.

    The delay doubles with every attempt, is capped at ``max_delay`` and
    carries up to 10% random jitter to avoid the thundering-herd effect.
    """
    capped = min(base_delay * 2 ** attempt, max_delay)
    return capped + random.uniform(0, capped * 0.1)
def make_request_with_backoff(url, headers, max_retries=5):
    """GET ``url``, retrying on rate limits and transient network errors.

    On HTTP 429 the wait honours the Retry-After header when it is a valid
    number, otherwise falls back to exponential backoff with jitter.

    Args:
        url: Target URL.
        headers: Request headers (must include the API key).
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The first non-429 requests.Response.

    Raises:
        Exception: When the retry budget is exhausted on rate limits.
        requests.RequestException: Re-raised after the last attempt.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers)
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise  # bare raise keeps the original traceback
            time.sleep(exponential_backoff(attempt))
            continue
        if response.status_code != 429:
            return response
        if attempt == max_retries - 1:
            raise Exception("Rate limit - máximo de tentativas atingido")
        # Honour Retry-After when present and numeric; a proxy may send an
        # HTTP-date here, in which case fall back to exponential backoff.
        retry_after = response.headers.get('Retry-After')
        try:
            delay = float(retry_after)
        except (TypeError, ValueError):
            delay = exponential_backoff(attempt)
        print(f"Rate limit atingido. Aguardando {delay:.1f}s...")
        time.sleep(delay)
    raise Exception("Máximo de tentativas atingido")
import threading
import queue
import time
class RateLimitedRequester:
    """Thread-pooled requester that caps client-side request throughput.

    URLs are queued and processed by worker threads; a sliding one-minute
    window of timestamps enforces ``requests_per_minute``.
    """

    def __init__(self, api_key, requests_per_minute=450):
        self.api_key = api_key
        self.headers = {'api-key': api_key}
        self.requests_per_minute = requests_per_minute
        self.request_queue = queue.Queue()
        self.results = {}
        self.running = False
        # Initialized here so stop() is safe even if start() was never called
        # (previously an AttributeError).
        self.workers = []
        # Sliding-window rate-limit bookkeeping
        self.request_times = []
        self.lock = threading.Lock()
        # Monotonic counter so auto-generated ids are unique even when many
        # requests are queued within the same second (previously ids built
        # from int(time.time()) alone collided and clobbered results).
        self._id_counter = 0

    def can_make_request(self):
        """Return True when the sliding one-minute window has capacity."""
        now = time.time()
        with self.lock:
            # Drop timestamps older than one minute
            self.request_times = [
                req_time for req_time in self.request_times
                if now - req_time < 60
            ]
            return len(self.request_times) < self.requests_per_minute

    def add_request_time(self):
        """Record the timestamp of a request that is about to be sent."""
        with self.lock:
            self.request_times.append(time.time())

    def add_request(self, url, request_id=None):
        """Queue a URL for processing; return its (possibly generated) id."""
        if request_id is None:
            with self.lock:
                self._id_counter += 1
                request_id = f"req_{int(time.time())}_{self._id_counter}"
        self.request_queue.put((url, request_id))
        return request_id

    def worker(self):
        """Worker loop: pull queued URLs, respect the limit, store results."""
        while self.running:
            try:
                url, request_id = self.request_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Block until the sliding window has room
                while not self.can_make_request():
                    time.sleep(1)
                self.add_request_time()
                response = requests.get(url, headers=self.headers)
                self.results[request_id] = {
                    'response': response,
                    'timestamp': time.time()
                }
            except Exception as e:
                # request_id is always bound here because get() succeeded
                # (previously it could be unbound inside this handler).
                print(f"Erro na requisição {request_id}: {e}")
            finally:
                self.request_queue.task_done()

    def start(self, num_workers=3):
        """Spawn ``num_workers`` worker threads."""
        self.running = True
        self.workers = []
        for _ in range(num_workers):
            worker = threading.Thread(target=self.worker)
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Signal workers to finish and wait for them to exit."""
        self.running = False
        for worker in self.workers:
            worker.join()

    def get_result(self, request_id, timeout=30):
        """Poll for the result of ``request_id``; None after ``timeout`` s."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            if request_id in self.results:
                return self.results.pop(request_id)
            time.sleep(0.1)
        return None
# Usage example: fan ten paginated requests out through the rate limiter.
# NOTE(review): performs live network calls against the production API.
requester = RateLimitedRequester(os.getenv('JUDIT_API_KEY'))
requester.start()
# Queue multiple requests
req_ids = []
for i in range(10):
    req_id = requester.add_request(f'https://requests.prod.judit.io/requests?page={i+1}')
    req_ids.append(req_id)
# Collect the results (blocks up to the per-request timeout)
results = []
for req_id in req_ids:
    result = requester.get_result(req_id)
    if result:
        results.append(result['response'].json())
requester.stop()
import hashlib
import json
from functools import wraps
class APICache:
    """Tiny in-memory TTL cache keyed by URL plus query parameters."""

    def __init__(self, ttl_seconds=300):  # default: 5 minutes
        self.cache = {}
        self.ttl = ttl_seconds

    def _generate_key(self, url, params=None):
        """Derive a deterministic cache key from the URL and params."""
        payload = json.dumps({'url': url, 'params': params or {}}, sort_keys=True)
        return hashlib.md5(payload.encode()).hexdigest()

    def get(self, url, params=None):
        """Return the cached value, or None when absent or expired."""
        key = self._generate_key(url, params)
        entry = self.cache.get(key)
        if entry is None:
            return None
        data, stored_at = entry
        if time.time() - stored_at >= self.ttl:
            # Expired: drop it eagerly so the dict does not hold stale data.
            del self.cache[key]
            return None
        return data

    def set(self, url, data, params=None):
        """Store ``data`` under the URL/params key with the current time."""
        self.cache[self._generate_key(url, params)] = (data, time.time())

    def clear_expired(self):
        """Purge every entry older than the configured TTL."""
        now = time.time()
        stale = [
            key for key, (_, stored_at) in self.cache.items()
            if now - stored_at >= self.ttl
        ]
        for key in stale:
            del self.cache[key]
# Decorator that adds transparent caching to a request function
def cached_request(cache_instance, ttl=300):
    """Decorator factory: serve results from ``cache_instance`` when possible.

    Only responses with ``status_code == 200`` are stored. The ``ttl``
    argument is kept for backward compatibility but is unused — expiry is
    governed by the cache instance's own TTL.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(url, params=None, **kwargs):
            # Serve from the cache first. Compare against None explicitly so
            # a legitimately falsy cached value is not re-fetched (the old
            # truthiness check treated falsy hits as misses).
            cached_result = cache_instance.get(url, params)
            if cached_result is not None:
                print(f"Cache hit para {url}")
                return cached_result
            result = func(url, params=params, **kwargs)
            # Cache only successful HTTP responses
            if result and hasattr(result, 'status_code') and result.status_code == 200:
                cache_instance.set(url, result, params)
            return result
        return wrapper
    return decorator
# Usage example
api_cache = APICache(ttl_seconds=600)  # 10 minutes

@cached_request(api_cache)
def get_requests(url, params=None):
    """GET wrapper whose successful (200) responses are cached."""
    return requests.get(url, headers=headers, params=params)

# Requests are cached transparently by the decorator
response1 = get_requests('https://requests.prod.judit.io/requests', {'page': 1})
response2 = get_requests('https://requests.prod.judit.io/requests', {'page': 1})  # Cache hit
# Example of the JSON payload returned with an HTTP 429 response.
# NOTE(review): bare dict expression — illustrative only, never assigned.
{
    "error": "Rate limit exceeded",
    "message": "Muitas requisições. Tente novamente em 60 segundos.",
    "code": "RATE_LIMIT_EXCEEDED",
    "retry_after": 60
}
def handle_rate_limit_error(response):
    """Report a 429 response and return how many seconds to wait.

    Args:
        response: HTTP response object exposing ``status_code``,
            ``headers`` and ``json()``.

    Returns:
        Seconds to wait before retrying; 0 when the response is not a 429.
    """
    if response.status_code != 429:
        return 0
    # The body is usually JSON, but never assume — an intermediary proxy
    # may return HTML (the old code crashed on response.json() here).
    try:
        message = response.json().get('message')
    except ValueError:
        message = None
    retry_after = response.headers.get('Retry-After', '60')
    print(f"Rate limit excedido: {message}")
    print(f"Retry após: {retry_after} segundos")
    try:
        return int(retry_after)
    except ValueError:
        # Non-numeric Retry-After (e.g. an HTTP-date): fall back to 60 s.
        return 60
def robust_api_call(url, max_retries=3):
    """GET ``url`` and return its JSON body, retrying on HTTP 429.

    Non-429 HTTP errors are surfaced immediately via raise_for_status();
    rate limits are retried up to ``max_retries`` times, waiting the
    interval reported by handle_rate_limit_error between attempts.
    """
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        status = response.status_code
        if status == 200:
            return response.json()
        if status != 429:
            response.raise_for_status()
            continue
        # Rate limited: log and decide whether we can wait and retry.
        retry_after = handle_rate_limit_error(response)
        if attempt >= max_retries - 1:
            raise Exception("Rate limit - máximo de tentativas atingido")
        print(f"Tentativa {attempt + 1}/{max_retries}. Aguardando...")
        time.sleep(retry_after)
    raise Exception("Máximo de tentativas atingido")
# Instead of many individual requests:
# for cpf in cpf_list:
#     create_request(cpf)
# Prefer a grouped query whenever possible
grouped_request = {
    "search": {
        "search_type": "custom",
        "search_key": "batch_query",
        "response_type": "lawsuits"
    }
}
# Tune each TTL to how frequently the underlying data changes.
cache_configs = {
    'requests': 300,   # 5 minutes - dynamic data
    'responses': 1800, # 30 minutes - more stable data
    'tracking': 600,   # 10 minutes - monitoring
}
class PriorityRequester:
    """Keeps two queues so urgent requests can jump ahead of normal ones."""

    def __init__(self):
        self.high_priority = queue.PriorityQueue()
        self.normal_priority = queue.Queue()

    def add_request(self, url, priority='normal'):
        """Enqueue ``url``; priority 'high' goes to the priority queue."""
        if priority == 'high':
            # Priority value 0 sorts first in a PriorityQueue.
            self.high_priority.put((0, url))
        else:
            self.normal_priority.put(url)
class RateLimitDashboard:
    """Aggregates rate-limit statistics over the lifetime of a client."""

    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'rate_limited': 0,
            'avg_usage': 0,
            'peak_usage': 0
        }

    def update_metrics(self, rate_info):
        """Fold one check_rate_limit() result into the running metrics."""
        m = self.metrics
        m['total_requests'] += 1
        if rate_info['remaining'] == 0:
            m['rate_limited'] += 1
        usage = rate_info['usage_percent']
        # Incremental mean: avg_new = (avg_old * (n - 1) + usage) / n
        n = m['total_requests']
        m['avg_usage'] = (m['avg_usage'] * (n - 1) + usage) / n
        m['peak_usage'] = max(m['peak_usage'], usage)

    def get_report(self):
        """Return a summary dict with percentages derived from the metrics."""
        m = self.metrics
        total = max(m['total_requests'], 1)
        return {
            'total_requests': m['total_requests'],
            'rate_limited_percent': (m['rate_limited'] / total) * 100,
            'avg_usage_percent': m['avg_usage'],
            'peak_usage_percent': m['peak_usage']
        }
Importante: Para aplicações com alto volume, entre em contato para discutir limites personalizados.