The simple moving average (SMA) is an indicator that many people use when analyzing stock prices. Here I want to show how to use Spark’s window functions to compute moving averages easily.
First, let’s load the stock data:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
case class Stock(exc: String, symbol: String, stockDate: String, open: Double, high: Double, low: Double, close: Double,
volume: Double, adjClose: Double)
val data = sc.textFile("s3://giantify-stocks/APC_2016_08_03.csv")
val stocksData = data.map { d =>
val tokens = d.split(",")
Stock(tokens(0), tokens(1), tokens(2), tokens(3).toDouble, tokens(4).toDouble, tokens(5).toDouble, tokens(6).toDouble, tokens(7).toDouble,
tokens(8).toDouble)
}.toDF.cache()
val stocks = stocksData.withColumn("stockDate", to_date(col("stockDate")))
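The loading code above assumes every line has nine well-formed fields. If the file might contain a header row or malformed lines (an assumption, since the file contents are not shown here), a more defensive parser could look like the sketch below. The helper name parseStock and the sample lines are hypothetical.

```scala
import scala.util.Try

// Same fields as the Stock case class above, repeated so the sketch stands alone.
case class Stock(exc: String, symbol: String, stockDate: String, open: Double, high: Double,
                 low: Double, close: Double, volume: Double, adjClose: Double)

// Hypothetical helper: returns None for a header row, a short row, or a
// non-numeric field, instead of throwing inside the Spark job.
def parseStock(line: String): Option[Stock] = {
  val t = line.split(",").map(_.trim)
  if (t.length < 9) None
  else Try(
    Stock(t(0), t(1), t(2), t(3).toDouble, t(4).toDouble, t(5).toDouble,
          t(6).toDouble, t(7).toDouble, t(8).toDouble)
  ).toOption
}

// Made-up sample lines, for illustration only.
val ok = parseStock("NYSE,XYZ,2016-08-03,54.0,55.5,53.5,55.0,1000000,55.0")
val header = parseStock("exchange,symbol,date,open,high,low,close,volume,adj_close")
```

With a helper like this, the map step could become data.flatMap(parseStock(_)), which silently drops lines that fail to parse. Whether dropping is preferable to failing fast depends on how much you trust the input.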
Next, we will compute the 20-day, 50-day, and 100-day simple moving averages:
// rowsBetween is inclusive at both ends, so an n-day window is the current row plus the n - 1 rows before it
val movingAverageWindow20 = Window.orderBy("stockDate").rowsBetween(-19, 0)
val movingAverageWindow50 = Window.orderBy("stockDate").rowsBetween(-49, 0)
val movingAverageWindow100 = Window.orderBy("stockDate").rowsBetween(-99, 0)
// Calculate the moving averages of the closing price
val stocksMA = stocks
  .withColumn("MA20", avg(stocks("close")).over(movingAverageWindow20))
  .withColumn("MA50", avg(stocks("close")).over(movingAverageWindow50))
  .withColumn("MA100", avg(stocks("close")).over(movingAverageWindow100))
stocksMA.show()
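To make the row frame concrete, here is a minimal plain-Scala sketch (no Spark) of what a trailing window average computes. A frame of rowsBetween(-(n - 1), 0) spans the current row and the n - 1 rows before it, n rows in total. Near the start of the data fewer rows are available, so the average is taken over whatever exists. The function name trailingSma and the sample prices are made up for illustration.

```scala
// Plain-Scala sketch of a trailing simple moving average.
// For each index i, average the values at indices max(0, i - (n - 1)) .. i,
// mirroring a row frame of the current row plus the n - 1 rows before it.
def trailingSma(closes: Seq[Double], n: Int): Seq[Double] = {
  require(n > 0, "window size must be positive")
  closes.indices.map { i =>
    val frame = closes.slice(math.max(0, i - (n - 1)), i + 1)
    frame.sum / frame.size
  }
}

// Hypothetical closing prices, for illustration only.
val closes = Seq(10.0, 11.0, 12.0, 13.0, 14.0)
val sma3 = trailingSma(closes, 3)
// sma3 == Seq(10.0, 10.5, 11.0, 12.0, 13.0)
```

The first two values are averages over one and two prices respectively, which is the same partial-window behavior Spark shows for the earliest rows of an ordered window.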
With the moving averages calculated, let’s find the days when the closing price exceeds the 50-day moving average:
stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()
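One caveat: for the earliest rows the window holds fewer rows than its nominal size, so the average there is based on a partial window and those rows can still pass the filter. If you would rather exclude them, one possible approach is to keep the average only when the frame is full. The sketch below assumes a local SparkSession and uses made-up data, with n = 3 standing in for 50 so the sample stays small. The names demo, n, and the MA column are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for the sketch; in spark-shell or Zeppelin
// getOrCreate simply returns the existing session.
val spark = SparkSession.builder.master("local[1]").appName("ma-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data; n = 3 stands in for a 50-day window.
val n = 3
val demo = Seq(
  ("2016-01-04", 10.0), ("2016-01-05", 11.0), ("2016-01-06", 12.0),
  ("2016-01-07", 11.0), ("2016-01-08", 14.0)
).toDF("stockDate", "close")

val w = Window.orderBy("stockDate").rowsBetween(-(n - 1), 0)

// when(...) without otherwise(...) yields null while the frame has fewer than n rows.
val withMA = demo.withColumn("MA", when(count("close").over(w) >= n, avg("close").over(w)))

// A comparison against null is not true, so rows with a partial window are dropped.
val above = withMA.filter(col("close") > col("MA"))
above.show()
```

With this sample, only 2016-01-06 and 2016-01-08 remain: the first two rows have no full window, and on 2016-01-07 the close is below the three-day average.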

Stay tuned for the next blog post on how to use Zeppelin to visualize the price data.