我遇到以下问题:在我的 Android TV 应用中,可以添加(并在之后更新)EPG 源,格式为 .xml、.gz 或 .xz 文件(.gz 和 .xz 会先解压为 .xml)。用户添加文件的 URL 后,该文件会被下载、解析并保存到 ObjectBox 数据库中。我尝试了 XmlPullParser 和 SAX 解析器,两者都工作正常。对于一个大约 50 MB、约 700,000 行(350 个频道和大约 80,000 个节目)的 XML 文件,耗时如下:
XmlPullParser -> 在模拟器上 50 秒,直接在电视上 1 分 30 秒 Sax-Parser -> 在模拟器上 55 秒,在电视上直接 1 分 50 秒
我希望它能快一点,但没关系。然后我首先意识到,如果我更新 epg 源(再次下载 xml,解析它,并将新的 epgdata 添加到 ob-db)并同时在我的应用程序中导航,
它持续的时间更长(XmlPullParser 和 Sax-Parser 都需要几分钟),
应用程序在使用时开始滞后,在我的电视上,它在一段时间后也崩溃了 - 可能是由于内存原因。如果我更新了 epg 源而没有在我的应用程序中执行任何其他操作,则不会发生这种情况。
在“调查”Profiler 时我注意到两件事。
我不确定,但我读到过:频繁触发垃圾收集器可能会导致应用程序出现滞后。所以我尝试尽量减少对象创建,但不知为何没有任何改善(也可能是我做得不对)。我还测试了不创建 EpgDataOB 数据库对象、也不把 EpgData 写入数据库的情况,但在 Profiler 中仍然能看到大量垃圾收集器调用,所以问题应该出在我的解析代码上。
唯一对我有帮助的是在每个解析程序后添加 100 毫秒的延迟(从逻辑上讲,这是不可能的解决方案,因为它会增加几个小时的处理时间),或者减少批处理大小(这也会增加处理时间,例如:使用批量大小 500 = 模拟器上的处理时间:2 分钟 10 秒,垃圾收集器在 5 秒内被调用大约 6-10 次,将批量减少到 100 -> 模拟器 = 近 3 分钟,gc 在 5 秒内被调用 4-5 次).
我将发布我的两个版本。
XmlPull解析器
存储库代码:
// Channel being assembled by the pull-parser helpers; resetChannel() replaces it for every <channel> element.
var currentChannel: Channel? = null
// Parsed programmes buffered here until they are written to epgDataBox with a single put() call.
var epgDataBatch = mutableListOf<EpgDataOB>()
// Buffer size at which parseProgram() flushes epgDataBatch to the database.
val batchSize = 10000
/**
 * Streams an XMLTV document through an [XmlPullParser] and stores its channels and programmes.
 *
 * Every `<channel>` start tag is handed to [parseChannel] and every `<programme>` start tag to
 * [parseProgram]. Programmes still buffered in [epgDataBatch] when the document ends are written
 * with one final `put`, after which [assignEpgDataToChannels] links the stored rows to their channels.
 *
 * The stream is closed before this function returns, including when the calling coroutine
 * has been cancelled.
 *
 * @param inputStream XML input; closed by this function.
 * @param epgSourceId id used to look up the EPG source in `epgSourceBox`.
 * @param maxDays not read by this implementation.
 * @param minDays not read by this implementation.
 * @param sourceUrl not read by this implementation.
 * @return `Resource.Success("OK")` when the whole document was processed, otherwise
 * `Resource.Error` carrying the exception message.
 */
suspend fun parseXmlStream(
    inputStream: InputStream,
    epgSourceId: Long,
    maxDays: Int,
    minDays: Int,
    sourceUrl: String
): Resource<String> = withContext(Dispatchers.Default) {
    try {
        val thisEpgSource = epgSourceBox.get(epgSourceId)
        val parser = XmlPullParserFactory.newInstance().newPullParser()
        parser.setInput(inputStream, null)

        var eventType = parser.eventType
        while (eventType != XmlPullParser.END_DOCUMENT) {
            if (eventType == XmlPullParser.START_TAG) {
                when (parser.name) {
                    "channel" -> parseChannel(parser, thisEpgSource)
                    "programme" -> parseProgram(parser, thisEpgSource)
                }
            }
            eventType = parser.next()
        }

        // Flush the programmes that did not fill a complete batch.
        if (epgDataBatch.isNotEmpty()) {
            epgDataBox.put(epgDataBatch)
        }
        assignEpgDataToChannels(thisEpgSource)
        _epgProcessState.value = ExternEpgProcessState.Success
        Resource.Success("OK")
    } catch (e: java.util.concurrent.CancellationException) {
        // Cancellation is not a parsing failure: let it propagate instead of reporting an error.
        throw e
    } catch (e: Exception) {
        Log.d("ERROR PARSING", "Error parsing XML: ${e.message}")
        _epgProcessState.value = ExternEpgProcessState.Error("Error parsing XML: ${e.message}")
        Resource.Error("Error parsing XML: ${e.message}")
    } finally {
        // Close directly: a withContext() call here would throw immediately when the coroutine
        // is already cancelled, skipping its block and leaving the stream open.
        try {
            inputStream.close()
        } catch (e: Exception) {
            // A failed close must not replace the result computed above.
            Log.d("ERROR PARSING", "Error closing stream: ${e.message}")
        }
    }
}
/**
 * Points [currentChannel] at a freshly constructed [Channel] built from an empty string,
 * two new empty mutable lists and another empty string.
 */
private fun resetChannel() {
    val blank = Channel("", mutableListOf(), mutableListOf(), "")
    currentChannel = blank
}
/**
 * Reads one `<channel>` element and inserts or updates the matching [EpgSourceChannel].
 *
 * Must be called while [parser] is positioned on the `<channel>` start tag. All children are
 * consumed up to the matching `</channel>` end tag:
 * - every `display-name` is appended to the channel's name list,
 * - the `src` of every `icon` that is a direct child is appended to the icon list,
 * - `url` text is stored as the channel url,
 * - any other child is skipped.
 *
 * The channel is looked up by the key `"<sourceId>_<channelId>"`; a missing row is created,
 * an existing row gets its names and icons refreshed.
 *
 * @param parser pull parser positioned on a `<channel>` start tag.
 * @param thisEpgSource EPG source the channel belongs to.
 */
private fun parseChannel(parser: XmlPullParser, thisEpgSource: EpgSource) {
    resetChannel()
    currentChannel?.id = parser.getAttributeValue(null, "id")

    // Stop only on the end tag at the depth of <channel> itself. Stopping on the first END_TAG
    // seen would end the loop at </icon> or at the end of any unknown child.
    val channelDepth = parser.depth
    while (true) {
        val event = parser.next()
        if (event == XmlPullParser.END_DOCUMENT) break
        if (event == XmlPullParser.END_TAG && parser.depth == channelDepth) break
        if (event != XmlPullParser.START_TAG) continue

        when (parser.name) {
            "display-name" -> {
                // Read the text first so the parser always advances, then append (not replace).
                val text = parser.nextText()
                currentChannel?.displayName?.add(text)
            }
            "icon" -> {
                if (parser.depth == channelDepth + 1) {
                    // A missing src attribute yields null; do not store it.
                    parser.getAttributeValue(null, "src")?.let { src ->
                        currentChannel?.icon?.add(src)
                    }
                }
            }
            "url" -> {
                val text = parser.nextText()
                currentChannel?.url = text
            }
        }
    }

    val chEpgId = "${thisEpgSource.id}_${currentChannel?.id}"
    val query = epgChannelBox.query(EpgSourceChannel_.chEpgId.equal(chEpgId)).build()
    val channelInDB = try {
        query.findUnique()
    } finally {
        // Release the query's native resources right away instead of waiting for finalization.
        query.close()
    }

    if (channelInDB == null) {
        val epgChannelToAdd = EpgSourceChannel(
            0,
            chEpgId,
            currentChannel?.id ?: "",
            currentChannel?.icon,
            currentChannel?.displayName?.firstOrNull() ?: "",
            thisEpgSource.id,
            currentChannel?.displayName ?: mutableListOf(),
            true
        )
        epgChannelBox.put(epgChannelToAdd)
    } else {
        channelInDB.display_name = currentChannel?.displayName ?: channelInDB.display_name
        channelInDB.icon = currentChannel?.icon
        channelInDB.name = currentChannel?.displayName?.firstOrNull() ?: channelInDB.name
        epgChannelBox.put(channelInDB)
    }
}
// Formatters reused for every programme instead of allocating three per <programme> element.
// SimpleDateFormat is not thread-safe: these must only be used by one parse run at a time,
// which is already required by the shared currentChannel / epgDataBatch state.
private val xmltvTimeFormat = SimpleDateFormat("yyyyMMddHHmmss Z", Locale.getDefault())
private val epgDayFormat = SimpleDateFormat("yyyy-MM-dd", Locale.getDefault())

/**
 * Reads one `<programme>` element into an [EpgDataOB] and appends it to [epgDataBatch],
 * flushing the batch to `epgDataBox` once it holds [batchSize] entries.
 *
 * Must be called while [parser] is positioned on the `<programme>` start tag. When `isUpdating`
 * is set and a row with the same `idByAccountData` already exists, the programme is skipped
 * and its children are left for the caller's loop to step over.
 *
 * Otherwise all descendants are consumed up to the matching `</programme>` end tag:
 * - `value` is stored as the rating only inside `<rating>`,
 * - `icon` is stored only when it is a direct child of `<programme>`,
 * - `category`, `director`, `actor` and `country` are collected into lists.
 *
 * A `start`/`stop` attribute that `SimpleDateFormat.parse` rejects makes this function throw.
 *
 * @param parser pull parser positioned on a `<programme>` start tag.
 * @param thisEpgSource EPG source the programme belongs to.
 */
private fun parseProgram(parser: XmlPullParser, thisEpgSource: EpgSource) {
    val start = xmltvTimeFormat.parse(parser.getAttributeValue(null, "start"))?.time ?: -1
    val stop = xmltvTimeFormat.parse(parser.getAttributeValue(null, "stop"))?.time ?: -1
    val channel = parser.getAttributeValue(null, "channel")
    val idByAccountData = "${channel}_${start}_${thisEpgSource.id}"

    if (isUpdating) {
        val query = epgDataBox.query(EpgDataOB_.idByAccountData.equal(idByAccountData)).build()
        val alreadyStored = try {
            query.findUnique() != null
        } finally {
            // One query per programme: release native resources right away.
            query.close()
        }
        if (alreadyStored) return
    }

    val newEpgData = EpgDataOB(
        id = 0,
        idByAccountData = idByAccountData,
        epgId = channel ?: "",
        chId = channel ?: "",
        datum = epgDayFormat.format(start),
        name = "",
        sub_title = "",
        descr = "",
        // The list fields start empty rather than null; with null every add() below was skipped.
        category = mutableListOf(),
        director = mutableListOf(),
        actor = mutableListOf(),
        date = "",
        country = mutableListOf(),
        showIcon = "",
        episode_num = "",
        rating = "",
        startTimestamp = start,
        stopTimestamp = stop,
        mark_archive = null,
        accountData = thisEpgSource.url,
        epgSourceId = thisEpgSource.id.toInt(),
        epChId = "${thisEpgSource.id}_${channel}"
    )

    // Stop only on the end tag at the depth of <programme> itself. Stopping on the first END_TAG
    // seen would end the loop at </credits>, </icon> or any unknown child and drop what follows.
    val programmeDepth = parser.depth
    var insideRating = false
    while (true) {
        val event = parser.next()
        if (event == XmlPullParser.END_DOCUMENT) break
        if (event == XmlPullParser.END_TAG) {
            if (parser.depth == programmeDepth) break
            if (parser.name == "rating") insideRating = false
            continue
        }
        if (event != XmlPullParser.START_TAG) continue

        when (parser.name) {
            "title" -> newEpgData.name = parser.nextText()
            "sub-title" -> newEpgData.sub_title = parser.nextText()
            "desc" -> newEpgData.descr = parser.nextText()
            "date" -> newEpgData.date = parser.nextText()
            "episode-num" -> newEpgData.episode_num = parser.nextText()
            "director" -> {
                // Read the text first so the parser always advances.
                val text = parser.nextText()
                newEpgData.director?.add(text)
            }
            "actor" -> {
                val text = parser.nextText()
                newEpgData.actor?.add(text)
            }
            "category" -> {
                val text = parser.nextText()
                newEpgData.category?.add(text)
            }
            "country" -> {
                val text = parser.nextText()
                newEpgData.country?.add(text)
            }
            "rating" -> insideRating = true
            "value" -> {
                // <star-rating> also contains <value>; only the one inside <rating> is the rating.
                if (insideRating) {
                    newEpgData.rating = parser.nextText()
                }
            }
            "icon" -> {
                // Nested icons (e.g. inside <rating>) must not replace the programme icon.
                if (parser.depth == programmeDepth + 1) {
                    newEpgData.showIcon = parser.getAttributeValue(null, "src") ?: ""
                }
            }
        }
    }

    epgDataBatch.add(newEpgData)
    if (epgDataBatch.size >= batchSize) {
        epgDataBox.put(epgDataBatch)
        epgDataBatch.clear()
    }
}
/**
 * Attaches the stored programmes of [thisEpgSource] to its channels and empties [epgDataBatch].
 *
 * For every channel whose `epgSourceId` equals the source id, the source is set as the
 * channel's `epgSource` target, all `EpgDataOB` rows whose `epChId` equals the channel's
 * `chEpgId` are added to its `epgDataList`, and the channel is written back.
 */
private fun assignEpgDataToChannels(thisEpgSource: EpgSource) {
    val channelsOfSource = epgChannelBox
        .query(EpgSourceChannel_.epgSourceId.equal(thisEpgSource.id))
        .build()
        .find()

    for (channel in channelsOfSource) {
        channel.epgSource.target = thisEpgSource
        val programmes = epgDataBox
            .query(EpgDataOB_.epChId.equal(channel.chEpgId))
            .build()
            .find()
        channel.epgDataList.addAll(programmes)
        epgChannelBox.put(channel)
    }

    epgDataBatch.clear()
}
SAX 解析器
存储库代码:
/**
 * Parses an XMLTV document with a SAX parser driven by [EpgSaxHandler] and stores the result.
 *
 * The handler writes channels and full programme batches while parsing; programmes still
 * buffered in the handler when parsing ends are written here with one final `put`.
 *
 * The stream is closed on every path, including a failed lookup of the EPG source.
 *
 * @param inputStream XML input; closed by this function.
 * @param epgSourceId id used to look up the EPG source in `epgSourceBox`.
 * @param maxDays forwarded to [EpgSaxHandler].
 * @param minDays forwarded to [EpgSaxHandler].
 * @param sourceUrl not read by this implementation; the handler receives the stored source's url.
 * @return `Resource.Success("OK")` when the whole document was processed, otherwise
 * `Resource.Error` carrying the exception message.
 */
suspend fun parseXmlStream(
    inputStream: InputStream,
    epgSourceId: Long,
    maxDays: Int,
    minDays: Int,
    sourceUrl: String
): Resource<String> = withContext(Dispatchers.Default) {
    try {
        inputStream.use { input ->
            // Looked up inside use {} so that a failing lookup still closes the stream.
            val thisEpgSource = epgSourceBox.get(epgSourceId)
            val handler = EpgSaxHandler(thisEpgSource.id, maxDays, minDays, thisEpgSource.url, isUpdating)
            SAXParserFactory.newInstance().newSAXParser().parse(input, handler)

            // Flush the programmes that did not fill a complete batch.
            if (handler.epgDataBatch.isNotEmpty()) {
                epgDataBox.put(handler.epgDataBatch)
                handler.epgDataBatch.clear()
            }
            _epgProcessState.value = ExternEpgProcessState.Success
            return@withContext Resource.Success("OK")
        }
    } catch (e: java.util.concurrent.CancellationException) {
        // Cancellation is not a parsing failure: let it propagate instead of reporting an error.
        throw e
    } catch (e: Exception) {
        Log.e("ERROR PARSING", "${e.message}")
        _epgProcessState.value = ExternEpgProcessState.Error("Error parsing XML: ${e.message}")
        return@withContext Resource.Error("Error parsing XML: ${e.message}")
    }
}
处理者:
/**
 * SAX handler that turns an XMLTV stream into ObjectBox rows.
 *
 * `<channel>` elements are inserted into or updated in the `EpgSourceChannel` box as soon as they
 * end. `<programme>` elements are collected in [epgDataBatch] and written in batches; entries
 * remaining in [epgDataBatch] after parsing must be written by the caller.
 *
 * Instances hold mutable parsing state and reusable formatters, so one instance must only be
 * used for a single parse run on one thread at a time.
 *
 * @param epgSourceId id of the EPG source; used as prefix/suffix of the generated keys.
 * @param maxDays not read by this implementation.
 * @param minDays not read by this implementation.
 * @param sourceUrl stored as `accountData` on every programme.
 * @param isUpdating when true, programmes whose `idByAccountData` already exists are skipped.
 */
class EpgSaxHandler(
    private val epgSourceId: Long,
    private val maxDays: Int,
    private val minDays: Int,
    private val sourceUrl: String,
    private val isUpdating: Boolean
) : DefaultHandler() {
    private val epgChannelBox: Box<EpgSourceChannel>
    private val epgDataBox: Box<EpgDataOB>

    init {
        val store = ObjectBox.store
        epgChannelBox = store.boxFor(EpgSourceChannel::class.java)
        epgDataBox = store.boxFor(EpgDataOB::class.java)
    }

    /** Programmes parsed so far that have not yet been written to the database. */
    var epgDataBatch = mutableListOf<EpgDataOB>()
    private val batchSize = 10000

    private var currentChannel: Channel? = null
    private var currentProgram: EpgDataOB? = null

    // One buffer and two formatters reused for the whole document instead of being
    // re-allocated for every element / programme.
    private val textBuffer = StringBuilder()
    private val timestampFormat = SimpleDateFormat("yyyyMMddHHmmss Z", Locale.getDefault())
    private val dayFormat = SimpleDateFormat("yyyy-MM-dd", Locale.getDefault())

    override fun startElement(uri: String?, localName: String?, qName: String?, attributes: Attributes?) {
        when (qName) {
            "channel" -> {
                val id = attributes?.getValue("id") ?: ""
                currentChannel = Channel(id, mutableListOf(), mutableListOf(), "")
            }
            "programme" -> currentProgram = startProgramme(attributes)
            "icon" -> {
                val src = attributes?.getValue("src") ?: ""
                currentChannel?.icon?.add(src)
                currentProgram?.showIcon = src
            }
            // "category" is included so that categories are collected like the other list fields.
            "desc", "title", "sub-title", "episode-num", "rating", "country", "director", "actor",
            "date", "display-name", "category" -> textBuffer.setLength(0)
        }
    }

    /**
     * Builds the [EpgDataOB] for a `<programme>` start tag, or returns null when the handler
     * is in update mode and a row with the same `idByAccountData` is already stored.
     */
    private fun startProgramme(attributes: Attributes?): EpgDataOB? {
        val start = timestampFormat.parse(attributes?.getValue("start") ?: "")?.time ?: -1
        val stop = timestampFormat.parse(attributes?.getValue("stop") ?: "")?.time ?: -1
        val channel = attributes?.getValue("channel") ?: ""
        val idByAccountData = "${channel}_${start}_${epgSourceId}"

        if (isUpdating) {
            val query = epgDataBox.query(EpgDataOB_.idByAccountData.equal(idByAccountData)).build()
            val alreadyStored = try {
                query.findUnique() != null
            } finally {
                // One query per programme: release native resources right away.
                query.close()
            }
            if (alreadyStored) return null
        }

        return EpgDataOB(
            id = 0,
            idByAccountData = idByAccountData,
            epgId = channel,
            chId = channel,
            datum = dayFormat.format(start),
            name = "",
            sub_title = "",
            descr = "",
            category = mutableListOf(),
            director = mutableListOf(),
            actor = mutableListOf(),
            date = "",
            country = mutableListOf(),
            showIcon = "",
            episode_num = "",
            rating = "",
            startTimestamp = start,
            stopTimestamp = stop,
            mark_archive = null,
            accountData = sourceUrl,
            epgSourceId = epgSourceId.toInt(),
            epChId = "${epgSourceId}_$channel"
        )
    }

    override fun characters(ch: CharArray?, start: Int, length: Int) {
        ch?.let {
            textBuffer.append(it, start, length)
        }
    }

    override fun endElement(uri: String?, localName: String?, qName: String?) {
        when (qName) {
            "channel" -> {
                currentChannel?.let { channel -> storeChannel(channel) }
                currentChannel = null
            }
            "programme" -> {
                currentProgram?.let { program -> addEpgDataToBatch(program) }
                currentProgram = null
            }
            "desc" -> currentProgram?.descr = textBuffer.toString()
            "title" -> currentProgram?.name = textBuffer.toString()
            "sub-title" -> currentProgram?.sub_title = textBuffer.toString()
            "episode-num" -> currentProgram?.episode_num = textBuffer.toString()
            // The rating text sits in a nested <value>, so the buffer also holds the
            // indentation around it; trim that away.
            "rating" -> currentProgram?.rating = textBuffer.toString().trim()
            "country" -> currentProgram?.country?.add(textBuffer.toString())
            "director" -> currentProgram?.director?.add(textBuffer.toString())
            "actor" -> currentProgram?.actor?.add(textBuffer.toString())
            "category" -> currentProgram?.category?.add(textBuffer.toString())
            "date" -> currentProgram?.date = textBuffer.toString()
            "display-name" -> currentChannel?.displayName?.add(textBuffer.toString())
        }
    }

    /** Inserts [channel] under the key `"<sourceId>_<channelId>"`, or refreshes the stored row. */
    private fun storeChannel(channel: Channel) {
        val chEpgId = "${epgSourceId}_${channel.id}"
        val query = epgChannelBox.query(EpgSourceChannel_.chEpgId.equal(chEpgId)).build()
        val channelInDB = try {
            query.findUnique()
        } finally {
            query.close()
        }

        if (channelInDB == null) {
            val newChannel = EpgSourceChannel(
                id = 0,
                chEpgId = chEpgId,
                chId = channel.id,
                icon = channel.icon,
                display_name = channel.displayName,
                name = channel.displayName.firstOrNull() ?: "",
                epgSourceId = epgSourceId,
                isExternalEpg = true
            )
            epgChannelBox.put(newChannel)
        } else {
            channelInDB.display_name = channel.displayName
            channelInDB.icon = channel.icon
            channelInDB.name = channel.displayName.firstOrNull() ?: channelInDB.name
            epgChannelBox.put(channelInDB)
        }
    }

    /** Buffers [epgData] and writes the buffer once it holds [batchSize] entries. */
    private fun addEpgDataToBatch(epgData: EpgDataOB) {
        epgDataBatch.add(epgData)
        if (epgDataBatch.size >= batchSize) {
            processEpgDataBatch()
        }
    }

    /** Writes all buffered programmes with one `put` and empties the buffer. */
    private fun processEpgDataBatch() {
        if (epgDataBatch.isNotEmpty()) {
            epgDataBox.put(epgDataBatch)
            epgDataBatch.clear()
        }
    }
}
所以我正在寻找一种快速的方法来解析 XML 数据并将其插入数据库,同时不让应用出现滞后或崩溃 :-) 我的代码中是否存在导致滞后的错误?还是说,在不减慢解析和数据库插入速度的前提下,这本来就不容易做到?
如果需要任何其他代码,我可以发布它。 使用 XmlPullParser 解析程序时,内存分析器如下所示:
更新:
内存使用&gc -> 仅解析,无数据库使用 我使用数据类 Channel & Program 来解析某处的数据,并始终重用相同的通道/程序:
内存使用和 gc -> 解析和创建 EpgDataOB 对象(无数据库插入)
内存使用&gc -> 解析并将数据添加到数据库(db=最后10秒)
内存使用和GC - >解析,将数据添加到数据库并使用EpgData列表管理关系epg通道:
/**
 * Writes [epgDataBatch] to `epgDataBox` in chunks of 15000 on the IO dispatcher and adds each
 * written programme to the `epgDataList` of the channel in `epgChannelBatch` whose `chEpgId`
 * equals the programme's `epChId`. Pauses 500 ms after every chunk and clears [epgDataBatch]
 * once all chunks are written.
 *
 * Returns immediately; the work runs in a coroutine launched from this function.
 */
private fun addEpgDataToDatabase() {
    // NOTE(review): GlobalScope is not tied to any lifecycle, so this work can neither be
    // cancelled nor awaited, and epgDataBatch is read here while the caller may still be
    // appending to it. Consider launching from the owning scope — confirm against callers.
    GlobalScope.launch {
        withContext(Dispatchers.IO) {
            epgDataBatch.chunked(15000).forEach { batch ->
                epgDataBox.put(batch)
                // Group once per chunk instead of filtering the whole chunk for every channel.
                val programmesByChannel = batch.groupBy { it.epChId }
                epgChannelBatch.forEach { epgChannel ->
                    programmesByChannel[epgChannel.chEpgId]?.let { programmes ->
                        epgChannel.epgDataList.addAll(programmes)
                    }
                }
                Log.d("EPGPARSING ADD TO DB", "OK")
                delay(500)
            }
            epgDataBatch.clear()
        }
    }
}
在解析 XML 时,如果每解析出一个数据对象就立即插入数据库,就会执行大量的数据库插入,其成本可能很高(参见 ObjectBox 文档)。
这是因为它使用阻塞 I/O 和文件锁将数据库写入磁盘,因为每个 put 都处于隐式事务中。
因此您可以通过加快数据库插入速度来加快解析速度。
您可以先把数据成批放入一个数组中,然后一次性 put(插入)全部数据,这样只需要一个事务;这会消耗更多内存,但速度更快。
或者 ObjectBox 确实有 BoxStore.runInTx(),它需要一个 Runnable 在单个事务中执行多个放入操作。
ObjectBox 似乎希望您避免在 xml 解析开始时开始事务并在完成 xml 解析时结束事务。它确实有一个内部低级方法来执行此操作。
请注意,这也适用于其他基于文件的数据库,例如 sqlite。