I need to pass a SparkContext into a function; please suggest how for the scenario below.
I have a sequence in which each element refers to a specific data source; from each one I get an RDD and process it. I have defined a function that takes the Spark context, the data source, and the other necessary things. Currently I am using a while loop, but a foreach or map would imply parallel processing. The function needs the Spark context, so how can I pass it inside foreach?
This is just sample code; I cannot present the actual code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object RoughWork {
  def main(args: Array[String]) {
    val str = "hello,hw:how,sr:are,ws:you,re"
    val conf = new SparkConf
    conf.setMaster("local")
    conf.setAppName("app1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.parallelize(str.split(":"))
    rdd.map(x => { println("==>" + x); passTest(sc, x) }).collect()
  }

  def passTest(context: SparkContext, input: String) {
    val rdd1 = context.parallelize(input.split(","))
    rdd1.foreach(println)
  }
}
You cannot pass the SparkContext around like that. passTest would
run on the executor(s), while the SparkContext lives on the driver.
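If the goal is only to build an RDD per data source and process it, one option is to keep the iteration on the driver, where sc is valid. The sketch below is an assumption rather than part of the original answer; the sources sequence and the loadAndProcess helper are hypothetical placeholders:

// Driver-side iteration: sc is only ever referenced on the driver,
// never captured inside an RDD closure.
val sources = Seq("hello,hw", "how,sr", "are,ws", "you,re")  // hypothetical data sources

def loadAndProcess(context: SparkContext, source: String): Long = {
  // Hypothetical processing: build an RDD from the source and count its items.
  context.parallelize(source.split(",")).count()
}

sources.foreach { src =>
  println(src + " -> " + loadAndProcess(sc, src))
}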
If you have to do a double split like that, one approach is to use flatMap:
rdd
  .zipWithIndex
  .flatMap(l => { val parts = l._1.split(","); List.fill(parts.length)(l._2) zip parts })
  .countByKey
There may be prettier ways, but the idea is that you can use zipWithIndex to
keep track of which line an item came from, and then use the key-value pair RDD methods to work on your data.
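For concreteness, here is a minimal self-contained sketch of that idea applied to the sample string from the question; it assumes the local sc created in the question's code:

// Assumes sc is the local SparkContext from the question's sample.
val str = "hello,hw:how,sr:are,ws:you,re"
val rdd = sc.parallelize(str.split(":"))

val counts = rdd
  .zipWithIndex                                // pair each line with its index
  .flatMap { case (line, idx) =>
    val parts = line.split(",")
    List.fill(parts.length)(idx) zip parts     // one (lineIndex, item) pair per item
  }
  .countByKey                                  // items produced by each original line

counts.foreach { case (idx, n) => println(s"line $idx -> $n items") }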
If you have more than one key, or more structured data in general, you can use Spark SQL with DataFrames (or Datasets in the latest version) and explode instead of flatMap.
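A rough sketch of that DataFrame route, assuming the sqlContext from the question's sample and the built-in split/explode functions (the column names here are made up):

import org.apache.spark.sql.functions.{explode, split}
import sqlContext.implicits._   // sqlContext from the question's sample code

// One row per ":"-separated line, then one row per ","-separated item.
val df = sc.parallelize("hello,hw:how,sr:are,ws:you,re".split(":")).toDF("line")
val exploded = df.select($"line", explode(split($"line", ",")).as("item"))

exploded.groupBy("line").count().show()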