阿帕奇梁:是有可能消耗的RabbitMQ的消息以交换和路由关键

问题描述 投票:0回答:1

我定义Apache中梁管道消耗在RabbitMQ的消息代理给定队列的消息。

我定义在RabbitMQ的交换和路由的关键。

我在梁用AmqpIO.read()(2.9.0版本),但我并没有发现任何API设置ECHANGE和路由关键。

(在此之后DOC:https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/amqp/AmqpIO.html

是否有可能做到这一点?即使与其他任何插件。

问候,阿里

rabbitmq apache-beam
1个回答
0
投票

有一个新的(实验)IO为RabbitMQ的附带最新v2.9.0阿帕奇梁释放连​​接器。同时,AMQP连接器不会为RabbitMQ的工作。

如果您正在使用Maven添加在您的POM以下依赖

<!-- Beam MongoDB I/O -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-mongodb</artifactId>
    <version>2.9.0</version>
</dependency>

你可以像流水线使用

public class RabbitMQPipeline {

final static Logger log = LoggerFactory.getLogger(RabbitMQPipeline.class);

/**
 * Mongo Pipeline options.
 */
public interface RabbitMQPipelineOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("amqp://localhost")
    @Required
    String getUri();

    void setUri(String uri);

}

/**
 * @param args
 */
public static void main(String[] args) {

    RabbitMQPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(RabbitMQPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<RabbitMqMessage> messages = pipeline
            .apply(RabbitMqIO2.read().withUri(options.getUri()).withQueue("test"));

    messages.apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
        @ProcessElement
        public void process(@Element RabbitMqMessage msg) {
            System.out.println(msg.toString());
        }
    }));

    pipeline.run().waitUntilFinish();
}

}

该RabbitMqIO的Javadoc有如何使用读写器的例子。

一个忠告

目前已被固定的,但定于v2.11.0释放known bug阻止从最简单的情况下,即使工作的连接器。解决方法是非常简单的(见JIRA问题),但你将需要重建类的新版本。如果你想给它一个尝试确保您添加以下Maven的依赖

<dependency>
    <groupId>com.google.auto.value</groupId>
    <artifactId>auto-value</artifactId>
    <version>1.5.2</version>
    <scope>provided</scope>
</dependency>

并添加以下配置中的Maven Compiler插件

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <annotationProcessors>
        <annotationProcessor>com.google.auto.value.processor.AutoValueProcessor</annotationProcessor>
            </annotationProcessors>
        </configuration>
    </plugin>

如果您使用的是Eclipse确保你安装m2-apt Maven plugin。祝好运!

© www.soinside.com 2019 - 2024. All rights reserved.