Using Claude.AI for parsing URIs from email.
Problem reported by Douglas Foster - 5/16/2026 at 3:18 PM
Submitted
I asked Claude to generate a Python module to parse all URIs from either plain text or HRML source, then  send the results to the top three RBL sources, stopping after the first negative result.   I am blown away that it can do in seconds, for free, what I would struggle to complete at all.   Code has not been tested, but I am going to post it in the first reply anyway.   If you find a bug before I do, please let me know, but I expect that Claude's code will also be more bug-free than anything I could write.   If you have already written something similar, is its code is as good as yours?
Douglas Foster Replied
"""
uri_extractor.py
================
Utilities for extracting URIs from plain text and from HTML, and for
checking extracted URIs against DNS-based URI blacklists (URIBLs / RBLs).

Public API
----------
    extract_uris(text, **options)            → list[dict]
    extract_uris_from_html(html, **options)  → list[dict]
    check_uris_against_rbls(uris, **options) → list[dict]

Each URI dict from the extractors contains at minimum:

    uri       – resolved URI string
    scheme    – lowercased scheme, or None
    kind      – 'absolute' | 'scheme-only' | 'protocol-relative' |
                'bare-domain' | 'attribute' | 'css' | 'script' | 'text'

HTML results also carry:
    raw_uri   – URI exactly as written in the source
    source    – 'attribute' | 'css' | 'script' | 'text'
    tag       – lowercase tag name, or None
    attr      – attribute name, or None

RBL result dicts (one per URI) contain:
    uri            – the URI that was checked
    host           – extracted hostname / IP
    is_listed      – True if listed on ANY checked RBL
    verdict        – 'listed' | 'clean' | 'error'
    checked_rbls   – list of RBL names actually queried
    rbl_results    – dict mapping RBL name → per-RBL result dict
    stopped_early  – True when checking halted after a positive hit
"""

from __future__ import annotations

import ipaddress
import re
import socket
import time
from html.parser import HTMLParser
from typing import Optional
from urllib.parse import urljoin, urlparse


# ===========================================================================
# Shared / internal helpers
# ===========================================================================

_SCHEME_PAT = r'(?:[a-zA-Z][a-zA-Z0-9+\-.]*)'
_URI_CHARS  = r"""[^\s<>\[\]{}|\\^`"']"""

_KNOWN_SCHEMES = re.compile(
    r'^(?:https?|ftps?|sftp|ssh|telnet|smtp|imap|pop3s?|ldaps?|'
    r'mailto|xmpp|sip|sips|turn|stun|ws|wss|urn|data|file|git|svn|'
    r'rtsp|rtsps|vnc|rdp|market|intent|tel|callto)$',
    re.IGNORECASE,
)

_URI_RE = re.compile(
    r'(?:'
    r'(?P<absolute>'   + _SCHEME_PAT + r'://' + _URI_CHARS + r'+)'
    r'|'
    r'(?P<schemeless>(?:mailto|tel|callto|urn|data):' + _URI_CHARS + r'+)'
    r'|'
    r'(?P<protrel>(?<![:/])//' + _URI_CHARS + r'+)'
    r')',
    re.IGNORECASE,
)

_TRAILING_JUNK = re.compile(r'[.,;:!?)>\]}\'"]+$')


def _trim_trailing(uri: str) -> str:
    uri = _TRAILING_JUNK.sub('', uri)
    for open_c, close_c in [('(', ')'), ('[', ']'), ('{', '}')]:
        while uri.endswith(close_c) and uri.count(open_c) < uri.count(close_c):
            uri = uri[:-1]
    return _TRAILING_JUNK.sub('', uri)


_BARE_DOMAIN = re.compile(
    r'(?<![/\w@.])'
    r'(?:'
        r'(?:www\d*|ftp)\.'
        r'|'
        r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)'
        r'(?:com|net|org|edu|gov|mil|int'
        r'|io|co|ai|app|dev|cloud|tech|online|store|shop|blog|media'
        r'|[a-z]{2})\.'
    r')'
    r'[a-zA-Z0-9][a-zA-Z0-9-]*'
    r'(?:\.[a-zA-Z]{2,})*'
    r'(?::\d{1,5})?'
    r'(?:/[^\s<>\[\]{}|\\^`"\']*)?',
    re.IGNORECASE,
)

# ---------------------------------------------------------------------------
# HTML-specific patterns
# ---------------------------------------------------------------------------

_URI_ATTRS: dict[str, list[str]] = {
    'a':           ['href', 'ping', 'xlink:href'],
    'area':        ['href', 'ping'],
    'audio':       ['src'],
    'base':        ['href'],
    'blockquote':  ['cite'],
    'button':      ['formaction'],
    'del':         ['cite'],
    'embed':       ['src'],
    'feimage':     ['href', 'xlink:href'],
    'form':        ['action'],
    'frame':       ['src', 'longdesc'],
    'iframe':      ['src'],
    'image':       ['href', 'xlink:href'],
    'img':         ['src', 'srcset', 'longdesc', 'usemap'],
    'input':       ['src', 'formaction'],
    'ins':         ['cite'],
    'link':        ['href', 'imagesrcset'],
    'meta':        ['content'],
    'object':      ['data', 'usemap', 'classid', 'codebase'],
    'param':       ['value'],
    'pattern':     ['href', 'xlink:href'],
    'q':           ['cite'],
    'script':      ['src'],
    'source':      ['src', 'srcset'],
    'track':       ['src'],
    'use':         ['href', 'xlink:href'],
    'video':       ['src', 'poster'],
}

_GLOBAL_URI_ATTRS = {
    'action', 'cite', 'data', 'formaction', 'href', 'manifest', 'ping',
    'poster', 'src', 'usemap', 'xlink:href', 'xml:base', 'xmlns', 'itemid',
}

_SRCSET_RE     = re.compile(r'([^\s,][^\s,]*[^\s,]|[^\s,])(?:\s+[\d.]+[wx])?')
_CSS_URL_RE    = re.compile(r'url\(\s*["\']?([^"\')\s]+)["\']?\s*\)', re.IGNORECASE)
_CSS_IMPORT_RE = re.compile(
    r'@import\s+(?:url\(\s*["\']?([^"\')\s]+)["\']?\s*\)|["\']([^"\']+)["\'])',
    re.IGNORECASE,
)
_EVENT_ATTR    = re.compile(r'^on[a-z]+$', re.IGNORECASE)
_JS_URI_RE     = re.compile(r'\bjavascript:\s*[^\s"\'>;]+', re.IGNORECASE)

_META_URL_NAMES = {
    'og:url', 'twitter:url', 'canonical',
    'msapplication-starturl', 'msapplication-tileimage',
}
_META_URL_PROPS = {'og:url', 'og:image', 'og:audio', 'og:video'}


# ===========================================================================
# Plain-text extractor
# ===========================================================================

def extract_uris(
    text: str,
    *,
    include_bare_domains: bool = False,
    deduplicate: bool = True,
    sort: bool = False,
) -> list[dict]:
    """
    Extract all URIs from a plain-text string.

    Parameters
    ----------
    text:
        Input string to scan.
    include_bare_domains:
        Also detect unschemed bare hostnames such as ``www.example.com``.
        Off by default (higher false-positive rate).
    deduplicate:
        Keep only the first occurrence of each unique URI string.
    sort:
        Return results sorted alphabetically by URI instead of by position.

    Returns
    -------
    list[dict] with keys: uri, scheme, kind, start, end
    """
    results: list[dict] = []
    seen:    set[str]   = set()

    def _add(uri: str, kind: str, scheme: Optional[str], start: int) -> None:
        uri = _trim_trailing(uri)
        if not uri:
            return
        if deduplicate and uri in seen:
            return
        seen.add(uri)
        results.append({
            'uri':    uri,
            'scheme': scheme.lower() if scheme else None,
            'kind':   kind,
            'start':  start,
            'end':    start + len(uri),
        })

    for m in _URI_RE.finditer(text):
        raw, start = m.group(0), m.start()
        if m.group('absolute'):
            scheme = re.match(_SCHEME_PAT, raw).group(0)
            _add(raw, 'absolute', scheme, start)
        elif m.group('schemeless'):
            _add(raw, 'scheme-only', raw.split(':')[0], start)
        elif m.group('protrel'):
            _add(raw, 'protocol-relative', None, start)

    if include_bare_domains:
        for m in _BARE_DOMAIN.finditer(text):
            raw   = _trim_trailing(m.group(0))
            start = m.start()
            if not raw:
                continue
            if any(r['start'] <= start < r['end'] for r in results):
                continue
            if deduplicate and raw in seen:
                continue
            seen.add(raw)
            results.append({
                'uri':    raw,
                'scheme': None,
                'kind':   'bare-domain',
                'start':  start,
                'end':    start + len(raw),
            })

    results.sort(key=lambda r: r['uri'] if sort else r['start'])
    return results


# ===========================================================================
# HTML parser (internal)
# ===========================================================================

class _URIHTMLParser(HTMLParser):
    def __init__(self, base_url: Optional[str], extract_text_uris: bool) -> None:
        super().__init__(convert_charrefs=True)
        self.base_url          = base_url
        self.extract_text_uris = extract_text_uris
        self.results:  list[dict] = []
        self._seen:    set[tuple] = set()
        self._in_style  = False
        self._in_script = False

    def _resolve(self, uri: str) -> str:
        uri = uri.strip()
        if self.base_url and uri and not uri.startswith(('data:', 'javascript:')):
            try:
                return urljoin(self.base_url, uri)
            except ValueError:
                pass
        return uri

    def _add(self, uri: str, source: str, tag: Optional[str],
             attr: Optional[str], extra: Optional[dict] = None) -> None:
        uri = uri.strip()
        if not uri or uri == '#':
            return
        resolved = self._resolve(uri)
        key = (resolved, source)
        if key in self._seen:
            return
        self._seen.add(key)
        try:
            parsed = urlparse(resolved)
            scheme = parsed.scheme.lower() if parsed.scheme else None
        except ValueError:
            scheme = None
        record: dict = {
            'uri':     resolved,
            'raw_uri': uri,
            'scheme':  scheme,
            'kind':    source,
            'source':  source,
            'tag':     tag,
            'attr':    attr,
        }
        if extra:
            record.update(extra)
        self.results.append(record)

    def _scan_srcset(self, value: str, tag: str, attr: str) -> None:
        for m in _SRCSET_RE.finditer(value):
            self._add(m.group(1), 'attribute', tag, attr)

    def _scan_css(self, css: str, tag: Optional[str], attr: Optional[str]) -> None:
        for m in _CSS_URL_RE.finditer(css):
            self._add(m.group(1), 'css', tag, attr)
        for m in _CSS_IMPORT_RE.finditer(css):
            url = m.group(1) or m.group(2)
            if url:
                self._add(url, 'css', tag, attr)

    def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
        tag_l = tag.lower()
        self._in_style  = tag_l == 'style'
        self._in_script = tag_l == 'script'
        attr_dict = {k.lower(): (v or '').strip() for k, v in attrs}
        allowed   = set(_URI_ATTRS.get(tag_l, [])) | _GLOBAL_URI_ATTRS

        for attr, value in attr_dict.items():
            if not value:
                continue
            if attr in ('srcset', 'imagesrcset'):
                self._scan_srcset(value, tag_l, attr)
            elif attr == 'style':
                self._scan_css(value, tag_l, attr)
            elif _EVENT_ATTR.match(attr):
                for m in _JS_URI_RE.finditer(value):
                    self._add(m.group(0), 'script', tag_l, attr)
            elif tag_l == 'meta' and attr == 'content':
                name  = attr_dict.get('name', '').lower()
                prop  = attr_dict.get('property', '').lower()
                equiv = attr_dict.get('http-equiv', '').lower()
                if name in _META_URL_NAMES or prop in _META_URL_PROPS:
                    self._add(value, 'attribute', tag_l, attr)
                elif equiv == 'refresh':
                    m2 = re.search(r'url\s*=\s*([^\s;]+)', value, re.I)
                    self._add(m2.group(1) if m2 else value, 'attribute', tag_l, attr)
            elif attr in allowed:
                if value.startswith('data:'):
                    self._add(value, 'attribute', tag_l, attr, {'data_uri': True})
                elif value.lower().startswith('javascript:'):
                    self._add(value, 'script', tag_l, attr)
                else:
                    self._add(value, 'attribute', tag_l, attr)

    def handle_endtag(self, tag: str) -> None:
        if tag.lower() == 'style':
            self._in_style = False
        if tag.lower() == 'script':
            self._in_script = False

    def handle_data(self, data: str) -> None:
        if self._in_style:
            self._scan_css(data, 'style', None)
        elif self._in_script:
            for hit in extract_uris(data, include_bare_domains=False):
                self._add(hit['uri'], 'script', 'script', None)
        elif self.extract_text_uris:
            for hit in extract_uris(data, include_bare_domains=False):
                self._add(hit['uri'], 'text', None, None)


# ===========================================================================
# HTML extractor (public)
# ===========================================================================

def extract_uris_from_html(
    html: str,
    *,
    base_url:                Optional[str] = None,
    extract_text_uris:       bool = False,
    include_data_uris:       bool = False,
    include_javascript_uris: bool = False,
    deduplicate:             bool = True,
    sort:                    bool = False,
) -> list[dict]:
    """
    Extract all URIs from a string of HTML.

    Parameters
    ----------
    html:
        Raw HTML string to parse.
    base_url:
        Base URL used to resolve relative references.
    extract_text_uris:
        Also extract URIs from visible text nodes.
    include_data_uris:
        Include ``data:`` URIs (excluded by default).
    include_javascript_uris:
        Include ``javascript:`` URIs.
    deduplicate:
        Suppress (resolved-uri, source) duplicates.
    sort:
        Return results sorted by URI string instead of document order.

    Returns
    -------
    list[dict] with keys: uri, raw_uri, scheme, kind, source, tag, attr
    """
    parser = _URIHTMLParser(base_url=base_url, extract_text_uris=extract_text_uris)
    parser.feed(html)
    results = parser.results
    if not include_data_uris:
        results = [r for r in results
                   if r.get('scheme') != 'data' and not r.get('data_uri')]
    if not include_javascript_uris:
        results = [r for r in results if r.get('scheme') != 'javascript']
    if sort:
        results.sort(key=lambda r: r['uri'])
    return results


# ===========================================================================
# RBL / URIBL checker
# ===========================================================================

# ---------------------------------------------------------------------------
# The three best free URI / domain reputation DNSBLs
#
#   URIBL (uribl.com)
#       Zones : multi.uribl.com
#       Listed: any A record returned (127.0.0.x, bitmask in last octet)
#       Bits  : 0x02 = black, 0x04 = grey, 0x08 = red, 0x40 = white (safe)
#       Note  : requires a paid subscription for automated/high-volume use;
#               free for manual / low-volume queries.
#
#   SURBL (surbl.org)
#       Zones : multi.surbl.org
#       Listed: any A record (127.0.0.x bitmask)
#       Bits  : 0x02 = phishing (PH), 0x04 = malware (MW),
#               0x08 = spam (AB), 0x10 = CR, 0x40 = abuse (ABUSE)
#
#   Spamhaus DBL (spamhaus.org)
#       Zones : dbl.spamhaus.org
#       Listed: returns 127.0.1.x
#               127.0.1.2 = spam domain
#               127.0.1.4 = phishing domain
#               127.0.1.5 = malware domain
#               127.0.1.6 = botnet C&C domain
#               127.0.1.102/103 = abused legit spam/phish (informational)
#       Note  : 127.0.1.255 = "do not query" / query error — treat as unknown
# ---------------------------------------------------------------------------

RBL_DEFINITIONS: dict[str, dict] = {
    'URIBL': {
        'zone':        'multi.uribl.com',
        'description': 'URIBL multi-zone (black/grey/red)',
        'type':        'bitmask',
        'bits': {
            0x02: 'black',
            0x04: 'grey',
            0x08: 'red',
            0x40: 'white',   # whitelisted — NOT a threat
        },
        'safe_bits':   {0x40},    # bits that mean "safe / whitelisted"
    },
    'SURBL': {
        'zone':        'multi.surbl.org',
        'description': 'SURBL multi-zone (phishing/malware/spam)',
        'type':        'bitmask',
        'bits': {
            0x02: 'phishing',
            0x04: 'malware',
            0x08: 'spam',
            0x10: 'CR',
            0x40: 'abuse',
        },
        'safe_bits':   set(),
    },
    'SpamhausDBL': {
        'zone':        'dbl.spamhaus.org',
        'description': 'Spamhaus Domain Block List',
        'type':        'value',
        'values': {
            '127.0.1.2':   'spam-domain',
            '127.0.1.4':   'phishing-domain',
            '127.0.1.5':   'malware-domain',
            '127.0.1.6':   'botnet-c2-domain',
            '127.0.1.102': 'abused-legit-spam',
            '127.0.1.103': 'abused-legit-phish',
            '127.0.1.255': 'query-error',        # must NOT be treated as listed
        },
        'error_values': {'127.0.1.255'},          # return as 'error', not 'listed'
    },
}

# Default ordered list of RBLs to check (most → least aggressive)
DEFAULT_RBLS = ['SpamhausDBL', 'URIBL', 'SURBL']

# Schemes whose hosts are worth querying; all others are skipped
_CHECKABLE_SCHEMES = {'http', 'https', 'ftp', 'ftps', 'sftp', 'ws', 'wss'}

# Private / loopback / link-local ranges — never query RBLs for these
_PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
    ipaddress.ip_network('169.254.0.0/16'),
    ipaddress.ip_network('::1/128'),
    ipaddress.ip_network('fc00::/7'),
]


def _extract_host(uri: str) -> Optional[str]:
    """
    Return the bare hostname / IP from a URI string, stripped of port and
    trailing dots.  Returns None if the host cannot be determined or is
    obviously non-routable.
    """
    try:
        parsed = urlparse(uri if '://' in uri else 'scheme://' + uri)
        host   = (parsed.hostname or '').lower().rstrip('.')
    except Exception:
        return None

    if not host:
        return None

    # Reject private / loopback IPs
    try:
        addr = ipaddress.ip_address(host)
        if any(addr in net for net in _PRIVATE_RANGES):
            return None
    except ValueError:
        pass  # not an IP — that's fine

    return host


def _reverse_ip(ip: str) -> Optional[str]:
    """
    Return the dotted-decimal reverse of an IPv4 address for DNSBL lookup
    (e.g. '1.2.3.4' → '4.3.2.1').  Returns None for IPv6 (not supported
    by most domain-oriented DNSBLs).
    """
    try:
        addr = ipaddress.IPv4Address(ip)
        return '.'.join(reversed(addr.compressed.split('.')))
    except ValueError:
        return None   # IPv6 or invalid — skip


def _query_rbl(host: str, rbl_name: str, rbl: dict, timeout: float) -> dict:
    """
    Perform a single DNS A-record lookup for ``host`` against one RBL zone.

    Returns a dict:
        listed   – True | False | None (None = query error / inconclusive)
        raw      – list of IP strings returned by DNS (may be empty)
        labels   – list of human-readable threat labels matched
        error    – error message string, or None
    """
    # Build the lookup name: reversed-IP.zone  or  host.zone
    try:
        ipaddress.ip_address(host)          # raises if not an IP
        reversed_host = _reverse_ip(host)
        if reversed_host is None:
            return {'listed': None, 'raw': [], 'labels': [],
                    'error': 'IPv6 not supported by this RBL'}
        lookup = f'{reversed_host}.{rbl["zone"]}'
    except ValueError:
        lookup = f'{host}.{rbl["zone"]}'

    try:
        answers = socket.getaddrinfo(lookup, None, socket.AF_INET,
                                     socket.SOCK_DGRAM)
        raw_ips = list({ans[4][0] for ans in answers})
    except socket.gaierror as exc:
        # NXDOMAIN (errno 11001 / -2 / -3) → not listed
        # Other errors (timeout, SERVFAIL) → inconclusive
        code = exc.args[0] if exc.args else None
        nxdomain_codes = {
            socket.EAI_NONAME,          # -2  POSIX
            11001,                       # Windows WSAHOST_NOT_FOUND
            8,                           # EAI_NODATA on some systems
        }
        if code in nxdomain_codes or 'Name or service not known' in str(exc) \
                or 'No address associated' in str(exc) \
                or 'host not found' in str(exc).lower():
            return {'listed': False, 'raw': [], 'labels': [], 'error': None}
        return {'listed': None, 'raw': [], 'labels': [],
                'error': f'DNS error: {exc}'}

    if not raw_ips:
        return {'listed': False, 'raw': [], 'labels': [], 'error': None}

    # Decode the response according to RBL type
    labels:  list[str] = []
    listed:  bool      = False

    if rbl['type'] == 'bitmask':
        safe_bits = rbl.get('safe_bits', set())
        for ip in raw_ips:
            try:
                last_octet = int(ip.split('.')[-1])
            except (ValueError, IndexError):
                continue
            for bit, label in rbl['bits'].items():
                if last_octet & bit:
                    if bit not in safe_bits:
                        labels.append(label)
                        listed = True

    elif rbl['type'] == 'value':
        error_values = rbl.get('error_values', set())
        for ip in raw_ips:
            if ip in error_values:
                return {'listed': None, 'raw': raw_ips, 'labels': [],
                        'error': f'RBL returned error sentinel: {ip}'}
            label = rbl['values'].get(ip)
            if label:
                labels.append(label)
                listed = True

    return {'listed': listed, 'raw': raw_ips, 'labels': labels, 'error': None}


def check_uris_against_rbls(
    uris: list[dict] | list[str],
    *,
    rbls:             Optional[list[str]] = None,
    stop_on_positive: bool  = True,
    timeout:          float = 5.0,
    skip_schemes:     Optional[set[str]] = None,
) -> list[dict]:
    """
    Check each URI against up to three free DNS-based reputation lists
    (DNSBLs / URIBLs).

    Checking stops immediately for a given URI as soon as any RBL returns a
    positive (listed) result — there is no point querying further once a URI
    is known-bad.

    Parameters
    ----------
    uris:
        List of URI dicts (as returned by ``extract_uris`` /
        ``extract_uris_from_html``) **or** plain URI strings.
    rbls:
        Ordered list of RBL names to query.  Must be keys of
        ``RBL_DEFINITIONS``.  Defaults to ``DEFAULT_RBLS``
        (SpamhausDBL → URIBL → SURBL).
    stop_on_positive:
        If True (default), stop querying further RBLs for a URI the moment
        one returns ``listed=True``.
    timeout:
        DNS lookup timeout in seconds (best-effort; Python's socket module
        uses the OS resolver which may not honour per-query timeouts).
    skip_schemes:
        URI schemes to skip entirely (e.g. ``{'file', 'data'}``).
        Defaults to skipping everything not in ``_CHECKABLE_SCHEMES``.

    Returns
    -------
    list[dict] — one entry per input URI, with keys:

        uri            – the URI string
        host           – extracted hostname or IP (None if not extractable)
        is_listed      – True if listed on any RBL, False if clean,
                         None if all queries were inconclusive / errored
        verdict        – 'listed' | 'clean' | 'error' | 'skipped'
        checked_rbls   – RBL names actually queried (may be < full list)
        stopped_early  – True when loop halted after a positive
        rbl_results    – dict mapping RBL name → per-RBL result dict
                         (keys: listed, raw, labels, error)
    """
    rbl_names = rbls if rbls is not None else DEFAULT_RBLS
    # Validate requested RBL names
    unknown = [n for n in rbl_names if n not in RBL_DEFINITIONS]
    if unknown:
        raise ValueError(f'Unknown RBL name(s): {unknown}. '
                         f'Valid names: {list(RBL_DEFINITIONS)}')

    checkable = skip_schemes if skip_schemes is not None else _CHECKABLE_SCHEMES
    results: list[dict] = []

    for item in uris:
        uri_str = item if isinstance(item, str) else item.get('uri', '')
        scheme  = (item.get('scheme') if isinstance(item, dict)
                   else (urlparse(uri_str).scheme or '').lower())

        base_record: dict = {
            'uri':           uri_str,
            'host':          None,
            'is_listed':     None,
            'verdict':       'error',
            'checked_rbls':  [],
            'stopped_early': False,
            'rbl_results':   {},
        }

        # --- Skip non-HTTP-like schemes ---
        if scheme and scheme not in checkable:
            base_record['verdict'] = 'skipped'
            base_record['is_listed'] = False
            results.append(base_record)
            continue

        host = _extract_host(uri_str)
        base_record['host'] = host

        if not host:
            base_record['verdict'] = 'skipped'
            base_record['is_listed'] = False
            results.append(base_record)
            continue

        # --- Query each RBL in order ---
        any_listed      = False
        any_clean       = False
        any_error       = False
        stopped_early   = False

        for rbl_name in rbl_names:
            rbl = RBL_DEFINITIONS[rbl_name]
            res = _query_rbl(host, rbl_name, rbl, timeout)

            base_record['checked_rbls'].append(rbl_name)
            base_record['rbl_results'][rbl_name] = res

            if res['listed'] is True:
                any_listed = True
                if stop_on_positive:
                    stopped_early = (rbl_name != rbl_names[-1])
                    break                      # ← stop on first positive hit

            elif res['listed'] is False:
                any_clean = True

            else:                              # None = inconclusive / error
                any_error = True

        base_record['stopped_early'] = stopped_early

        if any_listed:
            base_record['is_listed'] = True
            base_record['verdict']   = 'listed'
        elif any_clean:
            base_record['is_listed'] = False
            base_record['verdict']   = 'clean'
        else:
            base_record['is_listed'] = None
            base_record['verdict']   = 'error'

        results.append(base_record)

    return results
echoDreamz Replied
Rspamd provides this out of the box as well, no custom scripts needed, though, you do have to setup rspamd :)
Douglas Foster Replied
That may be why none shared my excitement.

 I have been using a commercial appliance to do URI checking   But after realizing how much their overall capabilities fell short of my expectations, and being unable to buy it elsewhere, I started writing my own.   This feature has been one of the last hurdles for me.

Heimir Eidskrem Replied
@echodreamz
how do you turn that on in rspamd? 

Just installed it today so I have a lot learn.

echoDreamz Replied
Sébastien Riccio Replied
It's called URIBL. Rspamd has some built-in preconfigured URIBLs and it is quite efficient.


However be aware that most of these URIBLs providers have rate limits when they are used freely, so if you have a lot of inbound mail traffic, your requests will be throttled.

In this case you'll see this kind of rspamd symbol in the rspamd check results (*BL_BLOCKED), meaning that rspamd wasn't able to obtain a result.

URIBL_BLOCKED {
  description = "URIBL.com: query refused, likely due to policy/overusage";
}
Sébastien Riccio
System & Network Admin

echoDreamz Replied
We use Spamhaus and have had very very good results with them.
Sabatino Replied
Sorry, but I'm missing something.
Isn't it integrated into SM's antispam module?

settings - antispam - uribls?
Sabatino Traini
      Chief Information Officer
Genial s.r.l. 
Martinsicuro - Italy

Reply to Thread

Enter the verification text