NotesWhat is notes.io?

Notes brand slogan

Notes - notes.io

/*
Creator - Mu Sigma

Objective - Read a month long data of any masthead -> filter the data for relevant categories -> flag the session -> Calculate the aggregate metrics

Inputs - User has to pass two arguments - The month value along with the year & masthead name as tracked by data collector.

Outputs - The code outputs Pageviews and # Visits for the month and masthead and saves it in S3.

Assumptions in the code -
1.) One key assumption made in the code is that the categories can be defined using the URL. The URL was tracked through a sample of data(Jan-2014) and relevant filter was created. Need to confirm if the same set of values were tracked historically as well.

Modifications -
No modifications required as of now.

*/

import java.security.MessageDigest
import java.util.{Calendar, Locale, TimeZone}

import com.fasterxml.jackson.core.JsonParseException
import com.github.nscala_time.time.Imports._
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormatter
import org.json4s.{DefaultFormats, _}
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization._

import scala.collection.immutable.Map
import scala.collection.mutable.ListBuffer
import scala.util.Random
import scala.util.control._

object forecast_cat {
type Click = Map[String, String]

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Forecast").set("spark.io.compression.codec","org.apache.spark.io.LZ4CompressionCodec").set("spark.kryoserializer.buffer.mb","128")
val sc = new SparkContext(conf)
var month = args(0).toString
var masthead = args(1).toString

var ClicksLocation = "s3n://ffx-analytics/all-interactions/" + month + "-" + masthead + "-view-*.gz"
var OUTPUT_BASE_PATH = "s3n://ffx-analytics/mu-sigma-data/"

//Reading as a textfile

//Filter for the relevant week

def readClicks(clicksLocation: String): RDD[Click] = {
var read_text = sc.textFile(clicksLocation, 100)

var clicks: RDD[Click] = (read_text
.filter((l: String) => !(l contains "00000000-0000-0000-0000-000000000000"))
.map(expandRecord(_))
)

return clicks
}

def filtercat(clicks: RDD[Click]): RDD[Click] = {

clicks.map(c=>(c,c.getOrElse("assetCategory",""))).filter(c=>(c._2 == "Article")).map(c=>(c._1))

}


/*
* Single function to add location fields to an RDD of clicks
* input is the RDD of clicks to enrich
* and the (hadoopable) location to find the ip location file
*/

def addLocFields(clicks:RDD[Click],ipToCityLoc:String): RDD[Click] = {

// first, get and broadcast the ip ranges

val (ipToCity:RDD[((Long,Long),(Long, Array[String]))]) = (sc.hadoopFile(ipToCityLoc, classOf[TextInputFormat], classOf[LongWritable],classOf[Text])
.map( pair => (pair._1.get(), pair._2.toString) )
.filter(
(line:(Long,String)) => !(line._2 contains ":")
)
.mapValues(
(line:String) => line.split(",")
.map(
(word:String) => word.filterNot(_ == '"')
)
)
.map(
(line:(Long,Array[String])) => ((ipToLong(line._2(0)),ipToLong(line._2(1))), line)
)
)
// ipToCity.take(5).foreach(println)
val ipRanges:Broadcast[Array[(Long,Long)]] = sc.broadcast(ipToCity
.map(_._1)
.collect()
.sortBy(_._1)
)

//Now, enrich the clicks
val loc_enrich_clicks = (clicks
.map(
(click:Click) => (getAddressRange(ipRanges.value, ipToLong(click.getOrElse("ipAddress", "203.192.146.201"))),click)
)
.join(ipToCity)
.map(
(data:((Long,Long),(Click,(Long,Array[String])))) => (data._2._1
.updated("Country", data._2._2._2(2))
.updated("Region", data._2._2._2(3))
.updated("City", data._2._2._2(4))
.updated("ipBlock", data._2._2._2(0) + " - " + data._2._2._2(1))
.updated("ipBlockIndex", data._2._2._1.toString)
)
)
)

(loc_enrich_clicks.map(c=>(c, c.getOrElse("Country","")))
.filter(t=>(t._2 == "AU"))
.map(t=>(t._1))
)
}

def sessionize(clicks: RDD[Click]): RDD[Click] = {
//Group all the timestamps for a tracking ID, to be used for sessionising the data
var groupByTrackingIdsAndDate = clicks
.map(c => {
var timestamp = c.getOrElse("timestamp", "")
var startOfDay = toDate_try(timestamp.toString.toLong)
((c.getOrElse("trackingId", ""), startOfDay), timestamp)
})
.groupByKey()
.map(t => (t._1, t._2.toList.sorted))
.map(t => (t._1, split(t._2)))

//Flatten the list to get list of timestamps as separate rows
var track_individual_list = groupByTrackingIdsAndDate.flatMap(t => (for (xs <- t._2) yield (t._1, xs)))

//Map the session ID back to the original click data. Assumption - negligible chances of a tracking ID having two clicks at the exact same timestamp
var withsessionid = track_individual_list
.map { case ((trackingId, date), timestamps) => ((trackingId, timestamps),
(
timestamps.head + trackingId + date,
timestamps.head,
timestamps.last,
(timestamps.last.toLong - timestamps.head.toLong) / 1000,
timestamps.length
))
}
.flatMap { case ((trackingId, timestamps), t) => {
for (timestamp <- timestamps)
yield ((trackingId, timestamp), t)
}
}

//Add the sessionID and referer to the click data using updated statement
var sessionised_clicks = clicks
.map(c => ((c.getOrElse("trackingId", ""), c.getOrElse("timestamp", "")), c))
.join(withsessionid)
.map { case ((trackingId, timestamp), (click, sessionInfo)) => (click.updated("sessionId", sessionInfo._1.toString)
.updated("sessionDuration", sessionInfo._4.toString)
.updated("sessionStart", sessionInfo._2.toString)
.updated("sessionEnd", sessionInfo._3.toString)
.updated("numPage", sessionInfo._5.toString))
}
return sessionised_clicks
}

def calculate_metrics(sessionised_clicks: RDD[Click], month: String, member_or_tracking:String) = {

//println(sessionised_clicks.map(c=>(toMonth(c.getOrElse("timestamp","").toString.toLong))).distinct)
//println(sessionised_clicks.map(c=>(c.getOrElse("site",""))).distinct)


var fid_numpage_session = sessionised_clicks
.map(c=>(c(member_or_tracking).toString,1))

var numpage = fid_numpage_session
.map(t=>((t._1, month, "View"),t._2.toDouble))
.reduceByKey(_+_)

var num_tracking = numpage.map(t=>(("Number_of_articles","month",member_or_tracking,t._2,month),1)).reduceByKey(_+_)

numpage.repartition(1).saveAsTextFile("s3n://ffx-analytics/mu-sigma-data/adhoc/" + month + "_" + cat + "_" + masthead + "_tracking_num_articles")

num_tracking.repartition(1).saveAsTextFile("s3n://ffx-analytics/mu-sigma-data/adhoc/" + month + "_" + cat + "_" + masthead + "articles_num_tracking")
}

def memberid_match(clicks:RDD[String],subscription:String) :RDD[String] = {
var read_text = sc.textFile(subscription).map(c=>(c._1,1))
var memid_matched = read_text.join(clicks.map(c=>(c.getOrElse("memberId",""),c))).map(c=>(c._2._2))
memid_matched
}

var subscription = "s3n://ffx-analytics/mu-sigma-data/Subscription.csv"
var clicks = readClicks(ClicksLocation)
var ipToCityLocation = "s3n://ffx-analytics/mu-sigma-data/dbip-city-2014-09.csv"

var filtered_cat = filterCat(clicks)
var subscription_data = memberid_match(filtered_cat)
var non_subscription_data = filtered_cat.subtract(subscription_data)

var subs_metrics = calculate_metrics(addLocFields(subscription_data,ipToCityLocation),month,"memberId")
var non_subs = calculate_metrics(addLocFields(non_subscription_data,ipToCityLocation),month,"trackingId")
}

var IpRegex = new Regex("""([0-9]{1,3}).([0-9]{1,3}).([0-9]{1,3}).([0-9]{1,3})""")

def ipToLong(ip:String):Long = ip match {
case IpRegex(a,b,c,d) => Array(ta,b,c,d).map(_.toLong).foldLeft(0L)((a: Long, b: Long) => (a << 8) | (b & 0xff))
case _ => 1L
}

// Binary search of the address range
def getAddressRange(ranges:Array[(Long, Long)], addr: Long):(Long,Long) = {
def getAddressRangeHelper(ranges:Array[(Long,Long)], start:Int, end:Int, addr:Long):(Long,Long) = {
val midpoint = start + (end-start)/2
if (addr < ranges(midpoint)._1)
getAddressRangeHelper(ranges, start, midpoint-1, addr)
else if (addr > ranges(midpoint)._2)
getAddressRangeHelper(ranges, midpoint+1, end, addr)
else
ranges(midpoint)
}
getAddressRangeHelper(ranges, 0, ranges.size-1, addr)
}

// Halve a list of tuples
def halveRange(in:Array[(Long,Long)]):Array[(Long,Long)] = {
def halveRangeRec(out:List[(Long,Long)], in:List[(Long,Long)]):List[(Long,Long)] = in match {
case List() => out
case x::List() => x::out
case x::y::xs => {
println(x)
halveRangeRec((x._1,y._2)::out, xs)
}
}

halveRangeRec(List(), in.toList).reverse.toArray
}

def toMonth(timestamp: Long): String = {

val mydate = Calendar.getInstance(TimeZone.getTimeZone("Australia/Sydney"), Locale.GERMANY)
mydate.setTimeInMillis(timestamp)
val returndate = mydate.get(Calendar.MONTH) + "-" + mydate.get(Calendar.YEAR)
return returndate.toString
}

def toDate_try(timestamp: Long): String = {

val mydate = Calendar.getInstance(TimeZone.getTimeZone("Australia/Sydney"), Locale.GERMANY)
mydate.setTimeInMillis(timestamp)
val returndate = mydate.get(Calendar.DAY_OF_MONTH) + "-" + mydate.get(Calendar.MONTH) + "-" + mydate.get(Calendar.YEAR)
return returndate.toString
}

def convert_to_pairs(x: List[String]): List[(String, String)] = {
var first: String = ""
var second: String = ""
var make_list = new ListBuffer[(String, String)]()
make_list.clear()
if (x.length == 1) {
first = x(0)
make_list += Pair(first, first)
} else {
for (i <- 0 until x.length - 1) {
first = x(i)
second = x(i + 1)
make_list += Pair(first, second)
}
first = x(x.length - 1)
make_list += Pair(first, first)
}
make_list.toList
}

var fs: String = ""
var b = new ListBuffer[String]()

var THIRTY_MINUTES_IN_MILLISECONDS = 1800000

def split[A](xs: List[String]): List[List[String]] = {

var n = 0
if (xs.size == 1) {
fs = xs(0)
return List(xs)
}
else {
val loop = new Breaks;
loop.breakable {
for (i <- 0 until xs.length - 1) {
var diffs = xs(i + 1).toLong - xs(i).toLong
if (diffs > THIRTY_MINUTES_IN_MILLISECONDS) {
n = i + 1
loop.break
}
}
}
if (n == 0) return List(xs)
val (ys, zs) = xs.splitAt(n)
ys :: split(zs)
}

}
var d = new ListBuffer[String]()

def lag_asset(a: Array[String], b: Array[String]): Array[String] = {
d.clear()
for (i <- 0 until a.length) {
if (a(i).contentEquals(b(i))) {
d += "Same_asset"
} else {
d += "Not_Same"
}
}
return d.toArray
}

def check(a: List[List[String]]): List[String] = {
b.clear()
for (i <- 0 until a.length) {
b += (i + 1).toString
}

return b.toList
}

def sub(a: Array[String], b: Array[String]): Array[String] = {

for (i <- 0 until a.length) {
var diff = a(i).toLong - b(i).toLong
d += diff.toString
}
return d.toArray
}

def parseWithErrors(jsonRecord: String): Option[Map[String, JValue]] = {

implicit val formats = DefaultFormats

try {
Some(parse(jsonRecord).extract[Map[String, JValue]])
} catch {
case e: JsonParseException => {
val index = e.getLocation.getColumnNr - 2

// println(jsonRecord)

jsonRecord.charAt(index) match {
case ''' => parseWithErrors(jsonRecord.patch(index - 1, "", 1))
case 'v' => parseWithErrors(jsonRecord.patch(index - 1, "\u0007", 2))
case 'a' => parseWithErrors(jsonRecord.patch(index - 1, "\u0013", 2))
case _ => parseWithErrors(jsonRecord.patch(index - 1, "", 1))
}
}
case _: Throwable => None
}
}

def expandRecord(jsonRecord: String): Map[String, String] = {

val digest = MessageDigest.getInstance("MD5")

val partParsed: Option[Map[String, JValue]] = parseWithErrors(jsonRecord)

// Now, walk partParsed and convert each of the JValues to their primitive types.
// This is to circumvent https://github.com/json4s/json4s/issues/124

def reparse(v: JValue): Any = v match {
case JString(s) => s
case JInt(i) => i.toString()
case JArray(o) => (o
.asInstanceOf[List[JObject]]
.map(
(pair: JObject) => {
// we know that the pair has name, value keys
val p = (pair
.obj
.toMap
.mapValues(_.values.asInstanceOf[String])
)

Map[String, String](p("name") -> p("value"))
}
)
.foldLeft(Map[String, String]())(_ ++ _)
)
case _ => v
}

partParsed match {
case Some(s) => {
val withExtras = s.mapValues(reparse)
val trackingId = digest.digest(withExtras("trackingId").asInstanceOf[String].getBytes).map("%02x".format(_)).mkString
val extras = withExtras.getOrElse("extras", Map()).asInstanceOf[Map[String, String]]
(withExtras - "extras").updated("trackingId", trackingId).asInstanceOf[Map[String, String]] ++ extras
}
case None => Map[String, String]()
}
}

def serialise(click: Map[String, String]): String = {

implicit val formats = DefaultFormats

write(click)
}

def deserialise(json:String):Click = {
implicit val formats = DefaultFormats

parse(json).extract[Click]
}
}
     
 
what is notes.io
 

Notes.io is a web-based application for taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes created and continuing...

With notes.io;

  • * You can take a note from anywhere and any device with internet connection.
  • * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
  • * You can quickly share your contents without website, blog and e-mail.
  • * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
  • * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.

Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.

Easy: Notes.io doesn’t require installation. Just write and share note!

Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )

Free: Notes.io works for 12 years and has been free since the day it was started.


You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;


Email: [email protected]

Twitter: http://twitter.com/notesio

Instagram: http://instagram.com/notes.io

Facebook: http://facebook.com/notesio



Regards;
Notes.io Team

     
 
Shortened Note Link
 
 
Looding Image
 
     
 
Long File
 
 

For written notes was greater than 18KB Unable to shorten.

To be smaller than 18KB, please organize your notes, or sign in.