本项目代码地址:https://github.com/HashZhang/spring-cloud-scaffold/tree/master/spring-cloud-iiford我们使用 Spring Cloud 官方推荐的 Spring Cloud LoadBalancer 作为我们的客户端负载均衡器。上一节我们了解了 Spring Cloud LoadBalancer 的结构,接下来我们来说一下我们在使用 Spring Cloud LoadBalancer 要实现的功能:
metamap中的zone配置,来区分不同集群的实例。只有实例的metamap中的zone配置一样的实例才能互相调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次请求原子加 1。在这种情况下会有问题:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,RoundRobinLoadBalancer 返回实例 1,这时有请求 B 到达,RoundRobinLoadBalancer 返回实例 2。然后如果请求 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是我们期望看到的。LoadBalancerZoneConfig:
public class LoadBalancerZoneConfig {
//标识当前负载均衡器处于哪一个 zone
private String zone;
public LoadBalancerZoneConfig(String zone) {
this.zone = zone;
}
public String getZone() {
return zone;
}
public void setZone(String zone) {
this.zone = zone;
}
}
如果没有引入 Eureka 相关依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置:
LoadBalancerAutoConfiguration
@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}
如果引入了 Eureka 相关依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会覆盖 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig:
EurekaLoadBalancerClientConfiguration
@PostConstruct
public void postprocess() {
if (!StringUtils.isEmpty(zoneConfig.getZone())) {
return;
}
String zone = getZoneFromEureka();
if (!StringUtils.isEmpty(zone)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
}
//设置 `LoadBalancerZoneConfig`
zoneConfig.setZone(zone);
}
}
private String getZoneFromEureka() {
String zone;
//是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true
boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
//如果配置了,则尝试从 Eureka 配置的 host 名称中提取
//实际就是以 . 分割 host,然后第二个就是 zone
//例如 www.zone1.com 就是 zone1
if (approximateZoneFromHostname && eurekaConfig != null) {
return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
}
else {
//否则,从 metadata map 中取 zone 这个 key
zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
//如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为当前 zone
if (StringUtils.isEmpty(zone) && clientConfig != null) {
String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
// Pick the first one from the regions we want to connect to
zone = zones != null && zones.length > 0 ? zones[0] : null;
}
return zone;
}
}
SameZoneOnlyServiceInstanceListSupplier
/**
* 只返回与当前实例同一个 Zone 的服务实例,不同 zone 之间的服务不互相调用
*/
public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
/**
* 实例元数据 map 中表示 zone 配置的 key
*/
private final String ZONE = "zone";
/**
* 当前 spring cloud loadbalancer 的 zone 配置
*/
private final LoadBalancerZoneConfig zoneConfig;
private String zone;
public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
super(delegate);
this.zoneConfig = zoneConfig;
}
@Override
public Flux<List<ServiceInstance>> get() {
return getDelegate().get().map(this::filteredByZone);
}
//通过 zoneConfig 过滤
private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
if (zone == null) {
zone = zoneConfig.getZone();
}
if (zone != null) {
List<ServiceInstance> filteredInstances = new ArrayList<>();
for (ServiceInstance serviceInstance : serviceInstances) {
String instanceZone = getZone(serviceInstance);
if (zone.equalsIgnoreCase(instanceZone)) {
filteredInstances.add(serviceInstance);
}
}
if (filteredInstances.size() > 0) {
return filteredInstances;
}
}
/**
* @see ZonePreferenceServiceInstanceListSupplier 在没有相同zone实例的时候返回的是所有实例
* 我们这里为了实现不同 zone 之间不互相调用需要返回空列表
*/
return List.of();
}
//读取实例的 zone,没有配置则为 null
private String getZone(ServiceInstance serviceInstance) {
Map<String, String> metadata = serviceInstance.getMetadata();
if (metadata != null) {
return metadata.get(ZONE);
}
return null;
}
}
RoundRobinWithRequestSeparatedPositionLoadBalancer
//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ServiceInstanceListSupplier serviceInstanceListSupplier;
//每次请求算上重试不会超过1分钟
//对于超过1分钟的,这种请求肯定比较重,不应该重试
private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
//随机初始值,防止每次都是从第一个开始调用
.build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
private final String serviceId;
private final Tracer tracer;
public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
this.serviceId = serviceId;
this.tracer = tracer;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
return getInstanceResponseByRoundRobin(serviceInstances);
}
private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
Span currentSpan = tracer.currentSpan();
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
long l = currentSpan.context().traceId();
AtomicInteger seed = positionCache.get(l);
int s = seed.getAndIncrement();
int pos = s % serviceInstances.size();
log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
return new DefaultResponse(serviceInstances.stream()
//实例返回列表顺序可能不同,为了保持一致,先排序再取
.sorted(Comparator.comparing(ServiceInstance::getInstanceId))
.collect(Collectors.toList()).get(pos));
}
}
@LoadBalancerClients 注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration
然后编写这个自动配置类,其实很简单,就是添加一个 @LoadBalancerClients 注解,设置默认配置类:
LoadBalancerAutoConfiguration
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {
}
编写这个默认配置类,将上面我们实现的两个类,组装进去:
DefaultLoadBalancerConfiguration
@Configuration(proxyBeanMethods = false)
public class DefaultLoadBalancerConfiguration {
@Bean
public ServiceInstanceListSupplier serviceInstanceListSupplier(
DiscoveryClient discoveryClient,
Environment env,
ConfigurableApplicationContext context,
LoadBalancerZoneConfig zoneConfig
) {
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
return //开启服务实例缓存
new CachingServiceInstanceListSupplier(
//只能返回同一个 zone 的服务实例
new SameZoneOnlyServiceInstanceListSupplier(
//启用通过 discoveryClient 的服务发现
new DiscoveryClientServiceInstanceListSupplier(
discoveryClient, env
),
zoneConfig
)
, cacheManagerProvider.getIfAvailable()
);
}
@Bean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
Environment environment,
ServiceInstanceListSupplier serviceInstanceListSupplier,
Tracer tracer
) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
serviceInstanceListSupplier,
name,
tracer
);
}
}
这样,我们就实现了自定义的负载均衡器。也理解了 Spring Cloud LoadBalancer 的使用。接下来,我们来单元测试下这些功能。集成测试后面会有单独的章节,不用着急。
LoadBalancerTest
//SpringRunner也包含了MockitoJUnitRunner,所以 @Mock 等注解也生效了
@RunWith(SpringRunner.class)
@SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"})
public class LoadBalancerTest {
@EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class)
@Configuration
public static class App {
@Bean
public DiscoveryClient discoveryClient() {
ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class);
ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class);
ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class);
Map<String, String> zone1 = Map.ofEntries(
Map.entry("zone", "zone1")
);
Map<String, String> zone2 = Map.ofEntries(
Map.entry("zone", "zone2")
);
when(zone1Instance1.getMetadata()).thenReturn(zone1);
when(zone1Instance1.getInstanceId()).thenReturn("instance1");
when(zone1Instance2.getMetadata()).thenReturn(zone1);
when(zone1Instance2.getInstanceId()).thenReturn("instance2");
when(zone2Instance3.getMetadata()).thenReturn(zone2);
when(zone2Instance3.getInstanceId()).thenReturn("instance3");
DiscoveryClient mock = Mockito.mock(DiscoveryClient.class);
Mockito.when(mock.getInstances("testService"))
.thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3));
return mock;
}
}
@Autowired
private LoadBalancerClientFactory loadBalancerClientFactory;
@Autowired
private Tracer tracer;
/**
* 只返回同一个 zone 下的实例
*/
@Test
public void testFilteredByZone() {
ReactiveLoadBalancer<ServiceInstance> testService =
loadBalancerClientFactory.getInstance("testService");
for (int i = 0; i < 100; i++) {
ServiceInstance server = Mono.from(testService.choose()).block().getServer();
//必须处于和当前实例同一个zone下
Assert.assertEquals(server.getMetadata().get("zone"), "zone1");
}
}
/**
* 返回不同的实例
*/
@Test
public void testReturnNext() {
ReactiveLoadBalancer<ServiceInstance> testService =
loadBalancerClientFactory.getInstance("testService");
//获取服务实例
ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
ServiceInstance server2 = Mono.from(testService.choose()).block().getServer();
//每次选择的是不同实例
Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId());
}
/**
* 跨线程,默认情况下是可能返回同一实例的,在我们的实现下,保持
* span 则会返回下一个实例,这样保证多线程环境同一个 request 重试会返回下一实例
* @throws Exception
*/
@Test
public void testSameSpanReturnNext() throws Exception {
Span span = tracer.nextSpan();
//测试 100 次
for (int i = 0; i < 100; i++) {
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
ReactiveLoadBalancer<ServiceInstance> testService =
loadBalancerClientFactory.getInstance("testService");
//获取实例
ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
AtomicReference<ServiceInstance> server2 = new AtomicReference<>();
Thread thread = new Thread(() -> {
//保持 trace,这样就会认为仍然是同一个请求上下文,这样模拟重试
try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) {
server2.set(Mono.from(testService.choose()).block().getServer());
}
});
thread.start();
thread.join();
System.out.println(i);
Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId());
}
}
}
}
运行测试,测试通过。
微信搜索“我的编程喵”关注公众号,加作者微信,每日一刷,轻松提升技术,斩获各种offer:
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po