SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

SpringCloud 源码系列(1)—— 注册中心 Eureka(上)

SpringCloud 源码系列(2)—— 注册中心 Eureka(中)

SpringCloud 源码系列(3)—— 注册中心 Eureka(下)

SpringCloud 源码系列(4)—— 负载均衡 Ribbon(上)

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

SpringCloud 源码系列(6)—— 声明式服务调用 Feign

 

五、Ribbon 核心接口

前面已经了解到 Ribbon 核心接口以及默认实现如何协作来查找要调用的一个实例,这节再来看下各个核心接口的一些特性及其它实现类。

1、客户端配置 — IClientConfig

IClientConfig 就是管理客户端配置的核心接口,它的默认实现类是 DefaultClientConfigImpl。可以看到在创建 IClientConfig 时,设置了 Ribbon 客户端默认的连接和读取超时时间为 1 秒,例如读取如果超过1秒,就会返回超时,这两个一般需要根据实际情况来调整。、

 1 @Bean
 2 @ConditionalOnMissingBean
 3 public IClientConfig ribbonClientConfig() {
 4     DefaultClientConfigImpl config = new DefaultClientConfigImpl();
 5     // 加载配置
 6     config.loadProperties(this.name);
 7     // 连接超时默认 1 秒
 8     config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
 9     // 读取超时默认 1 秒
10     config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
11     config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
12     return config;
13 }

CommonClientConfigKey 这个类定义了 Ribbon 客户端相关的所有配置的键常量,可以通过这个类来看有哪些配置。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

  1 public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
  2 
  3     public static final IClientConfigKey<String> AppName = new CommonClientConfigKey<String>("AppName"){};
  4 
  5     public static final IClientConfigKey<String> Version = new CommonClientConfigKey<String>("Version"){};
  6 
  7     public static final IClientConfigKey<Integer> Port = new CommonClientConfigKey<Integer>("Port"){};
  8 
  9     public static final IClientConfigKey<Integer> SecurePort = new CommonClientConfigKey<Integer>("SecurePort"){};
 10 
 11     public static final IClientConfigKey<String> VipAddress = new CommonClientConfigKey<String>("VipAddress"){};
 12 
 13     public static final IClientConfigKey<Boolean> ForceClientPortConfiguration = new CommonClientConfigKey<Boolean>("ForceClientPortConfiguration"){}; // use client defined port regardless of server advert
 14 
 15     public static final IClientConfigKey<String> DeploymentContextBasedVipAddresses = new CommonClientConfigKey<String>("DeploymentContextBasedVipAddresses"){};
 16 
 17     public static final IClientConfigKey<Integer> MaxAutoRetries = new CommonClientConfigKey<Integer>("MaxAutoRetries"){};
 18 
 19     public static final IClientConfigKey<Integer> MaxAutoRetriesNextServer = new CommonClientConfigKey<Integer>("MaxAutoRetriesNextServer"){};
 20 
 21     public static final IClientConfigKey<Boolean> OkToRetryOnAllOperations = new CommonClientConfigKey<Boolean>("OkToRetryOnAllOperations"){};
 22 
 23     public static final IClientConfigKey<Boolean> RequestSpecificRetryOn = new CommonClientConfigKey<Boolean>("RequestSpecificRetryOn"){};
 24 
 25     public static final IClientConfigKey<Integer> ReceiveBufferSize = new CommonClientConfigKey<Integer>("ReceiveBufferSize"){};
 26 
 27     public static final IClientConfigKey<Boolean> EnablePrimeConnections = new CommonClientConfigKey<Boolean>("EnablePrimeConnections"){};
 28 
 29     public static final IClientConfigKey<String> PrimeConnectionsClassName = new CommonClientConfigKey<String>("PrimeConnectionsClassName"){};
 30 
 31     public static final IClientConfigKey<Integer> MaxRetriesPerServerPrimeConnection = new CommonClientConfigKey<Integer>("MaxRetriesPerServerPrimeConnection"){};
 32 
 33     public static final IClientConfigKey<Integer> MaxTotalTimeToPrimeConnections = new CommonClientConfigKey<Integer>("MaxTotalTimeToPrimeConnections"){};
 34 
 35     public static final IClientConfigKey<Float> MinPrimeConnectionsRatio = new CommonClientConfigKey<Float>("MinPrimeConnectionsRatio"){};
 36 
 37     public static final IClientConfigKey<String> PrimeConnectionsURI = new CommonClientConfigKey<String>("PrimeConnectionsURI"){};
 38 
 39     public static final IClientConfigKey<Integer> PoolMaxThreads = new CommonClientConfigKey<Integer>("PoolMaxThreads"){};
 40 
 41     public static final IClientConfigKey<Integer> PoolMinThreads = new CommonClientConfigKey<Integer>("PoolMinThreads"){};
 42 
 43     public static final IClientConfigKey<Integer> PoolKeepAliveTime = new CommonClientConfigKey<Integer>("PoolKeepAliveTime"){};
 44 
 45     public static final IClientConfigKey<String> PoolKeepAliveTimeUnits = new CommonClientConfigKey<String>("PoolKeepAliveTimeUnits"){};
 46 
 47     public static final IClientConfigKey<Boolean> EnableConnectionPool = new CommonClientConfigKey<Boolean>("EnableConnectionPool") {};
 48 
 49     /**
 50      * Use {@link #MaxConnectionsPerHost}
 51      */
 52     @Deprecated
 53     public static final IClientConfigKey<Integer> MaxHttpConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxHttpConnectionsPerHost"){};
 54 
 55     /**
 56      * Use {@link #MaxTotalConnections}
 57      */
 58     @Deprecated
 59     public static final IClientConfigKey<Integer> MaxTotalHttpConnections = new CommonClientConfigKey<Integer>("MaxTotalHttpConnections"){};
 60 
 61     public static final IClientConfigKey<Integer> MaxConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxConnectionsPerHost"){};
 62 
 63     public static final IClientConfigKey<Integer> MaxTotalConnections = new CommonClientConfigKey<Integer>("MaxTotalConnections"){};
 64 
 65     public static final IClientConfigKey<Boolean> IsSecure = new CommonClientConfigKey<Boolean>("IsSecure"){};
 66 
 67     public static final IClientConfigKey<Boolean> GZipPayload = new CommonClientConfigKey<Boolean>("GZipPayload"){};
 68 
 69     public static final IClientConfigKey<Integer> ConnectTimeout = new CommonClientConfigKey<Integer>("ConnectTimeout"){};
 70 
 71     public static final IClientConfigKey<Integer> BackoffInterval = new CommonClientConfigKey<Integer>("BackoffTimeout"){};
 72 
 73     public static final IClientConfigKey<Integer> ReadTimeout = new CommonClientConfigKey<Integer>("ReadTimeout"){};
 74 
 75     public static final IClientConfigKey<Integer> SendBufferSize = new CommonClientConfigKey<Integer>("SendBufferSize"){};
 76 
 77     public static final IClientConfigKey<Boolean> StaleCheckingEnabled = new CommonClientConfigKey<Boolean>("StaleCheckingEnabled"){};
 78 
 79     public static final IClientConfigKey<Integer> Linger = new CommonClientConfigKey<Integer>("Linger"){};
 80 
 81     public static final IClientConfigKey<Integer> ConnectionManagerTimeout = new CommonClientConfigKey<Integer>("ConnectionManagerTimeout"){};
 82 
 83     public static final IClientConfigKey<Boolean> FollowRedirects = new CommonClientConfigKey<Boolean>("FollowRedirects"){};
 84 
 85     public static final IClientConfigKey<Boolean> ConnectionPoolCleanerTaskEnabled = new CommonClientConfigKey<Boolean>("ConnectionPoolCleanerTaskEnabled"){};
 86 
 87     public static final IClientConfigKey<Integer> ConnIdleEvictTimeMilliSeconds = new CommonClientConfigKey<Integer>("ConnIdleEvictTimeMilliSeconds"){};
 88 
 89     public static final IClientConfigKey<Integer> ConnectionCleanerRepeatInterval = new CommonClientConfigKey<Integer>("ConnectionCleanerRepeatInterval"){};
 90 
 91     public static final IClientConfigKey<Boolean> EnableGZIPContentEncodingFilter = new CommonClientConfigKey<Boolean>("EnableGZIPContentEncodingFilter"){};
 92 
 93     public static final IClientConfigKey<String> ProxyHost = new CommonClientConfigKey<String>("ProxyHost"){};
 94 
 95     public static final IClientConfigKey<Integer> ProxyPort = new CommonClientConfigKey<Integer>("ProxyPort"){};
 96 
 97     public static final IClientConfigKey<String> KeyStore = new CommonClientConfigKey<String>("KeyStore"){};
 98 
 99     public static final IClientConfigKey<String> KeyStorePassword = new CommonClientConfigKey<String>("KeyStorePassword"){};
100 
101     public static final IClientConfigKey<String> TrustStore = new CommonClientConfigKey<String>("TrustStore"){};
102 
103     public static final IClientConfigKey<String> TrustStorePassword = new CommonClientConfigKey<String>("TrustStorePassword"){};
104 
105     // if this is a secure rest client, must we use client auth too?
106     public static final IClientConfigKey<Boolean> IsClientAuthRequired = new CommonClientConfigKey<Boolean>("IsClientAuthRequired"){};
107 
108     public static final IClientConfigKey<String> CustomSSLSocketFactoryClassName = new CommonClientConfigKey<String>("CustomSSLSocketFactoryClassName"){};
109      // must host name match name in certificate?
110     public static final IClientConfigKey<Boolean> IsHostnameValidationRequired = new CommonClientConfigKey<Boolean>("IsHostnameValidationRequired"){};
111 
112     // see also http://hc.apache.org/httpcomponents-client-ga/tutorial/html/advanced.html
113     public static final IClientConfigKey<Boolean> IgnoreUserTokenInConnectionPoolForSecureClient = new CommonClientConfigKey<Boolean>("IgnoreUserTokenInConnectionPoolForSecureClient"){};
114 
115     // Client implementation
116     public static final IClientConfigKey<String> ClientClassName = new CommonClientConfigKey<String>("ClientClassName"){};
117 
118     //LoadBalancer Related
119     public static final IClientConfigKey<Boolean> InitializeNFLoadBalancer = new CommonClientConfigKey<Boolean>("InitializeNFLoadBalancer"){};
120 
121     public static final IClientConfigKey<String> NFLoadBalancerClassName = new CommonClientConfigKey<String>("NFLoadBalancerClassName"){};
122 
123     public static final IClientConfigKey<String> NFLoadBalancerRuleClassName = new CommonClientConfigKey<String>("NFLoadBalancerRuleClassName"){};
124 
125     public static final IClientConfigKey<String> NFLoadBalancerPingClassName = new CommonClientConfigKey<String>("NFLoadBalancerPingClassName"){};
126 
127     public static final IClientConfigKey<Integer> NFLoadBalancerPingInterval = new CommonClientConfigKey<Integer>("NFLoadBalancerPingInterval"){};
128 
129     public static final IClientConfigKey<Integer> NFLoadBalancerMaxTotalPingTime = new CommonClientConfigKey<Integer>("NFLoadBalancerMaxTotalPingTime"){};
130 
131     public static final IClientConfigKey<String> NFLoadBalancerStatsClassName = new CommonClientConfigKey<String>("NFLoadBalancerStatsClassName"){};
132 
133     public static final IClientConfigKey<String> NIWSServerListClassName = new CommonClientConfigKey<String>("NIWSServerListClassName"){};
134 
135     public static final IClientConfigKey<String> ServerListUpdaterClassName = new CommonClientConfigKey<String>("ServerListUpdaterClassName"){};
136 
137     public static final IClientConfigKey<String> NIWSServerListFilterClassName = new CommonClientConfigKey<String>("NIWSServerListFilterClassName"){};
138 
139     public static final IClientConfigKey<Integer> ServerListRefreshInterval = new CommonClientConfigKey<Integer>("ServerListRefreshInterval"){};
140 
141     public static final IClientConfigKey<Boolean> EnableMarkingServerDownOnReachingFailureLimit = new CommonClientConfigKey<Boolean>("EnableMarkingServerDownOnReachingFailureLimit"){};
142 
143     public static final IClientConfigKey<Integer> ServerDownFailureLimit = new CommonClientConfigKey<Integer>("ServerDownFailureLimit"){};
144 
145     public static final IClientConfigKey<Integer> ServerDownStatWindowInMillis = new CommonClientConfigKey<Integer>("ServerDownStatWindowInMillis"){};
146 
147     public static final IClientConfigKey<Boolean> EnableZoneAffinity = new CommonClientConfigKey<Boolean>("EnableZoneAffinity"){};
148 
149     public static final IClientConfigKey<Boolean> EnableZoneExclusivity = new CommonClientConfigKey<Boolean>("EnableZoneExclusivity"){};
150 
151     public static final IClientConfigKey<Boolean> PrioritizeVipAddressBasedServers = new CommonClientConfigKey<Boolean>("PrioritizeVipAddressBasedServers"){};
152 
153     public static final IClientConfigKey<String> VipAddressResolverClassName = new CommonClientConfigKey<String>("VipAddressResolverClassName"){};
154 
155     public static final IClientConfigKey<String> TargetRegion = new CommonClientConfigKey<String>("TargetRegion"){};
156 
157     public static final IClientConfigKey<String> RulePredicateClasses = new CommonClientConfigKey<String>("RulePredicateClasses"){};
158 
159     public static final IClientConfigKey<String> RequestIdHeaderName = new CommonClientConfigKey<String>("RequestIdHeaderName") {};
160 
161     public static final IClientConfigKey<Boolean> UseIPAddrForServer = new CommonClientConfigKey<Boolean>("UseIPAddrForServer") {};
162 
163     public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {};
164 
165     private static final Set<IClientConfigKey> keys = new HashSet<IClientConfigKey>();
166 
167     // ...
168 }

View Code

进入到 DefaultClientConfigImpl,可以看到 CommonClientConfigKey 中的每个配置都对应了一个默认值。在加载配置的时候,如果用户没有定制配置,就会使用默认的配置。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

  1 public class DefaultClientConfigImpl implements IClientConfig {
  2 
  3     public static final Boolean DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS = Boolean.TRUE;
  4 
  5     public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();
  6 
  7     public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
  8 
  9     public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
 10 
 11     public static final boolean DEFAULT_USEIPADDRESS_FOR_SERVER = Boolean.FALSE;
 12 
 13     public static final String DEFAULT_CLIENT_CLASSNAME = "com.netflix.niws.client.http.RestClient";
 14 
 15     public static final String DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME = "com.netflix.client.SimpleVipAddressResolver";
 16 
 17     public static final String DEFAULT_PRIME_CONNECTIONS_URI = "/";
 18 
 19     public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;
 20 
 21     public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;
 22 
 23     public static final Boolean DEFAULT_ENABLE_PRIME_CONNECTIONS = Boolean.FALSE;
 24 
 25     public static final int DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW = Integer.MAX_VALUE;
 26 
 27     public static final int DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS = 60000;
 28 
 29     public static final Boolean DEFAULT_ENABLE_REQUEST_THROTTLING = Boolean.FALSE;
 30 
 31     public static final Boolean DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER = Boolean.FALSE;
 32 
 33     public static final Boolean DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED = Boolean.TRUE;
 34 
 35     public static final Boolean DEFAULT_FOLLOW_REDIRECTS = Boolean.FALSE;
 36 
 37     public static final float DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED = 0.0f;
 38 
 39     public static final int DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER = 1;
 40 
 41     public static final int DEFAULT_MAX_AUTO_RETRIES = 0;
 42 
 43     public static final int DEFAULT_BACKOFF_INTERVAL = 0;
 44 
 45     public static final int DEFAULT_READ_TIMEOUT = 5000;
 46 
 47     public static final int DEFAULT_CONNECTION_MANAGER_TIMEOUT = 2000;
 48 
 49     public static final int DEFAULT_CONNECT_TIMEOUT = 2000;
 50 
 51     public static final Boolean DEFAULT_ENABLE_CONNECTION_POOL = Boolean.TRUE;
 52 
 53     @Deprecated
 54     public static final int DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST = 50;
 55 
 56     @Deprecated
 57     public static final int DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS = 200;
 58 
 59     public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 50;
 60 
 61     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200;
 62 
 63     public static final float DEFAULT_MIN_PRIME_CONNECTIONS_RATIO = 1.0f;
 64 
 65     public static final String DEFAULT_PRIME_CONNECTIONS_CLASS = "com.netflix.niws.client.http.HttpPrimeConnection";
 66 
 67     public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.loadbalancer.ConfigurationBasedServerList";
 68 
 69     public static final String DEFAULT_SERVER_LIST_UPDATER_CLASS = "com.netflix.loadbalancer.PollingServerListUpdater";
 70 
 71     public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30000; // every half minute (30 secs)
 72 
 73     public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30000; // all connections idle for 30 secs
 74 
 75     protected volatile Map<String, Object> properties = new ConcurrentHashMap<String, Object>();
 76 
 77     protected Map<IClientConfigKey<?>, Object> typedProperties = new ConcurrentHashMap<IClientConfigKey<?>, Object>();
 78 
 79     private static final Logger LOG = LoggerFactory.getLogger(DefaultClientConfigImpl.class);
 80 
 81     private String clientName = null;
 82 
 83     private VipAddressResolver resolver = null;
 84 
 85     private boolean enableDynamicProperties = true;
 86     /**
 87      * Defaults for the parameters for the thread pool used by batchParallel
 88      * calls
 89      */
 90     public static final int DEFAULT_POOL_MAX_THREADS = DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
 91     public static final int DEFAULT_POOL_MIN_THREADS = 1;
 92     public static final long DEFAULT_POOL_KEEP_ALIVE_TIME = 15 * 60L;
 93     public static final TimeUnit DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS = TimeUnit.SECONDS;
 94     public static final Boolean DEFAULT_ENABLE_ZONE_AFFINITY = Boolean.FALSE;
 95     public static final Boolean DEFAULT_ENABLE_ZONE_EXCLUSIVITY = Boolean.FALSE;
 96     public static final int DEFAULT_PORT = 7001;
 97     public static final Boolean DEFAULT_ENABLE_LOADBALANCER = Boolean.TRUE;
 98 
 99     public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon";
100 
101     private String propertyNameSpace = DEFAULT_PROPERTY_NAME_SPACE;
102 
103     public static final Boolean DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS = Boolean.FALSE;
104 
105     public static final Boolean DEFAULT_ENABLE_NIWS_EVENT_LOGGING = Boolean.TRUE;
106 
107     public static final Boolean DEFAULT_IS_CLIENT_AUTH_REQUIRED = Boolean.FALSE;
108 
109     private final Map<String, DynamicStringProperty> dynamicProperties = new ConcurrentHashMap<String, DynamicStringProperty>();
110 
111     public Boolean getDefaultPrioritizeVipAddressBasedServers() {
112         return DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS;
113     }
114 
115     public String getDefaultNfloadbalancerPingClassname() {
116         return DEFAULT_NFLOADBALANCER_PING_CLASSNAME;
117     }
118 
119     public String getDefaultNfloadbalancerRuleClassname() {
120         return DEFAULT_NFLOADBALANCER_RULE_CLASSNAME;
121     }
122 
123     public String getDefaultNfloadbalancerClassname() {
124         return DEFAULT_NFLOADBALANCER_CLASSNAME;
125     }
126 
127     public boolean getDefaultUseIpAddressForServer() {
128         return DEFAULT_USEIPADDRESS_FOR_SERVER;
129     }
130 
131     public String getDefaultClientClassname() {
132         return DEFAULT_CLIENT_CLASSNAME;
133     }
134 
135     public String getDefaultVipaddressResolverClassname() {
136         return DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME;
137     }
138 
139     public String getDefaultPrimeConnectionsUri() {
140         return DEFAULT_PRIME_CONNECTIONS_URI;
141     }
142 
143     public int getDefaultMaxTotalTimeToPrimeConnections() {
144         return DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS;
145     }
146 
147     public int getDefaultMaxRetriesPerServerPrimeConnection() {
148         return DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION;
149     }
150 
151     public Boolean getDefaultEnablePrimeConnections() {
152         return DEFAULT_ENABLE_PRIME_CONNECTIONS;
153     }
154 
155     public int getDefaultMaxRequestsAllowedPerWindow() {
156         return DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW;
157     }
158 
159     public int getDefaultRequestThrottlingWindowInMillis() {
160         return DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS;
161     }
162 
163     public Boolean getDefaultEnableRequestThrottling() {
164         return DEFAULT_ENABLE_REQUEST_THROTTLING;
165     }
166 
167     public Boolean getDefaultEnableGzipContentEncodingFilter() {
168         return DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER;
169     }
170 
171     public Boolean getDefaultConnectionPoolCleanerTaskEnabled() {
172         return DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED;
173     }
174 
175     public Boolean getDefaultFollowRedirects() {
176         return DEFAULT_FOLLOW_REDIRECTS;
177     }
178 
179     public float getDefaultPercentageNiwsEventLogged() {
180         return DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED;
181     }
182 
183     public int getDefaultMaxAutoRetriesNextServer() {
184         return DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER;
185     }
186 
187     public int getDefaultMaxAutoRetries() {
188         return DEFAULT_MAX_AUTO_RETRIES;
189     }
190 
191     public int getDefaultReadTimeout() {
192         return DEFAULT_READ_TIMEOUT;
193     }
194 
195     public int getDefaultConnectionManagerTimeout() {
196         return DEFAULT_CONNECTION_MANAGER_TIMEOUT;
197     }
198 
199     public int getDefaultConnectTimeout() {
200         return DEFAULT_CONNECT_TIMEOUT;
201     }
202 
203     @Deprecated
204     public int getDefaultMaxHttpConnectionsPerHost() {
205         return DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST;
206     }
207 
208     @Deprecated
209     public int getDefaultMaxTotalHttpConnections() {
210         return DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
211     }
212 
213     public int getDefaultMaxConnectionsPerHost() {
214         return DEFAULT_MAX_CONNECTIONS_PER_HOST;
215     }
216 
217     public int getDefaultMaxTotalConnections() {
218         return DEFAULT_MAX_TOTAL_CONNECTIONS;
219     }
220 
221     public float getDefaultMinPrimeConnectionsRatio() {
222         return DEFAULT_MIN_PRIME_CONNECTIONS_RATIO;
223     }
224 
225     public String getDefaultPrimeConnectionsClass() {
226         return DEFAULT_PRIME_CONNECTIONS_CLASS;
227     }
228 
229     public String getDefaultSeverListClass() {
230         return DEFAULT_SEVER_LIST_CLASS;
231     }
232 
233     public int getDefaultConnectionIdleTimertaskRepeatInMsecs() {
234         return DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS;
235     }
236 
237     public int getDefaultConnectionidleTimeInMsecs() {
238         return DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS;
239     }
240 
241     public VipAddressResolver getResolver() {
242         return resolver;
243     }
244 
245     public boolean isEnableDynamicProperties() {
246         return enableDynamicProperties;
247     }
248 
249     public int getDefaultPoolMaxThreads() {
250         return DEFAULT_POOL_MAX_THREADS;
251     }
252 
253     public int getDefaultPoolMinThreads() {
254         return DEFAULT_POOL_MIN_THREADS;
255     }
256 
257     public long getDefaultPoolKeepAliveTime() {
258         return DEFAULT_POOL_KEEP_ALIVE_TIME;
259     }
260 
261     public TimeUnit getDefaultPoolKeepAliveTimeUnits() {
262         return DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS;
263     }
264 
265     public Boolean getDefaultEnableZoneAffinity() {
266         return DEFAULT_ENABLE_ZONE_AFFINITY;
267     }
268 
269     public Boolean getDefaultEnableZoneExclusivity() {
270         return DEFAULT_ENABLE_ZONE_EXCLUSIVITY;
271     }
272 
273     public int getDefaultPort() {
274         return DEFAULT_PORT;
275     }
276 
277     public Boolean getDefaultEnableLoadbalancer() {
278         return DEFAULT_ENABLE_LOADBALANCER;
279     }
280 
281 
282     public Boolean getDefaultOkToRetryOnAllOperations() {
283         return DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS;
284     }
285 
286     public Boolean getDefaultIsClientAuthRequired(){
287         return DEFAULT_IS_CLIENT_AUTH_REQUIRED;
288     }
289 
290 
291     /**
292      * Create instance with no properties in default name space {@link #DEFAULT_PROPERTY_NAME_SPACE}
293      */
294     public DefaultClientConfigImpl() {
295         this.dynamicProperties.clear();
296         this.enableDynamicProperties = false;
297     }
298 
299     /**
300      * Create instance with no properties in the specified name space
301      */
302     public DefaultClientConfigImpl(String nameSpace) {
303         this();
304         this.propertyNameSpace = nameSpace;
305     }
306 
307     public void loadDefaultValues() {
308         putDefaultIntegerProperty(CommonClientConfigKey.MaxHttpConnectionsPerHost, getDefaultMaxHttpConnectionsPerHost());
309         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalHttpConnections, getDefaultMaxTotalHttpConnections());
310         putDefaultBooleanProperty(CommonClientConfigKey.EnableConnectionPool, getDefaultEnableConnectionPool());
311         putDefaultIntegerProperty(CommonClientConfigKey.MaxConnectionsPerHost, getDefaultMaxConnectionsPerHost());
312         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalConnections, getDefaultMaxTotalConnections());
313         putDefaultIntegerProperty(CommonClientConfigKey.ConnectTimeout, getDefaultConnectTimeout());
314         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionManagerTimeout, getDefaultConnectionManagerTimeout());
315         putDefaultIntegerProperty(CommonClientConfigKey.ReadTimeout, getDefaultReadTimeout());
316         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetries, getDefaultMaxAutoRetries());
317         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, getDefaultMaxAutoRetriesNextServer());
318         putDefaultBooleanProperty(CommonClientConfigKey.OkToRetryOnAllOperations, getDefaultOkToRetryOnAllOperations());
319         putDefaultBooleanProperty(CommonClientConfigKey.FollowRedirects, getDefaultFollowRedirects());
320         putDefaultBooleanProperty(CommonClientConfigKey.ConnectionPoolCleanerTaskEnabled, getDefaultConnectionPoolCleanerTaskEnabled());
321         putDefaultIntegerProperty(CommonClientConfigKey.ConnIdleEvictTimeMilliSeconds, getDefaultConnectionidleTimeInMsecs());
322         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionCleanerRepeatInterval, getDefaultConnectionIdleTimertaskRepeatInMsecs());
323         putDefaultBooleanProperty(CommonClientConfigKey.EnableGZIPContentEncodingFilter, getDefaultEnableGzipContentEncodingFilter());
324         String proxyHost = ConfigurationManager.getConfigInstance().getString(getDefaultPropName(CommonClientConfigKey.ProxyHost.key()));
325         if (proxyHost != null && proxyHost.length() > 0) {
326             setProperty(CommonClientConfigKey.ProxyHost, proxyHost);
327         }
328         Integer proxyPort = ConfigurationManager
329                 .getConfigInstance()
330                 .getInteger(
331                         getDefaultPropName(CommonClientConfigKey.ProxyPort),
332                         (Integer.MIN_VALUE + 1)); // + 1 just to avoid potential clash with user setting
333         if (proxyPort != (Integer.MIN_VALUE + 1)) {
334             setProperty(CommonClientConfigKey.ProxyPort, proxyPort);
335         }
336         putDefaultIntegerProperty(CommonClientConfigKey.Port, getDefaultPort());
337         putDefaultBooleanProperty(CommonClientConfigKey.EnablePrimeConnections, getDefaultEnablePrimeConnections());
338         putDefaultIntegerProperty(CommonClientConfigKey.MaxRetriesPerServerPrimeConnection, getDefaultMaxRetriesPerServerPrimeConnection());
339         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalTimeToPrimeConnections, getDefaultMaxTotalTimeToPrimeConnections());
340         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsURI, getDefaultPrimeConnectionsUri());
341         putDefaultIntegerProperty(CommonClientConfigKey.PoolMinThreads, getDefaultPoolMinThreads());
342         putDefaultIntegerProperty(CommonClientConfigKey.PoolMaxThreads, getDefaultPoolMaxThreads());
343         putDefaultLongProperty(CommonClientConfigKey.PoolKeepAliveTime, getDefaultPoolKeepAliveTime());
344         putDefaultTimeUnitProperty(CommonClientConfigKey.PoolKeepAliveTimeUnits, getDefaultPoolKeepAliveTimeUnits());
345         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneAffinity, getDefaultEnableZoneAffinity());
346         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneExclusivity, getDefaultEnableZoneExclusivity());
347         putDefaultStringProperty(CommonClientConfigKey.ClientClassName, getDefaultClientClassname());
348         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerClassName, getDefaultNfloadbalancerClassname());
349         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName, getDefaultNfloadbalancerRuleClassname());
350         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerPingClassName, getDefaultNfloadbalancerPingClassname());
351         putDefaultBooleanProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, getDefaultPrioritizeVipAddressBasedServers());
352         putDefaultFloatProperty(CommonClientConfigKey.MinPrimeConnectionsRatio, getDefaultMinPrimeConnectionsRatio());
353         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsClassName, getDefaultPrimeConnectionsClass());
354         putDefaultStringProperty(CommonClientConfigKey.NIWSServerListClassName, getDefaultSeverListClass());
355         putDefaultStringProperty(CommonClientConfigKey.VipAddressResolverClassName, getDefaultVipaddressResolverClassname());
356         putDefaultBooleanProperty(CommonClientConfigKey.IsClientAuthRequired, getDefaultIsClientAuthRequired());
357         // putDefaultStringProperty(CommonClientConfigKey.RequestIdHeaderName, getDefaultRequestIdHeaderName());
358         putDefaultBooleanProperty(CommonClientConfigKey.UseIPAddrForServer, getDefaultUseIpAddressForServer());
359         putDefaultStringProperty(CommonClientConfigKey.ListOfServers, "");
360     }
361 }

View Code

也可以在配置文件中定制配置,例如配置超时和重试:

 1 # 全局配置
 2 ribbon:
 3   # 客户端读取超时时间
 4   ReadTimeout: 3000
 5   # 客户端连接超时时间
 6   ConnectTimeout: 3000
 7   # 默认只重试 GET,设置为 true 时将重试所有类型,如 POST、PUT、DELETE
 8   OkToRetryOnAllOperations: false
 9   # 重试次数
10   MaxAutoRetries: 1
11   # 最多重试几个实例
12   MaxAutoRetriesNextServer: 1
13 
14 # 只针对 demo-producer 客户端
15 demo-producer:
16   ribbon:
17     # 客户端读取超时时间
18     ReadTimeout: 5000
19     # 客户端连接超时时间
20     ConnectTimeout: 3000

2、均衡策略 — IRule

IRule 是最终选择 Server 的策略规则类,核心的接口就是 choose。

 1 public interface IRule{
 2 
 3     // 选择 Server
 4     public Server choose(Object key);
 5 
 6     // 设置 ILoadBalancer
 7     public void setLoadBalancer(ILoadBalancer lb);
 8 
 9     // 获取 ILoadBalancer
10     public ILoadBalancer getLoadBalancer();
11 }

Ribbon 提供了丰富的负载均衡策略,我们也可以通过配置指定使用某个均衡策略。下面是整个Ribbon提供的 IRule 均衡策略。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

3、服务检查 — IPing

IPing 是用于定期检查 Server 的可用性的,它只提供了一个接口,用来判断 Server 是否存活:

1 public interface IPing { 2 3 public boolean isAlive(Server server); 4 }

IPing 也提供了多种策略可选,下面是整个 IPing 体系结构:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

4、获取服务列表 — ServerList

ServerList 提供了两个接口,一个是第一次获取 Server 列表,一个是更新 Server 列表,其中 getUpdatedListOfServers 会每被 Loadbalancer 隔 30 秒调一次来更新 allServerList。

 1 public interface ServerList<T extends Server> {  2  3 public List<T> getInitialListOfServers();  4  5 /**  6  * Return updated list of servers. This is called say every 30 secs  7  * (configurable) by the Loadbalancer's Ping cycle  8 */  9 public List<T> getUpdatedListOfServers(); 10 }

ServerList 也提供了多种实现,ServerList 体系结构如下:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

5、过滤服务 — ServerListFilter

ServerListFilter 提供了一个接口用来过滤出可用的 Server。

1 public interface ServerListFilter<T extends Server> {
2 
3     public List<T> getFilteredListOfServers(List<T> servers);
4 }

ServerListFilter 体系结构如下:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

6、服务列表更新 — ServerListUpdater

ServerListUpdater 有多个接口,最核心的就是 start 开启定时任务调用 updateAction 来更新 allServerList。

 1 public interface ServerListUpdater {  2  3 /**  4  * an interface for the updateAction that actually executes a server list update  5 */  6 public interface UpdateAction {  7 void doUpdate();  8  }  9 10 /** 11  * start the serverList updater with the given update action 12  * This call should be idempotent. 13 */ 14 void start(UpdateAction updateAction); 15 }

默认有两个实现类:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

7、负载均衡器 — ILoadBalancer

ILoadBalancer 是负载均衡选择服务的核心接口,主要提供了如下的获取Server列表和根据客户端名称选择Server的接口。

 1 public interface ILoadBalancer {  2  3 // 添加Server  4 public void addServers(List<Server> newServers);  5  6 // 根据key选择一个Server  7 public Server chooseServer(Object key);  8  9 // 获取存活的Server列表,返回 upServerList 10 public List<Server> getReachableServers(); 11 12 // 获取所有Server列表,返回 allServerList 13 public List<Server> getAllServers(); 14 }

ILoadBalancer 的体系结构如下:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

8、Ribbon 相关配置类

从前面一直看下来,可以发现有很多与 Ribbon 相关的配置类,这里总结下与 Ribbon 相关的配置类,看每个配置类的配置顺序,以及都主要配置了哪些东西。

① 首先是Eureka客户端配置类 EurekaClientAutoConfiguration,这个自动化配置类主要配置了 Ribbon 所需的 EurekaClient。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 @Configuration(proxyBeanMethods = false)
 2 @EnableConfigurationProperties
 3 @ConditionalOnClass(EurekaClientConfig.class)
 4 @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
 5 @ConditionalOnDiscoveryEnabled
 6 @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
 7         CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
 8 @AutoConfigureAfter(name = {
 9         "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
10         "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
11         "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
12         "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
13 public class EurekaClientAutoConfiguration {
14     // ....
15 }

View Code

② 接着是Ribbon自动化配置类 RibbonAutoConfiguration,这个类主要配置了如下类:

  • SpringClientFactory:管理 Ribbon 客户端上下文。
  • LoadBalancerClient:负载均衡客户端,默认实现类为 RibbonLoadBalancerClient(实际是在 RibbonClientConfiguration 中配置的)。
  • PropertiesFactory:用于判断配置文件中是否自定义了核心接口的实现类,如 NFLoadBalancerClassName、NFLoadBalancerPingClassName 等。
  • RibbonApplicationContextInitializer:开启饥饿配置的时候,用这个类来在启动时初始化 Ribbon 客户端上下文。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package org.springframework.cloud.netflix.ribbon;
 2 
 3 @Configuration
 4 @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
 5 @RibbonClients
 6 // 在 EurekaClientAutoConfiguration 之后配置
 7 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
 8 // 在 LoadBalancerAutoConfiguration、AsyncLoadBalancerAutoConfiguration 之前配置
 9 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class })
10 @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
11 public class RibbonAutoConfiguration {
12 
13     @Autowired(required = false)
14     private List<RibbonClientSpecification> configurations = new ArrayList<>();
15 
16     @Autowired
17     private RibbonEagerLoadProperties ribbonEagerLoadProperties;
18 
19     @Bean
20     public HasFeatures ribbonFeature() {
21         return HasFeatures.namedFeature("Ribbon", Ribbon.class);
22     }
23 
24     @Bean
25     @ConditionalOnMissingBean
26     public SpringClientFactory springClientFactory() {
27         SpringClientFactory factory = new SpringClientFactory();
28         factory.setConfigurations(this.configurations);
29         return factory;
30     }
31 
32     @Bean
33     @ConditionalOnMissingBean(LoadBalancerClient.class)
34     public LoadBalancerClient loadBalancerClient() {
35         return new RibbonLoadBalancerClient(springClientFactory());
36     }
37 
38     @Bean
39     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
40     @ConditionalOnMissingBean
41     public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
42         return new RibbonLoadBalancedRetryFactory(clientFactory);
43     }
44 
45     @Bean
46     @ConditionalOnMissingBean
47     public PropertiesFactory propertiesFactory() {
48         return new PropertiesFactory();
49     }
50 
51     @Bean
52     @ConditionalOnProperty("ribbon.eager-load.enabled")
53     public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
54         return new RibbonApplicationContextInitializer(springClientFactory(), ribbonEagerLoadProperties.getClients());
55     }
56 }

View Code

③ 接着是负载均衡器配置类 LoadBalancerAutoConfiguration,这个类主要是创建了负载均衡拦截器 LoadBalancerInterceptor,并添加到 RestTemplae 的拦截器中。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package org.springframework.cloud.client.loadbalancer;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 @ConditionalOnClass(RestTemplate.class)
 5 @ConditionalOnBean(LoadBalancerClient.class)
 6 @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
 7 public class LoadBalancerAutoConfiguration {
 8 
 9     @LoadBalanced
10     @Autowired(required = false)
11     private List<RestTemplate> restTemplates = Collections.emptyList();
12 
13     @Autowired(required = false)
14     private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
15 
16     // 对 RestTemplate 定制化
17     @Bean
18     public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
19             final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
20         return () -> restTemplateCustomizers.ifAvailable(customizers -> {
21             for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
22                 for (RestTemplateCustomizer customizer : customizers) {
23                     customizer.customize(restTemplate);
24                 }
25             }
26         });
27     }
28 
29     @Bean
30     @ConditionalOnMissingBean
31     public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
32         return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
33     }
34 
35     @Configuration(proxyBeanMethods = false)
36     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
37     static class LoadBalancerInterceptorConfig {
38 
39         // 创建 RestTemplate 拦截器
40         @Bean
41         public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,
42                                                          LoadBalancerRequestFactory requestFactory) {
43             return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
44         }
45 
46         @Bean
47         @ConditionalOnMissingBean
48         public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
49             return restTemplate -> {
50                 List<ClientHttpRequestInterceptor> list = new ArrayList<>(
51                         restTemplate.getInterceptors());
52                 list.add(loadBalancerInterceptor);
53                 restTemplate.setInterceptors(list);
54             };
55         }
56 
57     }
58 }

View Code

④ 之后是默认的 Ribbon 客户端配置类 RibbonClientConfiguration,这个类主要配置了 Ribbon 核心接口的默认实现。

  • IClientConfig:Ribbon 客户端配置类,默认实现是 DefaultClientConfigImpl。
  • IRule:负载均衡策略规则组件,默认实现是 ZoneAvoidanceRule。
  • IPing:判断 Server 是否存活,默认实现是 DummyPing,永远都是返回 true。
  • ServerList:获取 Server 的组件,默认实现类为 ConfigurationBasedServerList,从配置文件获取。
  • ServerListUpdater:Server 列表更新组件,默认实现类为 PollingServerListUpdater。
  • ServerListFilter:过滤可用的 Server 列表,默认实现类为 ZonePreferenceServerListFilter。
  • RibbonLoadBalancerContext:负载均衡客户端。
  • RetryHandler:重试处理器,默认实现类为 DefaultLoadBalancerRetryHandler。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

  1 package org.springframework.cloud.netflix.ribbon;
  2 
  3 @SuppressWarnings("deprecation")
  4 @Configuration(proxyBeanMethods = false)
  5 @EnableConfigurationProperties
  6 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
  7         RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
  8 public class RibbonClientConfiguration {
  9     public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
 10     public static final int DEFAULT_READ_TIMEOUT = 1000;
 11     public static final boolean DEFAULT_GZIP_PAYLOAD = true;
 12 
 13     @RibbonClientName
 14     private String name = "client";
 15 
 16     @Autowired
 17     private PropertiesFactory propertiesFactory;
 18 
 19     @Bean
 20     @ConditionalOnMissingBean
 21     public IClientConfig ribbonClientConfig() {
 22         DefaultClientConfigImpl config = new DefaultClientConfigImpl();
 23         config.loadProperties(this.name);
 24         config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
 25         config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
 26         config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
 27         return config;
 28     }
 29 
 30     @Bean
 31     @ConditionalOnMissingBean
 32     public IRule ribbonRule(IClientConfig config) {
 33         if (this.propertiesFactory.isSet(IRule.class, name)) {
 34             return this.propertiesFactory.get(IRule.class, config, name);
 35         }
 36         ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
 37         rule.initWithNiwsConfig(config);
 38         return rule;
 39     }
 40 
 41     @Bean
 42     @ConditionalOnMissingBean
 43     public IPing ribbonPing(IClientConfig config) {
 44         if (this.propertiesFactory.isSet(IPing.class, name)) {
 45             return this.propertiesFactory.get(IPing.class, config, name);
 46         }
 47         return new DummyPing();
 48     }
 49 
 50     @Bean
 51     @ConditionalOnMissingBean
 52     @SuppressWarnings("unchecked")
 53     public ServerList<Server> ribbonServerList(IClientConfig config) {
 54         if (this.propertiesFactory.isSet(ServerList.class, name)) {
 55             return this.propertiesFactory.get(ServerList.class, config, name);
 56         }
 57         ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
 58         serverList.initWithNiwsConfig(config);
 59         return serverList;
 60     }
 61 
 62     @Bean
 63     @ConditionalOnMissingBean
 64     public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
 65         return new PollingServerListUpdater(config);
 66     }
 67 
 68     @Bean
 69     @ConditionalOnMissingBean
 70     public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
 71             ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
 72             IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
 73         if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
 74             return this.propertiesFactory.get(ILoadBalancer.class, config, name);
 75         }
 76         return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
 77                 serverListFilter, serverListUpdater);
 78     }
 79 
 80     @Bean
 81     @ConditionalOnMissingBean
 82     @SuppressWarnings("unchecked")
 83     public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
 84         if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
 85             return this.propertiesFactory.get(ServerListFilter.class, config, name);
 86         }
 87         ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
 88         filter.initWithNiwsConfig(config);
 89         return filter;
 90     }
 91 
 92     @Bean
 93     @ConditionalOnMissingBean
 94     public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
 95             IClientConfig config, RetryHandler retryHandler) {
 96         return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
 97     }
 98 
 99     @Bean
100     @ConditionalOnMissingBean
101     public RetryHandler retryHandler(IClientConfig config) {
102         return new DefaultLoadBalancerRetryHandler(config);
103     }
104 
105     @Bean
106     @ConditionalOnMissingBean
107     public ServerIntrospector serverIntrospector() {
108         return new DefaultServerIntrospector();
109     }
110 }

View Code

⑤ Ribbon Eureka 自动化配置类 RibbonEurekaAutoConfiguration,判断是否启用 Ribbon Eureka,并触发 EurekaRibbonClientConfiguration 配置类。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package org.springframework.cloud.netflix.ribbon.eureka;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 @EnableConfigurationProperties
 5 @ConditionalOnRibbonAndEurekaEnabled
 6 @AutoConfigureAfter(RibbonAutoConfiguration.class)
 7 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
 8 public class RibbonEurekaAutoConfiguration {
 9 
10 }

View Code

⑥ 默认启用 Ribbon Eureka 的情况下,会使用 Ribbon Eureka 客户端配置类 EurekaRibbonClientConfiguration:

  • IPing:替换了默认实现类 DummyPing,改为 NIWSDiscoveryPing,通过判断 InstanceInfo 的状态是否为 UP 来判断 Server 是否存活。
  • ServerList:替换了默认的实现类 ConfigurationBasedServerList,改为 DomainExtractingServerList,实际是 DiscoveryEnabledNIWSServerList,从 EurekaClient 获取 Server 列表。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package org.springframework.cloud.netflix.ribbon.eureka;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 public class EurekaRibbonClientConfiguration {
 5 
 6     private static final Log log = LogFactory.getLog(EurekaRibbonClientConfiguration.class);
 7 
 8     @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
 9     private boolean approximateZoneFromHostname = false;
10     @RibbonClientName
11     private String serviceId = "client";
12     @Autowired(required = false)
13     private EurekaClientConfig clientConfig;
14     @Autowired(required = false)
15     private EurekaInstanceConfig eurekaConfig;
16     @Autowired
17     private PropertiesFactory propertiesFactory;
18 
19     public EurekaRibbonClientConfiguration() {
20     }
21 
22     public EurekaRibbonClientConfiguration(EurekaClientConfig clientConfig,
23             String serviceId, EurekaInstanceConfig eurekaConfig,
24             boolean approximateZoneFromHostname) {
25         this.clientConfig = clientConfig;
26         this.serviceId = serviceId;
27         this.eurekaConfig = eurekaConfig;
28         this.approximateZoneFromHostname = approximateZoneFromHostname;
29     }
30 
31     @Bean
32     @ConditionalOnMissingBean
33     public IPing ribbonPing(IClientConfig config) {
34         if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
35             return this.propertiesFactory.get(IPing.class, config, serviceId);
36         }
37         NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
38         ping.initWithNiwsConfig(config);
39         return ping;
40     }
41 
42     @Bean
43     @ConditionalOnMissingBean
44     public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
45         if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
46             return this.propertiesFactory.get(ServerList.class, config, serviceId);
47         }
48         DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
49                 config, eurekaClientProvider);
50         DomainExtractingServerList serverList = new DomainExtractingServerList(
51                 discoveryServerList, config, this.approximateZoneFromHostname);
52         return serverList;
53     }
54 }

View Code

⑦ 各个配置类所属模块

spring-cloud-netflix-eureka-client:

  • org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration

spring-cloud-netflix-ribbon:

  • org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration

spring-cloud-commons:

  • org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration

六、Ribbon HTTP客户端组件

1、Java HTTP 组件库

① HTTP 组件库

首先简单了解下常用的 Java HTTP 组件库,Ribbon 中通过不同的配置便可以启用某个 HTTP 组件来进行服务间的通信。

Java 中的 HTTP 组件库,大体可以分为三类:

  • JDK 自带的标准库 HttpURLConnection
  • Apache HttpComponents HttpClient
  • OkHttp

HttpURLConnection 发起 HTTP 请求最大的优点是不需要引入额外的依赖,但是 HttpURLConnection 封装层次太低,使用起来非常繁琐。支持的特性太少,缺乏连接池管理、域名机械控制,无法支持 HTTP/2等。

Apache HttpComponents HttpClient 和 OkHttp 都支持连接池管理、超时、空闲连接数控制等特性。OkHttp 接口设计更友好,且支持 HTTP/2,Android 开发中用得更多。

② 超时重试配置

先给 demo-consumer 中添加如下默认配置,即读取、连接超时时间设置为 1 秒,这也是默认值。然后重试次数为1,重试一个Server。后面基于这些配置来验证Ribbon HTTP客户端的超时和重试。

 1 ribbon:
 2   # 客户端读取超时时间
 3   ReadTimeout: 1000
 4   # 客户端连接超时时间
 5   ConnectTimeout: 1000
 6   # 默认只重试 GET,设置为 true 时将重试所有类型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 重试次数
 9   MaxAutoRetries: 1
10   # 最多重试几个实例
11   MaxAutoRetriesNextServer: 1

然后 demo-producer 的接口休眠3秒,造成网络延迟的现象,并且 demo-producer 启两个实例。

1 @GetMapping("/v1/uuid")
2 public ResponseEntity<String> getUUID() throws InterruptedException {
3     String uuid = UUID.randomUUID().toString();
4     logger.info("generate uuid: {}", uuid);
5     Thread.sleep(3000);
6     return ResponseEntity.ok(uuid);
7 }

2、Ribbon 默认使用 HttpURLConnection

① Ribbon 默认的 HTTP 组件

在不添加其它配置的情况下,我们来看下 Ribbon 默认使用的 HTTP 组件是什么。

首先通过之前的分析可以知道,默认情况下,LoadBalancerAutoConfiguration 配置类会向 RestTemplate 添加 LoadBalancerInterceptor 拦截器。然后在 RestTemplate 调用时,即在 doExecute 方法中,创建 ClientHttpRequest 时,因为配置了拦截器,所以 ClientHttpRequestFactory 就是 InterceptingClientHttpRequestFactory,而且创建 InterceptingClientHttpRequestFactory 传入的 ClientHttpRequestFactory 默认是父类的 SimpleClientHttpRequestFactory。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
 2         @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 3     //...
 4     ClientHttpResponse response = null;
 5     try {
 6         // 创建 ClientHttpRequest
 7         ClientHttpRequest request = createRequest(url, method);
 8         if (requestCallback != null) {
 9             requestCallback.doWithRequest(request);
10         }
11         // ClientHttpRequest 发起请求
12         response = request.execute();
13         handleResponse(url, method, response);
14         return (responseExtractor != null ? responseExtractor.extractData(response) : null);
15     }
16     catch (IOException ex) {
17         // ...
18     }
19     finally {
20         if (response != null) {
21             response.close();
22         }
23     }
24 }
25 
26 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
27     // getRequestFactory 获取 ClientHttpRequestFactory
28     ClientHttpRequest request = getRequestFactory().createRequest(url, method);
29     initialize(request);
30     if (logger.isDebugEnabled()) {
31         logger.debug("HTTP " + method.name() + " " + url);
32     }
33     return request;
34 }
35 
36 public ClientHttpRequestFactory getRequestFactory() {
37     List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
38     if (!CollectionUtils.isEmpty(interceptors)) {
39         ClientHttpRequestFactory factory = this.interceptingRequestFactory;
40         if (factory == null) {
41             // 有拦截器的情况,super.getRequestFactory() 默认返回的是 SimpleClientHttpRequestFactory
42             factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
43             this.interceptingRequestFactory = factory;
44         }
45         return factory;
46     }
47     else {
48         // 无拦截器的情况
49         return super.getRequestFactory();
50     }
51 }

View Code

InterceptingClientHttpRequestFactory 这个工厂类创建的 ClientHttpRequest 类型是 InterceptingClientHttpRequest。最终 RestTemplate 的 doExecute 方法中调用 ClientHttpRequest 的 execute 方法时,就调用到了 InterceptingClientHttpRequest 中的内部类 InterceptingRequestExecution 中。

在 InterceptingRequestExecution 的 execute 方法中,首先是遍历所有拦截器对 RestTemplate 定制化,最后则通过 requestFactory 创建 ClientHttpRequest 来发起最终的 HTTP 调用。从这里可以看出,无论有没有拦截器,其实最终都会使用 requestFactory 来创建 ClientHttpRequest。

 1 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
 2     private final Iterator<ClientHttpRequestInterceptor> iterator;
 3 
 4     @Override
 5     public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
 6         if (this.iterator.hasNext()) {
 7             // 拦截器定制化 RestTemplate
 8             ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
 9             return nextInterceptor.intercept(request, body, this);
10         }
11         else {
12             HttpMethod method = request.getMethod();
13             // delegate => SimpleBufferingClientHttpRequest
14             ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
15             //...
16             return delegate.execute();
17         }
18     }
19 }

这里的 requestFactory 就是前面传进来的 SimpleClientHttpRequestFactory,从它的 createRequest 方法可以看出,默认情况下,就是用的 JDK 标准 HTTP 库组件 HttpURLConnection 来进行服务间的请求通信。

 1 public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
 2     // JDK 标准HTTP库 HttpURLConnection
 3     HttpURLConnection connection = openConnection(uri.toURL(), this.proxy);
 4     prepareConnection(connection, httpMethod.name());
 5 
 6     if (this.bufferRequestBody) {
 7         return new SimpleBufferingClientHttpRequest(connection, this.outputStreaming);
 8     }
 9     else {
10         return new SimpleStreamingClientHttpRequest(connection, this.chunkSize, this.outputStreaming);
11     }
12 }

总结:

从前面的源码分析可以看出,Ribbon 默认的 HTTP 客户端是 HttpURLConnection。

在前面默认的超时配置下,可以验证出超时配置并未生效,一直阻塞3秒后才返回了结果,说明 Ribbon 默认情况下就不支持超时重试。

而且 HttpURLConnection 每次都是新创建的,请求返回来之后就关闭连接,没有连接池管理机制,网络连接的建立和关闭本身就会损耗一定的性能,所以正式环境下,最好不要使用默认的配置。

② HttpClient 配置类

另外,我们从 RibbonClientConfiguration 配置类的定义可以看到,其导入了 HttpClientConfiguration、OkHttpRibbonConfiguration、RestClientRibbonConfiguration、HttpClientRibbonConfiguration 四个 HttpClient 的配置类,通过注释也可以了解到,最后一个是默认配置类,前面三个在某些配置启用的情况下才会生效。

1 @Configuration(proxyBeanMethods = false)
2 @EnableConfigurationProperties
3 // Order is important here, last should be the default, first should be optional
4 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
5         RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
6 public class RibbonClientConfiguration {
7 
8 }

进入 HttpClientRibbonConfiguration,这个配置类在 ribbon.httpclient.enabled=true 时才生效,而且默认为 true。在从 SpringClientFactory 中获取 ILoadBalancer 时,会通过这个配置类初始化 HttpClient,按先后顺序会初始化 HttpClientConnectionManager、CloseableHttpClient、RibbonLoadBalancingHttpClient。CloseableHttpClient 是 Apache HttpComponents HttpClient 中的组件,也就是说默认情况下应该是使用 apache HttpComponents 作为 HTTP 组件库。

但经过前面源码的分析,以及测试发现,最终其实走的的 HttpURLConnection,并没有用到 CloseableHttpClient。把 ribbon.httpclient.enabled 设置为 false,也没有什么影响,还是默认走 HttpURLConnection。我们后面再来分析这个问题。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(name = "org.apache.http.client.HttpClient")
 3 // ribbon.httpclient.enabled more文为 true
 4 @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
 5 public class HttpClientRibbonConfiguration {
 6 
 7     @RibbonClientName
 8     private String name = "client";
 9 
10     // RibbonLoadBalancingHttpClient
11     @Bean
12     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
13     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
14     public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
15             IClientConfig config, ServerIntrospector serverIntrospector,
16             ILoadBalancer loadBalancer, RetryHandler retryHandler,
17             CloseableHttpClient httpClient) {
18         RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
19                 httpClient, config, serverIntrospector);
20         client.setLoadBalancer(loadBalancer);
21         client.setRetryHandler(retryHandler);
22         Monitors.registerObject("Client_" + this.name, client);
23         return client;
24     }
25 
26     // 在引入了 spring-retry 时,即可以重试的 RetryTemplate 时,就创建 RetryableRibbonLoadBalancingHttpClient
27     @Bean
28     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
29     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
30     public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
31             IClientConfig config, ServerIntrospector serverIntrospector,
32             ILoadBalancer loadBalancer, RetryHandler retryHandler,
33             LoadBalancedRetryFactory loadBalancedRetryFactory,
34             CloseableHttpClient httpClient,
35             RibbonLoadBalancerContext ribbonLoadBalancerContext) {
36         RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
37                 httpClient, config, serverIntrospector, loadBalancedRetryFactory);
38         client.setLoadBalancer(loadBalancer);
39         client.setRetryHandler(retryHandler);
40         client.setRibbonLoadBalancerContext(ribbonLoadBalancerContext);
41         Monitors.registerObject("Client_" + this.name, client);
42         return client;
43     }
44 
45     @Configuration(proxyBeanMethods = false)
46     protected static class ApacheHttpClientConfiguration {
47 
48         private final Timer connectionManagerTimer = new Timer(
49                 "RibbonApacheHttpClientConfiguration.connectionManagerTimer", true);
50 
51         private CloseableHttpClient httpClient;
52 
53         @Autowired(required = false)
54         private RegistryBuilder registryBuilder;
55 
56         // HttpClient 连接池管理器
57         @Bean
58         @ConditionalOnMissingBean(HttpClientConnectionManager.class)
59         public HttpClientConnectionManager httpClientConnectionManager(
60                 IClientConfig config,
61                 ApacheHttpClientConnectionManagerFactory connectionManagerFactory) {
62             RibbonProperties ribbon = RibbonProperties.from(config);
63             int maxTotalConnections = ribbon.maxTotalConnections();
64             int maxConnectionsPerHost = ribbon.maxConnectionsPerHost();
65             int timerRepeat = ribbon.connectionCleanerRepeatInterval();
66             long timeToLive = ribbon.poolKeepAliveTime();
67             TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits();
68             final HttpClientConnectionManager connectionManager = connectionManagerFactory
69                     .newConnectionManager(false, maxTotalConnections,
70                             maxConnectionsPerHost, timeToLive, ttlUnit, registryBuilder);
71             this.connectionManagerTimer.schedule(new TimerTask() {
72                 @Override
73                 public void run() {
74                     connectionManager.closeExpiredConnections();
75                 }
76             }, 30000, timerRepeat);
77             return connectionManager;
78         }
79 
80         // HttpClient => CloseableHttpClient
81         @Bean
82         @ConditionalOnMissingBean(CloseableHttpClient.class)
83         public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory,
84                 HttpClientConnectionManager connectionManager, IClientConfig config) {
85             RibbonProperties ribbon = RibbonProperties.from(config);
86             Boolean followRedirects = ribbon.isFollowRedirects();
87             Integer connectTimeout = ribbon.connectTimeout();
88             RequestConfig defaultRequestConfig = RequestConfig.custom()
89                     .setConnectTimeout(connectTimeout)
90                     .setRedirectsEnabled(followRedirects).build();
91             this.httpClient = httpClientFactory.createBuilder()
92                     .setDefaultRequestConfig(defaultRequestConfig)
93                     .setConnectionManager(connectionManager).build();
94             return httpClient;
95         }
96     }
97 }

View Code

③ 默认配置下的 RestTemplate 的调用过程大致可以用下图来表示。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

3、启用 RestClient

① 启用 RestClient

可以添加如下配置启用 RestClient:

 1 ribbon:
 2   # 关闭 httpclient
 3   httpclient:
 4     enabled: false
 5   # 启用 RestClient
 6   restclient:
 7     enabled: true
 8   # 启用 RestClient
 9   http:
10     client:
11       enabled: true

进入 RestClientRibbonConfiguration  可以看到,只要 ribbon.http.client.enabled、ribbon.restclient.enabled 其中一个配置了启用,就可以启用 RestClient。

 1 @SuppressWarnings("deprecation")
 2 @Configuration(proxyBeanMethods = false)
 3 // 启用条件 ConditionalOnRibbonRestClient
 4 @RibbonAutoConfiguration.ConditionalOnRibbonRestClient
 5 class RestClientRibbonConfiguration {
 6     @RibbonClientName
 7     private String name = "client";
 8 
 9     // RestClient 已过期
10     @Bean
11     @Lazy
12     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
13     public RestClient ribbonRestClient(IClientConfig config, ILoadBalancer loadBalancer,
14             ServerIntrospector serverIntrospector, RetryHandler retryHandler) {
15         RestClient client = new RibbonClientConfiguration.OverrideRestClient(config, serverIntrospector);
16         client.setLoadBalancer(loadBalancer);
17         client.setRetryHandler(retryHandler);
18         return client;
19     }
20 
21 }
22 
23 @Target({ ElementType.TYPE, ElementType.METHOD })
24 @Retention(RetentionPolicy.RUNTIME)
25 @Documented
26 @Conditional(OnRibbonRestClientCondition.class)
27 @interface ConditionalOnRibbonRestClient {
28 }
29 
30 private static class OnRibbonRestClientCondition extends AnyNestedCondition {
31     @Deprecated // remove in Edgware"
32     @ConditionalOnProperty("ribbon.http.client.enabled")
33     static class ZuulProperty {
34     }
35 
36     @ConditionalOnProperty("ribbon.restclient.enabled")
37     static class RibbonProperty {
38     }
39 }

RestClient 继承自 AbstractLoadBalancerAwareClient。需要注意的是,RestClient 已经过期,所以生产环境中我们就不要启用 RestTemplate 了。

1 @Deprecated
2 public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, HttpResponse> {
3 
4 }

② LoadBalancerContext 类体系结构

负载均衡上下文 LoadBalancerContext 体系的类结构如下。可以看出,Ribbon 是支持 Feign、OkHttp、HttpClient、RestClient 的。默认配置下使用的实现类是 RibbonLoadBalancerContext。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

③ RestTemplate 的 ClientHttpRequest 工厂类配置

接着看 RibbonAutoConfiguration 中有如下的配置,跟前面 RestClientRibbonConfiguration 也是一样,满足 @ConditionalOnRibbonRestClient 的条件。

可以看到,它会创建 RibbonClientHttpRequestFactory 并设置到 RestTemplate 中,也就是说,这时 RestTemplate 中的 requestFactory 就不是默认的 SimpleClientHttpRequestFactory 了,而是 RibbonClientHttpRequestFactory。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(HttpRequest.class)
 3 @ConditionalOnRibbonRestClient
 4 protected static class RibbonClientHttpRequestFactoryConfiguration {
 5     @Autowired
 6     private SpringClientFactory springClientFactory;
 7 
 8     @Bean
 9     public RestTemplateCustomizer restTemplateCustomizer(
10             final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
11         // RestTemplate 设置 requestFactory 为 RibbonClientHttpRequestFactory
12         return restTemplate -> restTemplate
13                 .setRequestFactory(ribbonClientHttpRequestFactory);
14     }
15 
16     // ClientHttpRequest 工厂类 => RibbonClientHttpRequestFactory
17     @Bean
18     public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
19         return new RibbonClientHttpRequestFactory(this.springClientFactory);
20     }
21 }

而且,由于这里配置了 RestTemplateCustomizer,原本默认配置下,在 LoadBalancerAutoConfiguration 中创建 RestTemplateCustomizer 的方法就不会生效了。

LoadBalancerAutoConfiguration 中的 RestTemplateCustomizer 是向 RestTemplate 中添加 LoadBalancerInterceptor 拦截器,所以在启用了 RestClient 的情况下,原本的 LoadBalancerInterceptor 就不会生效了。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
 3 static class LoadBalancerInterceptorConfig {
 4 
 5     @Bean
 6     public LoadBalancerInterceptor ribbonInterceptor(
 7             LoadBalancerClient loadBalancerClient,
 8             LoadBalancerRequestFactory requestFactory) {
 9         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
10     }
11 
12     @Bean
13     @ConditionalOnMissingBean
14     public RestTemplateCustomizer restTemplateCustomizer(
15             final LoadBalancerInterceptor loadBalancerInterceptor) {
16         return restTemplate -> {
17             List<ClientHttpRequestInterceptor> list = new ArrayList<>(
18                     restTemplate.getInterceptors());
19             list.add(loadBalancerInterceptor);
20             restTemplate.setInterceptors(list);
21         };
22     }
23 }

View Code

那么 RestTemplate 的 doExecute 方法中,在调用 createRequest 方法创建 ClientHttpRequest 时,就会用 RibbonClientHttpRequestFactory 来创建,进去可以看到 ClientHttpRequest 的实际类型就是 RibbonHttpRequest。

 1 public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
 2     String serviceId = originalUri.getHost();
 3     if (serviceId == null) {
 4         throw new IOException("Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
 5     }
 6     IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
 7     RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
 8     HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
 9 
10     return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
11 }

调用 RibbonHttpRequest 的 execute 方法,实际组中是调用了它的 executeInternal 方法,然后最后是使用 RestClient 来发起负载均衡的调用。

 1 protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
 2     try {
 3         // ...
 4         HttpRequest request = builder.build();
 5         // client => RestClient
 6         HttpResponse response = client.executeWithLoadBalancer(request, config);
 7         return new RibbonHttpResponse(response);
 8     }
 9     catch (Exception e) {
10         throw new IOException(e);
11     }
12 }

④ RestClient HTTP 调用

RestClient 的 executeWithLoadBalancer 实际是进入到父类 AbstractLoadBalancerAwareClient 的 executeWithLoadBalancer  方法中。

从这个方法可以知道,主要的负载均衡请求是在 LoadBalancerCommand 中的,LoadBalancerCommand 必定会通过负载均衡器 ILoadBalancer 得到一个 Server,然后通过 submit 的这个 ServerOperation 对原始URI进行重构,重构之后调用 RestClient 的 execute 发起HTTP请求。

 1 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
 2     // 负载均衡命令
 3     LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
 4 
 5     try {
 6         // 发起负载均衡请求
 7         return command.submit(
 8             new ServerOperation<T>() {
 9                 @Override
10                 public Observable<T> call(Server server) {
11                     // 重构 URI,将服务名用 Server 的 IP 和端口替换
12                     URI finalUri = reconstructURIWithServer(server, request.getUri());
13                     S requestForServer = (S) request.replaceUri(finalUri);
14                     try {
15                         // execute 发起调用,实际调用的是 RestClient 中的 execute
16                         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
17                     }
18                     catch (Exception e) {
19                         return Observable.error(e);
20                     }
21                 }
22             })
23             .toBlocking()
24             .single();
25     } catch (Exception e) {
26         //....
27     }
28 }

再看 RestClient 的 execute 方法,最终可以发现,RestClient 其实是使用基于 jersey 的 WebResource 来发起 HTTP 请求的。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 private HttpResponse execute(HttpRequest.Verb verb, URI uri,
 2         Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
 3         IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
 4     // ...
 5     // WebResource 是基于 jersey 封装的 HTTP 客户端组件
 6     WebResource xResource = restClient.resource(uri.toString());
 7     ClientResponse jerseyResponse;
 8     Builder b = xResource.getRequestBuilder();
 9     Object entity = requestEntity;
10 
11     switch (verb) {
12     case GET:
13         jerseyResponse = b.get(ClientResponse.class);
14         break;
15     case POST:
16         jerseyResponse = b.post(ClientResponse.class, entity);
17         break;
18     case PUT:
19         jerseyResponse = b.put(ClientResponse.class, entity);
20         break;
21     case DELETE:
22         jerseyResponse = b.delete(ClientResponse.class);
23         break;
24     case HEAD:
25         jerseyResponse = b.head();
26         break;
27     case OPTIONS:
28         jerseyResponse = b.options(ClientResponse.class);
29         break;
30     default:
31         throw new ClientException(
32                 ClientException.ErrorType.GENERAL,
33                 "You have to one of the REST verbs such as GET, POST etc.");
34     }
35 
36     thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
37     if (thisResponse.getStatus() == 503){
38         thisResponse.close();
39         throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
40     }
41     return thisResponse;
42 }

View Code

⑤ 最后,RestTemplate 基于 RestClient 的请求流程可以用下图来做个总结。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

4、Apache HttpClient 或 OkHttp 对 RestTemplate 不生效(BUG?)

① Apache HttpClient

默认情况下,ribbon.httpclient.enabled=true,在 HttpClientRibbonConfiguration 中会初始化 apache httpcomponents 相关的组件,前已经分析过了,但是在 RestTemplate 中并未使用相关的组件。

也就是说,默认情况下启用了 apache httpcomponents,但是 RestTemplate 最后是使用 HttpURLConnection 来发起 HTTP 请求的,而不是配置的 CloseableHttpClient。

② OkHttp

首先需要加入 OkHttp 的依赖:

1 <dependency>
2     <groupId>com.squareup.okhttp3</groupId>
3     <artifactId>okhttp</artifactId>
4 </dependency>

然后添加如下配置就可以启用 OkHttp:

1 ribbon: 2  httpclient: 3 enabled: false 4  # 启用 okhttp 5  okhttp: 6 enabled: true

配置 ribbon.okhttp.enabled=true 后,在 OkHttpRibbonConfiguration 中会初始化 OkHttp 相关的组件。

但是调试之后会发现,其实它还是走的默认的流程,就是最终用 HttpURLConnection 发起 HTTP 请求,跟 httpcomponents 是一样的效果。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnProperty("ribbon.okhttp.enabled")
 3 @ConditionalOnClass(name = "okhttp3.OkHttpClient")
 4 public class OkHttpRibbonConfiguration {
 5 
 6     @RibbonClientName
 7     private String name = "client";
 8 
 9     @Bean
10     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
11     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
12     public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient(
13             IClientConfig config, ServerIntrospector serverIntrospector,
14             ILoadBalancer loadBalancer, RetryHandler retryHandler,
15             LoadBalancedRetryFactory loadBalancedRetryFactory, OkHttpClient delegate,
16             RibbonLoadBalancerContext ribbonLoadBalancerContext) {
17         RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(
18                 delegate, config, serverIntrospector, loadBalancedRetryFactory);
19         client.setLoadBalancer(loadBalancer);
20         client.setRetryHandler(retryHandler);
21         client.setRibbonLoadBalancerContext(ribbonLoadBalancerContext);
22         Monitors.registerObject("Client_" + this.name, client);
23         return client;
24     }
25 
26     @Bean
27     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
28     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
29     public OkHttpLoadBalancingClient okHttpLoadBalancingClient(IClientConfig config,
30             ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer,
31             RetryHandler retryHandler, OkHttpClient delegate) {
32         OkHttpLoadBalancingClient client = new OkHttpLoadBalancingClient(delegate, config,
33                 serverIntrospector);
34         client.setLoadBalancer(loadBalancer);
35         client.setRetryHandler(retryHandler);
36         Monitors.registerObject("Client_" + this.name, client);
37         return client;
38     }
39 
40     @Configuration(proxyBeanMethods = false)
41     protected static class OkHttpClientConfiguration {
42 
43         private OkHttpClient httpClient;
44 
45         @Bean
46         @ConditionalOnMissingBean(ConnectionPool.class)
47         public ConnectionPool httpClientConnectionPool(IClientConfig config,
48                 OkHttpClientConnectionPoolFactory connectionPoolFactory) {
49             RibbonProperties ribbon = RibbonProperties.from(config);
50             int maxTotalConnections = ribbon.maxTotalConnections();
51             long timeToLive = ribbon.poolKeepAliveTime();
52             TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits();
53             return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
54         }
55 
56         @Bean
57         @ConditionalOnMissingBean(OkHttpClient.class)
58         public OkHttpClient client(OkHttpClientFactory httpClientFactory,
59                 ConnectionPool connectionPool, IClientConfig config) {
60             RibbonProperties ribbon = RibbonProperties.from(config);
61             this.httpClient = httpClientFactory.createBuilder(false)
62                     .connectTimeout(ribbon.connectTimeout(), TimeUnit.MILLISECONDS)
63                     .readTimeout(ribbon.readTimeout(), TimeUnit.MILLISECONDS)
64                     .followRedirects(ribbon.isFollowRedirects())
65                     .connectionPool(connectionPool).build();
66             return this.httpClient;
67         }
68 
69         @PreDestroy
70         public void destroy() {
71             if (httpClient != null) {
72                 httpClient.dispatcher().executorService().shutdown();
73                 httpClient.connectionPool().evictAll();
74             }
75         }
76 
77     }
78 
79 }

View Cod

③ 启用 HttpClient 或 OkHttp 不生效的原因

经过前面的分析,可以知道启用 apache httpcomponents 或者 OkHttp,对 RestTemplate 都没有起作用,最终还是用 HttpURLConnection 发起 HTTP 请求。那为什么为出现这种情况呢?我们可以看下 RestTemplate  的 setRequestFactory 方法。

通过 RestTemplate 的 setRequestFactory 方法的注释也可以了解到,默认的 requestFactory 是 SimpleClientHttpRequestFactory,它是基于 JDK 标准 HTTP 库的 HttpURLConnection。

默认的 HttpURLConnection 不支持 PATCH,如果想支持,需设置为 Apache HttpComponents 或 OkHttp 的 request factory。

 1 /**
 2  * Set the request factory that this accessor uses for obtaining client request handles.
 3  * <p>The default is a {@link SimpleClientHttpRequestFactory} based on the JDK's own
 4  * HTTP libraries ({@link java.net.HttpURLConnection}).
 5  * <p><b>Note that the standard JDK HTTP library does not support the HTTP PATCH method.
 6  * Configure the Apache HttpComponents or OkHttp request factory to enable PATCH.</b>
 7  * @see #createRequest(URI, HttpMethod)
 8  * @see SimpleClientHttpRequestFactory
 9  * @see org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory
10  * @see org.springframework.http.client.OkHttp3ClientHttpRequestFactory
11  */
12 public void setRequestFactory(ClientHttpRequestFactory requestFactory) {
13     Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");
14     this.requestFactory = requestFactory;
15 }

ClientHttpRequestFactory 体系类结构如下:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

那 RestClient 又是如何生效的呢?通过上一节的分析可以知道,在 RibbonAutoConfiguration 中有如下的配置,这个 RibbonClientHttpRequestFactoryConfiguration 通过自定义 RestTemplateCustomizer 向 RestTemplate 设置了 requestFactory 为 RibbonClientHttpRequestFactory。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(HttpRequest.class)
 3 @ConditionalOnRibbonRestClient
 4 protected static class RibbonClientHttpRequestFactoryConfiguration {
 5     @Autowired
 6     private SpringClientFactory springClientFactory;
 7 
 8     @Bean
 9     public RestTemplateCustomizer restTemplateCustomizer(final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
10         return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
11     }
12 
13     @Bean
14     public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
15         return new RibbonClientHttpRequestFactory(this.springClientFactory);
16     }
17 }

RibbonClientHttpRequestFactory 是对应 RestClient 的,也就是说要启用 OkHttp 或 HttpClient,还需自己创建对应的 ClientHttpRequestFactory,并设置给 RestTemplate。从上面的类结构可以看出,是提供了 HttpComponentsClientHttpRequestFactory 和 OkHttp3ClientHttpRequestFactory 工厂类了的。

这里其实也比较奇怪,既然启用了 apache httpcomponents 或者 OkHttp,却没有创建默认的 ClientHttpRequestFactory 实现类设置给 RestTemplate,感觉这是 spring-cloud-netflix-ribbon 的一个 BUG。

5、定制 RestTemplate 使用 Apache httpcomponents

如果想让 RestTemplate 使用 httpcomponents  的组件,就需要自己创建一个 ClientHttpRequestFactory,并设置给 RestTemplate。下面我们一步步来看看如何修复这个问题。

① 设置 HttpComponentsClientHttpRequestFactory

httpcomponents  中提供的 ClientHttpRequestFactory 实现类是 HttpComponentsClientHttpRequestFactory,但是并不能直接使用这个工厂类,因为它创建的 HttpComponentsClientHttpRequest 不具备重试的能力,它直接使用 CloseableHttpClient 执行请求,虽然有超时的功能,但不能重试。而且,它本质上也没有负载均衡的能力,需要借助 LoadBalancerInterceptor 拦截器来重构 URI。

 1 final class HttpComponentsClientHttpRequest extends AbstractBufferingClientHttpRequest {
 2     private final HttpClient httpClient;
 3     private final HttpUriRequest httpRequest;
 4     private final HttpContext httpContext;
 5 
 6     @Override
 7     protected ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
 8         // ...
 9         // httpClient => CloseableHttpClient
10         HttpResponse httpResponse = this.httpClient.execute(this.httpRequest, this.httpContext);
11         return new HttpComponentsClientHttpResponse(httpResponse);
12     }
13 }

所以,如果不需要重试的功能,可以直接创建一个 HttpComponentsClientHttpRequest,并设置给 RestTemplate 即可。这样就会使用 LoadBalancerInterceptor 来做负载均衡,重构 URI,然后用 HttpComponentsClientHttpRequest 来执行请求。

1 @Bean
2 @LoadBalanced
3 public RestTemplate restTemplate() {
4     HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
5     RestTemplate restTemplate = new RestTemplate();
6     restTemplate.setRequestFactory(requestFactory);
7     return restTemplate;
8 }

② 定制 apache ClientHttpRequestFactory

如果想让 RestTemplate 即有负载均衡的能力,又能使用 apache HttpComponents 组件,且具备重试的功能,我们就需要自己定制 ClientHttpRequestFactory 了。关于重试后面再单独来讲。

对比 RestClient 可以发现,RibbonClientHttpRequestFactory 创建的 RibbonHttpRequest 其实是使用 RestClient 执行请求,而 RestClient  内部使用 LoadBalancerCommand 来进行重试。

类似的,我们至少要用上已经配置好的 RibbonLoadBalancingHttpClient 来执行请求,所以需要自定义一个类似的 RibbonHttpRequest 。

1)定制 apache ClientHttpRequest

创建 ApacheClientHttpRequest 继承自 RibbonHttpRequest,核心的点在于要注入 RibbonLoadBalancingHttpClient,如果要支持重试,需注入 RetryableRibbonLoadBalancingHttpClient。RetryableRibbonLoadBalancingHttpClient 在引入 spring-retry 后才会创建,这个后面分析重试时再看。

然后在 executeInternal 根据 retryable 判断,如果要重试,就调用 execute 方法,看 RetryableRibbonLoadBalancingHttpClient 的源码可以发现,它本身是支持负载均衡的,会自动选择 Server。

如果不需要重试,就需要调用 executeWithLoadBalancer,它是利用 LoadBalancerCommand 来提交请求,就跟 RestClient 是一样的了。但是不一样的地方是 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 是不会进行重试的,这个也放到后面分析。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.IOException;
 6 import java.net.URI;
 7 import java.util.ArrayList;
 8 
 9 import com.netflix.client.config.IClientConfig;
10 import com.netflix.client.http.HttpResponse;
11 import org.springframework.cloud.netflix.ribbon.RibbonHttpRequest;
12 import org.springframework.cloud.netflix.ribbon.RibbonHttpResponse;
13 import org.springframework.cloud.netflix.ribbon.apache.RetryableRibbonLoadBalancingHttpClient;
14 import org.springframework.cloud.netflix.ribbon.apache.RibbonApacheHttpRequest;
15 import org.springframework.cloud.netflix.ribbon.apache.RibbonLoadBalancingHttpClient;
16 import org.springframework.cloud.netflix.ribbon.support.RibbonCommandContext;
17 import org.springframework.http.HttpHeaders;
18 import org.springframework.http.HttpMethod;
19 import org.springframework.http.client.ClientHttpResponse;
20 import org.springframework.util.LinkedMultiValueMap;
21 
22 /**
23  * Apache ClientHttpRequest
24  *
25  * @author bojiangzhou
26  */
27 public class ApacheClientHttpRequest extends RibbonHttpRequest {
28 
29     private final URI uri;
30 
31     private final HttpMethod httpMethod;
32 
33     private final String serviceId;
34 
35     private final RibbonLoadBalancingHttpClient client;
36 
37     private final IClientConfig config;
38     /**
39      * 是否重试
40      */
41     private final boolean retryable;
42 
43     public ApacheClientHttpRequest(URI uri,
44                                    HttpMethod httpMethod,
45                                    String serviceId,
46                                    RibbonLoadBalancingHttpClient client,
47                                    IClientConfig config,
48                                    boolean retryable) {
49         super(uri, null, null, config);
50         this.uri = uri;
51         this.httpMethod = httpMethod;
52         this.serviceId = serviceId;
53         this.client = client;
54         this.config = config;
55         this.retryable = retryable;
56         if (retryable && !(client instanceof RetryableRibbonLoadBalancingHttpClient)) {
57             throw new IllegalArgumentException("Retryable client must be RetryableRibbonLoadBalancingHttpClient");
58         }
59     }
60 
61     @Override
62     protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
63         try {
64             RibbonApacheHttpRequest request = new RibbonApacheHttpRequest(buildCommandContext(headers));
65 
66             HttpResponse response;
67             if (retryable) {
68                 // RetryableRibbonLoadBalancingHttpClient 使用 RetryTemplate 做负载均衡和重试
69                 response = client.execute(request, config);
70             } else {
71                 // RibbonLoadBalancingHttpClient 需调用 executeWithLoadBalancer 才具备负载均衡的能力
72                 response = client.executeWithLoadBalancer(request, config);
73             }
74 
75             return new RibbonHttpResponse(response);
76         } catch (Exception e) {
77             throw new IOException(e);
78         }
79     }
80 
81     protected RibbonCommandContext buildCommandContext(HttpHeaders headers) throws IOException {
82         ByteArrayInputStream requestEntity = null;
83         ByteArrayOutputStream bufferedOutput = (ByteArrayOutputStream) this.getBodyInternal(headers);
84         if (bufferedOutput != null) {
85             requestEntity = new ByteArrayInputStream(bufferedOutput.toByteArray());
86             bufferedOutput.close();
87         }
88 
89         return new RibbonCommandContext(serviceId, httpMethod.name(), uri.toString(), retryable,
90                 headers, new LinkedMultiValueMap<>(), requestEntity, new ArrayList<>());
91     }
92 }

View Code

2)定制 apache ClientHttpRequestFactory

创建 ApacheClientHttpRequestFactory 继承自 HttpComponentsClientHttpRequestFactory,主要是在 createRequest 方法中创建自定义的 ApacheClientHttpRequest。RibbonLoadBalancingHttpClient 可以从 SpringClientFactory 中获取。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.io.IOException;
 4 import java.net.URI;
 5 
 6 import com.netflix.client.config.IClientConfig;
 7 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
 8 import org.springframework.cloud.netflix.ribbon.apache.RibbonLoadBalancingHttpClient;
 9 import org.springframework.http.HttpMethod;
10 import org.springframework.http.client.ClientHttpRequest;
11 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
12 import org.springframework.lang.NonNull;
13 
14 /**
15  * Apache HttpComponents ClientHttpRequest factory
16  *
17  * @author bojiangzhou
18  */
19 public class ApacheClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory {
20 
21     private final SpringClientFactory clientFactory;
22     private final boolean retryable;
23 
24     public ApacheClientHttpRequestFactory(SpringClientFactory clientFactory, boolean retryable) {
25         this.clientFactory = clientFactory;
26         this.retryable = retryable;
27     }
28 
29     @Override
30     @NonNull
31     public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
32         String serviceId = originalUri.getHost();
33         if (serviceId == null) {
34             throw new IOException(
35                     "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
36         }
37         IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
38         RibbonLoadBalancingHttpClient httpClient = this.clientFactory.getClient(serviceId, RibbonLoadBalancingHttpClient.class);
39 
40         return new ApacheClientHttpRequest(originalUri, httpMethod, serviceId, httpClient, clientConfig, retryable);
41     }
42 }

View Code

3)定制 apache ClientHttpRequestFactory 配置类

跟 RestClient 的配置类类似,定制 ApacheClientHttpRequestFactory 的配置类,同样的,默认启用 httpclient。在存在 RetryTemplate 时,就设置 ApacheClientHttpRequestFactory 的 retryable 参数为 true,否则为 false。

然后自定义 RestTemplateCustomizer,将 ApacheClientHttpRequestFactory 设置到 RestTemplate 中,注意这时 LoadBalancerInterceptor 就不会添加到 RestTemplate 中了。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.http.client.HttpClient;
 7 import org.apache.http.protocol.HTTP;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
10 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
12 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
13 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration;
16 import org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration;
17 import org.springframework.cloud.client.loadbalancer.RestTemplateCustomizer;
18 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.http.client.ClientHttpRequestInterceptor;
22 import org.springframework.web.client.RestTemplate;
23 
24 /**
25  *
26  * @author bojiangzhou
27  */
28 @Configuration
29 @ConditionalOnClass(RestTemplate.class)
30 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
31 @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
32 @ConditionalOnProperty(name = "ribbon.httpclient.restTemplate.enabled", matchIfMissing = true)
33 public class ApacheClientHttpRequestFactoryConfiguration {
34 
35     @Configuration(proxyBeanMethods = false)
36     @ConditionalOnClass(HttpClient.class)
37     @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
38     static class ClientHttpRequestFactoryConfiguration {
39 
40         @Autowired
41         private SpringClientFactory springClientFactory;
42 
43         @Bean
44         @ConditionalOnMissingBean
45         public RestTemplateCustomizer restTemplateCustomizer(
46                 final ApacheClientHttpRequestFactory apacheClientHttpRequestFactory) {
47             return restTemplate -> {
48                 // 设置 RequestFactory
49                 restTemplate.setRequestFactory(apacheClientHttpRequestFactory);
50 
51                 // 添加移除 Content-Length 的拦截器,否则会报错
52                 ClientHttpRequestInterceptor removeHeaderLenInterceptor = (request, bytes, execution) -> {
53                     request.getHeaders().remove(HTTP.CONTENT_LEN);
54                     return execution.execute(request, bytes);
55                 };
56 
57                 List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>(restTemplate.getInterceptors());
58                 // 添加移除Content-Length请求头的Interceptor
59                 interceptors.add(removeHeaderLenInterceptor);
60                 restTemplate.setInterceptors(interceptors);
61             };
62         }
63 
64         @Bean
65         @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
66         public ApacheClientHttpRequestFactory apacheClientHttpRequestFactory() {
67             return new ApacheClientHttpRequestFactory(springClientFactory, false);
68         }
69 
70         @Bean
71         @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
72         public ApacheClientHttpRequestFactory retryableApacheClientHttpRequestFactory() {
73             return new ApacheClientHttpRequestFactory(springClientFactory, true);
74         }
75 
76 
77     }
78 }

View Code

4)简单调试下

配置好之后,把 demo-consumer 服务启动起来,简单测试下。

a) 首先请求会进入到 RestTemplate 的 doExecute 中,然后通过 createRequest,调用 ApacheClientHttpRequestFactory 创建 ApacheClientHttpRequest。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

b) 接着调用 ApacheClientHttpRequest 的 execute 方法,在 ApacheClientHttpRequest  的 executeInternal 中,就会调用 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 方法。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

c) 最后,进入 RibbonLoadBalancingHttpClient 的 execute 方法中,它又将请求转给了代理对象 delegate 来执行,delegate 就是在 HttpClientRibbonConfiguration 中配置的 CloseableHttpClient 对象,实际类型是 InternalHttpClient。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

经过验证,通过自定义的配置,最终使得 RestTemplate 可以使用 apache httpcomponents 组件来执行 HTTP 请求。重试那块后面再来研究。

③ 还是用一张图来总结下 RestTemplate 基于 apache HttpClient 后的执行流程

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

6、定制 RestTemplate 使用 OkHttp

① 设置 OkHttp3ClientHttpRequestFactory

类似的,可以给 RestTemplate 直接设置 OkHttp3ClientHttpRequestFactory,但它同样也不具备重试的能力。

1 @Bean
2 @LoadBalanced
3 public RestTemplate restTemplate() {
4     OkHttp3ClientHttpRequestFactory requestFactory = new OkHttp3ClientHttpRequestFactory();
5     RestTemplate restTemplate = new RestTemplate();
6     restTemplate.setRequestFactory(requestFactory);
7     return restTemplate;
8 }

② 定制 OkHttp ClientHttpRequestFactory

与定制 apache httpcomponents  类似,我这里就直接把三个类的代码放出来了。主要的差异就在于使用的 AbstractLoadBalancingClient 不同,apache 是 RibbonLoadBalancingHttpClient,okhttp 是 OkHttpLoadBalancingClient。

a) OkHttpClientHttpRequest:

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.IOException;
 6 import java.net.URI;
 7 import java.util.ArrayList;
 8 
 9 import com.netflix.client.config.IClientConfig;
10 import com.netflix.client.http.HttpResponse;
11 import org.springframework.cloud.netflix.ribbon.RibbonHttpRequest;
12 import org.springframework.cloud.netflix.ribbon.RibbonHttpResponse;
13 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpLoadBalancingClient;
14 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpRibbonRequest;
15 import org.springframework.cloud.netflix.ribbon.okhttp.RetryableOkHttpLoadBalancingClient;
16 import org.springframework.cloud.netflix.ribbon.support.RibbonCommandContext;
17 import org.springframework.http.HttpHeaders;
18 import org.springframework.http.HttpMethod;
19 import org.springframework.http.client.ClientHttpResponse;
20 import org.springframework.util.LinkedMultiValueMap;
21 
22 /**
23  * OkHttp ClientHttpRequest
24  *
25  * @author bojiangzhou
26  */
27 public class OkHttpClientHttpRequest extends RibbonHttpRequest {
28 
29     private final URI uri;
30 
31     private final HttpMethod httpMethod;
32 
33     private final String serviceId;
34 
35     private final OkHttpLoadBalancingClient client;
36 
37     private final IClientConfig config;
38     /**
39      * 是否重试
40      */
41     private final boolean retryable;
42 
43     public OkHttpClientHttpRequest(URI uri,
44                                    HttpMethod httpMethod,
45                                    String serviceId,
46                                    OkHttpLoadBalancingClient client,
47                                    IClientConfig config,
48                                    boolean retryable) {
49         super(uri, null, null, config);
50         this.uri = uri;
51         this.httpMethod = httpMethod;
52         this.serviceId = serviceId;
53         this.client = client;
54         this.config = config;
55         this.retryable = retryable;
56         if (retryable && !(client instanceof RetryableOkHttpLoadBalancingClient)) {
57             throw new IllegalArgumentException("Retryable client must be RetryableOkHttpLoadBalancingClient");
58         }
59     }
60 
61     @Override
62     protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
63         try {
64             OkHttpRibbonRequest request = new OkHttpRibbonRequest(buildCommandContext(headers));
65 
66             HttpResponse response;
67             if (retryable) {
68                 // RetryableRibbonLoadBalancingHttpClient 本身具备负载均衡的能力
69                 response = client.execute(request, config);
70             } else {
71                 // RibbonLoadBalancingHttpClient 需调用 executeWithLoadBalancer 才具备负载均衡的能力
72                 response = client.executeWithLoadBalancer(request, config);
73             }
74 
75             return new RibbonHttpResponse(response);
76         } catch (Exception e) {
77             throw new IOException(e);
78         }
79     }
80 
81     protected RibbonCommandContext buildCommandContext(HttpHeaders headers) throws IOException {
82         ByteArrayInputStream requestEntity = null;
83         ByteArrayOutputStream bufferedOutput = (ByteArrayOutputStream) this.getBodyInternal(headers);
84         if (bufferedOutput != null) {
85             requestEntity = new ByteArrayInputStream(bufferedOutput.toByteArray());
86             bufferedOutput.close();
87         }
88 
89         return new RibbonCommandContext(serviceId, httpMethod.name(), uri.toString(), retryable,
90                 headers, new LinkedMultiValueMap<>(), requestEntity, new ArrayList<>());
91     }
92 }

View Code

b) OkHttpClientHttpRequestFactory

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import java.net.URI;
 4 
 5 import com.netflix.client.config.IClientConfig;
 6 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
 7 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpLoadBalancingClient;
 8 import org.springframework.http.HttpMethod;
 9 import org.springframework.http.client.ClientHttpRequest;
10 import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
11 import org.springframework.lang.NonNull;
12 
13 /**
14  * OkHttp ClientHttpRequest factory
15  *
16  * @author bojiangzhou
17  */
18 public class OkHttpClientHttpRequestFactory extends OkHttp3ClientHttpRequestFactory {
19 
20     private final SpringClientFactory clientFactory;
21     private final boolean retryable;
22 
23     public OkHttpClientHttpRequestFactory(SpringClientFactory clientFactory, boolean retryable) {
24         this.clientFactory = clientFactory;
25         this.retryable = retryable;
26     }
27 
28     @Override
29     @NonNull
30     public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) {
31         String serviceId = originalUri.getHost();
32         if (serviceId == null) {
33             throw new IllegalStateException(
34                     "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
35         }
36         IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
37         OkHttpLoadBalancingClient httpClient = this.clientFactory.getClient(serviceId, OkHttpLoadBalancingClient.class);
38 
39         return new OkHttpClientHttpRequest(originalUri, httpMethod, serviceId, httpClient, clientConfig, retryable);
40     }
41 }

View Code

c) OkHttpClientHttpRequestFactoryConfiguration

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 5 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
 6 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 7 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 8 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10 import org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration;
11 import org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration;
12 import org.springframework.cloud.client.loadbalancer.RestTemplateCustomizer;
13 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
14 import org.springframework.context.annotation.Bean;
15 import org.springframework.context.annotation.Configuration;
16 import org.springframework.web.client.RestTemplate;
17 
18 /**
19  *
20  * @author bojiangzhou
21  */
22 @Configuration
23 @ConditionalOnClass(RestTemplate.class)
24 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
25 @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
26 @ConditionalOnProperty(name = "ribbon.okhttp.restTemplate.enabled", matchIfMissing = true)
27 public class OkHttpClientHttpRequestFactoryConfiguration {
28 
29     @Configuration(proxyBeanMethods = false)
30     @ConditionalOnProperty("ribbon.okhttp.enabled")
31     @ConditionalOnClass(name = "okhttp3.OkHttpClient")
32     static class ClientHttpRequestFactoryConfiguration {
33 
34         @Autowired
35         private SpringClientFactory springClientFactory;
36 
37         @Bean
38         @ConditionalOnMissingBean
39         public RestTemplateCustomizer restTemplateCustomizer(
40                 final OkHttpClientHttpRequestFactory okHttpClientHttpRequestFactory) {
41             return restTemplate -> restTemplate
42                     .setRequestFactory(okHttpClientHttpRequestFactory);
43         }
44 
45         @Bean
46         @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
47         public OkHttpClientHttpRequestFactory okHttpClientHttpRequestFactory() {
48             return new OkHttpClientHttpRequestFactory(springClientFactory, false);
49         }
50 
51         @Bean
52         @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
53         public OkHttpClientHttpRequestFactory retryableOkHttpClientHttpRequestFactory() {
54             return new OkHttpClientHttpRequestFactory(springClientFactory, true);
55         }
56     }
57 }

View Code

七、RetryTemplate 和 Ribbon 重试

1、AbstractLoadBalancerAwareClient

① AbstractLoadBalancerAwareClient

通过上一节的分析,可以知道有重试功能的其实有两个组件,一个是 Ribbon 的 LoadBalancerCommand,一个是 spring-retry 的 RetryTemplate。RetryableRibbonLoadBalancingHttpClient 和 RetryableOkHttpLoadBalancingClient 都要依赖 RetryTemplate,所以必须先引入 spring-retry 依赖,它们最终都是使用 RetryTemplate 实现请求重试的能力的。除了 RetryTemplate,其它客户端想要获取重试的功能,就要用 ribbon 中的 AbstractLoadBalancerAwareClient 相关的组件,并调用 executeWithLoadBalancer 方法。

再看下 AbstractLoadBalancerAwareClient 的体系,通过源码可以了解到:

  • RetryableFeignLoadBalancer、RetryableRibbonLoadBalancingHttpClient、RetryableOkHttpLoadBalancingClient 都是使用 RetryTemplate 实现重试功能的,也就是 spring-retry 的重试。
  • RestClient、FeignLoadBalancer、RibbonLoadBalancingHttpClient、OkHttpLoadBalancingClient 是在 AbstractLoadBalancerAwareClient 中使用 LoadBalancerCommand 实现重试功能的,就是是 Ribbon 的重试。

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

② executeWithLoadBalancer

具体的 AbstractLoadBalancerAwareClient 客户端想要负载均衡调用以及能进行重试,需调用 AbstractLoadBalancerAwareClient 的 executeWithLoadBalancer 方法。

在这个方法里面,它先构建了 LoadBalancerCommand,然后用 command 提交了一个 ServerOperation,这个 ServerOperation 中对 URI 进行了 重构,转到具体的 LoadBalancerContext 去执行请求。

 1 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
 2     // 负载均衡命令
 3     LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
 4 
 5     try {
 6         return command.submit(
 7             new ServerOperation<T>() {
 8                 @Override
 9                 public Observable<T> call(Server server) {
10                     // 重构URI
11                     URI finalUri = reconstructURIWithServer(server, request.getUri());
12                     S requestForServer = (S) request.replaceUri(finalUri);
13                     try {
14                         // 使用具体的 AbstractLoadBalancerAwareClient 客户端执行请求
15                         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
16                     }
17                     catch (Exception e) {
18                         return Observable.error(e);
19                     }
20                 }
21             })
22             .toBlocking()
23             .single();
24     }
25 }

再看 buildLoadBalancerCommand 方法,它首先会通过 getRequestSpecificRetryHandler 方法获取请求重试处理器 RequestSpecificRetryHandler,而 getRequestSpecificRetryHandler 是一个抽象方法。这里就要重点注意了。

 1 // 抽象方法,获取请求重试处理器
 2 public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);
 3 
 4 protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
 5     // 获取请求重试处理器
 6     RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
 7     LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
 8             .withLoadBalancerContext(this)
 9             .withRetryHandler(handler)
10             .withLoadBalancerURI(request.getUri());
11     customizeLoadBalancerCommandBuilder(request, config, builder);
12     return builder.build();
13 }

2、请求重试处理器 RequestSpecificRetryHandler

① 先了解下 RequestSpecificRetryHandler:

  • 首先看它的构造方法,注意第一个参数和第二个参数,因为不同的 getRequestSpecificRetryHandler 方法实现,主要差异就在于这两个参数。
  • 然后看 isRetriableException,这个方法就是 LoadBalancerCommand 用来判断异常后是否需要重试的方法,可以了解到 okToRetryOnAllErrors=true 时就可以重试,否则 okToRetryOnConnectErrors=true 才可能重试。需要注意的是就算这个方法返回 true 也不一定会重试,这跟重试次数也是有一定关系的。
 1 public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
 2     Preconditions.checkNotNull(baseRetryHandler);
 3     this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
 4     this.okToRetryOnAllErrors = okToRetryOnAllErrors;
 5     this.fallback = baseRetryHandler;
 6     if (requestConfig != null) {
 7         // 在同一个Server上重试的次数
 8         if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
 9             retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
10         }
11         // 重试下一个Server的次数
12         if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
13             retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
14         }
15     }
16 }
17 
18 @Override
19 public boolean isRetriableException(Throwable e, boolean sameServer) {
20     // 所有错误都重试
21     if (okToRetryOnAllErrors) {
22         return true;
23     }
24     // ClientException 才可能重试
25     else if (e instanceof ClientException) {
26         ClientException ce = (ClientException) e;
27         if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
28             return !sameServer;
29         } else {
30             return false;
31         }
32     }
33     else  {
34         // 连接错误才重试,就是抛出 SocketException 异常时才重试
35         return okToRetryOnConnectErrors && isConnectionException(e);
36     }
37 }

② 不同 AbstractLoadBalancerAwareClient 的 getRequestSpecificRetryHandler 实现

a)RestClient

默认配置下,RestClient 的 getRequestSpecificRetryHandler 会走到最后一步,okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 true,也就是说 isRetriableException 始终返回 true,也就是说抛出异常都会重试。非GET请求时,okToRetryOnAllErrors为 false,只有连接异常时才会重试。

 1 @Override
 2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
 3         HttpRequest request, IClientConfig requestConfig) {
 4     if (!request.isRetriable()) {
 5         return new RequestSpecificRetryHandler(false, false, this.getRetryHandler(), requestConfig);
 6     }
 7     if (this.ncc.get(CommonClientConfigKey.OkToRetryOnAllOperations, false)) {
 8         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
 9     }
10     if (request.getVerb() != HttpRequest.Verb.GET) {
11         return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig);
12     } else {
13         // okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 true
14         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
15     }
16 }

b)AbstractLoadBalancingClient

AbstractLoadBalancingClient 中的 getRequestSpecificRetryHandler 相当于一个默认实现,默认情况下 okToRetryOnAllOperations 为 false,最后也会到最后一步,即 okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 true,isRetriableException 始终返回 true。非GET请求时,okToRetryOnAllErrors为 false,只有连接异常时才会重试。

 1 @Override
 2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(final S request, final IClientConfig requestConfig) {
 3     // okToRetryOnAllOperations:是否所有操作都重试,默认 false
 4     if (this.okToRetryOnAllOperations) {
 5         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
 6     }
 7     if (!request.getContext().getMethod().equals("GET")) {
 8         return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig);
 9     }
10     else {
11         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
12     }
13 }

c)RibbonLoadBalancingHttpClient

RibbonLoadBalancingHttpClient 也重载了 getRequestSpecificRetryHandler,但是它设置了 okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 false,isRetriableException 始终返回 false。

至此我们应该就知道为什么调用 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 不具备重试的功能的原因了。所以启用 apache httpclient 时,RibbonLoadBalancingHttpClient 调用是不支持重试的。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, requestConfig);
5 }

RetryableRibbonLoadBalancingHttpClient 中也重写了 getRequestSpecificRetryHandler,同样也是设置 okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 false。但是在引入 spring-retry 后,它会使用 RetryTemplate 实现重试的功能。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
5 }

d)OkHttpLoadBalancingClient

OkHttpLoadBalancingClient 并没有重写 getRequestSpecificRetryHandler,所以它是使用父类 AbstractLoadBalancingClient 中的方法,也就是 okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 true。

所以,启用 okhttp 时,OkHttpLoadBalancingClient 是支持所有GET重试的,非GET请求则在抛出连接异常(SocketException)时支持重试。

而 RetryableOkHttpLoadBalancingClient 跟 RetryableRibbonLoadBalancingHttpClient 一样的重写方式,使用 RetryTemplate 实现重试。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都为 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
5 }

3、LoadBalancerCommand

看 LoadBalancerCommand 的 submit 方法,这个方法是重试的核心代码。

  • 首先获取了同一个Server重试次数 maxRetrysSame 和 重试下一个Server的次数 maxRetrysNext,其实就是前面配置的 ribbon.MaxAutoRetries 和 ribbon.MaxAutoRetriesNextServer,我设置的是 1。
  • 然后创建了一个 Observable,它的第一层会先通过 loadBalancerContext 获取 Server。在重试下一个 Server 时,这里就会获取下一个 Server。
  • 在第二层,又创建了一个 Observable,这个 Observable 就是调用 ServerOperation 的,就是重构 URI,调用具体的 AbstractLoadBalancerAwareClient 执行请求。
  • 在第二层里,会根据 maxRetrysSame 重试同一个 Server,从 retryPolicy 中可以了解到,当重试次数大于 maxRetrysSame 后,同一个 Server 重试就结束了,否则就用 retryHandler.isRetriableException 判断是否重试,这个前面已经分析过了。
  • 在外层,则根据 maxRetrysNext 重试不同的 Server,从 retryPolicy 中可以了解到,当不同Server重试次数大于 maxRetrysNext 后,就重试结束了,整个重试也就结束了,如果还是失败,就会进入 onErrorResumeNext 进行最后的失败处理。

最后来总结一下 LoadBalancerCommand 重试:

  • 重试分为同一个 Server 重试和重试下一个Server,当重试次数大于设置的重试值时,就停止重试。否则通过 retryHandler.isRetriableException 判断是否重试。
  • 那这里一共请求了多少次呢?可以总结出如下公式:请求次数 = (maxRetrysSame + 1) * (maxRetrysNext + 1),所以按 ribbon.MaxAutoRetries = 1、ribbon.MaxAutoRetriesNextServer = 1 的配置,如果每次请求都超时,就会发起 4 次请求。
 1 public Observable<T> submit(final ServerOperation<T> operation) {
 2     final ExecutionInfoContext context = new ExecutionInfoContext();
 3 
 4     // 同一个Server重试次数
 5     final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
 6     // 重试下一个Server的次数
 7     final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
 8 
 9     // 创建一个 Observable
10     Observable<T> o =
11             // 使用 loadBalancerContext 获取 Server
12             (server == null ? selectServer() : Observable.just(server))
13             .concatMap(new Func1<Server, Observable<T>>() {
14                 @Override
15                 public Observable<T> call(Server server) {
16                     // 设置Server
17                     context.setServer(server);
18                     final ServerStats stats = loadBalancerContext.getServerStats(server);
19 
20                     // 创建 Observable
21                     Observable<T> o = Observable
22                             .just(server)
23                             .concatMap(new Func1<Server, Observable<T>>() {
24                                 @Override
25                                 public Observable<T> call(final Server server) {
26                                     // 增加尝试次数
27                                     context.incAttemptCount();
28                                     // ...
29                                     // 调用 ServerOperation
30                                     return operation.call(server).doOnEach(new Observer<T>() {
31                                         // 一些回调方法
32                                     });
33                                 }
34                             });
35                     // 重试同一个Server
36                     if (maxRetrysSame > 0)
37                         o = o.retry(retryPolicy(maxRetrysSame, true));
38                     return o;
39                 }
40             });
41 
42     if (maxRetrysNext > 0 && server == null)
43         // 重试不同Server
44         o = o.retry(retryPolicy(maxRetrysNext, false));
45 
46     return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
47         @Override
48         public Observable<T> call(Throwable e) {
49             // 异常处理
50             return Observable.error(e);
51         }
52     });
53 }
54 
55 // retryPolicy 返回一个是否重试的断言
56 private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
57     return new Func2<Integer, Throwable, Boolean>() {
58         @Override
59         public Boolean call(Integer tryCount, Throwable e) {
60             // 请求拒绝异常就不允许重试
61             if (e instanceof AbortExecutionException) {
62                 return false;
63             }
64             // 尝试次数是否大于最大重试次数
65             if (tryCount > maxRetrys) {
66                 return false;
67             }
68             // 使用 RequestSpecificRetryHandler 判断是否重试
69             return retryHandler.isRetriableException(e, same);
70         }
71     };
72 }

4、RetryTemplate

① spring-retry

要启用 RetryTemplate 需先引入 spring-retry:

1 <dependency>
2     <groupId>org.springframework.retry</groupId>
3     <artifactId>spring-retry</artifactId>
4 </dependency>

以 RetryableRibbonLoadBalancingHttpClient 为例,先看看它的 execute 方法,它先创建了负载均衡重试策略类 LoadBalancedRetryPolicy,然后将请求调用的逻辑封装到 RetryCallback 中,最后其实就是用 RetryTemplate 执行这个 RetryCallback,也就是说请求重试的逻辑都在 RetryTemplate 中。

 1 public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
 2     //...
 3 
 4     // 负载均衡重试策略 RibbonLoadBalancedRetryPolicy
 5     final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
 6 
 7     RetryCallback<RibbonApacheHttpResponse, Exception> retryCallback = context -> {
 8         // ...
 9         // delegate => CloseableHttpClient
10         final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
11         // ...
12         // 成功 返回结果
13         return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
14     };
15 
16     LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse> recoveryCallback = new LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse>()//...
17 
18     return this.executeWithRetry(request, retryPolicy, retryCallback, recoveryCallback);
19 }
20 
21 private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request,
22         LoadBalancedRetryPolicy retryPolicy,
23         RetryCallback<RibbonApacheHttpResponse, Exception> callback,
24         RecoveryCallback<RibbonApacheHttpResponse> recoveryCallback)
25         throws Exception {
26     RetryTemplate retryTemplate = new RetryTemplate();
27 
28     // retryable => 取自 RibbonCommandContext 设置的 retryable 参数
29     boolean retryable = isRequestRetryable(request);
30     // 设置重试策略
31     retryTemplate.setRetryPolicy(retryPolicy == null || !retryable
32             ? new NeverRetryPolicy() : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
33 
34     BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
35     retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
36 
37     // 利用 retryTemplate 执行请求 callback
38     return retryTemplate.execute(callback, recoveryCallback);
39 }

需要注意的是,在 executeWithRetry 中,会判断是否要重试,判断的逻辑中 getRetryable 其实就是取的 ApacheClientHttpRequest 中 executeInternal 方法里创建的 RibbonCommandContext 设置的 retryable 参数,这就和前面定制化的逻辑衔接上了。

1 private boolean isRequestRetryable(ContextAwareRequest request) {
2     if (request.getContext() == null || request.getContext().getRetryable() == null) {
3         return true;
4     }
5     return request.getContext().getRetryable();
6 }

② RetryTemplate

进入 RetryTemplate 的 execute 方法,核心的逻辑我精简成如下代码,主要就是一个 while 循环判断是否可以重试,然后调用 retryCallback 执行请求。请求失败后,比如超时,抛出异常,就会 registerThrowable 来注册异常。

 1 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
 2         RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
 3 
 4     // retryPolicy => InterceptorRetryPolicy
 5     RetryPolicy retryPolicy = this.retryPolicy;
 6     BackOffPolicy backOffPolicy = this.backOffPolicy;
 7     //....
 8     try {
 9         // ...
10         // canRetry 判断是否重试
11         while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
12             try {
13                 // retryCallback 调用
14                 return retryCallback.doWithRetry(context);
15             }
16             catch (Throwable e) {
17                 // ...
18                 // 注册异常
19                 registerThrowable(retryPolicy, state, context, e);
20                 // ...
21             }
22         }
23         exhausted = true;
24         return handleRetryExhausted(recoveryCallback, context, state);
25     }
26     //...
27 }

看 canRetry 方法,它实际是调用了 InterceptorRetryPolicy 的 canRetry。第一次调用时,会去获取 Server;否则就用 RibbonLoadBalancedRetryPolicy 判断是否重试下一个 Server,注意它判断的逻辑是 GET 请求或者允许所有操作操作重试,且 Server 重试次数 nextServerCount  小于等于配置的 MaxAutoRetriesNextServer 。也就是说,while 循环判断的 canRetry 是重试下一个 Server 的。

 1 protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
 2     return retryPolicy.canRetry(context);
 3 }
 4 
 5 //////////// InterceptorRetryPolicy
 6 public boolean canRetry(RetryContext context) {
 7     LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
 8     if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
 9         // 获取 Server
10         lbContext.setServiceInstance(this.serviceInstanceChooser.choose(this.serviceName));
11         return true;
12     }
13     // RibbonLoadBalancedRetryPolicy => 重试下一个Server
14     return this.policy.canRetryNextServer(lbContext);
15 }
16 
17 ///////// RibbonLoadBalancedRetryPolicy
18 public boolean canRetryNextServer(LoadBalancedRetryContext context) {
19     // 判断重试下一个Server
20     return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer()
21             && canRetry(context);
22 }
23 
24 public boolean canRetry(LoadBalancedRetryContext context) {
25     // GET 请求或者允许所有操作重试时,就允许重试
26     HttpMethod method = context.getRequest().getMethod();
27     return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
28 }

接着看请求失败后的注册异常 registerThrowable,它最后会向 RibbonLoadBalancedRetryPolicy 注册异常。在 RibbonLoadBalancedRetryPolicy 的 registerThrowable 方法中,如果不重试同一个Server且可以重试下一个Server,就会轮询获取下一个Server。如果可以在同一个Server上重试,sameServerCount 计数器就+1,否则重置 sameServerCount,然后 nextServerCount +1。

 1 protected void registerThrowable(RetryPolicy retryPolicy, RetryState state,
 2         RetryContext context, Throwable e) {
 3     retryPolicy.registerThrowable(context, e);
 4     registerContext(context, state);
 5 }
 6 
 7 ///////// InterceptorRetryPolicy /////////
 8 public void registerThrowable(RetryContext context, Throwable throwable) {
 9     LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
10     lbContext.registerThrowable(throwable);
11     // RibbonLoadBalancedRetryPolicy
12     this.policy.registerThrowable(lbContext, throwable);
13 }
14 
15 ///////// RibbonLoadBalancedRetryPolicy /////////
16 public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
17     //...
18     // 如果不在在同一个Server 上重试且可以重试下一个Server,则重新选择一个 Server
19     if (!canRetrySameServer(context) && canRetryNextServer(context)) {
20         context.setServiceInstance(loadBalanceChooser.choose(serviceId));
21     }
22 
23     // 同一个Server重试超过设置的值后,就重置 sameServerCount
24     if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer()
25             && canRetry(context)) {
26         // 重置 nextServerCount
27         sameServerCount = 0;
28         // 下一个Server重试次数+1
29         nextServerCount++;
30         if (!canRetryNextServer(context)) {
31             // 不能重试下一个Server了
32             context.setExhaustedOnly();
33         }
34     }
35     else {
36         // 同一个Server重试次数+1
37         sameServerCount++;
38     }
39 }
40 
41 // 判断是否重试同一个Server
42 public boolean canRetrySameServer(LoadBalancedRetryContext context) {
43     return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer()
44             && canRetry(context);
45 }
46 
47 public boolean canRetry(LoadBalancedRetryContext context) {
48     // GET 请求或者允许所有操作重试时,就允许重试
49     HttpMethod method = context.getRequest().getMethod();
50     return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
51 }

5、Ribbon 重试总结

① 首先,Ribbon 关于超时和重试的配置参数如下,这些参数也可以针对某个客户端配置:

 1 ribbon:
 2   # 客户端读取超时时间
 3   ReadTimeout: 1000
 4   # 客户端连接超时时间
 5   ConnectTimeout: 1000
 6   # 默认只重试 GET,设置为 true 时将重试所有类型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 同一个Server重试次数
 9   MaxAutoRetries: 1
10   # 最多重试几个Server
11   MaxAutoRetriesNextServer: 1

② RetryTemplate 是 spring-retry 的重试组件,LoadBalancerCommand 是 Ribbon 的重试组件。它们重试的请求次数是一样的,重试逻辑也是类似,都是先重试当前 Server,再重试下一个Server,总的请求次数 = (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1)。

③ 但是有点差别的是,RetryTemplate 会判断请求方法为 GET 或者 OkToRetryOnAllOperations=true 时才允许重试,而 LoadBalancerCommand  是GET方法都可以重试,非GET方法在抛出连接异常时也可以重试。这个要注意下,一般只有GET才允许重试,因为GET是查询操作,接口是幂等的,而POST、PUT、DELETE一般是非幂等的。所以一般更建议使用 RetryTemplate,并且配置 OkToRetryOnAllOperations=false。

④ 为了提升服务间通信性能,一般可以启用 apache httpclient 或者 OkHttp,如果要启用重试功能,还需要引入 spring-retry 依赖。重试时,当前Server就不要重试了(MaxAutoRetries=0),直接重试下一个Server。

 1 ribbon:
 2   # 客户端读取超时时间
 3   ReadTimeout: 1000
 4   # 客户端连接超时时间
 5   ConnectTimeout: 1000
 6   # 默认只重试 GET,设置为 true 时将重试所有类型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 同一个Server重试次数
 9   MaxAutoRetries: 0
10   # 最多重试几个Server
11   MaxAutoRetriesNextServer: 1
12   # 启用 httpclient
13   httpclient:
14     enabled: false
15   # 启用 RestClient
16   restclient:
17     enabled: false
18   # 启用 okhttp
19   okhttp:
20     enabled: true

八、Ribbon 架构图

最后,将 Ribbon 核心组件架构用两张类图总结下。

① 负载均衡器 ILoadBalancer

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

② 负载均衡客户端

SpringCloud 源码系列(5)—— 负载均衡 Ribbon(下)

 

发表评论

评论已关闭。

相关文章