Spark Dedup before Join

In Spark, as in any SQL engine, a left outer join returns more rows than the left table contains when the right table has duplicate values in the join key: each left row is repeated once for every right row it matches.
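
To make the effect concrete, here is a minimal sketch that can be pasted into spark-shell. The tables, column names, and values are made up for illustration and are not from any real dataset.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell this returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("join-dup-demo").getOrCreate()
import spark.implicits._

// Hypothetical toy tables: two orders, and a customers table in which customerId 1 appears twice.
val orders = Seq((1, "book"), (2, "pen")).toDF("customerId", "item")
val customers = Seq((1, "Ann"), (1, "Ann B."), (2, "Bob")).toDF("customerId", "name")

val joined = orders.join(customers, Seq("customerId"), "left_outer")

// The order for customer 1 matches two right-side rows, so it appears twice.
println(s"left rows: ${orders.count()}, joined rows: ${joined.count()}")
// prints: left rows: 2, joined rows: 3
```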

You could first drop the duplicates from the right table with dropDuplicates before performing the join.
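
A minimal sketch of this approach, assuming a hypothetical join key named id and made-up toy data:

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell this returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("dedup-before-join").getOrCreate()
import spark.implicits._

// Hypothetical toy tables, not from a real dataset.
val left = Seq((1, "book"), (2, "pen")).toDF("id", "item")
val right = Seq((1, "Ann"), (1, "Ann"), (2, "Bob")).toDF("id", "name")

// Keep one row per join key on the right side, then join.
val rightDeduped = right.dropDuplicates("id")
val joined = left.join(rightDeduped, Seq("id"), "left_outer")

joined.show()
```

Note that when the duplicate rows differ in their other columns, dropDuplicates on the key keeps an arbitrary one of them, so this fits best when the duplicates are exact copies.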


Alternatively, you could groupBy the join key on the right table and aggregate the remaining columns, so that each key appears only once.
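
A rough sketch of the aggregate variant follows. The payments table, its columns, and the choice of sum and max as aggregates are assumptions for illustration; pick whatever aggregate makes sense for your data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

// Local session for illustration only; in spark-shell this returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("aggregate-before-join").getOrCreate()
import spark.implicits._

// Hypothetical toy tables: id 1 has two payment rows.
val left = Seq((1, "Ann"), (2, "Bob")).toDF("id", "name")
val payments = Seq(
  (1, 10.0, "2016-08-01"),
  (1, 5.0, "2016-08-03"),
  (2, 7.5, "2016-08-02")
).toDF("id", "amount", "paidOn")

// Collapse the right table to one row per id before joining.
val paymentsPerId = payments
  .groupBy("id")
  .agg(sum("amount").as("totalAmount"), max("paidOn").as("lastPaidOn"))

val joined = left.join(paymentsPerId, Seq("id"), "left_outer")

joined.show()
```

Unlike dropDuplicates, this makes it explicit how rows that share a key are combined.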

Spark: Analyzing Stock Price

The simple moving average is an indicator many people use when analyzing stock prices. Here I want to show how to use Spark’s window functions to compute it easily.
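
As a reference point, an n-day simple moving average is just the mean of the last n closing prices. Here is a small plain-Scala sketch of that definition (no Spark involved; the function name and the prices are made up for illustration):

```scala
// Simple moving average over the last n values, in plain Scala.
// Returns one average per full window, so the result has closes.length - n + 1 entries
// (or none if there are fewer than n values).
def simpleMovingAverage(closes: Seq[Double], n: Int): Seq[Double] = {
  require(n > 0, "window size must be positive")
  closes.sliding(n).filter(_.length == n).map(_.sum / n).toList
}

val closes = Seq(10.0, 11.0, 12.0, 13.0, 14.0) // made-up prices
println(simpleMovingAverage(closes, 3))
// prints: List(11.0, 12.0, 13.0)
```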

First, let’s load the stock data.

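import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// .toDF() below relies on the implicits that spark-shell imports automatically;
// in a standalone application, add: import spark.implicits._

case class Stock(exc: String, symbol: String, stockDate: String, open: Double, high: Double, low: Double, close: Double,
                 volume: Double, adjClose: Double)

// Read the CSV and parse each line into a Stock (assumes the file has no header row)
val data = sc.textFile("s3://giantify-stocks/APC_2016_08_03.csv")
val stocksData = data.map { d =>
  val tokens = d.split(",")
  Stock(tokens(0), tokens(1), tokens(2), tokens(3).toDouble, tokens(4).toDouble, tokens(5).toDouble, tokens(6).toDouble, tokens(7).toDouble,
        tokens(8).toDouble)
}.toDF()

// Convert the stockDate string column to a date so the rows sort chronologically
val stocks = stocksData.withColumn("stockDate", to_date(col("stockDate")))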

Next, we will compute the 20-day, 50-day, and 100-day simple moving averages.

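// rowsBetween(-(n - 1), 0) spans the current row plus the n - 1 rows before it, i.e. n rows.
// The file holds a single symbol; with several symbols, add partitionBy("symbol") before orderBy.
val movingAverageWindow20 = Window.orderBy("stockDate").rowsBetween(-19, 0)
val movingAverageWindow50 = Window.orderBy("stockDate").rowsBetween(-49, 0)
val movingAverageWindow100 = Window.orderBy("stockDate").rowsBetween(-99, 0)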

// Calculate the moving averages
val stocksMA = stocks
  .withColumn("MA20", avg(stocks("close")).over(movingAverageWindow20))
  .withColumn("MA50", avg(stocks("close")).over(movingAverageWindow50))
  .withColumn("MA100", avg(stocks("close")).over(movingAverageWindow100))
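
To see exactly which rows a row-based frame covers, here is a small sketch on made-up data with a 3-row window. The toy DataFrame and the MA3 and rowsInFrame column names are hypothetical. It also shows that the first rows are averaged over fewer rows, because the frame is simply cut off at the start of the data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, count}

// Local session for illustration only; in spark-shell this returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("window-frame-demo").getOrCreate()
import spark.implicits._

// Made-up prices, one row per day.
val toy = Seq(
  ("2016-01-04", 10.0),
  ("2016-01-05", 11.0),
  ("2016-01-06", 12.0),
  ("2016-01-07", 13.0)
).toDF("stockDate", "close")

// Current row plus the 2 rows before it: a 3-row frame.
val w3 = Window.orderBy("stockDate").rowsBetween(-2, 0)

val toyMA = toy
  .withColumn("MA3", avg(col("close")).over(w3))
  .withColumn("rowsInFrame", count(col("close")).over(w3))

toyMA.orderBy("stockDate").show()
```

Here MA3 comes out as 10.0, 10.5, 11.0, 12.0 and rowsInFrame as 1, 2, 3, 3, so only the last two rows are true 3-row averages. If the partial averages at the start matter for your analysis, a count column like rowsInFrame lets you filter them out.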


With the moving averages calculated, let’s find the days on which the closing price exceeds the 50-day moving average.

stocksMA.filter("close > MA50").select(col("stockDate"), col("close"), col("MA50")).show()
