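Netty ChannelFuture Source Code Analysis

Source version: netty-all-4.1.59.Final

Asynchronous and Event-Driven

Netty is an asynchronous, event-driven framework: every I/O operation is asynchronous. An I/O call returns immediately, with no guarantee that the requested operation has completed, and hands back a ChannelFuture. Netty then uses that ChannelFuture to report whether the operation succeeded, failed, or was cancelled.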

ChannelFuture

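Below are the definitions of Future, one of the top-level interfaces, and of ChannelFuture, which extends it. Besides isDone from java.util.concurrent.Future, Netty adds methods such as isSuccess and cause.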


                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->      isDone() = true      |
 | isCancelled() = false    |    |    |       cause() = non-null  |
 |       cause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+

public interface Future<V> extends java.util.concurrent.Future<V> {

    /**
     * Returns {@code true} if and only if the I/O operation was completed
     * successfully.
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *
     * @return the cause of the failure.
     *         {@code null} if succeeded or this future is not
     *         completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listener is notified immediately.
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Adds the specified listeners to this future.  The
     * specified listeners are notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listeners are notified immediately.
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners are no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listeners are not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an {@link InterruptedException} and
     * discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with an {@link CancellationException}.
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}



public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
     */
    boolean isVoid();
}


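ChannelFuture resembles Future from java.util.concurrent but adds several capabilities, such as distinguishing success from failure and supporting listener callbacks.

A newly created ChannelFuture is in the uncompleted state. When the asynchronous task finishes, the state of the future is updated. The state can be inspected as follows:

  • isDone reports whether the operation has completed.
  • isSuccess reports whether a completed operation succeeded.
  • cause returns the reason a completed operation failed.
  • isCancelled reports whether a completed operation was cancelled.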
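
The four states can be modelled with a single result field. The sketch below is a JDK-only illustration, not Netty code: FutureStateSketch, its SUCCESS marker, and describe are hypothetical names, and the class is deliberately not thread-safe. It follows the Javadoc quoted above in treating cancellation as a failure whose cause is a CancellationException.

```java
import java.util.Objects;
import java.util.concurrent.CancellationException;

/** JDK-only model of the four future states; a sketch, not Netty's implementation. */
final class FutureStateSketch {
    // Hypothetical marker object meaning "completed successfully".
    private static final Object SUCCESS = new Object();

    // null means uncompleted; otherwise SUCCESS or the Throwable that ended the operation.
    private Object result;

    boolean isDone()      { return result != null; }
    boolean isSuccess()   { return result == SUCCESS; }
    boolean isCancelled() { return result instanceof CancellationException; }
    Throwable cause()     { return result instanceof Throwable ? (Throwable) result : null; }

    void succeed()         { complete(SUCCESS); }
    void fail(Throwable t) { complete(Objects.requireNonNull(t, "t")); }
    void cancel()          { complete(new CancellationException()); }

    // A future completes at most once; a second attempt is rejected.
    private void complete(Object r) {
        if (result != null) {
            throw new IllegalStateException("complete already");
        }
        result = r;
    }

    /** Classifies a future by checking isDone first, then the outcome. */
    static String describe(FutureStateSketch f) {
        if (!f.isDone()) {
            return "uncompleted";
        }
        if (f.isSuccess()) {
            return "success";
        }
        if (f.isCancelled()) {
            return "cancelled";
        }
        return "failure: " + f.cause().getMessage();
    }

    public static void main(String[] args) {
        FutureStateSketch f = new FutureStateSketch();
        System.out.println(describe(f));
        f.fail(new java.io.IOException("connection refused"));
        System.out.println(describe(f));
    }
}
```

Checking isDone before anything else matters because isSuccess, isCancelled, and cause all return their "negative" value while the operation is still running.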

DefaultPromise

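DefaultPromise is the default implementation of Promise. DefaultChannelPromise, the default ChannelFuture implementation, extends it, so the core logic lives here.

The following methods show how each feature is implemented.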


public Promise<V> setSuccess(V result) {
    // Update the result field with a CAS through RESULT_UPDATER.
    // setSuccess0 delegates to setValue0, shown below.
    if (setSuccess0(result)) {
        // Notify the listeners.
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}


private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

// Wake up any threads blocked waiting on this future.
private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        notifyAll();
    }
}


private void notifyListeners() {
    EventExecutor executor = executor();
    // If the current thread is the future's own executor thread, notify the
    // listeners directly, as long as the nesting depth stays within the limit.
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // Otherwise, hand the notification to the executor that owns this future.
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

// Adding a listener.
// Because completion is asynchronous, the future may already be done by the time
// a listener is added. addListener handles this case: if the future is already
// complete, it calls notifyListeners right away. A listener can therefore be
// attached safely even after the asynchronous task has finished.
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}
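
The two ideas in the code above, a single CAS deciding who completes the promise and addListener re-checking isDone after registering, can be tried out in isolation. The following is a minimal JDK-only sketch under simplifying assumptions: MiniPromise and SUCCESS_NULL are hypothetical names, listeners run on whichever thread triggers the notification (there is no executor hand-off or stack-depth limit), and failure and cancellation are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Minimal promise: CAS completion plus listeners that also fire when added late. */
final class MiniPromise<V> {
    // Hypothetical marker so that a null value can still count as "completed".
    private static final Object SUCCESS_NULL = new Object();

    // null means uncompleted; set exactly once by compareAndSet.
    private final AtomicReference<Object> result = new AtomicReference<>();

    // Pending listeners, guarded by "this".
    private List<Consumer<MiniPromise<V>>> listeners = new ArrayList<>();

    boolean isDone() {
        return result.get() != null;
    }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result.get();
        return (r == null || r == SUCCESS_NULL) ? null : (V) r;
    }

    MiniPromise<V> setSuccess(V value) {
        // Only the caller that wins the CAS completes the promise.
        if (!result.compareAndSet(null, value == null ? SUCCESS_NULL : value)) {
            throw new IllegalStateException("complete already: " + this);
        }
        notifyListeners();
        return this;
    }

    MiniPromise<V> addListener(Consumer<MiniPromise<V>> listener) {
        synchronized (this) {
            listeners.add(listener);
        }
        // The promise may have completed before or during registration.
        if (isDone()) {
            notifyListeners();
        }
        return this;
    }

    private void notifyListeners() {
        List<Consumer<MiniPromise<V>>> toNotify;
        synchronized (this) {
            // Swap the list out so each listener is notified exactly once.
            toNotify = listeners;
            listeners = new ArrayList<>();
        }
        for (Consumer<MiniPromise<V>> l : toNotify) {
            l.accept(this);
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> p = new MiniPromise<>();
        p.addListener(f -> System.out.println("early listener: " + f.getNow()));
        p.setSuccess("done");
        p.addListener(f -> System.out.println("late listener: " + f.getNow()));
    }
}
```

Registering the listener first and checking isDone afterwards closes the gap in which the promise completes between the two steps: whichever side runs notifyListeners last picks up the listener.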

Example

AbstractBootstrap.doBind()

Netty's startup sequence is a good example of how the future classes are used.


private ChannelFuture doBind(final SocketAddress localAddress) {
    // Startup first obtains the future returned by initAndRegister.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // If registration has already completed, bind right away.
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        // Create a new promise. Unlike a future, a promise is writable: its state can be set.
        ChannelPromise promise = channel.newPromise();
        // doBind0 is asynchronous, so the promise is returned right away;
        // its state is updated once the bind operation finishes.
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        // Registration has not finished yet, so attach a listener that starts the bind
        // once it completes. This keeps the whole sequence asynchronous.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
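
The shape of doBind, "run the next step now if the first one is done, otherwise attach a listener", is not specific to Netty. As a rough analogy only, the sketch below reproduces it with java.util.concurrent.CompletableFuture: BindAfterRegisterSketch and bind0 are hypothetical names, bind0 merely completes the promise with a string, and nothing here touches a real channel or event loop.

```java
import java.util.concurrent.CompletableFuture;

/** JDK-only analogy of the doBind control flow; not Netty code. */
final class BindAfterRegisterSketch {

    /** Hypothetical stand-in for doBind0: "binds" by completing the promise. */
    private static void bind0(String address, CompletableFuture<String> promise) {
        promise.complete("bound:" + address);
    }

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, so bind right away.
            bind0(address, promise);
        } else {
            // Registration is pending (or has failed): react once its outcome is known.
            // whenComplete runs the callback immediately if regFuture is already complete.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    // Registration failed, so fail the bind promise with the same cause.
                    promise.completeExceptionally(cause);
                } else {
                    bind0(address, promise);
                }
            });
        }
        // Either way the caller gets the promise back without blocking.
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(registration, "0.0.0.0:8080");
        System.out.println("done before registration? " + bound.isDone());
        registration.complete(null);
        System.out.println("result: " + bound.join());
    }
}
```

In both branches the caller receives a promise immediately; the only difference is whether the bind step is started by the calling thread or by the completion of the registration.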