我正在尝试创建一个实时流视频应用程序,它使用 websocket 连接来检索包含 base64 编码视频块数据的 json 对象。 我正在 CustomDataSource 中使用它进行解码,并在 ProgressiveMediaSource 中使用它来让 exoplayer 播放实时流,但是我的 exoplayer 进入缓冲模式或什么!它只是不播放视频。我在日志中也没有看到任何明显的错误。 我不确定数据源或 exoplayer 的配置是否有问题。
下面我提到了我为此创建 POC 所做的所有代码。
应用程序级别build.gradle:
dependencies {
...
//exoplayer
val mediaVersion = "1.1.1"
implementation("androidx.media3:media3-exoplayer:$mediaVersion")
implementation("androidx.media3:media3-ui:$mediaVersion")
implementation("androidx.media3:media3-exoplayer-hls:$mediaVersion")
//http
implementation ("io.reactivex.rxjava2:rxjava:2.2.21")
implementation ("com.squareup.retrofit2:retrofit:2.9.0")
implementation ("com.squareup.retrofit2:converter-gson:2.9.0")
implementation ("com.squareup.okhttp3:okhttp:4.9.3")
implementation ("com.squareup.okhttp3:logging-interceptor:4.9.3")
}
我的布局文件如下:
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:background="#FFFFFF"
android:orientation="vertical">
<androidx.media3.ui.PlayerView
android:id="@+id/video_view"
android:layout_width="match_parent"
android:layout_height="match_parent"
app:use_controller="true"
app:shutter_background_color="@android:color/transparent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toTopOf="parent"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintBottom_toBottomOf="parent"/>
</androidx.constraintlayout.widget.ConstraintLayout>
我的 StreamingActivity 课程:
import android.annotation.SuppressLint
import android.app.Activity
import android.os.Build
import android.os.Bundle
import android.util.Log
import androidx.core.view.WindowCompat
import androidx.core.view.WindowInsetsCompat
import androidx.core.view.WindowInsetsControllerCompat
import androidx.media3.common.MediaItem
import androidx.media3.common.MimeTypes
import androidx.media3.common.PlaybackException
import androidx.media3.common.Player
import androidx.media3.common.util.UnstableApi
import androidx.media3.exoplayer.ExoPlayer
import androidx.media3.exoplayer.source.MediaSource
import androidx.media3.exoplayer.source.ProgressiveMediaSource
import androidx.media3.ui.PlayerView
import com.livestreamdemo.R
import com.livestreamdemo.ui.helper.CustomMediaExtractor
import com.livestreamdemo.ui.helper.UtilHelper
import com.livestreamdemo.ui.helper.WssDSFactory
import com.livestreamdemo.ui.helper.WssStreamDataSource
class StreamingActivity : Activity() {
private var player: Player? = null
private var mPlayWhenReady = true
private lateinit var videoView: PlayerView
private lateinit var wsDataSource: WssStreamDataSource
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_streaming)
videoView = findViewById<PlayerView>(R.id.video_view)
}
@androidx.annotation.OptIn(androidx.media3.common.util.UnstableApi::class)
private fun initPlayer() {
player = ExoPlayer.Builder(this).build()
.also { exoPlayer ->
videoView.player = exoPlayer
// Update the track selection parameters to only pick standard definition tracks
exoPlayer.trackSelectionParameters = exoPlayer.trackSelectionParameters
.buildUpon()
.setPreferredVideoMimeType(MimeTypes.VIDEO_MP4)//see if this actually needed
.setMaxVideoSizeSd()
.build()
exoPlayer.playWhenReady = mPlayWhenReady
// Create a Regular media source pointing to a playlist uri.
wsDataSource = WssStreamDataSource()
val factory = ProgressiveMediaSource.Factory(
WssDSFactory(wsDataSource),
CustomMediaExtractor()
)
val mediaItem = MediaItem.Builder().setUri(UtilHelper.wsUrl).setMimeType(
MimeTypes.VIDEO_MP4
).build()
val progressiveMediaSource: MediaSource = factory.createMediaSource(mediaItem)
// Set the media source to be played.
exoPlayer.setMediaSource(progressiveMediaSource)
// Prepare the player.
exoPlayer.addListener(playerCallBacks)
exoPlayer.prepare()
exoPlayer.play()
}
}
public override fun onStart() {
super.onStart()
initPlayer()
}
public override fun onResume() {
super.onResume()
hideSystemUi()
}
public override fun onPause() {
super.onPause()
if (Build.VERSION.SDK_INT <= 23) {
releasePlayer()
}
}
public override fun onStop() {
super.onStop()
if (Build.VERSION.SDK_INT > 23) {
releasePlayer()
}
}
private fun releasePlayer() {
player?.let { player ->
/* playbackPosition = player.currentPosition
mediaItemIndex = player.currentMediaItemIndex
playWhenReady = player.playWhenReady*/
player.removeListener(playerCallBacks)
player.release()
}
player = null
}
@SuppressLint("InlinedApi")
private fun hideSystemUi() {
WindowCompat.setDecorFitsSystemWindows(window, false)
WindowInsetsControllerCompat(window, videoView).let { controller ->
controller.hide(WindowInsetsCompat.Type.systemBars())
controller.systemBarsBehavior =
WindowInsetsControllerCompat.BEHAVIOR_SHOW_TRANSIENT_BARS_BY_SWIPE
}
}
private val playerCallBacks = @UnstableApi object : Player.Listener {
override fun onPlayerError(error: PlaybackException) {
super.onPlayerError(error)
Log.e("playerError", error.message.toString())
}
override fun onPlayerStateChanged(playWhenReady: Boolean, playbackState: Int) {
val stateString: String = when (playbackState) {
ExoPlayer.STATE_IDLE -> "ExoPlayer.STATE_IDLE -"
ExoPlayer.STATE_BUFFERING -> "ExoPlayer.STATE_BUFFERING -"
ExoPlayer.STATE_READY -> "ExoPlayer.STATE_READY -"
ExoPlayer.STATE_ENDED -> "ExoPlayer.STATE_ENDED -"
else -> "UNKNOWN_STATE -"
}
Log.d("onPlayerStateChanged", "changed state to $stateString")
}
}
}
我的 CustomDataSource 类:
import android.net.Uri
import androidx.media3.common.C
import androidx.media3.common.util.UnstableApi
import androidx.media3.datasource.BaseDataSource
import androidx.media3.datasource.DataSource
import androidx.media3.datasource.DataSpec
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.WebSocket
import java.lang.Integer.min
@androidx.annotation.OptIn(androidx.media3.common.util.UnstableApi::class)
class WssStreamDataSource : BaseDataSource(true) {
fun getUrl(): String{
return wsUrl.replace("proto","json")
}
private lateinit var httpClient: OkHttpClient
private lateinit var dataStreamCollector: WssDataStreamCollector
private var webSocketClient: WebSocket? = null
private var currentByteStream: ByteArray? = null
private var currentPosition = 0;
private var remainingBytes = 0;
init {
httpClient=initAndGetHttpClient()
dataStreamCollector= WssDataStreamCollector()
}
override fun open(dataSpec: DataSpec): Long {
// Form the request and open the socket.
// Provide the listener
// which collects the data for us (Previous class).
webSocketClient = httpClient.newWebSocket(
Request.Builder().apply {
dataSpec.httpRequestHeaders.forEach { entry ->
addHeader(entry.key, entry.value)
}
}.url(dataSpec.uri.toString()).build(),
dataStreamCollector)
return C.LENGTH_UNSET.toLong() // Return -1 as the size is unknown (streaming)
}
override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
// return 0 (nothing read) when no data present...
if (currentByteStream == null && !dataStreamCollector.canStream()) {
return 0
}
// parse one (data) ByteString at a time.
// reset the current position and remaining bytes
// for every new data
if (currentByteStream == null) {
currentByteStream = dataStreamCollector.getNextStream().toByteArray()
currentPosition = 0
remainingBytes = currentByteStream?.size ?: 0
}
val readSize = min(length, remainingBytes)
currentByteStream?.copyInto(buffer, offset, currentPosition, currentPosition + readSize)
currentPosition += readSize
remainingBytes -= readSize
// once the data is read set currentByteStream to null
// so the next data would be collected to process in next
// iteration.
if (remainingBytes == 0) {
currentByteStream = null
}
return readSize
}
override fun getUri(): Uri? {
webSocketClient?.request()?.url?.let {
return Uri.parse(it.toString())
}
return null
}
override fun close() {
// close the socket and release the resources
closeWebsocketConnection()
}
private fun closeWebsocketConnection() {
// webSocketClient?.close(1000, "Closing connection")
webSocketClient?.cancel()
}
private fun initAndGetHttpClient() : OkHttpClient{
val builder = OkHttpClient.Builder()
return builder.build();
}
}
@UnstableApi // Factory class for DataSource
class WssDSFactory(private val dataSource: WssStreamDataSource) : DataSource.Factory {
override fun createDataSource(): DataSource = dataSource
}
Websocket监听器实现类
import android.os.Build
import android.util.Log
import androidx.annotation.RequiresApi
import com.livestreamdemo.ui.models.StreamResponse
import com.google.gson.Gson
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import java.util.concurrent.ConcurrentSkipListSet
class WssDataStreamCollector : WebSocketListener() {
companion object{
private const val TAG="WssDataStreamCollector"
}
private val wssData = ConcurrentSkipListSet<ByteString>()
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
Log.e(TAG, "onMessage: byteString")
wssData.add(bytes)
}
@RequiresApi(Build.VERSION_CODES.O)
override fun onMessage(webSocket: WebSocket, text: String) {
val response: StreamResponse?=Gson().fromJson<StreamResponse>(text, StreamResponse::class.java)
response?.let {
val videoChunk: ByteString= UtilHelper.base64StringToByteString(it.dataChunk)!!//todo check add null safety
wssData.add(videoChunk)
Log.e(TAG, "video_chunk receiver: ${it.timestamp}")
}
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
wssData.removeAll(wssData)
Log.e("WssDataStreamCollector","connection Closed")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.e("onFailure","reason ${response.toString()}")
Log.e("onFailure","errMsg ${t.message.toString()}")
}
fun canStream(): Boolean {
return wssData.size > 0
}
fun getNextStream(): ByteString {
return wssData.pollFirst()
}
}
自定义媒体提取器:
@UnstableApi
internal class CustomMediaExtractor : ExtractorsFactory {
override fun createExtractors(): Array<Extractor> {
return arrayOf(FragmentedMp4Extractor())
}
}
助手类:
import android.os.Build
import androidx.annotation.RequiresApi
import okio.ByteString
import java.text.SimpleDateFormat
import java.util.Base64
import java.util.Locale
class UtilHelper {
companion object {
const val wsUrl =
"wss://some_webstream_url"
val ddMMYYYhhmmssa = SimpleDateFormat("dd-MM-yyyy hh:mm:ss a", Locale.US)
@RequiresApi(Build.VERSION_CODES.O)
fun base64ToArrayBuffer(base64: String): ByteString {
val decodedBytes = Base64.getDecoder().decode(base64)
return ByteString.of(*decodedBytes)
}
@RequiresApi(Build.VERSION_CODES.O)
fun base64StringToByteString(base64String: String): ByteString? {
return base64ToArrayBuffer(base64String)
}
}
}
我的模型响应类:
data class StreamResponse2(
@SerializedName("data")
val dataChunk: String,
@SerializedName("timestamp")
val timestamp: Long,
)
请点击此链接查看日志:打开此
由于这是相当神秘的,这里有一些有用的提示,我会尝试自己解决这个问题。
1。处理读取错误: 在 WssStreamDataSource 中,您正在使用 read 方法从 WebSocket 读取数据。如果读取数据时出现错误,可能会导致缓冲问题。您应该处理 WebSocket 读取期间潜在的异常或错误,并确保 WebSocket 连接不会过早关闭。
2。缓冲阈值: ExoPlayer 的缓冲行为可以通过调整缓冲区大小和缓冲持续时间设置来控制。您可以通过修改 initPlayer 函数中的 TrackSelectionParameters 来设置自定义缓冲区大小:
.buildUpon()
.setMaxVideoSizeSd()
.setBufferMinPlaybackMs(/* your desired buffer duration in milliseconds */)
.setBufferForPlaybackMs(/* your desired buffer size in milliseconds */)
.build()
调整这些值以查看是否可以改善缓冲行为。
3.线程问题: 确保您的 WebSocket 交互和数据处理不会导致线程问题。 WebSocket 交互应该在后台线程上完成,ExoPlayer 实例应该在主线程上管理。
4。 ExoPlayer 调试: ExoPlayer 提供了一个调试覆盖层,可以帮助您诊断播放问题。您可以通过将以下代码添加到 initPlayer 函数来启用它:
debugTextViewHelper.start()
5。最后但并非最不重要的;在不同设备上进行测试: 有时,缓冲问题可能是特定于设备的。在多个 Android 设备上测试您的应用程序,看看问题是否在不同的硬件上仍然存在。
请告诉我这些步骤是否可以帮助您解决问题。我相当有信心这要么是缺乏处理读取错误,要么是缓冲阈值配置不当。