Class file for org.apache.flink.api.common.serialization.DeserializationSchema not found

Question (votes: 0, answers: 2)

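I am trying to develop a Flink streaming job. The job should read from a Kafka topic.

I tried to update the example at https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java

I want to use Flink 1.4 and Kafka 0.11.

When I try to build the project with Maven, I get the following error: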

[ERROR] /quickstart/src/main/java/org/myorg/quickstart/StreamingJob.java:[20,66] cannot access org.apache.flink.api.common.serialization.DeserializationSchema
  class file for org.apache.flink.api.common.serialization.DeserializationSchema not found
[INFO] 1 error
...
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project quickstart: Compilation failure
[ERROR] /quickstart/src/main/java/org/myorg/quickstart/StreamingJob.java:[20,66] cannot access org.apache.flink.api.common.serialization.DeserializationSchema
[ERROR] class file for org.apache.flink.api.common.serialization.DeserializationSchema not found

Any ideas on how to fix this error? So far I have not been able to find a solution.

StreamingJob.java

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"), (KeyedDeserializationSchema) new JSONKeyValueDeserializationSchema(true), parameterTool.getProperties()));

        // print() will write the contents of the stream to the TaskManager's standard out stream
        // the rebalance call causes a repartitioning of the data so that all machines
        // see the messages (for example when "num kafka partitions" < "num flink operators")
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        }).print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}

pom.xml

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.myorg.quickstart</groupId>
    <artifactId>quickstart</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.3.0</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <scala.binary.version>2.10</scala.binary.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- 

        Execute "mvn clean package -Pbuild-jar"
        to build a jar file out of this project!

        How to use the Flink Quickstart pom:

        a) Adding new dependencies:
            You can add dependencies to the list below.
            Please check if the maven-shade-plugin below is filtering out your dependency
            and remove the exclude from there.

        b) Build a jar for running on the cluster:
            There are two options for creating a jar from this project

            b.1) "mvn clean package" -> this will create a fat jar which contains all
                    dependencies necessary for running the jar created by this pom in a cluster.
                    The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.

            b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
                    nicer dependency exclusion handling. This approach is preferred and leads to
                    much cleaner jar files.
    -->

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- explicitly add a standard logging framework, as Flink does not have
            a hard dependency on one specific framework by default -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.4-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <!-- Profile for packaging correct JAR files -->
            <id>build-jar</id>

            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.10</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_2.10</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                    <version>${slf4j.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                    <version>${log4j.version}</version>
                    <scope>provided</scope>
                </dependency>
            </dependencies>

            <build>
                <plugins>
                    <!-- disable the exclusion rules -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <artifactSet>
                                        <excludes combine.self="override"></excludes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
            except flink and its transitive dependencies. The resulting fat-jar can be executed
            on a cluster. Change the value of Program-Class if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <!-- This list contains all dependencies of flink-dist
                                    Everything else will be packaged into the fat-jar
                                    -->
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
                                    <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-java</exclude>
                                    <exclude>org.apache.flink:flink-scala_2.10</exclude>
                                    <exclude>org.apache.flink:flink-runtime_2.10</exclude>
                                    <exclude>org.apache.flink:flink-optimizer_2.10</exclude>
                                    <exclude>org.apache.flink:flink-clients_2.10</exclude>
                                    <exclude>org.apache.flink:flink-avro_2.10</exclude>
                                    <exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
                                    <exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
                                    <exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
                                    <exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
                                    <exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
                                    <exclude>org.apache.flink:flink-python</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-metrics-jmx</exclude>
                                    <exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>

                                    <!-- Also exclude very big transitive dependencies of Flink

                                    WARNING: You have to remove these excludes if your code relies on other
                                    versions of these dependencies.

                                    -->

                                    <exclude>log4j:log4j</exclude>
                                    <exclude>org.scala-lang:scala-library</exclude>
                                    <exclude>org.scala-lang:scala-compiler</exclude>
                                    <exclude>org.scala-lang:scala-reflect</exclude>
                                    <exclude>com.data-artisans:flakka-actor_*</exclude>
                                    <exclude>com.data-artisans:flakka-remote_*</exclude>
                                    <exclude>com.data-artisans:flakka-slf4j_*</exclude>
                                    <exclude>io.netty:netty-all</exclude>
                                    <exclude>io.netty:netty</exclude>
                                    <exclude>commons-fileupload:commons-fileupload</exclude>
                                    <exclude>org.apache.avro:avro</exclude>
                                    <exclude>commons-collections:commons-collections</exclude>
                                    <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
                                    <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
                                    <exclude>com.thoughtworks.paranamer:paranamer</exclude>
                                    <exclude>org.xerial.snappy:snappy-java</exclude>
                                    <exclude>org.apache.commons:commons-compress</exclude>
                                    <exclude>org.tukaani:xz</exclude>
                                    <exclude>com.esotericsoftware.kryo:kryo</exclude>
                                    <exclude>com.esotericsoftware.minlog:minlog</exclude>
                                    <exclude>org.objenesis:objenesis</exclude>
                                    <exclude>com.twitter:chill_*</exclude>
                                    <exclude>com.twitter:chill-java</exclude>
                                    <exclude>commons-lang:commons-lang</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>org.apache.commons:commons-lang3</exclude>
                                    <exclude>org.slf4j:slf4j-api</exclude>
                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
                                    <exclude>log4j:log4j</exclude>
                                    <exclude>org.apache.commons:commons-math</exclude>
                                    <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
                                    <exclude>commons-logging:commons-logging</exclude>
                                    <exclude>commons-codec:commons-codec</exclude>
                                    <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
                                    <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
                                    <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
                                    <exclude>stax:stax-api</exclude>
                                    <exclude>com.typesafe:config</exclude>
                                    <exclude>org.uncommons.maths:uncommons-maths</exclude>
                                    <exclude>com.github.scopt:scopt_*</exclude>
                                    <exclude>commons-io:commons-io</exclude>
                                    <exclude>commons-cli:commons-cli</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>org.apache.flink:*</artifact>
                                    <excludes>
                                        <!-- exclude shaded google but include shaded curator -->
                                        <exclude>org/apache/flink/shaded/com/**</exclude>
                                        <exclude>web-docs/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
                            This will add a Main-Class entry to the manifest file -->
                            <!--
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                            -->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
                    <target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
                </configuration>
            </plugin>
        </plugins>

        <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
        <!--
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>

                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
        -->
    </build>
</project>

Tags: java maven apache-kafka apache-flink

2 answers

3 votes

I think this is because your pom.xml sets the Flink version to 1.3.0:

<flink.version>1.3.0</flink.version>

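In 1.3.0, DeserializationSchema lives in org.apache.flink.streaming.util.serialization, not in org.apache.flink.api.common.serialization, which is where the compiler is looking. The Kafka connector in your pom.xml is declared as 1.4-SNAPSHOT and refers to the class at its 1.4 location, while the 1.3.0 core artifacts on the classpath only provide the old one. You should be able to fix this by changing the version in your pom.xml to 1.4.1: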

<flink.version>1.4.1</flink.version>
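
To confirm which of the two package locations is actually visible to the build, a small classpath probe can help. The following is a minimal sketch and not part of the original answer: the class name `DeserializationSchemaProbe` and the helper `isLoadable` are hypothetical names chosen for illustration, and the code relies only on standard JDK reflection. It is meant to be run with the project's dependencies on the classpath.

```java
class DeserializationSchemaProbe {

    // Location used by Flink 1.3.x, according to the answer above.
    static final String OLD_LOCATION =
            "org.apache.flink.streaming.util.serialization.DeserializationSchema";

    // Location named in the compiler error from the question.
    static final String NEW_LOCATION =
            "org.apache.flink.api.common.serialization.DeserializationSchema";

    /** Returns true if the named class can be found on the current classpath. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false,
                    DeserializationSchemaProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("old location present: " + isLoadable(OLD_LOCATION));
        System.out.println("new location present: " + isLoadable(NEW_LOCATION));
    }
}
```

If the old location is reported as present and the new one as missing, the classpath most likely carries pre-1.4 Flink core artifacts, which would match the compile error in the question.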

1 vote

You need to add this Maven dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.4.0</version>
</dependency>
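
Pinning flink-core to 1.4.0 while the other Flink artifacts still resolve to ${flink.version} (1.3.0 in the question's pom.xml) would leave the build with mixed Flink versions, so it is probably worth checking that all org.apache.flink artifacts agree. Below is a rough sketch of such a check in plain Java. `FlinkVersionCheck`, `distinctVersions`, and the hard-coded coordinate list (copied by hand from the question's pom.xml) are illustrative assumptions, not a Maven or Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class FlinkVersionCheck {

    /** Collects the distinct versions from "artifactId:version" strings. */
    public static Set<String> distinctVersions(List<String> coordinates) {
        Set<String> versions = new TreeSet<>();
        for (String coordinate : coordinates) {
            // the version is everything after the last ':'
            versions.add(coordinate.substring(coordinate.lastIndexOf(':') + 1));
        }
        return versions;
    }

    public static void main(String[] args) {
        // Flink artifacts as declared in the question's pom.xml (copied by hand)
        List<String> declared = Arrays.asList(
                "flink-java:1.3.0",
                "flink-streaming-java_2.10:1.3.0",
                "flink-clients_2.10:1.3.0",
                "flink-connector-kafka-0.8_2.11:1.4-SNAPSHOT");

        Set<String> versions = distinctVersions(declared);
        if (versions.size() > 1) {
            System.out.println("Mixed Flink versions: " + versions);
        } else {
            System.out.println("Flink versions are aligned: " + versions);
        }
    }
}
```

In a real project, `mvn dependency:tree` shows the versions Maven actually resolves, so a hand-written check like this is only a way to illustrate the idea.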