// Objective: read one month of click data for a masthead -> filter it to the relevant categories -> flag (sessionise) the visits -> calculate the aggregate metrics.

// Inputs: the user must pass two arguments - (1) the month together with the year, and (2) the masthead name as tracked by the data collector.

// Outputs: page views and number of visits for the given month and masthead, saved to S3.

// Assumptions in the code:
// 1.) One key assumption is that the categories can be defined using the URL. The URL values were examined on a sample of data (Jan-2014) and the relevant filter was created from them. Need to confirm that the same set of values was tracked historically as well.

// Modifications:
// None required as of now.


import java.util.{Calendar, Locale, TimeZone}

import com.fasterxml.jackson.core.JsonParseException
import com.github.nscala_time.time.Imports._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormatter
import org.json4s.{DefaultFormats, _}
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization._

import scala.collection.immutable.Map
import scala.collection.mutable.ListBuffer
import scala.util.Random
import scala.util.control._

/** Spark job: reads one month of click data for a masthead, filters and
  * sessionises it, and saves aggregate metrics as text files on S3. */
object forecast_cat {
// One click record: field name -> field value, both kept as strings.
type Click = Map[String, String]

/** Entry point.
  * args(0): month identifier used in the input file names (the header describes it
  *          as the month together with the year; exact format not visible — confirm).
  * args(1): masthead name as it appears in the input file names.
  */
def main(args: Array[String]) {
// NOTE(review): `.set("","")` sets a configuration entry with an empty key; no visible purpose.
val conf = new SparkConf().setAppName("Forecast").set("","").set("spark.kryoserializer.buffer.mb","128")
val sc = new SparkContext(conf)
// NOTE(review): no check that two arguments were supplied; args(0)/args(1) throw if missing.
var month = args(0).toString
var masthead = args(1).toString

// Glob matching every gzipped "view" file for this month and masthead.
var ClicksLocation = "s3n://ffx-analytics/all-interactions/" + month + "-" + masthead + "-view-*.gz"
// NOTE(review): not referenced in the visible code; output paths further down are built from literals.
var OUTPUT_BASE_PATH = "s3n://ffx-analytics/mu-sigma-data/"

// Input is read as gzipped text files (see readClicks).

// NOTE(review): the job processes a whole month; no per-week filter is visible here.

/** Reads raw click lines from `clicksLocation` and drops lines that contain the
  * all-zero UUID.
  * NOTE(review): the visible code only filters Strings. The step that turns each
  * line into a Click (presumably expandRecord) and the closing parenthesis of the
  * chain are not visible — confirm against the full file.
  */
def readClicks(clicksLocation: String): RDD[Click] = {
// 100 is passed as Spark's minPartitions hint.
var read_text = sc.textFile(clicksLocation, 100)

// Skip lines carrying the all-zero UUID (presumably a placeholder id — confirm).
var clicks: RDD[Click] = (read_text
.filter((l: String) => !(l contains "00000000-0000-0000-0000-000000000000"))

return clicks

// Keeps only clicks whose "assetCategory" field equals "Article" (missing field -> "", dropped).
// NOTE(review): this line is garbled — the expression before `>(c,` and the closing brace are missing.
// NOTE(review): callers must use this exact spelling (`filtercat`); Scala names are case-sensitive.
def filtercat(clicks: RDD[Click]): RDD[Click] = {>(c,c.getOrElse("assetCategory",""))).filter(c=>(c._2 == "Article")).map(c=>(c._1))


/**
 * Single function to add location fields to an RDD of clicks.
 * Inputs: the RDD of clicks to enrich, and the (Hadoop-readable)
 * location of the IP-to-city location file.
 */

// Enriches each click with Country / Region / City / ipBlock / ipBlockIndex taken from the
// IP-to-city file at `ipToCityLoc`, then keeps only clicks whose Country is "AU".
// NOTE(review): several chained calls (map/filter/join/closing parens) are missing from this
// block as shown; the comments below describe only what the visible lines do.
def addLocFields(clicks:RDD[Click],ipToCityLoc:String): RDD[Click] = {

// first, get and broadcast the ip ranges

// Rows are keyed by (rangeStart, rangeEnd) as numeric IPs; the value keeps the row's
// TextInputFormat key (byte offset in the file) plus the split, unquoted columns.
val (ipToCity:RDD[((Long,Long),(Long, Array[String]))]) = (sc.hadoopFile(ipToCityLoc, classOf[TextInputFormat], classOf[LongWritable],classOf[Text])
.map( pair => (pair._1.get(), pair._2.toString) )
// Drop rows containing ':' (presumably IPv6 ranges — confirm).
(line:(Long,String)) => !(line._2 contains ":")
(line:String) => line.split(",")
// Strip double quotes from every column.
(word:String) => word.filterNot(_ == '"')
(line:(Long,Array[String])) => ((ipToLong(line._2(0)),ipToLong(line._2(1))), line)
// ipToCity.take(5).foreach(println)
// NOTE(review): getAddressRange binary-searches this array, so it must be sorted by range
// start; the sort step is not visible here — confirm.
val ipRanges:Broadcast[Array[(Long,Long)]] = sc.broadcast(ipToCity

//Now, enrich the clicks
val loc_enrich_clicks = (clicks
// Key each click by the range containing its "ipAddress" (missing address -> "").
(click:Click) => (getAddressRange(ipRanges.value, ipToLong(click.getOrElse("ipAddress", ""))),click)
// After joining with ipToCity: copy columns 2/3/4 and the range bounds onto the click.
(data:((Long,Long),(Click,(Long,Array[String])))) => (data._2._1
.updated("Country", data._2._2._2(2))
.updated("Region", data._2._2._2(3))
.updated("City", data._2._2._2(4))
.updated("ipBlock", data._2._2._2(0) + " - " + data._2._2._2(1))
.updated("ipBlockIndex", data._2._2._1.toString)

// Keep Australian clicks only.
(>(c, c.getOrElse("Country","")))
.filter(t=>(t._2 == "AU"))

// Splits each tracking ID's clicks (per day key from toDate_try) into sessions using split(),
// then writes sessionId / sessionDuration / sessionStart / sessionEnd / numPage onto each click.
// NOTE(review): the groupByKey and join steps (and closing brackets) are not visible in this
// block; the tuple built for each session below shows fewer fields than the five read back
// when updating the click — confirm the field order against the full source.
def sessionize(clicks: RDD[Click]): RDD[Click] = {
//Group all the timestamps for a tracking ID, to be used for sessionising the data
var groupByTrackingIdsAndDate = clicks
.map(c => {
var timestamp = c.getOrElse("timestamp", "")
// Throws NumberFormatException if a click has no/non-numeric "timestamp".
var startOfDay = toDate_try(timestamp.toString.toLong)
((c.getOrElse("trackingId", ""), startOfDay), timestamp)
// NOTE(review): timestamps are sorted as Strings (lexicographically); this equals numeric
// order only while all timestamps have the same number of digits.
.map(t => (t._1, t._2.toList.sorted))
.map(t => (t._1, split(t._2)))

//Flatten the list to get list of timestamps as separate rows
var track_individual_list = groupByTrackingIdsAndDate.flatMap(t => (for (xs <- t._2) yield (t._1, xs)))

//Map the session ID back to the original click data. Assumption - negligible chances of a tracking ID having two clicks at the exact same timestamp
var withsessionid = track_individual_list
// Session id = first timestamp + trackingId + day key; duration = (last - first) / 1000
// (seconds, assuming millisecond timestamps — confirm).
.map { case ((trackingId, date), timestamps) => ((trackingId, timestamps),
timestamps.head + trackingId + date,
(timestamps.last.toLong - timestamps.head.toLong) / 1000,
// Re-key the session info by (trackingId, timestamp) so it can be joined back to clicks.
.flatMap { case ((trackingId, timestamps), t) => {
for (timestamp <- timestamps)
yield ((trackingId, timestamp), t)

// Add the session fields (sessionId, sessionDuration, sessionStart, sessionEnd, numPage) to each click.
var sessionised_clicks = clicks
.map(c => ((c.getOrElse("trackingId", ""), c.getOrElse("timestamp", "")), c))
.map { case ((trackingId, timestamp), (click, sessionInfo)) => (click.updated("sessionId", sessionInfo._1.toString)
.updated("sessionDuration", sessionInfo._4.toString)
.updated("sessionStart", sessionInfo._2.toString)
.updated("sessionEnd", sessionInfo._3.toString)
.updated("numPage", sessionInfo._5.toString))
return sessionised_clicks

// Computes per-key page counts and a count of keys per page-count bucket, and saves each to
// a single text file under s3n://ffx-analytics/mu-sigma-data/adhoc/. Returns Unit.
// `member_or_tracking` is only used as a label inside the output keys.
// NOTE(review): `cat` is not defined in the visible code, and the step that reshapes clicks
// into (key, numPage) pairs appears to be missing (t._1 / t._2 do not exist on a Map).
def calculate_metrics(sessionised_clicks: RDD[Click], month: String, member_or_tracking:String) = {


var fid_numpage_session = sessionised_clicks

var numpage = fid_numpage_session
.map(t=>((t._1, month, "View"),t._2.toDouble))

// NOTE(review): garbled line — the receiver and `map(t=` before `>((` are missing.
var num_tracking =>(("Number_of_articles","month",member_or_tracking,t._2,month),1)).reduceByKey(_+_)

// repartition(1) forces a single output part file.
numpage.repartition(1).saveAsTextFile("s3n://ffx-analytics/mu-sigma-data/adhoc/" + month + "_" + cat + "_" + masthead + "_tracking_num_articles")

// NOTE(review): unlike the path above, this one has no "_" before "articles_num_tracking".
num_tracking.repartition(1).saveAsTextFile("s3n://ffx-analytics/mu-sigma-data/adhoc/" + month + "_" + cat + "_" + masthead + "articles_num_tracking")

// Keeps the clicks whose "memberId" appears in the subscription file (inner join on memberId).
// NOTE(review): the parameter/return types say RDD[String], but the body calls getOrElse on
// each element (i.e. treats it as a Click); `c._1` on a text line will not compile either —
// a split/parse step of the subscription file and the return expression appear to be missing.
def memberid_match(clicks:RDD[String],subscription:String) :RDD[String] = {
var read_text = sc.textFile(subscription).map(c=>(c._1,1))
var memid_matched = read_text.join(>(c.getOrElse("memberId",""),c))).map(c=>(c._2._2))

// Driver: load the month's clicks, keep the relevant category, split them into
// subscriber / non-subscriber sets, and compute metrics for each set.
val subscription = "s3n://ffx-analytics/mu-sigma-data/Subscription.csv"
val clicks = readClicks(ClicksLocation)
val ipToCityLocation = "s3n://ffx-analytics/mu-sigma-data/dbip-city-2014-09.csv"

// The name must match the definition `filtercat` exactly (Scala is case-sensitive).
// Cached because it feeds both memberid_match and the subtract below.
val filtered_cat = filtercat(clicks).cache()
// memberid_match needs the subscription file location as its second argument.
// NOTE(review): memberid_match is declared over RDD[String] while filtered_cat is an
// RDD[Click]; its signature needs to be RDD[Click] for this call to type-check.
val subscription_data = memberid_match(filtered_cat, subscription)
val non_subscription_data = filtered_cat.subtract(subscription_data)

val subs_metrics = calculate_metrics(addLocFields(subscription_data,ipToCityLocation),month,"memberId")
val non_subs = calculate_metrics(addLocFields(non_subscription_data,ipToCityLocation),month,"trackingId")

/** Dotted-quad IPv4 address with the four octets captured. The dots are escaped so
  * only a literal '.' separates octets; Regex extractors match the whole string. */
val IpRegex = """([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})""".r

/** Converts a dotted-quad IPv4 string to its numeric value
  * (a.b.c.d -> a*2^24 + b*2^16 + c*2^8 + d).
  *
  * @param ip address such as "203.0.113.7"
  * @return the address as a Long, or the sentinel 1L when `ip` is not a dotted quad
  */
def ipToLong(ip:String):Long = ip match {
  case IpRegex(a, b, c, d) =>
    Array(a, b, c, d).map(_.toLong).foldLeft(0L)((acc: Long, octet: Long) => (acc << 8) | (octet & 0xff))
  case _ => 1L
}

/** Binary search for the range containing `addr`.
  *
  * @param ranges inclusive (start, end) ranges; must be sorted by start and
  *               non-overlapping for the search to be correct (not checked here)
  * @param addr   numeric address, e.g. as produced by ipToLong
  * @return the range containing `addr`, or (-1L, -1L) when no range contains it
  */
def getAddressRange(ranges:Array[(Long, Long)], addr: Long):(Long,Long) = {
  @scala.annotation.tailrec
  def getAddressRangeHelper(ranges:Array[(Long,Long)], start:Int, end:Int, addr:Long):(Long,Long) = {
    // Empty search window: addr lies in a gap, before the first or after the last range.
    if (start > end) (-1L, -1L)
    else {
      val midpoint = start + (end - start) / 2
      if (addr < ranges(midpoint)._1)
        getAddressRangeHelper(ranges, start, midpoint - 1, addr)
      else if (addr > ranges(midpoint)._2)
        getAddressRangeHelper(ranges, midpoint + 1, end, addr)
      else
        ranges(midpoint)
    }
  }
  getAddressRangeHelper(ranges, 0, ranges.size - 1, addr)
}

/** Halves a list of ranges by merging consecutive pairs: (a, b), (c, d) -> (a, d).
  * With an odd number of ranges the last one is kept unchanged.
  *
  * @param in ranges to merge pairwise, in order
  * @return roughly half as many ranges, in the same order
  */
def halveRange(in:Array[(Long,Long)]):Array[(Long,Long)] =
  in.grouped(2).map { chunk =>
    if (chunk.length == 2) (chunk(0)._1, chunk(1)._2) else chunk(0)
  }.toArray

/** Formats an epoch-millisecond timestamp as "<month>-<year>" in Australia/Sydney time.
  *
  * NOTE: the month comes from Calendar.MONTH and is therefore 0-based (January = 0);
  * it is kept 0-based to preserve the existing output format.
  *
  * @param timestamp milliseconds since the Unix epoch
  */
def toMonth(timestamp: Long): String = {
  val mydate = Calendar.getInstance(TimeZone.getTimeZone("Australia/Sydney"), Locale.GERMANY)
  // Without this the calendar holds the current time and `timestamp` is ignored.
  mydate.setTimeInMillis(timestamp)
  s"${mydate.get(Calendar.MONTH)}-${mydate.get(Calendar.YEAR)}"
}

/** Formats an epoch-millisecond timestamp as "<day>-<month>-<year>" in Australia/Sydney time.
  * Used as the per-day grouping key when sessionising.
  *
  * NOTE: the month comes from Calendar.MONTH and is therefore 0-based (January = 0);
  * it is kept 0-based to preserve the existing key format.
  *
  * @param timestamp milliseconds since the Unix epoch
  */
def toDate_try(timestamp: Long): String = {
  val mydate = Calendar.getInstance(TimeZone.getTimeZone("Australia/Sydney"), Locale.GERMANY)
  // Without this every click would be keyed to the current day, not its own day.
  mydate.setTimeInMillis(timestamp)
  s"${mydate.get(Calendar.DAY_OF_MONTH)}-${mydate.get(Calendar.MONTH)}-${mydate.get(Calendar.YEAR)}"
}

/** Pairs each element with its successor; the last element is paired with itself.
  *
  * e.g. List("a", "b", "c") -> List(("a","b"), ("b","c"), ("c","c")),
  *      List("a") -> List(("a","a")), Nil -> Nil.
  */
def convert_to_pairs(x: List[String]): List[(String, String)] = x match {
  case Nil => Nil
  case _   => x.zip(x.tail) :+ ((x.last, x.last))
}

// NOTE(review): module-level mutable state shared across calls.
// `fs` is overwritten by split() with the element of a one-element list.
var fs: String = ""
// `b` is a shared buffer: any function that appends to it accumulates results
// across calls — prefer a buffer local to each call.
var b = new ListBuffer[String]()


// Splits a sorted list of timestamp strings into consecutive chunks (sessions).
// A one-element list is returned as a single chunk (and also stored in `fs`).
// Otherwise the loop finds a split index `n`, the list is cut there and the rest
// is split recursively.
// NOTE(review): `diffs` is computed but the condition that uses it (presumably
// `loop.break` when the gap exceeds a session timeout) and several closing braces are
// not visible — confirm against the full source. The type parameter `A` is unused, and
// `ys :: split(zs)` is not tail-recursive (deep recursion for many sessions).
def split[A](xs: List[String]): List[List[String]] = {

var n = 0
if (xs.size == 1) {
fs = xs(0)
return List(xs)
else {
val loop = new Breaks;
loop.breakable {
for (i <- 0 until xs.length - 1) {
// Gap between consecutive timestamps (numeric).
var diffs = xs(i + 1).toLong - xs(i).toLong
n = i + 1
// n stays 0 only when the loop never ran (empty input): return the input as one chunk.
if (n == 0) return List(xs)
val (ys, zs) = xs.splitAt(n)
ys :: split(zs)

// NOTE(review): module-level mutable buffer; any function that appends to it accumulates
// results across calls — prefer a buffer local to each call.
var d = new ListBuffer[String]()

/** Compares two arrays position by position.
  *
  * The result is built per call. Appending to a shared buffer would make results
  * accumulate across calls.
  *
  * @return for each index i of `a`: "Same_asset" if a(i) equals b(i), otherwise "Not_Same"
  *         (throws IndexOutOfBoundsException if `b` is shorter than `a`)
  */
def lag_asset(a: Array[String], b: Array[String]): Array[String] =
  a.indices.map(i => if (a(i).contentEquals(b(i))) "Same_asset" else "Not_Same").toArray

/** Returns 1-based position labels, one per element of `a`: "1", "2", ..., a.length.
  * The result is built per call, so repeated calls do not accumulate. */
def check(a: List[List[String]]): List[String] =
  (1 to a.length).map(_.toString).toList

/** Element-wise difference a(i) - b(i) of two arrays of integer strings, returned as strings.
  * The result is built per call, so repeated calls do not accumulate.
  * Throws NumberFormatException on non-integer input and IndexOutOfBoundsException
  * if `b` is shorter than `a`. */
def sub(a: Array[String], b: Array[String]): Array[String] =
  a.indices.map(i => (a(i).toLong - b(i).toLong).toString).toArray

/** Parses a JSON click record into a field map, repairing some malformations.
  *
  * When Jackson rejects the record, the character just before the reported error
  * column is inspected, one edit is applied, and parsing is retried:
  *   - a single quote: the character before it is removed (presumably the backslash
  *     of an invalid escape);
  *   - 'v' / 'a': that character and the one before it are replaced by a single
  *     control character (U+0007 / U+0013 respectively);
  *   - anything else: the character before it is removed.
  * Every edit shortens the record, so the retries always terminate.
  * NOTE(review): for the escapes \v and \a one would expect U+000B / U+0007;
  * the replacement characters are kept unchanged — confirm intent.
  *
  * @return Some(fields) once the (possibly repaired) record parses; None when another
  *         non-fatal error occurs or the error position lies outside the record.
  */
def parseWithErrors(jsonRecord: String): Option[Map[String, JValue]] = {

  implicit val formats = DefaultFormats

  // One repair step driven by the error location; None when the position is outside
  // the record (charAt would otherwise throw out of this function).
  def repair(record: String, e: JsonParseException): Option[String] = {
    val index = e.getLocation.getColumnNr - 2
    if (index < 0 || index >= record.length) None
    else Some(record.charAt(index) match {
      case '\'' => record.patch(index - 1, "", 1)
      case 'v'  => record.patch(index - 1, "\u0007", 2)
      case 'a'  => record.patch(index - 1, "\u0013", 2)
      case _    => record.patch(index - 1, "", 1)
    })
  }

  // Retry loop written as a tail call (not recursion from inside a catch handler),
  // so long chains of repairs cannot overflow the stack.
  @scala.annotation.tailrec
  def attempt(record: String): Option[Map[String, JValue]] = {
    val outcome: Either[Option[String], Map[String, JValue]] =
      try Right(parse(record).extract[Map[String, JValue]])
      catch {
        case e: JsonParseException => Left(repair(record, e))
        // Fatal JVM errors (OOM, etc.) are deliberately not swallowed.
        case NonFatal(_)           => Left(None)
      }
    outcome match {
      case Right(fields)        => Some(fields)
      case Left(Some(repaired)) => attempt(repaired)
      case Left(None)           => None
    }
  }

  attempt(jsonRecord)
}

// Turns one raw JSON record into a flat Click map:
//  - parses it (with repairs) via parseWithErrors; an unparseable record yields an empty map;
//  - replaces "trackingId" with its MD5 hex digest;
//  - merges the entries of the "extras" field into the top level and removes "extras".
// NOTE(review): the JArray branch of reparse is garbled (its map/extract calls are missing).
def expandRecord(jsonRecord: String): Map[String, String] = {

val digest = MessageDigest.getInstance("MD5")

val partParsed: Option[Map[String, JValue]] = parseWithErrors(jsonRecord)

// Now, walk partParsed and convert each of the JValues to their primitive types.
// This is to circumvent
// NOTE(review): the explanation above is truncated in the source.

def reparse(v: JValue): Any = v match {
case JString(s) => s
case JInt(i) => i.toString()
// Arrays of {name, value} objects are folded into a single name -> value map.
case JArray(o) => (o
(pair: JObject) => {
// we know that the pair has name, value keys
val p = (pair

Map[String, String](p("name") -> p("value"))
.foldLeft(Map[String, String]())(_ ++ _)
// NOTE(review): any other JValue (e.g. doubles, booleans) is returned unchanged, so the
// final asInstanceOf[Map[String, String]] is unchecked and such values are not Strings.
case _ => v

partParsed match {
case Some(s) => {
val withExtras = s.mapValues(reparse)
// Throws if the record has no "trackingId"; getBytes uses the platform default charset.
val trackingId = digest.digest(withExtras("trackingId").asInstanceOf[String].getBytes).map("%02x".format(_)).mkString
val extras = withExtras.getOrElse("extras", Map()).asInstanceOf[Map[String, String]]
(withExtras - "extras").updated("trackingId", trackingId).asInstanceOf[Map[String, String]] ++ extras
case None => Map[String, String]()

// Serialises a click map to a String.
// NOTE(review): only the implicit json4s formats are visible; the body expression
// (presumably json4s `write(click)`) is missing — confirm against the full source.
def serialise(click: Map[String, String]): String = {

implicit val formats = DefaultFormats


// Converts a serialised click String back into a Click map.
// NOTE(review): only the implicit json4s formats are visible; the body expression
// (presumably json4s `read[Click](json)`) is missing — confirm against the full source.
def deserialise(json:String):Click = {
implicit val formats = DefaultFormats

