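I'm running a Spark Scala job on a GCP Dataproc cluster. After processing the data, I need to publish messages to a Pub/Sub topic, but I get the error below.
No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact
Everything works fine up to and including the Spark processing. The error appears only when I publish messages to Pub/Sub. Here is the code: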
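import java.util.ArrayList
import scala.collection.mutable.MutableList
import scala.util.Try

import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

Try {
  val topicName = TopicName.of(projectName, pubSubTopicName)
  // Restrict the credentials to the Pub/Sub scope.
  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")
  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)
  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()
  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )
  val publishedData: MutableList[String] = MutableList()
  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )
    val messageIdFuture = publisher.publish(pubSubMessage)
    // Block until Pub/Sub returns the ID of the published message.
    publishedData += messageIdFuture.get()
  }
}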
// Builds a Publisher for the topic with explicit credentials and batching settings.
def getPublisher(
    topicName: TopicName,
    batchingSettings: BatchingSettings,
    googleCredentials: GoogleCredentials
): Publisher = {
  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()
}

// Wraps the raw payload bytes in a PubsubMessage.
def getPubSubMessage(data: ByteString): PubsubMessage = {
  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()
}
Since the error mentions a channel, I tried the following change to the publisher, but I got the same error:
Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()
I also tried adding the dependencies in sbt, but the error persisted:
"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"
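All three suggested dependencies are present in the libraries, but the error still occurs.
Please help me resolve this issue. Thanks in advance.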
So the problem was in how the fat JAR was assembled once the Pub/Sub library was included.
Below are the changes required in build.sbt:
"io.grpc" % "grpc-netty" % "1.49.2"
// Relocate Guava and protobuf so they do not clash with other versions on the cluster classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
    .inAll,
  ShadeRule
    .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
    .inAll,
)
assemblyMergeStrategy in assembly := {
  case x if Assembly.isConfigFile(x) =>
    MergeStrategy.concat
  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
    MergeStrategy.rename
  case PathList("META-INF", xs @ _*) =>
    (xs map { _.toLowerCase }) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      // Drop signature files, which are invalid once JARs are merged.
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "plexus" :: xs =>
        MergeStrategy.discard
      // Merge service-provider files instead of discarding them;
      // gRPC finds its channel provider through META-INF/services.
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}
With these changes, the runtime error no longer occurs.
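One way to check whether an assembled JAR still carries gRPC's service registration is to list the provider entries visible on the classpath. The sketch below is only an illustration: the object name ChannelProviderCheck and its two helpers are hypothetical, and it assumes the check runs with the same fat JAR on the classpath as the Spark job. gRPC discovers channel providers through the META-INF/services/io.grpc.ManagedChannelProvider file, so an empty result would suggest the merge strategy dropped or overwrote that file.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical diagnostic helper, not part of the original job.
object ChannelProviderCheck {
  // Service file that gRPC's provider lookup reads via java.util.ServiceLoader.
  val ServiceFile = "META-INF/services/io.grpc.ManagedChannelProvider"

  // Parse the body of a service file: strip '#' comments and blank lines.
  def parseProviders(content: String): List[String] =
    content.split("\n").toList
      .map(_.takeWhile(_ != '#').trim)
      .filter(_.nonEmpty)

  // Collect provider class names from every copy of the service file
  // that the given class loader can see.
  def registeredProviders(loader: ClassLoader): List[String] =
    loader.getResources(ServiceFile).asScala.toList.flatMap { url =>
      val src = Source.fromInputStream(url.openStream(), StandardCharsets.UTF_8.name())
      try parseProviders(src.mkString) finally src.close()
    }.distinct

  def main(args: Array[String]): Unit = {
    val found = registeredProviders(Thread.currentThread().getContextClassLoader)
    if (found.isEmpty) println(s"No entries found for $ServiceFile")
    else found.foreach(p => println(s"Registered channel provider: $p"))
  }
}
```

Calling ChannelProviderCheck.registeredProviders from the driver before building the Publisher, or running main against the fat JAR, should print at least one provider class if the service files were merged rather than discarded.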