As in the previous article on Flink we will create a simple tumbling window aggregation; this time round, however, we will build the data model using a case class. In our extremely simplified example the data stream consists of purchase data from an online store.
Set the project up in a similar fashion to the previous article. We will use IntelliJ IDEA as our IDE.
Our initial Purchase data model for the parsed streaming data looks like this:
package com.bissolconsulting.flink.examples

object Models {

  case class Purchase(
    purchaseId: Int
    , firstName: String
    , lastName: String
    , department: String
    , purchaseValue: Double
    , middleName: Option[String]
  )
}
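Just to illustrate the optional middleName field: constructed by hand (with values borrowed from the sample data we will feed in later), a record without and one with a middle name would look like this:

// illustration only, not part of the streaming job
val p1 = Purchase(111, "Sam", "Watson", "Groceries", 20.0, None)
val p2 = Purchase(114, "Susan", "Walker", "Furniture", 50.0, Some("Marry"))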
In IDEA create a new package called com.bissolconsulting.flink.examples and then a new Scala object file called Models; finally, insert the code shown above.
Our program will accept an input stream from a socket. We parse the comma-separated data and type it properly using our Purchase model. Finally we group the data by department and aggregate the purchaseValue (sales). Basically we want to know:
How much revenue do we generate in each department every 10 seconds?
Create a new Scala object file called WindowWordCountUsingCaseClass and insert the following code:
package com.bissolconsulting.flink.examples

import com.bissolconsulting.flink.examples.Models.Purchase
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Try

object WindowWordCountUsingCaseClass {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.socketTextStream("localhost", 9999)

    val parsedStream = source.map(value => {
      val columns = value.split(",")
      // the optional middleName is in the last column
      // check if it exists
      val middleName = Try(Some(columns(5))).getOrElse(None)
      // map input stream to case class model
      Purchase(columns(0).toInt, columns(1), columns(2), columns(3), columns(4).toDouble, middleName)
    })

    val keyedStream = parsedStream.keyBy(_.department)

    keyedStream
      .timeWindow(Time.seconds(10))
      .sum("purchaseValue")
      .print()

    env.execute("Window Word Count Using Case Class")
  }
}
Notice that using a case class enables us to reference the fields by name in functions like keyBy and sum. Also note that timeWindow(Time.seconds(10)) creates tumbling processing-time windows here, since we did not change the default time characteristic.
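As an aside, the map above throws an exception on any malformed line (for example a non-numeric purchaseValue), which would fail the job. Below is a minimal sketch of a more defensive variant that silently drops bad lines; the helper name parseLine is mine and not part of the original example (it drops into the same file, reusing the existing imports):

// Hypothetical helper: returns None for any line that fails to parse,
// e.g. too few columns or a non-numeric purchaseValue.
def parseLine(value: String): Option[Purchase] = {
  val columns = value.split(",")
  Try(
    Purchase(
      columns(0).toInt,
      columns(1),
      columns(2),
      columns(3),
      columns(4).toDouble,
      Try(columns(5)).toOption
    )
  ).toOption
}

// flatMap expects a collection per input element, so converting the
// Option to a List keeps only the successfully parsed records
val parsedStream = source.flatMap(line => parseLine(line).toList)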
Next open your terminal and run this command:
nc -kl 9999
This starts a netcat listener on port 9999 (the -k flag keeps it listening after a connection closes).
In IDEA right-click our WindowWordCountUsingCaseClass file and choose to run it.
Next paste the sample data below into the terminal window and press enter:
111,Sam,Watson,Groceries,20
112,Luke,Waller,Groceries,50
113,Peter,Burlington,Furniture,250
114,Susan,Walker,Furniture,50,Marry
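Note that only the last record carries the optional middle name: it should parse to Purchase(114,Susan,Walker,Furniture,50.0,Some(Marry)), while the others get a middleName of None.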
The output of our stream aggregation looks like this:
4> Purchase(111,Sam,Watson,Groceries,70.0,None)
4> Purchase(113,Peter,Burlington,Furniture,300.0,None)
Note that the purchaseValue was correctly aggregated; however, (unsurprisingly) the remaining fields carry no meaning in the aggregated result, since they are simply taken from one record in each window. So the next step will be to drop the unnecessary fields and introduce a new PurchaseAggregation model using a case class:
package com.bissolconsulting.flink.examples

object Models {

  case class Purchase(
    purchaseId: Int
    , firstName: String
    , lastName: String
    , department: String
    , purchaseValue: Double
    , middleName: Option[String]
  )

  case class PurchaseAggregation(
    department: String
    , purchaseValue: Double
  )
}
Finally let’s add an additional mapping to slim the dataset down for aggregation:
package com.bissolconsulting.flink.examples

import com.bissolconsulting.flink.examples.Models.{Purchase, PurchaseAggregation}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Try

object WindowWordCountUsingCaseClass {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.socketTextStream("localhost", 9999)

    // parse the raw input into our Purchase model
    val parsedStream = source.map(value => {
      val columns = value.split(",")
      // the optional middleName is in the last column
      // check if it exists
      val middleName = Try(Some(columns(5))).getOrElse(None)
      // map input stream to case class model
      Purchase(columns(0).toInt, columns(1), columns(2), columns(3), columns(4).toDouble, middleName)
    })

    // slim the records down to the fields we need for the aggregation
    val aggBase: DataStream[PurchaseAggregation] = parsedStream.map(
      record => PurchaseAggregation(record.department, record.purchaseValue)
    )

    val keyedStream = aggBase.keyBy(_.department)

    keyedStream
      .timeWindow(Time.seconds(10))
      .sum("purchaseValue")
      .print()

    env.execute("Window Word Count Using Case Class")
  }
}
Compile and run this version in IDEA and paste the sample data into the terminal window again. This time round the results of the windowed aggregation look better:
4> PurchaseAggregation(Groceries,70.0)
4> PurchaseAggregation(Furniture,300.0)
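As a closing aside, the string-based sum("purchaseValue") is only checked when the job starts, so a misspelled field name fails at runtime. A compile-time-checked alternative, sketched here rather than taken from the original example, replaces the sum with a reduce:

// same windowed aggregation, but with a typed reduce instead of
// the string-based sum("purchaseValue")
aggBase
  .keyBy(_.department)
  .timeWindow(Time.seconds(10))
  .reduce((a, b) => PurchaseAggregation(a.department, a.purchaseValue + b.purchaseValue))
  .print()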
As always, you can find the code for this example in my GitHub repo.