"""
uri_extractor.py
================
Utilities for extracting URIs from plain text and from HTML, and for
checking extracted URIs against DNS-based URI blacklists (URIBLs / RBLs).
Public API
----------
extract_uris(text, **options) → list[dict]
extract_uris_from_html(html, **options) → list[dict]
check_uris_against_rbls(uris, **options) → list[dict]
Each URI dict from the extractors contains at minimum:
uri – resolved URI string
scheme – lowercased scheme, or None
kind – 'absolute' | 'scheme-only' | 'protocol-relative' |
'bare-domain' | 'attribute' | 'css' | 'script' | 'text'
HTML results also carry:
raw_uri – URI exactly as written in the source
source – 'attribute' | 'css' | 'script' | 'text'
tag – lowercase tag name, or None
attr – attribute name, or None
RBL result dicts (one per URI) contain:
uri – the URI that was checked
host – extracted hostname / IP
is_listed – True if listed on ANY checked RBL
verdict – 'listed' | 'clean' | 'error'
checked_rbls – list of RBL names actually queried
rbl_results – dict mapping RBL name → per-RBL result dict
stopped_early – True when checking halted after a positive hit
"""
from __future__ import annotations
import ipaddress
import re
import socket
from html.parser import HTMLParser
from typing import Optional
from urllib.parse import urljoin, urlparse
# ===========================================================================
# Shared / internal helpers
# ===========================================================================
_SCHEME_PAT = r'(?:[a-zA-Z][a-zA-Z0-9+\-.]*)'
_URI_CHARS = r"""[^\s<>\[\]{}|\\^`"']"""
_KNOWN_SCHEMES = re.compile(
r'^(?:https?|ftps?|sftp|ssh|telnet|smtp|imap|pop3s?|ldaps?|'
r'mailto|xmpp|sip|sips|turn|stun|ws|wss|urn|data|file|git|svn|'
r'rtsp|rtsps|vnc|rdp|market|intent|tel|callto)$',
re.IGNORECASE,
)
_URI_RE = re.compile(
r'(?:'
r'(?P<absolute>' + _SCHEME_PAT + r'://' + _URI_CHARS + r'+)'
r'|'
r'(?P<schemeless>(?:mailto|tel|callto|urn|data):' + _URI_CHARS + r'+)'
r'|'
r'(?P<protrel>(?<![:/])//' + _URI_CHARS + r'+)'
r')',
re.IGNORECASE,
)
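# A quick illustration of what each _URI_RE branch captures (doctest-style
# sketch; the sample strings are illustrative only):
#
#     >>> [m.lastgroup for m in _URI_RE.finditer(
#     ...     'see https://a.example/x, mailto:bob@example.org, //cdn.example/y')]
#     ['absolute', 'schemeless', 'protrel']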
# Trailing punctuation that is almost never part of a URI. Closing brackets
# are handled separately so that balanced pairs survive.
_TRAILING_JUNK = re.compile(r'[.,;:!?\'"]+$')
def _trim_trailing(uri: str) -> str:
    """Strip trailing punctuation and unbalanced closing brackets."""
    prev = None
    while uri != prev:
        prev = uri
        uri = _TRAILING_JUNK.sub('', uri)
        for open_c, close_c in [('(', ')'), ('[', ']'), ('{', '}')]:
            while uri.endswith(close_c) and uri.count(open_c) < uri.count(close_c):
                uri = uri[:-1]
    return uri
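# For example (doctest-style sketch):
#
#     >>> _trim_trailing('https://example.com/page).')
#     'https://example.com/page'
#     >>> _trim_trailing('https://en.example.org/wiki/URI_(syntax)')
#     'https://en.example.org/wiki/URI_(syntax)'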
# Bare domains: either a well-known host prefix (www./ftp.), or one or more
# labels ending in a recognised TLD.
_BARE_DOMAIN = re.compile(
    r'(?<![/\w@.])'
    r'(?:'
    r'(?:www\d*|ftp)\.'
    r'[a-zA-Z0-9][a-zA-Z0-9-]*'
    r'(?:\.[a-zA-Z]{2,})*'
    r'|'
    r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
    r'(?:com|net|org|edu|gov|mil|int'
    r'|io|co|ai|app|dev|cloud|tech|online|store|shop|blog|media'
    r'|[a-z]{2})'
    r')'
    r'(?![\w-])'
    r'(?::\d{1,5})?'
    r'(?:/[^\s<>\[\]{}|\\^`"\']*)?',
    re.IGNORECASE,
)
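# Shapes this is meant to catch (illustrative):
#
#     www.example.com         – well-known prefix
#     example.com/path        – known TLD
#     shop.example.co.uk:443  – multi-label host with ccTLD and port
#
# and to reject: "v1.2", "pip.conf" (no recognised TLD, no www/ftp prefix).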
# ---------------------------------------------------------------------------
# HTML-specific patterns
# ---------------------------------------------------------------------------
_URI_ATTRS: dict[str, list[str]] = {
'a': ['href', 'ping', 'xlink:href'],
'area': ['href', 'ping'],
'audio': ['src'],
'base': ['href'],
'blockquote': ['cite'],
'button': ['formaction'],
'del': ['cite'],
'embed': ['src'],
'feimage': ['href', 'xlink:href'],
'form': ['action'],
'frame': ['src', 'longdesc'],
'iframe': ['src'],
'image': ['href', 'xlink:href'],
'img': ['src', 'srcset', 'longdesc', 'usemap'],
'input': ['src', 'formaction'],
'ins': ['cite'],
'link': ['href', 'imagesrcset'],
'meta': ['content'],
'object': ['data', 'usemap', 'classid', 'codebase'],
'param': ['value'],
'pattern': ['href', 'xlink:href'],
'q': ['cite'],
'script': ['src'],
'source': ['src', 'srcset'],
'track': ['src'],
'use': ['href', 'xlink:href'],
'video': ['src', 'poster'],
}
_GLOBAL_URI_ATTRS = {
'action', 'cite', 'data', 'formaction', 'href', 'manifest', 'ping',
'poster', 'src', 'usemap', 'xlink:href', 'xml:base', 'xmlns', 'itemid',
}
# srcset: comma-separated "URL [width/density descriptor]" candidates;
# capture the URL and swallow the optional descriptor.
_SRCSET_RE = re.compile(r'([^\s,]+)(?:\s+[\d.]+[wx])?')
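# e.g. srcset="small.jpg 480w, large.jpg 1024w" yields the candidate URLs
# small.jpg and large.jpg (descriptors are consumed, not captured).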
_CSS_URL_RE = re.compile(r'url\(\s*["\']?([^"\')\s]+)["\']?\s*\)', re.IGNORECASE)
_CSS_IMPORT_RE = re.compile(
r'@import\s+(?:url\(\s*["\']?([^"\')\s]+)["\']?\s*\)|["\']([^"\']+)["\'])',
re.IGNORECASE,
)
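# These cover the two stylesheet reference forms (illustrative):
#
#     background: url("img/bg.png");   → img/bg.png
#     @import url(theme.css);          → theme.css
#     @import "print.css" print;       → print.css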
_EVENT_ATTR = re.compile(r'^on[a-z]+$', re.IGNORECASE)
_JS_URI_RE = re.compile(r'\bjavascript:\s*[^\s"\'>;]+', re.IGNORECASE)
_META_URL_NAMES = {
'og:url', 'twitter:url', 'canonical',
'msapplication-starturl', 'msapplication-tileimage',
}
_META_URL_PROPS = {'og:url', 'og:image', 'og:audio', 'og:video'}
# ===========================================================================
# Plain-text extractor
# ===========================================================================
def extract_uris(
text: str,
*,
include_bare_domains: bool = False,
deduplicate: bool = True,
sort: bool = False,
) -> list[dict]:
"""
Extract all URIs from a plain-text string.
Parameters
----------
text:
Input string to scan.
include_bare_domains:
Off by default (higher false-positive rate).
deduplicate:
Keep only the first occurrence of each unique URI string.
sort:
Return results sorted alphabetically by URI instead of by position.
Returns
-------
list[dict] with keys: uri, scheme, kind, start, end
"""
results: list[dict] = []
seen: set[str] = set()
def _add(uri: str, kind: str, scheme: Optional[str], start: int) -> None:
uri = _trim_trailing(uri)
if not uri:
return
if deduplicate and uri in seen:
return
seen.add(uri)
results.append({
'uri': uri,
'scheme': scheme.lower() if scheme else None,
'kind': kind,
'start': start,
'end': start + len(uri),
})
for m in _URI_RE.finditer(text):
raw, start = m.group(0), m.start()
if m.group('absolute'):
scheme = re.match(_SCHEME_PAT, raw).group(0)
_add(raw, 'absolute', scheme, start)
elif m.group('schemeless'):
_add(raw, 'scheme-only', raw.split(':')[0], start)
elif m.group('protrel'):
_add(raw, 'protocol-relative', None, start)
if include_bare_domains:
for m in _BARE_DOMAIN.finditer(text):
raw = _trim_trailing(m.group(0))
start = m.start()
if not raw:
continue
if any(r['start'] <= start < r['end'] for r in results):
continue
if deduplicate and raw in seen:
continue
seen.add(raw)
results.append({
'uri': raw,
'scheme': None,
'kind': 'bare-domain',
'start': start,
'end': start + len(raw),
})
results.sort(key=lambda r: r['uri'] if sort else r['start'])
return results
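# A minimal usage sketch (doctest-style; the sample text is illustrative):
#
#     >>> hits = extract_uris('Docs at https://docs.example.io/; '
#     ...                     'mirror: www.example.io.',
#     ...                     include_bare_domains=True)
#     >>> [(h['uri'], h['kind']) for h in hits]
#     [('https://docs.example.io/', 'absolute'), ('www.example.io', 'bare-domain')]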
# ===========================================================================
# HTML parser (internal)
# ===========================================================================
class _URIHTMLParser(HTMLParser):
    def __init__(self, base_url: Optional[str], extract_text_uris: bool,
                 deduplicate: bool = True) -> None:
        super().__init__(convert_charrefs=True)
        self.base_url = base_url
        self.extract_text_uris = extract_text_uris
        self.deduplicate = deduplicate
        self.results: list[dict] = []
        self._seen: set[tuple] = set()
        self._in_style = False
        self._in_script = False
def _resolve(self, uri: str) -> str:
uri = uri.strip()
if self.base_url and uri and not uri.startswith(('data:', 'javascript:')):
try:
return urljoin(self.base_url, uri)
except ValueError:
pass
return uri
def _add(self, uri: str, source: str, tag: Optional[str],
attr: Optional[str], extra: Optional[dict] = None) -> None:
uri = uri.strip()
        if not uri or uri.startswith('#'):
            # empty or fragment-only references carry no host information
return
resolved = self._resolve(uri)
        key = (resolved, source)
        if self.deduplicate and key in self._seen:
            return
        self._seen.add(key)
try:
parsed = urlparse(resolved)
scheme = parsed.scheme.lower() if parsed.scheme else None
except ValueError:
scheme = None
record: dict = {
'uri': resolved,
'raw_uri': uri,
'scheme': scheme,
'kind': source,
'source': source,
'tag': tag,
'attr': attr,
}
if extra:
record.update(extra)
self.results.append(record)
def _scan_srcset(self, value: str, tag: str, attr: str) -> None:
for m in _SRCSET_RE.finditer(value):
self._add(m.group(1), 'attribute', tag, attr)
def _scan_css(self, css: str, tag: Optional[str], attr: Optional[str]) -> None:
for m in _CSS_URL_RE.finditer(css):
self._add(m.group(1), 'css', tag, attr)
for m in _CSS_IMPORT_RE.finditer(css):
url = m.group(1) or m.group(2)
if url:
self._add(url, 'css', tag, attr)
def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
tag_l = tag.lower()
        if tag_l == 'style':
            self._in_style = True
        elif tag_l == 'script':
            self._in_script = True
attr_dict = {k.lower(): (v or '').strip() for k, v in attrs}
allowed = set(_URI_ATTRS.get(tag_l, [])) | _GLOBAL_URI_ATTRS
for attr, value in attr_dict.items():
if not value:
continue
if attr in ('srcset', 'imagesrcset'):
self._scan_srcset(value, tag_l, attr)
elif attr == 'style':
self._scan_css(value, tag_l, attr)
elif _EVENT_ATTR.match(attr):
for m in _JS_URI_RE.finditer(value):
self._add(m.group(0), 'script', tag_l, attr)
elif tag_l == 'meta' and attr == 'content':
name = attr_dict.get('name', '').lower()
prop = attr_dict.get('property', '').lower()
equiv = attr_dict.get('http-equiv', '').lower()
if name in _META_URL_NAMES or prop in _META_URL_PROPS:
self._add(value, 'attribute', tag_l, attr)
            elif equiv == 'refresh':
                # content is "<delay>; url=<target>" — only add when a URL
                # part is actually present (a bare delay like "5" is not a URI)
                m2 = re.search(r'url\s*=\s*[\'"]?([^\s;\'"]+)', value, re.I)
                if m2:
                    self._add(m2.group(1), 'attribute', tag_l, attr)
elif attr in allowed:
if value.startswith('data:'):
self._add(value, 'attribute', tag_l, attr, {'data_uri': True})
elif value.lower().startswith('javascript:'):
self._add(value, 'script', tag_l, attr)
else:
self._add(value, 'attribute', tag_l, attr)
def handle_endtag(self, tag: str) -> None:
if tag.lower() == 'style':
self._in_style = False
if tag.lower() == 'script':
self._in_script = False
def handle_data(self, data: str) -> None:
if self._in_style:
self._scan_css(data, 'style', None)
elif self._in_script:
for hit in extract_uris(data, include_bare_domains=False):
self._add(hit['uri'], 'script', 'script', None)
elif self.extract_text_uris:
for hit in extract_uris(data, include_bare_domains=False):
self._add(hit['uri'], 'text', None, None)
# ===========================================================================
# HTML extractor (public)
# ===========================================================================
def extract_uris_from_html(
html: str,
*,
base_url: Optional[str] = None,
extract_text_uris: bool = False,
include_data_uris: bool = False,
include_javascript_uris: bool = False,
deduplicate: bool = True,
sort: bool = False,
) -> list[dict]:
"""
Extract all URIs from a string of HTML.
Parameters
----------
html:
Raw HTML string to parse.
base_url:
Base URL used to resolve relative references.
extract_text_uris:
Also extract URIs from visible text nodes.
include_data_uris:
Include ``data:`` URIs (excluded by default).
include_javascript_uris:
Include ``javascript:`` URIs.
deduplicate:
Suppress (resolved-uri, source) duplicates.
sort:
Return results sorted by URI string instead of document order.
Returns
-------
list[dict] with keys: uri, raw_uri, scheme, kind, source, tag, attr
"""
    parser = _URIHTMLParser(base_url=base_url,
                            extract_text_uris=extract_text_uris,
                            deduplicate=deduplicate)
parser.feed(html)
results = parser.results
if not include_data_uris:
results = [r for r in results
if r.get('scheme') != 'data' and not r.get('data_uri')]
if not include_javascript_uris:
results = [r for r in results if r.get('scheme') != 'javascript']
if sort:
results.sort(key=lambda r: r['uri'])
return results
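# A minimal usage sketch (doctest-style; the markup is illustrative):
#
#     >>> html = '<a href="/about">About</a><img src="logo.png">'
#     >>> [r['uri'] for r in extract_uris_from_html(
#     ...     html, base_url='https://example.com/')]
#     ['https://example.com/about', 'https://example.com/logo.png']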
# ===========================================================================
# RBL / URIBL checker
# ===========================================================================
# ---------------------------------------------------------------------------
# The three best free URI / domain reputation DNSBLs
#
# URIBL (uribl.com)
# Zones : multi.uribl.com
# Listed: any A record returned (127.0.0.x, bitmask in last octet)
# Bits : 0x02 = black, 0x04 = grey, 0x08 = red, 0x40 = white (safe)
# Note : requires a paid subscription for automated/high-volume use;
# free for manual / low-volume queries.
#
# SURBL (surbl.org)
# Zones : multi.surbl.org
# Listed: any A record (127.0.0.x bitmask)
# Bits : 0x02 = phishing (PH), 0x04 = malware (MW),
# 0x08 = spam (AB), 0x10 = CR, 0x40 = abuse (ABUSE)
#
# Spamhaus DBL (spamhaus.org)
# Zones : dbl.spamhaus.org
# Listed: returns 127.0.1.x
# 127.0.1.2 = spam domain
# 127.0.1.4 = phishing domain
# 127.0.1.5 = malware domain
# 127.0.1.6 = botnet C&C domain
# 127.0.1.102/103 = abused legit spam/phish (informational)
# Note : 127.0.1.255 = "do not query" / query error — treat as unknown
# ---------------------------------------------------------------------------
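# Concretely, "is example.com on the Spamhaus DBL?" becomes an A-record
# lookup for:
#
#     example.com.dbl.spamhaus.org
#
# while an IPv4 host is reversed octet-wise first, so 203.0.113.7 checked
# against SURBL becomes:
#
#     7.113.0.203.multi.surbl.org
#
# NXDOMAIN means "not listed"; a 127.x.y.z answer encodes the listing.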
RBL_DEFINITIONS: dict[str, dict] = {
'URIBL': {
'zone': 'multi.uribl.com',
'description': 'URIBL multi-zone (black/grey/red)',
'type': 'bitmask',
'bits': {
0x02: 'black',
0x04: 'grey',
0x08: 'red',
0x40: 'white', # whitelisted — NOT a threat
},
'safe_bits': {0x40}, # bits that mean "safe / whitelisted"
},
'SURBL': {
'zone': 'multi.surbl.org',
'description': 'SURBL multi-zone (phishing/malware/spam)',
'type': 'bitmask',
'bits': {
0x02: 'phishing',
0x04: 'malware',
0x08: 'spam',
0x10: 'CR',
0x40: 'abuse',
},
'safe_bits': set(),
},
'SpamhausDBL': {
'zone': 'dbl.spamhaus.org',
'description': 'Spamhaus Domain Block List',
'type': 'value',
'values': {
'127.0.1.2': 'spam-domain',
'127.0.1.4': 'phishing-domain',
'127.0.1.5': 'malware-domain',
'127.0.1.6': 'botnet-c2-domain',
'127.0.1.102': 'abused-legit-spam',
'127.0.1.103': 'abused-legit-phish',
'127.0.1.255': 'query-error', # must NOT be treated as listed
},
'error_values': {'127.0.1.255'}, # return as 'error', not 'listed'
},
}
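# Decoding sketch: a URIBL answer of 127.0.0.6 has last octet 6 = 0x02|0x04,
# i.e. listed as both 'black' and 'grey'; an answer of 127.0.0.64 sets only
# the whitelist bit, so it is NOT treated as listed (0x40 is in safe_bits).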
# Default ordered list of RBLs to check; Spamhaus DBL is queried first since
# its conservative listing policy makes an early stop-on-positive safest.
DEFAULT_RBLS = ['SpamhausDBL', 'URIBL', 'SURBL']
# Schemes whose hosts are worth querying; all others are skipped
_CHECKABLE_SCHEMES = {'http', 'https', 'ftp', 'ftps', 'sftp', 'ws', 'wss'}
# Private / loopback / link-local ranges — never query RBLs for these
_PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
    ipaddress.ip_network('169.254.0.0/16'),
    ipaddress.ip_network('::1/128'),
    ipaddress.ip_network('fc00::/7'),
    ipaddress.ip_network('fe80::/10'),
]
def _extract_host(uri: str) -> Optional[str]:
"""
Return the bare hostname / IP from a URI string, stripped of port and
trailing dots. Returns None if the host cannot be determined or is
obviously non-routable.
"""
    try:
        if uri.startswith('//'):
            parsed = urlparse('http:' + uri)  # protocol-relative → fake scheme
        elif '://' in uri:
            parsed = urlparse(uri)
        else:
            parsed = urlparse('scheme://' + uri)
        host = (parsed.hostname or '').lower().rstrip('.')
    except Exception:
        return None
if not host:
return None
# Reject private / loopback IPs
try:
addr = ipaddress.ip_address(host)
if any(addr in net for net in _PRIVATE_RANGES):
return None
except ValueError:
pass # not an IP — that's fine
return host
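# Doctest-style sketch of the extraction rules (examples are illustrative):
#
#     >>> _extract_host('https://Example.COM.:8443/path')
#     'example.com'
#     >>> _extract_host('//cdn.example.net/lib.js')        # protocol-relative
#     'cdn.example.net'
#     >>> _extract_host('http://127.0.0.1/admin') is None  # loopback rejected
#     True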
def _reverse_ip(ip: str) -> Optional[str]:
"""
Return the dotted-decimal reverse of an IPv4 address for DNSBL lookup
(e.g. '1.2.3.4' → '4.3.2.1'). Returns None for IPv6 (not supported
by most domain-oriented DNSBLs).
"""
try:
addr = ipaddress.IPv4Address(ip)
return '.'.join(reversed(addr.compressed.split('.')))
except ValueError:
return None # IPv6 or invalid — skip
def _query_rbl(host: str, rbl_name: str, rbl: dict, timeout: float) -> dict:
"""
Perform a single DNS A-record lookup for ``host`` against one RBL zone.
Returns a dict:
listed – True | False | None (None = query error / inconclusive)
raw – list of IP strings returned by DNS (may be empty)
labels – list of human-readable threat labels matched
error – error message string, or None
"""
# Build the lookup name: reversed-IP.zone or host.zone
try:
ipaddress.ip_address(host) # raises if not an IP
reversed_host = _reverse_ip(host)
if reversed_host is None:
return {'listed': None, 'raw': [], 'labels': [],
'error': 'IPv6 not supported by this RBL'}
lookup = f'{reversed_host}.{rbl["zone"]}'
except ValueError:
lookup = f'{host}.{rbl["zone"]}'
try:
answers = socket.getaddrinfo(lookup, None, socket.AF_INET,
socket.SOCK_DGRAM)
raw_ips = list({ans[4][0] for ans in answers})
    except socket.gaierror as exc:
        # NXDOMAIN means "not listed"; anything else (timeout, SERVFAIL,
        # EAI_AGAIN) is inconclusive.
        code = exc.args[0] if exc.args else None
        nxdomain_codes = {
            socket.EAI_NONAME,                  # -2 POSIX, 11001 Windows
            11001,                              # Windows WSAHOST_NOT_FOUND
            8,                                  # EAI_NONAME on macOS / BSD
            getattr(socket, 'EAI_NODATA', -5),  # name exists, no A record
        }
        if code in nxdomain_codes or 'Name or service not known' in str(exc) \
                or 'No address associated' in str(exc) \
                or 'host not found' in str(exc).lower():
            return {'listed': False, 'raw': [], 'labels': [], 'error': None}
        return {'listed': None, 'raw': [], 'labels': [],
                'error': f'DNS error: {exc}'}
if not raw_ips:
return {'listed': False, 'raw': [], 'labels': [], 'error': None}
# Decode the response according to RBL type
labels: list[str] = []
listed: bool = False
    if rbl['type'] == 'bitmask':
        safe_bits = rbl.get('safe_bits', set())
        for ip in raw_ips:
            # Genuine DNSBL answers come from 127.0.0.0/8; anything else is
            # almost certainly a wildcarding resolver, not a listing.
            if not ip.startswith('127.'):
                continue
            try:
                last_octet = int(ip.split('.')[-1])
            except (ValueError, IndexError):
                continue
            for bit, label in rbl['bits'].items():
                if last_octet & bit:
                    if bit not in safe_bits:
                        labels.append(label)
                        listed = True
elif rbl['type'] == 'value':
error_values = rbl.get('error_values', set())
for ip in raw_ips:
if ip in error_values:
return {'listed': None, 'raw': raw_ips, 'labels': [],
'error': f'RBL returned error sentinel: {ip}'}
label = rbl['values'].get(ip)
if label:
labels.append(label)
listed = True
return {'listed': listed, 'raw': raw_ips, 'labels': labels, 'error': None}
def check_uris_against_rbls(
uris: list[dict] | list[str],
*,
rbls: Optional[list[str]] = None,
stop_on_positive: bool = True,
timeout: float = 5.0,
skip_schemes: Optional[set[str]] = None,
) -> list[dict]:
"""
Check each URI against up to three free DNS-based reputation lists
(DNSBLs / URIBLs).
Checking stops immediately for a given URI as soon as any RBL returns a
positive (listed) result — there is no point querying further once a URI
is known-bad.
Parameters
----------
uris:
List of URI dicts (as returned by ``extract_uris`` /
``extract_uris_from_html``) **or** plain URI strings.
rbls:
Ordered list of RBL names to query. Must be keys of
``RBL_DEFINITIONS``. Defaults to ``DEFAULT_RBLS``
(SpamhausDBL → URIBL → SURBL).
stop_on_positive:
If True (default), stop querying further RBLs for a URI the moment
one returns ``listed=True``.
timeout:
DNS lookup timeout in seconds (best-effort; Python's socket module
uses the OS resolver which may not honour per-query timeouts).
skip_schemes:
URI schemes to skip entirely (e.g. ``{'file', 'data'}``).
Defaults to skipping everything not in ``_CHECKABLE_SCHEMES``.
Returns
-------
list[dict] — one entry per input URI, with keys:
uri – the URI string
host – extracted hostname or IP (None if not extractable)
is_listed – True if listed on any RBL, False if clean,
None if all queries were inconclusive / errored
verdict – 'listed' | 'clean' | 'error' | 'skipped'
checked_rbls – RBL names actually queried (may be < full list)
stopped_early – True when loop halted after a positive
rbl_results – dict mapping RBL name → per-RBL result dict
(keys: listed, raw, labels, error)
"""
rbl_names = rbls if rbls is not None else DEFAULT_RBLS
# Validate requested RBL names
unknown = [n for n in rbl_names if n not in RBL_DEFINITIONS]
if unknown:
raise ValueError(f'Unknown RBL name(s): {unknown}. '
f'Valid names: {list(RBL_DEFINITIONS)}')
    skip_set = skip_schemes  # explicit skip-list, or None → default policy
results: list[dict] = []
for item in uris:
uri_str = item if isinstance(item, str) else item.get('uri', '')
scheme = (item.get('scheme') if isinstance(item, dict)
else (urlparse(uri_str).scheme or '').lower())
base_record: dict = {
'uri': uri_str,
'host': None,
'is_listed': None,
'verdict': 'error',
'checked_rbls': [],
'stopped_early': False,
'rbl_results': {},
}
        # --- Skip schemes we should not query ---
        if skip_set is not None:
            should_skip = scheme in skip_set
        else:
            should_skip = bool(scheme) and scheme not in _CHECKABLE_SCHEMES
        if should_skip:
            base_record['verdict'] = 'skipped'
            base_record['is_listed'] = False
            results.append(base_record)
            continue
host = _extract_host(uri_str)
base_record['host'] = host
if not host:
base_record['verdict'] = 'skipped'
base_record['is_listed'] = False
results.append(base_record)
continue
# --- Query each RBL in order ---
any_listed = False
any_clean = False
any_error = False
stopped_early = False
for rbl_name in rbl_names:
rbl = RBL_DEFINITIONS[rbl_name]
res = _query_rbl(host, rbl_name, rbl, timeout)
base_record['checked_rbls'].append(rbl_name)
base_record['rbl_results'][rbl_name] = res
if res['listed'] is True:
any_listed = True
if stop_on_positive:
stopped_early = (rbl_name != rbl_names[-1])
break # ← stop on first positive hit
elif res['listed'] is False:
any_clean = True
else: # None = inconclusive / error
any_error = True
base_record['stopped_early'] = stopped_early
if any_listed:
base_record['is_listed'] = True
base_record['verdict'] = 'listed'
elif any_clean:
base_record['is_listed'] = False
base_record['verdict'] = 'clean'
else:
base_record['is_listed'] = None
base_record['verdict'] = 'error'
results.append(base_record)
return results
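if __name__ == '__main__':
    # Minimal smoke-test wiring the three public functions together. NOTE:
    # check_uris_against_rbls performs live DNS queries against the RBL
    # zones above, so results depend on your resolver and on current
    # listings; the sample markup below is illustrative only.
    sample = ('Release notes: https://www.python.org/downloads/ '
              '<a href="//example.com/changelog">changelog</a>')
    text_hits = extract_uris(sample)
    print('text :', [h['uri'] for h in text_hits])
    html_hits = extract_uris_from_html(sample, base_url='https://example.org/')
    print('html :', [h['uri'] for h in html_hits])
    for rec in check_uris_against_rbls(text_hits):
        checked = ', '.join(rec['checked_rbls']) or 'none'
        print(f"{rec['uri']!r}: {rec['verdict']} (checked: {checked})")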