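My requirement is to encrypt the personally identifiable columns of a table. To do this, I wrote a small program that selects the data in batches and inserts it into a new table that has a few extra columns.
The problem is that when I run the code it works well at first, but then it stops inserting records into the database. It does not print any exception either.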
Here is my connection pool configuration.

    private BlockingQueue<EntityManager> getConnectionPool(int poolSize) throws InterruptedException {
        // List<EntityManager> list = new ArrayList<>();
        BlockingQueue<EntityManager> queue = new ArrayBlockingQueue<>(poolSize);
        int i = poolSize;
        do {
            EntityManager entityManager = connectionService.getEm();
            queue.put(entityManager);
            //list.add(entityManager);
            i--;
        } while (i != 0);
        return queue;
    }
This is the starting point for everything. It calculates the total number of batches and, for each batch, calls a method that submits the work to the executor service.

    public void insertData() throws InterruptedException {
        key = hash(key);
        EntityManager entityManager = connectionService.getEm();
        EntityTransaction entityTransaction = entityManager.getTransaction();
        BlockingQueue<EntityManager> queue = getConnectionPool(200);
        try {
            int batchSize = 1000;
            BigInteger totalResults = partnerRepository.getCountCustomerLedger(entityManager);
            double totalPages = Math.ceil(totalResults.longValue() / batchSize);
            int maxResult = batchSize;
            CountDownLatch latch = new CountDownLatch(((Double) totalPages).intValue());
            for (int i = 1; i <= totalPages; i++) {
                int firstResult = (i - 1) * batchSize;
                if (i == totalPages) {
                    batchSize = totalResults.intValue() - firstResult;
                }
                exectueTask(queue, firstResult, batchSize, latch, i);
            }
            System.out.println("waiting for latch to finish");
            latch.await();
            System.out.println("latch exited");
        } catch (Exception e) {
            e.printStackTrace();
            if (entityTransaction.isActive()) {
                entityTransaction.rollback();
            }
            entityManager.close();
        } finally {
            int i = poolSize;
            do {
                queue.take().close();
                i--;
            } while (i != 0);
        }
        entityManager.close();
    }
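The batch arithmetic in the method above can be checked in isolation. Note that `totalResults.longValue() / batchSize` divides two integers, so the quotient is already truncated before `Math.ceil` is applied and the call has no effect. The following is only a minimal sketch of integer ceiling division and the resulting per-batch ranges; `BatchPlan`, `batchCount` and `ranges` are hypothetical names that do not appear in the original code, and nothing here establishes that this arithmetic is related to the stalled inserts.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPlan {

    /** Number of batches needed to cover totalRows, rounding up (hypothetical helper). */
    static long batchCount(long totalRows, int batchSize) {
        if (totalRows < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and batchSize > 0");
        }
        // Integer ceiling division: no floating point involved.
        return (totalRows + batchSize - 1) / batchSize;
    }

    /** Returns one {firstResult, size} pair per batch; the last batch may be smaller. */
    static List<long[]> ranges(long totalRows, int batchSize) {
        List<long[]> result = new ArrayList<>();
        long pages = batchCount(totalRows, batchSize);
        for (long page = 0; page < pages; page++) {
            long firstResult = page * batchSize;
            long size = Math.min(batchSize, totalRows - firstResult);
            result.add(new long[] {firstResult, size});
        }
        return result;
    }

    public static void main(String[] args) {
        // Example values chosen for illustration only.
        for (long[] range : ranges(2500, 1000)) {
            System.out.println("firstResult=" + range[0] + " size=" + range[1]);
        }
    }
}
```

With this shape the batch size stays constant inside the loop, and the size of the final batch is derived per batch instead of overwriting the shared `batchSize` variable.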
This calls the executor method.

    private void exectueTask(BlockingQueue<EntityManager> queue, int firstResult, int batchSize, CountDownLatch latch, int batchNumber) {
        taskExecutor.execute(() -> {
            try {
                try {
                    run(queue, firstResult, batchSize, latch, batchNumber);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
Here I run the query for one batch and insert the data into the database.

    private void run(BlockingQueue<EntityManager> queue, int firstResult, int batchSize, CountDownLatch latch, int batchNumber) throws InterruptedException, IOException {
        logger.info("batchNumber " + batchNumber + " batchNumber called " + " at " + new Date());
        EntityManager entityManager = queue.take();
        logger.info("batchNumber " + batchNumber + " batchNumber took " + " following time to get entitymanager " + new Date());
        EntityTransaction entityTransaction = entityManager.getTransaction();
        List<CustomerLedger> customerLedgerList = partnerRepository.getAllCustomerLedger(entityManager, firstResult, batchSize);
        //List<Object[]> customerLedgerList = partnerRepository.getAllCustomerLedgerNative(entityManager,firstResult, batchSize);
        entityTransaction.begin();
        for (CustomerLedger old : customerLedgerList) {
            CustomerLedgerNew ledgerNew = new CustomerLedgerNew();
            String customerLedgerJson = objectMapper.writeValueAsString(old);
            ledgerNew = customerLedgerToCustomerLedgerNew(customerLedgerJson);
            ledgerNew.setFirstName(convertToDatabaseColumn(old.getFirstName(), key));
            ledgerNew.setMiddleName(convertToDatabaseColumn(old.getMiddleName(), key));
            ledgerNew.setLastName(convertToDatabaseColumn(old.getLastName(), key));
            ledgerNew.setAddressLine1(convertToDatabaseColumn(old.getAddressLine1(), key));
            ledgerNew.setAddressLine2(convertToDatabaseColumn(old.getAddressLine2(), key));
            ledgerNew.setAddressLine3(convertToDatabaseColumn(old.getAddressLine3(), key));
            ledgerNew.setAddressLine4(convertToDatabaseColumn(old.getAddressLine4(), key));
            ledgerNew.setHomePhone(convertToDatabaseColumn(old.getHomePhone(), key));
            ledgerNew.setWorkPhone(convertToDatabaseColumn(old.getWorkPhone(), key));
            ledgerNew.setEmail1(convertToDatabaseColumn(old.getEmail1(), key));
            ledgerNew.setMobile(convertToDatabaseColumn(old.getMobile(), key));
            ledgerNew.setMobileSha(sha256Hash(old.getMobile()));
            ledgerNew.setMobileChecksum(getMD5Hash(old.getMobile()));
            ledgerNew.setEmailSha(sha256Hash(old.getEmail1()));
            ledgerNew.setEmailChecksum(getMD5Hash(old.getEmail1()));
            //ledgerNew.setChannel(old.getChannel());
            //ledgerNew.setUniqueCustomerId(old.getUniqueCustomerId());
            //ledgerNew.setLastModifiedDate(old.getLastModifiedDate());
            entityManager.persist(ledgerNew);
        }
        //System.out.println("commited");
        logger.info("batchNumber " + batchNumber + " batchNumber started commiting data at " + new Date());
        entityTransaction.commit();
        logger.info("batchNumber " + batchNumber + " batchNumber finished commiting data at " + new Date());
        queue.put(entityManager);
        latch.countDown();
        logger.info("batchNumber " + batchNumber + " latch count " + latch.getCount());
    }
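One thing I noticed in the logs is that only this line is printed:
**batchNumber 615 batchNumber started commiting data at Wed Dec 11 17:22:54 IST 201** but the next log line, the one for finished committing data, is never printed. I really cannot understand the reason.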
Thread pool configuration class

    @Configuration
    public class ThreadPoolConfiguration {
        private final static org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThreadPoolConfiguration.class);
        private final int defaultCorePoolSize = 200;
        private final int defaultMaxPoolSize = 300;
        private final int defaultQueueCapacity = 20000;
        private final int defaultKeepAlive = 10;

        @Bean
        @Qualifier("TaskExecutor")
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(defaultCorePoolSize);
            executor.setMaxPoolSize(defaultMaxPoolSize);
            executor.setQueueCapacity(defaultQueueCapacity);
            executor.setKeepAliveSeconds(defaultKeepAlive);
            executor.setAllowCoreThreadTimeOut(true);
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setThreadNamePrefix("encryption-DEFAULT-");
            executor.initialize();
            return executor;
        }
    }
Please forgive me if I have not framed the question properly.
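Your problem could have several causes:

* a MySQL table-lock deadlock
* MySQL running out of connections (check the MySQL logs)
* an out-of-memory condition caused by too much buffering in the EntityManager
* a deadlock in the EntityManager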
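Whichever of these causes applies, one thing that can turn a single failed batch into a silent hang is that, in `run`, the calls to `queue.put(entityManager)` and `latch.countDown()` are skipped if anything before them throws. The latch then never reaches zero and the pool loses an entry. A minimal sketch of the usual remedy, returning the pooled resource and counting down in a `finally` block, is shown below. It assumes a plain `String` as a stand-in for the pooled `EntityManager`; `PooledBatchRunner`, `BatchWork`, `runBatch` and `demo` are hypothetical names, and this is not confirmed to be the cause of the behaviour in the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PooledBatchRunner {

    /** Hypothetical stand-in for the per-batch work done with a pooled resource. */
    interface BatchWork<R> {
        void apply(R resource) throws Exception;
    }

    /** Runs one batch; the resource is returned and the latch counted down even on failure. */
    static <R> void runBatch(BlockingQueue<R> pool, CountDownLatch latch, BatchWork<R> work) {
        R resource = null;
        try {
            resource = pool.take();
            work.apply(resource);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch (Exception e) {
            // Real code would also roll back the active transaction here.
            System.err.println("batch failed: " + e);
        } finally {
            if (resource != null) {
                pool.offer(resource); // always succeeds: the pool has a free slot for it
            }
            latch.countDown();
        }
    }

    /** Runs five batches on a pool of two resources, one of which fails on purpose. */
    static boolean demo() {
        int poolSize = 2;
        int batches = 5;
        BlockingQueue<String> pool = new ArrayBlockingQueue<>(poolSize);
        pool.add("em-1");
        pool.add("em-2");
        CountDownLatch latch = new CountDownLatch(batches);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (int i = 1; i <= batches; i++) {
                final int batchNumber = i;
                executor.execute(() -> runBatch(pool, latch, resource -> {
                    if (batchNumber == 3) {
                        throw new IllegalStateException("simulated commit failure");
                    }
                }));
            }
            // A bounded wait makes a stuck batch visible instead of blocking forever.
            boolean finished = latch.await(5, TimeUnit.SECONDS);
            return finished && pool.size() == poolSize;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all batches accounted for: " + demo());
    }
}
```

Using `latch.await(timeout, unit)` instead of the unbounded `latch.await()` is a separate design choice: it does not fix a stuck commit, but it lets the caller log which state the job is in so that the MySQL and memory causes listed above can be investigated.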