一、异步入门

1.1 async和await

这两个关键词的本质是语法糖。例如:

void _increment() async {
  await Future.delayed(Duration(seconds: 1));
  setState((){ _counter++; });
}

等价于:

void _increment() {
  Future.delayed(Duration(seconds: 1),(){
    setState((){ _counter++; });
  });
}

或者:

void _increment() {
  Future.delayed(Duration(seconds: 1)).then(
  	(value) => setState((){ _counter++; })
  );
}

​ 这里需要纠正一个误区:就像JS那样,dart的异步操作并非多线程。在dart中,每一个线程都被isolate包裹,它们之间不能共享内存,也就避免了死锁问题,降低了程序复杂度,提升GC的性能。线程之间使用消息队列来通信。

​ 一个线程怎么处理等待事件呢?这里就要用到Event Loop机制。内置一个Event Queue和MicroTask Queue,只要MicroTask Queue中还有事件没有处理完,则先处理这里面的事件,而此时EventQueue中的事件一个都不会被执行。使用scheduleMicrotask()来向Microtask Queue添加事件。

void main(){
  Future(() => print('A'));
  print('B');
}
//打印结果:BA

​ 这里我们还是要提醒一下,只有在出现需要等待的事件时,EventLoop机制才能发挥作用。如果卡顿是因为计算量过大造成的,那么只有多开几个Isolate(线程)来解决了。

1.2 运行时机

  1. 直接运行:Future.sync()Future.value()_.then()

  2. Microtask:scheduleMicrotask()Future.microtask()_completed.then()

  3. Event:Future()Future.delayed()

1.3 运行时机举例

  1. 直接运行

    void main(){
      print("1");
      Future.value(func());
      print("3");
    }
    String func(){
      print("2");
      return "";
    }//输出结果:123
    
    void main(){
      print("1");
      Future.sync(()=>print("2"));
      print("3");
    }//输出结果:123
    
  2. Event:

    以http请求来说:

    http.get('www.baidu.com')
      .then((value)=>{null})
      .catchError(...);	//返回一个Future
    

    函数返回Future,这里看起来Future是立刻执行的,但实际上要在下一个循环中才会被执行:

    Future<String> getFuture(){
      return Future(() => "alice");
    }
    void main(){
      getFuture().then((value)=>print(value));
      print("hi");
    }
    //打印结果:hi alice
    

来个大汇总吧:

void main(){
  Future.delayed(Duration(seconds: 1),() => print("event 3"));
  Future.delayed(Duration.zero, ()=>print("event 2"));
  Future(()=>print("event 1"));//这里的event 1与event 2等价,调换语句顺序,执行顺序也会对应调换
  
  scheduleMicrotask(()=>print("mtask 1"));
  Future.microtask(()=>print("mtask 2"));
  Future.value(123).then((value)=>print("mtask 3"));	//_completed.then()
  
  print("main 1");
  Future.sync(()=>print("sync 1"));
  print("main 2");
}
/*
	输出结果:
		mtask 1
		mtask 2
		mtask 3
		event 2
		event 1		
		event 3
		main 1
		sync 1
		main 2
*/

最后落下一个会立即执行的_.then()

Future.delayed(Duration(seconds: 1),() => print("delayed"))
	.then((value){
    scheduleMicrotask(()=>print("micro"));
    print("then");
  }).then((value)=>print("then 2"));
//结果:delayed then then2 micro

1.4 error和whenComplete

getFuture()
  .then((value)=>print(value))
  .catchError((err)=>print(err))
  .whenComplete(()=>print("complete"));

Future<String> getFuture(){
  return Future.error(Exception("wrong"));
}
//wrong complete

二、Builder和Controller

2.1 FutureBuilder

body: FutureBuilder(
	future: Future.value(123),
  builder: (context, AsyncSnapshot<dynamic> snapshot){
    //snapshot代表最近的状态:未完成、完成、出错
    if(snapshot.connectionState == ConnectionState.waiting){
      return CircularProgressIndicator();
    }
    else if(snapshot.connectionState == ConnectionState.done){
      if(snapshot.hasError){
        return Text("error");
      } else {
        return Text("${snapshot.data}");	//data和error只有一个有数据
      }
    }
    //none代表前面设置了future: null,不可能发生;而active设计Stream,暂不考虑
   	throw "should not happen";
  }
)
  
//更加可读
body: FutureBuilder(
	future: Future.value(123),
  initialData: 72,	//未完成时snapshot.data = 72
  builder: (context, AsyncSnapshot<dynamic> snapshot){
    if(snapshot.hasError){
      return Icon(Icons.error, size:80);
    } else if(snapshot.hasData){
      return Text("${snapshot.data}");
    } else {
      return CircularProgressIndicator();
    }
  }
)

​ 值得一提的是,snapshot中的error和data属性只能有一个存在值,如果出错,flutter会先删除原来data中的initialData,再填上对应的error。

2.2 Stream和StreamBuilder

Future负责在以后给我们一个值,而stream则会在以后给我们很多值。

final future = Future.delayed(Duration(seconds: 1), ()=>42);
final stream = Stream.periodic(Duration(seconds: 1), (_) => 42);

@override
void initialState(){
  future.then(...);
  stream.listen((event){
    print("stream: $event");
  });
  //...
}

可以使用StreamBuilder监控stream:

body: StreamBuilder(
	stream: stream,
  builder: (context, AsyncSnapshot<dynamic> snapshot) {
    //可以使用switch来判断snapshot.connectionState
    //...
  }
)

​ Stream和Future的区别之一就在于,Future是在结束(done)的时候返回一个data或error,而Stream是在active的状态返回一个data或error,而stream一旦处于done状态,那么向stream发送数据就会引发异常。

2.3 StreamController

形象地说,Stream就像水流,sink就像水龙头(数据来源),StreamController会同时创建这两者:

final controller = StreamController();

controller.sink.add(72);//添加一个值,数据类型可以混搭
controller.sink.addError("error");//添加错误
controller.stream.listen((event) {  });//处理事件

controller.close();//关闭流

使用StreamBuilder:

body: StreamBuilder(
	stream: controller.stream,		//监听controller
  builder: (context, AsyncSnapshot<dynamic> snapshot) {
    //可以使用switch来判断snapshot.connectionState
    //...
  }
)

2.4 Stream.broadcast()

​ 一般来说,一个Stream只能被一方监听,同时未被接受的数据会被缓存;反过来说,使用Stream.broadcast创建的广播流可以被多方监听,但未被接受的数据不再会被缓存。

三、Stream高级操作

3.1 Stream.map

例如:

stream: controller.stream.map((event) => event * 2),

上面的代码将会将流中的数据重复两次输出。

3.2 Stream.where

stream: controller.stream.where((event) => event is int),

上面的代码只会允许int类型的数据通过。

3.3 Stream.distinct

stream: controller.stream.distinct(),

上面的代码将会过滤重复的值。

3.4 async*

Stream<int> getNumbers() async* {
  while(true){
    await Future.delayed(Duration(seconds: 1));
    yield 1;
  }
}
//StreamBuilder使用该流
StreamBuilder(
	stream: getTime(),
  builder: (...) { ... }
)

上面的代码使用async*关键字,每一秒钟会向流中发送一个数字。

3.5 流中的状态——Stream.transform

StreamBuilder{
  stream: _controller.stream.transform(TallyTransformer()),
  //....
}

//定义自己的TallyTransformer
class TallyTransformer implements StreamTransformer {
  int sum = 0;
  StreamController _controller = StreamController();
  @override
  Stream bind(Stream stream){
    stream.listen((event) {		//我们听源头的这个stream
      sum += event;
      _controller.add(sum);
    });
    return _controller.stream;	//他人听的是这个新的stream
  }
  
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() => 
    StreamTransformer.castFrom(this);	//类型转换
}