Flink streaming job switched to FAILED status


I have an 8-node Flink cluster and a 5-node Kafka cluster, and I run a WordCount job. In the first case, a lot of data is generated and pushed to Kafka first, and then the Flink job is launched. It works fine in this case.

In the second case, the Flink streaming job is launched first, and then data is produced into the Kafka topic. In this case, the Flink job switches to FAILED status. Sometimes it fails right after the job is launched, sometimes several minutes after the job is launched.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'worker1/192.168.1.38:35240'.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
    at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:270)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    ... 2 more

01/24/2016 22:21:32    Keyed Reduce -> Sink: Unnamed (29/32) switched to FAILED
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'worker1/192.168.1.38:35240'.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)

In the log file of worker4, the error is:

23:03:43,786 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map -> Flat Map -> Map (20/32) switched to FAILED with exception.
java.lang.Exception: Error while fetching from broker: Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker: Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:422)

Shortly before the UnknownException, there are logs related to ZooKeeper:

08:58:47,720 INFO  org.I0Itec.zkclient.ZkEventThread           - Terminate ZkClient event thread.
08:58:47,737 INFO  org.apache.zookeeper.ZooKeeper              - Session: 0x15277fbb7c70020 closed
08:58:47,737 INFO  org.apache.zookeeper.ClientCnxn             - EventThread shut down
08:58:47,737 INFO  org.apache.flink.runtime.taskmanager.Task   - Source: Custom Source -> Map -> Flat Map -> Map (6/32) switched to FAILED with exception.

The root cause of the error is:

Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

The UnknownException was triggered by this error on the Kafka side:

[2016-01-25 12:45:30,195] ERROR [Replica Manager on Broker 2]: Error when processing fetch request for partition [wordcount,4] offset 335517 from consumer with correlation id 0. Possible cause: Attempt to read with a maximum offset (335515) less than the start offset (335517). (kafka.server.ReplicaManager)
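The broker message says the consumer asked for offset 335517 while the highest offset the replica could serve was 335515, i.e. the fetch position had run ahead of what the broker considers readable. As a minimal sketch of that validity check (a hypothetical simplification for illustration only, not Kafka's actual code; the class and method names are made up):

```java
// Hypothetical simplification of the broker-side check behind the
// "maximum offset less than start offset" error in the log above.
public class FetchOffsetCheck {

    // Returns an error string when the requested start offset lies beyond
    // the highest offset the broker may serve; returns "OK" otherwise.
    public static String checkFetch(long startOffset, long maxReadableOffset) {
        if (startOffset > maxReadableOffset) {
            return "ERROR: attempt to read with a maximum offset (" + maxReadableOffset
                    + ") less than the start offset (" + startOffset + ")";
        }
        return "OK";
    }

    public static void main(String[] args) {
        // Values taken from the broker log above: fetch is rejected.
        System.out.println(checkFetch(335517L, 335515L));
        // A fetch at or below the maximum readable offset is served.
        System.out.println(checkFetch(335515L, 335515L));
    }
}
```

When such a rejected fetch reaches the consumer, the legacy Flink fetcher surfaces it as the kafka.common.UnknownException shown earlier.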

I've filed a JIRA issue in Flink for this problem: https://issues.apache.org/jira/browse/FLINK-3288

