Source code

This commit is contained in:
Andrevich 2024-10-13 16:16:18 +04:00
parent 3683f0e0e7
commit 2de476a579
22 changed files with 22612 additions and 0 deletions

BIN
src/GeoLite2-ASN.mmdb Normal file

Binary file not shown.

View File

@ -0,0 +1,157 @@
import requests
from concurrent.futures import ThreadPoolExecutor
import gc
import re
import logging
import time
import sys
# Setup logging with real-time output to both file and console.
# This configures the ROOT logger, so every logging call in this process
# goes through both handlers below.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# File handler (writes logs to file); append mode, so repeated runs accumulate.
file_handler = logging.FileHandler('domain_analysis.log', mode='a', encoding='utf-8')
file_handler.setLevel(logging.INFO)
# Stream handler (outputs to console/terminal in real-time)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
# Format for log messages
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Apply the format to handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
# Keyword categories matched (case-insensitively) against fetched page content
# by check_content(). BUG FIX: several lines below were missing a trailing
# comma, which made Python silently concatenate adjacent string literals into
# one long, never-matching keyword (e.g. 'Купить этот домен.купить домен').
# The commas are restored; every individual keyword string is unchanged.
DRUG_KEYWORDS = [
    'drug', 'narcotic', 'buy drugs', 'купить наркотики', 'метамфетамин', 'weed', 'xanax',
    'xanaks', 'anasha', 'амфетамин', 'кокаин', 'метадон', 'mefedron', 'крокодил',
    'amfetamin', 'cocaine', 'каннабис', 'мариухана', 'марихуана', 'ecstasy', 'blacksprut'
]
CASINO_KEYWORDS = [
    'casino', 'gamble', 'казино', 'игры на деньги', 'покер', 'ставки', 'blackjack',
    'roulette', 'slots', 'jackpot', 'winbig', '1win', 'vulkan', 'адмирал', 'лотерея',
    'poker', 'sloty', 'рулетка', 'джекпот', 'ставка', 'слоты', 'бонусы', 'игровые автоматы', 'крутить'
]
# Phrases that indicate a parked, for-sale, or otherwise inactive site.
INACTIVE_PHRASES = [
    'nginx', 'apache', 'site for sale', 'сайт продается', 'this domain is for sale',
    'under construction', 'в разработке', 'this website is under construction',
    'maintenance mode', 'технические работы', 'страница недоступна', 'coming soon', 'Купить этот домен.',
    'купить домен', 'купить этот домен', 'продам домен', 'domain for sale', 'Купить этот домен', 'Содержимое появится позже.',
    'domain is for sale', 'domain available', 'продажа домена', 'свободный домен', 'Site is created successfully!',
    'this site is for sale', 'временно недоступен', 'out of service', "www.w3.org/1999/xhtml", 'Web server is returning an unknown error',
    'этот домен продается', 'домен выставлен на продажу', 'service unavailable', 'Website blankdomain.com is ready. The content is to be added',
    '503 service unavailable', 'закрыт на реконструкцию', 'сайт на реконструкции', 'Домен не прилинкован к директории на сервере',
    'domain expired', 'домен истек', 'сайт временно не работает', 'default page', 'Срок регистрации домена истек'
]
PHISHING_KEYWORDS = [
    'billing', 'invoice', 'banking', 'доступ к счету', 'инвестируй', 'зарабатывай',
    'вход в аккаунт', 'доход', 'кредит', 'требуется подтверждение', 'подтвердите данные',
    'биллинг', 'банковский аккаунт', 'Присоединяйтесь к проекту', 'Зарабатывайте'
]
ADULT_KEYWORDS = [
    'escort', 'проститутки', 'striptiz', 'массаж', 'massaj', 'интим услуги', 'девушки по вызову', 'Порно с детьми', 'Детское порно',
    'путана', 'проститутка', 'секс услуги', 'проститутки', 'adult dating', 'Rape', 'Kill', 'Gore', 'Порно с животными',
    'эскорт', 'проститутка', 'эротический массаж', 'Animal Porn', 'Zoo Porn', 'Child Porn', 'Snuff', 'Dead Porn'
]
ILLEGAL_KEYWORDS = [
    'fraud', 'подделка документов', 'russianbrides', 'русские невесты'
]
# Flat union used by check_content() for the "suspicious content" scan.
ALL_KEYWORDS = DRUG_KEYWORDS + CASINO_KEYWORDS + INACTIVE_PHRASES + PHISHING_KEYWORDS + ADULT_KEYWORDS + ILLEGAL_KEYWORDS
# User-agent to simulate real browser requests
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
# Fetch a domain's landing page, preferring HTTPS and falling back to HTTP.
def fetch_page(domain):
    """Return the body of the first 200 response for *domain*, or None.

    Redirects are NOT followed, so a site answering with 301/302 yields None.
    """
    for scheme in ('https://', 'http://'):
        url = scheme + domain
        try:
            resp = requests.get(url, headers=HEADERS, timeout=5, allow_redirects=False)
        except requests.RequestException as exc:
            logger.error(f"Error fetching {url}: {exc}")
            continue
        if resp.status_code == 200:
            return resp.text
    return None
# Classify a fetched page as 'report' (suspicious), 'remove' (inactive/parked)
# or 'keep'.
def check_content(domain, content):
    """Scan *content* for unwanted keywords and return a verdict string.

    BUG FIX: keywords were passed to re.search() as raw regex patterns, so
    metacharacters in keywords like 'Купить этот домен.' or
    "www.w3.org/1999/xhtml" acted as wildcards. re.escape() makes every
    keyword match literally.
    """
    found_keywords = [keyword for keyword in ALL_KEYWORDS
                      if re.search(re.escape(keyword), content, re.IGNORECASE)]
    # Only report if at least 2 distinct keywords are matched (reduces
    # false positives from a single incidental word).
    if len(found_keywords) >= 2:
        logger.info(f"Domain reported: {domain} contains suspicious content. Keywords: {', '.join(found_keywords[:5])}")
        return 'report'
    # Check for inactive/for-sale phrases separately; a single hit is enough.
    if any(re.search(re.escape(phrase), content, re.IGNORECASE) for phrase in INACTIVE_PHRASES):
        logger.info(f"Domain removed: {domain} inactive or for sale.")
        return 'remove'
    return 'keep'
# Process one domain end-to-end: fetch, classify, append to the right list.
def process_domain(domain, clean_file, filtered_file):
    """Fetch *domain*, classify the page, and record the domain in a list file.

    Unreachable/empty domains are written to *clean_file* (kept) rather than
    dropped.
    """
    page = fetch_page(domain)
    if not page:
        logger.info(f"Domain skipped or error: {domain} could not be fetched.")
        with open(clean_file, 'a') as out:
            out.write(f"{domain}\n")
        return
    verdict = check_content(domain, page)
    if verdict == 'keep':
        logger.info(f"Domain kept: {domain}. Summary of content: {page[:200]}...")
        target = clean_file
    else:
        target = filtered_file
    with open(target, 'a') as out:
        out.write(f"{domain}\n")
    # Free the fetched page promptly; many domains are processed concurrently.
    gc.collect()
# Drive the classification over the whole domain list with a thread pool.
def run_script(domain_list):
    """Classify every domain concurrently and split results into two files."""
    clean_file = 'clean_domains.lst'
    filtered_file = 'filtered_domains.lst'
    # Truncate both output files so each run starts from a clean slate.
    for path in (clean_file, filtered_file):
        open(path, 'w').close()
    started = time.time()
    with ThreadPoolExecutor(max_workers=750) as executor:
        for domain in domain_list:
            executor.submit(process_domain, domain, clean_file, filtered_file)
    # Leaving the with-block waits for every submitted task to finish.
    logger.info(f"Processing completed in {time.time() - started:.2f} seconds.")
# Script entry point: read the input list (one domain per line) and run.
if __name__ == "__main__":
    with open('domains.lst') as f:
        domains = [line.strip() for line in f]
    run_script(domains)

View File

@ -0,0 +1,58 @@
import re
import codecs
# Fraud-related keywords still considered "bad" under the current policy.
# BUG FIX: the original list was missing commas between many adjacent string
# literals, which silently concatenated them into long, never-matching
# keywords (so domains tagged with those keywords were wrongly reinstated).
# The commas are restored; the individual keyword strings are unchanged.
NEW_TAGS = ['drug', 'narcotic', 'buy drugs', 'купить наркотики', 'метамфетамин', 'weed', 'xanax',
            'xanaks', 'anasha', 'амфетамин', 'кокаин', 'метадон', 'mefedron', 'крокодил',
            'amfetamin', 'cocaine', 'каннабис', 'мариухана', 'марихуана', 'ecstasy', 'blacksprut',
            'casino', 'gamble', 'казино', 'игры на деньги', 'покер', 'blackjack',
            'roulette', 'slots', 'jackpot', 'winbig', 'vulkan', 'адмирал', 'лотерея',
            'poker', 'sloty', 'рулетка', 'джекпот', 'ставка', 'слоты', 'бонусы', 'игровые автоматы', 'крутить',
            'nginx', 'apache', 'site for sale', 'сайт продается', 'this domain is for sale',
            'under construction', 'в разработке', 'this website is under construction',
            'maintenance mode', 'технические работы', 'страница недоступна', 'coming soon', 'Купить этот домен.',
            'купить домен', 'купить этот домен', 'продам домен', 'domain for sale', 'Купить этот домен', 'Содержимое появится позже.',
            'domain is for sale', 'domain available', 'продажа домена', 'свободный домен', 'Site is created successfully!',
            'this site is for sale', 'временно недоступен', 'out of service', 'Web server is returning an unknown error',
            'этот домен продается', 'домен выставлен на продажу', 'service unavailable', 'Website blankdomain.com is ready. The content is to be added',
            '503 service unavailable', 'закрыт на реконструкцию', 'сайт на реконструкции', 'Домен не прилинкован к директории на сервере',
            'domain expired', 'домен истек', 'сайт временно не работает', 'default page', 'Срок регистрации домена истек',
            'доступ к счету', 'инвестируй', 'зарабатывай',
            'вход в аккаунт', 'требуется подтверждение', 'подтвердите данные',
            'биллинг', 'банковский аккаунт', 'Присоединяйтесь к проекту', 'Зарабатывайте',
            'escort', 'проститутки', 'striptiz', 'массаж', 'massaj', 'интим услуги', 'девушки по вызову', 'Порно с детьми', 'Детское порно',
            'путана', 'проститутка', 'секс услуги', 'adult dating', 'Порно с животными',
            'эскорт', 'эротический массаж', 'Animal Porn', 'Zoo Porn', 'Child Porn', 'Snuff', 'Dead Porn',
            'fraud', 'подделка документов', 'russianbrides', 'русские невесты']
# Domains whose old report tags no longer match any current tag get reinstated.
reinstated_domains = []
reinstated_reports = []
# Regex pattern to extract the domain and its old tags from a log line
# produced by the analysis script's "Domain reported" message.
domain_pattern = re.compile(r"Domain reported: (\S+) contains suspicious content\. Keywords: ([\w\s,]+)")
# Read the domain_analysis.log file and process each suspicious domain.
with codecs.open('domain_analysis.log', 'r', encoding='utf-8') as log_file:
    for line in log_file:
        match = domain_pattern.search(line)
        if match:
            domain = match.group(1)
            old_tags = match.group(2).split(', ')  # Old tags found in the log entry
            # Reinstate only when NONE of the previously matched tags is
            # still present in the current tag list.
            if not any(tag in NEW_TAGS for tag in old_tags):
                reinstated_domains.append(domain)
                # Prepare the human-readable report entry for this domain.
                reinstated_reports.append(f"Domain: {domain}\nOld Tags: {', '.join(old_tags)}\nReason: None of the old tags matched the new tags.\n")
# Write reinstated domains to domain_reinstated.lst with UTF-8 encoding.
with codecs.open('domain_reinstated.lst', 'w', encoding='utf-8') as f:
    f.write('\n'.join(reinstated_domains))
# Write the reinstated domain report to domain_reinstated_report.txt.
with codecs.open('domain_reinstated_report.txt', 'w', encoding='utf-8') as report_file:
    report_file.write('\n'.join(reinstated_reports))
# Output the summary of reinstated domains.
print(f"Processed log file. Reinstated domains: {len(reinstated_domains)}")

View File

@ -0,0 +1,342 @@
import socket
import geoip2.database
import logging
import concurrent.futures
import threading
import gc
import time # For introducing delay
import requests # For making API calls to get ASN details
import ipaddress
from idna import encode as idna_encode
from queue import Queue
# Path to the GeoLite2 ASN database (replace with the path to your downloaded GeoLite2-ASN.mmdb)
GEOIP_DB_PATH = "GeoLite2-ASN.mmdb"
# Initialize the GeoIP2 reader once at import time; it is shared by all
# worker threads below. NOTE(review): confirm this geoip2 Reader is safe
# for concurrent use in the installed version.
reader = geoip2.database.Reader(GEOIP_DB_PATH)
# Set up logging
logging.basicConfig(level=logging.DEBUG, # Set the lowest level to capture all logs
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("general.log", mode='a'),
                        logging.StreamHandler() # This will print logs to console as well
                    ])
# Additional error logging handler. The "error" logger still propagates to
# the root handlers, so errors also show up in general.log and the console.
error_logger = logging.getLogger("error")
error_handler = logging.FileHandler("error.log", mode='a')
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
# Lock for writing to the output file in a thread-safe way
file_write_lock = threading.Lock()
# Queue to hold results for batch writing (drained by the writer thread)
results_queue = Queue()
# Trusted ASNs: Companies that operate their own ASNs for core services.
# A domain only benefits from this list when is_trusted_domain() confirms
# the pairing (see process_domain).
TRUSTED_ASNS = {
    15169,   # Google
    32934,   # Facebook (Meta)
    714,     # Apple
    8075,    # Microsoft
    2906,    # Netflix
    20940,   # Akamai
    394161,  # Tesla
    13414,   # Twitter
    19679,   # Dropbox
    14492    # LinkedIn
}
# Hosting ASNs: Cloud hosting and CDN providers.
# NOTE(review): 46606 is listed twice below (labelled HostGator and
# Bluehost) -- the set literal silently de-duplicates, so only one entry
# exists and one of the two labels is wrong. Also 15169, 8075 and 20940
# appear in TRUSTED_ASNS above as well.
HOSTING_ASNS = {
    13335,  # Cloudflare
    54113,  # Fastly
    16509,  # Amazon Web Services (AWS)
    15169,  # Google Cloud
    8075,   # Microsoft Azure
    14061,  # DigitalOcean
    20473,  # Vultr
    63949,  # Linode
    16276,  # OVH
    20940,  # Akamai
    24940,  # Hetzner
    19994,  # Rackspace
    37963,  # Alibaba Cloud
    35908,  # IBM Cloud
    31898,  # Oracle Cloud
    55293,  # Kinsta
    46606,  # HostGator
    26347,  # DreamHost
    26496,  # GoDaddy
    46606   # Bluehost (duplicate ASN -- see note above)
}
# Main company domains for Trusted ASNs (domain -> list of owning ASNs).
COMPANY_DOMAINS = {
    'google.com': [15169],        # Google
    'youtube.com': [15169],       # Google (YouTube)
    'facebook.com': [32934],      # Facebook (Meta)
    'instagram.com': [32934],     # Facebook (Meta)
    'whatsapp.com': [32934],      # Facebook (Meta, WhatsApp)
    'apple.com': [714],           # Apple
    'icloud.com': [714],          # Apple iCloud
    'appleid.apple.com': [714],   # Apple ID
    'microsoft.com': [8075],      # Microsoft
    'windows.com': [8075],        # Microsoft
    'live.com': [8075],           # Microsoft
    'office.com': [8075],         # Microsoft Office
    'onedrive.com': [8075],       # Microsoft OneDrive
    'linkedin.com': [14492],      # LinkedIn (Microsoft)
    'netflix.com': [2906],        # Netflix
    'netflixcdn.net': [2906],     # Netflix CDN
    'akamai.com': [20940],        # Akamai
    'akamaihd.net': [20940],      # Akamai CDN
    'twitter.com': [13414],       # Twitter
    'x.com': [13414],             # Twitter
    'dropbox.com': [19679],       # Dropbox
    'tesla.com': [394161]         # Tesla
}
# Function to resolve a domain with retries and punycode support.
def resolve_domain(domain, max_retries=2):
    """Resolve *domain* to its unique IPv4 addresses.

    Internationalized names are IDNA-encoded first. DNS failures are
    retried up to *max_retries* times. Returns [] when encoding or every
    resolution attempt fails.

    BUG FIX: the retry loop did not break after a successful resolution,
    so every domain was resolved *max_retries* times even on success.
    """
    ip_set = set()
    # Convert to punycode if necessary (IDN domains).
    try:
        domain = idna_encode(domain).decode('utf-8')
    except Exception as e:
        error_logger.error(f"Punycode conversion failed for domain {domain}: {e}")
        return []
    for _ in range(max_retries):
        try:
            ip_list = socket.gethostbyname_ex(domain)[2]
            ip_set.update(ip_list)
            logging.info(f"Resolved {domain} to IPs: {ip_list}")
            break  # success -- no need to resolve again
        except socket.gaierror as e:
            error_logger.error(f"Could not resolve domain {domain}: {e}")
    return list(ip_set)
# Resolve an ASN to its announced CIDRs, trying BGPView first and then
# falling back to RIPEstat and finally IPinfo.
def get_all_cidrs_for_asn(asn):
    """Return the first non-empty prefix list any provider yields ([] if none do)."""
    cidrs = []
    for provider in (get_all_cidrs_from_bgpview,
                     get_all_cidrs_from_ripe,
                     get_all_cidrs_from_ipinfo):
        cidrs = provider(asn)
        if cidrs:
            break
    return cidrs
# Function to get all CIDRs for a given ASN using the BGPView API.
def get_all_cidrs_from_bgpview(asn, _retries=2):
    """Fetch the announced IPv4 prefixes for *asn* from BGPView.

    Returns [] on any failure. Rate-limited responses (429) wait 60 s and
    retry at most *_retries* times.

    BUG FIX: on 429 the original recursed into get_all_cidrs_for_asn(),
    which restarted the whole provider-fallback chain and could recurse
    without bound while the rate limit lasted; retry this provider with a
    bounded counter instead.
    """
    try:
        url = f"https://api.bgpview.io/asn/{asn}/prefixes"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            return [prefix['prefix'] for prefix in data['data']['ipv4_prefixes']]
        if response.status_code == 429 and _retries > 0:
            error_logger.error(f"Rate limit exceeded, waiting before retrying for ASN {asn}")
            time.sleep(60)
            return get_all_cidrs_from_bgpview(asn, _retries - 1)
        if response.status_code == 403:
            error_logger.error(f"Access forbidden for ASN {asn}, status code 403.")
        else:
            error_logger.error(f"Failed to get CIDRs for ASN {asn} from BGPView, status code: {response.status_code}")
        return []
    except Exception as e:
        error_logger.error(f"Error retrieving CIDRs from BGPView for ASN {asn}: {e}")
        return []
# Fallback provider: announced prefixes for an ASN from the RIPEstat API.
def get_all_cidrs_from_ripe(asn):
    """Return the IPv4 prefixes RIPEstat reports for *asn* ([] on failure)."""
    url = f"https://stat.ripe.net/data/announced-prefixes/data.json?resource={asn}"
    try:
        response = requests.get(url)
        if response.status_code != 200:
            error_logger.error(f"Failed to get CIDRs for ASN {asn} from RIPEstat, status code: {response.status_code}")
            return []
        payload = response.json()
        # Keep IPv4 only -- IPv6 prefixes contain ':'.
        ipv4_prefixes = [entry['prefix'] for entry in payload['data']['prefixes'] if ':' not in entry['prefix']]
        logging.info(f"Retrieved CIDRs for ASN {asn} from RIPEstat: {ipv4_prefixes}")
        return ipv4_prefixes
    except Exception as e:
        error_logger.error(f"Error retrieving CIDRs from RIPEstat for ASN {asn}: {e}")
        return []
# Function to get all CIDRs for a given ASN using the IPinfo API.
def get_all_cidrs_from_ipinfo(asn):
    """Fetch prefixes for *asn* from IPinfo ([] on failure).

    SECURITY FIX: the API token was hard-coded in source. It is now read
    from the IPINFO_TOKEN environment variable; the old value remains the
    fallback so existing deployments keep working, but it should be
    rotated and removed.
    """
    import os  # local import: only needed for the token lookup
    try:
        url = f"https://ipinfo.io/{asn}"
        headers = {
            'Authorization': f"Bearer {os.environ.get('IPINFO_TOKEN', 'fe4b08eb6076c5')}"
        }
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            ipv4_prefixes = data.get('prefixes', [])
            logging.info(f"Retrieved CIDRs for ASN {asn} from IPinfo: {ipv4_prefixes}")
            return ipv4_prefixes
        error_logger.error(f"Failed to get CIDRs for ASN {asn} from IPinfo, status code: {response.status_code}")
        return []
    except Exception as e:
        error_logger.error(f"Error retrieving CIDRs from IPinfo for ASN {asn}: {e}")
        return []
# Function to get CIDR block for an IP address using GeoIP2.
def get_cidr_for_ip(ip):
    """Look up *ip* in the GeoLite2 ASN database.

    Returns a tuple (asn_number, cidr_string), or (None, None) when the
    lookup fails (e.g. the IP is not covered by the database).
    """
    try:
        response = reader.asn(ip)
        asn = response.autonomous_system_number
        network = response.network
        logging.info(f"IP {ip} mapped to ASN {asn}, CIDR: {network}")
        return asn, str(network)
    except Exception as e:
        error_logger.error(f"Error retrieving CIDR for IP {ip}: {e}")
        return None, None
# Function to check if IP is already covered by an existing CIDR.
def is_ip_in_existing_cidr(ip, cidrs):
    """Return True when *ip* falls inside any network in *cidrs*.

    BUG FIX: if ip_address(ip) raised, the old except-handler referenced
    the loop variable `cidr` before assignment (NameError inside the error
    path); and one malformed CIDR aborted the scan of all remaining CIDRs.
    A bad IP now returns False; a bad CIDR is logged and skipped.
    """
    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError as e:
        error_logger.error(f"Invalid IP: {ip}: {e}")
        return False
    for cidr in cidrs:
        try:
            if ip_obj in ipaddress.ip_network(cidr, strict=False):
                return True
        except ValueError as e:
            error_logger.error(f"Invalid CIDR: {cidr} for IP {ip}: {e}")
    return False
# Function to summarize IPs into subnets with a max /28 size and write to summarized_ip.lst.
def summarize_ips(ips, summarized_filename="summarized_ip.lst"):
    """Collapse host IPs into networks no wider than /28.

    The resulting networks are appended to *summarized_filename* and also
    returned as a list ([] on error).
    """
    try:
        ordered = sorted(ips, key=ipaddress.ip_address)
        collapsed = ipaddress.collapse_addresses(
            ipaddress.ip_network(f"{ip}/32") for ip in ordered)
        result = []
        for net in collapsed:
            # Anything wider than /28 is split into /28 chunks.
            if net.prefixlen < 28:
                result.extend(net.subnets(new_prefix=28))
            else:
                result.append(net)
        # Append summarized networks to the output file.
        with open(summarized_filename, 'a', encoding='utf-8') as f:
            f.writelines(f"{net}\n" for net in result)
        return result
    except Exception as e:
        error_logger.error(f"Error summarizing IPs: {e}")
        return []
# Function to get all CIDRs for a domain by resolving its IP addresses and querying GeoLite2.
def process_domain(domain, existing_cidrs):
    """Resolve *domain* and return the set of CIDR strings to record for it.

    - Trusted company domains contribute their ASN's full prefix list
      (unless the IP is already covered by *existing_cidrs*).
    - IPs on hosting/CDN ASNs are collected and summarized into at most
      /28 subnets.
    - Everything else is added as an individual /32.

    BUG FIX: the original elif-chain tested the generic "not trusted" case
    before the HOSTING_ASNS case; since that condition is the exact
    negation of the first branch, the hosting branch was unreachable and
    hosting IPs were emitted as individual /32s instead of being
    summarized. The branch order is corrected here.
    """
    try:
        cidrs = set()
        hosting_ips = []
        for ip in resolve_domain(domain):
            asn, cidr = get_cidr_for_ip(ip)
            if asn in TRUSTED_ASNS and is_trusted_domain(domain, asn):
                # Trusted main domain: pull every prefix its ASN announces.
                if not is_ip_in_existing_cidr(ip, existing_cidrs):
                    cidrs.update(get_all_cidrs_for_asn(asn))
            elif asn in HOSTING_ASNS:
                hosting_ips.append(ip)
            else:
                # Not trusted and not a known hoster: record just this host.
                cidrs.add(f"{ip}/32")
        # Summarize close-range hosting IPs into at most /28 networks.
        if hosting_ips:
            summarized = summarize_ips(hosting_ips)
            cidrs.update(str(net) for net in summarized)
        return cidrs
    except Exception as e:
        error_logger.error(f"Error processing domain {domain}: {e}")
        return set()
# Function to read domains from domains.lst file.
def read_domains_from_file(file_path="domains.lst"):
    """Return the stripped, non-blank lines of *file_path* ([] when missing)."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            domains = [text for text in (line.strip() for line in f) if text]
        logging.info(f"Read {len(domains)} domains from file.")
        return domains
    except FileNotFoundError as e:
        error_logger.error(f"File not found: {file_path}, {e}")
        return []
# Function to write CIDRs in batches to output file in a thread-safe way.
def write_cidrs_to_file(filename="ip.lst"):
    """Consumer loop: drain CIDR batches from results_queue into *filename*.

    Runs until a None sentinel is dequeued. Writes are serialized with
    file_write_lock.

    BUG FIX: the log line printed the literal text "(unknown)" instead of
    the target filename; it now interpolates *filename*.
    """
    while True:
        cidrs = results_queue.get()  # blocks until a batch (or sentinel) arrives
        if cidrs is None:  # sentinel value to stop the thread
            break
        with file_write_lock:
            with open(filename, 'a', encoding='utf-8') as f:
                for cidr in cidrs:
                    f.write(f"{cidr}\n")
            logging.info(f"Written {len(cidrs)} CIDRs to {filename}")
        results_queue.task_done()
# Multithreading to handle large domain lists efficiently.
def main():
    """Entry point: map every domain in domains.lst to CIDRs and write ip.lst."""
    # Enable garbage collection
    gc.enable()
    # Read the domains from domains.lst file
    domains = read_domains_from_file("domains.lst")
    if not domains:
        logging.info("No domains to process.")
        return
    existing_cidrs = set()  # Keep track of all CIDRs to exclude matching IPs
    # Start the single writer thread that consumes results_queue.
    writer_thread = threading.Thread(target=write_cidrs_to_file, args=("ip.lst",))
    writer_thread.start()
    # Use a ThreadPoolExecutor with 35 worker threads.
    # NOTE(review): existing_cidrs is a plain set shared with the workers
    # and mutated below while they read it -- no synchronization; confirm
    # this is acceptable.
    with concurrent.futures.ThreadPoolExecutor(max_workers=35) as executor:
        future_to_domain = {executor.submit(process_domain, domain, existing_cidrs): domain for domain in domains}
        for future in concurrent.futures.as_completed(future_to_domain):
            domain = future_to_domain[future]
            try:
                domain_cidrs = future.result()
                if domain_cidrs:
                    existing_cidrs.update(domain_cidrs)  # Add new CIDRs to the existing set
                    logging.info(f"CIDRs found for {domain}: {domain_cidrs}")
                    results_queue.put(list(domain_cidrs))  # Send the results to the writer queue
            except Exception as e:
                error_logger.error(f"Error with domain {domain}: {e}")
            finally:
                # Collect garbage after each domain processing to free memory
                gc.collect()
    # Stop the writer thread by sending a sentinel value
    results_queue.put(None)
    writer_thread.join()
if __name__ == "__main__":
    main()

3
src/step 5 ooni list/.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

1
src/step 5 ooni list/.idea/.name generated Normal file
View File

@ -0,0 +1 @@
ooni_list.py

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
src/step 5 ooni list/.idea/misc.xml generated Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.9" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
</project>

8
src/step 5 ooni list/.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/step 5 ooni list.iml" filepath="$PROJECT_DIR$/.idea/step 5 ooni list.iml" />
</modules>
</component>
</project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,26 @@
import json
# Input file containing Discord voice-server IPs grouped by region.
input_file = "discord_all_ips.json"
# Read the JSON content from the file.
# ROBUSTNESS FIX: a missing input file previously crashed with an
# unhandled FileNotFoundError traceback; it now exits with a clear message.
try:
    with open(input_file, "r") as file:
        data = json.load(file)
except FileNotFoundError:
    print(f"Input file not found: {input_file}")
    exit()
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON: {e}")
    exit()
# Open the output file and emit each IP as a /32 host route.
with open("discord_ips.lst", "w") as output_file:
    # Loop through the regions in the dictionary
    for region, entries in data.items():
        # Only region keys mapping to lists of entry dicts are relevant.
        if isinstance(entries, list):
            for entry in entries:
                # Get the IP address and format it as /32
                ip = entry.get("ip")
                if ip:
                    output_file.write(f"{ip}/32\n")
print("IP addresses have been written to discord_ips.lst")

View File

@ -0,0 +1,58 @@
import logging
from idna import encode as idna_encode
# Set up logging: everything goes both to domain_processing.log (appended
# across runs) and to the console.
logging.basicConfig(level=logging.DEBUG, # Set the lowest level to capture all logs
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("domain_processing.log", mode='a'),
                        logging.StreamHandler() # This will print logs to console as well
                    ])
# Function to read domains from a file.
def read_domains_from_file(file_path):
    """Return stripped, non-empty lines of *file_path*; [] when the file is absent."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            result = [text for text in (raw.strip() for raw in handle) if text]
        logging.info(f"Read {len(result)} domains from {file_path}.")
        return result
    except FileNotFoundError as e:
        logging.error(f"File not found: {file_path}, {e}")
        return []
# Function to convert domains to punycode.
def convert_to_punycode(domains):
    """Return the set of unique punycode (IDNA) forms of *domains*.

    Domains that fail IDNA encoding are logged and skipped.
    """
    converted = set()
    for name in domains:
        try:
            converted.add(idna_encode(name).decode('utf-8'))
        except Exception as e:
            logging.error(f"Punycode conversion failed for domain {name}: {e}")
    return converted
# Main function to process domain files and create the output file.
def main():
    """Merge three domain lists, normalize to punycode, and write the union."""
    sources = [
        "sum/input/domains.lst",
        "sum/input/ooni_domains.lst",
        "sum/input/community.lst",
    ]
    # Union of all source lists (duplicates removed).
    combined = set()
    for path in sources:
        combined.update(read_domains_from_file(path))
    # Punycode normalization de-duplicates IDN variants too.
    unique_domains = convert_to_punycode(combined)
    output_file = "sum/output/domains_all.lst"
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(f"{domain}\n" for domain in sorted(unique_domains))
        logging.info(f"Written {len(unique_domains)} unique domains to {output_file}.")
    except Exception as e:
        logging.error(f"Error writing to file {output_file}: {e}")
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,44 @@
import logging
# Set up logging: everything goes both to ip_processing.log (appended
# across runs) and to the console.
logging.basicConfig(level=logging.DEBUG, # Set the lowest level to capture all logs
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("ip_processing.log", mode='a'),
                        logging.StreamHandler() # This will print logs to console as well
                    ])
# Function to read IPs from a file.
def read_ips_from_file(file_path):
    """Return stripped, non-empty lines of *file_path*; [] when it is missing."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            entries = [text for text in (raw.strip() for raw in handle) if text]
        logging.info(f"Read {len(entries)} IPs from {file_path}.")
        return entries
    except FileNotFoundError as e:
        logging.error(f"File not found: {file_path}, {e}")
        return []
# Main function to process IP files and create the output file.
def main():
    """Merge four IP lists into one de-duplicated, sorted output file."""
    sources = [
        "input/ip.lst",
        "input/ip_ooni.lst",
        "input/ip_community.lst",
        "input/discord_ips.lst",
    ]
    # Union of all source lists (duplicates removed).
    unique_ips = set()
    for path in sources:
        unique_ips.update(read_ips_from_file(path))
    output_file = "ips_all.lst"
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(f"{ip}\n" for ip in sorted(unique_ips))
        logging.info(f"Written {len(unique_ips)} unique IPs to {output_file}.")
    except Exception as e:
        logging.error(f"Error writing to file {output_file}: {e}")
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,182 @@
import socket
import geoip2.database
import logging
import requests
import ipaddress
import time
from collections import defaultdict
from idna import encode as idna_encode
# Paths to input files
IP_LST_PATH = "input/ips_all.lst"
DOMAINS_LST_PATH = "input/domains_all.lst"
OUTPUT_FILE = "output/ipsum.lst"
# Path to the GeoLite2 ASN database
GEOIP_DB_PATH = "GeoLite2-ASN.mmdb"
# Initialize the GeoIP2 reader once at import time.
reader = geoip2.database.Reader(GEOIP_DB_PATH)
# Set up logging: everything goes both to summary.log and the console.
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("summary.log", mode='a'),
                        logging.StreamHandler()
                    ])
# Trusted ASNs for company domains: each listed domain gets its ASN's full
# announced prefix list pulled in (see process_domain_for_asn).
COMPANY_DOMAINS = {
    'google.com': [15169],
    'youtube.com': [15169],
    'ggpht.com': [15169],
    'facebook.com': [32934],
    'instagram.com': [32934],
    'whatsapp.com': [32934],
    'microsoft.com': [8075],
    'linkedin.com': [14492],
    'netflix.com': [2906],
    'akamai.com': [20940],
    'twitter.com': [13414],
    'x.com': [13414],
    'dropbox.com': [19679],
    'tesla.com': [394161]
}
# Local IP CIDRs to exclude: loopback, RFC 1918 private ranges, link-local,
# and their IPv6 equivalents.
LOCAL_IP_CIDRS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10")
]
# Function to summarize IPs into /28 subnets at most.
def summarize_ips(ips):
    """Collapse the given IP/CIDR strings; split anything wider than /28.

    Returns a list of ip_network objects ([] on invalid input).
    """
    try:
        # De-duplicate first, then collapse adjacent/overlapping networks.
        unique_networks = [ipaddress.ip_network(entry, strict=False) for entry in set(ips)]
        summarized_networks = []
        for net in ipaddress.collapse_addresses(unique_networks):
            if net.prefixlen < 28:  # wider than /28 -> split into /28 chunks
                summarized_networks.extend(net.subnets(new_prefix=28))
            else:
                summarized_networks.append(net)
        logging.info(f"Summarized networks: {summarized_networks}")
        return summarized_networks
    except ValueError as e:
        logging.error(f"Error summarizing IPs: {e}")
        return []
# Function to handle rate-limiting errors (429) and retry after waiting.
def handle_rate_limit():
    """Sleep for a fixed 60 s back-off after an HTTP 429 response."""
    wait_time = 60  # fixed back-off; the API does not advertise a Retry-After here
    logging.warning(f"Rate limit hit. Waiting for {wait_time} seconds.")
    time.sleep(wait_time)
# Function to get CIDRs for an ASN via the BGPView API.
def get_cidr_for_asn(asn, _retries=3):
    """Fetch the announced IPv4 prefixes for *asn* from BGPView.

    Returns [] on failure. Rate-limited responses (429) back off via
    handle_rate_limit() and retry at most *_retries* times.

    BUG FIX: the original retried 429 by unconditional recursion, which
    could recurse without bound (one 60 s sleep per level) while the rate
    limit lasted; the retry count is now bounded.
    """
    try:
        url = f"https://api.bgpview.io/asn/{asn}/prefixes"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            return [prefix['prefix'] for prefix in data['data']['ipv4_prefixes']]
        if response.status_code == 429 and _retries > 0:
            handle_rate_limit()
            return get_cidr_for_asn(asn, _retries - 1)  # Retry after waiting
        if response.status_code == 403:
            logging.error(f"Access forbidden for ASN {asn}, skipping.")
        return []
    except Exception as e:
        logging.error(f"Error retrieving CIDRs for ASN {asn}: {e}")
        return []
# Function to resolve a domain with punycode support.
def resolve_domain(domain):
    """Return the IPv4 addresses of *domain* (IDNA-encoded first); [] on failure."""
    try:
        ascii_name = idna_encode(domain).decode('utf-8')
        return socket.gethostbyname_ex(ascii_name)[2]
    except Exception as e:
        logging.error(f"Could not resolve domain {domain}: {e}")
        return []
# Function to check if a domain matches COMPANY_DOMAINS and fetch CIDRs.
def process_domain_for_asn(domain):
    """Return the union of CIDRs for every ASN mapped to *domain*.

    Domains absent from COMPANY_DOMAINS yield an empty set.
    """
    cidrs = set()
    for asn in COMPANY_DOMAINS.get(domain, []):
        cidrs.update(get_cidr_for_asn(asn))
    return cidrs
# Function to read IPs (or any stripped lines) from a list file.
def read_ips_from_file(file_path):
    """Return stripped, non-empty lines of *file_path*; [] when it does not exist."""
    try:
        with open(file_path, 'r') as handle:
            return [entry for entry in (raw.strip() for raw in handle) if entry]
    except FileNotFoundError:
        logging.error(f"File not found: {file_path}")
        return []
# Function to check if an IP is local.
def is_local_ip(ip):
    """Return True when *ip* (an address or CIDR string) lies inside a
    loopback/private/link-local range from LOCAL_IP_CIDRS."""
    try:
        candidate = ipaddress.ip_network(ip, strict=False)
    except ValueError as e:
        logging.error(f"Invalid IP or CIDR: {ip}: {e}")
        return False
    # Version check avoids TypeError from mixed IPv4/IPv6 comparison.
    return any(
        candidate.version == local.version and candidate.subnet_of(local)
        for local in LOCAL_IP_CIDRS
    )
# Function to write summarized CIDRs to the output list file.
def write_summarized_ips(ips, filename):
    """Write one CIDR per line to *filename*, overwriting any existing file.

    BUG FIX: the success log line printed the literal text "(unknown)"
    instead of the target filename; it now interpolates *filename*.
    """
    try:
        with open(filename, 'w') as f:
            for cidr in ips:
                f.write(f"{cidr}\n")
        logging.info(f"Written summarized IPs to {filename}")
    except Exception as e:
        logging.error(f"Error writing summarized IPs to file: {e}")
# Main function to process ip.lst, summarize, and add CIDRs for company domains.
def main():
    """Filter local IPs, summarize the rest to /28s, add company-ASN CIDRs,
    and write everything to OUTPUT_FILE."""
    # Read IPs from ip.lst
    ips = read_ips_from_file(IP_LST_PATH)
    # Filter out local (loopback/private/link-local) IPs.
    ips = [ip for ip in ips if not is_local_ip(ip)]
    # Summarize the IPs into /28 networks
    summarized_ips = summarize_ips(ips)
    # Check domains.lst for COMPANY_DOMAINS matches and get corresponding CIDRs
    # (read_ips_from_file is reused here -- it just reads stripped lines).
    domains = read_ips_from_file(DOMAINS_LST_PATH)
    company_cidrs = set()
    for domain in domains:
        company_cidrs.update(process_domain_for_asn(domain))
    # Combine summarized IPs and company CIDRs.
    # NOTE(review): summarized_ips holds ip_network objects while
    # company_cidrs holds strings, so this union can contain the same
    # network twice in different representations -- confirm intended.
    final_cidrs = set(summarized_ips) | company_cidrs
    # Write the final output to ipsum.lst
    write_summarized_ips(final_cidrs, OUTPUT_FILE)
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,89 @@
import requests
import csv
import re
import logging
from datetime import datetime, timedelta
# Set up logging for domain checks (file + console).
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[logging.FileHandler("ooni_domain_fetch.log", mode='a'),
                              logging.StreamHandler()])
# Function to normalize a domain by removing a leading 'www.' label only.
def normalize_domain(domain):
    """Strip a single leading 'www.' prefix from *domain*.

    BUG FIX: the original used str.lstrip('www.'), which removes any run
    of the CHARACTERS {'w', '.'} from the left rather than the prefix --
    e.g. 'www.weather.com' became 'eather.com'. Slicing off the
    4-character prefix removes exactly 'www.' and nothing else.
    """
    return domain[4:] if domain.startswith('www.') else domain
# Function to fetch and process OONI domains with logging and anomaly checks
def fetch_and_process_ooni_domains(output_file):
    """Download the last 7 days of OONI web_connectivity aggregates
    (probe_cc=RU), keep domains whose anomaly count exceeds their ok count,
    and write the normalized, de-duplicated list to *output_file*, one per
    line. Errors are logged; nothing is raised to the caller.
    """
    try:
        # Calculate the date range for the last 7 days
        today = datetime.now()
        until_date = today.strftime('%Y-%m-%d')
        since_date = (today - timedelta(days=7)).strftime('%Y-%m-%d')
        # Construct the URL for downloading the CSV file using the OONI API
        base_url = "https://api.ooni.io/api/v1/aggregation"
        params = {
            "axis_y": "domain",
            "axis_x": "measurement_start_day",
            "probe_cc": "RU",  # Replace 'RU' with the country code you're interested in
            "since": since_date,
            "until": until_date,
            "test_name": "web_connectivity",
            "time_grain": "day",
            "format": "CSV"
        }
        url = f"{base_url}?{'&'.join([f'{k}={v}' for k, v in params.items()])}"
        # Fetch the CSV data from OONI.
        # Bug fix: requests.get() without a timeout can block forever if the
        # API stalls; bound the wait instead.
        response = requests.get(url, timeout=120)
        if response.status_code != 200:
            logging.error(f"Failed to download data from OONI API, status code: {response.status_code}")
            return
        # Process the CSV data
        domains = set()
        csv_data = response.content.decode('utf-8').splitlines()
        csv_reader = csv.DictReader(csv_data)
        pattern = r'^.*\.{2,}.*$'  # Pattern to match incorrect domains (consecutive dots)
        for row in csv_reader:
            domain = row['domain'].strip()
            anomaly_count = int(row['anomaly_count'])
            ok_count = int(row['ok_count'])
            # Log domain processing details
            logging.info(f"Checking domain: {domain} | Anomalies: {anomaly_count}, OK: {ok_count}, Anomaly Rate: {anomaly_count / (anomaly_count + ok_count) if (anomaly_count + ok_count) > 0 else 0:.2f}")
            # Filter out incorrect domains
            if re.match(pattern, domain):
                logging.info(f"Domain has incorrect format: {domain}")
                continue
            # Log and process based on anomaly vs OK count
            if anomaly_count > ok_count:
                normalized_domain = normalize_domain(domain)
                if normalized_domain not in domains:
                    domains.add(normalized_domain)
                    logging.info(f"Anomaly rate is high for the domain: {normalized_domain} - Adding to the list")
            else:
                logging.info(f"Site is accessible in Russia: {domain}")
        # Write the domains to the output file
        with open(output_file, 'w') as output:
            for domain in sorted(domains):  # Optionally sort the domains
                output.write(f"{domain}\n")
        print(f"Total unique domains written to {output_file}: {len(domains)}")
    except Exception as e:
        logging.error(f"Error occurred during fetching or processing: {e}")
# Replace with your output file path
output_file = 'ooni/ooni_domains.lst'
# Fetch and process OONI domains, and output to the specified file.
# NOTE: this runs immediately on import/execution; there is no __main__ guard.
fetch_and_process_ooni_domains(output_file)

View File

@ -0,0 +1,137 @@
import socket
import logging
import concurrent.futures
import threading
import gc
import time # For introducing delay
import requests # For making API calls to get ASN details
import ipaddress
from idna import encode as idna_encode
from queue import Queue
# Set up logging
# Root logger at DEBUG so every level is captured; output goes both to
# general_comm.log (append mode) and to the console.
logging.basicConfig(level=logging.DEBUG,  # Set the lowest level to capture all logs
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("general_comm.log", mode='a'),
                        logging.StreamHandler()  # This will print logs to console as well
                    ])
# Additional error logging handler
# NOTE(review): records sent to error_logger also propagate to the root
# handlers above, so errors appear in both log files.
error_logger = logging.getLogger("error")
error_handler = logging.FileHandler("error_comm.log", mode='a')
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
# Lock for writing to the output file in a thread-safe way
file_write_lock = threading.Lock()
# Queue to hold results for batch writing
results_queue = Queue()
# Function to resolve a domain with retries and punycode support
def resolve_domain(domain, max_retries=2):
    """Resolve *domain* (IDNA/punycode-encoded first) to its IPv4 addresses.

    Resolution is attempted up to *max_retries* times; the de-duplicated
    address list is returned, or [] on encoding/resolution failure.
    """
    ip_set = set()
    # Convert to punycode if necessary
    try:
        domain = idna_encode(domain).decode('utf-8')
    except Exception as e:
        error_logger.error(f"Punycode conversion failed for domain {domain}: {e}")
        return []
    for _ in range(max_retries):
        try:
            ip_list = socket.gethostbyname_ex(domain)[2]
            ip_set.update(ip_list)
            logging.info(f"Resolved {domain} to IPs: {ip_list}")
            # Bug fix: stop after a successful lookup; previously the loop
            # re-queried DNS max_retries times even when the first attempt
            # succeeded.
            break
        except socket.gaierror as e:
            error_logger.error(f"Could not resolve domain {domain}: {e}")
    return list(ip_set)
# Function to check if IP is already covered by an existing CIDR
def is_ip_in_existing_cidr(ip, cidrs):
    """Return True if *ip* is contained in any network in *cidrs*.

    Bug fix: previously an invalid *ip* raised ValueError before the loop
    ever bound ``cidr``, so the except handler's log line crashed with
    NameError; a single malformed CIDR also aborted the remaining checks.
    """
    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError as e:
        error_logger.error(f"Invalid IP: {ip}: {e}")
        return False
    for cidr in cidrs:
        try:
            if ip_obj in ipaddress.ip_network(cidr, strict=False):
                return True
        except ValueError as e:
            error_logger.error(f"Invalid IP or CIDR: {ip} - {cidr}: {e}")
    return False
# Function to get all CIDRs for a domain by resolving its IP addresses
def process_domain(domain, existing_cidrs):
    """Resolve *domain* and return /32 CIDRs for every address that is not
    already covered by one of *existing_cidrs* (empty set on error)."""
    try:
        resolved = resolve_domain(domain)  # Resolve domain to its IP addresses
        return {
            f"{address}/32"
            for address in resolved
            if not is_ip_in_existing_cidr(address, existing_cidrs)
        }
    except Exception as e:
        error_logger.error(f"Error processing domain {domain}: {e}")
        return set()
# Function to read domains from domains.lst file
def read_domains_from_file(file_path="community.lst"):
    """Return the non-blank, stripped lines of *file_path* ([] if missing)."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            domains = [entry.strip() for entry in handle if entry.strip()]
        logging.info(f"Read {len(domains)} domains from file.")
        return domains
    except FileNotFoundError as e:
        error_logger.error(f"File not found: {file_path}, {e}")
        return []
# Function to write CIDRs in batches to output file in a thread-safe way
def write_cidrs_to_file(filename="ip_community.lst"):
    """Writer-thread loop: pop CIDR batches off results_queue and append them
    to *filename* until the None sentinel arrives."""
    while True:
        cidrs = results_queue.get()  # Fetch CIDRs from the queue
        if cidrs is None:  # Sentinel value to stop the thread
            # Bug fix: mark the sentinel as processed too, so a potential
            # results_queue.join() cannot deadlock on the final item.
            results_queue.task_done()
            break
        with file_write_lock:
            with open(filename, 'a', encoding='utf-8') as f:
                for cidr in cidrs:
                    f.write(f"{cidr}\n")
        # Bug fix: the message previously logged the literal "(unknown)"
        # instead of the destination file name.
        logging.info(f"Written {len(cidrs)} CIDRs to {filename}")
        results_queue.task_done()
# Multithreading to handle large domain lists efficiently
def main():
    """Resolve every domain in community.lst concurrently and stream the
    resulting /32 CIDRs to ip_community.lst through a dedicated writer
    thread fed by results_queue."""
    # Enable garbage collection
    gc.enable()
    # Read the domains from domains.lst file
    domains = read_domains_from_file("community.lst")
    if not domains:
        logging.info("No domains to process.")
        return
    existing_cidrs = set()  # Keep track of all CIDRs to exclude matching IPs
    # NOTE(review): existing_cidrs is never populated anywhere in this
    # function, so the duplicate-CIDR check in process_domain is a no-op here.
    # Start the file writer thread
    writer_thread = threading.Thread(target=write_cidrs_to_file, args=("ip_community.lst",))
    writer_thread.start()
    # Use ThreadPoolExecutor with 35 worker threads
    # (the original comment claimed 16, contradicting max_workers=35).
    with concurrent.futures.ThreadPoolExecutor(max_workers=35) as executor:
        future_to_domain = {executor.submit(process_domain, domain, existing_cidrs): domain for domain in domains}
        for future in concurrent.futures.as_completed(future_to_domain):
            domain = future_to_domain[future]
            try:
                domain_cidrs = future.result()
                if domain_cidrs:
                    results_queue.put(domain_cidrs)
            except Exception as e:
                error_logger.error(f"Error with domain {domain}: {e}")
            finally:
                # Collect garbage after each domain processing to free memory
                gc.collect()
    # Stop the writer thread by sending a sentinel value
    results_queue.put(None)
    writer_thread.join()
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,138 @@
import socket
import logging
import concurrent.futures
import threading
import gc
import time # For introducing delay
import requests # For making API calls to get ASN details
import ipaddress
from idna import encode as idna_encode
from queue import Queue
# Set up logging
# Root logger at DEBUG so every level is captured; output goes both to
# general_ooni.log (append mode) and to the console.
logging.basicConfig(level=logging.DEBUG,  # Set the lowest level to capture all logs
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("general_ooni.log", mode='a'),
                        logging.StreamHandler()  # This will print logs to console as well
                    ])
# Additional error logging handler
# NOTE(review): records sent to error_logger also propagate to the root
# handlers above, so errors appear in both log files.
error_logger = logging.getLogger("error")
error_handler = logging.FileHandler("error_ooni.log", mode='a')
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
# Lock for writing to the output file in a thread-safe way
file_write_lock = threading.Lock()
# Queue to hold results for batch writing
results_queue = Queue()
# Function to resolve a domain with retries and punycode support
def resolve_domain(domain, max_retries=2):
    """Resolve *domain* (IDNA/punycode-encoded first) to its IPv4 addresses.

    Resolution is attempted up to *max_retries* times; the de-duplicated
    address list is returned, or [] on encoding/resolution failure.
    """
    ip_set = set()
    # Convert to punycode if necessary
    try:
        domain = idna_encode(domain).decode('utf-8')
    except Exception as e:
        error_logger.error(f"Punycode conversion failed for domain {domain}: {e}")
        return []
    for _ in range(max_retries):
        try:
            ip_list = socket.gethostbyname_ex(domain)[2]
            ip_set.update(ip_list)
            logging.info(f"Resolved {domain} to IPs: {ip_list}")
            # Bug fix: stop after a successful lookup; previously the loop
            # re-queried DNS max_retries times even when the first attempt
            # succeeded.
            break
        except socket.gaierror as e:
            error_logger.error(f"Could not resolve domain {domain}: {e}")
    return list(ip_set)
# Function to check if IP is already covered by an existing CIDR
def is_ip_in_existing_cidr(ip, cidrs):
    """Return True if *ip* is contained in any network in *cidrs*.

    Bug fix: previously an invalid *ip* raised ValueError before the loop
    ever bound ``cidr``, so the except handler's log line crashed with
    NameError; a single malformed CIDR also aborted the remaining checks.
    """
    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError as e:
        error_logger.error(f"Invalid IP: {ip}: {e}")
        return False
    for cidr in cidrs:
        try:
            if ip_obj in ipaddress.ip_network(cidr, strict=False):
                return True
        except ValueError as e:
            error_logger.error(f"Invalid IP or CIDR: {ip} - {cidr}: {e}")
    return False
# Function to get all CIDRs for a domain by resolving its IP addresses
def process_domain(domain, existing_cidrs):
    """Resolve *domain* and return /32 CIDRs for every address that is not
    already covered by one of *existing_cidrs* (empty set on error)."""
    try:
        resolved = resolve_domain(domain)  # Resolve domain to its IP addresses
        return {
            f"{address}/32"
            for address in resolved
            if not is_ip_in_existing_cidr(address, existing_cidrs)
        }
    except Exception as e:
        error_logger.error(f"Error processing domain {domain}: {e}")
        return set()
# Function to read domains from domains.lst file
def read_domains_from_file(file_path="ooni_domains.lst"):
    """Return the non-blank, stripped lines of *file_path* ([] if missing)."""
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            domains = [entry.strip() for entry in handle if entry.strip()]
        logging.info(f"Read {len(domains)} domains from file.")
        return domains
    except FileNotFoundError as e:
        error_logger.error(f"File not found: {file_path}, {e}")
        return []
# Function to write CIDRs in batches to output file in a thread-safe way
def write_cidrs_to_file(filename="ip.lst"):
    """Writer-thread loop: pop CIDR batches off results_queue and append them
    to *filename* until the None sentinel arrives."""
    while True:
        cidrs = results_queue.get()  # Fetch CIDRs from the queue
        if cidrs is None:  # Sentinel value to stop the thread
            # Bug fix: mark the sentinel as processed too, so a potential
            # results_queue.join() cannot deadlock on the final item.
            results_queue.task_done()
            break
        with file_write_lock:
            with open(filename, 'a', encoding='utf-8') as f:
                for cidr in cidrs:
                    f.write(f"{cidr}\n")
        # Bug fix: the message previously logged the literal "(unknown)"
        # instead of the destination file name.
        logging.info(f"Written {len(cidrs)} CIDRs to {filename}")
        results_queue.task_done()
# Multithreading to handle large domain lists efficiently
def main():
    """Resolve every domain in ooni_domains.lst concurrently and stream the
    resulting /32 CIDRs to ip_ooni.lst through a dedicated writer thread fed
    by results_queue."""
    # Enable garbage collection
    gc.enable()
    # Read the domains from domains.lst file
    domains = read_domains_from_file("ooni_domains.lst")
    if not domains:
        logging.info("No domains to process.")
        return
    existing_cidrs = set()  # Keep track of all CIDRs to exclude matching IPs
    # NOTE(review): existing_cidrs is never populated anywhere in this
    # function, so the duplicate-CIDR check in process_domain is a no-op here.
    # Start the file writer thread (overrides the writer's "ip.lst" default)
    writer_thread = threading.Thread(target=write_cidrs_to_file, args=("ip_ooni.lst",))
    writer_thread.start()
    # Use ThreadPoolExecutor with 35 worker threads
    # (the original comment claimed 16, contradicting max_workers=35).
    with concurrent.futures.ThreadPoolExecutor(max_workers=35) as executor:
        future_to_domain = {executor.submit(process_domain, domain, existing_cidrs): domain for domain in domains}
        for future in concurrent.futures.as_completed(future_to_domain):
            domain = future_to_domain[future]
            try:
                domain_cidrs = future.result()
                if domain_cidrs:
                    results_queue.put(domain_cidrs)
            except Exception as e:
                error_logger.error(f"Error with domain {domain}: {e}")
            finally:
                # Collect garbage after each domain processing to free memory
                gc.collect()
    # Stop the writer thread by sending a sentinel value
    results_queue.put(None)
    writer_thread.join()
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,166 @@
import socket
import geoip2.database
import logging
import requests
import ipaddress
import time
from collections import defaultdict
from idna import encode as idna_encode
# Paths to input files
IP_LST_PATH = "ip.lst"
DOMAINS_LST_PATH = "domains.lst"
OUTPUT_FILE = "ipsum.lst"
# Path to the GeoLite2 ASN database
GEOIP_DB_PATH = "GeoLite2-ASN.mmdb"
# Initialize the GeoIP2 reader
# NOTE(review): `reader` appears unused in this script — ASN prefixes are
# fetched from the bgpview.io API instead. TODO confirm and drop if so.
reader = geoip2.database.Reader(GEOIP_DB_PATH)
# Set up logging
# Root logger at DEBUG; output mirrored to summary.log (append) and console.
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    handlers=[
                        logging.FileHandler("summary.log", mode='a'),
                        logging.StreamHandler()
                    ])
# Trusted ASNs for company domains
# Maps a well-known domain to the autonomous system number(s) whose announced
# prefixes should be added wholesale to the output list.
COMPANY_DOMAINS = {
    'google.com': [15169],
    'youtube.com': [15169],
    'ggpht.com': [15169],
    'facebook.com': [32934],
    'instagram.com': [32934],
    'whatsapp.com': [32934],
    'fbcdn.net': [32934],
    'microsoft.com': [8075],
    'linkedin.com': [14492],
    'netflix.com': [2906],
    'akamai.com': [20940],
    'twitter.com': [13414],
    'x.com': [13414],
    'dropbox.com': [19679],
    'tesla.com': [394161]
}
# Function to summarize IPs into /28 subnets at most
def summarize_ips(ips):
    """Collapse the given IPs/networks and cap every resulting block at /28.

    Duplicates are removed, adjacent addresses are merged, and any merged
    network wider than /28 is re-split into /28 subnets. Returns [] when an
    entry cannot be parsed.
    """
    try:
        unique_networks = [ipaddress.ip_network(addr, strict=False) for addr in set(ips)]
        result = []
        for merged in ipaddress.collapse_addresses(unique_networks):
            if merged.prefixlen < 28:
                # Wider than /28: break it back down into /28 chunks.
                result.extend(merged.subnets(new_prefix=28))
            else:
                result.append(merged)
        logging.info(f"Summarized networks: {result}")
        return result
    except ValueError as e:
        logging.error(f"Error summarizing IPs: {e}")
        return []
# Function to handle rate-limiting errors (429) and retry after waiting
def handle_rate_limit():
    """Pause for a fixed back-off period after the API returned HTTP 429."""
    backoff_seconds = 60
    logging.warning(f"Rate limit hit. Waiting for {backoff_seconds} seconds.")
    time.sleep(backoff_seconds)
# Function to get CIDRs for a domain from ASN using GeoLite2
def get_cidr_for_asn(asn):
    """Fetch the announced IPv4 prefixes for *asn* from the bgpview.io API.

    Returns a list of prefix strings, or [] on error/403/other statuses.
    HTTP 429 triggers a back-off and a bounded number of retries.

    Bug fix: the original retried 429s via unbounded recursion and called
    requests.get() without a timeout, so a persistent rate limit or a hung
    connection could recurse/block forever.
    """
    max_attempts = 5
    for _ in range(max_attempts):
        try:
            url = f"https://api.bgpview.io/asn/{asn}/prefixes"
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                data = response.json()
                return [prefix['prefix'] for prefix in data['data']['ipv4_prefixes']]
            elif response.status_code == 429:
                handle_rate_limit()  # Wait, then retry on the next loop pass
                continue
            elif response.status_code == 403:
                logging.error(f"Access forbidden for ASN {asn}, skipping.")
                return []
            return []
        except Exception as e:
            logging.error(f"Error retrieving CIDRs for ASN {asn}: {e}")
            return []
    logging.error(f"Giving up on ASN {asn} after repeated rate-limit responses.")
    return []
# Function to resolve a domain with punycode support
def resolve_domain(domain):
    """Resolve *domain* (IDNA-encoded first) to its IPv4 address list
    ([] on any failure)."""
    try:
        encoded = idna_encode(domain).decode('utf-8')
        _, _, addresses = socket.gethostbyname_ex(encoded)
        return addresses
    except Exception as e:
        logging.error(f"Could not resolve domain {domain}: {e}")
        return []
# Function to check if a domain matches COMPANY_DOMAINS and fetch CIDRs
def process_domain_for_asn(domain):
    """Return the set of CIDRs announced by the trusted ASNs mapped to
    *domain* in COMPANY_DOMAINS (empty set when the domain is not listed)."""
    cidrs = set()
    for asn in COMPANY_DOMAINS.get(domain, []):
        cidrs.update(get_cidr_for_asn(asn))
    return cidrs
# Function to read IPs from ip.lst
def read_ips_from_file(file_path):
    """Return the non-blank, stripped lines of *file_path* ([] if missing)."""
    try:
        with open(file_path, 'r') as handle:
            return [entry.strip() for entry in handle if entry.strip()]
    except FileNotFoundError:
        logging.error(f"File not found: {file_path}")
        return []
# Function to write summarized CIDRs to ipsum.lst
def write_summarized_ips(ips, filename):
    """Write one CIDR per line to *filename*, overwriting previous content."""
    try:
        with open(filename, 'w') as f:
            for cidr in ips:
                f.write(f"{cidr}\n")
        # Bug fix: the message previously logged the literal "(unknown)"
        # instead of interpolating the destination file name.
        logging.info(f"Written summarized IPs to {filename}")
    except Exception as e:
        logging.error(f"Error writing summarized IPs to file: {e}")
# Main function to process ip.lst, summarize, and add CIDRs for company domains
def main():
    # Read IPs from ip.lst
    ips = read_ips_from_file(IP_LST_PATH)
    # Summarize the IPs into /28 networks
    summarized_ips = summarize_ips(ips)
    # Check domains.lst for COMPANY_DOMAINS matches and get corresponding CIDRs
    # (read_ips_from_file is reused here: it simply returns stripped lines).
    domains = read_ips_from_file(DOMAINS_LST_PATH)
    company_cidrs = set()
    for domain in domains:
        company_cidrs.update(process_domain_for_asn(domain))
    # Combine summarized IPs and company CIDRs
    # NOTE(review): summarized entries are ip_network objects while company
    # CIDRs are strings, so this union never deduplicates across the two.
    final_cidrs = set(summarized_ips) | company_cidrs
    # Write the final output to ipsum.lst
    write_summarized_ips(final_cidrs, OUTPUT_FILE)
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,33 @@
import csv

# Keyword blocklist: any domain containing one of these substrings is routed
# to the "filtered" list instead of the "clean" list.
# Bug fix: a missing comma after 'sloty' made Python concatenate it with
# 'prostitutki' into the single keyword 'slotyprostitutki', so neither
# keyword ever matched on its own; the duplicate 'escort' entry was dropped.
FRAUD_KEYWORDS = [
    'login', 'signin', 'bank', 'secure', 'verify', 'account', 'billing', 'password', 'invoice',
    'casino', 'bet', 'poker', 'blackjack', 'roulette', 'slots', 'winbig', 'jackpot', '1win',
    'admiralx', 'escort', 'striptiz', 'massaj', 'stavki', 'vulkan', 'sloty',
    'prostitutki', 'intim', 'kokain', 'xanax', 'xanaks', 'anasha', 'pytana', 'prostitutka',
    'metadon', 'mefedron', 'krokodil', 'amfetamin', 'drug', 'narcotic', 'meth', 'weed',
    'vzyatka', 'bribe', 'russianbrides'
]
# Initialize lists for clean and filtered domains
clean_domains = []
filtered_domains = []
# Read the CSV file (explicit UTF-8 so non-ASCII domains survive on any platform)
with open('domains.csv', 'r', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    # Make sure we're using the correct column 'Domain'
    for row in reader:
        domain = row['Domain'].strip()  # Use 'Domain' with a capital D
        if any(keyword in domain.lower() for keyword in FRAUD_KEYWORDS):
            filtered_domains.append(domain)
        else:
            clean_domains.append(domain)
# Write the clean domains to domain_clean.lst
with open('domain_clean.lst', 'w', encoding='utf-8') as f:
    f.write('\n'.join(clean_domains))
# Write the filtered domains to domain_filtered.lst
with open('domain_filtered.lst', 'w', encoding='utf-8') as f:
    f.write('\n'.join(filtered_domains))
print(f"Processed {len(clean_domains) + len(filtered_domains)} domains. Clean: {len(clean_domains)}, Filtered: {len(filtered_domains)}")

View File

@ -0,0 +1,232 @@
import csv
import gc  # Garbage collection
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

import idna  # IDNA handling for Punycode domains
import requests
# Set up logging for both general and error logs
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
# Create file handler for general log (INFO and above)
file_handler = logging.FileHandler('availability_check.log')
file_handler.setLevel(logging.INFO)
# Create error log handler (WARNING and above go to errors.log)
error_handler = logging.FileHandler('errors.log')
error_handler.setLevel(logging.WARNING)
# Create formatter and add it to both handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
error_handler.setFormatter(formatter)
# Add the handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(error_handler)
# Checkpoint file to track the last processed domain
CHECKPOINT_FILE = 'checkpoint.txt'
OUTPUT_FILE = 'validated_domains.csv'  # New output file
# Function to handle Punycode and ensure DNS-compatible format
def dns_resolvable(domain):
    """Return the IDNA (punycode) form of *domain*, or None if encoding fails."""
    try:
        encoded = idna.encode(domain)
    except idna.IDNAError as e:
        logger.error(f'Error converting {domain} to IDNA: {e}')
        return None
    return encoded.decode('utf-8')
# Function to check if the redirect leads to the same domain
def is_same_domain(domain, redirect_url):
    """Return True when *redirect_url* points at *domain* itself or at its
    'www.' alias; False otherwise (including on parse errors).

    Bug fixes vs. the original:
    - startswith('www.' + domain) also accepted hosts such as
      'www.example.com.evil.com'; compare for equality instead.
    - parse with stdlib urllib.parse directly rather than via requests.utils.
    """
    try:
        redirect_host = urlparse(redirect_url).hostname
        expected_host = urlparse(f'https://{domain}').hostname
        return redirect_host == expected_host or redirect_host == 'www.' + expected_host
    except Exception as e:
        logger.error(f'Error comparing domain and redirect: {e}')
        return False
# Function to check HTTP/HTTPS availability
def check_http(url, original_domain):
    """HEAD-request *url* (redirects not followed) and classify the result.

    Returns (status, code) where status is one of 'available', 'redirect',
    'unavailable' or 'timeout' and code is the HTTP status code (None on
    transport errors). A 301 back to the same domain counts as 'available'.
    """
    try:
        with requests.head(url, timeout=5, allow_redirects=False) as response:
            if 200 <= response.status_code < 300:  # Check for 2XX status codes
                logger.info(f'{url} is available (Status Code: {response.status_code})')
                return 'available', response.status_code
            elif response.status_code == 301:
                location = response.headers.get("Location", None)
                if location and is_same_domain(original_domain, location):
                    logger.info(f'{url} returned a 301 redirect to {location}, same domain, considering valid')
                    return 'available', response.status_code
                elif location:
                    try:
                        location = location.encode('latin1').decode('utf-8')  # Handle location header decoding manually
                        logger.warning(f'{url} returned a 301 redirect to {location}')
                    except UnicodeDecodeError:
                        logger.warning(f'{url} returned a 301 redirect with non-decodable Location header')
                else:
                    logger.warning(f'{url} returned a 301 redirect with no Location header')
                # Any 301 that is not same-domain is reported as a redirect.
                return 'redirect', response.status_code
            else:
                logger.warning(f'{url} returned error (Status Code: {response.status_code})')
                return 'unavailable', response.status_code
    except requests.Timeout:
        logger.error(f'HTTP request timed out for {url}')
        return 'timeout', None
    except requests.RequestException as e:
        logger.error(f'HTTP request failed for {url}: {e}')
        return 'unavailable', None
# Function to process a single domain
def process_domain(domain):
    """Check one domain over HTTPS (falling back to HTTP) and return a result
    dict, or None when the domain is empty, not IDNA-encodable, redirects
    from HTTP, or an unexpected error occurs.
    """
    if not domain:
        logger.warning('Encountered an empty line in the input file')
        return None
    logger.info(f'Processing domain: {domain}')
    # Ensure domain is DNS-resolvable
    dns_domain = dns_resolvable(domain)
    if not dns_domain:
        return None
    try:
        # First check HTTPS
        https_status, https_code = check_http(f'https://{dns_domain}', domain)
        # If HTTPS is not available, check HTTP (exclude redirects 301 from HTTP to HTTPS)
        if https_status != 'available':
            http_status, http_code = check_http(f'http://{dns_domain}', domain)
            if http_status == 'redirect':
                logger.info(f'{domain} redirects from HTTP to HTTPS, excluding from available domains.')
                return None  # Exclude HTTP redirects
            # Only consider HTTP status if HTTPS is not available
            is_available = http_status == 'available' and http_code is not None and 200 <= http_code < 300
        else:
            is_available = https_status == 'available'
        # NOTE(review): the dict reports only the HTTPS status/code even when
        # availability was decided by the HTTP fallback above.
        return {
            'domain': domain,
            'dns_domain': dns_domain,
            'https_status': https_status,
            'https_code': https_code,
            'is_available': is_available
        }
    except Exception as e:
        logger.error(f'Error processing domain {domain}: {e}')
        return None
# Function to append results to CSV files in real time
def append_to_csv_files(report_row, domain=None):
    """Append *report_row* to report.csv; when *domain* is truthy, also
    append it to the validated-domains file (OUTPUT_FILE)."""
    try:
        with open('report.csv', mode='a', newline='') as report_file:
            csv.writer(report_file).writerow(report_row)
        if not domain:
            return
        with open(OUTPUT_FILE, mode='a', newline='') as domain_file:
            csv.writer(domain_file).writerow([domain])
    except Exception as e:
        logger.error(f'Error writing to CSV files: {e}')
# Function to save the current checkpoint
def save_checkpoint(domain):
    """Persist *domain* to CHECKPOINT_FILE as the last processed entry."""
    try:
        with open(CHECKPOINT_FILE, mode='w') as checkpoint:
            checkpoint.write(domain)
    except Exception as e:
        logger.error(f'Error saving checkpoint for domain {domain}: {e}')
# Function to get the last checkpoint
def get_last_checkpoint():
    """Return the checkpointed domain, or None if absent/unreadable."""
    try:
        if not os.path.exists(CHECKPOINT_FILE):
            return None
        with open(CHECKPOINT_FILE, mode='r') as checkpoint:
            return checkpoint.read().strip()
    except Exception as e:
        logger.error(f'Error reading checkpoint file: {e}')
        return None
# Function to process the domains in batches and generate the report
def process_domains(file_path, batch_size=100):
    """Check every domain in *file_path* in parallel batches, appending
    results to report.csv / the validated-domains file and checkpointing
    progress so an interrupted run can resume.

    Bug fix: the original applied the resume check *inside* the
    as_completed() loop, after every request in the batch had already been
    submitted. Because completion order is nondeterministic, that skipped an
    arbitrary subset of domains (whatever finished before the checkpointed
    one) while still paying for all the network requests. The domain list is
    now truncated before any work is scheduled.
    """
    try:
        # Initialize or resume from the checkpoint
        last_domain = get_last_checkpoint()
        logger.info(f'Starting processing of file: {file_path}')
        logger.info(f'Resuming from domain: {last_domain}' if last_domain else 'Starting fresh.')
        # Ensure output files exist and have headers
        if not os.path.exists('report.csv'):
            with open('report.csv', mode='w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(['Domain', 'DNS-Resolvable Domain', 'HTTPS Status', 'HTTPS Code'])
        if not os.path.exists(OUTPUT_FILE):
            with open(OUTPUT_FILE, mode='w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(['Domain'])
        # Read domains from the input file
        with open(file_path, mode='r', encoding='utf-8') as file:
            domains = [line.strip() for line in file]
        # Resume: drop everything up to and including the checkpointed domain
        # before scheduling any work.
        if last_domain and last_domain in domains:
            domains = domains[domains.index(last_domain) + 1:]
        for i in range(0, len(domains), batch_size):
            batch = domains[i:i + batch_size]
            # Use ThreadPoolExecutor to process domains in parallel
            with ThreadPoolExecutor(max_workers=50) as executor:
                future_to_domain = {executor.submit(process_domain, domain): domain for domain in batch}
                for future in as_completed(future_to_domain):
                    domain = future_to_domain[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        logger.error(f'Error processing domain {domain}: {e}')
                        continue
                    if result:
                        report_row = [
                            result['domain'],
                            result['dns_domain'],
                            result['https_status'],
                            result['https_code'] if result['https_code'] else 'N/A'
                        ]
                        domain_available = result['dns_domain'] if result['is_available'] else None
                        # Append results to files
                        append_to_csv_files(report_row, domain_available)
                        # Save the current checkpoint
                        save_checkpoint(result['domain'])
            logger.info(f'Completed processing batch {i // batch_size + 1} / {len(domains) // batch_size + 1}')
            # Trigger garbage collection to free up memory
            gc.collect()
            time.sleep(1)  # Sleep briefly to allow system to recover between batches
        logger.info('Processing completed.')
    except FileNotFoundError:
        logger.error(f'Input file not found: {file_path}')
    except Exception as e:
        logger.error(f'An unexpected error occurred: {e}')
# Replace 'domain_clean.lst' with the path to your input file.
# NOTE: this runs immediately on import/execution; there is no __main__ guard.
process_domains('domain_clean.lst')