PoC for pattern-matching-based brand mapping
from pprint import pprint
import re
import html
import time
import unicodedata
import difflib

import numpy as np
# Predefined brand mapping (as given)
BRAND_MAPPING = {
    "Gucci": ["GUCCI", "gucci", "Gucci Pre-Owned"],
    "Prada": ["PRADA", "prada"],
    "Fendi": ["FENDI", "fendi"],
    "Burberry": ["Burberrys", "BURBERRYS", "BURBERY"],
    "Tod's": ["Tod'S", "TOD'S", "Tods"],
    "Dolce & Gabbana": ["Dolce & Gabbana", "D&G", "DG", "DOLCE E GABBANA", "DOLCE & GABBANA"],
    "Zadig & Voltaire": ["Zadig&Voltaire"],
    "Hermès": ["HERMÈS", "HERMES", "Hermes"],
    "Chanel": ["CHANEL"],
    "Celine": ["CELINE", "Céline"],
    "Louis Vuitton": ["LOUIS VUITTON", "LV"],
    "Saint Laurent": ["SAINT LAURENT", "YSL"],
    "Bottega Veneta": ["BOTTEGA VENETA"],
    "Balenciaga": ["BALENCIAGA", "BALENCIANGA"],
    "Valentino": ["VALENTINO", "VALENTINO GARAVANI", "VLTN"],
    "Stella McCartney": ["STELLA MCCARTNEY", "STELLA MECCARTNEY", "STELLA MACCARTNEY", "STELLA MCCARTENEY"],
    "Miu Miu": ["MIU MIU", "MIIU MIU"],
    "Montblanc": ["MONTBLANC", "MB"],
}
# Canonical list for fuzzy methods
CANONICALS = list(BRAND_MAPPING.keys())

# Generate test data: (raw, expected_canonical).
# Include all variants, the canonicals themselves, and some artificial noisy examples.
TEST_DATA = []
for canonical, variants in BRAND_MAPPING.items():
    TEST_DATA.extend([(var, canonical) for var in variants])
    TEST_DATA.append((canonical, canonical))  # Add the canonical itself
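# TEST_DATA now holds (raw, expected) pairs such as ("GUCCI", "Gucci"),
# ("Gucci Pre-Owned", "Gucci"), and ("Gucci", "Gucci").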
# Add artificial noisy examples (typos, abbreviations, etc.) with ground truth
NOISY_EXAMPLES = [
    ("Guc ci", "Gucci"),                      # Space typo
    ("Prda", "Prada"),                        # Missing vowel
    ("Fendii", "Fendi"),                      # Extra i
    ("Burbery Pre-Owned", "Burberry"),        # Misspelling + pre-owned tag
    ("Todd's", "Tod's"),                      # Variant spelling
    ("Dolce and Gabbana", "Dolce & Gabbana"), # Word variant
    ("Zadig&Voltair", "Zadig & Voltaire"),    # Missing e
    ("Hermes Pre Owned", "Hermès"),           # No accent, pre-owned tag
    ("Channnel", "Chanel"),                   # Extra n
    ("Céliné", "Celine"),                     # Accents
    ("Louie Vuiton", "Louis Vuitton"),        # Common misspelling
    ("St Laurent", "Saint Laurent"),          # Abbreviation
    ("Botega Venetta", "Bottega Veneta"),     # Misspelling
    ("Balen ciaga", "Balenciaga"),            # Stray space
    ("Valenttino", "Valentino"),              # Extra t
    ("Stela McCartny", "Stella McCartney"),   # Misspellings
    ("MiuMiu", "Miu Miu"),                    # Missing space
    ("Mont Blanc", "Montblanc"),              # Extra space
    ("Unbranded", "Unbranded"),               # Special case
    ("", "Unbranded"),                        # Empty string
    ("RandomBrand", "RandomBrand"),           # Unknown brand, should remain as-is
]
TEST_DATA.extend(NOISY_EXAMPLES)
TEST_DATA *= 100  # Add some "meat" to better gauge how the methods perform
# Normalization / cleaning function (common to all methods)
def clean_brand(brand):
    if not brand or str(brand).lower() == "unbranded":
        return "Unbranded"
    brand = re.sub(r'\bpre[- ]?owned\b', '', str(brand), flags=re.IGNORECASE)
    brand = html.unescape(brand)
    brand = unicodedata.normalize('NFKC', brand)
    # Decode a few common HTML entities explicitly (html.unescape above already
    # covers these; kept as a belt-and-braces step) and normalize backticks.
    brand = brand.replace('&#x2F;', '/').replace('&amp;', '&').replace('&#39;', "'").replace("`", "'")
    brand = re.sub(r'\s+', ' ', brand).strip().title()
    return brand
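# Quick sanity checks (expected results shown as comments):
#   clean_brand("HERMES Pre-Owned")    -> "Hermes"            (tag stripped, title-cased)
#   clean_brand("dolce &amp; gabbana") -> "Dolce & Gabbana"   (entity decoded)
#   clean_brand(None)                  -> "Unbranded"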
# Method 1: Exact mapping using the variant dictionary
def exact_map(brand):
    cleaned = clean_brand(brand)
    for canonical, variants in BRAND_MAPPING.items():
        if cleaned.upper() in [v.upper() for v in variants + [canonical]]:
            return canonical
    return cleaned  # Fallback to the cleaned string if no match
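# A hypothetical faster variant (not part of the original benchmark): precompute
# a reverse lookup table once instead of re-uppercasing every variant list on
# each call. Same results as exact_map, but an O(1) dict lookup per query.
_VARIANT_TO_CANONICAL = {
    v.upper(): canonical
    for canonical, variants in BRAND_MAPPING.items()
    for v in variants + [canonical]
}

def exact_map_fast(brand):
    cleaned = clean_brand(brand)
    return _VARIANT_TO_CANONICAL.get(cleaned.upper(), cleaned)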
# Pure Python Levenshtein distance (iterative two-row dynamic programming)
def levenshtein_distance(s1, s2):
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)
    if len(s2) == 0:
        return len(s1)
    previous_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    return previous_row[-1]

def levenshtein_similarity(s1, s2):
    dist = levenshtein_distance(s1.lower(), s2.lower())
    max_len = max(len(s1), len(s2))
    return 1 - (dist / max_len) if max_len else 1
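# Worked example: levenshtein_distance("prada", "prda") == 1 (one deletion), so
# levenshtein_similarity("Prada", "Prda") == 1 - 1/5 == 0.8 -- just below the
# 0.85 threshold, which is why fuzzy_levenshtein below misses this typo.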
# Method 2: Fuzzy matching with Levenshtein (pure Python, equivalent to rapidfuzz's normalized Levenshtein)
def fuzzy_levenshtein(brand):
    cleaned = clean_brand(brand)
    if cleaned == "Unbranded":
        return cleaned
    best_score = 0
    best_canonical = cleaned
    for can in CANONICALS:
        score = levenshtein_similarity(cleaned, can)
        if score > best_score:
            best_score = score
            best_canonical = can
    if best_score >= 0.85:  # Similarity threshold
        return best_canonical
    return cleaned
# Pure Python Jaro similarity (despite the name, this returns a similarity in [0, 1])
def jaro_distance(s1, s2):
    s1_len = len(s1)
    s2_len = len(s2)
    if s1_len == 0 or s2_len == 0:
        return 0.0
    # Clamp to 0 so single-character strings still get a matching window
    match_distance = max(max(s1_len, s2_len) // 2 - 1, 0)
    s1_matches = [False] * s1_len
    s2_matches = [False] * s2_len
    matches = 0
    transpositions = 0
    # Find matches within the allowed window
    for i in range(s1_len):
        start = max(0, i - match_distance)
        end = min(i + match_distance + 1, s2_len)
        for j in range(start, end):
            if s2_matches[j]:
                continue
            if s1[i] != s2[j]:
                continue
            s1_matches[i] = True
            s2_matches[j] = True
            matches += 1
            break
    if matches == 0:
        return 0.0
    # Count transpositions between the matched characters
    k = 0
    for i in range(s1_len):
        if not s1_matches[i]:
            continue
        while not s2_matches[k]:
            k += 1
        if s1[i] != s2[k]:
            transpositions += 1
        k += 1
    transpositions /= 2
    return (matches / s1_len + matches / s2_len + (matches - transpositions) / matches) / 3.0
def jaro_winkler(s1, s2, p=0.1, max_l=4):
    jaro_dist = jaro_distance(s1, s2)
    # Length of the common prefix, capped at max_l characters
    prefix_length = 0
    for i in range(min(len(s1), len(s2), max_l)):
        if s1[i] == s2[i]:
            prefix_length += 1
        else:
            break
    return jaro_dist + (prefix_length * p * (1 - jaro_dist))
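# Worked example: jaro_distance("prda", "prada") == 0.85, and the shared "pr"
# prefix lifts jaro_winkler("prda", "prada") to 0.85 + 2 * 0.1 * 0.15 == 0.88,
# above the 0.85 threshold -- so Jaro-Winkler recovers typos that the
# normalized Levenshtein score rejects.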
# Method 3: Fuzzy matching with Jaro-Winkler (pure Python, equivalent to rapidfuzz's Jaro-Winkler)
def fuzzy_jarowinkler(brand):
    cleaned = clean_brand(brand)
    if cleaned == "Unbranded":
        return cleaned
    best_score = 0
    best_canonical = cleaned
    for can in CANONICALS:
        score = jaro_winkler(cleaned.lower(), can.lower())
        if score > best_score:
            best_score = score
            best_canonical = can
    if best_score >= 0.85:  # Similarity threshold
        return best_canonical
    return cleaned
# TF-IDF with cosine similarity, explained:
# TF-IDF treats each brand as a document. Character 3-grams serve as the "terms"
# so that spelling variations still share most of their features.
# 1. Extract the unique n-grams from all canonical brands.
# 2. Compute TF (term frequency) per canonical: n-gram count / total n-grams in the brand.
# 3. Compute IDF: log(N / df), where df is the number of brands containing the n-gram.
# 4. TF-IDF = TF * IDF for each vector.
# 5. For a query brand, compute its TF-IDF vector over the same vocabulary.
# 6. Compute the cosine similarity dot(u, v) / (||u|| * ||v||) between the query and each canonical.
# 7. Pick the canonical with the highest cosine, if it exceeds the threshold.
# NOTE: arguably overkill for this problem; most of the cost sits in the one-off
# precomputation below, so per-query lookups stay fast.
def get_ngrams(s, n=3):
    if len(s) < n:
        return [s]
    return [s[i:i + n] for i in range(len(s) - n + 1)]
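# e.g. get_ngrams("gucci") -> ['guc', 'ucc', 'cci']; shorter-than-n strings are
# returned whole: get_ngrams("lv") -> ['lv']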
# Precompute TF-IDF vectors for the canonicals
canonicals_lower = [c.lower() for c in CANONICALS]
all_terms = set()
for can in canonicals_lower:
    for g in get_ngrams(can):
        all_terms.add(g)
vocab = sorted(all_terms)
n_vocab = len(vocab)
n_docs = len(canonicals_lower)
# Term-to-index dict for faster lookup
term_to_idx = {term: idx for idx, term in enumerate(vocab)}
# TF matrix
tf_can = np.zeros((n_docs, n_vocab))
for i, can in enumerate(canonicals_lower):
    grams = get_ngrams(can)
    gram_counts = {}
    for g in grams:
        gram_counts[g] = gram_counts.get(g, 0) + 1
    total_grams = len(grams)
    for g, count in gram_counts.items():
        if g in term_to_idx:
            tf_can[i, term_to_idx[g]] = count / total_grams if total_grams > 0 else 0
# IDF (smoothed to avoid division by zero)
df = np.sum(tf_can > 0, axis=0)
idf = np.log(n_docs / (df + 1e-8)) + 1
# TF-IDF matrix and per-row norms
tfidf_can = tf_can * idf
norms_can = np.linalg.norm(tfidf_can, axis=1)
# Method 4: Fuzzy matching with TF-IDF cosine similarity (over character n-grams)
def fuzzy_tfidf(brand):
    cleaned = clean_brand(brand)
    if cleaned == "Unbranded":
        return cleaned
    q_lower = cleaned.lower()
    q_grams = get_ngrams(q_lower)
    q_tf = np.zeros(n_vocab)
    gram_counts = {}
    for g in q_grams:
        gram_counts[g] = gram_counts.get(g, 0) + 1
    total_grams = len(q_grams)
    for g, count in gram_counts.items():
        if g in term_to_idx:
            q_tf[term_to_idx[g]] = count / total_grams if total_grams > 0 else 0
    q_tfidf = q_tf * idf
    q_norm = np.linalg.norm(q_tfidf)
    if q_norm == 0:
        return cleaned
    cosines = np.dot(tfidf_can, q_tfidf) / (norms_can * q_norm + 1e-8)
    best_idx = np.argmax(cosines)
    if cosines[best_idx] >= 0.7:  # Threshold, tuned down for n-gram overlap
        return CANONICALS[best_idx]
    return cleaned
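# Rough scikit-learn equivalent of the n-gram TF-IDF pipeline above (an
# assumption -- sklearn is not a dependency of this PoC, so it is left commented out):
# from sklearn.feature_extraction.text import TfidfVectorizer
# vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(3, 3))
# tfidf_matrix = vectorizer.fit_transform(canonicals_lower)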
# Pure Python Soundex (simplified 7-character variant rather than the classic 4)
def soundex_generator(token):
    # Upper-case for uniformity
    token = token.upper()
    soundex = ""
    # Retain the first letter
    soundex += token[0]
    # Map letter groups to Soundex digit codes; vowels plus 'H', 'W', 'Y' map to
    # '.' and are dropped from the output.
    dictionary = {
        "BFPV": "1",
        "CGJKQSXZ": "2",
        "DT": "3",
        "L": "4",
        "MN": "5",
        "R": "6",
        "AEIOUHWY": ".",
    }
    # Encode the remaining letters
    for char in token[1:]:
        for key in dictionary.keys():
            if char in key:
                code = dictionary[key]
                if code != '.' and code != soundex[-1]:  # Skip vowels and adjacent duplicates
                    soundex += code
                break  # Stop scanning once the letter's group is found
    # Trim or pad to a 7-character code
    soundex = soundex[:7].ljust(7, "0")
    return soundex
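# Worked example: soundex_generator("Burberry") and soundex_generator("Burbery")
# both encode to "B616000" (vowels dropped, adjacent duplicate codes collapsed),
# so the misspelling lands in the same phonetic bucket as the canonical.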
# Method 5: Fuzzy matching with Soundex (phonetic; chosen over Metaphone for simplicity and speed)
def fuzzy_soundex(brand):
    cleaned = clean_brand(brand)
    if cleaned == "Unbranded":
        return cleaned
    q_soundex = soundex_generator(cleaned)
    for can in CANONICALS:
        if soundex_generator(can) == q_soundex:
            return can
    return cleaned
# Method 6: Fuzzy matching with difflib (SequenceMatcher ratio)
def fuzzy_difflib(brand):
    cleaned = clean_brand(brand)
    if cleaned == "Unbranded":
        return cleaned
    best_match = difflib.get_close_matches(cleaned, CANONICALS, n=1, cutoff=0.85)
    return best_match[0] if best_match else cleaned
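# Under the hood this uses SequenceMatcher.ratio(), e.g.
# difflib.SequenceMatcher(None, "Fendii", "Fendi").ratio() == 2 * 5 / 11 ≈ 0.909,
# which clears the 0.85 cutoff.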
# Framework to compute accuracy and timing metrics for each method
def evaluate_normalization_methods():
    methods = {
        'exact_mapping': exact_map,
        'fuzzy_levenshtein': fuzzy_levenshtein,  # Pure Python Levenshtein (rapidfuzz equivalent)
        'fuzzy_jarowinkler': fuzzy_jarowinkler,  # Pure Python Jaro-Winkler (rapidfuzz equivalent)
        'fuzzy_tfidf': fuzzy_tfidf,              # TF-IDF with cosine similarity
        'fuzzy_soundex': fuzzy_soundex,          # Soundex phonetic matching
        'fuzzy_difflib': fuzzy_difflib,
    }
    results = {}
    total_tests = len(TEST_DATA)
    for method_name, method_func in methods.items():
        correct = 0
        start = time.process_time()
        for raw, expected in TEST_DATA:
            normalized = method_func(raw)
            if normalized == expected:
                correct += 1
        duration = time.process_time() - start
        accuracy = correct / total_tests if total_tests > 0 else 0
        results[method_name] = {'accuracy': accuracy, 'duration': duration}
    return results
# Example usage
if __name__ == "__main__":
    scores = evaluate_normalization_methods()
    scores = dict(sorted(scores.items(), key=lambda x: x[1]['accuracy'], reverse=True))
    pprint(scores)
# Example output (TEST_DATA repeated x100; durations are machine-dependent):
# {
#     "fuzzy_jarowinkler": {"accuracy": 0.8902439024390244, "duration": 1.0974089999999999},
#     "fuzzy_tfidf":       {"accuracy": 0.8658536585365854, "duration": 0.2359309999999999},
#     "fuzzy_soundex":     {"accuracy": 0.8292682926829268, "duration": 0.4279989999999998},
#     "exact_mapping":     {"accuracy": 0.7804878048780488, "duration": 0.104941},
#     "fuzzy_difflib":     {"accuracy": 0.7560975609756098, "duration": 0.43470799999999965},
#     "fuzzy_levenshtein": {"accuracy": 0.7195121951219512, "duration": 3.550218},
# }