我正在尝试提取使用PySpark训练的随机森林对象的类概率。但是,我在文档的任何地方都没有看到它的示例,也不是的一种方法RandomForestModel
。
如何从RandomForestModel
PySpark中的分类器中提取类概率?
这是文档中提供的示例代码,仅提供最终课程(不提供概率):
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
我没有任何model.predict_proba()
方法-我该怎么办??
据我所知,当前版本(1.2.1)不支持此功能。原生Scala代码(tree.py)上的Python包装器仅定义了“预测”函数,这些函数依次调用了相应的Scala对应项(treeEnsembleModels.scala)。后者通过在二进制决策中进行表决来做出决策。提供一种概率预测的方法更为简洁,该概率预测可以像sklearn中那样任意设定阈值或用于ROC计算。应该为将来的版本添加此功能!
解决方法是,将predict_proba实现为纯Python函数(请参见下面的示例)。它既不优雅也不高效,因为它在森林中的一组独立决策树上运行一个循环。技巧-或更确切地说是一个肮脏的技巧-是访问Java决策树模型数组并将其转换为Python对应模型。之后,您可以在整个数据集中计算单个模型的预测,并使用“ zip”将其总和累加到RDD中。除以树木数量即可得到理想的结果。对于大型数据集,可以接受主节点中少量决策树上的循环。
由于将Python集成到Spark(以Java运行)的困难,下面的代码非常棘手。应该非常小心,不要将任何复杂的数据发送到工作程序节点,这会由于序列化问题而导致崩溃。不能在辅助节点上运行任何引用Spark上下文的代码。同样,任何引用任何Java代码的代码都不能序列化。例如,在下面的代码中使用len(trees)而不是ntrees可能很诱人-砰!用Java / Scala编写这样的包装器可能会更加优雅,例如,通过在工作节点上的决策树上运行循环,从而降低通信成本。
下面的测试函数演示了prepare_proba给出的测试错误与原始示例中使用的prepare相同。
def predict_proba(rf_model, data):
'''
This wrapper overcomes the "binary" nature of predictions in the native
RandomForestModel.
'''
# Collect the individual decision tree models by calling the underlying
# Java model. These are returned as JavaArray defined by py4j.
trees = rf_model._java_model.trees()
ntrees = rf_model.numTrees()
scores = DecisionTreeModel(trees[0]).predict(data.map(lambda x: x.features))
# For each decision tree, apply its prediction to the entire dataset and
# accumulate the results using 'zip'.
for i in range(1,ntrees):
dtm = DecisionTreeModel(trees[i])
scores = scores.zip(dtm.predict(data.map(lambda x: x.features)))
scores = scores.map(lambda x: x[0] + x[1])
# Divide the accumulated scores over the number of trees
return scores.map(lambda x: x/ntrees)
def testError(lap):
testErr = lap.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
def testClassification(trainingData, testData):
model = RandomForest.trainClassifier(trainingData, numClasses=2,
categoricalFeaturesInfo={},
numTrees=50, maxDepth=30)
# Compute test error by thresholding probabilistic predictions
threshold = 0.5
scores = predict_proba(model,testData)
pred = scores.map(lambda x: 0 if x < threshold else 1)
lab_pred = testData.map(lambda lp: lp.label).zip(pred)
testError(lab_pred)
# Compute test error by comparing binary predictions
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testError(labelsAndPredictions)
总而言之,这是学习Spark的不错的练习!
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句