Below is my peer-to-peer Geode cluster configuration.
Locator->
LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
    .setMemberName("locator1")
    .setPort(13489)
    .build();
locatorLauncher.start();
locatorLauncher.waitOnLocator();
Peer1->
Properties properties = new Properties();
properties.setProperty("locators", "localhost[13489]");
properties.setProperty("mcast-address", "224.0.0.0");
properties.setProperty("mcast-port", "0");
properties.setProperty(NAME, "Member1");
CacheFactory cacheFactory = new CacheFactory(properties);
Cache cache = cacheFactory.create();
Peer2->
Properties properties = new Properties();
properties.setProperty("locators", "localhost[13489]");
properties.setProperty("mcast-address", "224.0.0.0");
properties.setProperty("mcast-port", "0");
properties.setProperty(NAME, "Member12");
CacheFactory cacheFactory = new CacheFactory(properties);
Cache cache = cacheFactory.create();
Then I have a partitioned region:
RegionFactory<String, Person> regionFactory = this.cache.createRegionFactory(RegionShortcut.PARTITION);
region = regionFactory.create("Person");
When I try to do a bulk insert into that region:
Map<String, Person> transactionData = new HashMap<>();
int id = Integer.parseInt(start);
for (int i = 0; i < 100; i++) {
    Person person = createPerson("Agent" + (i + id), "");
    transactionData.put(person.getFirstName(), person);
}
CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
cacheTransactionManager.begin();
region.putAll(transactionData);
cacheTransactionManager.commit();
private Person createPerson(String firstname, String lastname) {
    Person person = new Person(firstname, lastname);
    IdentifierSequence.INSTANCE.setSequentialLongId(person);
    return person;
}
I get the following exception:
org.apache.geode.cache.TransactionDataNotColocatedException: Key Agent142 is not colocated with transaction, caused by org.apache.geode.internal.cache.PrimaryBucketException: Bucket 94 is not primary. Current primary holder is 192.168.40.214(cacheServer2:20928)<v1>:41001
at org.apache.geode.internal.cache.TXStateProxyImpl.getTransactionException(TXStateProxyImpl.java:224)
at org.apache.geode.internal.cache.TXStateProxyImpl.putEntry(TXStateProxyImpl.java:612)
at org.apache.geode.internal.cache.LocalRegion.basicPut(LocalRegion.java:5119)
at org.apache.geode.internal.cache.TXState$1.run(TXState.java:2046)
at org.apache.geode.internal.cache.event.NonDistributedEventTracker.syncBulkOp(NonDistributedEventTracker.java:114)
at org.apache.geode.internal.cache.LocalRegion.syncBulkOp(LocalRegion.java:6117)
at org.apache.geode.internal.cache.TXState.postPutAll(TXState.java:2033)
at org.apache.geode.internal.cache.TXStateProxyImpl.postPutAll(TXStateProxyImpl.java:840)
at org.apache.geode.internal.cache.LocalRegion.basicPutAll(LocalRegion.java:9552)
at org.apache.geode.internal.cache.LocalRegion.putAll(LocalRegion.java:9300)
at org.apache.geode.internal.cache.LocalRegion.putAll(LocalRegion.java:9312)
at com.trendcore.cache.peertopeer.CacheInteractor.executeTransactions(CacheInteractor.java:185)
...Caused by: org.apache.geode.internal.cache.PrimaryBucketException: Bucket 94 is not primary. Current primary holder is 192.168.40.214(cacheServer2:20928)<v1>:41001
at org.apache.geode.internal.cache.PartitionedRegion.getDataRegionForWrite(PartitionedRegion.java:9404)
at org.apache.geode.internal.cache.PartitionedRegion.getDataRegionForWrite(PartitionedRegion.java:254)
at org.apache.geode.internal.cache.TXState.txReadEntry(TXState.java:1533)
at org.apache.geode.internal.cache.TXState.txWriteEntry(TXState.java:1314)
at org.apache.geode.internal.cache.TXState.txPutEntry(TXState.java:1368)
at org.apache.geode.internal.cache.TXState.putEntry(TXState.java:1711)
at org.apache.geode.internal.cache.TXStateProxyImpl.putEntry(TXStateProxyImpl.java:607)
... 23 more
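Question:
Is there a better way to do a bulk insert into a partitioned region inside a transaction, and also have the data rebalanced?
I modified my region attributes and tried the bulk operation with the following configuration: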
PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
PartitionResolver resolver = new StandardPartitionResolver();
partitionAttributesFactory.setPartitionResolver(resolver);
PartitionAttributes partitionAttributes = partitionAttributesFactory.create();
region = regionFactory.setPartitionAttributes(partitionAttributes).create("Person");
StandardPartitionResolver->
public class StandardPartitionResolver implements PartitionResolver<String, Person> {
    @Override
    public Object getRoutingObject(EntryOperation<String, Person> opDetails) {
        return "1";
    }
    @Override
    public String getName() {
        return getClass().getName();
    }
}
With the PartitionResolver, the bulk insert executes as expected. But when I run the code below to check which partition holds which data:
cache.getDistributedSystem()
    .getAllOtherMembers()
    .forEach(distributedMember ->
        System.out.println(distributedMember.getId() + " --- " + distributedMember.getName() + " --- " + cache.isServer()));
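All of the data goes into partition 1 (or partition 2). After the bulk operation completes, Apache Geode does not rebalance the partitions.
Do I have to invoke the rebalance operation manually?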
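Geode only supports transactions in which all of the data is in the same bucket. In this case, your single bulk operation goes to multiple buckets, which causes the TransactionDataNotColocatedException.
When you created the StandardPartitionResolver, you put all of the data into a single bucket, because getRoutingObject returns the same value ("1") for every key. A bucket cannot be split, so all of the data ends up on one server.
Your best option is to use a separate transaction for each bucket you are putting into. That ensures that all data that is part of a single transaction is colocated.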
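To illustrate the per-bucket idea, here is a minimal sketch that splits one large map into smaller maps, one per bucket, so that each can be written in its own transaction. BucketBatcher, bucketOf and groupByBucket are hypothetical names, not Geode API. The sketch assumes a key's bucket can be approximated as the key's hash code modulo the bucket count, and it assumes a bucket count of 113; Geode works out the real bucket internally, so treat the grouping as an approximation and check it against your own region configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Geode API.
class BucketBatcher {

    // ASSUMPTION: the bucket of a key is approximated as hash(key) mod bucket count.
    // Geode determines the real bucket internally.
    static int bucketOf(Object key, int totalBuckets) {
        return Math.floorMod(key.hashCode(), totalBuckets);
    }

    // Splits the bulk data into one map per (approximated) bucket id.
    static <K, V> Map<Integer, Map<K, V>> groupByBucket(Map<K, V> data, int totalBuckets) {
        Map<Integer, Map<K, V>> batches = new HashMap<>();
        for (Map.Entry<K, V> entry : data.entrySet()) {
            int bucket = bucketOf(entry.getKey(), totalBuckets);
            batches.computeIfAbsent(bucket, b -> new HashMap<>())
                   .put(entry.getKey(), entry.getValue());
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in data; the question uses Person values keyed by first name.
        Map<String, String> data = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            data.put("Agent" + i, "person-" + i);
        }

        // 113 is an assumed bucket count; use the value your region is configured with.
        Map<Integer, Map<String, String>> batches = groupByBucket(data, 113);

        // In the real application each batch would get its own transaction,
        // using the same calls as in the question:
        //   cacheTransactionManager.begin();
        //   region.putAll(batch);
        //   cacheTransactionManager.commit();
        batches.forEach((bucket, batch) ->
            System.out.println("bucket " + bucket + ": " + batch.size() + " entries"));
    }
}
```

Note that with this approach the bulk insert is no longer atomic as a whole: each batch commits or fails on its own, so a failure partway through leaves the earlier batches committed.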