Let's dive into the implementation of logistic regression in Spark. You can find it in the file LogisticRegression.scala. I am looking at the lower-level mllib library rather than the newer ml API.
/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
It defines LogisticRegressionModel, which extends the abstract class GeneralizedLinearModel (defined in /mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala) and mixes in the ClassificationModel, Serializable, Saveable and PMMLExportable traits.
class LogisticRegressionModel @Since("1.3.0") (
    @Since("1.0.0") override val weights: Vector,
    @Since("1.0.0") override val intercept: Double,
    @Since("1.3.0") val numFeatures: Int,
    @Since("1.3.0") val numClasses: Int)
  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable
  with Saveable with PMMLExportable {
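ClassificationModel declares several predict methods. The RDD[Vector] and single-Vector variants are abstract; only the JavaRDD variant has a body, and it simply delegates to the RDD variant.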
trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  @Since("1.0.0")
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  @Since("1.0.0")
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}
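In LogisticRegressionModel's predictPoint method, numClasses == 2 selects binary classification; any other value selects multinomial logistic regression. In the multinomial branch, class 0 acts as the pivot with an implicit margin of 0, and the prediction is the class with the largest margin.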
override protected def predictPoint(
    dataMatrix: Vector,
    weightMatrix: Vector,
    intercept: Double) = {
  require(dataMatrix.size == numFeatures)

  // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
  if (numClasses == 2) {
    val margin = dot(weightMatrix, dataMatrix) + intercept
    val score = 1.0 / (1.0 + math.exp(-margin))
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
  } else {
    /**
     * Compute and find the one with maximum margins. If the maxMargin is negative, then the
     * prediction result will be the first class.
     *
     * PS, if you want to compute the probabilities for each outcome instead of the outcome
     * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
     * is positive to prevent overflow.
     */
    var bestClass = 0
    var maxMargin = 0.0
    val withBias = dataMatrix.size + 1 == dataWithBiasSize
    (0 until numClasses - 1).foreach { i =>
      var margin = 0.0
      dataMatrix.foreachActive { (index, value) =>
        if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
      }
      // Intercept is required to be added into margin.
      if (withBias) {
        margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
      }
      if (margin > maxMargin) {
        maxMargin = margin
        bestClass = i + 1
      }
    }
    bestClass.toDouble
  }
}
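The multinomial branch can be easier to follow in isolation. Below is a minimal plain-Scala sketch with no Spark dependency, assuming the per-class margins have already been computed. The names MultinomialSketch, predictClass and probabilities are hypothetical and are not Spark APIs. It mirrors the pivot-class logic (class 0 has an implicit margin of 0) and illustrates the overflow remark from the source comment by shifting every margin by the largest one before exponentiating.

```scala
object MultinomialSketch {
  // margins(i) is the margin of class i + 1; class 0 is the pivot with margin 0.0.
  def predictClass(margins: Array[Double]): Int = {
    var bestClass = 0
    var maxMargin = 0.0
    for (i <- margins.indices) {
      if (margins(i) > maxMargin) {
        maxMargin = margins(i)
        bestClass = i + 1
      }
    }
    bestClass
  }

  // Softmax over (0.0 +: margins), shifted by the largest margin to avoid overflow.
  def probabilities(margins: Array[Double]): Array[Double] = {
    val all = 0.0 +: margins
    // The shift is never negative because the pivot margin 0.0 is the starting value.
    val shift = margins.foldLeft(0.0)((a, b) => math.max(a, b))
    val exps = all.map(m => math.exp(m - shift))
    val total = exps.sum
    exps.map(_ / total)
  }
}
```

For a margin as large as 1000.0, calling math.exp on it directly overflows to infinity, while the shifted version in probabilities stays finite.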
In the binary case, the score is the logistic (sigmoid) function applied to the margin:
val margin = dot(weightMatrix, dataMatrix) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))
If a threshold is set, the score is compared against it: the method returns 1.0 when the score is strictly greater than the threshold and 0.0 otherwise. If no threshold is set, the raw score is returned. Both dataMatrix and weightMatrix are of type Vector.
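To make the binary branch concrete, here is a minimal plain-Scala sketch that uses arrays in place of mllib's Vector. BinaryPredictSketch and its members are hypothetical names for illustration only; the arithmetic follows the two lines quoted above.

```scala
object BinaryPredictSketch {
  // Plain-array stand-in for dot(weightMatrix, dataMatrix).
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same length")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Logistic (sigmoid) function applied to the margin.
  def sigmoid(margin: Double): Double = 1.0 / (1.0 + math.exp(-margin))

  // Returns a 0.0/1.0 class label when a threshold is given, otherwise the raw score.
  def predict(
      features: Array[Double],
      weights: Array[Double],
      intercept: Double,
      threshold: Option[Double]): Double = {
    val score = sigmoid(dot(weights, features) + intercept)
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
  }
}
```

Because the comparison is strict, a score exactly equal to the threshold is assigned to class 0.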
For binary logistic regression, the model is trained with stochastic gradient descent and an L2 updater (SquaredL2Updater). Note that the no-argument constructor uses regParam = 0.01, while the static train helpers pass regParam = 0.0, which effectively turns regularization off. See the implementation of LogisticRegressionWithSGD below. It extends the abstract class GeneralizedLinearAlgorithm, which declares the abstract method createModel shown below.
protected def createModel(weights: Vector, intercept: Double): M
The abstract class provides the base implementation of run(input: RDD[LabeledPoint], initialWeights: Vector): M, which calls createModel at the end.
Any extending class, e.g. LogisticRegressionWithSGD, provides the createModel implementation, which returns the trained classification model.
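The relationship between run and createModel is a template-method pattern. The sketch below shows the shape of it in plain Scala; every name in it is a hypothetical stand-in rather than a Spark class, and the optimize placeholder simply returns the initial weights where the real run would invoke an optimizer.

```scala
// Hypothetical stand-ins for illustration; none of these names are Spark classes.
final case class LinearModelSketch(weights: Seq[Double], intercept: Double)

abstract class LinearAlgorithmSketch[M] {
  // Left abstract, like createModel in GeneralizedLinearAlgorithm.
  protected def createModel(weights: Seq[Double], intercept: Double): M

  // Placeholder for the optimizer call; this sketch returns the initial weights unchanged.
  protected def optimize(initialWeights: Seq[Double]): Seq[Double] = initialWeights

  // Shared driver: compute the weights, then let the subclass wrap them in a model.
  def run(initialWeights: Seq[Double]): M = {
    val weights = optimize(initialWeights)
    createModel(weights, 0.0)
  }
}

class LogisticAlgorithmSketch extends LinearAlgorithmSketch[LinearModelSketch] {
  override protected def createModel(weights: Seq[Double], intercept: Double): LinearModelSketch =
    LinearModelSketch(weights, intercept)
}
```

The base class owns the training flow, and each subclass only decides which model type wraps the learned parameters.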
The LogisticRegressionWithSGD companion object contains the following overloaded train methods to build the model.
Each train method takes an RDD of LabeledPoint. The LabeledPoint case class holds a Double label (the class) and a Vector of features.
case class LabeledPoint @Since("1.0.0") (
    @Since("0.8.0") label: Double,
    @Since("1.0.0") features: Vector) {
  override def toString: String = {
    s"($label,$features)"
  }
}
@Since("0.8.0")
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

/**
 * Top-level methods for calling Logistic Regression using Stochastic Gradient Descent.
 * NOTE: Labels used in Logistic Regression should be {0, 1}
 */
@Since("0.8.0")
object LogisticRegressionWithSGD {
  // NOTE(shivaram): We use multiple train methods instead of default arguments to support
  // Java programs.

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. We use the entire data
   * set to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param stepSize Step size to be used for each iteration of Gradient Descent.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double): LogisticRegressionModel = {
    train(input, numIterations, stepSize, 1.0)
  }

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
   * to update the gradient in each iteration.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @return a LogisticRegressionModel which has the weights and offset from training.
   */
  @Since("1.0.0")
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int): LogisticRegressionModel = {
    train(input, numIterations, 1.0, 1.0)
  }
}
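To get a feel for what the optimizer does with these parameters, here is a rough plain-Scala sketch of gradient descent for binary logistic regression with an L2 penalty. It is not the Spark implementation: it runs on a local Seq, uses the whole data set on every iteration (as with miniBatchFraction = 1.0), and fits no intercept. The step size shrinking as stepSize / sqrt(iteration) and the averaging of the gradient over the batch are my reading of mllib's GradientDescent and SquaredL2Updater and should be treated as assumptions, since neither class appears in the excerpt above. LogisticSgdSketch is a hypothetical name.

```scala
object LogisticSgdSketch {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // data holds (label, features) pairs with labels in {0.0, 1.0}.
  def train(
      data: Seq[(Double, Array[Double])],
      numIterations: Int,
      stepSize: Double,
      regParam: Double): Array[Double] = {
    require(data.nonEmpty, "training data must not be empty")
    val numFeatures = data.head._2.length
    var weights = Array.fill(numFeatures)(0.0)
    for (iter <- 1 to numIterations) {
      // Sum the per-example gradient of the logistic loss: (sigmoid(w . x) - label) * x.
      val gradient = Array.fill(numFeatures)(0.0)
      for ((label, features) <- data) {
        val p = 1.0 / (1.0 + math.exp(-dot(weights, features)))
        val multiplier = p - label
        for (j <- 0 until numFeatures) gradient(j) += multiplier * features(j)
      }
      // Assumption: the step size decays with the square root of the iteration number.
      val step = stepSize / math.sqrt(iter.toDouble)
      // L2 update: shrink the weights, then move against the averaged gradient.
      weights = Array.tabulate(numFeatures) { j =>
        weights(j) * (1.0 - step * regParam) - step * gradient(j) / data.size
      }
    }
    weights
  }
}
```

With regParam = 0.0 the shrink factor is 1.0, so the update reduces to plain gradient descent, which matches the value the static train helpers pass to the constructor.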