8. Feature Engineering

8.1. Automatic Feature Engineering

Because Deep Feature Synthesis (DFS) is computationally expensive, I split it into retailer chunks, i.e. one execution per retailer, to maximize parallelism. Given a file listing the MAIN_SYSTEM_ID values, one per line, the executions can be parallelized in bash with GNU parallel, where -j 200% runs twice as many jobs as there are CPUs:

PYARROW_IGNORE_TIMEZONE=1 \
    time parallel \
    -j 200% \
    -a data/retailer_ids.csv \
    python automatic-feature-engineering.py \
        --retailerid {} \
        --maxdepth 2
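
For environments without GNU parallel, the same one-process-per-retailer fan-out can be sketched in Python. This is a minimal sketch, not the pipeline's actual launcher: `build_commands` and `run_all` are hypothetical helpers, it assumes `data/retailer_ids.csv` holds one MAIN_SYSTEM_ID per line with no header (which is how `parallel -a` consumes the file), and it approximates `-j 200%` as twice `os.cpu_count()` workers.

```python
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import List


def build_commands(retailer_ids: List[str], maxdepth: int) -> List[List[str]]:
    """One DFS invocation per retailer, like `parallel` substituting {}."""
    return [
        [
            "python",
            "automatic-feature-engineering.py",
            "--retailerid",
            retailer_id,
            "--maxdepth",
            str(maxdepth),
        ]
        for retailer_id in retailer_ids
    ]


def run_all(commands: List[List[str]]) -> List[int]:
    """Run every command and return the exit codes.

    Assumption: two workers per logical CPU approximates `-j 200%`.
    """
    env = {**os.environ, "PYARROW_IGNORE_TIMEZONE": "1"}
    workers = 2 * (os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(
            pool.map(lambda cmd: subprocess.run(cmd, env=env).returncode, commands)
        )
```

For example, `run_all(build_commands(ids, maxdepth=2))` with `ids` read line by line from the CSV. Threads rather than processes suffice here because each worker only waits on its child process.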

And here is the script itself (loaded from the repository):

DFS script
import argparse
import os
from typing import List

import featuretools as ft
import joblib
import numpy as np
import pandas as pd
from featuretools.primitives import AggregationPrimitive, TransformPrimitive
from featuretools_tsfresh_primitives import (
    comprehensive_fc_parameters,
    primitives_from_fc_settings,
)
from featuretools_tsfresh_primitives import primitives as tsfresh_primitives


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic feature engineering step")
    parser.add_argument(
        "--retailerid",
        type=int,
        required=True,
        help="MAIN_SYSTEM_ID",
    )
    parser.add_argument(
        "--maxdepth",
        type=int,
        default=2,
        help="Dictates the complexity of features built;"
        + " the higher the number, the more expensive the compute.",
    )
    args = parser.parse_args()
    selected_main_system_id = args.retailerid
    maxdepth = args.maxdepth
    print(f"\n\nStart DFS run for MAIN_SYSTEM_ID == {selected_main_system_id}")

    # Load datasets
    loans_df = (
        pd.read_excel("data/Loans_Data.xlsx")
        .drop(
            [
                # Low signal columns
                "INITIAL_COST",
                "INDEX",
                "REPAYMENT_ID",
                "FINAL_COST",
                "RETAILER_ID",
                # Columns populated after the fact, thus would lead to data leakage
                "REPAYMENT_UPDATED",
                "SPENT",
                "TOTAL_FINAL_AMOUNT",
                "FIRST_TRIAL_BALANCE",
                "FIRST_TRAIL_DELAYS",
                "PAYMENT_AMOUNT",
                "LOAN_PAYMENT_DATE",
                "REPAYMENT_AMOUNT",
                "CUMMULATIVE_OUTSTANDING",
                "PAYMENT_STATUS",
            ],
            axis=1,
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            MAIN_SYSTEM_ID=lambda x: x["MAIN_SYSTEM_ID"].astype("int64"),
            LOAN_ID=lambda x: x["LOAN_ID"].astype("int64"),
            LOAN_ISSUANCE_DATE=lambda x: x["LOAN_ISSUANCE_DATE"].astype("<M8[ns]"),
            LOAN_AMOUNT=lambda x: x["LOAN_AMOUNT"].astype("float64"),
            TOTAL_INITIAL_AMOUNT=lambda x: x["TOTAL_INITIAL_AMOUNT"].astype("float64"),
            INITIAL_DATE=lambda x: x["INITIAL_DATE"].astype("<M8[ns]"),
        )
    )

    fintech_df = (
        pd.read_csv(
            "data/Retailer_Transactions_Data.csv",
            header=0,
            dtype={
                "ID": np.dtype("int64"),
                "CREATED_AT": np.dtype("O"),
                "UPDATED_AT": np.dtype("O"),
                "AMOUNT": np.dtype("float64"),
                "FEES": np.dtype("float64"),
                "RETAILER_CUT": np.dtype("float64"),
                "STATUS": np.dtype("O"),
                "TOTAL_AMOUNT_INCLUDING_TAX": np.dtype("float64"),
                "TOTAL_AMOUNT_PAID": np.dtype("float64"),
                "WALLET_BALANCE_BEFORE_TRANSACTION": np.dtype("float64"),
                "MAIN_SYSTEM_ID": np.dtype("int64"),
            },
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            CREATED_AT=lambda x: pd.to_datetime(x["CREATED_AT"]),
            UPDATED_AT=lambda x: pd.to_datetime(x["UPDATED_AT"]),
        )
    )

    ecommerce_df = (
        pd.read_csv(
            "data/Ecommerce_orders_Data.csv",
            header=0,
            dtype={
                "ORDER_ID": np.dtype("int64"),
                "MAIN_SYSTEM_ID": np.dtype("int64"),
                "ORDER_PRICE": np.dtype("float64"),
                "DISCOUNT": np.dtype("float64"),
                "ORDER_PRICE_AFTER_DISCOUNT": np.dtype("float64"),
                "ORDER_CREATION_DATE": np.dtype("O"),
            },
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            ORDER_CREATION_DATE=lambda x: pd.to_datetime(x["ORDER_CREATION_DATE"]),
        )
    )

    retailer_df = loans_df[["MAIN_SYSTEM_ID"]].drop_duplicates()

    # Create an entity set and add the retailers entity
    entity_set = ft.EntitySet(id="maxab_entity_set")
    relationships = []

    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="retailers",
            dataframe=retailer_df,
            index="MAIN_SYSTEM_ID",
        )
    except Exception as excpt:
        raise Exception(
            f"DFS failed for MAIN_SYSTEM_ID == {selected_main_system_id}"
        ) from excpt

    # Add the loans entity (mandatory: it is the DFS target)
    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="loans",
            dataframe=loans_df,
            index="LOAN_ID",
            time_index="LOAN_ISSUANCE_DATE",
        )
        rel_retailer_loans = ft.Relationship(
            entity_set,
            parent_dataframe_name="retailers",
            parent_column_name="MAIN_SYSTEM_ID",
            child_dataframe_name="loans",
            child_column_name="MAIN_SYSTEM_ID",
        )
        relationships.append(rel_retailer_loans)
    except Exception as excpt:
        raise Exception(
            f"DFS failed for MAIN_SYSTEM_ID == {selected_main_system_id}"
        ) from excpt

    # Add the sales entity and relationship (skipped if the retailer has no sales)
    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="sales",
            dataframe=fintech_df,
            index="ID",
            time_index="CREATED_AT",
            secondary_time_index={"UPDATED_AT": ["STATUS", "TOTAL_AMOUNT_PAID"]},
        )
        rel_retailer_sales = ft.Relationship(
            entity_set,
            parent_dataframe_name="retailers",
            parent_column_name="MAIN_SYSTEM_ID",
            child_dataframe_name="sales",
            child_column_name="MAIN_SYSTEM_ID",
        )
        relationships.append(rel_retailer_sales)
    except Exception:
        print(
            f"Fintech dataset seems to be empty for MAIN_SYSTEM_ID '{selected_main_system_id}'"
        )

    # Add the purchases entity and relationship (skipped if the retailer has no orders)
    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="purchases",
            dataframe=ecommerce_df,
            index="ORDER_ID",
            time_index="ORDER_CREATION_DATE",
        )
        rel_retailer_purchases = ft.Relationship(
            entity_set,
            parent_dataframe_name="retailers",
            parent_column_name="MAIN_SYSTEM_ID",
            child_dataframe_name="purchases",
            child_column_name="MAIN_SYSTEM_ID",
        )
        relationships.append(rel_retailer_purchases)
    except Exception:
        print(
            f"Ecommerce dataset seems to be empty for MAIN_SYSTEM_ID '{selected_main_system_id}'"
        )

    # Add the relationships to the entity set
    entity_set = entity_set.add_relationships(relationships)

    # Create lists of Featuretools primitives valid for this entity set and depth
    valid_agg_primitives, valid_trans_primitives = ft.get_valid_primitives(
        entity_set, target_dataframe_name="loans", max_depth=maxdepth
    )
    FT_AGG_PRIMITIVES: List[str] = [p().name for p in valid_agg_primitives]
    FT_TRANSFORM_PRIMITIVES: List[str] = [p().name for p in valid_trans_primitives]

    # Remove buggy primitive 'expanding_count'
    if "expanding_count" in FT_TRANSFORM_PRIMITIVES:
        FT_TRANSFORM_PRIMITIVES.remove("expanding_count")

    # Create lists of TSFresh primitives
    TSFRESH_PARAMETERS = comprehensive_fc_parameters()

    def tsfresh_primitives_of_type(base_class) -> list:
        """Expand every TSFresh primitive that is a `base_class` into one
        instance per parameter combination, i.e. one primitive(parameters) each."""
        expanded = []
        for key in dir(tsfresh_primitives):
            if not key[0].isupper() or key == "SUPPORTED_PRIMITIVES":
                continue
            primitive_class = getattr(tsfresh_primitives, key)
            settings = TSFRESH_PARAMETERS[primitive_class.name] or [{}]
            if isinstance(primitive_class(**settings[0]), base_class):
                expanded.extend(
                    primitives_from_fc_settings({primitive_class.name: settings})
                )
        return expanded

    TSF_AGG_PRIMITIVES: List[AggregationPrimitive] = tsfresh_primitives_of_type(
        AggregationPrimitive
    )
    TSF_TRANSFORM_PRIMITIVES: List[TransformPrimitive] = tsfresh_primitives_of_type(
        TransformPrimitive
    )

    # Identifiers and foreign keys must not be used as features
    IGNORE_COLUMNS = {
        "loans": ["LOAN_ID", "MAIN_SYSTEM_ID"],
        "sales": ["ID", "MAIN_SYSTEM_ID"],
        "purchases": ["ORDER_ID", "MAIN_SYSTEM_ID"],
    }

    # Run deep feature synthesis, once with Featuretools and once with TSFresh primitives
    ft_featureframe, ft_feature_defs = ft.dfs(
        entityset=entity_set,
        target_dataframe_name="loans",
        verbose=0,
        agg_primitives=FT_AGG_PRIMITIVES,
        trans_primitives=FT_TRANSFORM_PRIMITIVES,
        max_depth=maxdepth,
        # max_features=1000,
        ignore_columns=IGNORE_COLUMNS,
        n_jobs=1,
        cutoff_time_in_index=True,
    )

    tsf_featureframe, tsf_feature_defs = ft.dfs(
        entityset=entity_set,
        target_dataframe_name="loans",
        verbose=0,
        agg_primitives=TSF_AGG_PRIMITIVES,
        trans_primitives=TSF_TRANSFORM_PRIMITIVES,
        max_depth=maxdepth,
        # max_features=1000,
        ignore_columns=IGNORE_COLUMNS,
        n_jobs=1,
        cutoff_time_in_index=True,
    )

    # Move the cutoff time out of the index, keeping only LOAN_ID there
    ft_featureframe = ft_featureframe.reset_index(level=1)
    tsf_featureframe = tsf_featureframe.reset_index(level=1)

    # Get shared columns to avoid duplication in merge
    shared_columns = list(
        set(ft_featureframe.columns).intersection(set(tsf_featureframe.columns))
    )

    featureframe = pd.merge(
        ft_featureframe,
        tsf_featureframe.drop(shared_columns, axis=1),
        left_index=True,
        right_index=True,
    )
    feature_defs = list(set(ft_feature_defs).union(set(tsf_feature_defs)))

    # Clean up the feature frame: drop mostly-null, low-information,
    # constant and highly correlated features
    featureframe, feature_defs = ft.selection.remove_highly_null_features(
        featureframe, pct_null_threshold=0.25, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_low_information_features(
        featureframe, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_single_value_features(
        featureframe, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_highly_correlated_features(
        featureframe, pct_corr_threshold=0.95, features=feature_defs
    )

    # Re-add column MAIN_SYSTEM_ID: featureframe is indexed by LOAN_ID here,
    # so join it against a lookup table that is indexed by LOAN_ID as well
    featureframe = featureframe.join(
        loans_df[["LOAN_ID", "MAIN_SYSTEM_ID"]].drop_duplicates().set_index("LOAN_ID"),
        how="left",
    )
    if "LOAN_ID" not in featureframe.columns:
        featureframe = featureframe.reset_index(drop=False)

    # Force all numeric columns to float and boolean columns to int for schema
    # consistency across chunks; I will downcast them once Spark is able to
    # merge the schema of the chunks
    featureframe = featureframe.astype(
        {
            **{
                key: "float"
                for key in featureframe.drop(
                    ["LOAN_ID", "MAIN_SYSTEM_ID"], axis=1
                ).select_dtypes(include=["number"])
            },
            **{
                key: "int"
                for key in featureframe.select_dtypes(include=["bool", "boolean"])
            },
        }
    )

    # Write featureframe to storage, one Hive-style partition per retailer
    dest_path = (
        f"./data/featureframe-maxdepth{maxdepth}.parquet"
        f"/MAIN_SYSTEM_ID={selected_main_system_id}/part.snappy.parquet"
    )
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    featureframe.to_parquet(dest_path, compression="snappy")

    # Persist the feature definitions, one file per retailer so that
    # parallel runs do not overwrite each other
    joblib.dump(
        feature_defs,
        f"data/featureframe-maxdepth{maxdepth}-definitions-{selected_main_system_id}.joblib",
    )
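
To make --maxdepth 2 concrete: in this entity set, `loans`, `sales` and `purchases` are all children of `retailers`, so a sales aggregate reaches a loan only through the retailer, which takes two hops (aggregate `sales` up into `retailers`, then copy the result down to each loan). The pure-Python sketch below imitates that for two aggregation primitives on made-up rows. `depth2_features` is a hypothetical helper, it ignores time indexes and cutoff times, and the feature names only mimic featuretools' naming style.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def depth2_features(loans: List[dict], sales: List[dict]) -> Dict[int, dict]:
    """Attach retailer-level sales aggregates to each loan (two DFS-style hops)."""
    # Hop 1: aggregate the child dataframe `sales` up to its parent `retailers`
    amounts_by_retailer = defaultdict(list)
    for sale in sales:
        amounts_by_retailer[sale["MAIN_SYSTEM_ID"]].append(sale["AMOUNT"])

    # Hop 2: copy the parent's aggregates down to each loan (a "direct" feature)
    features = {}
    for loan in loans:
        amounts = amounts_by_retailer.get(loan["MAIN_SYSTEM_ID"], [])
        features[loan["LOAN_ID"]] = {
            "retailers.COUNT(sales)": len(amounts),
            "retailers.MEAN(sales.AMOUNT)": mean(amounts) if amounts else None,
        }
    return features


# Made-up toy rows: two loans and two sales, all for retailer 10
loans = [{"LOAN_ID": 1, "MAIN_SYSTEM_ID": 10}, {"LOAN_ID": 2, "MAIN_SYSTEM_ID": 10}]
sales = [{"MAIN_SYSTEM_ID": 10, "AMOUNT": 100.0}, {"MAIN_SYSTEM_ID": 10, "AMOUNT": 300.0}]
print(depth2_features(loans, sales)[1])
# {'retailers.COUNT(sales)': 2, 'retailers.MEAN(sales.AMOUNT)': 200.0}
```

Every loan of the same retailer receives identical values for these columns.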

Because DFS generates a large number of aggregation and transformation combinations, many of the resulting features are correlated. It may be counterintuitive, but a large number of features does not necessarily help an ML model learn: as the number of dimensions grows, the available observations cover the feature space ever more sparsely. This is known as the curse of dimensionality.
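
The script's remove_highly_correlated_features call with pct_corr_threshold=0.95 targets exactly this redundancy. Below is a simplified pure-Python sketch of the idea, not featuretools' implementation: `drop_correlated` (a hypothetical helper) scans columns left to right and keeps a column only if its absolute Pearson correlation with every column kept so far is below the threshold.

```python
from math import sqrt
from typing import Dict, List


def pearson(x: List[float], y: List[float]) -> float:
    """Pearson correlation of two equally long, non-constant numeric columns."""
    mx, my = sum(x) / len(x), sum(y) / len(y)
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = sqrt(sum((a - mx) ** 2 for a in x))
    sy = sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)


def drop_correlated(columns: Dict[str, List[float]], threshold: float = 0.95) -> List[str]:
    """Return the names of the columns kept by a greedy left-to-right filter.

    Assumes constant columns were already removed (their correlation is undefined).
    """
    kept: List[str] = []
    for name, values in columns.items():
        if all(abs(pearson(values, columns[k])) < threshold for k in kept):
            kept.append(name)
    return kept


columns = {
    "LOAN_AMOUNT": [100.0, 200.0, 300.0, 400.0],
    "LOAN_AMOUNT_x2": [200.0, 400.0, 600.0, 800.0],  # perfectly correlated copy
    "NOISE": [1.0, -1.0, -1.0, 1.0],  # uncorrelated with LOAN_AMOUNT
}
print(drop_correlated(columns))  # ['LOAN_AMOUNT', 'NOISE']
```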

There are many techniques to address this problem. Since this is a case study, the interpretability of the final features can be sacrificed to save time. So I decided to use Principal Component Analysis (PCA) to drastically reduce dimensionality from more than 1,000 columns to 10, in the hope that those principal components capture most of the variance in the featureframe. Here is the script for dimensionality reduction:

Dimensionality reduction with PCA
import argparse
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StringIndexer, Imputer


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Feature selection step")
    parser.add_argument(
        "--maxdepth",
        type=int,
        default=2,
        help="Decide which featureframe to run selection on based on --maxdepth"
        + " used in the automatic feature engineering step",
    )
    args = parser.parse_args()
    maxdepth = args.maxdepth

    # Start or fetch active Spark session
    spark = SparkSession.builder.getOrCreate()

    # Load data (mergeSchema reconciles the per-retailer chunks written by the DFS step)
    featureframe = (
        spark.read.option("mergeSchema", "true")
        .parquet(f"data/featureframe-maxdepth{maxdepth}.parquet")
        .drop("__index_level_0__")
        .coalesce(int(spark.sparkContext.getConf().get("spark.executor.instances", "2")))
        .persist(StorageLevel.MEMORY_AND_DISK)
    )

    # Parse column names to comply with SparkSQL
    featureframe = featureframe.select(
        [
            col(f"`{column}`").alias(
                column.replace("(", "__")
                .replace(")", "__")
                .replace(",", "_")
                .replace(".", "_")
                .replace(" % ", "_mod_")
                .replace(" / ", "_div_")
                .replace(" * ", "_mul_")
            )
            for column in featureframe.columns
        ]
    )
    # Different original names could map to the same sanitized name
    assert len(set(featureframe.columns)) == len(featureframe.columns), (
        "Sanitized column names collide"
    )

    # This should no longer be necessary if _fetch_largest_dtype_for_numeric_feature works in dfs
    # # Convert boolean columns to numeric
    # featureframe = featureframe.select(
    #     [
    #         col(column_name)
    #         if column_dtype != "boolean"
    #         else col(column_name).cast("int").alias(column_name)
    #         for column_name, column_dtype in featureframe.dtypes
    #     ]
    # )
    metadata_cols = ["LOAN_ID", "MAIN_SYSTEM_ID"]

    # Split featureframe into categorical (string) and numeric columns; other types,
    # e.g. timestamps, are left out because Imputer only accepts numeric input
    numeric_types = ("double", "float", "bigint", "int", "smallint", "tinyint")
    cat_cols = [
        col_name for col_name, col_type in featureframe.dtypes if col_type == "string"
    ]
    num_cols = [
        col_name
        for col_name, col_type in featureframe.dtypes
        if col_name not in metadata_cols
        and (col_type in numeric_types or col_type.startswith("decimal"))
    ]
    assert not set(cat_cols) & set(num_cols), (
        "Failed to split featureframe into categorical and numeric features"
    )

    # Define an encoder for categorical features
    indexer_cols = [col_name + "_index" for col_name in cat_cols]
    indexer = StringIndexer(
        inputCols=cat_cols, outputCols=indexer_cols, handleInvalid="keep"
    )

    # Define mean imputer for numeric columns
    imputer_cols = [col_name + "_imputed" for col_name in num_cols]
    imputer = Imputer(inputCols=num_cols, outputCols=imputer_cols)

    # Define a vector assembler
    assembler = VectorAssembler(
        inputCols=[*imputer_cols, *indexer_cols],
        outputCol="assembled_features",
    )

    # Define a PCA model
    pca = PCA(k=10, inputCol="assembled_features", outputCol="pca_features")

    # Chain the stages into a Pipeline (fitting it yields a PipelineModel)
    pipeline = Pipeline(stages=[indexer, imputer, assembler, pca])

    # Fit the model to the data and persist it
    model = pipeline.fit(featureframe)
    model.write().overwrite().save(
        f"credit-risk/feature-engineering/pca-model-maxdepth{maxdepth}.mllib"
    )

    # Transform the data using the model
    selected_features = model.transform(featureframe).select(
        *metadata_cols, "pca_features"
    )

    selected_features.coalesce(96).write.mode("overwrite").parquet(
        f"data/pca-featureframe-maxdepth{maxdepth}.parquet"
    )
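
The PCA step keeps k=10 components in the hope that they capture most of the variance, and that share can be measured rather than hoped for. Spark's fitted `PCAModel` exposes an `explainedVariance` vector with the proportion of variance per component, reachable here as `model.stages[-1].explainedVariance` since PCA is the last pipeline stage. The NumPy sketch below computes the same quantity on toy data via an SVD of the centered matrix. `explained_variance_ratio` is a hypothetical helper, and its `standardize` flag illustrates that PCA is scale-sensitive: the pipeline above feeds PCA unscaled features, so columns in large units (such as amounts) can dominate the leading components.

```python
import numpy as np


def explained_variance_ratio(X: np.ndarray, standardize: bool = False) -> np.ndarray:
    """Share of total variance captured by each principal component, largest first."""
    Xc = X - X.mean(axis=0)  # PCA works on centered data
    if standardize:
        Xc = Xc / Xc.std(axis=0)  # unit variance per column (assumes no constant column)
    # Squared singular values of the centered matrix are proportional to component variances
    singular_values = np.linalg.svd(Xc, compute_uv=False)
    variances = singular_values ** 2
    return variances / variances.sum()


a = np.array([1.0, -1.0, 1.0, -1.0])
b = np.array([1.0, 1.0, -1.0, -1.0])
X = np.column_stack([1000 * a, b])  # two uncorrelated columns on very different scales

print(explained_variance_ratio(X))                    # first component takes almost all variance
print(explained_variance_ratio(X, standardize=True))  # roughly an even split
```

Summing the first 10 entries of Spark's `explainedVariance` gives the cumulative share kept by the pipeline; if it turns out low or dominated by a few large-unit columns, Spark's `StandardScaler` between the assembler and PCA is one option to try.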

8.2. Split Train and Test Data

(
    featureframe.selectExpr(
        "count(label) filter (where label = 1) as positive_in_total",
        "count(label) filter (where label = 0) as negative_in_total",
        "count(label) filter (where label = 1 and stratified_split < 0.5) as positive_in_train",
        "count(label) filter (where label = 0 and stratified_split < 0.6) as negative_in_train",
        # Half-open validation ranges, so a boundary value is never counted in two splits
        "count(label) filter (where label = 1 and stratified_split >= 0.5 and stratified_split < 0.7) as positive_in_valid",
        "count(label) filter (where label = 0 and stratified_split >= 0.6 and stratified_split < 0.8) as negative_in_valid",
        "count(label) filter (where label = 1 and stratified_split >= 0.7) as positive_in_test",
        "count(label) filter (where label = 0 and stratified_split >= 0.8) as negative_in_test",
    )
)

Class      Total   Train   Validation   Test
Negative   71166   42699   14234        14234
Positive   6       3       1            2
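
The query above encodes per-class thresholds on a `stratified_split` column: positives go to train below 0.5 and to test from 0.7, negatives below 0.6 and from 0.8, with validation in between. How that column is produced is not shown in this section; assuming it is a uniform draw in [0, 1) (for example from Spark's `rand(seed)`), the assignment can be sketched as below, where `CUTOFFS` and `assign_split` are hypothetical names.

```python
from typing import Dict, Tuple

# (train_upper, valid_upper) per label, taken from the thresholds in the query above
CUTOFFS: Dict[int, Tuple[float, float]] = {1: (0.5, 0.7), 0: (0.6, 0.8)}


def assign_split(label: int, u: float) -> str:
    """Map a label and a uniform draw u in [0, 1) to exactly one split."""
    train_upper, valid_upper = CUTOFFS[label]
    if u < train_upper:
        return "train"
    if u < valid_upper:  # half-open interval [train_upper, valid_upper)
        return "valid"
    return "test"


print([assign_split(1, u) for u in (0.49, 0.5, 0.7)])  # ['train', 'valid', 'test']
print([assign_split(0, u) for u in (0.59, 0.6, 0.8)])  # ['train', 'valid', 'test']
```

Half-open intervals place every row in exactly one split. SQL's BETWEEN is inclusive on both ends, so pairing it with >= on the same bound would count a row sitting exactly on that bound twice.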

That’s weird. It seems we lost 3 positive (not fully paid) loans along the way. My hunch is that they were excluded because Deep Feature Synthesis raised TypeError: Time index column must be a Datetime or numeric column for their retailers. I was hoping observations in the positive class wouldn’t be affected. Due to time constraints, I will carry on until the end of this experiment trial, then come back to this if I can manage a second trial.

Note

I found the reason. Not all retailers in the Loans dataset have records in the Fintech or Ecommerce datasets. The way I created the relationships between datasets meant that the script failed whenever a retailer had no observations in one of them. That is fixed now: the script automatically adjusts to use only the datasets available for each retailer.
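
The fix described in the note amounts to adding a child dataframe and its relationship only when the retailer has rows in it. The script does this by catching the failure in a try/except; an explicit emptiness check is another way to express the same decision. The library-free sketch below shows that logic; `plan_entity_set` and `CHILDREN` are hypothetical names, and in the real script each planned entry would correspond to an `add_dataframe` call plus an `ft.Relationship` to `retailers`.

```python
from typing import Dict, List, Tuple

# child dataframe name -> (index column, time index column), as used in the DFS script
CHILDREN: Dict[str, Tuple[str, str]] = {
    "loans": ("LOAN_ID", "LOAN_ISSUANCE_DATE"),
    "sales": ("ID", "CREATED_AT"),
    "purchases": ("ORDER_ID", "ORDER_CREATION_DATE"),
}


def plan_entity_set(row_counts: Dict[str, int]) -> List[Tuple[str, str, str]]:
    """Return (child, index, time_index) for every child dataset with at least one row.

    `loans` is mandatory because it is the DFS target dataframe.
    """
    if row_counts.get("loans", 0) == 0:
        raise ValueError("no loans for this retailer: nothing to build features for")
    plan = []
    for child, (index, time_index) in CHILDREN.items():
        if row_counts.get(child, 0) > 0:
            plan.append((child, index, time_index))
        else:
            print(f"Skipping empty dataset '{child}'")
    return plan


print(plan_entity_set({"loans": 3, "sales": 0, "purchases": 5}))
# [('loans', 'LOAN_ID', 'LOAN_ISSUANCE_DATE'), ('purchases', 'ORDER_ID', 'ORDER_CREATION_DATE')]
```

In the script, the counts would be len(loans_df), len(fintech_df) and len(ecommerce_df).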