Spark: Analyzing Stock Price

The simple moving average is an indicator many people use when analyzing stock prices. Here I want to show how to use Spark's window functions to compute moving averages easily.

First, let's load the stock data.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

case class Stock(exc: String, symbol: String, stockDate: String, open: Double, high: Double, low: Double,
                 close: Double, volume: Double, adjClose: Double)

// Load the raw CSV and map each line into a Stock record
val data = sc.textFile("s3://giantify-stocks/APC_2016_08_03.csv")
val stocksData = data.map { d =>
  val tokens = d.split(",")
  Stock(tokens(0), tokens(1), tokens(2), tokens(3).toDouble, tokens(4).toDouble, tokens(5).toDouble,
    tokens(6).toDouble, tokens(7).toDouble, tokens(8).toDouble)
}.toDF.cache()

// Convert the date string into a proper date column so the windows can order by it
val stocks = stocksData.withColumn("stockDate", to_date(col("stockDate")))

Next, we will compute the 20-day, 50-day, and 100-day simple moving averages.

// Each window covers the current row plus the preceding N-1 rows, ordered by date
val movingAverageWindow20 = Window.orderBy("stockDate").rowsBetween(-19, 0)
val movingAverageWindow50 = Window.orderBy("stockDate").rowsBetween(-49, 0)
val movingAverageWindow100 = Window.orderBy("stockDate").rowsBetween(-99, 0)

// Calculate the moving averages
val stocksMA = stocks
  .withColumn("MA20", avg(stocks("close")).over(movingAverageWindow20))
  .withColumn("MA50", avg(stocks("close")).over(movingAverageWindow50))
  .withColumn("MA100", avg(stocks("close")).over(movingAverageWindow100))

stocksMA.show()
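Note that these windows use orderBy without partitionBy, which pulls all rows into a single partition. That is fine for a single symbol like this file, but if the DataFrame held several symbols you would likely partition the window by symbol. A minimal sketch under that assumption:

// Sketch only: assumes the DataFrame contains rows for multiple symbols.
// Partitioning the window by symbol keeps each symbol's history separate
// and avoids moving every row into one partition.
val perSymbolWindow20 = Window.partitionBy("symbol").orderBy("stockDate").rowsBetween(-19, 0)
val perSymbolMA = stocks.withColumn("MA20", avg(col("close")).over(perSymbolWindow20))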

stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()

With the moving averages calculated, let's find when the closing price exceeds the 50-day moving average.

stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()

Stay tuned for the next blog on how to use Zeppelin to visualize the price data.

Useful Spark Code Snippets for Data Analytics

Here are some Spark code snippets you will find particularly useful when performing basic big data analytics.

Read CSV

import com.databricks.spark.csv._
val data = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(YOUR_INPUT_PATH)
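It helps to sanity-check what inferSchema produced before going further. The delimiter option below is an assumed example for files that are not comma-separated:

// Inspect the schema that inferSchema produced
data.printSchema()

// spark-csv also accepts a delimiter option (tab shown here as an assumed example)
val tsv = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .load(YOUR_INPUT_PATH)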

Read AVRO

import com.databricks.spark.avro._
val data = sqlContext.read.avro(YOUR_INPUT_PATH)
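The same spark-avro package can also write a DataFrame back out as Avro. A minimal sketch, with the path as a placeholder:

// Write the DataFrame back out in Avro format
data.write.avro(YOUR_OUTPUT_PATH)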

Read JSON

val data = sqlContext.read.json(YOUR_INPUT_PATH)
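JSON input is often nested, so it is worth printing the inferred schema and using dot notation for nested fields. The address.city field here is purely an assumed example:

// Inspect the inferred (possibly nested) schema
data.printSchema()

// Nested fields can be selected with dot notation (address.city is an assumed field name)
data.select(col("name"), col("address.city")).show()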

More often than not, you will want to perform some aggregations.

val result = data.groupBy("company_branch").count().sort(desc("count"))
val result = data.groupBy("company_branch", "department").count().sort(asc("company_branch"),desc("count"))

You may then want to save your results back to a CSV file.

result.write.format("com.databricks.spark.csv").save(YOUR_OUTPUT_PATH)

If you want to consolidate all the result part files into a single file, you can use the coalesce(1) method.

result.coalesce(1).write.format("com.databricks.spark.csv").save(YOUR_OUTPUT_PATH)

To perform projection/selection,

data.select(col("name"), col("age"), col("department_name").alias("dept"))

To perform filtering,

data.filter("age > 18")

To use SQL, call the registerTempTable method on the DataFrame.

data.registerTempTable("data")
sqlContext.sql("select name, age from data")