Python | Remove domains eMail

GoldEvilCode

Новорег

GoldEvilCode

Новорег
Статус
Offline
Регистрация
21 Дек 2024
Сообщения
1
Лайки
1
Небольшой скрипт для работы с базами формата mail:password.
Два режима работы.
1. Соберёт только нужные домены.
2. Удалит указанные домены.

Умеет работать с масками, например: gmail.*, *.ru. Если нужно удалить все домены в зоне .ru, просто впишите *.ru.
Скрипт создавался для подготовки баз к IMAP-чеку.
Мой список доменов для удаления.
hotmail.*, gmail.*, yahoo.*, outlook.*, icloud.*, *.ru, live.*, 21cn.com, *.cn, hu.inter.netwindstream.net, gardal.dk, golden.net, sibmail.com, tele2.nl, pa.net, shaw.ca, gmail.com, hu.inter.net, metrocast.net, netvigator.com, telus.net, yeah.net, hotmail.com, script9.net, kaross.net, vetorial.net, coitt.es, dokeda.lt, itprofi.eu, adept.co.za, caus-h.dk, sarenet.es, konto.pl, zoom-a.com, lds.net.ua, homechoice.co.uk, absamail.co.za, lantic.net, 21cn.com, 126.com, tlen.pl, qq.com, wz.zj.cn, poczta.fm, 189.cn, 163.com, 139.com, zjip.com, korea.com, alice-dsl.net, alice.de, addicon.com.cn, acsalaska.net, alaska.net, sina.com, wz.zj.cn, daum.net, hanmail.net, 66.ru, nate.com, pro.numericable.fr, estvideo.fr, evc.net, evhr.net, tv-com.net, valvision.fr, auchanabox.fr, numericable.com, noos.fr, terra.com.br, svitonline.com, gci.net, Telenet.be, numericable.fr, iol.ie, cybersmart.co.za, modulonet.fr, 0511.cn, actel.hu, iol.ie, vp.pl, pochta.onet.pl, 21cn.net, onet.pl, 126.com, 139.com, 163.com, 189.cn, 21cn.com, acsalaska.net, addicon.com.cn, alaska.net, allfarm.cn, altern.org, bestfarmshop.cn, biblicalsteel.cn, carpenterbible.cn, chevrolettracker.cn, cynthiablood.cn, daum.net, enet.com.cn, evergreenprograms.cn, failedflash.cn, googlemail., goosesites.cn, gsta.com, hanmail.net, happenedwatch.cn, headquarterslegal.cn, joycebrown.cn, korea.com, layersupplies.cn, leadsrecords.cn, librariesdirectory.cn, live.com, live.ru, mail.nbptt.zj.cn, mailzj.cn, mappinglanguage.cn, msnzone.cn, nate.com, naver.com, penciltoyota.cn, protectivedodge.cn, qq.com, requiredgeneral.cn, sina.cn, sina.com, sohu.com, thegreatestfarm.cn, vip.qq.com, vitalbiography.cn, vocabularyemployment.cn, w.cn, wz.zj.cn, yourlancia.cn, zjip.com, numericable.fr, telenet.be, svitonline.com, Zoom-a.com, modulonet.fr, homechoice.co.uk, gci.net, powerscript.at, noos.fr, rediffmail.com, iol.ie, yeah.net, absamail.co.za, adept.co.za, varsat.net, netvigator.com, actel.hu, macau.ctm.net, cybersmart.co.za, fsyyy.com, 
accor.de, evabaltser.dk, tom.com, web.de, gmx.com


Для запуска нужно установить python
И зависимости
Python

pip install chardet tqdm


Code:
import os
import sys
import re
import time
import fnmatch
import multiprocessing
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

import chardet
from tqdm import tqdm


# Size of the thread pool that filter_emails() creates (max_workers).
THREADS = 8
# Number of input lines filter_emails() accumulates before submitting a batch.
BATCH_SIZE = 5000
# Matches a whole "local@domain:rest" line:
#   group 1 - local part, at least 4 characters from [a-zA-Z0-9._%+-]
#   group 2 - domain, which must end in a dot followed by 2+ ASCII letters
#   group 3 - the non-empty remainder after the colon that follows the domain
email_pass_pattern = re.compile(r'^([a-zA-Z0-9._%+-]{4,})@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}):(.+)$')


def safe_print(*args, **kwargs):
    """Print like ``print()``, degrading unencodable characters instead of raising.

    The arguments are joined into one string up front so that a failed write
    cannot leave a partially printed line behind. If the target stream cannot
    encode the text, the text is re-encoded with ``errors='replace'`` (each
    unencodable character becomes ``?``) and printed again.

    Args:
        *args: Objects to print; each is converted with ``str()``.
        **kwargs: The keyword arguments accepted by ``print()``
            (``sep``, ``end``, ``file``, ``flush``).
    """
    sep = kwargs.pop('sep', None)
    if sep is None:
        sep = ' '
    text = sep.join(str(arg) for arg in args)

    try:
        print(text, **kwargs)
    except UnicodeEncodeError:
        stream = kwargs.get('file')
        if stream is None:
            stream = sys.stdout
        # A redirected or replaced stream may expose no encoding at all.
        encoding = getattr(stream, 'encoding', None) or 'utf-8'
        degraded = text.encode(encoding, errors='replace').decode(encoding)
        print(degraded, **kwargs)

def detect_encoding(filepath):
    """Guess the text encoding of a file from its leading bytes.

    Only the first 10000 bytes are read and passed to ``chardet.detect``.

    Args:
        filepath: Path of the file to inspect.

    Returns:
        The value stored under ``'encoding'`` in chardet's result, or ``None``
        when that key is absent.
    """
    sample_size = 10000
    with open(filepath, 'rb') as handle:
        sample = handle.read(sample_size)
    return chardet.detect(sample).get('encoding')

def domain_matches(domain, patterns):
    """Report whether a domain matches at least one shell-style pattern.

    Args:
        domain: Domain name to test.
        patterns: Iterable of ``fnmatch`` patterns such as ``gmail.*`` or ``*.ru``.

    Returns:
        ``True`` if any pattern matches, ``False`` otherwise (including when
        ``patterns`` is empty).
    """
    return any(fnmatch.fnmatch(domain, mask) for mask in patterns)

def process_line(line, patterns, mode):
    """Decide whether one input line survives the domain filter.

    The line is stripped and must match ``email_pass_pattern``; lines that do
    not match are dropped in every mode.

    Args:
        line: Raw line read from the input file.
        patterns: Domain patterns passed on to ``domain_matches``.
        mode: ``"1"`` keeps lines whose domain matches a pattern, ``"2"`` keeps
            lines whose domain matches none. Any other value keeps nothing.

    Returns:
        The stripped line when it should be kept, otherwise ``None``.
    """
    candidate = line.strip()
    parsed = email_pass_pattern.match(candidate)
    if parsed is None:
        return None

    # Domains are compared in lower case.
    hit = domain_matches(parsed.group(2).lower(), patterns)
    if mode == "1":
        keep = hit
    else:
        keep = mode == "2" and not hit
    return candidate if keep else None

def _filter_batch(executor, batch, user_domains, mode, output_file, pbar):
    """Run ``process_line`` over one batch and write the kept lines.

    Returns the number of lines written to ``output_file``.
    """
    futures = [executor.submit(process_line, raw, user_domains, mode) for raw in batch]
    written = 0
    # Consume futures in submission order (not completion order) so that the
    # output file keeps the line order of the input file.
    for future in futures:
        kept = future.result()
        if kept:
            output_file.write(kept + '\n')
            written += 1
        pbar.update(1)
    return written


def filter_emails(file_path, user_domains, mode, output_file_path="temp.txt"):
    """Filter the lines of a file by domain and write the survivors to a new file.

    The input is read twice: once to count lines for the progress bar, once to
    filter. Lines are handed to a thread pool in batches of ``BATCH_SIZE`` and
    each one is judged by ``process_line``. Kept lines are written in their
    original order, encoded as UTF-8.

    Args:
        file_path: Path of the input file.
        user_domains: Domain patterns forwarded to ``process_line``.
        mode: Filter mode forwarded to ``process_line`` (``"1"`` or ``"2"``).
        output_file_path: Destination file; it is overwritten.

    Returns:
        A dict with the keys ``processed_lines``, ``saved_lines``,
        ``elapsed_time`` (seconds), ``output_file_path`` and ``encoding``.

    Raises:
        ValueError: If ``detect_encoding`` cannot name an encoding.
    """
    encoding = detect_encoding(file_path)
    if not encoding:
        raise ValueError("      Unable to determine file encoding.")

    print(f"      Encoding defined: {encoding}")
    start_time = time.time()

    # First pass: the line count is only needed as the progress bar total.
    print("      Counting lines in a file...")
    with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
        total_lines = sum(1 for _ in f)

    saved_lines = 0
    processed_lines = 0

    # Second pass: the actual filtering.
    with open(file_path, 'r', encoding=encoding, errors='ignore') as input_file, \
         open(output_file_path, 'w', encoding='utf-8') as output_file, \
         ThreadPoolExecutor(max_workers=THREADS) as executor:

        with tqdm(total=total_lines, unit="lines", desc="      Processing") as pbar:
            batch = []
            for line in input_file:
                batch.append(line)
                if len(batch) >= BATCH_SIZE:
                    saved_lines += _filter_batch(
                        executor, batch, user_domains, mode, output_file, pbar)
                    processed_lines += len(batch)
                    batch.clear()

            # Whatever is left after the last full batch.
            if batch:
                saved_lines += _filter_batch(
                    executor, batch, user_domains, mode, output_file, pbar)
                processed_lines += len(batch)

    elapsed_time = time.time() - start_time
    return {
        "processed_lines": processed_lines,
        "saved_lines": saved_lines,
        "elapsed_time": elapsed_time,
        "output_file_path": output_file_path,
        "encoding": encoding
    }

def resultgen():
    """Deduplicate ``temp.txt`` into a timestamped result folder and report.

    When ``temp.txt`` exists, a ``Result_<timestamp>`` directory is created in
    the current working directory, the first occurrence of every distinct line
    is written to ``result.txt`` inside it in the original order, the number
    of unique lines is printed, and ``temp.txt`` is removed. When it does not
    exist, a notice is printed instead. In both cases the function then waits
    for the user before returning.
    """
    if os.path.exists("temp.txt"):
        olnn = "Result_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        # exist_ok: a second run within the same second must not crash.
        os.makedirs(olnn, exist_ok=True)
        result_path = os.path.join(olnn, 'result.txt')

        lines_seen_so_far = set()
        with open(result_path, "w", encoding='utf-8') as outputFile, \
             open('temp.txt', "r", encoding='utf-8', errors='replace') as inputFile:
            for line in inputFile:
                if line not in lines_seen_so_far:
                    lines_seen_so_far.add(line)
                    outputFile.write(line)

        # Every unique line was written exactly once, so the size of the set
        # is the line count of result.txt; no need to read the file back.
        safe_print('\r      Unique lines found:', len(lines_seen_so_far), end='')
        safe_print(f"\n      Result - {olnn}")
        os.remove('temp.txt')
    else:
        safe_print("      No matching lines were found.")

    safe_print("      ==================================\n\n\n")
    # "pause" is a cmd.exe builtin; elsewhere fall back to a plain prompt.
    if os.name == 'nt':
        os.system('pause')
    else:
        input("      Press Enter to continue...")

def _clear_screen():
    """Clear the terminal with the platform's own command."""
    os.system('CLS' if os.name == 'nt' else 'clear')


def start():
    """Run the interactive menu loop.

    Removes a leftover ``temp.txt`` and then repeatedly shows the menu.
    Option ``1`` asks for an input file, a mode and a comma-separated list of
    domain patterns, runs ``filter_emails`` and hands over to ``resultgen``.
    Option ``2`` shows the info screen. Any other input redraws the menu.

    The process exits with status 1 when no domains are entered or when
    ``filter_emails`` raises.
    """
    if os.path.exists("temp.txt"):
        os.remove("temp.txt")

    while True:
        _clear_screen()
        print("")
        print("  1) Remove domains")
        print("  2) Info")
        print("")

        zapros = input("Select the function --> ").strip()
        print("")

        if zapros == "1":
            _clear_screen()
            print("\n" * 8)
            print("      ==================================")
            print("      Function - Remove/Collect domains")

            file_path = input("      Enter the path to the file: ").strip()
            mode = input("      Select the mode:\n      1 - Collect only the specified domains\n      2 - Delete specified domains\n      Your choice (1/2): ").strip()
            while mode not in {"1", "2"}:
                mode = input("      Wrong choice. Enter 1 or 2: ").strip()

            domains_input = input("      Enter domains separated by commas (masks are allowed, e.g.: gmail.*, *.ru):\n      > ").strip()
            # Patterns are lower-cased and empty entries are discarded.
            user_domains = [d.strip().lower() for d in domains_input.split(",") if d.strip()]

            if not user_domains:
                print("      You didn't specify any domains. Completion.")
                # sys.exit rather than the site-provided exit(), which is not
                # guaranteed to exist (e.g. in frozen builds or under -S).
                sys.exit(1)

            try:
                stats = filter_emails(file_path, user_domains, mode)
            except Exception as e:
                # Top-level boundary: report any failure and stop.
                print(f"      Error: {e}")
                input("      Press Enter to exit...")
                sys.exit(1)

            print("      Done! Final statistics:")
            print(f"      Total lines processed: {stats['processed_lines']}")
            print(f"      Lines saved: {stats['saved_lines']}")
            print(f"      Time elapsed: {stats['elapsed_time']:.2f} seconds")
            print(f"      Encoding used: {stats['encoding']}")

            resultgen()

        elif zapros == "2":
            _clear_screen()
            print("\n\n\n\n")
            print("  Code | https://hard-tm.su")
            # "pause" is a cmd.exe builtin; elsewhere fall back to a prompt.
            if os.name == 'nt':
                os.system("pause")
            else:
                input("  Press Enter to continue...")

# Script entry point. The forum markup had replaced the double underscores
# with [B]...[/B] tags, which made this line a syntax error.
if __name__ == '__main__':
    # Needed only for frozen Windows executables; has no effect otherwise.
    multiprocessing.freeze_support()
    start()
 
Последнее редактирование модератором:
Сверху