赞
踩
DataStore又分为Preferences DataStore和Proto DataStore,与SharedPreferences的对比如下图所示:
DataStore的创建代码如下所示:
context.createDataStore( name = USER_PREFERENCES_NAME, migrations = listOf(SharedPreferencesMigration(context, USER_PREFERENCES_NAME)) ) //PreferenceDataStoreFactory.kt fun Context.createDataStore( name: String, corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null, migrations: List<DataMigration<Preferences>> = listOf(), scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) ): DataStore<Preferences> = PreferenceDataStoreFactory.create( produceFile = { File(this.filesDir, "datastore/$name.preferences_pb") }, corruptionHandler = corruptionHandler, migrations = migrations, scope = scope )
USER_PREFERENCES_NAME定义了SharedPreferences和DataStore中的name,SharedPreferencesMigration是用于将SharedPreferences中的内容迁移到DataStore中,scope默认指定为IO线程。进一步调用PreferenceDataStoreFactory类的create()方法来创建DatatStore对象,可知数据存储的文件的路径为:…/datastore/$name.preferences_pb。最终调用DataStoreFactory来创建一个SingleProcessDataStore对象:
//class DataStoreFactory
fun <T> create(
produceFile: () -> File,
serializer: Serializer<T>,
corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
migrations: List<DataMigration<T>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): DataStore<T> =
SingleProcessDataStore(
produceFile = produceFile,
serializer = serializer,
corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
scope = scope
)
这里的initTasksList是一个lambda表达式,等到执行它时再看其逻辑。
往Preferences DataStore中写入数据的姿势为:
dataStore.edit { settings ->
val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
settings[EXAMPLE_COUNTER] = currentCounterValue + 1
}
edit()方法有一个lambda参数transform,在该lambda表达式中可以将新值更新到settings中,这里需要关注settings的类型,实际是Preferences类型,最终会调用到SingleProcessDataStore的updateData()方法:
//class SingleProcessDataStore override suspend fun updateData(transform: suspend (t: T) -> T): T { val ack = CompletableDeferred<T>() val dataChannel = downstreamChannel() val updateMsg = Message.Update<T>(transform, ack, dataChannel) actor.send(updateMsg) // If no read has succeeded yet, we need to wait on the result of the next read so we can // bubble exceptions up to the caller. Read exceptions are not bubbled up through ack. //消费掉第一个元素 if (dataChannel.valueOrNull == null) { dataChannel.asFlow().first() } // Wait with same scope as the actor, so we're not waiting on a cancelled actor. return withContext(scope.coroutineContext) { ack.await() } }
这里主要是将transform、ack和dataChannel构造成一个Update消息对象,然后发给名为actor的ChannelCoroutine,然后等待处理结果,actor的处理逻辑为:
private val actor: SendChannel<Message<T>> = scope.actor( capacity = UNLIMITED ) { try { messageConsumer@ for (msg in channel) { if (msg.dataChannel.isClosedForSend) { // The message was sent with an old, now closed, dataChannel. This means that // our read failed. continue@messageConsumer } //1. 从文件中读取所有数据到dataChannel中 try { readAndInitOnce(msg.dataChannel) } catch (ex: Throwable) { resetDataChannel(ex) continue@messageConsumer } // We have successfully read data and sent it to downstreamChannel. //2. 将数据写入到文件中 if (msg is Message.Update) { msg.ack.completeWith( runCatching { transformAndWrite(msg.transform, downstreamChannel()) } ) } } } finally { downstreamChannel().cancel() } }
Message消息有两种类型:Update和Read,处理两种消息时都会执行readAndInitOnce()函数从文件中读取所有数据到dataChannel中,只有在处理Update消息时才会执行transformAndWrite()函数将数据写入到文件中。
readAndInitOnce()函数先读取DataStore文件的数据,然后会将SharedPreference中的数据迁移过来,并合并为最新的数据:
private suspend fun readAndInitOnce(dataChannel: ConflatedBroadcastChannel<T>) { if (dataChannel.valueOrNull != null) { // If we already have cached data, we don't try to read it again. return } val updateLock = Mutex() //1.读取DataStore文件的数据 var initData = readDataOrHandleCorruption() var initializationComplete: Boolean = false // TODO(b/151635324): Consider using Context Element to throw an error on re-entrance. val api = object : InitializerApi<T> { override suspend fun updateData(transform: suspend (t: T) -> T): T { return updateLock.withLock() { if (initializationComplete) { throw IllegalStateException( "InitializerApi.updateData should not be " + "called after initialization is complete." ) } val newData = transform(initData) if (newData != initData) { writeData(newData) initData = newData } initData } } } //2.执行SharedPreference数据迁移 initTasks?.forEach { it(api) } initTasks = null // Init tasks have run successfully, we don't need them anymore. updateLock.withLock { initializationComplete = true } dataChannel.offer(initData) }
这里先调用readDataOrHandleCorruption()函数读取PrototolBuffer文件中的数据,然后执行initTasks链表中的数据,该initTasks即是initTasksList,是在SingleProcessDataStore构造函数中初始化的:
//SingleProcessDataStore构造
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations))
//class DataMigrationInitializer
fun <T> getInitializer(migrations: List<DataMigration<T>>):
suspend (api: InitializerApi<T>) -> Unit = { api ->
runMigrations(migrations, api)
}
从上面代码可知,initTasksList只有一个元素,每个元素是一个suspend lambda表达式,其函数体会以migrations和api为参数执行runMigrations()函数,从前文可知migrations为listOf(SharedPreferencesMigration(context, USER_PREFERENCES_NAME)),api是readAndInitOnce()函数中传入的。执行SharedPreference数据迁移时,即会执行runMigrations()函数:
//class DataMigrationInitializer private suspend fun <T> runMigrations( migrations: List<DataMigration<T>>, api: InitializerApi<T> ) { val cleanUps = mutableListOf<suspend () -> Unit>() api.updateData { startingData -> migrations.fold(startingData) { data, migration -> if (migration.shouldMigrate(data)) { cleanUps.add { migration.cleanUp() } migration.migrate(data) } else { data } } } var cleanUpFailure: Throwable? = null cleanUps.forEach { cleanUp -> try { cleanUp() } catch (exception: Throwable) { if (cleanUpFailure == null) { cleanUpFailure = exception } else { cleanUpFailure!!.addSuppressed(exception) } } } // If we encountered a failure on cleanup, throw it. cleanUpFailure?.let { throw it } }
这里主要是执行api的updateData()函数来执行数据迁移,回到api初始化的地方,在updateData()函数中会先执行transform(initData)代码块,initData即为从DataStore文件中读出的Preferences类型数据。transform代码块是从runMigrations()函数中传入的,因此startingData即是initData,这里migrations的fold()扩展方法包装了一个for循环,migrations只有一个元素SharedPreferencesMigration。先执行shouldMigrate()函数判断是否需要迁移,然后执行migrate()函数进行迁移:
//class SharedPreferencesMigration override suspend fun migrate(currentData: T): T = migrate( SharedPreferencesView( sharedPrefs, keySet ), currentData ) //sharedPrefs的定义 private val sharedPrefs: SharedPreferences by lazy { context.getSharedPreferences(sharedPreferencesName, Context.MODE_PRIVATE) } //keySet的定义 private val keySet: MutableSet<String> by lazy { (keysToMigrate ?: sharedPrefs.all.keys).toMutableSet() }
migrate是以一个lambda表达式,是在SharedPreferencesMigration构造函数中传入的:
SharedPreferencesMigration( context = context, sharedPreferencesName = sharedPreferencesName, keysToMigrate = keysToMigrate, deleteEmptyPreferences = deleteEmptyPreferences, shouldRunMigration = { prefs -> // If any key hasn't been migrated to currentData, we can't skip the migration. If // the key set is not specified, we can't skip the migration. val allKeys = prefs.asMap().keys.map { it.name } keysToMigrate?.any { it !in allKeys } ?: true }, migrate = { sharedPrefs: SharedPreferencesView, currentData: Preferences -> // prefs.getAll is already filtered to our key set, but we don't want to overwrite // already existing keys. val currentKeys = currentData.asMap().keys.map { it.name } val filteredSharedPreferences = sharedPrefs.getAll().filter { (key, _) -> key !in currentKeys } val mutablePreferences = currentData.toMutablePreferences() for ((key, value) in filteredSharedPreferences) { when (value) { is Boolean -> mutablePreferences[preferencesKey(key)] = value is Float -> mutablePreferences[preferencesKey(key)] = value is Int -> mutablePreferences[preferencesKey(key)] = value is Long -> mutablePreferences[preferencesKey(key)] = value is String -> mutablePreferences[preferencesKey(key)] = value is Set<*> -> { @Suppress("UNCHECKED_CAST") mutablePreferences[preferencesSetKey<String>(key)] = value as Set<String> } } } mutablePreferences.toPreferences() })
在migrate代码块中,按照key的唯一性先将sharedPrefs中不包含在currentData中的键值对帅选出来存放在filteredSharedPreferences,然后再将filteredSharedPreferences和currentData合并成mutablePreferences并返回。将数据合并后会将SharedPreferences数据清除:
//class SharedPreferencesMigration override suspend fun cleanUp() { val sharedPrefsEditor = sharedPrefs.edit() for (key in keySet) { sharedPrefsEditor.remove(key) } if (!sharedPrefsEditor.commit()) { throw IOException( "Unable to delete migrated keys from SharedPreferences: $sharedPreferencesName" ) } if (deleteEmptyPreferences && sharedPrefs.all.isEmpty()) { deleteSharedPreferences(context, sharedPreferencesName) } keySet.clear() }
代码执行到这里就完成了从SharedPreferences中迁移数据到DataStore的任务并清除了SharedPreferences。回到readAndInitOnce()函数,接下来会判断旧数据与迁移后的新数据是否相同,若不相同则要执行writeData(newData)函数将新数据写入到文件中,这里先跳过写入过程。
由于DataStore采用Coroutine的Flow来实现的,所以DataStore不提供主动式的读取,这一点和SharedPreferences有区别,DataStore提供了被动式的读取方式,即每写入一个数据,均会执行Flow中collect代码块中的代码,可以在这里对写入的数据作缓存。在SingleProcessDataStore提供一个类型为Flow的data成员变量给外界使用:
//class SingleProcessDataStore
override val data: Flow<T> = flow {
val curChannel = downstreamChannel()
actor.offer(Message.Read(curChannel))
emitAll(curChannel.asFlow())
}
这里利用了Channel转Flow来构造一个流对象,真正承载数据的是curChannel:
private inline fun downstreamChannel(): ConflatedBroadcastChannel<T> {
return downstreamChannel.get()
}
回忆上一节的数据写入过程,acor中每接收到一个Update消息,均会向downstreamChannel发送一个元素,因此每次写入数据均会向data流中发射一个Preferences元素,Preferences元素存储了所有数据。
Proto DataStore的创建和Preferences DataStore的创建基本相同,最大不同的是需要提供一个Serializer来定义结构化数据的读写,创建代码如下:
private val userPreferencesStore: DataStore<UserPreferences> = context.createDataStore( fileName = DATA_STORE_FILE_NAME, serializer = UserPreferencesSerializer, migrations = listOf(sharedPrefsMigration) ) //class DataStoreFactory fun <T> create( produceFile: () -> File, serializer: Serializer<T>, corruptionHandler: ReplaceFileCorruptionHandler<T>? = null, migrations: List<DataMigration<T>> = listOf(), scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) ): DataStore<T> = SingleProcessDataStore( produceFile = produceFile, serializer = serializer, corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(), initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)), scope = scope )
在创建过程中可以自定义SharedPreferences数据迁移对象,这里的sharedPrefsMigration定义为:
private const val USER_PREFERENCES_NAME = "user_preferences" private val sharedPrefsMigration = SharedPreferencesMigration( context, USER_PREFERENCES_NAME ) { sharedPrefs: SharedPreferencesView, currentData: UserPreferences -> // Define the mapping from SharedPreferences to UserPreferences if (currentData.sortOrder == SortOrder.UNSPECIFIED) { currentData.toBuilder().setSortOrder( SortOrder.valueOf( sharedPrefs.getString( SORT_ORDER_KEY, SortOrder.NONE.name )!! ) ).build() } else { currentData } }
当sortOrder为SortOrder.UNSPECIFIED时,还没有进行数据迁移,需要将该数据添加到currentData中;反之则表明已经迁移了数据,无需做额外工作。
Proto DataStore写入数据的姿势为:
suspend fun updateShowCompleted(completed: Boolean) {
dataStore.updateData { preferences ->
preferences.toBuilder().setShowCompleted(completed).build()
}
}
自然这里的preferences的类型为UserPreferences,通过与Preferences DataStore的写入方式对比,Proto DataStore的写入方式更直接,直接调用updateData()方法,而Preferences DataStore需要先调用edit()方法,然后再调用updateData()方法。updateData()方法的代码逻辑前文已经分析过了,这里不在累述。
Proto DataStore的读取姿势为:
val userPreferencesFlow: Flow<UserPreferences> = userPreferencesStore.data
.catch { exception ->
// dataStore.data throws an IOException when an error is encountered when reading data
if (exception is IOException) {
Log.e(TAG, "Error reading sort order preferences.", exception)
emit(UserPreferences.getDefaultInstance())
} else {
throw exception
}
}
其原理与Preferences DataStore一致。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。