Hoe rdd-object naar dataframe in spark te converteren

Hoe kan ik een RDD (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) converteren naar een Dataframe org.apache.spark.sql.DataFrame. Ik heb een dataframe geconverteerd naar rdd met behulp van .rdd. Na het verwerken wil ik het terug in het dataframe. Hoe kan ik dit doen?


Antwoord 1, autoriteit 100%

SparkSessionheeft een aantal createDataFramemethoden die een DataFramemaken met een rdd. Ik kan me voorstellen dat een van deze voor jouw context zal werken.

Bijvoorbeeld:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Maakt een DataFrame van een RDD met rijen met behulp van de gegeven
schema.


Antwoord 2, autoriteit 91%

Deze code werkt perfect vanaf Spark 2.x met Scala 2.11

Benodigde lessen importeren

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Maak SparkSession-object, en hier is het spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

Laten we een rddgebruiken om het DataFrame

te maken

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

Methode 1

Gebruik SparkSession.createDataFrame(RDD obj).

val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Methode 2

Gebruik SparkSession.createDataFrame(RDD obj)en specificeer kolomnamen.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()
+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Methode 3 (Eigenlijk antwoord op de vraag)

Op deze manier moet de invoer rddvan het type RDD[Row]zijn.

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

maak het schema

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Pas nu zowel rowsRddals schematoe op createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

Antwoord 3, autoriteit 72%

Ervan uitgaande dat uw RDD[rij] rdd heet, kunt u het volgende gebruiken:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

Antwoord 4, autoriteit 21%

Opmerking: dit antwoord is oorspronkelijk hier

geplaatst

Ik plaats dit antwoord omdat ik aanvullende details wil delen over de beschikbare opties die ik niet in de andere antwoorden heb gevonden


Er zijn twee hoofdopties om een DataFrame te maken van een RDD met rijen:

1)Zoals al aangegeven, zou je toDF()kunnen gebruiken die kan worden geïmporteerd door import sqlContext.implicits._. Deze aanpak werkt echter alleen voor de volgende typen RDD’s:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(bron: Scaladocvan het SQLContext.implicits-object)

De laatste handtekening betekent eigenlijk dat het kan werken voor een RDD van tuples of een RDD van hoofdletterklassen (omdat tuples en hoofdletterklassen subklassen zijn van scala.Product).

Dus, om deze benadering te gebruiken voor een RDD[Row], moet je deze toewijzen aan een RDD[T <: scala.Product]. Dit kan worden gedaan door elke rij toe te wijzen aan een aangepaste case-klasse of aan een tuple, zoals in de volgende codefragmenten:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

of

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

Het belangrijkste nadeel van deze benadering (naar mijn mening) is dat je het schema van het resulterende DataFrame expliciet moet instellen in de kaartfunctie, kolom voor kolom. Misschien kan dit programmatisch worden gedaan als je het schema niet van tevoren kent, maar daar kan het een beetje rommelig worden. Als alternatief is er dus een andere optie:


2)U kunt createDataFrame(rowRDD: RDD[Row], schema: StructType)gebruiken zoals in het geaccepteerde antwoord, dat beschikbaar is in de SQLContext-object . Voorbeeld voor het converteren van een RDD van een oud DataFrame:

val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Merk op dat het niet nodig is om een ​​schema-kolom expliciet in te stellen. We hergebruiken het schema van het oude DF, dat is van StructTypeKlasse en kan eenvoudig worden uitgebreid. Deze aanpak is echter niet mogelijk en kan in sommige gevallen minder efficiënt zijn dan de eerste.


5, Autoriteit 10%

Methode 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Methode 2: (Scala)

case class temp(val1: String,val3 : Double) 
val rdd = sc.parallelize(Seq(
  Row("foo",  0.5), Row("bar",  0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()

Methode 1: (Python)

from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()

Methode 2: (Python)

from pyspark.sql.types import * 
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) , 
StructField("age" , IntegerType(), True)]) 
df3 = sqlContext.createDataFrame(rdd, schema) 
df3.show()

Geëxtraheerd de waarde uit de reeks object en vervolgens toegepast bij klasse converteren RDD DF

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }
case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._
val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF

6, Autoriteit 6%

Hier is een eenvoudig voorbeeld van het omzetten van uw lijst in Spark RDD en vervolgens het omzetten van die Spark RDD in dataframe.

Houd er rekening mee dat ik Spark-shell Scala REPL hebben gebruikt om volgende code uit te voeren, Hier sc is een voorbeeld van SparkContext dat impliciet beschikbaar is in Spark-shell. Hoop dat het antwoord op uw vraag.

scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)
scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28
scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]
scala> numDF.show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+

Antwoord 7, autoriteit 4%

Op nieuwere versies van Spark (2.0+)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val spark = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._
val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)

Antwoord 8

One needs to create a schema, and attach it to the Rdd.

Ervan uitgaande dat val spark een product is van een SparkSession.builder…

   import org.apache.spark._
    import org.apache.spark.sql._       
    import org.apache.spark.sql.types._
    /* Lets gin up some sample data:
     * As RDD's and dataframes can have columns of differing types, lets make our
     * sample data a three wide, two tall, rectangle of mixed types.
     * A column of Strings, a column of Longs, and a column of Doubules 
     */
    val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
    arrayOfArrayOfAnys(0)(0)="aString"
    arrayOfArrayOfAnys(0)(1)=0L
    arrayOfArrayOfAnys(0)(2)=3.14159
    arrayOfArrayOfAnys(1)(0)="bString"
    arrayOfArrayOfAnys(1)(1)=9876543210L
    arrayOfArrayOfAnys(1)(2)=2.71828
    /* The way to convert an anything which looks rectangular, 
     * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to 
     * throw it into sparkContext.parallelize.
     * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
     * the parallelize definition as 
     *     def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
     * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
     * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. 
     */
    val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)
    /* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
     * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
     * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
     * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. 
     */     
    val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
        Row.fromSeq(f.toSeq)
    )
    /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
     * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
     *   case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
     * Will leave the two default values in place for each of the columns:
     *        nullability as true, 
     *        metadata as an empty Map[String,Any]
     *   
     */
    val schema = StructType(
        StructField("colOfStrings", StringType) ::
        StructField("colOfLongs"  , LongType  ) ::
        StructField("colOfDoubles", DoubleType) ::
        Nil
    )
    val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
    /*
     *      +------------+----------+------------+
     *      |colOfStrings|colOfLongs|colOfDoubles|
     *      +------------+----------+------------+
     *      |     aString|         0|     3.14159|
     *      |     bString|9876543210|     2.71828|
     *      +------------+----------+------------+
    */ 
    df.show 

dezelfde stappen, maar met minder valse verklaringen:

   val arrayOfArrayOfAnys=Array(
        Array("aString",0L         ,3.14159),
        Array("bString",9876543210L,2.71828)
    )
    val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))
    /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
     * Consider constructing the schema from an Array[StructField].  This would allow looping over 
     * the columns, with a match statement applying the appropriate sql datatypes as the second
     *  StructField arguments.   
     */
    val sf=new Array[StructField](3)
    sf(0)=StructField("colOfStrings",StringType)
    sf(1)=StructField("colOfLongs"  ,LongType  )
    sf(2)=StructField("colOfDoubles",DoubleType)        
    val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
    df.show

9

Ik heb geprobeerd de oplossing uit te leggen met behulp van het probleem Word Count .
1. Lees het bestand met SC

  1. produceren Word Count
  2. Methoden om DF

    te maken

    • RDD.TODF-methode
    • rdd.todf (“woord”, “tellen”)
      • Spark.Createdataframe (RDD, SCHEMA)

    Lees bestand met behulp van Spark

    val rdd=sc.textFile("D://cca175/data/")  
    

    RDD naar DATAFAME

    val df=sc.textFile(“D://cca175/data/”).toDF(“t1”)
    df.show

    Methode 1

    Creëer woordentelling RDD naar dataframe

    val df=rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>(x+y)).toDF("word","count")
    

    Methode2

    Maak een dataframe van Rdd

    val df=spark.createDataFrame(wordRdd) 
    # with header   
    val df=spark.createDataFrame(wordRdd).toDF("word","count")  df.show
    

    Methode3

    Schema definiëren

    org.apache.spark.sql.types._ importeren

    val schema=new StructType().
    add(StructField(“woord”,StringType,true)).
    add(StructField(“count”,StringType,true))

    RijRDD maken

    import org.apache.spark.sql.Row
    val rowRdd=wordRdd.map(x=>(Row(x._1,x._2)))     
    

    Maak DataFrame van RDD met schema

    val df=spark.createDataFrame(rowRdd,schema)
    df.show


Antwoord 10

Om een Array[Row] naar DataFrame of Dataset te converteren, werkt het volgende elegant:

Stel, schema is het StructType voor de rij, dan

val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS

Other episodes