我目前正在使用 Spring v2 中的
EnableBinding
、StreamListener
: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/ springframework/cloud/stream/annotation/package-summary.html
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;
import lombok.extern.slf4j.Slf4j;
@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {
@Autowired
private ProductionStoreService productionService;
@Autowired
private Clock clock;
@StreamListener(target = Sink.INPUT)
public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
Instant start = clock.instant();
log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
energyProductionMessage.getDeviceId());
log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
productionService.saveProductions(energyProductionMessage);
log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
.between(start, clock.instant()).toMillis());
Instant startNormalization = clock.instant();
log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
energyProductionMessage.getDeviceId());
productionService.saveProductions30m(energyProductionMessage);
log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
.between(startNormalization, clock.instant()).toMillis());
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
log.error("Fail to read message with error '{}'", message.getPayload());
}
}
我需要迁移到 Spring v4,但这些注释在 Spring v4 中不可用:https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework /cloud/stream/annotation/package-summary.html
有人知道如何将这些注释从 Spring v2 迁移到 Spring v4 吗?
这些注释已被折旧。您可以使用 Consumer 来定义它
@Service
public class ProductionMessageConsumer {
@Autowired
private ProductionStoreService productionService;
@Bean
public Consumer<EnergyProductionMessage> handleEnergyProductionMessage() {
retrun energyProductionMessage -> {
...
productionService.saveProductions(energyProductionMessage);
....
}
}
}