我的两个问题:
Foo
类不可序列化?我在 Java 中使用嵌入式 hazelcast (5.4.0)。
有问题的管道:
Pipeline pipeline = Pipeline.create();
StreamStage<Bar> prepared = pipeline.readFrom(KafkaSources. ...)
.withTimestamps(...)
.map(Map.Entry::getValue);
ServiceFactory<?, Foo> service = ServiceFactories.sharedService(ctx -> new Foo(url))
.toNonCooperative();
prepared.mapUsingService(service, (s, bar) -> new EnhancedBar(bar, s.getDetails(bar.getId())))
.writeTo(Sinks.logger());
错误:
java.lang.IllegalArgumentException: "createContextFn" must be serializable
at com.hazelcast.internal.serialization.impl.SerializationUtil.checkSerializable(SerializationUtil.java:83)
有问题的班级:
@Getter
@Setter
public class Foo implements Serializable {
private String url;
private transient OkHttpClient client;
public Foo() {
this.url = "unknown";
client = new OkHttpClient();
}
public Foo(String url) {
this.url = url;
client = new OkHttpClient();
}
public String getDetails() {
Request request = new Request.Builder().url(url).build();
Call call = client.newCall(request);
try {
Response response = call.execute();
return response.body().string();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Serial
private void writeObject(ObjectOutputStream oos) throws IOException {
oos.writeObject(url);
}
@Serial
private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
String url = (String) ois.readObject();
this.setUrl(url);
this.setClient(new OkHttpClient());
}
}
问题源于
OkHttpClient
没有实现Serialized(我猜)。
我的尝试:
"constant detail"
,作业就成功提交了。@Serial
方法并使用 CompactSerializer 时,我得到了同样的错误。在最后一个例子中,我的 CompactSerializer 的读/写方法是
@NotNull @Override
public Foo read(@NotNull CompactReader reader) {
String url = reader.readString("url");
return new Foo(url);
}
@Override
public void write(@NotNull CompactWriter writer, @NotNull Foo object) {
writer.writeString("url", object.getUrl());
}
如果你使用的是OkHttpClient,那么你还需要上传
注意:在我看来,OkHttpClient 正在造成不必要的负担。只需使用JDK自带的HttpClient即可。作业配置会更简单
这是一个例子
// This example creates a Jet job that downsload the given url.
@Slf4j
class OkHttpClientJob {
public static void main(String[] args) throws InterruptedException {
log.info("Starting Test");
// Start client
HazelcastInstance hazelcastClient = getHazelcastClientInstance();
submitJob(hazelcastClient);
TimeUnit.SECONDS.sleep(10);
hazelcastClient.shutdown();
log.info("Test completed");
}
private static HazelcastInstance getHazelcastClient() {
ClientConfig clientConfig = new ClientConfig();
return HazelcastClient.newHazelcastClient(clientConfig);
}
private static void submitJob(HazelcastInstance hazelcastInstance) {
// Create a service that will be used by Jet Job
ServiceFactory<?, HttpClientService> service =
ServiceFactories.sharedService(ctx -> new HttpClientService())
.toNonCooperative();
Pipeline pipeline = Pipeline.create();
List<String> input = List.of("https://www.google.com");
BatchStage<String> prepared = pipeline.readFrom(TestSources.items(input));
prepared.mapUsingService(service, HttpClientService::get)
.writeTo(Sinks.logger());
// Path to okhttp jar file
File file = new File("src/main/resources/okhttp-5.0.0-alpha.14.jar");
JobConfig jobConfig = new JobConfig()
// Add jar to jet job
.addJarsInZip(file)
// Add parent class that contains the service class
.addClass(OkHttpClientJob.class);
JetService jetService = hazelcastInstance.getJet();
Job job = jetService.newJob(pipeline, jobConfig);
job.join();
}
private static class HttpClientService implements Serializable {
private transient OkHttpClient client;
public HttpClientService() {
client = new OkHttpClient();
}
public String get(String url) {
Request request = new Request.Builder()
.url(url)
.build();
String result;
try {
Call call = client.newCall(request);
try (Response response = call.execute()) {
try (ResponseBody responseBody = response.body()) {
result = responseBody.string();
}
}
} catch (IOException exception) {
result = exception.getMessage();
}
return result;
}
}
}