# 8. Feature Engineering
## 8.1. Automatic Feature Engineering
Because Deep Feature Synthesis (DFS) is a computationally expensive process, I split it into per-retailer chunks (one execution per retailer) to maximize parallelism. Given a list of MAIN_SYSTEM_IDs, the executions can be parallelized in bash with GNU parallel:
# Run one DFS process per retailer ID listed in data/retailer_ids.csv.
# GNU parallel: -a reads the arguments from the file (one per line), {} is
# replaced by each argument, and -j 200% runs up to two jobs per CPU thread.
# NOTE(review): --retailerid is parsed as an int, so the file presumably must
# contain bare IDs with no header row — confirm.
PYARROW_IGNORE_TIMEZONE=1 \
time parallel \
-j 200% \
-a data/retailer_ids.csv \
python automatic-feature-engineering.py \
--retailerid {} \
--maxdepth 2
Here is the script itself (loaded from the repository):
DFS script
"""Run Deep Feature Synthesis (DFS) for the loans of a single retailer.

Usage::

    python automatic-feature-engineering.py --retailerid <MAIN_SYSTEM_ID> [--maxdepth 2]

Loads the loans, fintech transactions and ecommerce orders of one retailer,
builds a featuretools EntitySet around it, runs DFS twice (featuretools
primitives, then tsfresh primitives), prunes the merged feature matrix and
writes:

* ./data/featureframe-maxdepth{N}.parquet/MAIN_SYSTEM_ID={id}/part.snappy.parquet
* data/featureframe-maxdepth{N}-definitions.joblib
"""
import argparse
import joblib
import os
import re
import subprocess
from typing import List, Tuple
import itertools
import pandas as pd
import numpy as np
import featuretools as ft
from featuretools.primitives import AggregationPrimitive, TransformPrimitive
from featuretools_tsfresh_primitives import (
    comprehensive_fc_parameters,
    primitives_from_fc_settings,
)
from featuretools_tsfresh_primitives import primitives as tsfresh_primitives
from featuretools_tsfresh_primitives.primitives import *


def _tsfresh_primitives_of_type(base_class, fc_parameters):
    """Return the tsfresh primitives whose instances are ``base_class``.

    Every capitalised attribute of ``tsfresh_primitives`` except
    ``SUPPORTED_PRIMITIVES`` is treated as a primitive class. A class is kept
    when an instance built from its first parameter set is an instance of
    ``base_class``; it then contributes whatever ``primitives_from_fc_settings``
    builds for all of its parameter sets (one default set when it has none).

    Args:
        base_class: ``AggregationPrimitive`` or ``TransformPrimitive``.
        fc_parameters: mapping of primitive name -> list of keyword-argument
            dicts (or a falsy value when the primitive takes no parameters).

    Returns:
        A list with one element per primitive/parameter combination, in
        ``dir(tsfresh_primitives)`` order.
    """
    selected = []
    for attribute in dir(tsfresh_primitives):
        if not attribute[0].isupper() or attribute == "SUPPORTED_PRIMITIVES":
            continue
        primitive_class = getattr(tsfresh_primitives, attribute)
        settings = fc_parameters[primitive_class.name] or [{}]
        if isinstance(primitive_class(**settings[0]), base_class):
            selected.extend(
                primitives_from_fc_settings({primitive_class.name: settings})
            )
    return selected


def _run_dfs(entity_set, max_depth, agg_primitives, trans_primitives):
    """Run DFS on the ``loans`` target with the settings shared by all runs.

    Identifier and foreign-key columns are excluded from feature generation
    and the cutoff time is added to the index next to LOAN_ID.

    Returns:
        The ``(feature_matrix, feature_definitions)`` pair from ``ft.dfs``.
    """
    return ft.dfs(
        entityset=entity_set,
        target_dataframe_name="loans",
        verbose=0,
        agg_primitives=agg_primitives,
        trans_primitives=trans_primitives,
        max_depth=max_depth,
        # max_features=1000,
        ignore_columns={
            "loans": ["LOAN_ID", "MAIN_SYSTEM_ID"],
            "sales": ["ID", "MAIN_SYSTEM_ID"],
            "purchases": ["ORDER_ID", "MAIN_SYSTEM_ID"],
        },
        n_jobs=1,
        cutoff_time_in_index=True,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic feature engineering step")
    parser.add_argument(
        "--retailerid",
        type=int,
        # Without an ID every query below would compare against None.
        required=True,
        help="MAIN_SYSTEM_ID",
    )
    parser.add_argument(
        "--maxdepth",
        type=int,
        default=2,
        help="Dictates the complexity of features built;"
        + " the higher the number, the more expensive the compute.",
    )
    args = parser.parse_args()
    selected_main_system_id = args.retailerid
    maxdepth = args.maxdepth
    print(f"\n\nStart DFS run for MAIN_SYSTEM_ID == {selected_main_system_id}")

    # Load datasets, keeping only the selected retailer's rows
    loans_df = (
        pd.read_excel("data/Loans_Data.xlsx")
        .drop(
            [
                # Low signal columns
                "INITIAL_COST",
                "INDEX",
                "REPAYMENT_ID",
                "FINAL_COST",
                "RETAILER_ID",
                # Columns populated after the fact, thus would lead to data leak
                "REPAYMENT_UPDATED",
                "SPENT",
                "TOTAL_FINAL_AMOUNT",
                "FIRST_TRIAL_BALANCE",
                "FIRST_TRAIL_DELAYS",
                "PAYMENT_AMOUNT",
                "LOAN_PAYMENT_DATE",
                "REPAYMENT_AMOUNT",
                "CUMMULATIVE_OUTSTANDING",
                "PAYMENT_STATUS",
            ],
            axis=1,
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            MAIN_SYSTEM_ID=lambda x: x["MAIN_SYSTEM_ID"].astype("int64"),
            LOAN_ID=lambda x: x["LOAN_ID"].astype("int64"),
            LOAN_ISSUANCE_DATE=lambda x: x["LOAN_ISSUANCE_DATE"].astype("<M8[ns]"),
            LOAN_AMOUNT=lambda x: x["LOAN_AMOUNT"].astype("float64"),
            TOTAL_INITIAL_AMOUNT=lambda x: x["TOTAL_INITIAL_AMOUNT"].astype("float64"),
            INITIAL_DATE=lambda x: x["INITIAL_DATE"].astype("<M8[ns]"),
        )
    )

    fintech_df = (
        pd.read_csv(
            "data/Retailer_Transactions_Data.csv",
            header=0,
            dtype={
                "ID": np.dtype("int64"),
                "CREATED_AT": np.dtype("O"),
                "UPDATED_AT": np.dtype("O"),
                "AMOUNT": np.dtype("float64"),
                "FEES": np.dtype("float64"),
                "RETAILER_CUT": np.dtype("float64"),
                "STATUS": np.dtype("O"),
                "TOTAL_AMOUNT_INCLUDING_TAX": np.dtype("float64"),
                "TOTAL_AMOUNT_PAID": np.dtype("float64"),
                "WALLET_BALANCE_BEFORE_TRANSACTION": np.dtype("float64"),
                "MAIN_SYSTEM_ID": np.dtype("int64"),
            },
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            CREATED_AT=lambda x: pd.to_datetime(
                x["CREATED_AT"], infer_datetime_format=True
            ),
            UPDATED_AT=lambda x: pd.to_datetime(
                x["UPDATED_AT"], infer_datetime_format=True
            ),
        )
    )

    ecommerce_df = (
        pd.read_csv(
            "data/Ecommerce_orders_Data.csv",
            header=0,
            dtype={
                "ORDER_ID": np.dtype("int64"),
                "MAIN_SYSTEM_ID": np.dtype("int64"),
                "ORDER_PRICE": np.dtype("float64"),
                "DISCOUNT": np.dtype("float64"),
                "ORDER_PRICE_AFTER_DISCOUNT": np.dtype("float64"),
                "ORDER_CREATION_DATE": np.dtype("O"),
            },
        )
        .query(f"MAIN_SYSTEM_ID == {selected_main_system_id}")
        .assign(
            ORDER_CREATION_DATE=lambda x: pd.to_datetime(
                x["ORDER_CREATION_DATE"], infer_datetime_format=True
            ),
        )
    )

    retailer_df = loans_df[["MAIN_SYSTEM_ID"]].drop_duplicates()

    # Create an entity set and add the retailers entity (the common parent)
    entity_set = ft.EntitySet(id="maxab_entity_set")
    relationships = []

    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="retailers",
            dataframe=retailer_df,
            index="MAIN_SYSTEM_ID",
        )
    except Exception as excpt:
        raise Exception(
            f"DFS failed for MAIN_SYSTEM_ID == {selected_main_system_id}"
        ) from excpt

    # Add the loans entity; it is the DFS target, so any failure is fatal
    try:
        entity_set = entity_set.add_dataframe(
            dataframe_name="loans",
            dataframe=loans_df,
            index="LOAN_ID",
            time_index="LOAN_ISSUANCE_DATE",
        )
        rel_retailer_loans = ft.Relationship(
            entity_set,
            parent_dataframe_name="retailers",
            parent_column_name="MAIN_SYSTEM_ID",
            child_dataframe_name="loans",
            child_column_name="MAIN_SYSTEM_ID",
        )
        relationships.append(rel_retailer_loans)
    except Exception as excpt:
        raise Exception(
            f"DFS failed for MAIN_SYSTEM_ID == {selected_main_system_id}"
        ) from excpt

    # Add the sales entity and relationship. Not every retailer has fintech
    # records, so this child is optional: skip it when empty, and on any other
    # failure report the actual error instead of claiming it was empty.
    if fintech_df.empty:
        print(
            f"Fintech dataset seems to be empty for MAIN_SYSTEM_ID '{selected_main_system_id}'"
        )
    else:
        try:
            entity_set = entity_set.add_dataframe(
                dataframe_name="sales",
                dataframe=fintech_df,
                index="ID",
                time_index="CREATED_AT",
                # STATUS and TOTAL_AMOUNT_PAID only become known at UPDATED_AT
                secondary_time_index={"UPDATED_AT": ["STATUS", "TOTAL_AMOUNT_PAID"]},
            )
            rel_retailer_sales = ft.Relationship(
                entity_set,
                parent_dataframe_name="retailers",
                parent_column_name="MAIN_SYSTEM_ID",
                child_dataframe_name="sales",
                child_column_name="MAIN_SYSTEM_ID",
            )
            relationships.append(rel_retailer_sales)
        except Exception as excpt:
            print(
                f"Skipping fintech dataset for MAIN_SYSTEM_ID '{selected_main_system_id}': {excpt!r}"
            )

    # Add the purchases entity (optional for the same reason as sales)
    if ecommerce_df.empty:
        print(
            f"Ecommerce dataset seems to be empty for MAIN_SYSTEM_ID '{selected_main_system_id}'"
        )
    else:
        try:
            entity_set = entity_set.add_dataframe(
                dataframe_name="purchases",
                dataframe=ecommerce_df,
                index="ORDER_ID",
                time_index="ORDER_CREATION_DATE",
            )
            rel_retailer_purchases = ft.Relationship(
                entity_set,
                parent_dataframe_name="retailers",
                parent_column_name="MAIN_SYSTEM_ID",
                child_dataframe_name="purchases",
                child_column_name="MAIN_SYSTEM_ID",
            )
            relationships.append(rel_retailer_purchases)
        except Exception as excpt:
            print(
                f"Skipping ecommerce dataset for MAIN_SYSTEM_ID '{selected_main_system_id}': {excpt!r}"
            )

    # Add the relationships to the entity set
    entity_set = entity_set.add_relationships(relationships)

    # Featuretools primitives applicable to this entity set at the requested
    # depth; index 0 holds aggregation and index 1 transform primitive classes.
    ft_valid_primitives_tuple = ft.get_valid_primitives(
        entity_set, target_dataframe_name="loans", max_depth=maxdepth
    )
    FT_AGG_PRIMITIVES: List[str] = [
        primitive().name for primitive in ft_valid_primitives_tuple[0]
    ]
    FT_TRANSFORM_PRIMITIVES: List[str] = [
        primitive().name for primitive in ft_valid_primitives_tuple[1]
    ]

    # Remove buggy primitive 'expanding_count' (it may not be in the list)
    if "expanding_count" in FT_TRANSFORM_PRIMITIVES:
        FT_TRANSFORM_PRIMITIVES.remove("expanding_count")

    # TSFresh primitives: one element per primitive/parameter combination
    TSFRESH_PARAMETERS = comprehensive_fc_parameters()
    TSF_AGG_PRIMITIVES: List[AggregationPrimitive] = _tsfresh_primitives_of_type(
        AggregationPrimitive, TSFRESH_PARAMETERS
    )
    TSF_TRANSFORM_PRIMITIVES: List[TransformPrimitive] = _tsfresh_primitives_of_type(
        TransformPrimitive, TSFRESH_PARAMETERS
    )

    # Run deep feature synthesis to create features between the entities
    ft_featureframe, ft_feature_defs = _run_dfs(
        entity_set, maxdepth, FT_AGG_PRIMITIVES, FT_TRANSFORM_PRIMITIVES
    )
    tsf_featureframe, tsf_feature_defs = _run_dfs(
        entity_set, maxdepth, TSF_AGG_PRIMITIVES, TSF_TRANSFORM_PRIMITIVES
    )

    # Keep only LOAN_ID in the index (the cutoff time becomes a column)
    ft_featureframe = ft_featureframe.reset_index(level=1)
    tsf_featureframe = tsf_featureframe.reset_index(level=1)

    # Get shared columns to avoid duplication in merge
    shared_columns = list(
        set(ft_featureframe.columns).intersection(set(tsf_featureframe.columns))
    )

    featureframe = pd.merge(
        ft_featureframe,
        tsf_featureframe.drop(shared_columns, axis=1),
        left_index=True,
        right_index=True,
    )
    feature_defs = list(set(ft_feature_defs).union(set(tsf_feature_defs)))

    # Clean up the feature frame a bit (single-value removal is idempotent,
    # so it only needs to run once)
    featureframe, feature_defs = ft.selection.remove_highly_null_features(
        featureframe, pct_null_threshold=0.25, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_low_information_features(
        featureframe, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_single_value_features(
        featureframe, features=feature_defs
    )
    featureframe, feature_defs = ft.selection.remove_highly_correlated_features(
        featureframe, pct_corr_threshold=0.95, features=feature_defs
    )

    # Re-add column MAIN_SYSTEM_ID. The feature matrix is indexed by LOAN_ID,
    # so join on the index against a LOAN_ID-indexed lookup (DataFrame.join's
    # `on=` would instead match LOAN_ID against loans_df's row labels).
    loan_to_retailer = loans_df.drop_duplicates("LOAN_ID").set_index("LOAN_ID")[
        "MAIN_SYSTEM_ID"
    ]
    featureframe = featureframe.join(loan_to_retailer, how="left")
    if "LOAN_ID" not in featureframe.columns:
        featureframe = featureframe.reset_index(drop=False)

    # Force all numeric columns to float and boolean columns (numpy bool and
    # nullable boolean) to a nullable integer for schema consistency; the
    # nullable Int64 keeps <NA> values that a plain int cast would reject.
    # I will downcast them once Spark is able to merge schema of chunks
    numeric_columns = (
        featureframe.drop(["LOAN_ID", "MAIN_SYSTEM_ID"], axis=1)
        .select_dtypes(include=["number"])
        .columns
    )
    boolean_columns = featureframe.select_dtypes(include=["bool", "boolean"]).columns
    featureframe = featureframe.astype(
        {
            **{column: "float" for column in numeric_columns},
            **{column: "Int64" for column in boolean_columns},
        }
    )

    # Write featureframe to storage as one Hive-style partition per retailer;
    # create the partition directory (the file's parent) first.
    dest_path = f"./data/featureframe-maxdepth{maxdepth}.parquet/MAIN_SYSTEM_ID={selected_main_system_id}/part.snappy.parquet"
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    featureframe.to_parquet(dest_path, compression="snappy")

    # NOTE(review): every per-retailer run writes this same file, so with
    # parallel runs the last writer wins (and concurrent writes may clash);
    # consider a per-retailer path.
    joblib.dump(feature_defs, f"data/featureframe-maxdepth{maxdepth}-definitions.joblib")
Because DFS generates a large number of combinations of aggregations and transformations, many of the resulting features are correlated. Counterintuitively, a large number of features does not help ML models learn: with too many features relative to the number of observations, learning becomes harder. This is known as the curse of dimensionality.
There are many techniques to address this problem. Since this is a case study, interpretability of the final features can be sacrificed to save time. I therefore use PCA to drastically reduce dimensionality from over 1K columns to 10, in the hope that these principal components capture most of the variance in the feature frame. Here is the script for dimensionality reduction:
Dimensionality reduction script (PCA-based feature extraction)
"""Fit and apply a PCA dimensionality-reduction pipeline to the DFS featureframe.

Reads data/featureframe-maxdepth{N}.parquet (the per-retailer chunks written
by the automatic feature engineering step), saves the fitted Spark ML
PipelineModel to credit-risk/feature-engineering/pca-model-maxdepth{N}.mllib
and writes LOAN_ID, MAIN_SYSTEM_ID and the ``pca_features`` vector (k=10) to
data/pca-featureframe-maxdepth{N}.parquet.
"""
import argparse
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler, StringIndexer, Imputer


def _sanitize_column_name(column: str) -> str:
    """Rewrite a featuretools feature name into a SparkSQL-friendly one.

    Parentheses, commas, dots and the spaced ``%``, ``/`` and ``*`` operators
    are replaced; unquoted dots would otherwise be read as struct-field access.
    """
    return (
        column.replace("(", "__")
        .replace(")", "__")
        .replace(",", "_")
        .replace(".", "_")
        .replace(" % ", "_mod_")
        .replace(" / ", "_div_")
        .replace(" * ", "_mul_")
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Feature selection step")
    parser.add_argument(
        "--maxdepth",
        type=int,
        # Same default as the automatic feature engineering step.
        default=2,
        help="Decide which featureframe to run selection on based on --maxdepth"
        + " used in the automatic feature engineering step",
    )
    args = parser.parse_args()
    maxdepth = args.maxdepth

    # Start or fetch active Spark session
    spark = SparkSession.builder.getOrCreate()

    # Load the featureframe produced with the requested --maxdepth; chunks
    # can have different columns, hence mergeSchema.
    featureframe = (
        spark.read.option("mergeSchema", "true")
        .parquet(f"data/featureframe-maxdepth{maxdepth}.parquet")
        # pyarrow's name for a serialized pandas index; no-op when absent.
        .drop("__index_level_0__")
        .coalesce(int(spark.sparkContext.getConf().get("spark.executor.instances", "2")))
        .persist(StorageLevel.MEMORY_AND_DISK)
    )

    # Parse column names to comply with SparkSQL (backticks inside a quoted
    # identifier are escaped by doubling them)
    featureframe = featureframe.select(
        [
            col(f"`{name.replace('`', '``')}`").alias(_sanitize_column_name(name))
            for name in featureframe.columns
        ]
    )

    # Boolean columns are expected to arrive already cast to integers by the
    # automatic feature engineering step.
    metadata_cols = ["LOAN_ID", "MAIN_SYSTEM_ID"]

    # Split the non-metadata columns into categorical (string) and numeric
    # features. Column order follows the featureframe so the assembled vector
    # layout is reproducible across runs. Other types (timestamps, booleans,
    # ...) are left out because Imputer only accepts numeric columns.
    feature_fields = [
        field for field in featureframe.schema.fields if field.name not in metadata_cols
    ]
    cat_cols = [
        field.name for field in feature_fields if field.dataType.simpleString() == "string"
    ]
    num_cols = [
        field.name for field in feature_fields if isinstance(field.dataType, NumericType)
    ]
    skipped_cols = [
        field.name
        for field in feature_fields
        if field.name not in cat_cols and field.name not in num_cols
    ]
    if skipped_cols:
        print(f"Ignoring columns that are neither string nor numeric: {skipped_cols}")

    # Define an encoder for categorical features
    indexer_cols = [col_name + "_index" for col_name in cat_cols]
    indexer = StringIndexer(
        inputCols=cat_cols, outputCols=indexer_cols, handleInvalid="keep"
    )

    # Define mean imputer for numeric columns
    imputer_cols = [col_name + "_imputed" for col_name in num_cols]
    imputer = Imputer(inputCols=num_cols, outputCols=imputer_cols)

    # Define a vector assembler
    assembler = VectorAssembler(
        inputCols=[*imputer_cols, *indexer_cols],
        outputCol="assembled_features",
    )

    # Define a PCA model
    # NOTE(review): PCA is scale-sensitive and the features are not
    # standardised, so large-magnitude columns will dominate the components;
    # consider a StandardScaler stage. StringIndexer outputs are also fed in
    # as ordinal numbers — confirm that is intended.
    pca = PCA(k=10, inputCol="assembled_features", outputCol="pca_features")

    # Define a Pipeline
    pipeline = Pipeline(stages=[indexer, imputer, assembler, pca])

    # Fit the model to the data and persist it
    model = pipeline.fit(featureframe)
    model.write().overwrite().save(
        f"credit-risk/feature-engineering/pca-model-maxdepth{maxdepth}.mllib"
    )

    # Transform the data using the model
    selected_features = model.transform(featureframe).select(
        *metadata_cols, "pca_features"
    )

    selected_features.coalesce(96).write.mode("overwrite").parquet(
        f"data/pca-featureframe-maxdepth{maxdepth}.parquet"
    )
## 8.2. Split Train and Test Data
(
    featureframe.selectExpr(
        # Count each class in total and per split. Buckets are half-open
        # ([lo, hi)) so no row lands in two splits; SQL `between` is inclusive
        # at both ends and would double-count rows whose stratified_split is
        # exactly 0.7 (positives) or 0.8 (negatives).
        # NOTE(review): positives are cut at 0.5/0.7 while negatives are cut
        # at 0.6/0.8 — confirm the asymmetric proportions are intentional.
        "count(label) filter (where label = 1) as positive_in_total",
        "count(label) filter (where label = 0) as negative_in_total",
        "count(label) filter (where label = 1 and stratified_split < 0.5) as positive_in_train",
        "count(label) filter (where label = 0 and stratified_split < 0.6) as negative_in_train",
        "count(label) filter (where label = 1 and stratified_split >= 0.5 and stratified_split < 0.7) as positive_in_valid",
        "count(label) filter (where label = 0 and stratified_split >= 0.6 and stratified_split < 0.8) as negative_in_valid",
        "count(label) filter (where label = 1 and stratified_split >= 0.7) as positive_in_test",
        "count(label) filter (where label = 0 and stratified_split >= 0.8) as negative_in_test",
    )
)
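| Class | Total | Train | Validation | Test |
|---|---|---|---|---|
| Negative | 71166 | 42699 | 14234 | 14234 |
| Positive | 6 | 3 | 1 | 2 |

The negative Train, Validation and Test counts sum to 71,167, one more than the total of 71,166. This is consistent with a row whose `stratified_split` is exactly 0.8 being counted in both the validation and the test bucket, since SQL `between` is inclusive at both ends.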
That’s odd: it seems we lost 3 positive (not fully paid) observations along the way. My hunch is that they were excluded due to `TypeError: Time index column must be a Datetime or numeric column`.
This happened during Deep Feature Synthesis; I had hoped that observations in the positive class would not be affected. Due to time constraints, I will carry on until the end of this experiment trial, then come back to this if I can manage a second trial.
Note
I found the reason. Not all retailers in the Loans dataset have records in the Fintech or Ecommerce datasets. Because of the way I created relationships between the datasets, the script failed whenever a retailer had no observations in one of them. This is fixed now: the script automatically adjusts to use only the datasets that are available.