Isolation Forest and PySpark

Lessons learned

Debugging PySpark and Isolation Forest — Image by author

So, after a few runs with the PySpark ML implementation of Isolation Forest presented here, I stumbled upon a couple of things, and I thought I’d write about them so that you don’t waste the time I wasted troubleshooting.

Only Dense Vectors

In the previous article, I used VectorAssembler to gather the feature vectors. It so happened that my test data produced only DenseVectors, but when I tried the example on a different dataset, I realized that:

  • **VectorAssembler can create both Dense and Sparse vectors** in the same DataFrame (which is smart, and other Spark ML algorithms can leverage it and work with it)

  • **Isolation Forest (or at least the implementation found here) does not support the above**, so the input must be DenseVectors only.

To demonstrate the issue:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import IForest, IForestModel
import tempfile

conf = SparkConf()
conf.set('spark.jars', '/full/path/to/spark-iforest-2.4.0.jar')

spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .appName("IForestExample") \
        .getOrCreate()

temp_path = tempfile.mkdtemp()
iforest_path = temp_path + "/iforest"
model_path = temp_path + "/iforest_model"

data = [
    {'feature1': 1., 'feature2': 0., 'feature3': 0.3, 'feature4': 0.01},
    {'feature1': 10., 'feature2': 3., 'feature3': 0.9, 'feature4': 0.1},
    {'feature1': 101., 'feature2': 13., 'feature3': 0.9, 'feature4': 0.91},
    {'feature1': 111., 'feature2': 11., 'feature3': 1.2, 'feature4': 1.91},
    {'feature1': 0., 'feature2': 0., 'feature3': 0., 'feature4': 0.1},  #  issue happens when I add this line
]

# use a VectorAssembler to gather the features into a single vector column
assembler = VectorAssembler(
    inputCols=list(data[0].keys()),
    outputCol="features"
)

df = spark.createDataFrame(data)
df.printSchema()
df = assembler.transform(df)
df.show()


# last line, features column: a sparse vector
# +--------+--------+--------+--------+--------------------+
# |feature1|feature2|feature3|feature4|            features|
# +--------+--------+--------+--------+--------------------+
# |     1.0|     0.0|     0.3|    0.01|  [1.0,0.0,0.3,0.01]|
# |    10.0|     3.0|     0.9|     0.1|  [10.0,3.0,0.9,0.1]|
# |   101.0|    13.0|     0.9|    0.91|[101.0,13.0,0.9,0...|
# |   111.0|    11.0|     1.2|    1.91|[111.0,11.0,1.2,1...|
# |     0.0|     0.0|     0.0|     0.1|       (4,[3],[0.1])|
# +--------+--------+--------+--------+--------------------+
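For completeness, this is where the problem shows up: fitting on the assembled column once that last, mostly-zero row is included. A sketch using the same IForest setup as later in the post; the exact error depends on the spark-iforest version.

# the last row was assembled as a SparseVector, which the implementation
# does not support, so fitting on the mixed column breaks
iforest = IForest(contamination=0.3, maxDepth=2)
iforest.setFeaturesCol('features')
iforest.setSeed(42)
model = iforest.fit(df)  # fails because of the sparse row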

The current workaround is to make sure every vector is dense, unfortunately by using a UDF:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import IForest, IForestModel
from pyspark.ml.linalg import Vectors, VectorUDT

conf = SparkConf()
conf.set('spark.jars', '/full/path/to/spark-iforest-2.4.0.jar')

spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .appName("IForestExample") \
        .getOrCreate()

data = [
    {'feature1': 1., 'feature2': 0., 'feature3': 0.3, 'feature4': 0.01},
    {'feature1': 10., 'feature2': 3., 'feature3': 0.9, 'feature4': 0.1},
    {'feature1': 101., 'feature2': 13., 'feature3': 0.9, 'feature4': 0.91},
    {'feature1': 111., 'feature2': 11., 'feature3': 1.2, 'feature4': 1.91},
    {'feature1': 0., 'feature2': 0., 'feature3': 0., 'feature4': 0.1}, 
]

# UDF that turns an array of doubles into a DenseVector
to_dense_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())

df = spark.createDataFrame(data)
df.printSchema()
# collect the feature columns into an array column, then convert it to a DenseVector
df = df.withColumn('features', F.array(*df.columns))
df = df.withColumn('vectorized_features', to_dense_vector_udf('features'))

df.show()
df.printSchema()

# root
#  |-- feature1: double (nullable = true)
#  |-- feature2: double (nullable = true)
#  |-- feature3: double (nullable = true)
#  |-- feature4: double (nullable = true)
# 
# +--------+--------+--------+--------+--------------------+--------------------+
# |feature1|feature2|feature3|feature4|            features| vectorized_features|
# +--------+--------+--------+--------+--------------------+--------------------+
# |     1.0|     0.0|     0.3|    0.01|[1.0, 0.0, 0.3, 0...|  [1.0,0.0,0.3,0.01]|
# |    10.0|     3.0|     0.9|     0.1|[10.0, 3.0, 0.9, ...|  [10.0,3.0,0.9,0.1]|
# |   101.0|    13.0|     0.9|    0.91|[101.0, 13.0, 0.9...|[101.0,13.0,0.9,0...|
# |   111.0|    11.0|     1.2|    1.91|[111.0, 11.0, 1.2...|[111.0,11.0,1.2,1...|
# |     0.0|     0.0|     0.0|     0.1|[0.0, 0.0, 0.0, 0.1]|   [0.0,0.0,0.0,0.1]|
# +--------+--------+--------+--------+--------------------+--------------------+
# 
# root
#  |-- feature1: double (nullable = true)
#  |-- feature2: double (nullable = true)
#  |-- feature3: double (nullable = true)
#  |-- feature4: double (nullable = true)
#  |-- features: array (nullable = false)
#  |    |-- element: double (containsNull = true)
#  |-- vectorized_features: vector (nullable = true)

# scale the dense feature vectors and train the Isolation Forest on the scaled column
scaler = StandardScaler(inputCol='vectorized_features', outputCol='scaled_features')
iforest = IForest(contamination=0.3, maxDepth=2)
iforest.setFeaturesCol('scaled_features')
iforest.setSeed(42)  # for reproducibility

scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
model = iforest.fit(df)
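
With the dense vectors in place, the fitted model can score the same DataFrame. A minimal sketch, assuming the default spark-iforest output columns (anomalyScore and prediction):

# score the DataFrame: transform appends the anomaly score and a 0/1 prediction
# (output column names assumed from the spark-iforest defaults)
scored = model.transform(df)
scored.select('vectorized_features', 'anomalyScore', 'prediction').show(truncate=False)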

Avoid OOM and executor communication issues

Set the *approxQuantileRelativeError* or the *threshold* parameter for large datasets

If you are planning to train on a big dataset, e.g. more than 10M rows, then even if you set parameters like maxSamples and maxFeatures to decrease the dimensions, you need to set the approxQuantileRelativeError parameter to something reasonable, e.g. 0.2. The reason is that the approxQuantile function is very expensive to compute, especially with the default approxQuantileRelativeError of 0, which asks for an exact quantile. Doing this drastically cut the training time as well as the OOM and executor communication issues, and, up to now, I haven’t seen any decline in prediction accuracy.
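
A minimal sketch of what that looks like, assuming the pyspark_iforest wrapper exposes approxQuantileRelativeError (and maxSamples / maxFeatures) as estimator parameters, as the implementation’s comments suggest:

# same estimator as before, but with a relaxed quantile approximation for large data
iforest = IForest(
    contamination=0.3,
    maxDepth=2,
    maxSamples=0.1,                   # fraction of rows sampled per tree
    maxFeatures=1.0,                  # fraction of features sampled per tree
    approxQuantileRelativeError=0.2,  # instead of the exact (and expensive) default of 0.
)
iforest.setFeaturesCol('scaled_features')
iforest.setSeed(42)
model = iforest.fit(df)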

From the comments of the implementation:

* The proportion of outliers in the data set (0 < contamination < 1).
* It will be used in the prediction. In order to enhance performance,
* Our method to get anomaly score threshold adopts DataFrameStsFunctions.approxQuantile,
* which is designed for performance with some extent accuracy loss.
* Set the param approxQuantileRelativeError (0 < e < 1) to calculate
* an approximate quantile threshold of anomaly scores for large dataset.

Alternatively, set the threshold beforehand in fit and avoid the approxQuantile calculation like this:

model = iforest.fit(df, {'threshold': 0.5})
print(model.getThreshold())

> 0.49272560194039505

Hope this helps and saves you some time :) Let me know if you have any suggestions, corrections or ideas.

Read more about how to use Isolation Forest with PySpark here:

Isolation Forest and Spark: Main characteristics and ways to use Isolation Forest in PySpark (towardsdatascience.com)

Where to next?

Understanding your model’s predictions: Machine Learning Interpretability — Shapley Values with PySpark. Interpreting Isolation Forest’s predictions, and not only (medium.com)