当前位置:   article > 正文

JetPack之DataStore源码笔记_mutablepreferences.remove

mutablepreferences.remove


上一篇文章分析了SharedPreferences的原理和弊端,这篇文章主要分析DataStore的源码。根据 官方文档的介绍,DataStore主要有一下优势:

  1. 支持键值对和类型化对象的存储;
  2. 数据存储采用protocol buffers,更节省资源;
  3. 使用Kotlin协程以及Flow异步存储数据。

DataStore又分为Preferences DataStore和Proto DataStore,与SharedPreferences的对比如下图所示:
在这里插入图片描述

1. Preferences DataStore

1.1. Preferences DataStore的创建

DataStore的创建代码如下所示:

context.createDataStore(
            name = USER_PREFERENCES_NAME,
            migrations = listOf(SharedPreferencesMigration(context, USER_PREFERENCES_NAME))
        )
//PreferenceDataStoreFactory.kt
fun Context.createDataStore(
    name: String,
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    migrations: List<DataMigration<Preferences>> = listOf(),
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): DataStore<Preferences> =
    PreferenceDataStoreFactory.create(
        produceFile = {
            File(this.filesDir, "datastore/$name.preferences_pb")
        },
        corruptionHandler = corruptionHandler,
        migrations = migrations,
        scope = scope
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

USER_PREFERENCES_NAME定义了SharedPreferences和DataStore中的name,SharedPreferencesMigration是用于将SharedPreferences中的内容迁移到DataStore中,scope默认指定为IO线程。进一步调用PreferenceDataStoreFactory类的create()方法来创建DatatStore对象,可知数据存储的文件的路径为:…/datastore/$name.preferences_pb。最终调用DataStoreFactory来创建一个SingleProcessDataStore对象:

//class DataStoreFactory
fun <T> create(
        produceFile: () -> File,
        serializer: Serializer<T>,
        corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
        migrations: List<DataMigration<T>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ): DataStore<T> =
        SingleProcessDataStore(
            produceFile = produceFile,
            serializer = serializer,
            corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
            initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
            scope = scope
        )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

这里的initTasksList是一个lambda表达式,等到执行它时再看其逻辑。

1.2. Preferences DataStore的写入

往Preferences DataStore中写入数据的姿势为:

dataStore.edit { settings ->
    val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
    settings[EXAMPLE_COUNTER] = currentCounterValue + 1
  }
  • 1
  • 2
  • 3
  • 4

edit()方法有一个lambda参数transform,在该lambda表达式中可以将新值更新到settings中,这里需要关注settings的类型,实际是Preferences类型,最终会调用到SingleProcessDataStore的updateData()方法:

//class SingleProcessDataStore
override suspend fun updateData(transform: suspend (t: T) -> T): T {
    val ack = CompletableDeferred<T>()
    val dataChannel = downstreamChannel()
    val updateMsg = Message.Update<T>(transform, ack, dataChannel)

    actor.send(updateMsg)

    // If no read has succeeded yet, we need to wait on the result of the next read so we can
    // bubble exceptions up to the caller. Read exceptions are not bubbled up through ack.
    //消费掉第一个元素
    if (dataChannel.valueOrNull == null) {
        dataChannel.asFlow().first()
    }

    // Wait with same scope as the actor, so we're not waiting on a cancelled actor.
    return withContext(scope.coroutineContext) { ack.await() }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

这里主要是将transform、ack和dataChannel构造成一个Update消息对象,然后发给名为actor的ChannelCoroutine,然后等待处理结果,actor的处理逻辑为:

private val actor: SendChannel<Message<T>> = scope.actor(
    capacity = UNLIMITED
) {
    try {
        messageConsumer@ for (msg in channel) {
            if (msg.dataChannel.isClosedForSend) {
                // The message was sent with an old, now closed, dataChannel. This means that
                // our read failed.
                continue@messageConsumer
            }
            //1. 从文件中读取所有数据到dataChannel中
            try {
                readAndInitOnce(msg.dataChannel)
            } catch (ex: Throwable) {
                resetDataChannel(ex)
                continue@messageConsumer
            }

            // We have successfully read data and sent it to downstreamChannel.
			//2. 将数据写入到文件中
            if (msg is Message.Update) {
                msg.ack.completeWith(
                    runCatching {
                        transformAndWrite(msg.transform, downstreamChannel())
                    }
                )
            }
        }
    } finally {
        downstreamChannel().cancel()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

Message消息有两种类型:Update和Read,处理两种消息时都会执行readAndInitOnce()函数从文件中读取所有数据到dataChannel中,只有在处理Update消息时才会执行transformAndWrite()函数将数据写入到文件中。

1.2.1. 迁移SharedPreference过程

readAndInitOnce()函数先读取DataStore文件的数据,然后会将SharedPreference中的数据迁移过来,并合并为最新的数据:

private suspend fun readAndInitOnce(dataChannel: ConflatedBroadcastChannel<T>) {
    if (dataChannel.valueOrNull != null) {
        // If we already have cached data, we don't try to read it again.
        return
    }

    val updateLock = Mutex()
    //1.读取DataStore文件的数据
    var initData = readDataOrHandleCorruption()

    var initializationComplete: Boolean = false

    // TODO(b/151635324): Consider using Context Element to throw an error on re-entrance.
    val api = object : InitializerApi<T> {
        override suspend fun updateData(transform: suspend (t: T) -> T): T {
            return updateLock.withLock() {
                if (initializationComplete) {
                    throw IllegalStateException(
                        "InitializerApi.updateData should not be " +
                                "called after initialization is complete."
                    )
                }

                val newData = transform(initData)
                if (newData != initData) {
                    writeData(newData)
                    initData = newData
                }
                initData
            }
        }
    }
	//2.执行SharedPreference数据迁移
    initTasks?.forEach { it(api) }
    initTasks = null // Init tasks have run successfully, we don't need them anymore.
    updateLock.withLock {
        initializationComplete = true
    }

    dataChannel.offer(initData)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

这里先调用readDataOrHandleCorruption()函数读取PrototolBuffer文件中的数据,然后执行initTasks链表中的数据,该initTasks即是initTasksList,是在SingleProcessDataStore构造函数中初始化的:

//SingleProcessDataStore构造
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations))

//class DataMigrationInitializer
fun <T> getInitializer(migrations: List<DataMigration<T>>):
        suspend (api: InitializerApi<T>) -> Unit = { api ->
    runMigrations(migrations, api)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

从上面代码可知,initTasksList只有一个元素,每个元素是一个suspend lambda表达式,其函数体会以migrations和api为参数执行runMigrations()函数,从前文可知migrations为listOf(SharedPreferencesMigration(context, USER_PREFERENCES_NAME)),api是readAndInitOnce()函数中传入的。执行SharedPreference数据迁移时,即会执行runMigrations()函数:

//class DataMigrationInitializer
private suspend fun <T> runMigrations(
    migrations: List<DataMigration<T>>,
    api: InitializerApi<T>
) {
    val cleanUps = mutableListOf<suspend () -> Unit>()

    api.updateData { startingData ->
        migrations.fold(startingData) { data, migration ->
            if (migration.shouldMigrate(data)) {
                cleanUps.add { migration.cleanUp() }
                migration.migrate(data)
            } else {
                data
            }
        }
    }

    var cleanUpFailure: Throwable? = null

    cleanUps.forEach { cleanUp ->
        try {
            cleanUp()
        } catch (exception: Throwable) {
            if (cleanUpFailure == null) {
                cleanUpFailure = exception
            } else {
                cleanUpFailure!!.addSuppressed(exception)
            }
        }
    }

    // If we encountered a failure on cleanup, throw it.
    cleanUpFailure?.let { throw it }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这里主要是执行api的updateData()函数来执行数据迁移,回到api初始化的地方,在updateData()函数中会先执行transform(initData)代码块,initData即为从DataStore文件中读出的Preferences类型数据。transform代码块是从runMigrations()函数中传入的,因此startingData即是initData,这里migrations的fold()扩展方法包装了一个for循环,migrations只有一个元素SharedPreferencesMigration。先执行shouldMigrate()函数判断是否需要迁移,然后执行migrate()函数进行迁移:

//class SharedPreferencesMigration
override suspend fun migrate(currentData: T): T =
    migrate(
        SharedPreferencesView(
            sharedPrefs,
            keySet
        ), currentData
    )

//sharedPrefs的定义
private val sharedPrefs: SharedPreferences by lazy {
    context.getSharedPreferences(sharedPreferencesName, Context.MODE_PRIVATE)
}

//keySet的定义
private val keySet: MutableSet<String> by lazy {
    (keysToMigrate ?: sharedPrefs.all.keys).toMutableSet()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

migrate是以一个lambda表达式,是在SharedPreferencesMigration构造函数中传入的:

SharedPreferencesMigration(
        context = context,
        sharedPreferencesName = sharedPreferencesName,
        keysToMigrate = keysToMigrate,
        deleteEmptyPreferences = deleteEmptyPreferences,
        shouldRunMigration = { prefs ->
            // If any key hasn't been migrated to currentData, we can't skip the migration. If
            // the key set is not specified, we can't skip the migration.
            val allKeys = prefs.asMap().keys.map { it.name }
            keysToMigrate?.any { it !in allKeys } ?: true
        },
        migrate = { sharedPrefs: SharedPreferencesView, currentData: Preferences ->
            // prefs.getAll is already filtered to our key set, but we don't want to overwrite
            // already existing keys.
            val currentKeys = currentData.asMap().keys.map { it.name }

            val filteredSharedPreferences =
                sharedPrefs.getAll().filter { (key, _) -> key !in currentKeys }

            val mutablePreferences = currentData.toMutablePreferences()
            for ((key, value) in filteredSharedPreferences) {
                when (value) {
                    is Boolean -> mutablePreferences[preferencesKey(key)] = value
                    is Float -> mutablePreferences[preferencesKey(key)] = value
                    is Int -> mutablePreferences[preferencesKey(key)] = value
                    is Long -> mutablePreferences[preferencesKey(key)] = value
                    is String -> mutablePreferences[preferencesKey(key)] = value
                    is Set<*> -> {
                        @Suppress("UNCHECKED_CAST")
                        mutablePreferences[preferencesSetKey<String>(key)] = value as Set<String>
                    }
                }
            }

            mutablePreferences.toPreferences()
        })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

在migrate代码块中,按照key的唯一性先将sharedPrefs中不包含在currentData中的键值对帅选出来存放在filteredSharedPreferences,然后再将filteredSharedPreferences和currentData合并成mutablePreferences并返回。将数据合并后会将SharedPreferences数据清除:

//class SharedPreferencesMigration
override suspend fun cleanUp() {
        val sharedPrefsEditor = sharedPrefs.edit()

        for (key in keySet) {
            sharedPrefsEditor.remove(key)
        }

        if (!sharedPrefsEditor.commit()) {
            throw IOException(
                "Unable to delete migrated keys from SharedPreferences: $sharedPreferencesName"
            )
        }

        if (deleteEmptyPreferences && sharedPrefs.all.isEmpty()) {
            deleteSharedPreferences(context, sharedPreferencesName)
        }

        keySet.clear()
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

代码执行到这里就完成了从SharedPreferences中迁移数据到DataStore的任务并清除了SharedPreferences。回到readAndInitOnce()函数,接下来会判断旧数据与迁移后的新数据是否相同,若不相同则要执行writeData(newData)函数将新数据写入到文件中,这里先跳过写入过程。

1.3. Preferences DataStore的读取

由于DataStore采用Coroutine的Flow来实现的,所以DataStore不提供主动式的读取,这一点和SharedPreferences有区别,DataStore提供了被动式的读取方式,即每写入一个数据,均会执行Flow中collect代码块中的代码,可以在这里对写入的数据作缓存。在SingleProcessDataStore提供一个类型为Flow的data成员变量给外界使用:

//class SingleProcessDataStore
override val data: Flow<T> = flow {
    val curChannel = downstreamChannel()
    actor.offer(Message.Read(curChannel))
    emitAll(curChannel.asFlow())
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这里利用了Channel转Flow来构造一个流对象,真正承载数据的是curChannel:

private inline fun downstreamChannel(): ConflatedBroadcastChannel<T> {
    return downstreamChannel.get()
}
  • 1
  • 2
  • 3

回忆上一节的数据写入过程,acor中每接收到一个Update消息,均会向downstreamChannel发送一个元素,因此每次写入数据均会向data流中发射一个Preferences元素,Preferences元素存储了所有数据。

2. Proto DataStore

2.1 Proto DataStore的创建

Proto DataStore的创建和Preferences DataStore的创建基本相同,最大不同的是需要提供一个Serializer来定义结构化数据的读写,创建代码如下:

private val userPreferencesStore: DataStore<UserPreferences> = context.createDataStore(
    fileName = DATA_STORE_FILE_NAME,
    serializer = UserPreferencesSerializer,
    migrations = listOf(sharedPrefsMigration)
)

//class DataStoreFactory
fun <T> create(
    produceFile: () -> File,
    serializer: Serializer<T>,
    corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
    migrations: List<DataMigration<T>> = listOf(),
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): DataStore<T> =
    SingleProcessDataStore(
        produceFile = produceFile,
        serializer = serializer,
        corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
        initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
        scope = scope
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在创建过程中可以自定义SharedPreferences数据迁移对象,这里的sharedPrefsMigration定义为:

private const val USER_PREFERENCES_NAME = "user_preferences"
private val sharedPrefsMigration = SharedPreferencesMigration(
    context,
    USER_PREFERENCES_NAME
) { sharedPrefs: SharedPreferencesView, currentData: UserPreferences ->
    // Define the mapping from SharedPreferences to UserPreferences
    if (currentData.sortOrder == SortOrder.UNSPECIFIED) {
        currentData.toBuilder().setSortOrder(
            SortOrder.valueOf(
                sharedPrefs.getString(
                    SORT_ORDER_KEY, SortOrder.NONE.name
                )!!
            )
        ).build()
    } else {
        currentData
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

当sortOrder为SortOrder.UNSPECIFIED时,还没有进行数据迁移,需要将该数据添加到currentData中;反之则表明已经迁移了数据,无需做额外工作。

2.2 Proto DataStore的写入

Proto DataStore写入数据的姿势为:

suspend fun updateShowCompleted(completed: Boolean) {
    dataStore.updateData { preferences ->
        preferences.toBuilder().setShowCompleted(completed).build()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5

自然这里的preferences的类型为UserPreferences,通过与Preferences DataStore的写入方式对比,Proto DataStore的写入方式更直接,直接调用updateData()方法,而Preferences DataStore需要先调用edit()方法,然后再调用updateData()方法。updateData()方法的代码逻辑前文已经分析过了,这里不在累述。

2.3 Proto DataStore的读取

Proto DataStore的读取姿势为:

val userPreferencesFlow: Flow<UserPreferences> = userPreferencesStore.data
    .catch { exception ->
        // dataStore.data throws an IOException when an error is encountered when reading data
        if (exception is IOException) {
            Log.e(TAG, "Error reading sort order preferences.", exception)
            emit(UserPreferences.getDefaultInstance())
        } else {
            throw exception
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

其原理与Preferences DataStore一致。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/309394
推荐阅读
相关标签
  

闽ICP备14008679号