Using Claude.AI for parsing URIs from email.
Problem reported by Douglas Foster - Today at 3:18 PM
I asked Claude to generate a Python module to parse all URIs from either plain text or HTML source, then send the results to the top three RBL sources, stopping after the first positive (listed) result. I am blown away that it can do in seconds, for free, what I would struggle to complete at all. The code has not been tested, but I am going to post it in the first reply anyway. If you find a bug before I do, please let me know, but I expect that Claude's code will still be more bug-free than anything I could write. If you have already written something similar, is Claude's code as good as yours?
Douglas Foster Replied
"""
uri_extractor.py
================
Utilities for extracting URIs from plain text and from HTML, and for
checking extracted URIs against DNS-based URI blacklists (URIBLs / RBLs).

Public API
----------
    extract_uris(text, **options)            → list[dict]
    extract_uris_from_html(html, **options)  → list[dict]
    check_uris_against_rbls(uris, **options) → list[dict]

Each URI dict from the extractors contains at minimum:

    uri       – resolved URI string
    scheme    – lowercased scheme, or None
    kind      – 'absolute' | 'scheme-only' | 'protocol-relative' |
                'bare-domain' | 'attribute' | 'css' | 'script' | 'text'

HTML results also carry:
    raw_uri   – URI exactly as written in the source
    source    – 'attribute' | 'css' | 'script' | 'text'
    tag       – lowercase tag name, or None
    attr      – attribute name, or None

RBL result dicts (one per URI) contain:
    uri            – the URI that was checked
    host           – extracted hostname / IP
    is_listed      – True if listed on ANY checked RBL
    verdict        – 'listed' | 'clean' | 'error'
    checked_rbls   – list of RBL names actually queried
    rbl_results    – dict mapping RBL name → per-RBL result dict
    stopped_early  – True when checking halted after a positive hit
"""

from __future__ import annotations

import ipaddress
import re
import socket
from html.parser import HTMLParser
from typing import Optional
from urllib.parse import urljoin, urlparse


# ===========================================================================
# Shared / internal helpers
# ===========================================================================

_SCHEME_PAT = r'(?:[a-zA-Z][a-zA-Z0-9+\-.]*)'
_URI_CHARS  = r"""[^\s<>\[\]{}|\\^`"']"""

# NOTE: currently unused by the extractors; kept as a reference list of schemes.
_KNOWN_SCHEMES = re.compile(
    r'^(?:https?|ftps?|sftp|ssh|telnet|smtp|imap|pop3s?|ldaps?|'
    r'mailto|xmpp|sip|sips|turn|stun|ws|wss|urn|data|file|git|svn|'
    r'rtsp|rtsps|vnc|rdp|market|intent|tel|callto)$',
    re.IGNORECASE,
)

_URI_RE = re.compile(
    r'(?:'
    r'(?P<absolute>'   + _SCHEME_PAT + r'://' + _URI_CHARS + r'+)'
    r'|'
    r'(?P<schemeless>(?:mailto|tel|callto|urn|data):' + _URI_CHARS + r'+)'
    r'|'
    r'(?P<protrel>(?<![:/])//' + _URI_CHARS + r'+)'
    r')',
    re.IGNORECASE,
)

# Trailing punctuation that is almost never part of a URI; closing brackets
# are handled separately so balanced pairs inside the URI survive.
_TRAILING_JUNK_CHARS = set('.,;:!?>\'"')
_CLOSERS = {')': '(', ']': '[', '}': '{'}


def _trim_trailing(uri: str) -> str:
    """Strip trailing punctuation; drop ), ], } only when unbalanced."""
    while uri:
        ch = uri[-1]
        if ch in _TRAILING_JUNK_CHARS:
            uri = uri[:-1]
        elif ch in _CLOSERS and uri.count(_CLOSERS[ch]) < uri.count(ch):
            uri = uri[:-1]
        else:
            break
    return uri
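# Behaviour sketch (hypothetical inputs):
#   _trim_trailing('http://example.com/a).')                 → 'http://example.com/a'
#   _trim_trailing('http://en.wikipedia.org/wiki/URI_(ex)')  → unchanged (balanced parens)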

_BARE_DOMAIN = re.compile(
    r'(?<![/\w@.])'                                           # not mid-word, mid-path, or in an email
    r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'  # one or more DNS labels
    r'(?:com|net|org|edu|gov|mil|int'
    r'|io|co|ai|app|dev|cloud|tech|online|store|shop|blog|media'
    r'|[a-z]{2})\b'                                           # common gTLD or any 2-letter ccTLD
    r'(?::\d{1,5})?'                                          # optional port
    r'(?:/[^\s<>\[\]{}|\\^`"\']*)?',                          # optional path
    re.IGNORECASE,
)

# ---------------------------------------------------------------------------
# HTML-specific patterns
# ---------------------------------------------------------------------------

_URI_ATTRS: dict[str, list[str]] = {
    'a':           ['href', 'ping', 'xlink:href'],
    'area':        ['href', 'ping'],
    'audio':       ['src'],
    'base':        ['href'],
    'blockquote':  ['cite'],
    'button':      ['formaction'],
    'del':         ['cite'],
    'embed':       ['src'],
    'feimage':     ['href', 'xlink:href'],
    'form':        ['action'],
    'frame':       ['src', 'longdesc'],
    'iframe':      ['src'],
    'image':       ['href', 'xlink:href'],
    'img':         ['src', 'srcset', 'longdesc', 'usemap'],
    'input':       ['src', 'formaction'],
    'ins':         ['cite'],
    'link':        ['href', 'imagesrcset'],
    'meta':        ['content'],
    'object':      ['data', 'usemap', 'classid', 'codebase'],
    'param':       ['value'],
    'pattern':     ['href', 'xlink:href'],
    'q':           ['cite'],
    'script':      ['src'],
    'source':      ['src', 'srcset'],
    'track':       ['src'],
    'use':         ['href', 'xlink:href'],
    'video':       ['src', 'poster'],
}

_GLOBAL_URI_ATTRS = {
    'action', 'cite', 'data', 'formaction', 'href', 'manifest', 'ping',
    'poster', 'src', 'usemap', 'xlink:href', 'xml:base', 'xmlns', 'itemid',
}

_SRCSET_RE     = re.compile(r'([^\s,]+)(?:\s+[\d.]+[wx])?')   # candidate URL + optional descriptor
_CSS_URL_RE    = re.compile(r'url\(\s*["\']?([^"\')\s]+)["\']?\s*\)', re.IGNORECASE)
_CSS_IMPORT_RE = re.compile(
    r'@import\s+(?:url\(\s*["\']?([^"\')\s]+)["\']?\s*\)|["\']([^"\']+)["\'])',
    re.IGNORECASE,
)
_EVENT_ATTR    = re.compile(r'^on[a-z]+$', re.IGNORECASE)
_JS_URI_RE     = re.compile(r'\bjavascript:\s*[^\s"\'>;]+', re.IGNORECASE)

_META_URL_NAMES = {
    'og:url', 'twitter:url', 'canonical',
    'msapplication-starturl', 'msapplication-tileimage',
}
_META_URL_PROPS = {'og:url', 'og:image', 'og:audio', 'og:video'}


# ===========================================================================
# Plain-text extractor
# ===========================================================================

def extract_uris(
    text: str,
    *,
    include_bare_domains: bool = False,
    deduplicate: bool = True,
    sort: bool = False,
) -> list[dict]:
    """
    Extract all URIs from a plain-text string.

    Parameters
    ----------
    text:
        Input string to scan.
    include_bare_domains:
        Also detect unschemed bare hostnames such as ``www.example.com``.
        Off by default (higher false-positive rate).
    deduplicate:
        Keep only the first occurrence of each unique URI string.
    sort:
        Return results sorted alphabetically by URI instead of by position.

    Returns
    -------
    list[dict] with keys: uri, scheme, kind, start, end
    """
    results: list[dict] = []
    seen:    set[str]   = set()

    def _add(uri: str, kind: str, scheme: Optional[str], start: int) -> None:
        uri = _trim_trailing(uri)
        if not uri:
            return
        if deduplicate and uri in seen:
            return
        seen.add(uri)
        results.append({
            'uri':    uri,
            'scheme': scheme.lower() if scheme else None,
            'kind':   kind,
            'start':  start,
            'end':    start + len(uri),
        })

    for m in _URI_RE.finditer(text):
        raw, start = m.group(0), m.start()
        if m.group('absolute'):
            scheme = re.match(_SCHEME_PAT, raw).group(0)
            _add(raw, 'absolute', scheme, start)
        elif m.group('schemeless'):
            _add(raw, 'scheme-only', raw.split(':')[0], start)
        elif m.group('protrel'):
            _add(raw, 'protocol-relative', None, start)

    if include_bare_domains:
        for m in _BARE_DOMAIN.finditer(text):
            raw   = _trim_trailing(m.group(0))
            start = m.start()
            if not raw:
                continue
            if any(r['start'] <= start < r['end'] for r in results):
                continue
            if deduplicate and raw in seen:
                continue
            seen.add(raw)
            results.append({
                'uri':    raw,
                'scheme': None,
                'kind':   'bare-domain',
                'start':  start,
                'end':    start + len(raw),
            })

    results.sort(key=lambda r: r['uri'] if sort else r['start'])
    return results
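# Result-shape sketch (hypothetical input; positional keys elided with ...):
#   extract_uris('read http://a.example/x, then mail mailto:bob@example.com')
#   → [{'uri': 'http://a.example/x',     'scheme': 'http',   'kind': 'absolute',    ...},
#      {'uri': 'mailto:bob@example.com', 'scheme': 'mailto', 'kind': 'scheme-only', ...}]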


# ===========================================================================
# HTML parser (internal)
# ===========================================================================

class _URIHTMLParser(HTMLParser):
    def __init__(self, base_url: Optional[str], extract_text_uris: bool,
                 deduplicate: bool = True) -> None:
        super().__init__(convert_charrefs=True)
        self.base_url          = base_url
        self.extract_text_uris = extract_text_uris
        self.deduplicate       = deduplicate
        self.results:  list[dict] = []
        self._seen:    set[tuple] = set()
        self._in_style  = False
        self._in_script = False

    def _resolve(self, uri: str) -> str:
        uri = uri.strip()
        if self.base_url and uri and not uri.startswith(('data:', 'javascript:')):
            try:
                return urljoin(self.base_url, uri)
            except ValueError:
                pass
        return uri

    def _add(self, uri: str, source: str, tag: Optional[str],
             attr: Optional[str], extra: Optional[dict] = None) -> None:
        uri = uri.strip()
        if not uri or uri == '#':
            return
        resolved = self._resolve(uri)
        key = (resolved, source)
        if self.deduplicate and key in self._seen:
            return
        self._seen.add(key)
        try:
            parsed = urlparse(resolved)
            scheme = parsed.scheme.lower() if parsed.scheme else None
        except ValueError:
            scheme = None
        record: dict = {
            'uri':     resolved,
            'raw_uri': uri,
            'scheme':  scheme,
            'kind':    source,
            'source':  source,
            'tag':     tag,
            'attr':    attr,
        }
        if extra:
            record.update(extra)
        self.results.append(record)

    def _scan_srcset(self, value: str, tag: str, attr: str) -> None:
        for m in _SRCSET_RE.finditer(value):
            self._add(m.group(1), 'attribute', tag, attr)

    def _scan_css(self, css: str, tag: Optional[str], attr: Optional[str]) -> None:
        for m in _CSS_URL_RE.finditer(css):
            self._add(m.group(1), 'css', tag, attr)
        for m in _CSS_IMPORT_RE.finditer(css):
            url = m.group(1) or m.group(2)
            if url:
                self._add(url, 'css', tag, attr)

    def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
        tag_l = tag.lower()
        if tag_l == 'style':
            self._in_style = True
        elif tag_l == 'script':
            self._in_script = True
        attr_dict = {k.lower(): (v or '').strip() for k, v in attrs}
        allowed   = set(_URI_ATTRS.get(tag_l, [])) | _GLOBAL_URI_ATTRS

        for attr, value in attr_dict.items():
            if not value:
                continue
            if attr in ('srcset', 'imagesrcset'):
                self._scan_srcset(value, tag_l, attr)
            elif attr == 'style':
                self._scan_css(value, tag_l, attr)
            elif _EVENT_ATTR.match(attr):
                for m in _JS_URI_RE.finditer(value):
                    self._add(m.group(0), 'script', tag_l, attr)
            elif tag_l == 'meta' and attr == 'content':
                name  = attr_dict.get('name', '').lower()
                prop  = attr_dict.get('property', '').lower()
                equiv = attr_dict.get('http-equiv', '').lower()
                if name in _META_URL_NAMES or prop in _META_URL_PROPS:
                    self._add(value, 'attribute', tag_l, attr)
                elif equiv == 'refresh':
                    m2 = re.search(r'url\s*=\s*([^\s;]+)', value, re.I)
                    self._add(m2.group(1) if m2 else value, 'attribute', tag_l, attr)
            elif attr in allowed:
                if value.startswith('data:'):
                    self._add(value, 'attribute', tag_l, attr, {'data_uri': True})
                elif value.lower().startswith('javascript:'):
                    self._add(value, 'script', tag_l, attr)
                else:
                    self._add(value, 'attribute', tag_l, attr)

    def handle_endtag(self, tag: str) -> None:
        if tag.lower() == 'style':
            self._in_style = False
        if tag.lower() == 'script':
            self._in_script = False

    def handle_data(self, data: str) -> None:
        if self._in_style:
            self._scan_css(data, 'style', None)
        elif self._in_script:
            for hit in extract_uris(data, include_bare_domains=False):
                self._add(hit['uri'], 'script', 'script', None)
        elif self.extract_text_uris:
            for hit in extract_uris(data, include_bare_domains=False):
                self._add(hit['uri'], 'text', None, None)


# ===========================================================================
# HTML extractor (public)
# ===========================================================================

def extract_uris_from_html(
    html: str,
    *,
    base_url:                Optional[str] = None,
    extract_text_uris:       bool = False,
    include_data_uris:       bool = False,
    include_javascript_uris: bool = False,
    deduplicate:             bool = True,
    sort:                    bool = False,
) -> list[dict]:
    """
    Extract all URIs from a string of HTML.

    Parameters
    ----------
    html:
        Raw HTML string to parse.
    base_url:
        Base URL used to resolve relative references.
    extract_text_uris:
        Also extract URIs from visible text nodes.
    include_data_uris:
        Include ``data:`` URIs (excluded by default).
    include_javascript_uris:
        Include ``javascript:`` URIs.
    deduplicate:
        Suppress (resolved-uri, source) duplicates.
    sort:
        Return results sorted by URI string instead of document order.

    Returns
    -------
    list[dict] with keys: uri, raw_uri, scheme, kind, source, tag, attr
    """
    parser = _URIHTMLParser(base_url=base_url,
                            extract_text_uris=extract_text_uris,
                            deduplicate=deduplicate)
    parser.feed(html)
    results = parser.results
    if not include_data_uris:
        results = [r for r in results
                   if r.get('scheme') != 'data' and not r.get('data_uri')]
    if not include_javascript_uris:
        results = [r for r in results if r.get('scheme') != 'javascript']
    if sort:
        results.sort(key=lambda r: r['uri'])
    return results
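# Result-shape sketch (hypothetical input):
#   extract_uris_from_html('<a href="/x">go</a>', base_url='https://e.example/')
#   → [{'uri': 'https://e.example/x', 'raw_uri': '/x', 'scheme': 'https',
#       'kind': 'attribute', 'source': 'attribute', 'tag': 'a', 'attr': 'href'}]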


# ===========================================================================
# RBL / URIBL checker
# ===========================================================================

# ---------------------------------------------------------------------------
# Three widely used free URI / domain reputation DNSBLs
#
#   URIBL (uribl.com)
#       Zones : multi.uribl.com
#       Listed: any A record returned (127.0.0.x, bitmask in last octet)
#       Bits  : 0x02 = black, 0x04 = grey, 0x08 = red, 0x40 = white (safe)
#       Note  : requires a paid subscription for automated/high-volume use;
#               free for manual / low-volume queries.
#
#   SURBL (surbl.org)
#       Zones : multi.surbl.org
#       Listed: any A record (127.0.0.x bitmask)
#       Bits  : 0x02 = phishing (PH), 0x04 = malware (MW),
#               0x08 = spam (AB), 0x10 = CR, 0x40 = abuse (ABUSE)
#
#   Spamhaus DBL (spamhaus.org)
#       Zones : dbl.spamhaus.org
#       Listed: returns 127.0.1.x
#               127.0.1.2 = spam domain
#               127.0.1.4 = phishing domain
#               127.0.1.5 = malware domain
#               127.0.1.6 = botnet C&C domain
#               127.0.1.102/103 = abused legit spam/phish (informational)
#       Note  : 127.0.1.255 = "do not query" / query error — treat as unknown
# ---------------------------------------------------------------------------

RBL_DEFINITIONS: dict[str, dict] = {
    'URIBL': {
        'zone':        'multi.uribl.com',
        'description': 'URIBL multi-zone (black/grey/red)',
        'type':        'bitmask',
        'bits': {
            0x02: 'black',
            0x04: 'grey',
            0x08: 'red',
            0x40: 'white',   # whitelisted — NOT a threat
        },
        'safe_bits':   {0x40},    # bits that mean "safe / whitelisted"
    },
    'SURBL': {
        'zone':        'multi.surbl.org',
        'description': 'SURBL multi-zone (phishing/malware/spam)',
        'type':        'bitmask',
        'bits': {
            0x02: 'phishing',
            0x04: 'malware',
            0x08: 'spam',
            0x10: 'CR',
            0x40: 'abuse',
        },
        'safe_bits':   set(),
    },
    'SpamhausDBL': {
        'zone':        'dbl.spamhaus.org',
        'description': 'Spamhaus Domain Block List',
        'type':        'value',
        'values': {
            '127.0.1.2':   'spam-domain',
            '127.0.1.4':   'phishing-domain',
            '127.0.1.5':   'malware-domain',
            '127.0.1.6':   'botnet-c2-domain',
            '127.0.1.102': 'abused-legit-spam',
            '127.0.1.103': 'abused-legit-phish',
            '127.0.1.255': 'query-error',        # must NOT be treated as listed
        },
        'error_values': {'127.0.1.255'},          # return as 'error', not 'listed'
    },
}
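# Decoding sketch: a URIBL answer of 127.0.0.6 carries last octet 0x06 =
# 0x02 | 0x04, i.e. both 'black' and 'grey'; a SpamhausDBL answer of
# 127.0.1.4 maps straight to 'phishing-domain' via the value table above.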

# Default list of RBLs, queried in this order
DEFAULT_RBLS = ['SpamhausDBL', 'URIBL', 'SURBL']

# Schemes whose hosts are worth querying; all others are skipped
_CHECKABLE_SCHEMES = {'http', 'https', 'ftp', 'ftps', 'sftp', 'ws', 'wss'}

# Private / loopback / link-local ranges — never query RBLs for these
_PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
    ipaddress.ip_network('169.254.0.0/16'),
    ipaddress.ip_network('::1/128'),
    ipaddress.ip_network('fc00::/7'),
]


def _extract_host(uri: str) -> Optional[str]:
    """
    Return the bare hostname / IP from a URI string, stripped of port and
    trailing dots.  Returns None if the host cannot be determined or is
    obviously non-routable.
    """
    try:
        if uri.startswith('//'):                    # protocol-relative reference
            parsed = urlparse('scheme:' + uri)
        elif '://' in uri:
            parsed = urlparse(uri)
        else:                                       # bare domain or scheme-less
            parsed = urlparse('scheme://' + uri)
        host = (parsed.hostname or '').lower().rstrip('.')
    except Exception:
        return None

    if not host:
        return None

    # Reject private / loopback IPs
    try:
        addr = ipaddress.ip_address(host)
        if any(addr in net for net in _PRIVATE_RANGES):
            return None
    except ValueError:
        pass  # not an IP — that's fine

    return host


def _reverse_ip(ip: str) -> Optional[str]:
    """
    Return the dotted-decimal reverse of an IPv4 address for DNSBL lookup
    (e.g. '1.2.3.4' → '4.3.2.1').  Returns None for IPv6 (not supported
    by most domain-oriented DNSBLs).
    """
    try:
        addr = ipaddress.IPv4Address(ip)
        return '.'.join(reversed(addr.compressed.split('.')))
    except ValueError:
        return None   # IPv6 or invalid — skip


def _query_rbl(host: str, rbl_name: str, rbl: dict, timeout: float) -> dict:
    """
    Perform a single DNS A-record lookup for ``host`` against one RBL zone.

    Returns a dict:
        listed   – True | False | None (None = query error / inconclusive)
        raw      – list of IP strings returned by DNS (may be empty)
        labels   – list of human-readable threat labels matched
        error    – error message string, or None
    """
    # Build the lookup name: reversed-IP.zone  or  host.zone
    try:
        ipaddress.ip_address(host)          # raises if not an IP
        reversed_host = _reverse_ip(host)
        if reversed_host is None:
            return {'listed': None, 'raw': [], 'labels': [],
                    'error': 'IPv6 not supported by this RBL'}
        lookup = f'{reversed_host}.{rbl["zone"]}'
    except ValueError:
        lookup = f'{host}.{rbl["zone"]}'

    # NOTE: socket.getaddrinfo() uses the OS resolver, which does not honour
    # a per-query timeout, so ``timeout`` is best-effort only.
    try:
        answers = socket.getaddrinfo(lookup, None, socket.AF_INET,
                                     socket.SOCK_DGRAM)
        raw_ips = list({ans[4][0] for ans in answers})
    except socket.gaierror as exc:
        # NXDOMAIN ("name not known") → not listed.
        # Anything else (EAI_AGAIN, timeouts, SERVFAIL) → inconclusive.
        code = exc.args[0] if exc.args else None
        nxdomain_codes = {
            socket.EAI_NONAME,                    # -2 on POSIX, 11001 on Windows
            getattr(socket, 'EAI_NODATA', None),  # "no address data", where defined
        }
        msg = str(exc).lower()
        if code in nxdomain_codes \
                or 'name or service not known' in msg \
                or 'no address associated' in msg \
                or 'host not found' in msg:
            return {'listed': False, 'raw': [], 'labels': [], 'error': None}
        return {'listed': None, 'raw': [], 'labels': [],
                'error': f'DNS error: {exc}'}

    if not raw_ips:
        return {'listed': False, 'raw': [], 'labels': [], 'error': None}

    # Decode the response according to RBL type
    labels:  list[str] = []
    listed:  bool      = False

    if rbl['type'] == 'bitmask':
        safe_bits = rbl.get('safe_bits', set())
        for ip in raw_ips:
            try:
                last_octet = int(ip.split('.')[-1])
            except (ValueError, IndexError):
                continue
            for bit, label in rbl['bits'].items():
                if last_octet & bit:
                    if bit not in safe_bits:
                        labels.append(label)
                        listed = True

    elif rbl['type'] == 'value':
        error_values = rbl.get('error_values', set())
        for ip in raw_ips:
            if ip in error_values:
                return {'listed': None, 'raw': raw_ips, 'labels': [],
                        'error': f'RBL returned error sentinel: {ip}'}
            label = rbl['values'].get(ip)
            if label:
                labels.append(label)
                listed = True

    return {'listed': listed, 'raw': raw_ips, 'labels': labels, 'error': None}


def check_uris_against_rbls(
    uris: list[dict] | list[str],
    *,
    rbls:             Optional[list[str]] = None,
    stop_on_positive: bool  = True,
    timeout:          float = 5.0,
    skip_schemes:     Optional[set[str]] = None,
) -> list[dict]:
    """
    Check each URI against up to three free DNS-based reputation lists
    (DNSBLs / URIBLs).

    Checking stops immediately for a given URI as soon as any RBL returns a
    positive (listed) result — there is no point querying further once a URI
    is known-bad.

    Parameters
    ----------
    uris:
        List of URI dicts (as returned by ``extract_uris`` /
        ``extract_uris_from_html``) **or** plain URI strings.
    rbls:
        Ordered list of RBL names to query.  Must be keys of
        ``RBL_DEFINITIONS``.  Defaults to ``DEFAULT_RBLS``
        (SpamhausDBL → URIBL → SURBL).
    stop_on_positive:
        If True (default), stop querying further RBLs for a URI the moment
        one returns ``listed=True``.
    timeout:
        DNS lookup timeout in seconds (best-effort; Python's socket module
        uses the OS resolver which may not honour per-query timeouts).
    skip_schemes:
        URI schemes to skip entirely (e.g. ``{'file', 'data'}``).
        Defaults to skipping everything not in ``_CHECKABLE_SCHEMES``.

    Returns
    -------
    list[dict] — one entry per input URI, with keys:

        uri            – the URI string
        host           – extracted hostname or IP (None if not extractable)
        is_listed      – True if listed on any RBL, False if clean,
                         None if all queries were inconclusive / errored
        verdict        – 'listed' | 'clean' | 'error' | 'skipped'
        checked_rbls   – RBL names actually queried (may be < full list)
        stopped_early  – True when loop halted after a positive
        rbl_results    – dict mapping RBL name → per-RBL result dict
                         (keys: listed, raw, labels, error)
    """
    rbl_names = rbls if rbls is not None else DEFAULT_RBLS
    # Validate requested RBL names
    unknown = [n for n in rbl_names if n not in RBL_DEFINITIONS]
    if unknown:
        raise ValueError(f'Unknown RBL name(s): {unknown}. '
                         f'Valid names: {list(RBL_DEFINITIONS)}')

    # Schemes we will actually query: the HTTP-like set minus any the caller skips
    checkable = _CHECKABLE_SCHEMES - (set(skip_schemes) if skip_schemes else set())
    results: list[dict] = []

    for item in uris:
        uri_str = item if isinstance(item, str) else item.get('uri', '')
        scheme  = (item.get('scheme') if isinstance(item, dict)
                   else (urlparse(uri_str).scheme or '').lower())

        base_record: dict = {
            'uri':           uri_str,
            'host':          None,
            'is_listed':     None,
            'verdict':       'error',
            'checked_rbls':  [],
            'stopped_early': False,
            'rbl_results':   {},
        }

        # --- Skip non-HTTP-like schemes ---
        if scheme and scheme not in checkable:
            base_record['verdict'] = 'skipped'
            base_record['is_listed'] = False
            results.append(base_record)
            continue

        host = _extract_host(uri_str)
        base_record['host'] = host

        if not host:
            base_record['verdict'] = 'skipped'
            base_record['is_listed'] = False
            results.append(base_record)
            continue

        # --- Query each RBL in order ---
        any_listed      = False
        any_clean       = False
        any_error       = False
        stopped_early   = False

        for rbl_name in rbl_names:
            rbl = RBL_DEFINITIONS[rbl_name]
            res = _query_rbl(host, rbl_name, rbl, timeout)

            base_record['checked_rbls'].append(rbl_name)
            base_record['rbl_results'][rbl_name] = res

            if res['listed'] is True:
                any_listed = True
                if stop_on_positive:
                    stopped_early = (rbl_name != rbl_names[-1])
                    break                      # ← stop on first positive hit

            elif res['listed'] is False:
                any_clean = True

            else:                              # None = inconclusive / error
                any_error = True

        base_record['stopped_early'] = stopped_early

        if any_listed:
            base_record['is_listed'] = True
            base_record['verdict']   = 'listed'
        elif any_clean:
            base_record['is_listed'] = False
            base_record['verdict']   = 'clean'
        else:
            base_record['is_listed'] = None
            base_record['verdict']   = 'error'

        results.append(base_record)

    return results

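
To smoke-test the whole pipeline, something like the following could be appended to the bottom of the file. It is as untested as everything above, and the sample HTML is made up:

if __name__ == '__main__':
    sample = ('<a href="http://example.com/offer">Click</a> '
              'or visit https://test.example/x')
    found = extract_uris_from_html(sample, extract_text_uris=True)
    for v in check_uris_against_rbls(found):
        print(f"{v['uri']:45s} {v['verdict']:8s} "
              f"via {', '.join(v['checked_rbls']) or '-'}")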