谈谈OkHttp源码的同步异步原理
2023-03-13 19:19:38来源:Android开发编程
OkHttpClient okHttpClient = new OkHttpClient();Request request = new Request.Builder() .url("http://www.test.com") .build();Call call = okHttpClient.newCall(request);//1.异步请求,通过接口回调告知用户 http 的异步执行结果call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { System.out.println(e.getMessage()); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful()) { System.out.println(response.body().string()); } }});//2.同步请求Response response = call.execute();if (response.isSuccessful()) { System.out.println(response.body().string());}2、异步请求实现流程
//get的异步请求 public void getAsync(View view) { //定义okhttp对象 OkHttpClient okHttpClient = new OkHttpClient(); Request request = new Request.Builder().url("http://www.test.com").build(); Call call = okHttpClient.newCall(request); //异步请求:不用创建子线程 //enqueue()并不会阻塞代码的执行,不需要与服务器请求完成之后,才会执行后面的代码 //而且enqueue内部会为我们创建子线程 call.enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { Log.i("TAG", "onResponse: " + (Looper.getMainLooper().getThread() == Thread.currentThread()));//为false 表示这是在子线程,需要切换到主线程才能操作UI if (response.isSuccessful()){ Log.i(TAG,"getAsync:"+response.body().string()); } } }); }
(资料图片)
call的异步调用是通过RealCall.enqueue()实现的。而请求结果通过Callback回调到主线程。
(1)RealCall.enqueue()@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.callStart(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
将用户创建的callback作为参数传入AsyncCall()构造函数。AsyncCall 继承于Runnable。
(2)AsyncCallfinal class AsyncCall extends NamedRunnable { private volatile AtomicInteger callsPerHost = new AtomicInteger(0); ... /** * 该方法是在dispather需要执行此请求的时候,分配给它线程池,此异步请求便在这个线程池中执行网络请求。 */ void executeOn(ExecutorService executorService) { ... boolean success = false; try { //异步的关键:将请求放到线程池中执行。 executorService.execute(this); success = true; } catch (RejectedExecutionException e) { ... success = false; } finally { if (!success) { client.dispatcher().finished(this); // 执行失败会通过Dispatcher进行finished,以后再也不会用此AsyncCall。 } } } @Override protected void execute() { boolean signalledCallback = false; transmitter.timeoutEnter(); try { Response response = getResponseWithInterceptorChain(); signalledCallback = true; //请求成功时,回调Response给到用户 responseCallback.onResponse(RealCall.this, response); } catch (IOException e) { ... //请求错误时,回调错误接口给到用户 responseCallback.onFailure(RealCall.this, e); } finally { //结束一次请求。 client.dispatcher().finished(this); } } }
AsyncCall继承于Runnable,它提供了将Call放到线程池执行的能力,实现了请求的异步流程。
(3)Dispatcher.enqueue()//准备进行异步调用的请求。private final Deque将此请求登记到Dispatcher的预备双端队列中。以此次的请求的Host来查找可服用的异步请求,如果存在,进行复用。尝试将刚刚加入预备队的请求执行。(4)Dipatcher.finish()readyAsyncCalls = new ArrayDeque<>(); //正在执行的异步请求。private final Deque runningAsyncCalls = new ArrayDeque<>();void enqueue(AsyncCall call) { synchronized (this) { //将异步请求加入到双端队列中 readyAsyncCalls.add(call); // 寻找是否有同Host的请求,如果有进行复用 if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } //将符合条件的Ready的异步请求转入runningAsyncCalls,并执行 promoteAndExecute(); }
private调度器结束一次请求。当一个异步任务完成后,调度器会触发一次预备任务执行流程。让之前因为最大请求数等限制而不能执行的请求有机会得到执行。通过idleCallback.run()通知此时的调度器空闲状态。(5)Dipatcher.promoteAndExecute()void finished(Deque calls, T call) { Runnable idleCallback; synchronized (this) { ... //一个请求完成后,检查此时是否有在等待执行的请求,并处理。 boolean isRunning = promoteAndExecute(); if (!isRunning && idleCallback != null) { //通知此时已经没有异步请求任务 idleCallback.run(); } }
private boolean promoteAndExecute() { ... List该方法是对预备队列里的请求提升至执行队列并执行的一次尝试。如果不能执行,他启动时机将会延后到其他请求结束。3、同步请求executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); //检查最大请求数限制和 if (runningAsyncCalls.size() >= maxRequests) break; if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; //满足条件,便把预备队的请求提升到执行队列。 i.remove(); asyncCall.callsPerHost().incrementAndGet(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } //将可执行的异步请求放进线程池执行 for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); // asyncCall.executeOn(executorService()); } return isRunning;
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");OkHttpClient client = new OkHttpClient();String post(String url, String json) throws IOException { RequestBody body = RequestBody.create(json, JSON); Request request = new Request.Builder() .url(url) .post(body) .build(); try (Response response = client.newCall(request).execute()) { return response.body().string(); }}通过Builder模式构建Request。调用client.newCall(),通过request生成一个Call对象。他的实现类是RealCall。随后调用RealCall.execute(),进行同步请求。(1)RealCall.execute()
@Override public Response execute() throws IOException { synchronized (this) { // 如果该请求已经执行过,报错。 if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.timeoutEnter(); transmitter.callStart(); try { //获取 client 里面的调度器 Dispatcher 并记录这个请求。 client.dispatcher().executed(this); //通过责任链的方式将发起请求并返回结果。这里是同步动作,会阻塞。 return getResponseWithInterceptorChain(); } finally { //请求完后需要把这个请求从调度器中移走 client.dispatcher().finished(this); } }判断Call的合法性。将RealCall传进Client里面的Dispatcher.executed()里,而Dispatcher是在 OkHttpClient 被的构建函数里被创建并作为成员变量的。开启责任链模式,进行请求相关逻辑。执行完成后,调度器对这个请求进行收尾工作。(2)Dispatcher.executed()
/** Running asynchronous calls. Includes canceled calls that haven"t finished yet. */ private final Deque(3)RealCall.getResponseWithInterceptorChain()runningAsyncCalls = new ArrayDeque<>(); synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
Response getResponseWithInterceptorChain() throws IOException { // 将请求的具体逻辑进行分层,并采用责任链的方式进行构造。 List将用户自定义的拦截器先加入集合中。加入一次请求中需要用的拦截器,这些拦截器代表一次完整的网络请求逻辑被分了几层以及他们的先后顺序。从代码中我们不难看出他们的流程是:重试/重定向逻辑->网络桥逻辑->缓存逻辑->建立网络连接逻辑->网络通行逻辑。用以上拦截器集合构建出一条逻辑处理的拦截链,并将这条链需要使用的拦截器下标赋值为0,从第一个开始。调用chain.proceed()启动这条链的处理流程。使用责任链的设计模式来处理一次网络请求中的逻辑可以有效的划分逻辑层。而前一个拦截器可以根据实际的处理情况来决定下一拦截器是否应该继续处理。(4)RealInterceptorChain.proceed()interceptors = new ArrayList<>(); // 用户自已的请求拦截器 interceptors.addAll(client.interceptors()); //重试和重定向拦截器 interceptors.add(new RetryAndFollowUpInterceptor(client)); //桥拦截器 interceptors.add(new BridgeInterceptor(client.cookieJar())); //缓存逻辑拦截器 interceptors.add(new CacheInterceptor(client.internalCache())); //网络连接逻辑拦截器 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } // 网络请求拦截器,真正网络通行的地方,这个拦截器处理过后会生成一个Response interceptors.add(new CallServerInterceptor(forWebSocket)); //依照如上配置,构建出一个请求的处理逻辑责任链,特别注意:这条链开始于下标位置为的0拦截器。 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); boolean calledNoMoreExchanges = false; try { //按下处理逻辑链条的开关。 Response response = chain.proceed(originalRequest); if (transmitter.isCanceled()) { closeQuietly(response); throw new IOException("Canceled"); } //返回请求结果 return response; } catch (IOException e) { calledNoMoreExchanges = true; throw transmitter.noMoreExchanges(e); } finally { if (!calledNoMoreExchanges) { transmitter.noMoreExchanges(null); } } }
RealInterceptorChain.javapublic Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; ... /** * index+1:构建出新的拦截链,不过新的拦截链的处理拦截器是下标为index+1的 * 实现了责任链中,处理逻辑的流转。 */ RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); //此时index = 0;所以拿到了第一个拦截器,并且调用他的intercept 方法进行具体逻辑处理。 Interceptor interceptor = interceptors.get(index); //当前拦截器对网络请求进行处理。 Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // 省略对response合法性的检查代码 ... return response; }先index+1,构建index指向下一个拦截器的责任链。在链中取出下标当前index(此时为0)的拦截器,并调用intercept(next)进行此次拦截器逻辑的真正处理。这里注意,传递进去的参数正是1中构建出来的next责任链。在Interceptor.intercept()方法内部进行自身的逻辑处理以后,会调用next.proceed()进行一次传递,由下一个拦截器进行处理。而此次请求也是这样在各个拦截器中被 处理->传递->处理->...->返回结果。4、异步和同步总结
同步
RealCall.execute() ➡️ Dispatcher.execute() ---runningSyncCalls(异步请求执行队列)添加 call 对象。getResponseWithInterceptorChain() 返回 Response 对象。Dispatcher.finished()。异步
RealCall. enqueue() ➡️ Dispatcher. enqueue()---readyAsyncCalls(异步请求就绪队列)添加 call 对象。readyAsyncCalls(异步请求就绪队列),将满足条件的添加到临时队列和runningAsyncCalls (异步请求的执行队列)。getResponseWithInterceptorChain() 返回 Response 对象;。Dispatcher.finished()。关键词: