分布式缓存l2上使用infinispan和hibernate的java连接锁
我使用infinispan作为分布式hibernate缓存L2,在AWS上使用jgroups进行配置。 但是,在以下场景中,我在重载时面临一个问题:
- 最初有2个EC2实例可用
- 负荷急剧增加
- 负载平衡启动4个EC2实例
- 部分减载
- 负载平衡减少2个EC2实例
- 剩余的所有实例开始保持数据库连接(Hikari池),直到没有可用的连接为止,执行以下所有请求都会由于等待空闲连接超时而返回错误李>
其余实例尝试与旧实例通信,但没有得到响应,在等待响应时保持连接
所有实体都使用读写策略
Infinispan配置: org/infinispan/hibernate/cache/commons/builder/infinispan配置。xml
区域。工厂类别:org。英菲尼斯潘。冬眠隐藏物平民英菲尼斯潘地区工厂
以下Jgroups配置编辑自:org/infinispan/infinispan-core/9.2.0。最终/infinispan-core-9.2.0。最终的jar/default配置/default jgroups tcp。xml
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.0.xsd">
<TCP bind_port="7800"
enable_diagnostics="false"
thread_naming_pattern="pl"
send_buf_size="640k"
sock_conn_timeout="300"
bundler_type="no-bundler"
thread_pool.min_threads="${jgroups.thread_pool.min_threads:50}"
thread_pool.max_threads="${jgroups.thread_pool.max_threads:500}"
thread_pool.keep_alive_time="30000"
/>
<AWS_ELB_PING
region="sa-east-1"
load_balancers_names="elb-name"
/>
<MERGE3 min_interval="5000"
max_interval="30000"
/>
<FD_SOCK />
<FD_ALL timeout="9000"
interval="3000"
timeout_check_interval="1000"
/>
<VERIFY_SUSPECT timeout="5000" />
<pbcast.NAKACK2 use_mcast_xmit="false"
xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
resend_last_seqno="true"
/>
<UNICAST3 xmit_interval="100"
xmit_table_num_rows="50"
xmit_table_msgs_per_row="1024"
xmit_table_max_compaction_time="30000"
conn_expiry_timeout="0"
/>
<pbcast.STABLE stability_delay="500"
desired_avg_gossip="5000"
max_bytes="1M"
/>
<pbcast.GMS print_local_addr="false"
install_view_locally_first="true"
join_timeout="${jgroups.join_timeout:5000}"
/>
<MFC max_credits="2m"
min_threshold="0.40"
/>
<FRAG3/>
</config>
AWS\u ELB\u PING:该类是发现类的实现,其中使用AWS ELB api发现所有可用的IP
我从下面的代码中删除了日志和一些样板代码:
public class AWS_ELB_PING extends Discovery {
private static final String LIST_ELEMENT_SEPARATOR = ",";
static {
ClassConfigurator.addProtocol((short) 790, AWS_ELB_PING.class); // id must be unique
}
private String region;
private String load_balancers_names;
private int bind_port = 7800;
private AmazonElasticLoadBalancing amazonELBClient;
private AmazonEC2 amazonEC2Client;
private List<String> getLoadBalancersNamesList() {
return Arrays.asList(Optional.ofNullable(load_balancers_names).orElse("").split(LIST_ELEMENT_SEPARATOR));
}
@Override
public void init() throws Exception {
super.init();
DefaultAWSCredentialsProviderChain awsCredentialsProviderChain = DefaultAWSCredentialsProviderChain.getInstance();
amazonELBClient = AmazonElasticLoadBalancingClientBuilder.standard()
.withRegion(region)
.withCredentials(awsCredentialsProviderChain)
.build();
amazonEC2Client = AmazonEC2ClientBuilder.standard()
.withRegion(region)
.withCredentials(awsCredentialsProviderChain)
.build();
}
@Override
public void discoveryRequestReceived(final Address sender, final String logical_name,
final PhysicalAddress physical_addr) {
super.discoveryRequestReceived(sender, logical_name, physical_addr);
}
@Override
public void findMembers(final List<Address> members, final boolean initialDiscovery, final Responses responses) {
PhysicalAddress physicalAddress = null;
PingData data = null;
if (!use_ip_addrs || !initialDiscovery) {
physicalAddress = (PhysicalAddress) super.down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
data = new PingData(local_addr, false, NameCache.get(local_addr), physicalAddress);
if (members != null && members.size() <= max_members_in_discovery_request) {
data.mbrs(members);
}
}
sendDiscoveryRequests(physicalAddress, data, initialDiscovery, getLoadBalancersInstances());
}
private Set<Instance> getLoadBalancersInstances() {
final List<String> loadBalancerNames = getLoadBalancersNamesList();
final List<LoadBalancerDescription> loadBalancerDescriptions = amazonELBClient
.describeLoadBalancers(new DescribeLoadBalancersRequest().withLoadBalancerNames(loadBalancerNames))
.getLoadBalancerDescriptions();
checkLoadBalancersExists(loadBalancerNames, loadBalancerDescriptions);
final List<String> instanceIds = loadBalancerDescriptions.stream()
.flatMap(loadBalancer -> loadBalancer.getInstances().stream())
.map(instance -> instance.getInstanceId())
.collect(toList());
return amazonEC2Client.describeInstances(new DescribeInstancesRequest().withInstanceIds(instanceIds))
.getReservations()
.stream()
.map(Reservation::getInstances)
.flatMap(List::stream)
.collect(Collectors.toSet());
}
private void checkLoadBalancersExists(final List<String> loadBalancerNames,
final List<LoadBalancerDescription> loadBalancerDescriptions) {
final Set<String> difference = Sets.difference(new HashSet<>(loadBalancerNames),
loadBalancerDescriptions
.stream()
.map(LoadBalancerDescription::getLoadBalancerName)
.collect(Collectors.toSet()));
}
private PhysicalAddress toPhysicalAddress(final Instance instance) {
try {
return new IpAddress(instance.getPrivateIpAddress(), bind_port);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private void sendDiscoveryRequests(@Nullable final PhysicalAddress localAddress, @Nullable final PingData data,
final boolean initialDiscovery, final Set<Instance> instances) {
final PingHeader header = new PingHeader(PingHeader.GET_MBRS_REQ)
.clusterName(cluster_name)
.initialDiscovery(initialDiscovery);
instances.stream()
.map(this::toPhysicalAddress)
.filter(physicalAddress -> !physicalAddress.equals(localAddress))
.forEach(physicalAddress -> sendDiscoveryRequest(data, header, physicalAddress));
}
private void sendDiscoveryRequest(@Nullable final PingData data, final PingHeader header,
final PhysicalAddress destinationAddress) {
final Message message = new Message(destinationAddress)
.setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE, Message.Flag.OOB)
.putHeader(this.id, header);
if (data != null) {
message.setBuffer(marshal(data));
}
if (async_discovery_use_separate_thread_per_request) {
timer.execute(() -> sendDiscoveryRequest(message), sends_can_block);
} else {
sendDiscoveryRequest(message);
}
}
protected void sendDiscoveryRequest(final Message message) {
try {
super.down(message);
} catch (final Throwable t) {
}
}
@Override
public boolean isDynamic() {
return true;
}
@Override
public void stop() {
try {
if (amazonEC2Client != null) {
amazonEC2Client.shutdown();
}
if (amazonELBClient != null) {
amazonELBClient.shutdown();
}
} catch (final Exception e) {
} finally {
super.stop();
}
}
}
有人已经面临这样的问题了
# 1 楼答案
你的问题是什么;您已经描述了您的设置,但没有描述您的问题。。。 你有AWS_-ELP_-PING的参考号吗
# 2 楼答案
虽然您可能希望扩展现有的发现代码,例如tcping或FILE_-PING,甚至本机_-S3_-PING,但代码在我看来很好
你说的“游泳池耗尽”是什么意思?这是维护连接池的AWS客户端吗?还是说TCP中的线程池?后者有
bindler_type=no-bundler
;尝试删除此项(然后将使用transfer-queue-bundler
,这将创建消息批,而不是逐个发送消息)如果您有一个耗尽的TCP线程池,那么获取堆栈跟踪将是一件有趣的事情,以查看现有线程在哪些线程上被阻塞