commit
4c9c60c338
@ -5,6 +5,8 @@ import requests
|
||||
import ipaddress
|
||||
import time
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
from collections import defaultdict
|
||||
from idna import encode as idna_encode
|
||||
|
||||
@ -100,25 +102,17 @@ def handle_rate_limit():
|
||||
logging.warning(f'Rate limit hit. Waiting for {wait_time} seconds.')
|
||||
time.sleep(wait_time)
|
||||
|
||||
# Function to get CIDRs for a domain from ASN using GeoLite2
|
||||
# Function to get CIDRs for a domain from ASN using ip.guide
|
||||
def get_cidr_for_asn(asn):
|
||||
try:
|
||||
url = f'https://api.bgpview.io/asn/{asn}/prefixes'
|
||||
response = requests.get(url)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return [prefix['prefix'] for prefix in data['data']['ipv4_prefixes']]
|
||||
|
||||
elif response.status_code == 429:
|
||||
handle_rate_limit()
|
||||
return get_cidr_for_asn(asn) # Retry after waiting
|
||||
|
||||
elif response.status_code == 403:
|
||||
logging.error(f'Access forbidden for ASN {asn}, skipping.')
|
||||
command = f'curl -sL https://ip.guide/as{asn}'
|
||||
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
data = json.loads(result.stdout)
|
||||
return data.get('routes', {}).get('v4', [])
|
||||
else:
|
||||
logging.error(f'Error executing curl command: {result.stderr}')
|
||||
return []
|
||||
|
||||
return []
|
||||
except Exception as e:
|
||||
logging.error(f'Error retrieving CIDRs for ASN {asn}: {e}')
|
||||
return []
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user