Merge pull request #114 from 1andrevich/1andrevich-patch-1

Update step 6 temp - summarization and ASN CIDRs.py
This commit is contained in:
Andrevich 2024-12-14 19:52:57 +02:00 committed by GitHub
commit e83ccc010c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -17,20 +17,26 @@ OUTPUT_FILE = 'sum/output/ipsum.lst'
# Path to the GeoLite2 ASN database
GEOIP_DB_PATH = 'sum/GeoLite2-ASN.mmdb'
GEOIP_DB_URL = 'https://git.io/GeoLite2-ASN.mmdb'
GEOIP_DB_URLS = [
'https://git.io/GeoLite2-ASN.mmdb',
'https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-ASN.mmdb'
]
# Function to download the GeoLite2 ASN database
def download_geolite2_asn_db():
if not os.path.exists(GEOIP_DB_PATH):
try:
response = requests.get(GEOIP_DB_URL)
response.raise_for_status()
with open(GEOIP_DB_PATH, 'wb') as f:
f.write(response.content)
logging.info(f'Downloaded GeoLite2 ASN database to {GEOIP_DB_PATH}')
except requests.RequestException as e:
logging.error(f'Failed to download GeoLite2 ASN database: {e}')
raise
for url in GEOIP_DB_URLS:
try:
response = requests.get(url)
response.raise_for_status()
with open(GEOIP_DB_PATH, 'wb') as f:
f.write(response.content)
logging.info(f'Downloaded GeoLite2 ASN database to {GEOIP_DB_PATH} from {url}')
return
except requests.RequestException as e:
logging.warning(f'Failed to download GeoLite2 ASN database from {url}: {e}')
logging.error('All attempts to download the GeoLite2 ASN database have failed.')
raise Exception('Unable to download GeoLite2 ASN database')
# Initialize the GeoIP2 reader
def initialize_geoip_reader():
@ -78,7 +84,6 @@ LOCAL_IP_CIDRS = [
# Function to summarize IPs into /28 subnets at most
def summarize_ips(ips):
try:
# Remove duplicates and sort IPs, treating them as networks (e.g., x.x.x.x/32)
networks = [ipaddress.ip_network(ip, strict=False) for ip in set(ips)]
collapsed_networks = ipaddress.collapse_addresses(networks)
summarized_networks = []
@ -98,7 +103,7 @@ def summarize_ips(ips):
# Function to handle rate-limiting errors (429) and retry after waiting
def handle_rate_limit():
wait_time = 60 # Wait time of 60 seconds
wait_time = 60
logging.warning(f'Rate limit hit. Waiting for {wait_time} seconds.')
time.sleep(wait_time)
@ -168,19 +173,11 @@ def write_summarized_ips(ips, filename):
# Main function to process ip.lst, summarize, and add CIDRs for company domains
def main():
# Initialize the GeoIP2 reader
reader = initialize_geoip_reader()
# Read IPs from ip.lst
ips = read_ips_from_file(IP_LST_PATH)
# Filter out local IPs
ips = [ip for ip in ips if not is_local_ip(ip)]
# Summarize the IPs into /28 networks
summarized_ips = summarize_ips(ips)
# Check domains.lst for COMPANY_DOMAINS matches and get corresponding CIDRs
domains = read_ips_from_file(DOMAINS_LST_PATH)
company_cidrs = set()
processed_asns = set()
@ -188,11 +185,8 @@ def main():
for domain in domains:
company_cidrs.update(process_domain_for_asn(domain, processed_asns))
# Combine summarized IPs and company CIDRs
final_cidrs = set(summarized_ips) | company_cidrs
# Write the final output to ipsum.lst
write_summarized_ips(final_cidrs, OUTPUT_FILE)
if __name__ == '__main__':
main()
main()