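I implemented a simple Scala object with Flink that uses the join operator. After the join, to display my result, I decided to sort the output by its first field. The output appears to be sorted only within groups: it shows two separate groups of "Fyodor Dostoyevsky". Why does this happen, and how can I sort the complete DataSet?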
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinBooksAndAuthors {
  // Field positions in the authors tuple: (id, name)
  val AUTHOR_ID_FIELD: Int = 0
  val AUTHOR_NAME_FIELD: Int = 1
  // Field positions in the books tuple: (authorId, year, name)
  val BOOK_AUTHORID_FIELD: Int = 0
  val BOOK_YEAR_FIELD: Int = 1
  val BOOK_NAME_FIELD: Int = 2

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val authors = env.readCsvFile[(Int, String)](
      "downloads/authors.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1)
    )

    val books = env.readCsvFile[(Int, Short, String)](
      "downloads/books.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1, 2)
    )

    authors
      .join(books)
      .where(AUTHOR_ID_FIELD)
      .equalTo(BOOK_AUTHORID_FIELD)
      // Keep only (author name, book name)
      .map(tuple => (tuple._1._2, tuple._2._3))
      // Sorts each parallel partition independently, not the whole DataSet
      .sortPartition(0, Order.ASCENDING)
      .print()
  }
}
Output
(Charles Bukowski,Women)
(Charles Bukowski,The Most Beautiful Woman in Town)
(Charles Bukowski,Hot Water Music)
(Charles Bukowski,Barfly)
(Charles Bukowski,Notes of a Dirty Old Man)
(Charles Bukowski,Ham on Rye)
(Fyodor Dostoyevsky,The Brothers Karamazov)
(Fyodor Dostoyevsky,The Double: A Petersburg Poem)
(Fyodor Dostoyevsky,Poor Folk)
(George Orwell,Coming Up for Air)
(George Orwell,Burmese Days)
(George Orwell,A Clergyman's Daughter)
(George Orwell,Down and Out in Paris and London)
(Albert Camus,The Plague)
(Fyodor Dostoyevsky,The Eternal Husband)
(Fyodor Dostoyevsky,The Gambler)
(Fyodor Dostoyevsky,The House of the Dead)
(Fyodor Dostoyevsky,Crime and Punishment)
(Fyodor Dostoyevsky,Netochka Nezvanova)
.....
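@Xingcanc helped me out by suggesting .partitionByRange(0).sortPartition(0, Order.ASCENDING), which sorts globally instead of locally.
sortPartition sorts only within each parallel partition, which is why the same author can show up in more than one sorted run. Range-partitioning on the same field first places each key range in a single partition, so sorting the partitions yields a globally ordered result.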
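To make the difference concrete, here is a small sketch in plain Scala that imitates the two behaviours without Flink. It is only an illustration under stated assumptions: each inner Seq stands in for one parallel partition, and the names SortPartitionSketch, sortEachPartition, partitionByRange, and the single boundary "D" are hypothetical helpers, not Flink's API or its actual range-partitioning implementation (Flink picks range boundaries itself by sampling the data).

object SortPartitionSketch {
  type Row = (String, String)

  // Imitates sortPartition: every partition is sorted on its own,
  // nothing is compared across partitions.
  def sortEachPartition(parts: Seq[Seq[Row]]): Seq[Seq[Row]] =
    parts.map(_.sortBy(_._1))

  // Imitates range partitioning: a row goes to partition i, where i is the
  // number of boundaries that are <= its key. Boundaries must be sorted.
  def partitionByRange(rows: Seq[Row], boundaries: Seq[String]): Seq[Seq[Row]] = {
    val grouped = rows.groupBy(r => boundaries.count(b => b.compareTo(r._1) <= 0))
    (0 to boundaries.length).map(i => grouped.getOrElse(i, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    // Two partitions as they might look after a parallel join (made-up split).
    val parts: Seq[Seq[Row]] = Seq(
      Seq(("Fyodor Dostoyevsky", "Poor Folk"),
          ("Charles Bukowski", "Women"),
          ("George Orwell", "Burmese Days")),
      Seq(("Fyodor Dostoyevsky", "The Gambler"),
          ("Albert Camus", "The Plague"))
    )

    // Local sort only: the author appears in two separate sorted runs.
    val local = sortEachPartition(parts).flatten
    println(local.map(_._1).mkString(", "))

    // Range-partition first, then sort each partition: one global order.
    val global = sortEachPartition(partitionByRange(parts.flatten, Seq("D"))).flatten
    println(global.map(_._1).mkString(", "))
  }
}

In the first printed line "Fyodor Dostoyevsky" occurs in two separate places, mirroring the output in the question; in the second line all keys are in ascending order. In Flink itself, an alternative for small results is to run the sort with a parallelism of 1 via .setParallelism(1) on the sortPartition operator, at the cost of sorting everything in a single task.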