I have a problem saving Kafka messages consumed with Spark Streaming in a Zeppelin notebook.
My code is:
    case class Message(id: Long, message: String, timestamp: Long) extends Serializable

    val ssc = new StreamingContext(sc, Seconds(2))

    val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
        Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
        Map("test" -> 4), StorageLevel.MEMORY_ONLY)
      .map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
      .filter(_.id % 2 == 0)

    val mes = messagesStream.window(Seconds(10))

    mes
      .map(m => Message(m.id, m.message, m.timestamp))
      .foreachRDD(rdd => rdd.toDF.registerTempTable("messages"))

    ssc.start()
When I run %sql select * from messages it shows no data, but the table is defined. If I change the saving from a temp table to Cassandra, it saves and shows the data correctly. I don't understand why that is.
Thanks for your help.
OK, here is the problem. Let's first review the foreachRDD operator definition:
foreachRDD is not being used the way it's intended to be used. It is the most generic output operator: it applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that force the computation of the streaming RDDs.
So what's happening with your code is the following:
DStreams are executed lazily by the output operations, just as RDDs are executed lazily by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, which you don't, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
So your RDD data is discarded each time you perform registerTempTable, and your SQL query therefore gives an empty result.
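To make the "no RDD action, nothing executed" point concrete, here is a minimal sketch that contrasts a foreachRDD body holding only a lazy transformation with one that calls an action. It is not your Kafka pipeline: it assumes a standalone local Spark application (not a Zeppelin paragraph, where a context already exists), and it feeds the stream from an in-memory queue via queueStream so it needs no broker. The object name and app name are made up for illustration.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical demo object; assumes Spark Streaming is on the classpath.
object ForeachRddActionDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("foreachRDD-demo")
    val ssc = new StreamingContext(conf, Seconds(1))

    // One pre-built RDD stands in for the data a receiver would deliver.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 10))
    val stream = ssc.queueStream(queue)

    // Only a lazy transformation inside: the doubled RDD is defined but never computed.
    stream.foreachRDD { rdd =>
      rdd.map(_ * 2)
      ()
    }

    // count() is an RDD action, so this batch really gets processed.
    stream.foreachRDD { rdd =>
      println(s"records in this batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

Only the second foreachRDD makes Spark do any work on the batch. The first one just builds a description of a computation and throws it away.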
To solve your problem, you'll need to save your data somewhere (Cassandra is a good choice) and then query it from there.
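As a rough sketch of what that could look like with the DataStax spark-cassandra-connector: the connector adds a saveToCassandra method to DStreams, which is itself an output operation that writes every batch. The keyspace name demo_ks, the table name messages, and the helper persistMessages are assumptions for illustration; the table is assumed to exist already with columns matching the case class, and the connector must be on the interpreter's classpath with spark.cassandra.connection.host configured.

```scala
import org.apache.spark.streaming.dstream.DStream

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStream

// Same shape as the case class in the question.
case class Message(id: Long, message: String, timestamp: Long)

// Hypothetical helper: writes each batch of the stream to Cassandra.
// Assumes keyspace "demo_ks" and table "messages" (id, message, timestamp)
// were created beforehand; both names are placeholders.
def persistMessages(messages: DStream[Message]): Unit =
  messages.saveToCassandra(
    "demo_ks",
    "messages",
    SomeColumns("id", "message", "timestamp"))
```

In your notebook you would call something like persistMessages(mes) before ssc.start(), and then query the Cassandra table instead of the temp table.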