diff --git a/src/step 6 temp - summarization and ASN CIDRs.py b/src/step 6 temp - summarization and ASN CIDRs.py index 01bac24..3283a88 100644 --- a/src/step 6 temp - summarization and ASN CIDRs.py +++ b/src/step 6 temp - summarization and ASN CIDRs.py @@ -84,28 +84,32 @@ LOCAL_IP_CIDRS = [ # Function to summarize IPs into /28 subnets at most def summarize_ips(ips): try: - # Remove duplicates and sort IPs, treating them as networks (e.g., x.x.x.x/32) - networks = [ipaddress.ip_network(ip, strict=False) for ip in set(ips)] + networks = [] + for ip in set(ips): + try: + networks.append(ipaddress.ip_network(ip, strict=False)) + except ValueError as e: + logging.warning(f"Skipping invalid IP or CIDR: {ip} ({e})") + collapsed_networks = ipaddress.collapse_addresses(networks) summarized_networks = [] for network in collapsed_networks: - if network.prefixlen < 28: # If network is bigger than /28, split into /28 - for subnet in network.subnets(new_prefix=28): - summarized_networks.append(subnet) + if network.prefixlen < 28: + summarized_networks.extend(network.subnets(new_prefix=28)) else: summarized_networks.append(network) - logging.info(f'Summarized networks: {summarized_networks}') + logging.info(f"Summarized networks: {summarized_networks}") return summarized_networks - except ValueError as e: - logging.error(f'Error summarizing IPs: {e}') + except Exception as e: + logging.error(f"Error summarizing IPs: {e}") return [] # Function to handle rate-limiting errors (429) and retry after waiting def handle_rate_limit(): - wait_time = 60 # Wait time of 60 seconds - logging.warning(f'Rate limit hit. Waiting for {wait_time} seconds.') + wait_time = 60 + logging.warning(f"Rate limit hit. Waiting for {wait_time} seconds.") time.sleep(wait_time) # Function to get CIDRs for a domain from ASN using ip.guide @@ -117,10 +121,10 @@ def get_cidr_for_asn(asn): data = json.loads(result.stdout) return data.get('routes', {}).get('v4', []) else: - logging.error(f'Error executing curl command: {result.stderr}') + logging.error(f"Error executing curl command: {result.stderr}") return [] except Exception as e: - logging.error(f'Error retrieving CIDRs for ASN {asn}: {e}') + logging.error(f"Error retrieving CIDRs for ASN {asn}: {e}") return [] # Function to resolve a domain with retries and punycode support @@ -129,7 +133,7 @@ def resolve_domain(domain): domain_punycode = idna_encode(domain).decode('utf-8') return socket.gethostbyname_ex(domain_punycode)[2] except Exception as e: - logging.error(f'Could not resolve domain {domain}: {e}') + logging.error(f"Could not resolve domain {domain}: {e}") return [] # Function to check if a domain matches COMPANY_DOMAINS and fetch CIDRs @@ -148,7 +152,7 @@ def read_ips_from_file(file_path): with open(file_path, 'r') as f: return [line.strip() for line in f.readlines() if line.strip()] except FileNotFoundError: - logging.error(f'File not found: {file_path}') + logging.error(f"File not found: {file_path}") return [] # Function to check if an IP is local @@ -159,7 +163,7 @@ def is_local_ip(ip): if ip_obj.version == cidr.version and ip_obj.subnet_of(cidr): return True except ValueError as e: - logging.error(f'Invalid IP or CIDR: {ip}: {e}') + logging.error(f"Invalid IP or CIDR: {ip}: {e}") return False # Function to write summarized CIDRs to ipsum.lst @@ -167,26 +171,18 @@ def write_summarized_ips(ips, filename): try: with open(filename, 'w') as f: for cidr in ips: - f.write(f'{cidr}\n') - logging.info(f'Written summarized IPs to {filename}') + f.write(f"{cidr}\n") + logging.info(f"Written summarized IPs to {filename}") except Exception as e: - logging.error(f'Error writing summarized IPs to file: {e}') + logging.error(f"Error writing summarized IPs to file: {e}") # Main function to process ip.lst, summarize, and add CIDRs for company domains def main(): - # Initialize the GeoIP2 reader reader = initialize_geoip_reader() - - # Read IPs from ip.lst ips = read_ips_from_file(IP_LST_PATH) - - # Filter out local IPs ips = [ip for ip in ips if not is_local_ip(ip)] - - # Summarize the IPs into /28 networks summarized_ips = summarize_ips(ips) - # Check domains.lst for COMPANY_DOMAINS matches and get corresponding CIDRs domains = read_ips_from_file(DOMAINS_LST_PATH) company_cidrs = set() processed_asns = set() @@ -194,10 +190,7 @@ def main(): for domain in domains: company_cidrs.update(process_domain_for_asn(domain, processed_asns)) - # Combine summarized IPs and company CIDRs final_cidrs = set(summarized_ips) | company_cidrs - - # Write the final output to ipsum.lst write_summarized_ips(final_cidrs, OUTPUT_FILE) if __name__ == '__main__':