一、异步入门
1.1 async和await
这两个关键词的本质是语法糖。例如:
void _increment() async {
await Future.delayed(Duration(seconds: 1));
setState((){ _counter++; });
}
等价于:
void _increment() {
Future.delayed(Duration(seconds: 1),(){
setState((){ _counter++; });
});
}
或者:
void _increment() {
Future.delayed(Duration(seconds: 1)).then(
(value) => setState((){ _counter++; })
);
}
这里需要纠正一个误区:就像JS那样,dart的异步操作并非多线程。在dart中,每一个线程都被isolate包裹,它们之间不能共享内存,也就避免了死锁问题,降低了程序复杂度,提升GC的性能。线程之间使用消息队列来通信。
一个线程怎么处理等待事件呢?这里就要用到Event Loop机制。内置一个Event Queue和MicroTask Queue,只要MicroTask Queue中还有事件没有处理完,则先处理这里面的事件,而此时EventQueue中的事件一个都不会被执行。使用scheduleMicrotask()
来向Microtask Queue添加事件。
void main(){
Future(() => print('A'));
print('B');
}
//打印结果:BA
这里我们还是要提醒一下,只有在出现需要等待的事件时,EventLoop机制才能发挥作用。如果卡顿是因为计算量过大造成的,那么只有多开几个Isolate(线程)来解决了。
1.2 运行时机
-
直接运行:
Future.sync()
、Future.value()
、_.then()
-
Microtask:
scheduleMicrotask()
、Future.microtask()
、_completed.then()
-
Event:
Future()
、Future.delayed()
1.3 运行时机举例
-
直接运行
void main(){ print("1"); Future.value(func()); print("3"); } String func(){ print("2"); return ""; }//输出结果:123
void main(){ print("1"); Future.sync(()=>print("2")); print("3"); }//输出结果:123
-
Event:
以http请求来说:
http.get('www.baidu.com') .then((value)=>{null}) .catchError(...); //返回一个Future
函数返回Future,这里看起来Future是立刻执行的,但实际上要在下一个循环中才会被执行:
Future<String> getFuture(){ return Future(() => "alice"); } void main(){ getFuture().then((value)=>print(value)); print("hi"); } //打印结果:hi alice
来个大汇总吧:
void main(){
Future.delayed(Duration(seconds: 1),() => print("event 3"));
Future.delayed(Duration.zero, ()=>print("event 2"));
Future(()=>print("event 1"));//这里的event 1与event 2等价,调换语句顺序,执行顺序也会对应调换
scheduleMicrotask(()=>print("mtask 1"));
Future.microtask(()=>print("mtask 2"));
Future.value(123).then((value)=>print("mtask 3")); //_completed.then()
print("main 1");
Future.sync(()=>print("sync 1"));
print("main 2");
}
/*
输出结果:
mtask 1
mtask 2
mtask 3
event 2
event 1
event 3
main 1
sync 1
main 2
*/
最后落下一个会立即执行的_.then()
:
Future.delayed(Duration(seconds: 1),() => print("delayed"))
.then((value){
scheduleMicrotask(()=>print("micro"));
print("then");
}).then((value)=>print("then 2"));
//结果:delayed then then2 micro
1.4 error和whenComplete
getFuture()
.then((value)=>print(value))
.catchError((err)=>print(err))
.whenComplete(()=>print("complete"));
Future<String> getFuture(){
return Future.error(Exception("wrong"));
}
//wrong complete
二、Builder和Controller
2.1 FutureBuilder
body: FutureBuilder(
future: Future.value(123),
builder: (context, AsyncSnapshot<dynamic> snapshot){
//snapshot代表最近的状态:未完成、完成、出错
if(snapshot.connectionState == ConnectionState.waiting){
return CircularProgressIndicator();
}
else if(snapshot.connectionState == ConnectionState.done){
if(snapshot.hasError){
return Text("error");
} else {
return Text("${snapshot.data}"); //data和error只有一个有数据
}
}
//none代表前面设置了future: null,不可能发生;而active设计Stream,暂不考虑
throw "should not happen";
}
)
//更加可读
body: FutureBuilder(
future: Future.value(123),
initialData: 72, //未完成时snapshot.data = 72
builder: (context, AsyncSnapshot<dynamic> snapshot){
if(snapshot.hasError){
return Icon(Icons.error, size:80);
} else if(snapshot.hasData){
return Text("${snapshot.data}");
} else {
return CircularProgressIndicator();
}
}
)
值得一提的是,snapshot中的error和data属性只能有一个存在值,如果出错,flutter会先删除原来data中的initialData,再填上对应的error。
2.2 Stream和StreamBuilder
Future负责在以后给我们一个值,而stream则会在以后给我们很多值。
final future = Future.delayed(Duration(seconds: 1), ()=>42);
final stream = Stream.periodic(Duration(seconds: 1), (_) => 42);
@override
void initialState(){
future.then(...);
stream.listen((event){
print("stream: $event");
});
//...
}
可以使用StreamBuilder监控stream:
body: StreamBuilder(
stream: stream,
builder: (context, AsyncSnapshot<dynamic> snapshot) {
//可以使用switch来判断snapshot.connectionState
//...
}
)
Stream和Future的区别之一就在于,Future是在结束(done)的时候返回一个data或error,而Stream是在active的状态返回一个data或error,而stream一旦处于done状态,那么向stream发送数据就会引发异常。
2.3 StreamController
形象地说,Stream就像水流,sink就像水龙头(数据来源),StreamController会同时创建这两者:
final controller = StreamController();
controller.sink.add(72);//添加一个值,数据类型可以混搭
controller.sink.addError("error");//添加错误
controller.stream.listen((event) { });//处理事件
controller.close();//关闭流
使用StreamBuilder:
body: StreamBuilder(
stream: controller.stream, //监听controller
builder: (context, AsyncSnapshot<dynamic> snapshot) {
//可以使用switch来判断snapshot.connectionState
//...
}
)
2.4 Stream.broadcast()
一般来说,一个Stream只能被一方监听,同时未被接受的数据会被缓存;反过来说,使用Stream.broadcast
创建的广播流可以被多方监听,但未被接受的数据不再会被缓存。
三、Stream高级操作
3.1 Stream.map
例如:
stream: controller.stream.map((event) => event * 2),
上面的代码将会将流中的数据重复两次输出。
3.2 Stream.where
stream: controller.stream.where((event) => event is int),
上面的代码只会允许int类型的数据通过。
3.3 Stream.distinct
stream: controller.stream.distinct(),
上面的代码将会过滤重复的值。
3.4 async*
Stream<int> getNumbers() async* {
while(true){
await Future.delayed(Duration(seconds: 1));
yield 1;
}
}
//StreamBuilder使用该流
StreamBuilder(
stream: getTime(),
builder: (...) { ... }
)
上面的代码使用async*
关键字,每一秒钟会向流中发送一个数字。
3.5 流中的状态——Stream.transform
StreamBuilder{
stream: _controller.stream.transform(TallyTransformer()),
//....
}
//定义自己的TallyTransformer
class TallyTransformer implements StreamTransformer {
int sum = 0;
StreamController _controller = StreamController();
@override
Stream bind(Stream stream){
stream.listen((event) { //我们听源头的这个stream
sum += event;
_controller.add(sum);
});
return _controller.stream; //他人听的是这个新的stream
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer.castFrom(this); //类型转换
}