from __future__ import annotations

import importlib
import logging
import unicodedata
from bisect import bisect_right
from codecs import IncrementalDecoder
from encodings.aliases import aliases
from functools import lru_cache
from re import findall
from typing import Generator

from _multibytecodec import (  # type: ignore[import-not-found,import]
    MultibyteIncrementalDecoder,
)

from .constant import (
    ENCODING_MARKS,
    IANA_SUPPORTED_SIMILAR,
    RE_POSSIBLE_ENCODING_INDICATION,
    UNICODE_RANGES_COMBINED,
    UNICODE_SECONDARY_RANGE_KEYWORD,
    UTF8_MAXIMAL_ALLOCATION,
    COMMON_CJK_CHARACTERS,
    _LATIN,
    _CJK,
    _HANGUL,
    _KATAKANA,
    _HIRAGANA,
    _THAI,
    _ARABIC,
    _ARABIC_ISOLATED_FORM,
    _ACCENT_KEYWORDS,
    _ACCENTUATED,
)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def _character_flags(character: str) -> int:
    """Compute all name-based classification flags with a single unicodedata.name() call.

    :param character: The character to classify.
    :return: A bitwise OR of the ``_LATIN``, ``_CJK``, ``_HANGUL``, ``_KATAKANA``,
        ``_HIRAGANA``, ``_THAI``, ``_ARABIC``, ``_ARABIC_ISOLATED_FORM`` and
        ``_ACCENTUATED`` flags whose keyword appears in the character's Unicode
        name, or ``0`` when the character has no name.
    """
    try:
        desc: str = unicodedata.name(character)
    except ValueError:
        # unicodedata.name() raises ValueError for characters without a name.
        return 0

    flags: int = 0

    if "LATIN" in desc:
        flags |= _LATIN
    if "CJK" in desc:
        flags |= _CJK
    if "HANGUL" in desc:
        flags |= _HANGUL
    if "KATAKANA" in desc:
        flags |= _KATAKANA
    if "HIRAGANA" in desc:
        flags |= _HIRAGANA
    if "THAI" in desc:
        flags |= _THAI
    if "ARABIC" in desc:
        flags |= _ARABIC
        # The isolated-form flag is only considered for Arabic-named characters.
        if "ISOLATED FORM" in desc:
            flags |= _ARABIC_ISOLATED_FORM

    # A single matching accent keyword is enough to flag the character.
    if any(kw in desc for kw in _ACCENT_KEYWORDS):
        flags |= _ACCENTUATED

    return flags


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    """Return True when the character's name-based flags include ``_ACCENTUATED``."""
    return bool(_character_flags(character) & _ACCENTUATED)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    """Return the first code point of the character's Unicode decomposition.

    :param character: The character to strip.
    :return: ``character`` unchanged when it has no decomposition mapping,
        otherwise the character for the first code point of that mapping.
    """
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    codes: list[str] = decomposed.split(" ")

    # Some mappings start with a formatting tag (e.g. "<compat> 0020 0308").
    # The tag is not a hexadecimal code point, so skip it instead of letting
    # int() raise ValueError.
    if codes[0].startswith("<"):
        codes = codes[1:]

    if not codes:
        return character

    return chr(int(codes[0], 16))


# Pre-built sorted lookup table for O(log n) binary search in unicode_range().
# Each entry is (range_start, range_end_exclusive, range_name).
_UNICODE_RANGES_SORTED: list[tuple[int, int, str]] = sorted(
    (ord_range.start, ord_range.stop, name)
    for name, ord_range in UNICODE_RANGES_COMBINED.items()
)
# Parallel list holding only the range starts, used as the bisect key.
_UNICODE_RANGE_STARTS: list[int] = [e[0] for e in _UNICODE_RANGES_SORTED]


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> str | None:
    """
    Retrieve the Unicode range official name from a single character.

    :param character: A single character.
    :return: The name of the range containing the character, or None when the
        character falls outside every range of the lookup table.
    """
    character_ord: int = ord(character)

    # Binary search: find the rightmost range whose start <= character_ord
    idx: int = bisect_right(_UNICODE_RANGE_STARTS, character_ord) - 1
    if idx >= 0:
        _, stop, name = _UNICODE_RANGES_SORTED[idx]
        # The stop bound is exclusive.
        if character_ord < stop:
            return name

    return None


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    """Return True when the character's name-based flags include ``_LATIN``."""
    return bool(_character_flags(character) & _LATIN)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    """
    Return True when the character's Unicode category contains "P", or when
    the name of its Unicode range contains "Punctuation".
    """
    character_category: str = unicodedata.category(character)

    if "P" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Punctuation" in character_range


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    """
    Return True when the character's Unicode category contains "S" or "N", or
    when its Unicode range name contains "Forms" and its category is not "Lo".
    """
    character_category: str = unicodedata.category(character)

    if "S" in character_category or "N" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Forms" in character_range and character_category != "Lo"


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    """
    Return True when the character's Unicode range name contains "Emoticons"
    or "Pictographs".
    """
    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Emoticons" in character_range or "Pictographs" in character_range


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    """
    Return True for whitespace, for one of ``| + < >``, for any character whose
    Unicode category contains "Z", or whose category is "Po", "Pd" or "Pc".
    """
    if character.isspace() or character in {"|", "+", "<", ">"}:
        return True

    character_category: str = unicodedata.category(character)

    return "Z" in character_category or character_category in {"Po", "Pd", "Pc"}


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    """Return True when exactly one of ``islower()`` / ``isupper()`` holds."""
    return character.islower() != character.isupper()


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    """Return True when the character's name-based flags include ``_CJK``."""
    return bool(_character_flags(character) & _CJK)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    """Return True when the character's name-based flags include ``_HIRAGANA``."""
    return bool(_character_flags(character) & _HIRAGANA)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    """Return True when the character's name-based flags include ``_KATAKANA``."""
    return bool(_character_flags(character) & _KATAKANA)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    """Return True when the character's name-based flags include ``_HANGUL``."""
    return bool(_character_flags(character) & _HANGUL)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    """Return True when the character's name-based flags include ``_THAI``."""
    return bool(_character_flags(character) & _THAI)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic(character: str) -> bool:
    """Return True when the character's name-based flags include ``_ARABIC``."""
    return bool(_character_flags(character) & _ARABIC)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic_isolated_form(character: str) -> bool:
    """Return True when the character's name-based flags include ``_ARABIC_ISOLATED_FORM``."""
    return bool(_character_flags(character) & _ARABIC_ISOLATED_FORM)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk_uncommon(character: str) -> bool:
    """Return True when the character is not listed in ``COMMON_CJK_CHARACTERS``."""
    return character not in COMMON_CJK_CHARACTERS


@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    """
    Return True when the range name contains any keyword listed in
    ``UNICODE_SECONDARY_RANGE_KEYWORD``.
    """
    return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    """
    Return True when the character is neither whitespace nor printable, and is
    not one of the two explicit exceptions (``\\x1a`` and ``\\ufeff``).
    """
    return (
        character.isspace() is False  # includes \n \t \r \v
        and character.isprintable() is False
        and character != "\x1a"  # Why? It's the ASCII substitute character.
        and character != "\ufeff"  # bug discovered in Python,
        # Zero Width No-Break Space located in Arabic Presentation Forms-B, Unicode 1.1 not acknowledged as space.
    )


def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> str | None:
    """
    Extract, using an ASCII-only decoder, any encoding specified in the first n bytes.

    :param sequence: Raw payload to inspect (bytes or bytearray).
    :param search_zone: Number of leading bytes that are searched.
    :return: The Python codec name of the first declared encoding that matches
        an entry of ``encodings.aliases.aliases``, or None when nothing matches.
    :raises TypeError: When ``sequence`` is neither bytes nor bytearray.
    """
    if not isinstance(sequence, (bytes, bytearray)):
        raise TypeError

    seq_len: int = len(sequence)

    # Non-ASCII bytes are dropped so that the pattern runs on plain text.
    results: list[str] = findall(
        RE_POSSIBLE_ENCODING_INDICATION,
        sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
    )

    if not results:
        return None

    for specified_encoding in results:
        # Normalise to the spelling used by the aliases table.
        specified_encoding = specified_encoding.lower().replace("-", "_")

        encoding_alias: str
        encoding_iana: str

        for encoding_alias, encoding_iana in aliases.items():
            if encoding_alias == specified_encoding:
                return encoding_iana
            if encoding_iana == specified_encoding:
                return encoding_iana

    return None


@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify whether a specific encoding is a multi-byte one, based on its IANA name.

    The Unicode transformation formats are listed explicitly; any other name is
    resolved by importing ``encodings.<name>`` and checking whether its
    IncrementalDecoder derives from MultibyteIncrementalDecoder.
    """
    return name in {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    } or issubclass(
        importlib.import_module(f"encodings.{name}").IncrementalDecoder,
        MultibyteIncrementalDecoder,
    )


def identify_sig_or_bom(sequence: bytes) -> tuple[str | None, bytes]:
    """
    Identify and extract SIG/BOM in given sequence.

    :param sequence: Raw payload to inspect.
    :return: ``(encoding, mark)`` for the first mark of ``ENCODING_MARKS`` the
        sequence starts with, or ``(None, b"")`` when none matches.
    """
    for iana_encoding in ENCODING_MARKS:
        marks: bytes | list[bytes] = ENCODING_MARKS[iana_encoding]

        # An encoding may declare a single mark or several; handle both alike.
        if isinstance(marks, bytes):
            marks = [marks]

        for mark in marks:
            if sequence.startswith(mark):
                return iana_encoding, mark

    return None, b""


def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    """Return False for "utf_16" and "utf_32", True for any other encoding name."""
    return iana_encoding not in {"utf_16", "utf_32"}


def iana_name(cp_name: str, strict: bool = True) -> str:
    """Returns the Python normalized encoding name (Not the IANA official name).

    :param cp_name: Encoding name or alias; case and hyphens are normalised.
    :param strict: When True, an unknown name raises; when False, the
        normalised input is returned as-is.
    :raises ValueError: When ``strict`` is True and the name is unknown.
    """
    cp_name = cp_name.lower().replace("-", "_")

    encoding_alias: str
    encoding_iana: str

    for encoding_alias, encoding_iana in aliases.items():
        if cp_name in [encoding_alias, encoding_iana]:
            return encoding_iana

    if strict:
        raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")

    return cp_name


def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    """
    Measure how alike two single-byte code pages are.

    Each of the 256 possible byte values is decoded with both codecs (decoding
    errors ignored); the result is the fraction of values for which both
    codecs produce the same output. Returns 0.0 when either encoding is a
    multi-byte one.
    """
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
    decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder

    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    character_match_count: int = 0

    for i in range(256):
        to_be_decoded: bytes = bytes([i])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / 256


def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code pages are at least 80% similar. The
    IANA_SUPPORTED_SIMILAR dict was generated using the function cp_similarity.
    """
    return (
        iana_name_a in IANA_SUPPORTED_SIMILAR
        and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
    )


def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    """
    Attach a new StreamHandler with the given format to the named logger and
    set the logger's level.

    Note that a handler is added on every call.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)


def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: str | None = None,
) -> Generator[str, None, None]:
    """
    Yield decoded text chunks taken at the given offsets.

    When ``decoded_payload`` is available and the decoder is not a multi-byte
    one, chunks are sliced directly from the decoded text. Otherwise each chunk
    is cut from the raw bytes and decoded, and for multi-byte decoders the cut
    point is moved back by up to three bytes when the chunk start cannot be
    found in ``decoded_payload``.

    :param sequences: Raw payload.
    :param encoding_iana: Codec name used to decode each chunk.
    :param offsets: Start positions of the chunks.
    :param chunk_size: Length of each chunk.
    :param bom_or_sig_available: Whether a SIG/BOM was detected.
    :param strip_sig_or_bom: Whether the SIG/BOM is stripped; when it is not,
        ``sig_payload`` is prepended to every raw chunk before decoding.
    :param sig_payload: The SIG/BOM bytes.
    :param is_multi_byte_decoder: Whether the codec is a multi-byte one.
    :param decoded_payload: Already-decoded payload, if any.
    """
    if decoded_payload and is_multi_byte_decoder is False:
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                break
            yield chunk
    else:
        for i in offsets:
            chunk_end = i + chunk_size
            # Skip chunks that would run more than 8 bytes past the payload end.
            if chunk_end > len(sequences) + 8:
                continue

            cut_sequence = sequences[i : i + chunk_size]

            if bom_or_sig_available and strip_sig_or_bom is False:
                cut_sequence = sig_payload + cut_sequence

            chunk = cut_sequence.decode(
                encoding_iana,
                errors="ignore" if is_multi_byte_decoder else "strict",
            )

            # multi-byte bad cutting detector and adjustment
            # not the cleanest way to perform that fix but clever enough for now.
            if is_multi_byte_decoder and i > 0:
                chunk_partial_size_chk: int = min(chunk_size, 16)

                if (
                    decoded_payload
                    and chunk[:chunk_partial_size_chk] not in decoded_payload
                ):
                    # Walk the cut point back by up to three bytes, never below
                    # zero: a negative start would slice from the end of the
                    # payload, and an empty chunk always tests as "in" the
                    # decoded payload.
                    for j in range(i, max(i - 4, -1), -1):
                        cut_sequence = sequences[j:chunk_end]

                        if bom_or_sig_available and strip_sig_or_bom is False:
                            cut_sequence = sig_payload + cut_sequence

                        chunk = cut_sequence.decode(encoding_iana, errors="ignore")

                        if chunk[:chunk_partial_size_chk] in decoded_payload:
                            break

            yield chunk