最近,我将 Cloud Dataflow 应用程序从 Java 11 升级到了 Java 17,并同步升级了相关依赖项,其中 Apache Beam 从 2.35.0 升级到了 2.49.0。应用程序运行正常,测试用例也全部通过。
但是,升级后自定义类
RedisWriteIO
有一些改动,现有测试已无法达到新的代码覆盖率要求。
RedisWriteIO
package com.example.dataflow.io.redis;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
public class RedisWriteIO {
public static Write write() {
return (new AutoValue_RedisWriteIO_Write.Builder())
.setConnectionConfiguration(CustomRedisConfigurations.create()).build();
}
@AutoValue
public abstract static class Write extends PTransform<PCollection<KV<String,String>>, PDone> {
public Write() {
}
@Nullable
abstract CustomRedisConfigurations connectionConfiguration();
@Nullable
abstract Long expireTime();
abstract Builder toBuilder();
public Write withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host can not be null");
Preconditions.checkArgument(port > 0, "port can not be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
public Write withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth can not be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
public Write withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout can not be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
public Write withConnectionConfiguration(CustomRedisConfigurations connection) {
Preconditions.checkArgument(connection != null, "connection can not be null");
return this.toBuilder().setConnectionConfiguration(connection).build();
}
public Write withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis can not be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
}
public PDone expand(PCollection<KV<String, String>> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
private static class WriteFn extends DoFn<KV<String, String>, Void>{
private static final int DEFAULT_BATCH_SIZE = 1000;
private final RedisWriteIO.Write spec;
private transient Jedis jedis;
private transient @Nullable Transaction transaction;
private int batchCount;
public WriteFn(RedisWriteIO.Write spec) {
this.spec = spec;
}
@Setup
public void setup() {
jedis = spec.connectionConfiguration().connect();
}
@StartBundle
public void startBundle() {
transaction = jedis.multi();
batchCount = 0;
}
@ProcessElement
public void processElement(DoFn<KV<String, String>, Void>.ProcessContext c) {
KV<String, String> record = c.element();
String fieldKey = record.getKey();
String fieldValue = record.getValue();
transaction.sadd(fieldKey,fieldValue);
batchCount++;
if (batchCount >= DEFAULT_BATCH_SIZE) {
transaction.exec();
transaction.multi();
batchCount = 0;
}
}
@FinishBundle
public void finishBundle() {
if (batchCount > 0) {
transaction.exec();
}
if (transaction != null) {
transaction.close();
}
transaction = null;
batchCount = 0;
}
@Teardown
public void teardown() {
jedis.close();
}
}
@AutoValue.Builder
abstract static class Builder {
Builder() {
}
abstract Builder setConnectionConfiguration(CustomRedisConfigurations connectionConfiguration);
abstract Builder setExpireTime(Long expireTimeMillis);
abstract Write build();
}
}
}
测试类如下:
package com.example.dataflow.io.redis;
import com.github.fppt.jedismock.RedisServer;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;
/**
 * Tests for {@link RedisWriteIO}. A jedis-mock {@link RedisServer} is started once for the whole
 * class and serves as the Redis endpoint of the pipeline test.
 */
public class RedisWriteIOTest {
  private static final String REDIS_HOST = "localhost";
  private static final String[] INPUT_DATA = new String[]{
      "123456789",
      "Bruce",
      "Wayne"
  };

  @Mock
  static SSLSocketFactory socketFactory;

  private static RedisServer server;
  private static int port;

  /**
   * Real client connected to the mock server. Kept apart from the mocked {@link #jedis} so that
   * initialising the mocks cannot replace it before {@link #afterClass()} closes it.
   */
  private static Jedis redisClient;

  @Mock
  private Jedis jedis;
  @Mock
  private Transaction transaction;

  @Rule
  public TestPipeline pipeline = TestPipeline.create();

  @Mock
  CustomRedisConfigurations connection;
  @Mock
  DoFn.OutputReceiver<KV<String, String>> out;

  /** Handle returned by {@code openMocks}, closed again after every test. */
  private AutoCloseable mocks;

  /** Starts the mock Redis server and connects a real client to it. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    server = RedisServer.newRedisServer(8000);
    server.start();
    port = server.getBindPort();
    redisClient = new Jedis(server.getHost(), port);
  }

  /** Closes the client and stops the server, tolerating a failed {@link #beforeClass()}. */
  @AfterClass
  public static void afterClass() throws IOException {
    try {
      if (redisClient != null) {
        redisClient.close();
      }
    } finally {
      if (server != null) {
        server.stop();
      }
    }
  }

  /** Initialises the mocks and wires connection -> jedis -> transaction. */
  @Before
  public void setUp() {
    mocks = MockitoAnnotations.openMocks(this);
    when(connection.connect()).thenReturn(jedis);
    when(jedis.multi()).thenReturn(transaction);
  }

  /** Releases the mocks created in {@link #setUp()}. */
  @After
  public void tearDown() throws Exception {
    if (mocks != null) {
      mocks.close();
    }
  }

  @Test
  public void WriteMemoryStoreWithEmptyAuth() {
    RedisWriteIO.Write write = RedisWriteIO.write()
        .withEndpoint(REDIS_HOST, port).withAuth("");
    assertNotNull(write);
  }

  @Test
  public void WriteMemoryStoreWithAuth() {
    RedisWriteIO.Write write = RedisWriteIO.write()
        .withAuth("AuthString");
    assertNotNull(write);
  }

  @Test
  public void WriteTimeOut() {
    RedisWriteIO.Write write = RedisWriteIO.write()
        .withTimeout(10);
    assertNotNull(write);
  }

  @Test
  public void WriteMemoryStoreWithExpireTime() {
    RedisWriteIO.Write write = RedisWriteIO.write().withExpireTime(1000L);
    assertNotNull(write);
  }

  @Test(expected = IllegalArgumentException.class)
  public void WriteMemoryStoreWithoutExpireTime() {
    RedisWriteIO.write()
        .withExpireTime(0L);
  }

  @Test(expected = IllegalArgumentException.class)
  public void WriteMemoryStoreWithNegativeExpireTime() {
    RedisWriteIO.write()
        .withExpireTime(-10L);
  }

  @Test
  public void WriteMemoryStoryWithConnectionConfiguration() {
    // A local is used so the mocked `connection` field is not overwritten with a real object.
    CustomRedisConfigurations realConnection =
        CustomRedisConfigurations.create().withHost(REDIS_HOST).withPort(port);
    RedisWriteIO.Write write = RedisWriteIO.write()
        .withConnectionConfiguration(realConnection);
    assertNotNull(write);
  }

  @Test(expected = IllegalArgumentException.class)
  public void WriteMemoryStoryWithNullConnectionConfiguration() {
    RedisWriteIO.write()
        .withConnectionConfiguration(null);
  }

  /** Runs a pipeline that writes 10000 generated records to the mock server. */
  @Test
  public void testBatchProcessingWithTransactionExecuted() {
    final int recordCount = 10000;
    PCollection<String> flushFlag = pipeline.apply("Read File",
        TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

    // Exactly recordCount unique entries; the previous `i <= 10000` bound produced one extra.
    List<KV<String, String>> recordEntries = new ArrayList<>(recordCount);
    for (int i = 0; i < recordCount; i++) {
      recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
    }

    // outputData will be written to Redis (memorystore)
    PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));
    outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
        .apply("Writing the data into Redis database", RedisWriteIO.write()
            .withConnectionConfiguration(CustomRedisConfigurations
                .create(REDIS_HOST, port)
                .withTimeout(100)
                .withAuth("credentials")
                .enableSSL()));
    pipeline.run();
  }
}
RedisWriteIO
是一个工具类,用于将文件中的数据写入 Redis 数据库。它本身按预期工作,已编写的测试用例也都能通过。但是,SonarQube 报告以下代码块没有被测试覆盖:
// Flush once the number of elements queued in the current bundle reaches DEFAULT_BATCH_SIZE.
if (batchCount >= DEFAULT_BATCH_SIZE) {
transaction.exec();
// Start a new transaction for the elements that follow.
transaction.multi();
batchCount = 0;
}
当已处理的记录数达到 1000 条(即 batchCount 达到 DEFAULT_BATCH_SIZE)时,上述代码块就应该被执行,但在测试类中它始终没有被执行。我尝试在
testBatchProcessingWithTransactionExecuted()
方法中使用包含 5000 条记录的测试文件来覆盖这段代码,但该代码块仍然没有被执行。
我需要帮助,编写能够覆盖所有代码行的测试用例。
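我已经写出了能够覆盖所有代码行的测试用例:只需把列表的大小增加到 20000。这样一来,
RedisWriteIO
类就会按处理较大数据集的方式运行。
批量阈值由
DEFAULT_BATCH_SIZE
指定,值为 1000;当 batchCount 达到该阈值时,就会执行当前事务(transaction.exec()
)并开启新事务(transaction.multi()
)。需要注意的是,batchCount 在每个 bundle 开始时(startBundle)都会重置为 0,因此只有单个 bundle 内的元素数达到 1000 时该分支才会执行;数据量较小时,运行器可能会把输入拆分成多个较小的 bundle,这很可能就是之前该分支未被覆盖的原因。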
/**
 * Runs a pipeline that writes 20000 generated records through {@code RedisWriteIO}, so that the
 * batch-flush branch of the writer ({@code batchCount >= DEFAULT_BATCH_SIZE}) is exercised.
 */
@Test
public void testBatchProcessingWithTransactionExecuted() {
  final int recordCount = 20000;
  PCollection<String> flushFlag = pipeline.apply("Read File",
      TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

  // Exactly recordCount unique entries; the previous `i <= 20000` bound produced one extra.
  List<KV<String, String>> recordEntries = new ArrayList<>(recordCount);
  for (int i = 0; i < recordCount; i++) {
    recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
  }

  // outputData will be written to Redis (memorystore)
  PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));
  outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
      .apply("Writing the data into Redis database", RedisWriteIO.write()
          .withConnectionConfiguration(CustomRedisConfigurations
              .create(REDIS_HOST, port)
              .withTimeout(100)
              .withAuth("credentials")
              .enableSSL()));
  pipeline.run();
}