首页 » Android » Rxjava2_Flowable_Sqlite_Android数据库访问

Rxjava2_Flowable_Sqlite_Android数据库访问

原文 http://blog.csdn.net/robert_cysy/article/details/79256540

2018-02-06 02:00:21阅读(651)

一、使用Rxjava访问数据库的优点: 1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程
2.随时订阅和取消订阅,而不必再使用回调函数
3.对读取的数据用rxjava进行过滤,流式处理
4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架
(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,
同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)

二、接下来之关注实现过程: 本次实现用Rxjava2Flowable,有被压支持(在不需要被压支持的情况建议使用Observable)
实现一个稳健的的可灵活切换其他数据库的结构,当然是先定义数据库访问接口。然后跟具不同的数据库实现接口的方法

定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)

public interface DbSource {
        //String sql = "insert into table_task (tid,startts) values(tid,startts)";
        Flowable<Boolean> insertNewTask(int tid, int startts);
        //String sql = "select * from table_task";
        Flowable<List<TaskItem>> getAllTask();
        //String sql = "select * from table_task where endts = 0";
        Flowable<Optional<TaskItem>> getRunningTask();
        //String sql = "update table_task set isuploadend=isuploadend where tid=tid";
        Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend);
        //String sql = "delete from table_task where tid=tid and endts>0";
        Flowable<Boolean> deleteTask(int tid);
}

三、用Android原生的Sqlite实现数据库操作

public class SimpleDb implements DbSource {
    private static SimpleDb sqlite;
    private SqliteHelper sqliteHelper;
    private SimpleDb(Context context) {
        this.sqliteHelper = new SqliteHelper(context);
    }
    public static synchronized SimpleDb getInstance(Context context) {
        if (sqlite == null )
            sqlite = new SimpleDb(context);
        return sqlite;
    }
    Flowable<Boolean> insertNewTask(int tid, int startts) {
        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
                ContentValues values = new ContentValues();
                values.put(“tid”, 1);
                values.put(“startts”,13233);
                if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1)
                    e.onNext(true);
                else 
                    e.onNext(false);
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);
    }
    Flowable<List<TaskItem>> getAllTask() {
        return Flowable.create(new FlowableOnSubscribe<List<TaskItem>>() {
            @Override
            public void subscribe(FlowableEmitter<List<TaskItem>> e) throws Exception {
                List<TaskItem> taskList = new ArrayList<>();
                StringBuilder sql = new StringBuilder(100);
                sql.append("select * from ");
                sql.append(SqliteHelper.TABLE_NAME_TASK);
                SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
                Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
                if (cursor.moveToFirst()) {
                    int count = cursor.getCount();
                    for (int a = 0; a < count; a ++) {
                        TaskItem item = new TaskItem();
                        item.setTid(cursor.getInt(1));
                        item.setStartts(cursor.getInt(2));
                        item.setEndts(cursor.getInt(3));
                        taskList.add(item);
                        cursor.move(1);
                    }
                }
                cursor.close();
                sqLiteDatabase.close();
                e.onNext(taskList);
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);
    }
      
    Flowable<Optional<TaskItem>> getRunningTask() {
        return Flowable.create(new FlowableOnSubscribe<Optional<TaskItem>>() {
            @Override
            public void subscribe(FlowableEmitter<Optional<TaskItem>> e) throws Exception {
                TaskItem item = null;
                StringBuilder sql = new StringBuilder(100);
                sql.append("select * from ");
                sql.append(SqliteHelper.TABLE_NAME_TASK);
                sql.append(" where endts=0 limit 1");
                SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
                Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
                if (cursor.moveToFirst()) {
                    int count = cursor.getCount();
                    if (count == 1) {
                        item = new TaskItem();
                        item.setId(cursor.getInt(0));
                        item.setTid(cursor.getInt(1));
                        item.setStartts(cursor.getInt(2));
                        item.setEndts(cursor.getInt(3));
                    }
                }
                cursor.close();
                sqLiteDatabase.close();
                e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);
    }
    
    Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend) {
            return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
                //数据库操作代码
                e.onNext(false);//返回结果
                e.onComplete();//返回结束
            }
        }, BackpressureStrategy.BUFFER);
    }
        
    Flowable<Boolean> deleteTask(int tid) {
        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
                //数据库操作代码
                e.onNext(false);//返回结果
                e.onComplete();//返回结束
            }
        }, BackpressureStrategy.BUFFER);
    }
}

四、同一个接口使用sqlbrite的实现方式

public class BriteDb implements DbSource {
    @NonNull
    protected final BriteDatabase mDatabaseHelper;
    @NonNull
    private Function<Cursor, TaskItem> mTaskMapperFunction;
    @NonNull
    private Function<Cursor, PoiItem> mPoiMapperFunction;
    @NonNull
    private Function<Cursor, InterestPoiItem> mInterestPoiMapperFunction;
    // Prevent direct instantiation.
    private BriteDb(@NonNull Context context) {
        DbHelper dbHelper = new DbHelper(context);
        SqlBrite sqlBrite = new SqlBrite.Builder().build();
        mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io();
        mTaskMapperFunction = this::getTask;
        mPoiMapperFunction = this::getPoi;
        mInterestPoiMapperFunction = this::getInterestPoi;
    }
    @Nullable
    private static BriteDb INSTANCE;
    public static BriteDb getInstance(@NonNull Context context) {
        if (INSTANCE == null) {
            INSTANCE = new BriteDb(context);
        }
        return INSTANCE;
    }
    @NonNull
    private TaskItem getTask(@NonNull Cursor c) {
        TaskItem item = new TaskItem();
        item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID)));
        item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID)));
        item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS)));
        item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS)));
        return item;
    }
    
    @Override
    public void insertNewTask(int tid, int startts) {
        ContentValues values = new ContentValues();
        values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid);
        values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS, startts);
        mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE);
    }
    @Override
    public Flowable<List<TaskItem>> getAllTask() {
        String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串
        return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql)
                .mapToList(mTaskMapperFunction)
                .toFlowable(BackpressureStrategy.BUFFER);
    }
    @Override
    public Flowable<Optional<TaskItem>> getRunningTask() {
        String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1",
                PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS);
        return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0")
                .mapToOne(cursor -> Optional.fromNullable(mTaskMapperFunction.apply(cursor)))
                .toFlowable(BackpressureStrategy.BUFFER);
    }
    @Override
    public Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend) {
        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                    ContentValues values = new ContentValues();
                    if(isuploadend) {
                        values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1);
                    } else {
                        values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0);
                    }
                    String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ?";
                    //String[] selectionArgs = {String.valueOf(tid)};
                    String selectionArgs = String.valueOf(tid);
                    int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs);
                    if (res > 0) {
                        e.onNext(true);//返回结果
                    } else {
                            e.onNext(false);//返回结果
                    }
                    e.onComplete();//返回结束
            }
        }, BackpressureStrategy.BUFFER);
    }
    @Override
    public Flowable<Boolean> deleteTask(int tid) {
        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                    String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ? AND "+
                                                    PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS + " > 0";
                    String[] selectionArgs = new String[1];
                    selectionArgs[0] = String.valueOf(tid);
                    int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs);
                    if (res > 0) {
                        e.onNext(true);//返回结果
                    } else {
                            e.onNext(false);//返回结果
                    }
                    e.onComplete();//返回结束
            }
        }, BackpressureStrategy.BUFFER);
    }
}

五、数据库调用使用方法

使用了lambda简化了表达式进一步简化代码:
简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)

    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
接口调用(获得数据库实例):
//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可
public class Injection { 
    public static DbSource getDbSource(Context context) {
        //choose one of them
        //return BriteDb.getInstance(context);
        return SimpleDb.getInstance(context);
    }
}
DbSource db = Injection.getInstance(mContext);
disposable1 = db.getAllTask()
                            .flatMap(Flowable::fromIterable)
                            .filter(task -> {                   //自定义过滤
                                 if (!task.getIsuploadend()) {
                                     return true;
                                 } else {
                                     return false;
                                 }
                            })
                            .subscribe(taskItems ->    //这里是使用了lambda简化了表达式
                                doTaskProcess(taskItems)
                            , throwable -> {
                                throwable.printStackTrace();
                            },// onCompleted
                            () -> {
                                if (disposable1 != null && !disposable1.isDisposed()) {
                                    disposable1.dispose();
                                }
                            });
 disposable1 = db.getRunningTask()
                .filter(Optional::isPresent)  //判断是否为空,为空的就跳过
                .map(Optional::get)             //获取到真的参数
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(taskItem -> {                    //onNext()
                            //has running task
                            mTid = taskItem.getTid();
                }, throwable -> throwable.printStackTrace() //onError()
                , () -> disposable1.dispose());             //onComplete()
disposable1 =  db.markUploadEnd(tid, isuploadend)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(status -> {                    //onNext()
                            if (status) {
                                //dosomething
                            }
                }, throwable -> throwable.printStackTrace() //onError()
                , () -> disposable1.dispose());             //onComplete()
disposable1 =  db.deleteTask(tid)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(status -> {                    //onNext()
                            if (status) {
                                //dosomething
                            }
                }, throwable -> throwable.printStackTrace() //onError()
                , () -> disposable1.dispose());             //onComplete()



最新发布

CentOS专题

关于本站

5ibc.net旗下博客站精品博文小部分原创、大部分从互联网收集整理。尊重作者版权、传播精品博文,让更多编程爱好者知晓!

小提示

按 Ctrl+D 键,
把本文加入收藏夹