Skip to content
This repository has been archived by the owner on Sep 2, 2020. It is now read-only.

RxJava Support #121

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bolts-tasks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ sourceSets {
dependencies {
provided 'com.google.android:android:4.1.1.4'

compile 'io.reactivex:rxjava:1.1.5'

testCompile 'junit:junit:4.12'
}

Expand Down
59 changes: 59 additions & 0 deletions bolts-tasks/src/main/java/bolts/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Completable;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;

/**
* Represents the result of an asynchronous operation.
*
Expand Down Expand Up @@ -1015,6 +1020,60 @@ private void runContinuations() {
}
}

/**
* creates an Rx Observable or Rx Completable from a Parse Task
*
* @param isNullable if the task is nullable this will get used to generate an {@link Completable}
* @return {@link Observable<TResult>}
*/
private Observable<TResult> asObservable(boolean isNullable) {
return Observable.defer(new Func0<Observable<TResult>>() {
@Override
public Observable<TResult> call() {
return Observable.create(new Observable.OnSubscribe<TResult>() {
@Override
public void call(Subscriber<? super TResult> subscriber) {
continueWith(new Continuation<TResult, Object>() {
@Override
public Object then(Task<TResult> task) throws Exception {
if (task.isCancelled()) {
// NOTICE: doOnUnsubscribe(() -> Observable.just(query) in outside
subscriber.unsubscribe(); //sub.onCompleted();?
} else if (task.isFaulted()) {
Throwable error = task.getError();
subscriber.onError(error);
} else {
TResult result = task.getResult();
if (isNullable || result != null) subscriber.onNext(result);
subscriber.onCompleted();
}
return null;
}
});
}
});
}
});
}

/**
* creates an Rx Observable from a Parse Task
*
* @return {@link Observable<TResult>}
*/
public Observable<TResult> asObservable() {
return asObservable(false);
}

/**
* creates an Rx Completable from a Parse Task
*
* @return {@link Completable}
*/
public Completable asCompletable() {
return asObservable(true).toCompletable();
}

/**
* @deprecated Please use {@link bolts.TaskCompletionSource} instead.
*/
Expand Down