パイプラインを使用して、パーティション化されたDataFrameに基づいて多くのSparkMLlibモデルを作成します

ロブ

scala> spark.version res8: String = 2.2.0

列を含むsparkデータフレームを使用していますlocationID線形回帰モデルを構築するためにMLlibパイプラインを作成しましたlocationIDこれは、単一のデータをフィードするときに機能します。ここで、「locationID」ごとに多くのモデルを作成したいと思います(本番環境には数千のlocationIDがある場合があります)。各モデルのモデル係数を保存したいと思います。

Scalaでこれをどのように行うことができるかわかりません。

私のパイプラインは次のように定義されています。

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql


// Load the regression input data
val mydata = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("./inputdata.csv")

// Crate month one hot encoding
val monthIndexer = new StringIndexer()
  .setInputCol("month")
  .setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
  .setInputCol(monthIndexer.getOutputCol)
  .setOutputCol("monthVec")
val assembler =  new VectorAssembler()
  .setInputCols(Array("monthVec","tran_adr"))
  .setOutputCol("features")
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val pipeline = new Pipeline()
  .setStages(Array(monthIndexer, monthEncoder, assembler, lr))


// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)

次に、次のようにモデルの詳細を取得できます。

val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]

println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

今、私は列にグループ化するlocationIDで見つかったmydataとのデータの各パーティション上のパイプラインを実行します。

groupbyを使ってみましたが、集計しかできません。

val grouped = mydata.groupBy("locationID")

また、一意locationIDのリストをリストとして取得し、ループすることも試みました。

val locationList = mydata.select(mydata("prop_code")).distinct

locationList.foreach { printLn }

Sparkは多くの小さなモデルを作成するのに理想的ではなく、大量のデータセットで1つのモデルを作成するのに最適であることを知っていますが、概念実証としてこれを行うように依頼されました。

Sparkでこのようなことを行うための正しいアプローチは何ですか?

user9638074

Sparkでこのようなことを行うための正しいアプローチは何ですか?

良いアプローチはまったくないと主張するリスクがあります。インコアデータ処理を処理できる多くの高度なツールと、独立した学習タスクを調整するために使用できる多くのタスクスケジューリングライブラリがあります。Sparkはここでは何も提供していません。

スケジューリング機能は平凡であり、ML / MLlibツールも平凡であり、各タスクが独立している場合、スケーリングとフォールトトレランスは役に立ちません。

Sparkを汎用スケジューリングに使用することもできます(Pythonを使用してもかまわない場合、このアイデアはsklearnキーモデルで実装されます)が、それだけです。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ