Mailcow’s Rspamd nutzen um in beliebigen IMAP-Mailkonten die Spam zu filtern

Von | Dezember 14, 2025

Meine Mailcow ist ein wunderbares Stück Software und filtert für alle darauf eingerichteten Konten sehr schön die SPAM heraus.

Jetzt habe ich noch einige weitere IMAP Konten von diversen Vereinen, die keinen so guten Spamfilter haben. Da kam mir der Gedanke ob ich nicht die Filterfunktion des Rspamd meiner Mailcow benutzen kann um damit jede neu eingehende Mail auf einem der IMAP Konten zu prüfen.

Das folgende Python3 Skript habe ich auf dem Host der Mailcow als Dienst installiert und seitdem verschiebt es mir die Spam-Mails sehr schön in den SPAM Ordner.

Alternativ habe ich das Skript in einen Docker Container gepackt und starte diesen nun zusammen mit Mailcow (Siehe unten)

#!/usr/bin/env python3
"""
imap_async_rspamd_scan.py
Verarbeitet mehrere IMAP-Konten gleichzeitig mit IMAP IDLE und Rspamd.
Angepasst für imapclient Version 3.0.1.
"""
import imaplib, requests, logging, time, sys
import concurrent.futures
from imapclient import IMAPClient, exceptions

# === GLOBALE KONFIGURATION (Für alle Konten gleich) ===
RSPAMD_URL = "http://172.22.1.11:11333/checkv2"
SPAM_THRESHOLD = 6.0
SPAM_FOLDER = "INBOX.Spam" 
IDLE_TIMEOUT = 540 # 9 Minuten
IDLE_CHECK_INTERVAL = 10 
LOGFILE = "/var/log/imap_rspamd_scan.log"
PROCESSED_FLAG = b'RspamdScanned'

logging.basicConfig(filename=LOGFILE, level=logging.INFO,
                    format='%(asctime)s %(levelname)s [%(user)s] %(message)s') # user-Feld hinzugefügt

# === KONTO-KONFIGURATIONEN ===
# HINWEIS: Hier ALLE Konten mit den jeweiligen Zugangsdaten eintragen.
ACCOUNTS = [
    {
        "USER": "a@contoso.com",
        "PASS": "Super-Geheim",
        "HOST": "imap.server.contoso.com",
        "PORT": 993,
    },
    {
        "USER": "b@woanders.com",
        "PASS": "Auch-Geheimes-Passwort",
        "HOST": "imap.server.woanders.com",
        "PORT": 993,
    },
    {
        "USER": "c@xy-verein.com",
        "PASS": "Super-Dooper-Geheim",
        "HOST": "imap.server.xy-verein.com",
        "PORT": 993,
    },
]

# === FUNKTIONEN ===
def imap_connect_and_login(config):
    """Stellt die IMAPClient-Verbindung her, loggt sich ein und wählt INBOX."""
    client = IMAPClient(config['HOST'], port=config['PORT'], ssl=True)
    client.login(config['USER'], config['PASS'])
    # Wichtig: Wählt INBOX aus, um auf Nachrichtenänderungen zu achten
    client.select_folder('INBOX', readonly=False) 
    return client

def post_to_rspamd(raw_message_bytes):
    """POST rohe Nachricht an Rspamd und gibt geparstes JSON zurück."""
    headers = {'Content-Type': 'application/octet-stream'}
    try:
        r = requests.post(RSPAMD_URL, headers=headers, data=raw_message_bytes, timeout=20)
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logging.exception("Error contacting Rspamd: %s", e)
        return None

def process_messages(client, uids, config):
    """Ruft die Nachrichten ab, scannt sie und verschiebt Spam."""
    if not uids:
        return

    log_extra = {"user": config['USER']}
    
    # Abrufen der E-Mails: 
    fetch_data = client.fetch(uids, [b'BODY.PEEK[]', b'ENVELOPE'])

    for uid, data in fetch_data.items():
        raw_message_bytes = None
        
        # Verschiedene IMAP Server geben etwas andere Antworten
        if b'BODY.PEEK[]' in data:
            raw_message_bytes = data[b'BODY.PEEK[]']
        if not raw_message_bytes and b'RFC822' in data:
            raw_message_bytes = data[b'RFC822']
        if not raw_message_bytes and b'BODY[]' in data:
            raw_message_bytes = data[b'BODY[]']

        if not raw_message_bytes:
            logging.error("Could not find email body data for UID %s", uid, extra=log_extra)
            continue
            
        subject = data[b'ENVELOPE'].subject.decode() if data[b'ENVELOPE'].subject else '(no-subject)'
        
        # 1. Sende an Rspamd
        rsp = post_to_rspamd(raw_message_bytes)
        
        if not rsp:
            logging.warning("No rspamd response for UID %s", uid, extra=log_extra)
            continue
            
        score = rsp.get('score')
        required = rsp.get('required_score')
        action = rsp.get('action')

        logging.info("UID %s subject=%s score=%.2f required=%.2f action=%s",
                     uid, subject, score or 0.0, required or 0.0, action, extra=log_extra)
        
        # 2. Spam-Entscheidung
        is_spam = (score is not None and required is not None and score >= required) or \
                  (score is not None and score >= SPAM_THRESHOLD) or \
                  (action in ('reject', 'discard'))

        # 3. Verschieben, wenn Spam
        if is_spam:
            try:
                client.move(uid, SPAM_FOLDER) 
                logging.info("Moved SPAM UID %s to %s.", uid, SPAM_FOLDER, extra=log_extra)
            except Exception as e:
                logging.exception("Error moving UID %s: %s", uid, e, extra=log_extra)
        else:
            logging.info("Clean mail UID %s. Leaving in INBOX.", uid, extra=log_extra)


# === HAUPT-LOOP JE KONTO ===
def handle_account(config):
    """Hauptlogik: Verbindet, scannt und wartet im IDLE-Modus für ein einzelnes Konto."""
    user = config['USER']
    log_extra = {"user": user}

    logging.info("Handler gestartet für Konto.", extra=log_extra)
    
    while True:
        client = None
        try:
            client = imap_connect_and_login(config)
            # Initialer Scan: Verarbeite alle Mails, die UNGELSEN sind UND NICHT gescannt wurden.
            # Die Suche wird auf 'UNSEEN' und 'NOT RspamdScanned' eingeschränkt.
            flag_name = PROCESSED_FLAG.decode()
            search_criteria = ['UNSEEN', 'NOT', 'KEYWORD', flag_name]
            initial_uids = client.search(search_criteria)
            process_messages(client, initial_uids, config)
            
            start_time = time.time()
            client.idle()
            # IDLE Check Schleife
            while True:
                responses = client.idle_check(timeout=IDLE_CHECK_INTERVAL)
                if responses:
                    pass
                    break 

                if time.time() - start_time > IDLE_TIMEOUT:
                    logging.info("IDLE timeout reached. Stopping IDLE for connection refresh.", extra=log_extra)
                    break 

            client.idle_done()
            
            # Führe einen erneuten Scan nach UNSEEN-Mails durch, die NICHT gescannt wurden.
            final_uids = client.search(search_criteria) # Verwendet dieselben Kriterien wie Oben
            process_messages(client, final_uids, config)

            client.logout()
            
        except exceptions.IMAPClientError as e:
            logging.error("IMAPClient Error: %s. Reconnecting in 10s.", e, extra=log_extra)
            if client:
                try:
                    client.close()
                except:
                    pass
            time.sleep(10)
        except Exception as e:
            logging.exception("Fatal error in main loop: %s. Reconnecting in 10s.", e, extra=log_extra)
            if client:
                try:
                    client.close()
                except:
                    pass
            time.sleep(10)

# === HAUPT-LOOP ===
if __name__ == '__main__':
    logging.info("Starting multi-account IMAP IDLE scanner. Total accounts: %d", 
                 len(ACCOUNTS), 
                 extra={"user": "System"}) 
    
    # ThreadPoolExecutor erstellt einen Pool von Threads, um die Konten parallel zu verarbeiten
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(ACCOUNTS)) as executor:
        # Führt handle_account für jedes Konto in der ACCOUNTS-Liste aus
        executor.map(handle_account, ACCOUNTS)

Das Skript startet für jedes Konto einen eigenen Thread, der jeweils asyncron per IMAP IDLE auf neue Mails im Konto wartet. Die Mails, die schon verarbeitet wurden erhalten eine Markierung, damit sie nicht wieder verarbeitet werden.

Als Spam erkannte Mails werden in den im Konfigurationsblock angegebenen Ordner verschoben.

Dlie Rspamd-URL sollte sich eigentlich maximal in der IP des Rsapmd Containers der Mailcow unterscheiden.

Test

Nachdem man die Zugangsdaten für die IMAP Konten in der Konfiguration hinterlegt hat kann man das Skript erstmal so ausführen und testen.

Als Service bootfest einrichten

Wenn alles so klappt wie man es haben möchte, dann kann man das Script als Service einrichten indem man die folgende Datei als /etc/systemd/system/rspamd-imap-scanner.service ablegt.

anschließend ein „systemctl daemon-reload“ und systemctl enable rspamd-imap-scanner.service und systemctl start rspamd-imap-scanner.service

[Unit]
Description=Rspamd IMAP Scanner and Mover (Multi-Account IDLE)
After=network.target

[Service]
# Der Benutzer, unter dem das Skript laufen soll (empfohlen: nicht root, z.B. nobody)
User=nobody
Group=nobody

# Typ: simple ist ausreichend, da Ihr Skript eine Endlosschleife enthält.
Type=simple

# Der Befehl, der das Skript ausführt
ExecStart=/usr/bin/python3 /pfad_zum_script/imap_async_rspamd_scan.py

# Auto-Neustart bei abnormaler Beendigung (z.B. nach einem Fehler oder Crash)
Restart=always
RestartSec=5

# Standard-Ausgabe und Fehler werden in das Journal von Systemd geschrieben
StandardOutput=journal
StandardError=journal

[Install]
# Aktiviert den Dienst beim Systemstart
WantedBy=multi-user.target

Alternative: Direkt in der Mailcow-Dockerumgebung in einem separaten Container starten

Nachdem ich das obige Skript als Dienst laufen hatte es ab und zu das Problem, dass es die IP des RSpamd Containers verloren hat und erst neu finden musste. Also kam mir der Gedanke, dass man das Skript ja direkt in einem Docker Container im „mailcow-netzwerk“ laufen lassen kann und dann kein Problem mehr mit sich ändernden IP’s hat.

Ich habe die Konfiguration ausgelagert und das Logging auf das Docker typische Logging umgestellt.

Es braucht ein neues Verzeichnis in der Mailcow Umgebung /opt/mailcow-dockerized/data/custom/imap-scanner. In dieses kommt das Skript, die Konfigurationsdatei und das Dockerfile.

Dockerfile

# Dockerfile für den Rspamd IMAP Scanner
FROM python:3.11-slim

# Installiere benötigte Pakete
RUN pip install imapclient requests docker

# Lege das Arbeitsverzeichnis fest
WORKDIR /app

# Führe das Skript beim Start aus
CMD ["python3", "imap_rspamd_docker.py"]

Konfigurationsdatei: imap_rspamd_config.json

In dieser Datei müssen die Zugangsdaten für die IMAP Konten angegeben werden. Die Anzahl ist prinzipiell beliebig.

{
    "GLOBAL": {
        "RSPAMD_CONTAINER": "mailcowdockerized-rspamd-mailcow-1",
        "SPAM_THRESHOLD": 16.0,
        "SPAM_FOLDER": "INBOX.Spam",
        "IDLE_TIMEOUT": 540,
        "IDLE_CHECK_INTERVAL": 10,
        "LOGFILE": "/var/log/imap_rspamd_scan.log",
        "PROCESSED_FLAG": "RspamdScanned"
    },
    "ACCOUNTS": [
        {
            "USER": "hein.bloed@contoso.com",
            "PASS": "8MJp45L$#p%",
            "HOST": "imap.contoso.com",
            "PORT": 993
        },
        {
            "USER": "friedrich.maerz@contoso.com",
            "PASS": "FriedensNobelPreisFuerMich",
            "HOST": "contoso.com",
            "PORT": 993
        },
        {
            "USER": "chef@contoso.com",
            "PASS": "8hzhmdaodu0fwer",
            "HOST": "contoso.com",
            "PORT": 993
        }
    ]
}

Das angepasste Skript: imap_rspamd_docker.py

#!/usr/bin/env python3
"""
imap_rspamd_docker.py
Verarbeitet mehrere IMAP-Konten gleichzeitig mit IMAP IDLE und Rspamd.
"""
import imaplib, requests, logging, time, sys
import concurrent.futures
from imapclient import IMAPClient, exceptions
import sys
import json

# === KONFIGURATIONSDATEI liegt in ./data/custom/imap-scanner ===
CONFIG_PATH = "/etc/imap_rspamd_config.json"
RSPAMD_CONTAINER = "rspamd"
SPAM_THRESHOLD = None
SPAM_FOLDER = None
IDLE_TIMEOUT = None
# ... weitere globale Variablen hier definieren (z.B. LOGFILE, PROCESSED_FLAG) ...
PROCESSED_FLAG = None
ACCOUNTS = [] # Wird aus der Konfiguration geladen
RSPAMD_URL = None # Wird später berechnet

# === FUNKTIONEN ===

def imap_connect_and_login(config):
    """Stellt die IMAPClient-Verbindung her, loggt sich ein und wählt INBOX."""
    client = IMAPClient(config['HOST'], port=config['PORT'], ssl=True)
    client.login(config['USER'], config['PASS'])
    client.select_folder('INBOX', readonly=False) 
    return client

def post_to_rspamd(raw_message_bytes):
    """POST rohe Nachricht an Rspamd und parsed das zurückgegebene JSON"""
    headers = {'Content-Type': 'application/octet-stream'}
    try:
        r = requests.post(RSPAMD_URL, headers=headers, data=raw_message_bytes, timeout=20)
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logging.exception("Error contacting Rspamd: %s", e)
        return None

def load_config(config_path):
    """Lädt die Konfiguration aus einer JSON-Datei."""
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"FATAL: Konfigurationsdatei nicht gefunden: {config_path}")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"FATAL: Fehler beim Parsen der Konfigurationsdatei: {config_path}")
        sys.exit(1)
    except Exception as e:
        print(f"FATAL: Fehler beim Laden der Konfiguration: {e}")
        sys.exit(1)

def process_messages(client, uids, config):
    """Ruft die neuen Nachrichten ab, scannt sie und verschiebt Spam."""
    if not uids:
        return

    log_extra = {"user": config['USER']}
    
    # Abrufen der E-Mails: BODY.PEEK[] verhindert das Setzen des \Seen-Flags.
    fetch_data = client.fetch(uids, [b'BODY.PEEK[]', b'ENVELOPE'])
    for uid, data in fetch_data.items():
        raw_message_bytes = None
        # Verschiedene Keys für prüfen, da der Key nicht bei allen Servern RFC822 ist
        if b'BODY.PEEK[]' in data:
            raw_message_bytes = data[b'BODY.PEEK[]']
        if not raw_message_bytes and b'RFC822' in data:
            raw_message_bytes = data[b'RFC822']
        if not raw_message_bytes and b'BODY[]' in data:
            raw_message_bytes = data[b'BODY[]']

        if not raw_message_bytes:
            logging.error("Could not find email body data for UID %s", uid, extra=log_extra)
            continue
            
        subject = data[b'ENVELOPE'].subject.decode() if data[b'ENVELOPE'].subject else '(no-subject)'
        
        # Sende an Rspamd
        rsp = post_to_rspamd(raw_message_bytes)           
        score = rsp.get('score')
        required = rsp.get('required_score')
        action = rsp.get('action')

        logging.info("UID %s subject=%s score=%.2f required=%.2f action=%s",
                     uid, subject, score or 0.0, required or 0.0, action, extra=log_extra)
        
        # Spam-Entscheidung
        is_spam = action in ('reject', 'add header', 'rewrite subject', 'discard') or \
                  (score is not None and required is not None and score >= required)

        # Verschieben, wenn Spam
        if is_spam:
            try:
                client.move(uid, SPAM_FOLDER) 
                logging.info("Moved SPAM UID %s to %s.", uid, SPAM_FOLDER, extra=log_extra)
            except Exception as e:
                logging.exception("Error moving UID %s: %s", uid, e, extra=log_extra)
        else:
            logging.info("Clean mail UID %s. Leaving in INBOX.", uid, extra=log_extra)
        try:
            # Hier wird ein Flag gesetzt, damit diese Mail nicht nocheinmal bearbeitet wird.
            client.add_flags(uids, (PROCESSED_FLAG,))
            logging.info("Set flag %s on UIDs: %s", PROCESSED_FLAG.decode(), uids, extra=log_extra)
        except Exception as e:
            logging.exception("Error adding flag %s to UIDs %s: %s", PROCESSED_FLAG.decode(), uids, e, extra=log_extra)

# === HAUPT-LOOP FÜR EIN KONTO ===
def handle_account(config):
    """Hauptlogik: Verbindet, scannt und wartet im IDLE-Modus für ein einzelnes Konto."""
    user = config['USER']
    log_extra = {"user": user}
    logging.info("Handler gestartet für Konto.", extra=log_extra)
    
    while True:
        client = None
        try:
            client = imap_connect_and_login(config)
            logging.info("IMAP connection established. Entering IDLE mode.", extra=log_extra)
            # Initialer Scan: Verarbeite alle Mails, die UNGELSEN sind UND NICHT das Flag haben.
            # Also wird die Suche auf 'UNSEEN' und 'NOT RspamdScanned' eingeschränkt.
            flag_name = PROCESSED_FLAG.decode()
            search_criteria = ['UNSEEN', 'NOT', 'KEYWORD', flag_name]
            initial_uids = client.search(search_criteria)
            process_messages(client, initial_uids, config)
            
            start_time = time.time()
            client.idle()
            # IDLE Check Schleife
            while True:
                responses = client.idle_check(timeout=IDLE_CHECK_INTERVAL)
                if responses:
                    pass
                    break 

                if time.time() - start_time > IDLE_TIMEOUT:
                    logging.info("IDLE timeout reached. Stopping IDLE for connection refresh.", extra=log_extra)
                    break 

            client.idle_done()            
            # Führe einen erneuten Scan nach UNSEEN-Mails durch, die NICHT gescannt wurden.
            final_uids = client.search(search_criteria) 
            process_messages(client, final_uids, config)
            client.logout()
            
        except exceptions.IMAPClientError as e:
            logging.error("IMAPClient Error: %s. Reconnecting in 10s.", e, extra=log_extra)
            if client:
                try:
                    client.close()
                except:
                    pass
            time.sleep(10)
        except Exception as e:
            logging.exception("Fatal error in main loop: %s. Reconnecting in 10s.", e, extra=log_extra)
            if client:
                try:
                    client.close()
                except:
                    pass
            time.sleep(10)

# === EINSATZPUNKT (Execution) ===
if __name__ == '__main__':
    # 1. Konfiguration laden
    config_data = load_config(CONFIG_PATH)
    # 2. und damit die globalen Variablen zuweisen
    global_config = config_data.get("GLOBAL", {})
    
    RSPAMD_CONTAINER = global_config.get("RSPAMD_CONTAINER")
    SPAM_THRESHOLD = global_config.get("SPAM_THRESHOLD")
    SPAM_FOLDER = global_config.get("SPAM_FOLDER")
    IDLE_TIMEOUT = global_config.get("IDLE_TIMEOUT")
    IDLE_CHECK_INTERVAL = global_config.get("IDLE_CHECK_INTERVAL")
    LOGFILE = global_config.get("LOGFILE")
    flag_str = global_config.get("PROCESSED_FLAG", "RspamdScanned")
    PROCESSED_FLAG = flag_str.encode('ascii') 
    ACCOUNTS = config_data.get("ACCOUNTS", [])

    # 3. Logging neu konfigurieren (Jetzt, da LOGFILE bekannt ist)
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s %(levelname)s [%(user)s] %(message)s', force=True)

    if not ACCOUNTS:
        logging.critical("FATAL: Keine Konten in der Konfigurationsdatei gefunden.", e, extra={"user": "System"})
        sys.exit(1)


    # 4. Rspamd URL berechnen
    try:
        RSPAMD_URL = "http://" + RSPAMD_CONTAINER + ":11333/checkv2"
    except Exception as e:
        logging.critical("Konnte Rspamd IP nicht ermitteln: %s", e, extra={"user": "System"})
        sys.exit(1)
        
    logging.info("Starting multi-account IMAP IDLE scanner. Total accounts: %d", 
                 len(ACCOUNTS), 
                 extra={"user": "System"}) 
                 
    # 5. Starte je einen Thread pro Konto
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(ACCOUNTS)) as executor:
        executor.map(handle_account, ACCOUNTS)

Einbinden in die Mailcow Umgebung über einen zusätzlichen Service in der docker-compose.override.yml

Wenn es noch keine docker-compose.override.yml Datei im Verzeichnis /opt/mailcow-dockerized gibt, dann muss diese angelegt werden und mit diesem Inhalt gefüllt werden:

services:
    imap-scanner:
      image: custom/imap-scanner:latest 
      build:
        context: data/custom/imap-scanner/
        dockerfile: Dockerfile
      # Neustart bei Fehlern
      restart: always
      # Zugriff auf das Mailcow-Netzwerk
      networks:
        mailcow-network:
          # Kein fester Hostname nötig
          aliases:
            - imap-scanner
      volumes:
        # Skript und Konfiguration mounten
        - ./data/custom/imap-scanner/imap_rspamd_docker.py:/app/imap_rspamd_docker.py:ro
        - ./data/custom/imap-scanner/imap_rspamd_config.json:/etc/imap_rspamd_config.json:ro
      # Optional: Feste Ressourcenbegrenzung
      mem_limit: 128m

Wenn es schon eine docker-compose.override.yml Datei gibt, dann wird der obige Teil dort unterhalb des Tags „services:“ eingefügt, natürlich ohne das einleitende „services:“ von oben.

Starten:

Vor dem ersten start muss der Container gebaut werden mit „docker compose build imap-scanner

Der Container wird nun bei einem normalen Start der Mailcow mitgestartet. Wenn die Mailcow schon läuft kann man ihn mit einem docker compose up -d nachstarten lassen.

Mit einem „docker logs -f mailcowdockerized-imap-scanner-1“ kann man sich die Logs ausgeben lassen.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert