diff --git a/DaoCore/build.gradle b/DaoCore/build.gradle index 0e86d8429..e3e428112 100644 --- a/DaoCore/build.gradle +++ b/DaoCore/build.gradle @@ -20,6 +20,7 @@ dependencies { compileOnly 'com.google.android:support-v4:r7' compileOnly 'io.reactivex:rxjava:1.1.8' + compileOnly 'io.reactivex.rxjava2:rxjava:2.0.9' compileOnly files('libs/sqlcipher.jar') } diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/AbstractDao.java b/DaoCore/src/main/java/org/greenrobot/greendao/AbstractDao.java index de08fe2c2..17fbd4fa2 100644 --- a/DaoCore/src/main/java/org/greenrobot/greendao/AbstractDao.java +++ b/DaoCore/src/main/java/org/greenrobot/greendao/AbstractDao.java @@ -33,6 +33,7 @@ import org.greenrobot.greendao.query.Query; import org.greenrobot.greendao.query.QueryBuilder; import org.greenrobot.greendao.rx.RxDao; +import org.greenrobot.greendao.rx2.Rx2Dao; import java.util.ArrayList; import java.util.Arrays; @@ -73,6 +74,9 @@ public abstract class AbstractDao { private volatile RxDao rxDao; private volatile RxDao rxDaoPlain; + private volatile Rx2Dao rx2Dao; + private volatile Rx2Dao rx2DaoPlain; + public AbstractDao(DaoConfig config) { this(config, null); } @@ -964,6 +968,34 @@ public RxDao rx() { return rxDao; } + /** + * The returned Rx2Dao is a special DAO that let's you interact with Rx Observables without any Scheduler set + * for subscribeOn. + * + * @see #rx2() + */ + @Experimental + public Rx2Dao rx2Plain() { + if (rx2DaoPlain == null) { + rx2DaoPlain = new Rx2Dao<>(this); + } + return rx2DaoPlain; + } + + /** + * The returned Rx2Dao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for + * subscribeOn. + * + * @see #rx2Plain() + */ + @Experimental + public Rx2Dao rx2() { + if (rx2Dao == null) { + rx2Dao = new Rx2Dao<>(this, io.reactivex.schedulers.Schedulers.io()); + } + return rx2Dao; + } + /** Gets the SQLiteDatabase for custom database access. Not needed for greenDAO entities. */ public Database getDatabase() { return db; diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/query/Query.java b/DaoCore/src/main/java/org/greenrobot/greendao/query/Query.java index 6176f3bce..1a3c8bc09 100644 --- a/DaoCore/src/main/java/org/greenrobot/greendao/query/Query.java +++ b/DaoCore/src/main/java/org/greenrobot/greendao/query/Query.java @@ -22,6 +22,8 @@ import org.greenrobot.greendao.annotation.apihint.Internal; import org.greenrobot.greendao.rx.RxQuery; import org.greenrobot.greendao.rx.RxTransaction; +import org.greenrobot.greendao.rx2.Rx2Query; +import org.greenrobot.greendao.rx2.Rx2Transaction; import java.util.Date; import java.util.List; @@ -69,6 +71,9 @@ static Query create(AbstractDao dao, String sql, Object[] initia private volatile RxQuery rxTxPlain; private volatile RxQuery rxTxIo; + private volatile Rx2Query rx2TxPlain; + private volatile Rx2Query rx2TxIo; + private Query(QueryData queryData, AbstractDao dao, String sql, String[] initialValues, int limitPosition, int offsetPosition) { super(dao, sql, initialValues, limitPosition, offsetPosition); @@ -188,4 +193,34 @@ public RxQuery __InternalRx() { } return rxTxIo; } + + /** + * DO NOT USE. + * The returned {@link Rx2Transaction} allows getting query results using Rx Observables without any Scheduler set + * for subscribeOn. + * + * @see #__InternalRx() + */ + @Internal + public Rx2Query __internalRx2Plain() { + if (rx2TxPlain == null) { + rx2TxPlain = new Rx2Query(this); + } + return rx2TxPlain; + } + + /** + * DO NOT USE. + * The returned {@link Rx2Transaction} allows getting query results using Rx Observables using RX's IO scheduler for + * subscribeOn. + * + * @see #__internalRxPlain() + */ + @Internal + public Rx2Query __InternalRx2() { + if (rx2TxIo == null) { + rx2TxIo = new Rx2Query(this, io.reactivex.schedulers.Schedulers.io()); + } + return rx2TxIo; + } } diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/query/QueryBuilder.java b/DaoCore/src/main/java/org/greenrobot/greendao/query/QueryBuilder.java index a0667c02e..8a030e4db 100644 --- a/DaoCore/src/main/java/org/greenrobot/greendao/query/QueryBuilder.java +++ b/DaoCore/src/main/java/org/greenrobot/greendao/query/QueryBuilder.java @@ -26,6 +26,7 @@ import org.greenrobot.greendao.annotation.apihint.Experimental; import org.greenrobot.greendao.internal.SqlUtils; import org.greenrobot.greendao.rx.RxQuery; +import org.greenrobot.greendao.rx2.Rx2Query; import java.util.ArrayList; import java.util.List; @@ -427,6 +428,22 @@ public List list() { return build().list(); } + /** + * Shorthand for {@link QueryBuilder#build() build()}.{@link Query#__InternalRx2()}. + */ + @Experimental + public Rx2Query rx2() { + return build().__InternalRx2(); + } + + /** + * Shorthand for {@link QueryBuilder#build() build()}.{@link Query#__internalRx2Plain()}. + */ + @Experimental + public Rx2Query rx2Plain() { + return build().__internalRx2Plain(); + } + /** * Shorthand for {@link QueryBuilder#build() build()}.{@link Query#__InternalRx()}. */ diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Base.java b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Base.java new file mode 100644 index 000000000..bb86678b7 --- /dev/null +++ b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Base.java @@ -0,0 +1,57 @@ +package org.greenrobot.greendao.rx2; + +import org.greenrobot.greendao.annotation.apihint.Experimental; +import org.greenrobot.greendao.annotation.apihint.Internal; + +import java.util.concurrent.Callable; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; + +/** + * Created by Zhang Tingkuo. + * Date: 2017-04-28 + * Time: 14:14 + */ +@Internal +class Rx2Base { + + protected final Scheduler mScheduler; + + /** + * No default scheduler. + */ + public Rx2Base() { + mScheduler = null; + } + + /** + * Sets the default scheduler, which is used to configure returned observables with + * {@link Observable#subscribeOn(Scheduler)}. + */ + @Experimental + Rx2Base(Scheduler scheduler) { + mScheduler = scheduler; + } + + /** + * The default scheduler (or null) used for wrapping. + */ + @Experimental + public Scheduler getScheduler() { + return mScheduler; + } + + protected Observable wrap(Callable callable) { + return wrap(Rx2Utils.fromCallable(callable)); + } + + protected Observable wrap(Observable observable) { + if (mScheduler != null) { + return observable.subscribeOn(mScheduler); + } else { + return observable; + } + } + +} diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Dao.java b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Dao.java new file mode 100644 index 000000000..be024c33f --- /dev/null +++ b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Dao.java @@ -0,0 +1,381 @@ +package org.greenrobot.greendao.rx2; + +import org.greenrobot.greendao.AbstractDao; +import org.greenrobot.greendao.annotation.apihint.Experimental; + +import java.util.List; +import java.util.concurrent.Callable; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; + +/** + * Created by Zhang Tingkuo. + * Date: 2017-04-28 + * Time: 14:28 + */ +@Experimental +public class Rx2Dao extends Rx2Base { + + private final AbstractDao mDao; + + /** + * Creates a new RxDao without a default scheduler. + */ + @Experimental + public Rx2Dao(AbstractDao dao) { + this(dao, null); + } + + /** + * Creates a new RxDao with a default scheduler, which is used to configure returned observables with + * {@link Observable#subscribeOn(Scheduler)}. + */ + @Experimental + public Rx2Dao(AbstractDao dao, Scheduler scheduler) { + super(scheduler); + mDao = dao; + } + + /** + * Rx version of {@link AbstractDao#loadAll()} returning an Observable. + */ + @Experimental + public Observable> loadAll() { + return wrap(new Callable>() { + @Override + public List call() throws Exception { + return mDao.loadAll(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#loadAll()} returning an Observable. + */ + @Experimental + public Observable load(final K key) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + return mDao.load(key); + } + }); + } + + /** + * Rx version of {@link AbstractDao#refresh(Object)} returning an Observable. + * Note that the Observable will emit the given entity back to its subscribers. + */ + @Experimental + public Observable refresh(final T entity) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + mDao.refresh(entity); + return entity; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insert(Object)} returning an Observable. + * Note that the Observable will emit the given entity back to its subscribers. + */ + @Experimental + public Observable insert(final T entity) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + mDao.insert(entity); + return entity; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insertInTx(Iterable)} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable> insertInTx(final Iterable entities) { + return wrap(new Callable>() { + @Override + public Iterable call() throws Exception { + mDao.insertInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insertInTx(Object[])} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable insertInTx(final T... entities) { + return wrap(new Callable() { + @Override + public Object[] call() throws Exception { + mDao.insertInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insertOrReplace(Object)} returning an Observable. + * Note that the Observable will emit the given entity back to its subscribers. + */ + @Experimental + public Observable insertOrReplace(final T entity) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + mDao.insertOrReplace(entity); + return entity; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insertOrReplaceInTx(Iterable)} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable> insertOrReplaceInTx(final Iterable entities) { + return wrap(new Callable>() { + @Override + public Iterable call() throws Exception { + mDao.insertOrReplaceInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#insertOrReplaceInTx(Object[])} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable insertOrReplaceInTx(final T... entities) { + return wrap(new Callable() { + @Override + public Object[] call() throws Exception { + mDao.insertOrReplaceInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#save(Object)} returning an Observable. + * Note that the Observable will emit the given entity back to its subscribers. + */ + @Experimental + public Observable save(final T entity) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + mDao.save(entity); + return entity; + } + }); + } + + /** + * Rx version of {@link AbstractDao#saveInTx(Iterable)} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable> saveInTx(final Iterable entities) { + return wrap(new Callable>() { + @Override + public Iterable call() throws Exception { + mDao.saveInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#saveInTx(Object[])} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable saveInTx(final T... entities) { + return wrap(new Callable() { + @Override + public Object[] call() throws Exception { + mDao.saveInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#update(Object)} returning an Observable. + * Note that the Observable will emit the given entity back to its subscribers. + */ + @Experimental + public Observable update(final T entity) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + mDao.update(entity); + return entity; + } + }); + } + + /** + * Rx version of {@link AbstractDao#updateInTx(Iterable)} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable> updateInTx(final Iterable entities) { + return wrap(new Callable>() { + @Override + public Iterable call() throws Exception { + mDao.updateInTx(entities); + return entities; + } + }); + } + + /** + * Rx version of {@link AbstractDao#updateInTx(Object[])} returning an Observable. + * Note that the Observable will emit the given entities back to its subscribers. + */ + @Experimental + public Observable updateInTx(final T... entities) { + return wrap(new Callable() { + @Override + public Object[] call() throws Exception { + mDao.updateInTx(entities); + return entities; + } + }); + } + + + /** + * Rx version of {@link AbstractDao#delete(Object)} returning an Observable. + */ + @Experimental + public Observable delete(final T entity) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.delete(entity); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteByKey(Object)} returning an Observable. + */ + @Experimental + public Observable deleteByKey(final K key) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteByKey(key); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteAll()} returning an Observable. + */ + @Experimental + public Observable deleteAll() { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteAll(); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteInTx(Iterable)} returning an Observable. + */ + @Experimental + public Observable deleteInTx(final Iterable entities) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteInTx(entities); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteInTx(Object[])} returning an Observable. + */ + @Experimental + public Observable deleteInTx(final T... entities) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteInTx(entities); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteByKeyInTx(Iterable)} returning an Observable. + */ + @Experimental + public Observable deleteByKeyInTx(final Iterable keys) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteByKeyInTx(keys); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#deleteByKeyInTx(Object[])} returning an Observable. + */ + @Experimental + public Observable deleteByKeyInTx(final K... keys) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDao.deleteByKeyInTx(keys); + return Void.TYPE.newInstance(); + } + }); + } + + /** + * Rx version of {@link AbstractDao#count()} returning an Observable. + */ + @Experimental + public Observable count() { + return wrap(new Callable() { + @Override + public Long call() throws Exception { + return mDao.count(); + } + }); + } + + /** + * The plain DAO. + */ + @Experimental + public AbstractDao getDao() { + return mDao; + } + +} diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Query.java b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Query.java new file mode 100644 index 000000000..28add3388 --- /dev/null +++ b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Query.java @@ -0,0 +1,99 @@ +package org.greenrobot.greendao.rx2; + +import org.greenrobot.greendao.annotation.apihint.Experimental; +import org.greenrobot.greendao.query.LazyList; +import org.greenrobot.greendao.query.Query; + +import java.util.List; +import java.util.concurrent.Callable; + +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.Scheduler; +import io.reactivex.exceptions.Exceptions; + +/** + * Created by Zhang Tingkuo. + * Date: 2017-04-28 + * Time: 14:20 + */ +@Experimental +public class Rx2Query extends Rx2Base { + private final Query mQuery; + + public Rx2Query(Query query) { + mQuery = query; + } + + public Rx2Query(Query query, Scheduler scheduler) { + super(scheduler); + mQuery = query; + } + + /** + * Rx version of {@link Query#list()} returning an Observable. + */ + @Experimental + public Observable> list() { + return wrap(new Callable>() { + @Override + public List call() throws Exception { + return mQuery.forCurrentThread().list(); + } + }); + } + + /** + * Rx version of {@link Query#unique()} returning an Observable. + */ + @Experimental + public Observable unique() { + return wrap(new Callable() { + @Override + public T call() throws Exception { + return mQuery.forCurrentThread().unique(); + } + }); + } + + /** + * Emits the resulting entities one by one, producing them on the fly ("streaming" entities). + * Unlike {@link #list()}, it does not wait for the query to gather all results. Thus, the first entities are + * immediately available as soon the underlying database cursor has data. This approach may be more memory + * efficient for large number of entities (or large entities) at the cost of additional overhead caused by a + * per-entity delivery through Rx. + */ + public Observable oneByOne() { + Observable observable = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter emitter) throws Exception { + try { + LazyList lazyList = mQuery.forCurrentThread().listLazyUncached(); + try { + for (T entity : lazyList) { + if (emitter.isDisposed()) { + break; + } + emitter.onNext(entity); + } + } finally { + lazyList.close(); + } + if (!emitter.isDisposed()) { + emitter.onComplete(); + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + emitter.onError(t); + } + } + }); + return wrap(observable); + } + +// @Experimental +// public Query getQuery() { +// return mQuery; +// } +} diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Transaction.java b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Transaction.java new file mode 100644 index 000000000..fb6c0ca8d --- /dev/null +++ b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Transaction.java @@ -0,0 +1,63 @@ +package org.greenrobot.greendao.rx2; + +import org.greenrobot.greendao.AbstractDaoSession; +import org.greenrobot.greendao.annotation.apihint.Experimental; + +import java.util.concurrent.Callable; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; + +/** + * Created by Zhang Tingkuo. + * Date: 2017-04-28 + * Time: 14:16 + */ +@Experimental +public class Rx2Transaction extends Rx2Base { + private final AbstractDaoSession mDaoSession; + + public Rx2Transaction(AbstractDaoSession daoSession) { + mDaoSession = daoSession; + } + + public Rx2Transaction(AbstractDaoSession daoSession, Scheduler scheduler) { + super(scheduler); + mDaoSession = daoSession; + } + + /** + * Rx version of {@link AbstractDaoSession#runInTx(Runnable)} returning an Observable. + */ + @Experimental + public Observable run(final Runnable runnable) { + return wrap(new Callable() { + @Override + public Void call() throws Exception { + mDaoSession.runInTx(runnable); + return null; + } + }); + } + + /** + * Rx version of {@link AbstractDaoSession#callInTx(Callable)} returning an Observable. + */ + @Experimental + public Observable call(final Callable callable) { + return wrap(new Callable() { + @Override + public T call() throws Exception { + return mDaoSession.callInTx(callable); + } + }); + } + + // Note: wrapping callInTxNoException does not make sense, because the Exception is handled by Rx anyway. + + + @Experimental + public AbstractDaoSession getDaoSession() { + return mDaoSession; + } +} diff --git a/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Utils.java b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Utils.java new file mode 100644 index 000000000..b05a1a6b3 --- /dev/null +++ b/DaoCore/src/main/java/org/greenrobot/greendao/rx2/Rx2Utils.java @@ -0,0 +1,34 @@ +package org.greenrobot.greendao.rx2; + +import org.greenrobot.greendao.annotation.apihint.Internal; + +import java.util.concurrent.Callable; + +import io.reactivex.Observable; +import io.reactivex.ObservableSource; + +/** + * Created by Zhang Tingkuo. + * Date: 2017-04-28 + * Time: 14:09 + */ + +@Internal +class Rx2Utils { + @Internal + static Observable fromCallable(final Callable callable) { + return Observable.defer(new Callable>() { + + @Override + public ObservableSource call() { + T result; + try { + result = callable.call(); + } catch (Exception e) { + return Observable.error(e); + } + return Observable.just(result); + } + }); + } +}