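I'm trying to iterate over two result sets from two connections at the same time in order to merge two tables. This is how I obtain a connection: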
public static void execMany(Consumer<Connection> cons) {
    // ClickHouse server URL
    String url = "jdbc:clickhouse://myip:8123/db2";
    ClickHouseProperties properties = new ClickHouseProperties();
    properties.setSocketTimeout(6000000);
    properties.setSessionTimeout(6000000L);
    properties.setDatabase("db2");
    properties.setUser("default");
    properties.setPassword("12345");
    ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
    try (ClickHouseConnection connection = dataSource.getConnection()) {
        cons.accept(connection);
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
This is how I iterate:
    var ref = new Object() {
        long txCnt = 0;
        long addrCnt = 0;
    };
    var cnt = ClickHouse.count("transactions_tmp2");
    ClickHouse.execMany(conn -> ClickHouse.execMany(conn1 -> {
        try {
            var select1 = conn.createStatement().executeQuery("SELECT * FROM %s order by %s".formatted(t1, f1));
            var select2 = conn1.createStatement().executeQuery("SELECT * FROM %s order by %s".formatted(t2, f2));
            String val2 = null;
            String val1 = null;
            while (true) {
                if (ref.txCnt % 1000000 == 0)
                    System.out.println((ref.txCnt) + "/" + cnt + "=" + (1. * ref.txCnt / +cnt) + " addrCnt=" + ref.addrCnt + " fromAddress=" + val2 + " txAddress=" + val1);
                if (val2 == null) {
                    if (!select2.next()) break;
                    ref.addrCnt++;
                    val2 = select2.getString(f2);
                }
                if (val1 == null) {
                    if (!select1.next()) break;
                    ref.txCnt++;
                    val1 = select1.getString(f1);
                }
                if (val1.compareTo(val2) < 0) {
                    val1 = null;
                    continue;
                }
                if (val2.compareTo(val1) < 0) {
                    val2 = null;
                    continue;
                }
                assert val1.equals(val2);
                cons.accept(select1, select2);
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }));
}
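For reference, the merge logic can be reproduced without any database. The following is only a minimal sketch, not the code from my project: the class SortedMergeSketch and the method matchingKeys are hypothetical names, plain iterators over strings stand in for the two result sets, and both inputs are assumed to be sorted ascending and free of nulls. It also assumes that after a match only the left side should advance, so that several left rows can match one right key.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the two-cursor merge; no ClickHouse classes involved.
class SortedMergeSketch {

    // Returns every left key that also occurs on the right.
    // Assumption: both iterators yield non-null keys in ascending order.
    static List<String> matchingKeys(Iterator<String> left, Iterator<String> right) {
        List<String> matches = new ArrayList<>();
        String l = null; // current left key, null means "fetch the next one"
        String r = null; // current right key, null means "fetch the next one"
        while (true) {
            if (l == null) {
                if (!left.hasNext()) break;
                l = left.next();
            }
            if (r == null) {
                if (!right.hasNext()) break;
                r = right.next();
            }
            int cmp = l.compareTo(r);
            if (cmp < 0) {
                l = null;          // left is behind, advance left
            } else if (cmp > 0) {
                r = null;          // right is behind, advance right
            } else {
                matches.add(l);    // keys are equal, record the match
                l = null;          // advance left only (assumed many-to-one)
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> tx = List.of("a", "b", "b", "d");
        List<String> addr = List.of("b", "c", "d");
        System.out.println(matchingKeys(tx.iterator(), addr.iterator())); // prints [b, b, d]
    }
}
```

Unlike the loop above, this sketch always advances one side after a match; which side to advance depends on which table holds the duplicate keys.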
But I've suddenly started getting this error, which I had never seen before:
Exception in thread "main" java.lang.RuntimeException: java.sql.SQLException: org.apache.http.MalformedChunkCodingException: Unexpected content at the end of chunk
at me.nanosecond.TxImporter.lambda$merge$1(TxImporter.java:123)
at me.nanosecond.ClickHouse.execMany(ClickHouse.java:291)
at me.nanosecond.TxImporter.lambda$merge$2(TxImporter.java:78)
at me.nanosecond.ClickHouse.execMany(ClickHouse.java:291)
at me.nanosecond.TxImporter.merge(TxImporter.java:78)
at me.nanosecond.TxImporter.main(TxImporter.java:185)
Caused by: java.sql.SQLException: org.apache.http.MalformedChunkCodingException: Unexpected content at the end of chunk
at ru.yandex.clickhouse.response.ClickHouseResultSet.hasNext(ClickHouseResultSet.java:164)
at ru.yandex.clickhouse.response.ClickHouseResultSet.next(ClickHouseResultSet.java:200)
at me.nanosecond.TxImporter.lambda$merge$1(TxImporter.java:90)
... 5 more
Caused by: org.apache.http.MalformedChunkCodingException: Unexpected content at the end of chunk
at org.apache.http.impl.io.ChunkedInputStream.getChunkSize(ChunkedInputStream.java:254)
at org.apache.http.impl.io.ChunkedInputStream.nextChunk(ChunkedInputStream.java:222)
at org.apache.http.impl.io.ChunkedInputStream.read(ChunkedInputStream.java:183)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
at ru.yandex.clickhouse.util.Utils.readFully(Utils.java:163)
at ru.yandex.clickhouse.util.Utils.readFully(Utils.java:147)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.readNextBlock(ClickHouseLZ4Stream.java:102)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.checkNext(ClickHouseLZ4Stream.java:75)
at ru.yandex.clickhouse.response.ClickHouseLZ4Stream.read(ClickHouseLZ4Stream.java:61)
at ru.yandex.clickhouse.response.StreamSplitter.readFromStream(StreamSplitter.java:92)
at ru.yandex.clickhouse.response.StreamSplitter.next(StreamSplitter.java:63)
at ru.yandex.clickhouse.response.ClickHouseResultSet.hasNext(ClickHouseResultSet.java:149)
... 7 more
Both queries run fine when executed one at a time. In case anyone asks why I'm doing this, I'll refer you to this question.
I discussed your post with the ClickHouse integrations team. Which version of clickhouse-java are you using? The stack trace suggests it is a fairly old one.
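ClickHouse has been actively developing this library and it has improved a lot. I'd also point out that we have seen similar problems when a timeout occurs because of a long-running query, and we recommend increasing the timeout to avoid this: https://clickhouse.com/docs/en/integrations/java#resolving-jdbc-timeout-on-large-inserts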
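As a rough illustration of raising the timeout with a newer driver, here is a minimal sketch. It assumes the driver reads a connection property named socket_timeout (in milliseconds); the class TimeoutPropertiesSketch, the helper withSocketTimeout, and the 300000 value are made up for this example, so check the property name against the documentation for the version you end up using.

```java
import java.util.Properties;

// Hypothetical helper; only builds the properties, it does not open a connection.
class TimeoutPropertiesSketch {

    // Assumption: the driver honours a "socket_timeout" property given in milliseconds.
    static Properties withSocketTimeout(int socketTimeoutMillis) {
        Properties props = new Properties();
        props.setProperty("user", "default");
        props.setProperty("socket_timeout", Integer.toString(socketTimeoutMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withSocketTimeout(300_000);
        System.out.println(props.getProperty("socket_timeout")); // prints 300000
        // With the driver on the classpath, these properties would then be passed to
        // java.sql.DriverManager.getConnection("jdbc:clickhouse://myip:8123/db2", props).
    }
}
```

Each of the two connections in your merge would need the same setting, since either stream can sit idle while the other side is being advanced.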