Last active
September 13, 2024 05:16
-
-
Save lmassaron/6695171ff45bae7ef7ddcdad2ad493ca to your computer and use it in GitHub Desktop.
Preprocessing scheme for high-cardinality categorical attributes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def add_noise(series, noise_level):
    """Perturb *series* multiplicatively with Gaussian noise.

    Each element is scaled by ``1 + noise_level * N(0, 1)``; a
    ``noise_level`` of 0 returns the series unchanged.
    """
    perturbation = np.random.randn(len(series)) * noise_level
    return series * (perturbation + 1)
def target_encode(trn_series=None, tst_series=None, target=None, k=1, f=1, noise_level=0):
    """Smoothed target (mean) encoding of a high-cardinality categorical feature.

    Encoding is computed as in:
    Micci-Barreca, Daniele. "A preprocessing scheme for high-cardinality
    categorical attributes in classification and prediction problems."
    ACM SIGKDD Explorations Newsletter 3.1 (2001): 27-32.

    Parameters
    ----------
    trn_series : pd.Series
        Categorical feature, in-sample (training) values.
    tst_series : pd.Series
        Same categorical feature, out-of-sample (test) values; must share
        ``trn_series``'s name.
    target : pd.Series
        Target data aligned with ``trn_series``.
    k : int
        Half of the minimal sample size for which we completely "trust" the
        estimate based on the sample in the cell.
    f : int
        Rate of transition between the cell's posterior probability and the
        prior probability.
    noise_level : float
        Scale of the multiplicative Gaussian noise applied to the encodings
        (0 disables noise).

    Returns
    -------
    (pd.Series, pd.Series)
        Encoded train and test series, each named ``<feature>_mean`` and
        index-aligned with its input.
    """
    assert len(trn_series) == len(target)
    assert trn_series.name == tst_series.name
    temp = pd.concat([trn_series, target], axis=1)
    # Per-category target mean and support count.
    averages = temp.groupby(by=trn_series.name)[target.name].agg(["mean", "count"])
    # Shrinkage weight (formula 4 in the paper): tends to 1 as count grows.
    smoothing = 1 / (1 + np.exp(-(averages["count"] - k) / f))
    # Global mean of the target: the prior for rare/unseen categories.
    prior = target.mean()
    # Blend prior and category mean; the bigger the count, the less the prior
    # is taken into account.
    encoding = prior * (1 - smoothing) + averages["mean"] * smoothing
    # map() keeps the original index and yields NaN for categories missing
    # from the training encoding, which we fill with the prior.  This replaces
    # the original merge-based lookup, whose rename of a non-existent 'index'
    # column was dead code and which had to restore the index by hand.
    new_name = trn_series.name + "_mean"
    ft_trn_series = trn_series.map(encoding).rename(new_name).fillna(prior)
    ft_tst_series = tst_series.map(encoding).rename(new_name).fillna(prior)
    # Multiplicative Gaussian noise (same formula as the module's add_noise
    # helper, inlined so this function is self-contained).
    ft_trn_series = ft_trn_series * (1 + noise_level * np.random.randn(len(ft_trn_series)))
    ft_tst_series = ft_tst_series * (1 + noise_level * np.random.randn(len(ft_tst_series)))
    return ft_trn_series, ft_tst_series
class TargetEncode(BaseEstimator, TransformerMixin):
    """Scikit-learn style smoothed target (mean) encoder.

    Implements the empirical-Bayes blending of per-category target means
    with the global prior from Micci-Barreca (2001), with optional
    multiplicative Gaussian noise to reduce overfitting.

    Parameters
    ----------
    categories : 'auto' or sequence of int
        Positional indices of the categorical columns; 'auto' selects all
        object-dtype columns at fit time.
    k : int
        Half of the minimal sample size for which the per-category estimate
        is fully trusted.
    f : int
        Rate of transition between the category posterior and the prior.
    noise_level : float
        Scale of the multiplicative Gaussian noise (0 disables noise).
    random_state : int or None
        Seed applied before noise generation, for reproducibility.
    """

    def __init__(self, categories='auto', k=1, f=1, noise_level=0, random_state=None):
        self.categories = categories
        self.k = k
        self.f = f
        self.noise_level = noise_level
        self.encodings = dict()   # column name -> {category: encoded value}
        self.prior = None         # global target mean, set by fit()
        self.random_state = random_state

    def add_noise(self, series, noise_level):
        """Perturb *series* multiplicatively with Gaussian noise."""
        return series * (1 + noise_level * np.random.randn(len(series)))

    def fit(self, X, y=None):
        """Learn one smoothed-mean encoding per categorical column of X.

        X is expected to be a pandas DataFrame and y an array-like target
        aligned with it.  Returns self.
        """
        if type(self.categories) == str:
            # 'auto': positional indices of all object-dtype columns.
            self.categories = np.where(X.dtypes == type(object()))[0]
        temp = X.iloc[:, self.categories].copy()
        temp['target'] = y
        self.prior = np.mean(y)
        # BUG FIX: the original iterated X.columns[categories] with a bare
        # `categories` name (NameError unless a global existed by accident).
        for variable in X.columns[self.categories]:
            avg = temp.groupby(by=variable)['target'].agg(['mean', 'count'])
            # Shrinkage weight (formula 4 in the paper): tends to 1 as count grows.
            smoothing = 1 / (1 + np.exp(-(avg['count'] - self.k) / self.f))
            # The bigger the count the less the prior is taken into account.
            self.encodings[variable] = dict(self.prior * (1 - smoothing) + avg['mean'] * smoothing)
        return self

    def transform(self, X):
        """Return a copy of X with fitted columns replaced by their encodings.

        Categories unseen during fit() are encoded with the prior.
        """
        Xt = X.copy()
        # BUG FIX: same bare-`categories` reference as in fit().
        for variable in Xt.columns[self.categories]:
            # map() encodes known categories and leaves NaN for unseen ones,
            # which the prior then fills — safer than chained replace() calls,
            # which could re-substitute an encoded value that happens to
            # collide with another category label.
            Xt[variable] = Xt[variable].map(self.encodings[variable]).fillna(self.prior).astype(float)
            if self.noise_level > 0:
                if self.random_state is not None:
                    np.random.seed(self.random_state)
                Xt[variable] = self.add_noise(Xt[variable], self.noise_level)
        return Xt

    def fit_transform(self, X, y=None):
        """Fit on (X, y), then transform X."""
        self.fit(X, y)
        return self.transform(X)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment