Source code for filipino_tokenizer.tagalog.tokenizer

"""
Tagalog Tokenizer — combines morphological segmentation with
morphology-aware BPE.

Pipeline:
    1. Pre-tokenize text on whitespace/punctuation
    2. For each word, run the morphological segmenter to find morpheme
       boundaries, then insert boundary markers into the **surface text**
       so that the original spelling is preserved.
    3. Feed the boundary-annotated corpus into ``MorphAwareBPE.train()``
    4. At encode time, re-run segmentation + BPE encode

This ensures the BPE vocabulary never contains merges that cross
morpheme boundaries, while preserving perfect round-trip fidelity.
"""

import os
import re
import json

from filipino_tokenizer.tagalog.segmenter import TagalogSegmenter
from filipino_tokenizer.tagalog.bpe import MorphAwareBPE, BOUNDARY


[docs] class TagalogTokenizer: """ End-to-end tokenizer for Tagalog text. Usage:: tok = TagalogTokenizer() tok.train("corpus.txt", vocab_size=32000) ids = tok.encode("Kumain siya ng pagkain.") text = tok.decode(ids) assert text == "kumain siya ng pagkain." """ def __init__(self): self.segmenter = TagalogSegmenter() self.bpe = MorphAwareBPE() self._segment_cache: dict[str, str] = {} # ================================================================== # # Training # # ================================================================== #
[docs] def train(self, corpus_path: str, vocab_size: int = 32_000) -> None: """ Train the tokenizer from a plain-text corpus file. Steps: 1. Read the corpus file line-by-line. 2. Pre-tokenize each line into words / punctuation. 3. Segment each word morphologically. 4. Insert boundary markers into the surface text at morpheme boundaries (preserving original spelling). 5. Train BPE with the CBPE constraint. Parameters ---------- corpus_path : str Path to a UTF-8 plain-text file (one sentence per line). vocab_size : int Target BPE vocabulary size. """ import sys # Count non-empty lines for progress reporting print("Counting lines ...", end="\r", file=sys.stderr, flush=True) with open(corpus_path, "r", encoding="utf-8") as f: total_lines = sum(1 for ln in f if ln.strip()) annotated_tokens: list[str] = [] cache: dict[str, str] = {} report_every = max(1, total_lines // 20) # report every ~5% processed = 0 print(f"Segmenting {total_lines:,} lines ... ", file=sys.stderr) with open(corpus_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue parts = re.split(r'(\s+|[^\w])', line) for part in parts: if not part: continue if re.match(r'^\w+$', part): word = part.lower() if word not in cache: cache[word] = self._surface_annotate(word) annotated_tokens.append(cache[word]) else: annotated_tokens.append(part) processed += 1 if processed % report_every == 0: pct = processed / total_lines * 100 print( f" {pct:5.1f}% {processed:,}/{total_lines:,} lines" f" {len(cache):,} unique words", end="\r", file=sys.stderr, flush=True, ) print( f" 100.0% {total_lines:,}/{total_lines:,} lines" f" {len(cache):,} unique words ", file=sys.stderr, ) print(f"Training BPE (vocab_size={vocab_size:,}) ...", file=sys.stderr) self.bpe.train(annotated_tokens, vocab_size=vocab_size)
# ================================================================== # # Encoding # # ================================================================== #
[docs] def encode(self, text: str) -> list[int]: """ Encode *text* into a list of integer token IDs. The text is lowercased, split into words/punctuation, each word is morphologically segmented (with boundary markers in the surface form), and BPE encoding is applied. """ all_ids: list[int] = [] tokens = self._segment_line(text) for token in tokens: ids = self.bpe.encode(token) all_ids.extend(ids) return all_ids
[docs] def tokenize(self, text: str) -> list[str]: """ Tokenize *text* into subword strings (for debugging / inspection). Returns the string representation of each BPE token rather than integer IDs. """ ids = self.encode(text) return [ self.bpe.id_to_token.get(i, MorphAwareBPE.UNK) for i in ids ]
# ================================================================== # # Decoding # # ================================================================== #
[docs] def decode(self, ids: list[int]) -> str: """ Decode a list of token IDs back to a readable string. Boundary markers and special tokens are removed. Spaces between words are reconstructed by detecting word-boundary tokens. """ # Reconstruct raw text from BPE raw = self.bpe.decode(ids) # Normalise whitespace raw = re.sub(r'\s+', ' ', raw).strip() return raw
# ================================================================== # # Persistence # # ================================================================== #
[docs] def save(self, directory: str) -> None: """ Save the trained tokenizer to *directory*. Creates: - ``vocab.json`` — BPE vocabulary mapping - ``merges.txt`` — ordered merge rules """ self.bpe.save(directory)
[docs] def load(self, directory: str) -> None: """Load a previously saved tokenizer from *directory*.""" self.bpe.load(directory)
[docs] def prewarm(self, lines: list[str]) -> None: """ Pre-segment all unique words across *lines* to warm the segment cache. ``TagalogTokenizer`` caches morphological segmentation per word in ``_segment_cache``. A large corpus has millions of lines but typically only tens of thousands of unique words. Calling this before ``encode()`` / ``tokenize()`` ensures each word is segmented exactly once, cutting tokenization time by ~10x on real corpora. Parameters ---------- lines : list[str] The same lines you intend to tokenize. """ import sys unique_words: set[str] = set() for line in lines: for part in re.split(r'(\s+|[^\w])', line): if part and re.match(r'^\w+$', part): unique_words.add(part.lower()) total = len(unique_words) print(f"Pre-warming cache: {total:,} unique words ...", end="\r", file=sys.stderr) for word in unique_words: if word not in self._segment_cache: self._segment_cache[word] = self._surface_annotate(word) print(f"Cache warmed: {total:,} unique words segmented. ", file=sys.stderr)
[docs] def load_pretrained(self) -> None: """ Load the bundled pretrained 32k Tagalog tokenizer. No path needed — the model is included in the package:: tok = TagalogTokenizer() tok.load_pretrained() ids = tok.encode("Kumain siya ng pagkain.") """ pretrained_dir = os.path.normpath( os.path.join(os.path.dirname(__file__), "..", "data", "pretrained") ) self.bpe.load(pretrained_dir)
# ================================================================== # # Internal helpers # # ================================================================== # def _segment_line(self, line: str) -> list[str]: """ Pre-tokenize a line of text and morphologically segment each word. Returns a list of boundary-annotated strings where boundary markers are inserted into the **surface text** at morpheme boundaries. This preserves the original spelling for perfect round-trip fidelity. For example, ``kumain`` is segmented into morphemes ``[um, kain]`` (infix *um*) but the surface form is ``kumain``. We locate morpheme boundaries in the surface text and produce ``"k▁um▁ain"`` so that: - BPE sees boundaries and won't merge across them - Removing ▁ gives back ``kumain`` exactly. """ result: list[str] = [] # Split on whitespace and punctuation, keeping delimiters parts = re.split(r'(\s+|[^\w])', line) for part in parts: if not part: continue if re.match(r'^\w+$', part): # Morphological segmentation word = part.lower() if word not in self._segment_cache: self._segment_cache[word] = self._surface_annotate(word) result.append(self._segment_cache[word]) else: # Whitespace / punctuation — pass through result.append(part) return result def _surface_annotate(self, word: str) -> str: """ Run the morphological segmenter and insert boundary markers into the surface text at positions corresponding to morpheme boundaries. The segmenter returns abstract morphemes which, for infixes, don't directly concatenate to form the surface word. This method maps the morphemes back to the surface form. Strategies: - Prefixes/suffixes: direct concatenation matches surface form. - Infixes (-um-, -in-): inserted after the first consonant. Surface = first_consonant + infix + remainder_of_root. - Unsegmented: return as-is (no boundaries). """ morphemes = self.segmenter.segment(word) # Single morpheme or empty — no boundaries to insert if len(morphemes) <= 1: return word # Check if direct concatenation matches the surface form # (works for prefixes, suffixes, circumfixes) concat = "".join(morphemes) if concat == word: return BOUNDARY.join(morphemes) # Handle infix cases: the segmenter returns [infix, root] or # [prefix, infix, root] etc. # We need to find where these morphemes appear in the surface text return self._reconstruct_with_infixes(word, morphemes) def _reconstruct_with_infixes(self, word: str, morphemes: list[str]) -> str: """ Insert boundary markers for infix-containing words. Known Tagalog infix patterns: - [infix, root]: e.g. ['um', 'kain'] for 'kumain' Surface = root[0] + infix + root[1:] = k + um + ain - [prefix, infix, root]: e.g. ['nag', 'um', 'root'] (rare, but handle it) We try to locate each morpheme's contribution in the surface text. """ infixes = set(self.segmenter.affixes.get_infixes()) # Case: [infix, root] — the most common infix-only pattern if len(morphemes) == 2 and morphemes[0] in infixes: infix = morphemes[0] root = morphemes[1] # Surface form: root[0] + infix + root[1:] if len(root) >= 1: expected = root[0] + infix + root[1:] if expected == word: # Boundary after root[0], after infix return root[0] + BOUNDARY + infix + BOUNDARY + root[1:] # Case: [prefix(es)…, infix, root] # Try to find the infix in the morpheme list and handle prefix part prefix_parts = [] infix_part = None root_part = None for i, m in enumerate(morphemes): if m in infixes and infix_part is None: infix_part = m # Everything after the infix is the root remaining = morphemes[i + 1:] if remaining: root_part = remaining[0] # Any further morphemes are suffixes suffix_parts = remaining[1:] else: suffix_parts = [] break else: prefix_parts.append(m) if infix_part and root_part: # Build: prefixes + root[0] + infix + root[1:] + suffixes prefix_str = "".join(prefix_parts) infix_surface = root_part[0] + infix_part + root_part[1:] suffix_str = "".join(suffix_parts) if suffix_parts else "" expected = prefix_str + infix_surface + suffix_str if expected == word: parts = [] if prefix_parts: parts.extend(prefix_parts) parts.append(root_part[0]) parts.append(infix_part) parts.append(root_part[1:]) if suffix_parts: parts.extend(suffix_parts) # Filter empty parts parts = [p for p in parts if p] return BOUNDARY.join(parts) # Fallback: can't reconstruct, return with simple boundary join # (won't roundtrip for complex cases, but is safe) return BOUNDARY.join(morphemes)