Meine Mailcow ist ein wunderbares Stück Software und filtert für alle darauf eingerichteten Konten sehr schön die SPAM heraus.
Jetzt habe ich noch einige weitere IMAP Konten von diversen Vereinen, die keinen so guten Spamfilter haben. Da kam mir der Gedanke ob ich nicht die Filterfunktion des Rspamd meiner Mailcow benutzen kann um damit jede neu eingehende Mail auf einem der IMAP Konten zu prüfen.
Das folgende Python3 Skript habe ich auf dem Host der Mailcow als Dienst installiert und seitdem verschiebt es mir die Spam-Mails sehr schön in den SPAM Ordner.
#!/usr/bin/env python3
"""
imap_async_rspamd_scan.py
Verarbeitet mehrere IMAP-Konten gleichzeitig mit IMAP IDLE und Rspamd.
Angepasst für imapclient Version 3.0.1.
"""
import imaplib, requests, logging, time, sys
import concurrent.futures
from imapclient import IMAPClient, exceptions
# === GLOBALE KONFIGURATION (Für alle Konten gleich) ===
RSPAMD_URL = "http://172.22.1.11:11333/checkv2"
SPAM_THRESHOLD = 6.0
SPAM_FOLDER = "INBOX.Spam"
IDLE_TIMEOUT = 540 # 9 Minuten
IDLE_CHECK_INTERVAL = 10
LOGFILE = "/var/log/imap_rspamd_scan.log"
PROCESSED_FLAG = b'RspamdScanned'
logging.basicConfig(filename=LOGFILE, level=logging.INFO,
format='%(asctime)s %(levelname)s [%(user)s] %(message)s') # user-Feld hinzugefügt
# === KONTO-KONFIGURATIONEN ===
# HINWEIS: Hier ALLE Konten mit den jeweiligen Zugangsdaten eintragen.
ACCOUNTS = [
{
"USER": "a@contoso.com",
"PASS": "Super-Geheim",
"HOST": "imap.server.contoso.com",
"PORT": 993,
},
{
"USER": "b@woanders.com",
"PASS": "Auch-Geheimes-Passwort",
"HOST": "imap.server.woanders.com",
"PORT": 993,
},
{
"USER": "c@xy-verein.com",
"PASS": "Super-Dooper-Geheim",
"HOST": "imap.server.xy-verein.com",
"PORT": 993,
},
]
# === FUNKTIONEN ===
def imap_connect_and_login(config):
"""Stellt die IMAPClient-Verbindung her, loggt sich ein und wählt INBOX."""
client = IMAPClient(config['HOST'], port=config['PORT'], ssl=True)
client.login(config['USER'], config['PASS'])
# Wichtig: Wählt INBOX aus, um auf Nachrichtenänderungen zu achten
client.select_folder('INBOX', readonly=False)
return client
def post_to_rspamd(raw_message_bytes):
"""POST rohe Nachricht an Rspamd und gibt geparstes JSON zurück."""
headers = {'Content-Type': 'application/octet-stream'}
try:
r = requests.post(RSPAMD_URL, headers=headers, data=raw_message_bytes, timeout=20)
r.raise_for_status()
return r.json()
except Exception as e:
logging.exception("Error contacting Rspamd: %s", e)
return None
def process_messages(client, uids, config):
"""Ruft die Nachrichten ab, scannt sie und verschiebt Spam."""
if not uids:
return
log_extra = {"user": config['USER']}
# Abrufen der E-Mails:
fetch_data = client.fetch(uids, [b'BODY.PEEK[]', b'ENVELOPE'])
for uid, data in fetch_data.items():
raw_message_bytes = None
# Verschiedene IMAP Server geben etwas andere Antworten
if b'BODY.PEEK[]' in data:
raw_message_bytes = data[b'BODY.PEEK[]']
if not raw_message_bytes and b'RFC822' in data:
raw_message_bytes = data[b'RFC822']
if not raw_message_bytes and b'BODY[]' in data:
raw_message_bytes = data[b'BODY[]']
if not raw_message_bytes:
logging.error("Could not find email body data for UID %s", uid, extra=log_extra)
continue
subject = data[b'ENVELOPE'].subject.decode() if data[b'ENVELOPE'].subject else '(no-subject)'
# 1. Sende an Rspamd
rsp = post_to_rspamd(raw_message_bytes)
if not rsp:
logging.warning("No rspamd response for UID %s", uid, extra=log_extra)
continue
score = rsp.get('score')
required = rsp.get('required_score')
action = rsp.get('action')
logging.info("UID %s subject=%s score=%.2f required=%.2f action=%s",
uid, subject, score or 0.0, required or 0.0, action, extra=log_extra)
# 2. Spam-Entscheidung
is_spam = (score is not None and required is not None and score >= required) or \
(score is not None and score >= SPAM_THRESHOLD) or \
(action in ('reject', 'discard'))
# 3. Verschieben, wenn Spam
if is_spam:
try:
client.move(uid, SPAM_FOLDER)
logging.info("Moved SPAM UID %s to %s.", uid, SPAM_FOLDER, extra=log_extra)
except Exception as e:
logging.exception("Error moving UID %s: %s", uid, e, extra=log_extra)
else:
logging.info("Clean mail UID %s. Leaving in INBOX.", uid, extra=log_extra)
# === HAUPT-LOOP JE KONTO ===
def handle_account(config):
"""Hauptlogik: Verbindet, scannt und wartet im IDLE-Modus für ein einzelnes Konto."""
user = config['USER']
log_extra = {"user": user}
logging.info("Handler gestartet für Konto.", extra=log_extra)
while True:
client = None
try:
client = imap_connect_and_login(config)
# Initialer Scan: Verarbeite alle Mails, die UNGELSEN sind UND NICHT gescannt wurden.
# Die Suche wird auf 'UNSEEN' und 'NOT RspamdScanned' eingeschränkt.
flag_name = PROCESSED_FLAG.decode()
search_criteria = ['UNSEEN', 'NOT', 'KEYWORD', flag_name]
initial_uids = client.search(search_criteria)
process_messages(client, initial_uids, config)
start_time = time.time()
client.idle()
# IDLE Check Schleife
while True:
responses = client.idle_check(timeout=IDLE_CHECK_INTERVAL)
if responses:
pass
break
if time.time() - start_time > IDLE_TIMEOUT:
logging.info("IDLE timeout reached. Stopping IDLE for connection refresh.", extra=log_extra)
break
client.idle_done()
# Führe einen erneuten Scan nach UNSEEN-Mails durch, die NICHT gescannt wurden.
final_uids = client.search(search_criteria) # Verwendet dieselben Kriterien wie Oben
process_messages(client, final_uids, config)
client.logout()
except exceptions.IMAPClientError as e:
logging.error("IMAPClient Error: %s. Reconnecting in 10s.", e, extra=log_extra)
if client:
try:
client.close()
except:
pass
time.sleep(10)
except Exception as e:
logging.exception("Fatal error in main loop: %s. Reconnecting in 10s.", e, extra=log_extra)
if client:
try:
client.close()
except:
pass
time.sleep(10)
# === HAUPT-LOOP ===
if __name__ == '__main__':
logging.info("Starting multi-account IMAP IDLE scanner. Total accounts: %d",
len(ACCOUNTS),
extra={"user": "System"})
# ThreadPoolExecutor erstellt einen Pool von Threads, um die Konten parallel zu verarbeiten
with concurrent.futures.ThreadPoolExecutor(max_workers=len(ACCOUNTS)) as executor:
# Führt handle_account für jedes Konto in der ACCOUNTS-Liste aus
executor.map(handle_account, ACCOUNTS)
Das Skript startet für jedes Konto einen eigenen Thread, der jeweils asyncron per IMAP IDLE auf neue Mails im Konto wartet. Die Mails, die schon verarbeitet wurden erhalten eine Markierung, damit sie nicht wieder verarbeitet werden.
Als Spam erkannte Mails werden in den im Konfigurationsblock angegebenen Ordner verschoben.
Dlie Rspamd-URL sollte sich eigentlich maximal in der IP des Rsapmd Containers der Mailcow unterscheiden.
Test
Nachdem man die Zugangsdaten für die IMAP Konten in der Konfiguration hinterlegt hat kann man das Skript erstmal so ausführen und testen.
Als Service bootfest einrichten
Wenn alles so klappt wie man es haben möchte, dann kann man das Script als Service einrichten indem man die folgende Datei als /etc/systemd/system/rspamd-imap-scanner.service ablegt.
anschließend ein „systemctl daemon-reload“ und systemctl enable rspamd-imap-scanner.service und systemctl start rspamd-imap-scanner.service
[Unit]
Description=Rspamd IMAP Scanner and Mover (Multi-Account IDLE)
After=network.target
[Service]
# Der Benutzer, unter dem das Skript laufen soll (empfohlen: nicht root, z.B. nobody)
User=nobody
Group=nobody
# Typ: simple ist ausreichend, da Ihr Skript eine Endlosschleife enthält.
Type=simple
# Der Befehl, der das Skript ausführt
ExecStart=/usr/bin/python3 /pfad_zum_script/imap_async_rspamd_scan.py
# Auto-Neustart bei abnormaler Beendigung (z.B. nach einem Fehler oder Crash)
Restart=always
RestartSec=5
# Standard-Ausgabe und Fehler werden in das Journal von Systemd geschrieben
StandardOutput=journal
StandardError=journal
[Install]
# Aktiviert den Dienst beim Systemstart
WantedBy=multi-user.target
