Spark groupByKey vs reduceByKey vs aggregateByKey

reduceByKey | aggregateByKey | groupByKey
---|---|---
uses combiner | uses combiner | no combiner
takes one function parameter, used as both seqOp and combOp | takes two function parameters, one for seqOp and another for combOp | no function parameters; generally followed by map or flatMap
implicit combiner | explicit combiner | no combiner
seqOp (combiner) logic is the same as combOp (final reduce) logic | seqOp (combiner) logic can differ from combOp (final reduce) logic | no combiner
input and output value types must be the same | input and output value types can differ | no function parameters
high performance for aggregations | high performance for aggregations | relatively slow for aggregations
aggregations only | aggregations only | any by-key transformation: aggregation, sorting, ranking, etc.
ReduceByKey
While both reduceByKey and groupByKey will produce the same answer, reduceByKey works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.
When groupByKey is called, on the other hand, all the key-value pairs are shuffled around, which is a lot of unnecessary data being transferred over the network.
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .reduceByKey((x, y) => x + y)       // sum counts; combines per partition before the shuffle
Data is combined within each partition, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
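As a minimal, runnable sketch of the same word count on an in-memory collection (assuming a SparkContext named sc, as in spark-shell; the input lines are made up for illustration):
val lines = sc.parallelize(Seq("spark makes big data simple", "big data big wins"))
val counts = lines
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)     // Int in, Int out: same value type on both sides
counts.collect().foreach(println)   // e.g. (big,3), (data,2), (spark,1), ...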
GroupByKey – groupByKey([numTasks])
It does not merge the values for each key before the shuffle; the shuffle happens directly, and a lot of data, almost as much as the initial data, gets sent to each partition.
The merging of values for each key happens only after the shuffle. Because a lot of data ends up stored on the final worker nodes, this can result in out-of-memory issues.
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .groupByKey()                       // shuffle all pairs; values are grouped only after the shuffle
  .map { case (word, counts) => (word, counts.sum) }
groupByKey can also cause out-of-disk problems, since all the data is sent over the network and collected on the reduce workers.
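Unlike reduceByKey, though, groupByKey is the natural fit when a per-key transformation needs all of a key's values at once (sorting, ranking, etc., as in the table above). A hedged sketch with made-up data, again assuming a SparkContext named sc:
val scores = sc.parallelize(Seq(("alice", 70), ("bob", 85), ("alice", 90), ("bob", 60)))
val sortedPerKey = scores
  .groupByKey()                         // (alice, [70, 90]), (bob, [85, 60])
  .mapValues(vs => vs.toList.sorted)    // needs the whole group, not just a running aggregate
sortedPerKey.collect().foreach(println) // (alice,List(70, 90)), (bob,List(60, 85))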
AggregateByKey – aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) is similar to reduceByKey, except that you provide an initial (zero) value when performing the aggregation.
It takes three inputs: i. an initial (zero) value, ii. a sequence op (combiner logic applied within each partition), iii. a combine op (merge logic applied across partitions).
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key-value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0                                    // zero value for each key's accumulator
val addToCounts = (n: Int, v: String) => n + 1          // seqOp: count values within a partition
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2  // combOp: merge counts across partitions
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output (aggregateByKey count results): bar -> 3, foo -> 5
Comparison between groupByKey, reduceByKey and aggregateByKey
groupByKey() simply groups your dataset based on a key.
reduceByKey() is roughly grouping plus aggregation.
Prefer reduceByKey when running on a large dataset, and prefer reduceByKey over aggregateByKey when the input and output value types are the same.
aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result as a different type. In other words, you can have input of type x and an aggregate result of type y; for example, (1,2),(1,4) as input and (1,"six") as output.
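As a hedged sketch of that last point (made-up data, assuming a SparkContext named sc): the values come in as Int but each key's result comes out as a String, which reduceByKey cannot express:
val nums = sc.parallelize(Seq((1, 2), (1, 4), (2, 3)))
// seqOp: fold an Int value into a String accumulator (the type change happens here)
val seqOp = (acc: String, v: Int) => if (acc.isEmpty) v.toString else acc + "+" + v
// combOp: merge two String accumulators coming from different partitions
val combOp = (a: String, b: String) => if (a.isEmpty) b else if (b.isEmpty) a else a + "+" + b
val asStrings = nums.aggregateByKey("")(seqOp, combOp)   // RDD[(Int, String)]
asStrings.collect().foreach(println)                     // e.g. (1,2+4), (2,3)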