-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Spark Dataset support #814
base: develop
Are you sure you want to change the base?
Conversation
This is great! thanks for the PR. I'd like to merge it. you need to accept the contributor agreement above. Also, can you run |
|
||
package object implicits { | ||
|
||
import com.twitter.algebird.BF |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move the imports to the top of the file?
/** | ||
* spark exposes an Aggregator type, so this is here to avoid shadowing | ||
*/ | ||
type AlgebirdAggregator[A, B, C] = Aggregator[A, B, C] | ||
val AlgebirdAggregator = Aggregator | ||
|
||
implicit class ToAlgebird[T](val rdd: RDD[T]) extends AnyVal { | ||
implicit class ToAlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we not change the name of this? this will break source and binary compatibility for people.
} | ||
|
||
after { | ||
// try spark.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that we can remove this after
block?
@@ -0,0 +1,29 @@ | |||
package com.twitter.algebird.spark | |||
|
|||
package object implicits { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would actually create an EncoderInstances
trait
and have package object spark extends EncoderInstances
.
Users will do import com.twitter.algebird.spark._
and they will get these lower priority implicits
. I can see that this was to possibly be inline with spark way of things, but I think this is more consistent with the rest of algebird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this and it didn't work with spark.implicits._
. So I'm moving them into the package object spark
instead.
|
||
import scala.reflect.ClassTag | ||
implicit def kryoPriorityQueueEncoder[A](implicit ct: ClassTag[PriorityQueue[A]]): Encoder[PriorityQueue[A]] = | ||
org.apache.spark.sql.Encoders.kryo[PriorityQueue[A]](ct) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org.apache.spark.sql.Encoders.kryo[PriorityQueue[A]](ct) | |
org.apache.spark.sql.Encoders.kryo[PriorityQueue[A]] |
iirc ClassTag
is a context bound so the above should be possible?
import org.apache.spark.sql.SparkSession | ||
import org.apache.spark.sql.Encoder | ||
import org.apache.spark.sql.Dataset | ||
import com.twitter.algebird.BloomFilter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems unused?
* above to at least check compilation | ||
*/ | ||
test("aggregate") { | ||
val sparkImplicits = spark.implicits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would move these to #L24 to be more evident that these are indeed the SparkSession
implicitis
@Tomczik76 I'd love to include this. Any idea when you might have time to get back around to it? |
No description provided.