apache spark - Why does textFileStream DStream give empty RDDs as if no files were processed?


I have a basic Spark application that streams an input file. Every line contains a JSON string from which I want to create a model object.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class SparkStreamingApplication {

    public static JavaSparkContext javaSparkContext() {
        final SparkConf conf = new SparkConf()
                .setAppName("SparkApplication")
                .setMaster("local[2]");
        return new JavaSparkContext(conf);
    }

    public static void main(String[] args) {
        final JavaSparkContext sparkContext = javaSparkContext();
        final String path = "data/input.txt";

        final JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(10));
        final JavaDStream<String> linesDStream = streamingContext.textFileStream(path);
        final JavaDStream<String> tokens = linesDStream.flatMap(x -> Arrays.asList(x.split("|")));
        final JavaDStream<Long> count = tokens.count();
        count.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

This results in:

16/01/24 18:44:56 INFO SparkContext: Running Spark version 1.6.0
16/01/24 18:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/24 18:44:58 WARN Utils: Your hostname, markus-lenovo resolves to a loopback address: 127.0.1.1; using 192.168.2.103 instead (on interface wlp2s0)
16/01/24 18:44:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/01/24 18:44:58 INFO SecurityManager: Changing view acls to: markus
16/01/24 18:44:58 INFO SecurityManager: Changing modify acls to: markus
16/01/24 18:44:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(markus); users with modify permissions: Set(markus)
16/01/24 18:44:59 INFO Utils: Successfully started service 'sparkDriver' on port 38761.
16/01/24 18:44:59 INFO Slf4jLogger: Slf4jLogger started
16/01/24 18:44:59 INFO Remoting: Starting remoting
16/01/24 18:45:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.2.103:45438]
16/01/24 18:45:00 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 45438.
16/01/24 18:45:00 INFO SparkEnv: Registering MapOutputTracker
16/01/24 18:45:00 INFO SparkEnv: Registering BlockManagerMaster
16/01/24 18:45:00 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-82c4981c-0b78-47c0-a8c7-e6fe8bc6ac84
16/01/24 18:45:00 INFO MemoryStore: MemoryStore started with capacity 1092.4 MB
16/01/24 18:45:00 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/24 18:45:00 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/24 18:45:00 INFO SparkUI: Started SparkUI at http://192.168.2.103:4040
16/01/24 18:45:00 INFO Executor: Starting executor ID driver on host localhost
16/01/24 18:45:00 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35429.
16/01/24 18:45:00 INFO NettyBlockTransferService: Server created on 35429
16/01/24 18:45:00 INFO BlockManagerMaster: Trying to register BlockManager
16/01/24 18:45:00 INFO BlockManagerMasterEndpoint: Registering block manager localhost:35429 with 1092.4 MB RAM, BlockManagerId(driver, localhost, 35429)
16/01/24 18:45:00 INFO BlockManagerMaster: Registered BlockManager
16/01/24 18:45:01 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3c35c345
16/01/24 18:45:02 INFO ForEachDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO ShuffledDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO TransformedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FileInputDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FileInputDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO FileInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO FileInputDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO FileInputDStream: Remember duration = 60000 ms
16/01/24 18:45:02 INFO FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@3c35c345
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@45f27baa
16/01/24 18:45:02 INFO FlatMappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO FlatMappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO FlatMappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@18d0e76e
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@eb2c23e
16/01/24 18:45:02 INFO TransformedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO TransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO TransformedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO TransformedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@26b276d3
16/01/24 18:45:02 INFO ShuffledDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO ShuffledDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO ShuffledDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@704b6684
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6fbf1474
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@7784888f
16/01/24 18:45:02 INFO ForEachDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO ForEachDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO ForEachDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@42b57c42
16/01/24 18:45:02 INFO RecurringTimer: Started timer for JobGenerator at time 1453657510000
16/01/24 18:45:02 INFO JobGenerator: Started JobGenerator at 1453657510000 ms
16/01/24 18:45:02 INFO JobScheduler: Started JobScheduler
16/01/24 18:45:02 INFO StreamingContext: StreamingContext started
16/01/24 18:45:10 INFO FileInputDStream: Finding new files took 184 ms
16/01/24 18:45:10 INFO FileInputDStream: New files at time 1453657510000 ms:

16/01/24 18:45:10 INFO JobScheduler: Added jobs for time 1453657510000 ms
16/01/24 18:45:10 INFO JobScheduler: Starting job streaming job 1453657510000 ms.0 from job set of time 1453657510000 ms
16/01/24 18:45:10 INFO SparkContext: Starting job: print at SparkStreamingApplication.java:33
16/01/24 18:45:10 INFO DAGScheduler: Registering RDD 5 (union at DStream.scala:617)
16/01/24 18:45:10 INFO DAGScheduler: Got job 0 (print at SparkStreamingApplication.java:33) with 1 output partitions
16/01/24 18:45:10 INFO DAGScheduler: Final stage: ResultStage 1 (print at SparkStreamingApplication.java:33)
16/01/24 18:45:10 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/01/24 18:45:10 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/01/24 18:45:10 INFO DAGScheduler: Submitting ShuffleMapStage 0 (UnionRDD[5] at union at DStream.scala:617), which has no missing parents
16/01/24 18:45:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.6 KB, free 4.6 KB)
16/01/24 18:45:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.2 KB)
16/01/24 18:45:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35429 (size: 2.6 KB, free: 1092.4 MB)
16/01/24 18:45:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:10 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (UnionRDD[5] at union at DStream.scala:617)
16/01/24 18:45:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/24 18:45:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)
16/01/24 18:45:10 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/24 18:45:10 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1159 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ShuffleMapStage 0 (union at DStream.scala:617) finished in 0.211 s
16/01/24 18:45:11 INFO DAGScheduler: looking for newly runnable stages
16/01/24 18:45:11 INFO DAGScheduler: running: Set()
16/01/24 18:45:11 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 174 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO DAGScheduler: failed: Set()
16/01/24 18:45:11 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32), which has no missing parents
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 10.8 KB)
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 12.8 KB)
16/01/24 18:45:11 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:35429 (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:11 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/24 18:45:11 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1813 bytes)
16/01/24 18:45:11 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
16/01/24 18:45:11 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1241 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ResultStage 1 (print at SparkStreamingApplication.java:33) finished in 0.068 s
16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 72 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO DAGScheduler: Job 0 finished: print at SparkStreamingApplication.java:33, took 0.729150 s
16/01/24 18:45:11 INFO SparkContext: Starting job: print at SparkStreamingApplication.java:33
16/01/24 18:45:11 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 144 bytes
16/01/24 18:45:11 INFO DAGScheduler: Got job 1 (print at SparkStreamingApplication.java:33) with 1 output partitions
16/01/24 18:45:11 INFO DAGScheduler: Final stage: ResultStage 3 (print at SparkStreamingApplication.java:33)
16/01/24 18:45:11 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
16/01/24 18:45:11 INFO DAGScheduler: Missing parents: List()
16/01/24 18:45:11 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32), which has no missing parents
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 16.3 KB)
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.0 KB, free 18.3 KB)
16/01/24 18:45:11 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:35429 (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:11 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
16/01/24 18:45:11 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2, localhost, partition 1,PROCESS_LOCAL, 1813 bytes)
16/01/24 18:45:11 INFO Executor: Running task 0.0 in stage 3.0 (TID 2)
16/01/24 18:45:11 INFO ContextCleaner: Cleaned accumulator 1
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/01/24 18:45:11 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1163 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ResultStage 3 (print at SparkStreamingApplication.java:33) finished in 0.048 s
16/01/24 18:45:11 INFO DAGScheduler: Job 1 finished: print at SparkStreamingApplication.java:33, took 0.112123 s
-------------------------------------------
Time: 1453657510000 ms
-------------------------------------------
0

16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 48 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO JobScheduler: Finished job streaming job 1453657510000 ms.0 from job set of time 1453657510000 ms
16/01/24 18:45:11 INFO JobScheduler: Total delay: 1.318 s for time 1453657510000 ms (execution: 0.963 s)
16/01/24 18:45:11 INFO FileInputDStream: Cleared 0 old files that were older than 1453657450000 ms:
16/01/24 18:45:11 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:35429 in memory (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/01/24 18:45:11 INFO ContextCleaner: Cleaned accumulator 2
16/01/24 18:45:11 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:35429 in memory (size: 2.6 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO InputInfoTracker: remove old batch metadata:

As you can see at the lower end of the output, the printed count of the tokens DStream is 0. The result should be 3, because each line of the input file has the format xx | yy | zz.

Is there something wrong with my Spark configuration or with my usage of DStreams? Any ideas and suggestions are welcome!

Spark's textFileStream creates a stream that watches a directory for new files only.

You have to change your path to "data/", and you have to put the file into that directory once the stream has started, as in the sketch below.
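For illustration, here is a minimal sketch of the changed part of main. The escaped split regex is an addition of mine and not part of the original code: String.split expects a regular expression, and an unescaped "|" splits between every character, so even a processed file would not yield 3 tokens per line.

// Watch the directory instead of a single file:
final JavaDStream<String> linesDStream = streamingContext.textFileStream("data/");

// split takes a regex, so the pipe must be escaped to split on "|":
final JavaDStream<String> tokens = linesDStream.flatMap(x -> Arrays.asList(x.split("\\|")));

final JavaDStream<Long> count = tokens.count();
count.print();

streamingContext.start();
// Only files that appear in data/ after this point are picked up.
streamingContext.awaitTermination();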

Please note that only new files are detected and processed. According to the documentation:

Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

However, when a file is renamed, Spark detects it.
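One common way to satisfy both constraints (a sketch, not from the original answer; the staging/ path is hypothetical) is to write the complete file outside the watched directory and then move it in as a single step:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public final class DropFileIntoWatchedDir {
    public static void main(String[] args) throws IOException {
        // Write the finished file outside the watched directory first.
        final Path staged = Paths.get("staging/input.txt"); // hypothetical staging location
        final Path target = Paths.get("data/input.txt");    // directory the stream watches

        // On the same filesystem this move is a rename, so the file appears
        // in data/ all at once and is never observed half-written.
        Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
    }
}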

