Update step 6 temp - summarization and ASN CIDRs.py

This commit is contained in:
Andrevich 2024-12-14 22:35:17 +04:00 committed by GitHub
parent e976179322
commit fc3dd4bbde
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -84,28 +84,32 @@ LOCAL_IP_CIDRS = [
# Function to summarize IPs into /28 subnets at most
def summarize_ips(ips):
try:
# Remove duplicates and sort IPs, treating them as networks (e.g., x.x.x.x/32)
networks = [ipaddress.ip_network(ip, strict=False) for ip in set(ips)]
networks = []
for ip in set(ips):
try:
networks.append(ipaddress.ip_network(ip, strict=False))
except ValueError as e:
logging.warning(f"Skipping invalid IP or CIDR: {ip} ({e})")
collapsed_networks = ipaddress.collapse_addresses(networks)
summarized_networks = []
for network in collapsed_networks:
if network.prefixlen < 28: # If network is bigger than /28, split into /28
for subnet in network.subnets(new_prefix=28):
summarized_networks.append(subnet)
if network.prefixlen < 28:
summarized_networks.extend(network.subnets(new_prefix=28))
else:
summarized_networks.append(network)
logging.info(f'Summarized networks: {summarized_networks}')
logging.info(f"Summarized networks: {summarized_networks}")
return summarized_networks
except ValueError as e:
logging.error(f'Error summarizing IPs: {e}')
except Exception as e:
logging.error(f"Error summarizing IPs: {e}")
return []
# Function to handle rate-limiting errors (429) and retry after waiting
def handle_rate_limit():
wait_time = 60 # Wait time of 60 seconds
logging.warning(f'Rate limit hit. Waiting for {wait_time} seconds.')
wait_time = 60
logging.warning(f"Rate limit hit. Waiting for {wait_time} seconds.")
time.sleep(wait_time)
# Function to get CIDRs for a domain from ASN using ip.guide
@ -117,10 +121,10 @@ def get_cidr_for_asn(asn):
data = json.loads(result.stdout)
return data.get('routes', {}).get('v4', [])
else:
logging.error(f'Error executing curl command: {result.stderr}')
logging.error(f"Error executing curl command: {result.stderr}")
return []
except Exception as e:
logging.error(f'Error retrieving CIDRs for ASN {asn}: {e}')
logging.error(f"Error retrieving CIDRs for ASN {asn}: {e}")
return []
# Function to resolve a domain with retries and punycode support
@ -129,7 +133,7 @@ def resolve_domain(domain):
domain_punycode = idna_encode(domain).decode('utf-8')
return socket.gethostbyname_ex(domain_punycode)[2]
except Exception as e:
logging.error(f'Could not resolve domain {domain}: {e}')
logging.error(f"Could not resolve domain {domain}: {e}")
return []
# Function to check if a domain matches COMPANY_DOMAINS and fetch CIDRs
@ -148,7 +152,7 @@ def read_ips_from_file(file_path):
with open(file_path, 'r') as f:
return [line.strip() for line in f.readlines() if line.strip()]
except FileNotFoundError:
logging.error(f'File not found: {file_path}')
logging.error(f"File not found: {file_path}")
return []
# Function to check if an IP is local
@ -159,7 +163,7 @@ def is_local_ip(ip):
if ip_obj.version == cidr.version and ip_obj.subnet_of(cidr):
return True
except ValueError as e:
logging.error(f'Invalid IP or CIDR: {ip}: {e}')
logging.error(f"Invalid IP or CIDR: {ip}: {e}")
return False
# Function to write summarized CIDRs to ipsum.lst
@ -167,26 +171,18 @@ def write_summarized_ips(ips, filename):
try:
with open(filename, 'w') as f:
for cidr in ips:
f.write(f'{cidr}\n')
logging.info(f'Written summarized IPs to {filename}')
f.write(f"{cidr}\n")
logging.info(f"Written summarized IPs to {filename}")
except Exception as e:
logging.error(f'Error writing summarized IPs to file: {e}')
logging.error(f"Error writing summarized IPs to file: {e}")
# Main function to process ip.lst, summarize, and add CIDRs for company domains
def main():
# Initialize the GeoIP2 reader
reader = initialize_geoip_reader()
# Read IPs from ip.lst
ips = read_ips_from_file(IP_LST_PATH)
# Filter out local IPs
ips = [ip for ip in ips if not is_local_ip(ip)]
# Summarize the IPs into /28 networks
summarized_ips = summarize_ips(ips)
# Check domains.lst for COMPANY_DOMAINS matches and get corresponding CIDRs
domains = read_ips_from_file(DOMAINS_LST_PATH)
company_cidrs = set()
processed_asns = set()
@ -194,10 +190,7 @@ def main():
for domain in domains:
company_cidrs.update(process_domain_for_asn(domain, processed_asns))
# Combine summarized IPs and company CIDRs
final_cidrs = set(summarized_ips) | company_cidrs
# Write the final output to ipsum.lst
write_summarized_ips(final_cidrs, OUTPUT_FILE)
if __name__ == '__main__':