When calculating AUC with PySpark, the `BinaryClassificationEvaluator` class makes it easy.
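For reference, the whole-dataset case looks roughly like this (a minimal sketch; the DataFrame `data` with a label column `true` and a score column `pred` is assumed for illustration):

```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# BinaryClassificationEvaluator accepts either a raw-prediction vector or a
# plain double score column for rawPredictionCol.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='pred',
    labelCol='true',
    metricName='areaUnderROC',
)
overall_auc = evaluator.evaluate(data)  # a single AUC for the entire DataFrame
```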
As it stands, however, this does not meet the need to calculate the AUC per segment rather than over the entire test data, which I wanted in order to see the differences between models.
As a workaround, I defined an aggregate function that computes the AUC with `pandas_udf` and called it from the `agg` method. A sample is shown below.
The correct labels (`true`) and predicted scores (`pred`) are computed in advance, and the aggregate function that calculates the AUC is defined to refer to those columns.
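One way to prepare such a DataFrame is sketched below; `model`, `test_df`, and the `segment` column are assumptions for illustration, and `vector_to_array` requires Spark 3.0+.

```python
import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array

# Score the test set with a fitted binary classification model (hypothetical).
scored = model.transform(test_df)

data = scored.select(
    'segment',                                              # grouping key
    F.col('label').alias('true'),                           # correct label
    vector_to_array(F.col('probability'))[1].alias('pred'), # positive-class probability as the score
)
```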
Note that an aggregate function defined with `pandas_udf` cannot be combined with Spark's built-in aggregate functions in the same `agg` call (if you try, you get the error `Cannot use a mixture of aggregate function and group aggregate pandas UDF`).
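If you do need built-in aggregates alongside the AUC (say, a row count per segment), one workaround is to aggregate separately and join on the grouping key, as in this sketch (it uses the `auc_udf` defined in the next section and a hypothetical grouping column `key`):

```python
import pyspark.sql.functions as F

# data.groupBy(key).agg(auc_udf('true', 'pred'), F.count('*'))  # raises the error above

auc_df = data.groupBy(key).agg(auc_udf('true', 'pred').alias('auc'))
cnt_df = data.groupBy(key).agg(F.count('*').alias('n'))
per_segment = auc_df.join(cnt_df, on=key)
```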
UDF definition
from sklearn.metrics import roc_auc_score
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Grouped aggregate pandas UDF: receives the group's label and score columns
# as pandas Series and returns one double per group.
@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def auc_udf(true, pred):
    # roc_auc_score raises a ValueError if a group contains only one class.
    return roc_auc_score(true, pred)
Calculation method
data.groupBy(key).agg(auc_udf('true', 'pred').alias('auc'))
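For a quick sanity check, here is an end-to-end run with toy data; the segment names and scores are made up for illustration.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data: two segments, each containing both classes.
data = spark.createDataFrame(
    [
        ('seg_a', 1, 0.9), ('seg_a', 0, 0.2), ('seg_a', 1, 0.7), ('seg_a', 0, 0.4),
        ('seg_b', 1, 0.6), ('seg_b', 0, 0.8), ('seg_b', 1, 0.5), ('seg_b', 0, 0.1),
    ],
    ['segment', 'true', 'pred'],
)

key = 'segment'
data.groupBy(key).agg(auc_udf('true', 'pred').alias('auc')).show()
# seg_a -> 1.0 (every positive is scored above every negative)
# seg_b -> 0.5
```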