The simple moving average (SMA) is an indicator many people use when analyzing stock prices. Here I want to show how Spark’s window functions make computing moving averages easy.
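For reference, the n-day simple moving average on trading day t is the plain mean of the last n closing prices. This is the standard definition; the symbols below ($C_t$ for the close on day $t$) are my own notation, not something from the original post:

```latex
\mathrm{SMA}_n(t) = \frac{1}{n} \sum_{i=0}^{n-1} C_{t-i}
```

So a 20-day SMA averages exactly 20 rows: the current day plus the 19 trading days before it.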
First, let’s load the stock data:
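```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Needed for .toDF in a compiled app (assumes a SparkSession named `spark`);
// spark-shell imports this automatically
import spark.implicits._

// One row of the CSV: exchange, symbol, date, OHLC prices, volume, adjusted close
case class Stock(exc: String, symbol: String, stockDate: String,
                 open: Double, high: Double, low: Double, close: Double,
                 volume: Double, adjClose: Double)

val data = sc.textFile("s3://giantify-stocks/APC_2016_08_03.csv")

// Parse each comma-separated line into a Stock, then convert to a cached DataFrame
val stocksData = data.map { d =>
  val tokens = d.split(",")
  Stock(tokens(0), tokens(1), tokens(2), tokens(3).toDouble, tokens(4).toDouble,
        tokens(5).toDouble, tokens(6).toDouble, tokens(7).toDouble, tokens(8).toDouble)
}.toDF.cache()

// Convert the date string to a DateType column so rows sort chronologically
// (to_date without a format assumes yyyy-MM-dd dates)
val stocks = stocksData.withColumn("stockDate", to_date(col("stockDate")))
```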
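If the CSV contains a header row or any malformed line (I can't tell from the file name whether this one does), the `toDouble` calls throw a `NumberFormatException` and the whole job fails. Below is a minimal plain-Scala sketch of a more forgiving parser, assuming the same nine-column layout. `parseStock` is a hypothetical helper, not part of Spark, and the sample lines are invented for illustration. In the Spark job you could then write `data.flatMap(line => parseStock(line).toList)` instead of `data.map { ... }`.

```scala
import scala.util.Try

// Same shape as the Stock case class used in the post
case class Stock(exc: String, symbol: String, stockDate: String,
                 open: Double, high: Double, low: Double, close: Double,
                 volume: Double, adjClose: Double)

// Hypothetical helper: returns None for header rows, short rows, or rows whose
// numeric fields don't parse, instead of throwing and failing the job
def parseStock(line: String): Option[Stock] = {
  val t = line.split(",").map(_.trim)
  if (t.length < 9) None
  else Try(Stock(t(0), t(1), t(2), t(3).toDouble, t(4).toDouble, t(5).toDouble,
                 t(6).toDouble, t(7).toDouble, t(8).toDouble)).toOption
}

// Invented sample lines, not taken from the real file
println(parseStock("exc,symbol,date,open,high,low,close,volume,adjClose")) // None
println(parseStock("EX,SYM,2016-01-04,1.0,2.0,0.5,1.5,100,1.5").map(_.close)) // Some(1.5)
```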
Next, we compute the 20-day, 50-day, and 100-day simple moving averages:
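```scala
// rowsBetween(-(n - 1), 0) spans the current row plus the n - 1 rows before it,
// i.e. exactly n trading days (rowsBetween(-20, 0) would average 21 rows).
// Partitioning by symbol computes each stock's averages independently
// if the data ever holds more than one symbol.
val movingAverageWindow20 = Window.partitionBy("symbol").orderBy("stockDate").rowsBetween(-19, 0)
val movingAverageWindow50 = Window.partitionBy("symbol").orderBy("stockDate").rowsBetween(-49, 0)
val movingAverageWindow100 = Window.partitionBy("symbol").orderBy("stockDate").rowsBetween(-99, 0)

// Calculate the moving averages. The first n - 1 rows have fewer than n
// rows in their frame, so their averages cover a shorter window.
val stocksMA = stocks
  .withColumn("MA20", avg(stocks("close")).over(movingAverageWindow20))
  .withColumn("MA50", avg(stocks("close")).over(movingAverageWindow50))
  .withColumn("MA100", avg(stocks("close")).over(movingAverageWindow100))

stocksMA.show()
```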
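To make the frame semantics concrete, here is a small plain-Scala reference sketch (no Spark needed) of what `avg(...).over(Window.orderBy(...).rowsBetween(-(n - 1), 0))` computes on an already-sorted sequence of closes. `trailingSma` is a hypothetical name for illustration. Each row averages itself and up to n - 1 preceding rows, so the earliest rows get a shorter window rather than a null. Note that `rowsBetween(-20, 0)` corresponds to `n = 21` here, not 20.

```scala
// Trailing simple moving average with the same frame as rowsBetween(-(n - 1), 0):
// row i averages rows max(0, i - (n - 1)) through i inclusive.
def trailingSma(closes: Seq[Double], n: Int): Seq[Double] = {
  require(n > 0, "window size must be positive")
  closes.indices.map { i =>
    val from = math.max(0, i - (n - 1))
    val window = closes.slice(from, i + 1)
    window.sum / window.size
  }
}

val closes = Seq(10.0, 11.0, 12.0, 13.0, 14.0)
println(trailingSma(closes, 3)) // Vector(10.0, 10.5, 11.0, 12.0, 13.0)
```

The first two values show the partial-window effect: day 1 is just its own close, and day 2 averages only two closes.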
With the moving averages calculated, let’s find the days when the closing price exceeds the 50-day moving average:
```scala
stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()
```
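One caveat with this filter: for the earliest rows, `MA50` is averaged over fewer than 50 closes, so "above the 50-day average" isn't really meaningful there yet. The plain-Scala sketch below (hypothetical helper `daysAboveSma`, illustrative data) keeps only positions that have a full n-row window and whose close is above that window's mean. In Spark, one way to get the same effect would be to also compute `count(col("close")).over(...)` with the same window spec and keep only rows where that count equals the intended window length.

```scala
// Hypothetical sketch: indices where the close is above its trailing n-day SMA,
// skipping the first n - 1 rows whose window would be partial
def daysAboveSma(closes: IndexedSeq[Double], n: Int): Seq[Int] = {
  require(n > 0, "window size must be positive")
  (n - 1 until closes.length).filter { i =>
    val window = closes.slice(i - n + 1, i + 1) // exactly n closes ending at i
    closes(i) > window.sum / window.size
  }
}

val closes = Vector(5.0, 4.0, 6.0, 3.0, 7.0)
println(daysAboveSma(closes, 3)) // Vector(2, 4)
```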
Stay tuned for the next blog post on how to use Zeppelin to visualize the price data.