Monday, 6 May 2019

useful scenario in bigdata

1.how to insert alternative columns of a csv file into spark?


2.see this scenario
Input file:-

name^age^state
swathi^23^us
srivani^24^UK
ram^25^London
scala> case class schema(name:String,age:Int,brand_code:String)
scala> val rdd = sc.textFile("file://<file-path>/test1.csv")
scala> val rdd1= rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
scala> val df1 = rdd1.map(_.split("\\^")).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()
(or)
scala> val df1 = rdd1.map(_.split('^')).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()
Output:-

scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+


3.how will you add coumn name in a text file?

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}


val schema =new StructType().add(StructField("name",StringType,true)).add(StructField("age",IntegerType,true)).add(StructField("state",StringType,true))


val data = sc.textFile("/user/206571870/sample.csv")
val header = data.first()
  1. val rdd = data.filter(row => row != header)
  2. val rowsRDD = rdd.map(x => x.split(",")).map(x => Row(x(0),x(1).toInt,x(2)))
  3. val df = sqlContext.createDataFrame(rowsRDD,schema)

or

val df1 = sc.textFile("testfile.txt").Map(_.split('|')).map(x=> schema(x(0).toString,x(1).toInt,x(2).toString)).toDF()





No comments:

Post a Comment