After upgrading to Java 17, Google Cloud Dataflow new-code coverage no longer meets the SonarQube threshold

Question · votes: 0 · answers: 1

I recently upgraded my Cloud Dataflow application from Java 11 to Java 17, along with the corresponding dependencies. The application runs fine, and the test cases pass as well. I also upgraded Apache Beam from 2.35.0 to 2.49.0.

However, one of the custom classes, RedisWriteIO, was changed, and its tests no longer meet the new-code coverage threshold.

RedisWriteIO:

package com.example.dataflow.io.redis;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RedisWriteIO {

    public static Write write() {
        return (new AutoValue_RedisWriteIO_Write.Builder())
                .setConnectionConfiguration(CustomRedisConfigurations.create()).build();
    }

    @AutoValue
    public abstract static class Write extends PTransform<PCollection<KV<String,String>>, PDone> {
        public Write() {
        }

        @Nullable
        abstract CustomRedisConfigurations connectionConfiguration();

        @Nullable
        abstract Long expireTime();

        abstract Builder toBuilder();

        public Write withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host can not be null");
            Preconditions.checkArgument(port > 0, "port can not be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public Write withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth can not be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public Write withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout can not be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public Write withConnectionConfiguration(CustomRedisConfigurations connection) {
            Preconditions.checkArgument(connection != null, "connection can not be null");
            return this.toBuilder().setConnectionConfiguration(connection).build();
        }

        public Write withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis can not be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<KV<String, String>> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class WriteFn extends DoFn<KV<String, String>, Void>{
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final RedisWriteIO.Write spec;
            private transient Jedis jedis;
            private transient @Nullable Transaction transaction;

            private int batchCount;

            public WriteFn(RedisWriteIO.Write spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                jedis = spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                transaction = jedis.multi();
                batchCount = 0;
            }
            @ProcessElement
            public void processElement(DoFn<KV<String, String>, Void>.ProcessContext c) {

                KV<String, String> record = c.element();

                String fieldKey = record.getKey();
                String fieldValue = record.getValue();

                transaction.sadd(fieldKey,fieldValue);

                batchCount++;

                if (batchCount >= DEFAULT_BATCH_SIZE) {
                    transaction.exec();
                    transaction.multi();
                    batchCount = 0;
                }
            }

            @FinishBundle
            public void finishBundle() {
                // Guard against a null transaction before flushing, otherwise
                // exec() can throw a NullPointerException on an empty bundle.
                if (transaction != null) {
                    if (batchCount > 0) {
                        transaction.exec();
                    }
                    transaction.close();
                }
                transaction = null;
                batchCount = 0;
            }

            @Teardown
            public void teardown() {
                jedis.close();
            }
        }

        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(CustomRedisConfigurations connectionConfiguration);

            abstract Builder setExpireTime(Long expireTimeMillis);

            abstract Write build();

        }
    }
}

The test class is as follows:

package com.example.dataflow.io.redis;

import com.github.fppt.jedismock.RedisServer;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;


public class RedisWriteIOTest {

    private static final String REDIS_HOST = "localhost";
    private static final String[] INPUT_DATA = new String[]{
            "123456789",
            "Bruce",
            "Wayne"
    };

    @Mock
    static SSLSocketFactory socketFactory;
    private static RedisServer server;
    private static int port;

    @Mock
    private static Jedis jedis;

    @Mock
    private Transaction transaction;

    private int batchCount;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Mock
    CustomRedisConfigurations connection;

    @Mock
    DoFn.OutputReceiver<KV<String, String>> out;

    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
        when(connection.connect()).thenReturn(jedis);
        when(jedis.multi()).thenReturn(transaction);
        batchCount = 0;
    }


    @BeforeClass
    public static void beforeClass() throws Exception {
        server = RedisServer.newRedisServer(8000);
        server.start();
        port = server.getBindPort();
        jedis = new Jedis(server.getHost(), server.getBindPort());
    }

    @AfterClass
    public static void afterClass() throws IOException {
        jedis.close();
        server.stop();
    }

    @Test
    public void WriteMemoryStoreWithEmptyAuth() {
        RedisWriteIO.write()
                .withEndpoint(REDIS_HOST, port).withAuth("");
    }

    @Test
    public void WriteMemoryStoreWithAuth() {
        RedisWriteIO.write()
                .withAuth("AuthString");
    }

    @Test
    public void WriteTimeOut() {
        RedisWriteIO.write()
                .withTimeout(10);
    }

    @Test
    public void WriteMemoryStoreWithExpireTime() {
        RedisWriteIO.Write write = RedisWriteIO.write();
        write = write.withExpireTime(1000L);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithoutExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(0L);
    }


    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithNegativeExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(-10L);
    }

    @Test
    public void WriteMemoryStoryWithConnectionConfiguration() {
        connection = CustomRedisConfigurations.create().withHost(REDIS_HOST).withPort(port);
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(connection);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoryWithNullConnectionConfiguration() {
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(null);
    }


    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            // adding unique entries 10000 times
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }

        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));

        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
               .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
        pipeline.run();

    }

}

RedisWriteIO is a utility class that writes data from a file into a Redis database. It works as expected, and the test cases pass. However, SonarQube reports that the following block is not covered:

if (batchCount >= DEFAULT_BATCH_SIZE) {
    transaction.exec();
    transaction.multi();
    batchCount = 0;
}

The block above should execute whenever a file contains more than 1000 records, but it is never reached from the test class. I tried to cover it in the testBatchProcessingWithTransactionExecuted() method using a test file with 5000 records, but the block still does not execute.

I need help writing a test case that covers all the lines.
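One thing worth checking is that the flush branch only fires when a single bundle delivers at least DEFAULT_BATCH_SIZE elements to one WriteFn instance: batchCount is reset at @StartBundle, and the DirectRunner may split Create.of(...) into many small bundles, so no individual bundle ever reaches 1000 elements. The counter behavior can be verified in isolation with plain Java; the sketch below only mirrors the batching logic from processElement, and FakeTransaction is a hypothetical stand-in for redis.clients.jedis.Transaction, not the real class.

```java
// Mirrors the per-bundle batching counter in WriteFn: every
// DEFAULT_BATCH_SIZE elements, the pending transaction is executed.
// FakeTransaction is a hypothetical stand-in for jedis Transaction.
public class BatchingSketch {
    static final int DEFAULT_BATCH_SIZE = 1000;

    static class FakeTransaction {
        int execCalls = 0;
        void sadd(String key, String value) { /* buffered, not sent yet */ }
        void exec() { execCalls++; }
    }

    // Feeds n elements through the same counter logic as processElement
    // and returns how many times exec() fired inside the loop.
    static int flushesFor(int n) {
        FakeTransaction transaction = new FakeTransaction();
        int batchCount = 0;
        for (int i = 0; i < n; i++) {
            transaction.sadd("Bruce:Wayne" + i, "123456789" + i);
            batchCount++;
            if (batchCount >= DEFAULT_BATCH_SIZE) { // the uncovered branch
                transaction.exec();
                batchCount = 0;
            }
        }
        return transaction.execCalls;
    }

    public static void main(String[] args) {
        // 999 elements in one bundle never reach the threshold...
        System.out.println(flushesFor(999));   // 0
        // ...while 5000 elements in ONE bundle flush five times.
        System.out.println(flushesFor(5000));  // 5
    }
}
```

If 5000 records through the pipeline still leave the branch uncovered, that suggests the runner is splitting the input into bundles smaller than 1000, not that the batching logic itself is wrong.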

java junit sonarqube google-cloud-dataflow google-cloud-memorystore
1 Answer

Score: 0

I was able to write a test case that covers all the lines. I simply increased the size of the list to 20000; with that, the RedisWriteIO class behaves as expected on a larger dataset.

The batch size of 1000, specified by DEFAULT_BATCH_SIZE, is the threshold at which the transaction is executed (transaction.exec()) and a new transaction is started (transaction.multi()).

    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i <= 20000; i++) {
            // adding unique entries 20000 times
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }

        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));

        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
               .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
        pipeline.run();

    }
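The number of in-loop flushes is just integer division of the element count by the threshold, which is why enlarging the list works (assuming the runner hands WriteFn sufficiently large bundles). A quick check of that arithmetic, in plain Java:

```java
public class FlushMath {
    // Expected exec() calls inside the processElement loop for n elements
    // in one bundle, flushing every batchSize elements. The final partial
    // batch is flushed in finishBundle and is not counted here.
    static int inLoopFlushes(int n, int batchSize) {
        return n / batchSize;
    }

    public static void main(String[] args) {
        // The answer's loop runs i = 0..20000, i.e. 20001 entries.
        System.out.println(inLoopFlushes(20001, 1000)); // 20
        // Below the threshold the branch never executes.
        System.out.println(inLoopFlushes(999, 1000));   // 0
    }
}
```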