A Judit API utiliza paginação baseada em cursor para navegar através de grandes conjuntos de dados de forma eficiente e consistente.
Parâmetro | Tipo | Padrão | Descrição |
---|---|---|---|
page | integer | 1 | Número da página (baseado em 1) |
page_size | integer | 20 | Quantidade de itens por página (máx: 100) |
{
"page_data": [
// Array com os dados da página atual
],
"links": {
"next": "https://requests.prod.judit.io/requests?page=2&page_size=20",
"prev": "https://requests.prod.judit.io/requests?page=1&page_size=20",
"first": "https://requests.prod.judit.io/requests?page=1&page_size=20",
"last": "https://requests.prod.judit.io/requests?page=10&page_size=20"
},
"meta": {
"page": 1,
"page_count": 2,
"all_pages_count": 10,
"all_count": 200,
}
}
# Primeira página
curl -X GET "https://requests.prod.judit.io/requests?page=1&page_size=10" \
-H "api-key: $JUDIT_API_KEY"
# Página específica
curl -X GET "https://requests.prod.judit.io/requests?page=3&page_size=25" \
-H "api-key: $JUDIT_API_KEY"
def fetch_all_pages(endpoint, max_pages=None):
"""Busca todas as páginas de um endpoint"""
all_data = []
current_url = f"{base_url}/{endpoint}?page=1&page_size=50"
page_count = 0
while current_url and (max_pages is None or page_count < max_pages):
response = requests.get(current_url, headers=headers)
if response.status_code != 200:
print(f"Erro na página {page_count + 1}: {response.status_code}")
break
data = response.json()
all_data.extend(data.get('page_data', []))
# Próxima página usando o link fornecido
current_url = data.get('links', {}).get('next')
page_count += 1
print(f"Processada página {page_count}, "
f"itens coletados: {len(all_data)}")
return all_data
# Buscar todas as requisições
all_requests = fetch_all_pages('requests', max_pages=5)
print(f"Total de requisições coletadas: {len(all_requests)}")
def get_filtered_requests(status=None, page=1, page_size=20):
"""Busca requisições com filtro de status"""
params = {
'page': page,
'page_size': page_size
}
if status:
params['status'] = status
response = requests.get(
f"{base_url}/requests",
headers=headers,
params=params
)
return response.json() if response.status_code == 200 else None
# Buscar apenas requisições completadas
completed_requests = get_filtered_requests(
status='completed',
page=1,
page_size=50
)
def get_tracking_page(page=1, status=None):
"""Busca página de monitoramentos"""
params = {'page': page}
if status:
params['status'] = status
response = requests.get(
"https://tracking.prod.judit.io/tracking",
headers=headers,
params=params
)
return response.json() if response.status_code == 200 else None
# Buscar monitoramentos ativos
active_tracking = get_tracking_page(page=1, status='active', status='updated', status='updating')
# Para listagens rápidas
small_page = get_requests_page(page=1, page_size=10)
# Para processamento em lote
large_page = get_requests_page(page=1, page_size=100) # Máximo
import concurrent.futures
def fetch_page_range(start_page, end_page, page_size=50):
"""Busca múltiplas páginas em paralelo"""
def fetch_single_page(page):
return get_requests_page(page, page_size)
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Submeter todas as páginas
future_to_page = {
executor.submit(fetch_single_page, page): page
for page in range(start_page, end_page + 1)
}
# Coletar resultados
for future in concurrent.futures.as_completed(future_to_page):
page_num = future_to_page[future]
try:
page_data = future.result()
if page_data:
results.append((page_num, page_data))
except Exception as e:
print(f"Erro na página {page_num}: {e}")
# Ordenar por número da página
results.sort(key=lambda x: x[0])
return [data for _, data in results]
# Buscar páginas 1-5 em paralelo
parallel_results = fetch_page_range(1, 5)
import time
def paginate_with_rate_limit(endpoint, delay=0.2):
"""Paginação respeitando rate limits"""
page = 1
all_data = []
while True:
# Fazer requisição
data = get_requests_page(page, 50)
if not data or not data.get('page_data'):
break
all_data.extend(data['page_data'])
# Verificar se há próxima página
if not data.get('meta', {}).get('has_next'):
break
page += 1
# Delay para respeitar rate limit
time.sleep(delay)
print(f"Página {page-1} processada, "
f"total: {len(all_data)} itens")
return all_data
import time
import requests
def safe_paginate(endpoint, max_retries=3):
"""Paginação com tratamento de erros"""
page = 1
all_data = []
while True:
retry_count = 0
while retry_count < max_retries:
try:
data = get_requests_page(page, 50)
if not data:
return all_data
all_data.extend(data.get('page_data', []))
if not data.get('meta', {}).get('has_next'):
return all_data
break # Sucesso, sair do loop de retry
except requests.RequestException as e:
retry_count += 1
if retry_count >= max_retries:
print(f"Erro após {max_retries} tentativas: {e}")
return all_data
# Backoff exponencial
time.sleep(2 ** retry_count)
page += 1
return all_data