# Source code for parser_header.parser

"""
Core parser module for HTTP headers and cookies.

Author: Hadi Cahyadi <cumulus13@gmail.com>
"""

from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union, Any, Iterator, overload
from urllib.parse import unquote
from .exceptions import InvalidHeaderError, InvalidCookieError, EncodingError

# Raw input accepted by the parsers: text, or bytes (which the parsers'
# ``_normalize_input`` helpers decode to ``str`` before any parsing happens).
InputType = Union[str, bytes]

@dataclass
class HeaderValue:
    """A header's primary value together with its ``key=value`` parameters.

    For example ``text/html; charset=utf-8`` is held as the value
    ``text/html`` with the parameters ``{"charset": "utf-8"}``.
    """

    value: str
    params: Dict[str, str] = field(default_factory=dict)

    def __str__(self) -> str:
        """Render as ``value; k1=v1; k2=v2`` (just ``value`` without parameters)."""
        if self.params:
            rendered = "; ".join(
                f"{key}={val}" for key, val in self.params.items()
            )
            return f"{self.value}; {rendered}"
        return self.value

    def __repr__(self) -> str:
        return "HeaderValue(value={!r}, params={!r})".format(self.value, self.params)
class CookieParser:
    """Parser for HTTP cookies with multiple extraction methods.

    Cookies are held in an insertion-ordered ``name -> value`` dict. Lookups
    (``get``, ``[]``, ``in``, ``remove``) try the name exactly as given first
    and then with underscores replaced by hyphens, so a cookie stored via the
    keyword ``refresh_token=...`` (saved as ``refresh-token``) can be read back
    under either spelling.
    """

    def __init__(self, data: Optional[InputType] = None, **kwargs: str):
        """
        Initialize CookieParser.

        Args:
            data: Raw header string/bytes containing cookie entries
            **kwargs: Cookie key-value pairs to set directly
                e.g., CookieParser(session="abc", user="john")
        """
        self._raw_data: Optional[str] = None
        self._cookies: Dict[str, str] = {}

        if data is not None:
            self.parse(data)
        if kwargs:
            self.set(**kwargs)

    def _normalize_input(self, data: InputType) -> str:
        """Convert bytes to string if necessary (UTF-8 first, then latin-1)."""
        if isinstance(data, bytes):
            try:
                return data.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every possible byte to a code point, so this
                # fallback cannot raise; no further error handling is needed.
                return data.decode('latin-1')
        return data

    def _normalize_key(self, key: str) -> str:
        """Normalize cookie key (convert underscores to hyphens)."""
        return key.replace('_', '-')

    def parse(self, data: Optional[InputType] = None, **kwargs: str) -> Dict[str, str]:
        """
        Parse cookies from raw header data and/or kwargs.

        Each non-empty line is handled independently: a ``Cookie:`` header line
        has its prefix removed, and a bare ``name=value[; ...]`` line is parsed
        as-is. Any other line (one with a ``:`` before its first ``=``, or with
        no ``=`` at all) is ignored. Parsed cookies are merged into the
        existing ones.

        Args:
            data: Raw header string/bytes containing cookie entries
            **kwargs: Additional cookies as key=value pairs

        Returns:
            The parser's internal dict of cookie name-value pairs (not a copy)
        """
        if data is not None:
            self._raw_data = self._normalize_input(data)
            prefix = 'cookie:'
            for line in self._raw_data.strip().split('\n'):
                line = line.strip()  # also drops the '\r' of CRLF line endings
                if not line:
                    continue
                if line.lower().startswith(prefix):
                    self._parse_cookie_string(line[len(prefix):].strip())
                elif '=' in line and ':' not in line.split('=', 1)[0]:
                    self._parse_cookie_string(line)

        if kwargs:
            self.set(**kwargs)

        return self._cookies

    def _parse_cookie_string(self, cookie_str: str) -> None:
        """Parse a cookie string (name=value or name=value; name2=value2)."""
        for part in cookie_str.split(';'):
            name, sep, value = part.strip().partition('=')
            name = name.strip()
            # Segments without '=' or with an empty name are skipped.
            if sep and name:
                self._cookies[name] = value.strip()

    def set(self, name: Optional[str] = None, value: Optional[str] = None,
            **kwargs: str) -> 'CookieParser':
        """
        Set cookie(s).

        Args:
            name: Cookie name (optional if using kwargs); stored exactly as given
            value: Cookie value (required if name is provided)
            **kwargs: Cookie key=value pairs
                Underscores in keys are converted to hyphens

        Returns:
            self for chaining

        Raises:
            ValueError: If ``name`` is given without ``value``

        Examples:
            cookies.set('session', 'abc123')
            cookies.set(session='abc', user='john')
            cookies.set('token', 'xyz', refresh_token='123')
        """
        if name is not None:
            if value is None:
                raise ValueError("value is required when name is provided")
            self._cookies[name] = value

        for k, v in kwargs.items():
            self._cookies[self._normalize_key(k)] = str(v)

        return self

    def remove(self, *names: str) -> 'CookieParser':
        """Remove cookie(s) by name; names that are not present are ignored."""
        for name in names:
            self._cookies.pop(name, None)
            self._cookies.pop(self._normalize_key(name), None)
        return self

    def clear(self) -> 'CookieParser':
        """Clear all cookies."""
        self._cookies.clear()
        return self

    def update(self, data: Optional[Dict[str, str]] = None, **kwargs: str) -> 'CookieParser':
        """
        Update cookies from dict and/or kwargs.

        Args:
            data: Dictionary of cookies (keys are stored unchanged)
            **kwargs: Additional cookies (underscores in keys become hyphens)

        Returns:
            self for chaining
        """
        if data:
            self._cookies.update(data)
        if kwargs:
            self.set(**kwargs)
        return self

    def to_dict(self, data: Optional[InputType] = None, **kwargs: str) -> Dict[str, str]:
        """
        Get cookies as dictionary.

        Args:
            data: Optional new data to parse first
            **kwargs: Additional cookies to include

        Returns:
            A copy of the cookie name-value pairs
        """
        if data is not None or kwargs:
            self.parse(data, **kwargs)
        return self._cookies.copy()

    def to_cookie_header(self) -> str:
        """Format all cookies as a ``Cookie`` header value.

        Returns:
            ``name=value`` pairs joined by ``"; "`` in insertion order, or an
            empty string when there are no cookies.
        """
        return '; '.join(f"{name}={value}" for name, value in self._cookies.items())

    def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """Get a specific cookie value, or ``default`` when it is absent."""
        if name in self._cookies:
            return self._cookies[name]
        return self._cookies.get(self._normalize_key(name), default)

    def __getitem__(self, name: str) -> str:
        """Get cookie by name with bracket notation."""
        if name in self._cookies:
            return self._cookies[name]
        normalized = self._normalize_key(name)
        if normalized in self._cookies:
            return self._cookies[normalized]
        raise KeyError(f"Cookie '{name}' not found")

    def __setitem__(self, name: str, value: str) -> None:
        """Set cookie with bracket notation (name stored exactly as given)."""
        self._cookies[name] = value

    def __delitem__(self, name: str) -> None:
        """Delete cookie with bracket notation."""
        if name in self._cookies:
            del self._cookies[name]
            return
        normalized = self._normalize_key(name)
        if normalized in self._cookies:
            del self._cookies[normalized]
            return
        raise KeyError(f"Cookie '{name}' not found")

    def __contains__(self, name: str) -> bool:
        return name in self._cookies or self._normalize_key(name) in self._cookies

    def __iter__(self) -> Iterator[str]:
        return iter(self._cookies)

    def __len__(self) -> int:
        return len(self._cookies)

    def __repr__(self) -> str:
        return f"CookieParser({self._cookies!r})"

    def keys(self) -> List[str]:
        return list(self._cookies.keys())

    def values(self) -> List[str]:
        return list(self._cookies.values())

    def items(self) -> List[tuple]:
        return list(self._cookies.items())

    @classmethod
    def from_dict(cls, cookies: Dict[str, str]) -> 'CookieParser':
        """Create CookieParser from dictionary (the dict is copied)."""
        parser = cls()
        parser._cookies = cookies.copy()
        return parser

    @classmethod
    def from_kwargs(cls, **kwargs: str) -> 'CookieParser':
        """Create CookieParser from keyword arguments."""
        return cls(**kwargs)
class HeaderParser:
    """
    Comprehensive HTTP header parser supporting all standard and custom headers.

    Header names are stored lowercased. Values of well-known headers are parsed
    into structured Python objects (see ``_parse_header_value``); everything
    else is kept as a string. A header that occurs more than once while parsing
    is stored as a list with one entry per occurrence.
    """

    SPECIAL_HEADERS = {
        'content-type', 'accept', 'cache-control', 'content-disposition',
        'authorization', 'www-authenticate', 'proxy-authenticate',
        'set-cookie', 'cookie'
    }

    BOOLEAN_HEADERS = {'dnt', 'sec-gpc', 'upgrade-insecure-requests'}

    SEC_CH_UA_PATTERN = re.compile(r'"([^"]+)";v="([^"]+)"')

    # Headers whose *single* parsed value can already be a list. For these a
    # list does not by itself mean "the header occurred several times", so
    # repeats are tracked explicitly in ``self._repeated``.
    _LIST_VALUED_HEADERS = frozenset({
        'accept', 'accept-language', 'accept-encoding',
        'sec-ch-ua', 'sec-ch-ua-full-version-list',
    })

    def __init__(self, data: Optional[InputType] = None, **kwargs: Any):
        """
        Initialize HeaderParser.

        Args:
            data: Raw header string/bytes
            **kwargs: Header key-value pairs to set directly
                Underscores in keys are converted to hyphens
                e.g., HeaderParser(content_type="application/json", user_agent="Mozilla")
        """
        self._raw_data: Optional[str] = None
        self._headers: Dict[str, Any] = {}
        self._cookies: CookieParser = CookieParser()
        self._raw_cookies: List[str] = []
        # Names of list-valued headers that were seen more than once.
        self._repeated: set = set()

        if data is not None:
            self.parse(data)
        if kwargs:
            self.set(**kwargs)

    def _normalize_input(self, data: InputType) -> str:
        """Convert bytes to string (UTF-8 first, then latin-1)."""
        if isinstance(data, bytes):
            try:
                return data.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every possible byte to a code point, so this
                # fallback cannot raise; no further error handling is needed.
                return data.decode('latin-1')
        return data

    def _normalize_key(self, key: str) -> str:
        """Normalize header key (lowercase, convert underscores to hyphens)."""
        return key.lower().replace('_', '-')

    def parse(self, data: Optional[InputType] = None, **kwargs: Any) -> Dict[str, Any]:
        """
        Parse raw HTTP headers into structured dictionary.

        Lines are split on the first ``:``; lines without a colon (blank lines,
        or e.g. a ``GET / HTTP/1.1`` request line) are skipped. Parsed headers
        are added to the ones already held, and ``Cookie`` lines are also fed
        to the cookie parser.

        Args:
            data: Raw header string/bytes
            **kwargs: Additional headers as key=value pairs
                Underscores in keys are converted to hyphens

        Returns:
            The parser's internal dict of parsed headers (not a copy)
        """
        if data is not None:
            self._raw_data = self._normalize_input(data)
            self._raw_cookies = []
            cookie_lines = []

            for line in self._raw_data.strip().split('\n'):
                line = line.strip()  # also drops the '\r' of CRLF line endings
                if ':' not in line:
                    continue

                name, _, value = line.partition(':')
                name_lower = name.strip().lower()
                value = value.strip()

                if name_lower == 'cookie':
                    # Cookie headers keep their raw text; the structured view
                    # lives in the cookie parser.
                    cookie_lines.append(f"cookie: {value}")
                    self._raw_cookies.append(value)
                    self._add_header(name_lower, value)
                else:
                    self._add_header(name_lower, self._parse_header_value(name_lower, value))

            if cookie_lines:
                self._cookies.parse('\n'.join(cookie_lines))

        if kwargs:
            self.set(**kwargs)

        return self._headers

    def _store(self, name: str, value: Any) -> None:
        """Parse and store one header, replacing any previous value.

        A ``cookie`` header is mirrored into the cookie parser. A dict value
        is rendered as ``k=v; k=v`` before being stored as the header text,
        so the header never holds a Python dict repr.
        """
        key = self._normalize_key(name)
        if key == 'cookie':
            if isinstance(value, dict):
                self._cookies.update(value)
                value = '; '.join(f"{k}={v}" for k, v in value.items())
            else:
                self._cookies.parse(f"cookie: {value}")
        self._repeated.discard(key)
        self._headers[key] = self._parse_header_value(key, str(value))

    def set(self, name: Optional[str] = None, value: Optional[Any] = None,
            **kwargs: Any) -> 'HeaderParser':
        """
        Set header(s).

        Args:
            name: Header name (optional if using kwargs)
            value: Header value (required if name is provided)
            **kwargs: Header key=value pairs
                Underscores in keys are converted to hyphens

        Returns:
            self for chaining

        Raises:
            ValueError: If ``name`` is given without ``value``

        Examples:
            parser.set('Content-Type', 'application/json')
            parser.set(content_type='application/json', user_agent='Mozilla')
            parser.set('X-Custom', 'value', accept='*/*')
        """
        if name is not None:
            if value is None:
                raise ValueError("value is required when name is provided")
            self._store(name, value)

        for k, v in kwargs.items():
            self._store(k, v)

        return self

    def set_raw(self, name: Optional[str] = None, value: Optional[Any] = None,
                **kwargs: Any) -> 'HeaderParser':
        """
        Set header(s) without parsing the value.

        Args:
            name: Header name
            value: Header value (stored as-is)
            **kwargs: Header key=value pairs

        Returns:
            self for chaining

        Raises:
            ValueError: If ``name`` is given without ``value``
        """
        if name is not None:
            if value is None:
                raise ValueError("value is required when name is provided")
            key = self._normalize_key(name)
            self._repeated.discard(key)
            self._headers[key] = value

        for k, v in kwargs.items():
            key = self._normalize_key(k)
            self._repeated.discard(key)
            self._headers[key] = v

        return self

    def remove(self, *names: str) -> 'HeaderParser':
        """Remove header(s) by name; names that are not present are ignored."""
        for name in names:
            key = self._normalize_key(name)
            self._headers.pop(key, None)
            self._repeated.discard(key)
        return self

    def clear(self) -> 'HeaderParser':
        """Clear all headers and cookies."""
        self._headers.clear()
        self._repeated.clear()
        self._cookies.clear()
        return self

    def update(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> 'HeaderParser':
        """
        Update headers from dict and/or kwargs.

        Args:
            data: Dictionary of headers
            **kwargs: Additional headers

        Returns:
            self for chaining
        """
        if data:
            for k, v in data.items():
                self.set(k, v)
        if kwargs:
            self.set(**kwargs)
        return self

    def _is_repeated(self, name: str, value: Any) -> bool:
        """Tell whether a stored list holds one entry per header occurrence.

        For headers whose parsed value is itself a list, only the explicit
        ``_repeated`` record counts; for every other header any list is taken
        to be a list of occurrences.
        """
        if not isinstance(value, list):
            return False
        return name in self._repeated or name not in self._LIST_VALUED_HEADERS

    def _add_header(self, name: str, value: Any) -> None:
        """Add header, handling multiple values for same header."""
        if name not in self._headers:
            self._headers[name] = value
            return

        existing = self._headers[name]
        if self._is_repeated(name, existing):
            existing.append(value)
        else:
            # Second occurrence: wrap so each occurrence stays a separate
            # entry, even when a single parsed value is already a list.
            self._headers[name] = [existing, value]
            self._repeated.add(name)

    def _parse_header_value(self, name: str, value: str) -> Any:
        """Parse header value based on header type; unknown headers stay strings."""
        if name in self.BOOLEAN_HEADERS:
            return self._parse_boolean(value)
        if name.startswith('sec-ch-ua'):
            return self._parse_sec_ch_ua(name, value)
        if name in ('content-type', 'content-disposition'):
            return self._parse_parameterized(value)
        if name == 'accept':
            return self._parse_accept(value)
        if name == 'accept-language':
            return self._parse_accept_language(value)
        if name == 'accept-encoding':
            return self._parse_accept_encoding(value)
        if name == 'cache-control':
            return self._parse_cache_control(value)
        if name == 'priority':
            return self._parse_priority(value)
        if name == 'content-length':
            try:
                return int(value)
            except ValueError:
                return value
        return value

    def _parse_boolean(self, value: str) -> bool:
        """Parse boolean-like header values."""
        return value.lower() in ('1', 'true', 'yes', '?1')

    def _parse_sec_ch_ua(self, name: str, value: str) -> Any:
        """Parse Sec-CH-UA style headers.

        Returns a bool for ``sec-ch-ua-mobile``, a list of
        ``{'brand', 'version'}`` dicts for the brand-list headers (or the raw
        string when nothing matches), and the unquoted string otherwise.
        """
        if name == 'sec-ch-ua-mobile':
            return value == '?1'
        if name in ('sec-ch-ua', 'sec-ch-ua-full-version-list'):
            brands = [
                {'brand': match.group(1), 'version': match.group(2)}
                for match in self.SEC_CH_UA_PATTERN.finditer(value)
            ]
            return brands if brands else value
        return value.strip('"')

    def _parse_parameterized(self, value: str) -> HeaderValue:
        """Parse headers with parameters (e.g., Content-Type)."""
        main_value, *raw_params = value.split(';')
        params = {}
        for part in raw_params:
            k, sep, v = part.strip().partition('=')
            if sep:
                params[k.strip()] = v.strip().strip('"')
        return HeaderValue(value=main_value.strip(), params=params)

    def _parse_accept(self, value: str) -> List[Dict[str, Any]]:
        """Parse Accept header into entries sorted by descending ``q``.

        Each entry has ``type`` and ``q`` (default 1.0) plus any other
        parameters as strings. An unparsable ``q`` keeps the default, and
        empty list members (e.g. from a trailing comma) are skipped.
        """
        result = []
        for part in value.split(','):
            part = part.strip()
            if not part:
                continue
            media, *params = part.split(';')
            entry: Dict[str, Any] = {'type': media.strip(), 'q': 1.0}
            for p in params:
                k, sep, v = p.partition('=')
                if not sep:
                    continue
                k = k.strip()
                if k == 'q':
                    try:
                        entry['q'] = float(v.strip())
                    except ValueError:
                        pass  # malformed weight: keep the default of 1.0
                else:
                    entry[k] = v.strip()
            result.append(entry)
        # sorted() is stable, so equally weighted entries keep header order.
        return sorted(result, key=lambda x: x['q'], reverse=True)

    def _parse_accept_language(self, value: str) -> List[Dict[str, Any]]:
        """Parse Accept-Language header into ``{'lang', 'q'}`` entries sorted by ``q``."""
        result = []
        for part in value.split(','):
            part = part.strip()
            if not part:
                continue
            lang, *params = part.split(';')
            entry: Dict[str, Any] = {'lang': lang.strip(), 'q': 1.0}
            for p in params:
                p = p.strip()
                if p.startswith('q='):
                    try:
                        entry['q'] = float(p.split('=', 1)[1].strip())
                    except ValueError:
                        pass  # malformed weight: keep the default of 1.0
            result.append(entry)
        return sorted(result, key=lambda x: x['q'], reverse=True)

    def _parse_accept_encoding(self, value: str) -> List[str]:
        """Parse Accept-Encoding header into a list of non-empty tokens."""
        return [e.strip() for e in value.split(',') if e.strip()]

    def _parse_directives(self, value: str, numeric: bool) -> Dict[str, Any]:
        """Parse a comma-separated ``key[=value]`` list; bare keys map to True.

        With ``numeric`` set, values that look like integers are converted.
        """
        result: Dict[str, Any] = {}
        for part in value.split(','):
            part = part.strip()
            if not part:
                continue
            k, sep, v = part.partition('=')
            if not sep:
                result[part] = True
                continue
            k, v = k.strip(), v.strip()
            if numeric:
                try:
                    result[k] = int(v)
                    continue
                except ValueError:
                    pass
            result[k] = v
        return result

    def _parse_cache_control(self, value: str) -> Dict[str, Any]:
        """Parse Cache-Control header (integer values become ``int``)."""
        return self._parse_directives(value, numeric=True)

    def _parse_priority(self, value: str) -> Dict[str, Any]:
        """Parse Priority header (values stay strings)."""
        return self._parse_directives(value, numeric=False)

    # === Serialisation helpers ===

    def _format_list_item(self, item: Any) -> str:
        """Render one element of a parsed list value back to header syntax."""
        if isinstance(item, dict):
            if 'type' in item or 'lang' in item:
                # Accept / Accept-Language entry: the weight goes last and is
                # omitted when it is the default.
                parts = [str(item.get('type', item.get('lang', '')))]
                parts.extend(
                    f"{k}={v}" for k, v in item.items()
                    if k not in ('type', 'lang', 'q')
                )
                q = item.get('q', 1.0)
                if q != 1.0:
                    parts.append(f"q={q}")
                return ';'.join(parts)
            if 'brand' in item and 'version' in item:
                return f'"{item["brand"]}";v="{item["version"]}"'
        return str(item)

    def _format_value(self, name: str, value: Any) -> str:
        """Render one parsed header value back to its textual form."""
        if isinstance(value, str):
            return value
        if isinstance(value, bool):
            if name == 'sec-ch-ua-mobile':
                return '?1' if value else '?0'
            return '1' if value else '0'
        if isinstance(value, dict):
            return ', '.join(
                str(k) if v is True else f"{k}={v}" for k, v in value.items()
            )
        if isinstance(value, list):
            return ', '.join(self._format_list_item(item) for item in value)
        # HeaderValue, int and anything else rely on their own __str__.
        return str(value)

    def _serialize(self, name: str, value: Any) -> List[str]:
        """Return one header-value string per occurrence of the header."""
        if self._is_repeated(name, value):
            return [self._format_value(name, v) for v in value]
        return [self._format_value(name, value)]

    # === Public API ===

    def get(self, name: str, default: Any = None) -> Any:
        """Get header value by name (case-insensitive)."""
        return self._headers.get(self._normalize_key(name), default)

    def __getitem__(self, name: str) -> Any:
        name_lower = self._normalize_key(name)
        if name_lower not in self._headers:
            raise KeyError(f"Header '{name}' not found")
        return self._headers[name_lower]

    def __setitem__(self, name: str, value: Any) -> None:
        """Set header with bracket notation."""
        self.set(name, value)

    def __delitem__(self, name: str) -> None:
        """Delete header with bracket notation."""
        key = self._normalize_key(name)
        if key not in self._headers:
            raise KeyError(f"Header '{name}' not found")
        del self._headers[key]
        self._repeated.discard(key)

    def __contains__(self, name: str) -> bool:
        return self._normalize_key(name) in self._headers

    def __iter__(self) -> Iterator[str]:
        return iter(self._headers)

    def __len__(self) -> int:
        return len(self._headers)

    def __repr__(self) -> str:
        return f"HeaderParser({dict(list(self._headers.items())[:5])}{'...' if len(self._headers) > 5 else ''})"

    def keys(self) -> List[str]:
        return list(self._headers.keys())

    def values(self) -> List[Any]:
        return list(self._headers.values())

    def items(self) -> List[tuple]:
        return list(self._headers.items())

    def to_dict(self, stringify: bool = False) -> Dict[str, Any]:
        """
        Get all headers as dictionary.

        Args:
            stringify: If True, convert all values to strings with ``str()``
        """
        if not stringify:
            return self._headers.copy()
        return {k: str(v) for k, v in self._headers.items()}

    def to_raw(self) -> str:
        """Convert parsed headers back to raw format.

        Emits one ``name: value`` line per header occurrence, with parsed
        values rendered back to header syntax rather than as Python reprs.
        """
        lines = []
        for name, value in self._headers.items():
            lines.extend(f"{name}: {text}" for text in self._serialize(name, value))
        return '\n'.join(lines)

    def to_requests_headers(self) -> Dict[str, str]:
        """Convert to format suitable for requests library.

        Every value is a string; a header that occurred several times is
        joined with ``", "``.
        """
        return {
            name: ', '.join(self._serialize(name, value))
            for name, value in self._headers.items()
        }

    @property
    def cookies(self) -> CookieParser:
        """Access cookie parser."""
        return self._cookies

    def get_cookies_as_header(self) -> str:
        """Get all cookies formatted as Cookie header value (``a=1; b=2``)."""
        return '; '.join(f"{name}={value}" for name, value in self._cookies.items())

    def get_cookies_as_dict(self) -> Dict[str, str]:
        """Get all cookies as dictionary."""
        return self._cookies.to_dict()

    @property
    def content_type(self) -> Optional[str]:
        """Get Content-Type value (without its parameters)."""
        ct = self.get('content-type')
        if isinstance(ct, HeaderValue):
            return ct.value
        return ct

    @property
    def content_length(self) -> Optional[int]:
        """Get Content-Length as integer."""
        return self.get('content-length')

    @property
    def user_agent(self) -> Optional[str]:
        """Get User-Agent value."""
        return self.get('user-agent')

    @property
    def origin(self) -> Optional[str]:
        """Get Origin value."""
        return self.get('origin')

    @property
    def referer(self) -> Optional[str]:
        """Get Referer value."""
        return self.get('referer')

    def is_ajax(self) -> bool:
        """Check if request is AJAX/XHR."""
        value = self.get('x-requested-with', '')
        # A repeated header is stored as a list; check every occurrence
        # instead of failing on ``list.lower``.
        candidates = value if isinstance(value, list) else [value]
        return any(
            isinstance(v, str) and v.lower() == 'xmlhttprequest'
            for v in candidates
        )

    def is_cors(self) -> bool:
        """Check if request is CORS (i.e. an Origin header is present)."""
        return self.get('origin') is not None

    def get_sec_fetch_metadata(self) -> Dict[str, str]:
        """Get all Sec-Fetch-* headers (missing ones are empty strings)."""
        return {
            'site': self.get('sec-fetch-site', ''),
            'mode': self.get('sec-fetch-mode', ''),
            'dest': self.get('sec-fetch-dest', ''),
            'user': self.get('sec-fetch-user', ''),
        }

    def get_client_hints(self) -> Dict[str, Any]:
        """Get all Client Hints (``Sec-CH-*``) headers."""
        return {
            name: value
            for name, value in self._headers.items()
            if name.startswith('sec-ch-')
        }

    @classmethod
    def from_dict(cls, headers: Dict[str, Any]) -> 'HeaderParser':
        """Create HeaderParser from dictionary."""
        parser = cls()
        parser.update(headers)
        return parser

    @classmethod
    def from_kwargs(cls, **kwargs: Any) -> 'HeaderParser':
        """Create HeaderParser from keyword arguments."""
        return cls(**kwargs)

    @classmethod
    def from_requests_response(cls, response: Any) -> 'HeaderParser':
        """Create HeaderParser from requests Response object."""
        parser = cls()
        if hasattr(response, 'headers'):
            for k, v in response.headers.items():
                parser.set(k, v)
        return parser