
Unable To Write Pyspark Dataframe Created From Two Zipped Dataframes

I am trying to follow the example given here for combining two DataFrames without a shared join key (combining by 'index', as in a database table or pandas DataFrame, except that PySpark DataFrames do not have such an index).

Solution 1:

You can temporarily switch to RDDs and add an index with zipWithIndex. This index can then be used as the join criterion:

#create rdds with an additional index
#as zipWithIndex adds the index as second column, we have to switch
#the first and second column
left = left_df.rdd.zipWithIndex().map(lambda a: (a[1], a[0]))
right = right_df.rdd.zipWithIndex().map(lambda a: (a[1], a[0]))

#join both rdds 
joined = left.fullOuterJoin(right)

#restore the original columns
result = spark.createDataFrame(joined).select("_2._1.*", "_2._2.*")
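
For reference, here is a minimal end-to-end sketch of the same approach with made-up example data (left_df and right_df are illustrative stand-ins for the DataFrames in the snippet above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

#two small DataFrames without any shared join key
left_df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "num"])
right_df = spark.createDataFrame([(10.0, "x"), (20.0, "y"), (30.0, "z")], ["val", "tag"])

#index the rows, move the index into the key position and join on it
left = left_df.rdd.zipWithIndex().map(lambda a: (a[1], a[0]))
right = right_df.rdd.zipWithIndex().map(lambda a: (a[1], a[0]))
joined = left.fullOuterJoin(right)

#expand the nested row structs back into flat columns: letter, num, val, tag
result = spark.createDataFrame(joined).select("_2._1.*", "_2._2.*")
result.show()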

The Javadoc of zipWithIndex states that

Some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition.

Depending on the nature of the original datasets, this code might not produce deterministic results.

Solution 2:

RDDs are old hat, but here is an answer to the error from that perspective.

From La Trobe University, http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#zip, the following description of zip:

Joins two RDDs by combining the i-th of either partition with each other. The resulting RDD will consist of two-component tuples which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension.

Note the word "pair".

This means both RDDs must have the same partitioner, the same number of partitions, and the same number of key-value pairs per partition; otherwise the definition above does not hold.

zip is therefore best applied when reading in from files, as repartition(n) may not give the same distribution on both sides.
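
To make the requirement concrete, here is a small PySpark sketch with made-up data; both RDDs are parallelized with the same number of partitions and the same number of elements, which is exactly what zip needs:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd_a = sc.parallelize([("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0)], 2)
rdd_b = sc.parallelize([(10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C")], 2)

#works: 2 partitions on each side and 2 elements per partition on both
rdd_a.zip(rdd_b).collect()

#an RDD with a different number of partitions, or a different number of
#elements per partition, would make zip fail at runtime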

A little trick to get around that is to use zipWithIndex for the k of the (k, v) pair, like so (in Scala, since this is not a PySpark-specific aspect):

val rddA = sc.parallelize(Seq(
  ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0)
))
val rddAA = rddA.zipWithIndex().map(x => (x._2, x._1)).repartition(5)

//use the same number of elements as rddA so that zip's per-partition requirement can hold
val rddB = sc.parallelize(Seq(
  (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C")
))
val rddBB = rddB.zipWithIndex().map(x => (x._2, x._1)).repartition(5)

val zippedRDD = (rddAA zip rddBB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
zippedRDD.collect

The repartition(n) then seems to work, as the k is of the same type on both sides.

But you must have the same number of elements per partition. It is what it is, but it makes sense.
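
Since the question itself is about PySpark, a rough translation of the trick into PySpark looks like this (a sketch only, mirroring the Scala snippet above; the same caveat about equal element counts per partition applies):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd_a = sc.parallelize([("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0)])
rdd_aa = rdd_a.zipWithIndex().map(lambda x: (x[1], x[0])).repartition(5)

rdd_b = sc.parallelize([(10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C")])
rdd_bb = rdd_b.zipWithIndex().map(lambda x: (x[1], x[0])).repartition(5)

#each zipped element is ((index, (name, value)), (index, (number, label)));
#flatten it into a single 4-tuple as in the Scala version
zipped = rdd_aa.zip(rdd_bb).map(lambda p: (p[0][0], p[0][1], p[1][0], p[1][1]))
zipped.collect()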
