我正在尝试在 java 中设置骆驼路由,读取 Azure 数据湖中的 csv 并对其进行处理。到目前为止,它看起来像这样:
from("direct:updateValues")
.pollEnrich()
.simple(messageRouteConfig.getFrom() + "&fileName=${body}")
.log(LoggingLevel.INFO, log, "message content (file content): ${body}")
.log(LoggingLevel.INFO, log, "Starting processing file : ${file:name}")
然后它会处理并将其发送到某个地方(这部分有效)。我在正文中传递文件名 (testing.csv),并且 messageRouteConfig.getFrom() 返回以下 URI:
azure-storage-datalake:account01/test?clientId=clientId&tenantId=tenantId&clientSecret=secret&directoryName=/foo/bar
问题是 directoryName 选项似乎没有被考虑在内。在数据湖中,文件位置为:
test/foo/bar/testing.csv
。
但当试图得到它时,骆驼似乎试图达到:test/testing.csv
。
我也尝试过这种方式(从 URI 中删除 directoryName),但也没有成功:
from("direct:updateMdmIds")
.setHeader("CamelAzureStorageDataLakeDirectoryName", constant("/foo/bar"))
.pollEnrich()
.simple(messageRouteConfig.getFrom() + "&fileName=${body}")
.log(LoggingLevel.INFO, log, "message content (file content): ${body}")
.log(LoggingLevel.INFO, log, "Starting processing file : ${file:name}")
但是如果我删除任何 directoryName 选项并在正文中传递 /foo/bar/testing.csv (作为文件名),它就可以工作。知道在这里做什么吗?
附带问题:无论如何要获取读取的文件名?我尝试记录它(按照最后一行),但它似乎不起作用。
非常感谢!
我正在尝试在 Java 中设置骆驼路由,读取 Azure 数据湖中的 csv 并对其进行处理。
我遵循了这个Camel官方文档,
您可以使用下面的代码使用camel路线从azure-data Lake读取csv文件。
代码:
public class App {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
DataLakeComponent azureStorageDataLakeComponent = new DataLakeComponent();
DataLakeConfiguration configuration = new DataLakeConfiguration();
configuration.setClientId("xxxx");
configuration.setTenantId("xxxx");
configuration.setClientSecret("Bxxxx");
azureStorageDataLakeComponent.setConfiguration(configuration);
context.addComponent("azure-storage-datalake", azureStorageDataLakeComponent);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.to("azure-storage-datalake:venkat8912/test?operation=getFile&fileName=foo/bar/001.csv")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// Retrieve file content as InputStream
InputStream inputStream = exchange.getMessage().getBody(InputStream.class);
String fileContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
fileContent = fileContent.replaceAll("(?m)^[ \t]*\r?\n", ""); // Remove empty lines
System.out.println("File Content:\n" + fileContent);
}
})
.to("mock:results");
}
});
// Start the context
context.start();
// Send a message to trigger the route
context.createProducerTemplate().sendBody("direct:start", null);
// Allow the route to run for a while before stopping the context
Thread.sleep(5000);
// Stop the context
context.stop();
}
}
输出:
File Content:
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C