1. How do you insert alternate columns of a CSV file into Spark?
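One approach, sketched below on the test1.csv file from question 2, assumes "alternate columns" means keeping every other field (here indices 0 and 2, skipping age). The case class Picked is illustrative, not part of the original post.

case class Picked(name: String, state: String)
val raw = sc.textFile("file://<file-path>/test1.csv")
val noHeader = raw.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }  // drop the header line
val dfAlt = noHeader.map(_.split("\\^"))
  .map(x => Picked(x(0), x(2)))   // keep fields 0 and 2, skip field 1 (age)
  .toDF()
dfAlt.show()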
2. Consider this scenario:
Input file:
name^age^state
swathi^23^us
srivani^24^UK
ram^25^London
scala> case class Schema(name: String, age: Int, state: String)
scala> val rdd = sc.textFile("file://<file-path>/test1.csv")
scala> val rdd1 = rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }  // drop the header line (first line of the first partition)
scala> val df1 = rdd1.map(_.split("\\^")).map(x => Schema(x(0), x(1).toInt, x(2))).toDF()
(or)
scala> val df1 = rdd1.map(_.split('^')).map(x => Schema(x(0), x(1).toInt, x(2))).toDF()
Note: String.split takes a regex, so the caret must be escaped as "\\^"; the Char overload split('^') splits on the literal character.
Output:
scala> df1.show()
+-------+---+------+
|   name|age| state|
+-------+---+------+
| swathi| 23|    us|
|srivani| 24|    UK|
|    ram| 25|London|
+-------+---+------+
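For comparison, the same DataFrame can come straight from the CSV reader. A minimal sketch, assuming Spark 2.x or later, where spark is the SparkSession that spark-shell provides:

val df2 = spark.read
  .option("header", "true")        // first line (name^age^state) supplies the column names
  .option("sep", "^")              // custom single-character delimiter
  .option("inferSchema", "true")   // age comes back as an integer instead of a string
  .csv("file://<file-path>/test1.csv")
df2.show()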
3. How will you add column names to a text file?
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}
val schema = new StructType()
  .add(StructField("name", StringType, true))
  .add(StructField("age", IntegerType, true))
  .add(StructField("state", StringType, true))
val data = sc.textFile("/user/206571870/sample.csv")
val header = data.first()                             // capture the header line
val rdd = data.filter(row => row != header)           // drop the header row
val rowsRDD = rdd.map(_.split(",")).map(x => Row(x(0), x(1).toInt, x(2)))
val df = sqlContext.createDataFrame(rowsRDD, schema)  // apply the StructType schema
or
val df1 = sc.textFile("testfile.txt").map(_.split('|')).map(x => Schema(x(0), x(1).toInt, x(2))).toDF()
Note: .map must be lowercase, and the row-building call uses the case class Schema from question 2; the lowercase schema defined above is a StructType and cannot be applied like a case class.
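If the goal is just to attach column names, another sketch (assuming spark-shell implicits and the comma-delimited sample.csv above, whose header row starts with "name") maps to tuples and passes the names to toDF directly:

val df3 = sc.textFile("/user/206571870/sample.csv")
  .map(_.split(","))
  .filter(x => x(0) != "name")           // drop the header row, assumed to begin with "name"
  .map(x => (x(0), x(1).toInt, x(2)))
  .toDF("name", "age", "state")          // column names supplied inline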