Day 13: Streams - Reactive Programming Fundamentals
Go beyond single values to continuous data flows. Learn Streams, StreamController, and reactive programming - the foundation for live Flutter features!
မင်္ဂလာပါ.. 👋
100 days of Flutter ရဲ့ Day 13 က ကြိုဆိုပါတယ်။ ဒီနေ့မှာတော့ dart langauge နဲ့ ပတ်သက်တာတွေကို ဆက်လက်ပြီး လေ့လာသွားပါမယ်။ ဒီနေ့ဆွေးနွေးသွားမယ့် ခေါင်းစဥ်ကတော့ Streams အကြောင်းပဲ ဖြစ်ပါတယ်။
မနေ့တုန်းက API တွေခေါ်တာ၊ Database ကနေ သွားပြီးတော့ data တွေ ယူတဲ့အခါ စောင့်ရတဲ့အခါ Future တွေ အသုံးပြုတာကို လေ့လာပြီးပြီပဲ ဖြစ်ပါတယ်။ ဒီနေ့မှာတော့ data တွေက တခါထဲ ရတာမျိုး မဟုတ်ဘဲ တခုပြီးတခု လာနေတာမျိုးဆို ဘယ်လိုအသုံးပြုကြမလဲဆိုတာ ဆက်ကြည့်သွားပါမယ်။ ဒါကိုတော့ Dart မှာ Stream လို့ခေါ်ပါတယ်။ ရေစီးကြောင်းလိုမျိုးပေါ့။ Data တွေက စီးဆင်းနေမှာ ဖြစ်ပြီးတော့ ကိုယ်လိုချင်တာကို ခပ်ပြီး သုံးသွားရသလိုပါ။ Future နဲ့မတူတာက Stream က data တွေက တခါထပ်ပိုပြီးတော့ ဝင်လာနိုင်တာပါ။
Streams တွေကိုတော့ real-time app တွေ ရေးချင်တဲ့အခါမျိုးမှာ အသုံးပြုလို့ရပါတယ်။ ဥပမာ Chat app တွေ၊ Live Score ပြတဲ့ app တွေ၊ Stock app တွေ၊ Ride sharing app တွေ ရေးချင်တဲ့အခါဆို stream အကြောင်းက မသိမဖြစ်လို့ ပြောလို့ရပါတယ်။
ဒီတော့ ဘယ်လို code တွေရေးပြီး Dart ရဲ့ Stream တွေကို အသုံးပြုလို့ရမလဲ ကြည့်ရအောင်ပါ။
Stream
// Emit one value
Stream<int> stream1 = Stream.value(42);
// Emit nothing
Stream<String> stream2 = Stream.empty();
// Emit three values
Stream<int> stream3 = Stream.fromIterable([1, 2, 3]);
// Emit forever (until cancelled)
Stream<int> stream4 = Stream.periodic(Duration(seconds: 1), (count) => count);ဒါကတော့ ထည့်ပေးမယ့် value တွေ သိပြီးတော့ stream အနေနဲ့ ပေးချင်တာဆိုရင် သုံးလို့ရပါတယ်။ Stream.value(...) ကတော့ value ကို တိုက်ရိုက် ထည့်ပေးလို့ရတာပဲ ဖြစ်ပါတယ်။ Stream.fromIterable(...) ကတော့ သူ့ထဲမှာ ကြေငြာထားတာတွေကို တခုချင်း Stream ထဲကို ထည့်ပေးသွားမှာပါ။ Stream.periodic(...) ကတော့ သူ့ထဲမှာ value တွေကို ကြေငြာထားတဲ့ အချိန်ပိုင်းအလိုက် publish လုပ်ပေးနေမှာပဲ ဖြစ်ပါတယ်။ သူ့ကို သုံးရင်တော့ နည်းနည်း သတိထားဖို့လိုပါတယ်။ ဘာဖြစ်လို့လဲဆိုတော့ သူ့ကို မရပ်မချင်း publish လုပ်နေမှာ ဖြစ်လို့ပါ။ သူတို့ကို listen လုပ်လိုက်တာနဲ့ ကြေငြာထားတဲ့ value ကို စပြီးပို့ပေးသွားမှာမျိုးဖြစ်ပါတယ်။
StreamController
final controller = StreamController<String>();
controller.sink.add("Hello");
controller.sink.add("World");
Stream<String> myStream = controller.stream;
controller.close();ဒါကတော့ ကိုယ်တိုင် value တွေကို အခြားနေရာတွေက ထည့်ပေးသွားချင်တာမျိုးဆို သုံးလို့ရပါတယ်။ StreamController ကိုအရင်ဆုံးကြေငြာပြီး သူ့ထဲကို data တွေ publish လုပ်ချင်တယ်ဆို controller.sink.add(...) ဆိုတာကို ခေါ်ပြီး data တွေ ထည့်ပေးသွားလို့ရပါတယ်။ နေရာအများကြီးကနေ stream ကို သုံးဖို့လိုတဲ့အခါ အခုလိုမျိုး stream controller တည်ဆောက်ပြီး data တွေ ပို့ပေးလို့ရပါတယ်။ သူ့ကို အသုံးပြုပြီး stream ထဲက data တွေ ရယူဖို့ဆိုရင်တော့ controller.stream ဆိုပြီး သုံးနိုင်ပါတယ်။
Subscription
StreamSubscription<String> subscription = myStream.listen(
(data) {
print("Received: $data");
},
onError: (error) {
print("Error: $error");
},
onDone: () {
print("Stream complete!");
},
);
subscription.cancel();Stream ထဲက data တွေကို ရယူဖို့ listen ဆိုပြီး ရေးလိုက်တာက subscribe လုပ်လိုက်တာနဲ့ အလားသဏ္ဍန်တူပါတယ်။ listen လို့ ရေးလိုက်တာနဲ့ stream ထဲကို ဝင်လာတဲ့ value တွေကို ပထမဆုံး callback နေရာမှာ ရယူနိုင်သွားပါလိမ့်မယ်။ stream ထဲကို data မဟုတ်ဘဲ error pass ပေးလို့လဲရပါတယ်။ ဒါဆိုရင်တော့ onError ဆိုတဲ့ named callback function ထဲကို ဝင်သွားပါလိမ့်မယ်။ နောက်ဆုံး stream ထဲက data တွေ ဝင်လာတာပြီးသွားတဲ့အခါ ဒါမှမဟုတ် stream ကို close လုပ်လိုက်တာမျိုးဆိုရင်တော့ onDone ဆိုတဲ့ named callback function ထဲကို ရောက်သွားမှာပဲ ဖြစ်ပါတယ်။
Stream Transformers
myStream
.map((value) => value * 2)
.where((value) => value > 10)
.take(5)
.listen((data) => print(data));ဒါကိုတော့ stream ထဲက value တွေကို ဝင်လာတဲ့ အချိန်မှာ tramsform/filter/limit လုပ်ချင်တာမျိုးဆို သုံးလို့ရပါတယ်။ ရှေ့မှာ ပြောခဲ့တဲ့ method chaining ဆိုတာကို stream မှာလဲ သုံးလို့ရတာဖြစ်ပါတယ်။
အခု stream နဲ့ပတ်သက်တဲ့ အခြေခံတွေ ပြောပြီးပြီဆိုတော့ ဥပမာလေးတွေ ဆက်ကြည့်ကြည့်ရအောင်ပါ။
Example 1 - Basic Timer
void main() async {
print('🚀 Starting countdown at ${DateTime.now()}');
Stream<int> countdown = Stream.periodic(
const Duration(seconds: 1),
(count) => 10 - count,
).take(11);
final subscription = countdown.listen(
(seconds) {
if (seconds > 0) {
print('⏲️ $seconds');
} else {
print('BLAST OFF!!!!');
}
},
onError: (error) {
print('Error $error');
},
onDone: () {
print('✅ Countdown complete');
},
);
await Future.delayed(const Duration(seconds: 12));
await subscription.cancel();
}
// Output:
// 🚀 Starting countdown at 2026-01-13 21:54:36.206237
// ⏲️ 10
// ⏲️ 9
// ⏲️ 8
// ⏲️ 7
// ⏲️ 6
// ⏲️ 5
// ⏲️ 4
// ⏲️ 3
// ⏲️ 2
// ⏲️ 1
// BLAST OFF!!!!
// ✅ Countdown completeExample 2 - Chat Room Simulator
import 'dart:async';
class ChatRoom {
// Private controller - only we can add messages
final _messageController = StreamController<ChatMessage>();
// Public stream - anyone can listen
Stream<ChatMessage> get messages => _messageController.stream;
// Send a message to the stream
void sendMessage(String username, String text) {
final message = ChatMessage(
username: username,
text: text,
timestamp: DateTime.now(),
);
_messageController.sink.add(message);
}
// CRITICAL: Clean up resources
void dispose() {
_messageController.close();
}
}
class ChatMessage {
final String username;
final String text;
final DateTime timestamp;
ChatMessage({
required this.username,
required this.text,
required this.timestamp,
});
@override
String toString() {
final time =
"${timestamp.hour}:${timestamp.minute.toString().padLeft(2, '0')}";
return "[$time] $username: $text";
}
}
void main() async {
print("💬 Chat Room Started\n");
final chatRoom = ChatRoom();
// Subscribe to messages
final subscription = chatRoom.messages.listen((message) => print(message));
// Simulate conversation
chatRoom.sendMessage("Alice", "Hey everyone! 👋");
await Future.delayed(Duration(seconds: 1));
chatRoom.sendMessage("Bob", "Hi Alice! How's Flutter going?");
await Future.delayed(Duration(seconds: 1));
chatRoom.sendMessage("Alice", "Amazing! Just learned Streams today 🔥");
await Future.delayed(Duration(seconds: 1));
chatRoom.sendMessage("Charlie", "Streams are powerful! 💪");
// Cleanup
await Future.delayed(Duration(seconds: 1));
print("\n👋 Chat Room Closing...");
await subscription.cancel();
chatRoom.dispose();
}
// Output:
// 💬 Chat Room Started
// [14:30] Alice: Hey everyone! 👋
// [14:30] Bob: Hi Alice! How's Flutter going?
// [14:30] Alice: Amazing! Just learned Streams today 🔥
// [14:30] Charlie: Streams are powerful! 💪
// 👋 Chat Room Closing...
Example 3 - Stock Price Monitor
import 'dart:async';
import 'dart:math';
void main() async {
print("📊 Stock Price Monitor Started\n");
// Generate random stock prices every 500ms
Stream<double> stockPrices = Stream.periodic(
Duration(milliseconds: 500),
(_) => 100 + Random().nextDouble() * 100, // $100-$200
).take(20);
// Transform the stream with multiple operations
final alertStream = stockPrices
.map((price) => price.toStringAsFixed(2)) // Format to 2 decimals
.map((priceStr) => double.parse(priceStr)) // Parse back
.where((price) => price > 150.0) // Only prices above $150
.map((price) => StockAlert(price: price, timestamp: DateTime.now()));
// Listen to alerts
int alertCount = 0;
final subscription = alertStream.listen(
(alert) {
alertCount++;
print(
"🟢 ALERT #$alertCount: \$${alert.price.toStringAsFixed(2)} at ${alert.timestamp.toString().substring(11, 19)}",
);
},
onDone: () {
print("\n📈 Market closed. Total alerts: $alertCount");
},
);
// Wait for stream to complete
await Future.delayed(Duration(seconds: 12));
await subscription.cancel();
}
class StockAlert {
final double price;
final DateTime timestamp;
StockAlert({required this.price, required this.timestamp});
}
// Output
// 📊 Stock Price Monitor Started
//
// 🟢 ALERT #1: $177.83 at 22:22:40
// 🟢 ALERT #2: $168.75 at 22:22:41
// 🟢 ALERT #3: $189.18 at 22:22:42
// 🟢 ALERT #4: $153.36 at 22:22:43
// 🟢 ALERT #5: $153.62 at 22:22:43
// 🟢 ALERT #6: $192.32 at 22:22:45
// 🟢 ALERT #7: $156.02 at 22:22:45
// 🟢 ALERT #8: $179.79 at 22:22:46
// 🟢 ALERT #9: $197.65 at 22:22:47
// 🟢 ALERT #10: $174.05 at 22:22:47
// 🟢 ALERT #11: $162.34 at 22:22:49
// 🟢 ALERT #12: $185.20 at 22:22:49
//
// 📈 Market closed. Total alerts: 12
Single-Subscription vs Broadcast Streams
Stream တွေ သုံးတဲ့အခါ သတိပြုစရာရှိတာက ကြေငြာထားတဲ့ stream က ဘယ်လိုအမျိုးအစားလဲ ဆိုတာပါ။
final stream = Stream.periodic(Duration(seconds: 1), (i) => i).take(3);
stream.listen((data) => print("Listener 1: $data"));
stream.listen((data) => print("Listener 2: $data"));
// Output:
// Unhandled exception:
// Bad state: Stream has already been listened to.ဒီမှာ ပြထားသလိုမျိုး ရေးပြီး run လိုက်တဲ့အခါ error ပြနေမှာပဲ ဖြစ်ပါတယ်။ ဘာဖြစ်လို့လဲဆိုတော့ stream တွေကို ဘာမှ ထပ်ဆောင်းမကြေငြာထားဘဲ အသုံးပြုတဲ့အခါ သူတို့ကို တနေရာကဘဲ အသုံးပြု (listen လုပ်) လို့ ရပါတယ်။ နောက်ထပ် တနေရာက ထပ်ပြီး listen လုပ်လိုက်တာနဲ့ error တက်လာမှာပဲ ဖြစ်ပါတယ်။
ဒီတော့ Stream ကို တနေရာထပ်ပိုပြီး အသုံးပြုစေချင်တဲ့အခါ broadcast လုပ်ပေးဖို့လိုပါတယ်။
final controller = StreamController<int>.broadcast();
controller.stream.listen((data) => print("Home Page: $data"));
controller.stream.listen((data) => print("Profile Page: $data"));
controller.stream.listen((data) => print("Settings Page: $data"));
controller.sink.add(42);
controller.close();ဒီမှာကြည့်မယ်ဆို listen လုပ်ထားတာ နေရာအများကြီးက လုပ်ထားတာ ဖြစ်ပါတယ်။ boradcast() ဆိုပြီး ထည့်ရေးထားတဲ့အတွက် error တွေလဲ မတက်တော့ပါဘူး။
Stream<int> singleStream = Stream.periodic(Duration(seconds: 1), (i) => i);
Stream<int> broadcastStream = singleStream.asBroadcastStream();
broadcastStream.listen((data) => print("A: $data"));
broadcastStream.listen((data) => print("B: $data"));တကယ်လို့ ရှိနေတဲ့ single subscription stream ကို implementation လုပ်ထားတဲ့နေရာမှာ broadcast() ဆိုပြီး ပြင်လို့မရခဲ့ရင် နားထောင်တာမတိုင်ခင် broadcast အဖြစ် ပြောင်းလိုက်လို့ရပါတယ်။
Stream<int> singleStream = Stream.periodic(Duration(seconds: 1), (i) => i);
Stream<int> broadcastStream = singleStream.asBroadcastStream();
broadcastStream.listen((data) => print("A: $data"));
broadcastStream.listen((data) => print("B: $data"));တကယ်လို့ stream ကနေဝင်လာတဲ့ data တွေက processing တွေထပ်လုပ်ဖို့လိုတယ် တခုချင်းစီလဲ ကြာဖို့ရှိတယ်ဆိုရင်တော့ await for loop ကိုသုံးပြီး တခုချင်းစီ အဆင့်ဆင့် အလုပ်လုပ်ဆောင်နိုင်အောင် အသုံးပြုလို့ရပါတယ်။
void main() async {
Stream<int> numbers = Stream.periodic(
Duration(seconds: 1),
(i) => i * i, // Emit squares: 0, 1, 4, 9, 16
).take(5);
print("Processing numbers sequentially...\n");
await for (var number in numbers) {
print("Processing: $number");
await Future.delayed(Duration(milliseconds: 500));
print("✓ Done with $number");
}
print("All processing complete!");
}
// Output:
// Processing numbers sequentially...
// Processing: 0
// ✓ Done with 0
// Processing: 1
// ✓ Done with 1
// Processing: 4
// ✓ Done with 4
// Processing: 9
// ✓ Done with 9
// Processing: 16
// ✓ Done with 16
// All processing complete!ဒီနေ့ Day 13 အတွက်ကတော့ ဒီလောက်ပဲ ဖြစ်ပါတယ်။ အဆုံးထိ ဖတ်ပေးတဲ့အတွက် အများကြီး ကျေးဇူးတင်ပါတယ်။ နားမလည်တာတွေ အဆင်မပြေတာတွေ ရှိခဲ့ရင်လဲ အောက်မှာပေးထားတဲ့ discord server ထဲမှာ လာရောက်ဆွေးနွေးနိုင်ပါတယ်။ နောက်နေ့တွေမှာလဲ ဆက်လက်ပြီး sharing လုပ်သွားပေးသွားမှာ ဖြစ်တဲ့အတွက် subscribe လုပ်ထားဖို့ ဖိတ်ခေါ်ပါတယ်။
- Youtube: https://www.youtube.com/@arkarmintun
- Newsletter: https://arkar.dev/
- Discord: https://discord.gg/3xUJ6k6dkH