我正在尝试部署camel-salesforce-source 连接器。
我已经从maven下载了camel-salesforce-source-kafka-connector-4.4.2-package.tar.gz tar文件,将其提取到一个目录并将该目录添加到connect-distributed的插件路径中。
我还提取了camel-salesforce-kafka-connector-0.9.0-package.tar.gz并将其添加到插件中。 连接器列表按预期显示了所有驼峰和 SF 连接器
"org.apache.camel.kafkaconnector.CamelSinkConnector"
"org.apache.camel.kafkaconnector.CamelSinkConnector"
"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSinkConnector"
"org.apache.camel.kafkaconnector.CamelSourceConnector"
"org.apache.camel.kafkaconnector.CamelSourceConnector"
"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector"
"org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector"
当我尝试启动连接器时,出现错误 -
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException:
java.lang.NoClassDefFoundError: org/apache/camel/kafkaconnector/CamelSourceConnectorConfig
我的连接器配置如下
{
"name": "SF-poll",
"config": {
"tasks.max": "1",
"connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
"camel.kamelet.salesforce-source.query": "SELECT Id, Name, Email, Phone FROM Contact",
"camel.kamelet.salesforce-source.topicName": "sf-contact",
"camel.kamelet.salesforce-source.loginUrl": "https://org--qa.sandbox.lightning.force.com/",
"camel.kamelet.salesforce-source.clientId": "***",
"camel.kamelet.salesforce-source.clientSecret": "***",
"camel.kamelet.salesforce-source.userName": "***",
"camel.kamelet.salesforce-source.password": "***"
}
}
还有我的 connect-distributed.properties -
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/home/ubuntu/Downloads/kafka_2.12-3.5.2/libs,/home/ubuntu/Downloads/kafka_2.12-3.5.2/camel-salesforce-source-kafka-connector
这里缺少什么?
错误是由于重复的依赖关系造成的。我通过为连接器创建 Uber jar 文件解决了这个问题。以下是步骤 -
下载/克隆骆驼连接器存储库。 https://github.com/apache/camel-kafka-connector
导航到您要构建的连接器目录 例如
camel-kafka-connector-main/connectors/camel-salesforce-source-kafka-connector
通过更改下面的 pom.xml 创建一个 fat/uber jar https://github.com/apache/camel/blob/main/tooling/maven/camel-maven-plugin/src/main/docs/camel-maven-plugin.adoc#camelprepare-fatjar
将此插件包含在连接器的 pom.xml 中
<build>
<plugins>
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>${camel.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-fatjar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
例如,要将其与 maven-assemble-plugin 一起使用,您可以执行以下操作。请记住指定主类的类名,其中显示 com.foo.NameOfMainClass:
<build>
<plugins>
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>${camel.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-fatjar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.foo.NameOfMainClass</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
或者在我的例子中使用 maven-shade 插件
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>${camel.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-fatjar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.camel.kafkaconnector.salesforcesource.CamelSalesforcesourceSourceConnector</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
谢谢。