scala - How do I pass Spark context to a function from foreach


I need to pass a SparkContext to a function; please suggest how to do that for the scenario below.

I have a sequence where each element refers to a specific data source; for each one I get an RDD and process it. I have defined a function that takes the Spark context and a data source and does the necessary work. Currently I am using a while loop, but with foreach or map I could get parallel processing. The function needs the Spark context, so how can I pass it from foreach?

This is just sample code; I cannot present the actual code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object RoughWork {
  def main(args: Array[String]) {
    val str = "hello,hw:how,sr:are,ws:you,re"
    val conf = new SparkConf
    conf.setMaster("local")
    conf.setAppName("app1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val rdd = sc.parallelize(str.split(":"))
    rdd.map(x => { println("==>" + x); passTest(sc, x) }).collect()
  }

  def passTest(context: SparkContext, input: String) {
    val rdd1 = context.parallelize(input.split(","))
    rdd1.foreach(println)
  }
}

You cannot pass the SparkContext around like that. passTest will run on the executor(s), while the SparkContext lives on the driver.
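This is not in the original answer, but just to illustrate the driver/executor point, here is a minimal sketch (object and method names are hypothetical) where passTest works on plain Scala collections instead of a nested RDD, so it no longer needs a SparkContext and can safely be called inside map:

import org.apache.spark.{SparkConf, SparkContext}

object RoughWorkNoNestedRdd {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("app1")
    val sc = new SparkContext(conf)

    val str = "hello,hw:how,sr:are,ws:you,re"
    val rdd = sc.parallelize(str.split(":"))

    // passTest now takes only a String, so calling it inside map is fine;
    // nothing running on the executors touches the SparkContext.
    rdd.map(x => { println("==>" + x); passTest(x) }).collect()

    sc.stop()
  }

  // Hypothetical rewrite of passTest: a plain split/foreach instead of
  // context.parallelize, which would fail on an executor.
  def passTest(input: String): Unit =
    input.split(",").foreach(println)
}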

If you have to do a double split like that, one approach is to use flatMap:

rdd
  .zipWithIndex
  .flatMap(l => {
    val parts = l._1.split(",")
    List.fill(parts.length)(l._2) zip parts
  })
  .countByKey

There may be prettier ways, but the idea is that you can use zipWithIndex to keep track of which line an item came from, and then use the key-value pair RDD methods to work on your data.
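For example, a self-contained sketch of the above on the question's sample string (the object name and the use of countByKey to count comma-separated parts per line are just for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object RoughWorkFlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("app1")
    val sc = new SparkContext(conf)

    val str = "hello,hw:how,sr:are,ws:you,re"
    val rdd = sc.parallelize(str.split(":"))

    // Each part keeps the index of the line it came from, so countByKey
    // yields a Map(lineIndex -> number of comma-separated parts).
    val counts = rdd
      .zipWithIndex
      .flatMap(l => {
        val parts = l._1.split(",")
        List.fill(parts.length)(l._2) zip parts
      })
      .countByKey

    counts.foreach(println)
    sc.stop()
  }
}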

If you have more than one key, or more structured data in general, you can look into using Spark SQL with DataFrames (or Datasets in the latest version) and explode instead of flatMap.
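A rough sketch of that route, assuming Spark 2.x with SparkSession (the column names "line" and "part" are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

object RoughWorkExplode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("app1").getOrCreate()
    import spark.implicits._

    val str = "hello,hw:how,sr:are,ws:you,re"

    // One row per ':'-separated line, then explode the comma-separated parts
    // into their own rows and count them per line.
    val df = str.split(":").toSeq.toDF("line")
    df.select($"line", explode(split($"line", ",")).as("part"))
      .groupBy("line")
      .count()
      .show()

    spark.stop()
  }
}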

