Day 13: Streams - Reactive Programming Fundamentals

Go beyond single values to continuous data flows. Learn Streams, StreamController, and reactive programming - the foundation for live Flutter features!

မင်္ဂလာပါ.. 👋

100 days of Flutter ရဲ့ Day 13 က ကြိုဆိုပါတယ်။ ဒီနေ့မှာတော့ dart langauge နဲ့ ပတ်သက်တာတွေကို ဆက်လက်ပြီး လေ့လာသွားပါမယ်။ ဒီနေ့ဆွေးနွေးသွားမယ့် ခေါင်းစဥ်ကတော့ Streams အကြောင်းပဲ ဖြစ်ပါတယ်။


မနေ့တုန်းက API တွေခေါ်တာ၊ Database ကနေ သွားပြီးတော့ data တွေ ယူတဲ့အခါ စောင့်ရတဲ့အခါ Future တွေ အသုံးပြုတာကို လေ့လာပြီးပြီပဲ ဖြစ်ပါတယ်။ ဒီနေ့မှာတော့ data တွေက တခါထဲ ရတာမျိုး မဟုတ်ဘဲ တခုပြီးတခု လာနေတာမျိုးဆို ဘယ်လိုအသုံးပြုကြမလဲဆိုတာ ဆက်ကြည့်သွားပါမယ်။ ဒါကိုတော့ Dart မှာ Stream လို့ခေါ်ပါတယ်။ ရေစီးကြောင်းလိုမျိုးပေါ့။ Data တွေက စီးဆင်းနေမှာ ဖြစ်ပြီးတော့ ကိုယ်လိုချင်တာကို ခပ်ပြီး သုံးသွားရသလိုပါ။ Future နဲ့မတူတာက Stream က data တွေက တခါထပ်ပိုပြီးတော့ ဝင်လာနိုင်တာပါ။

Streams တွေကိုတော့ real-time app တွေ ရေးချင်တဲ့အခါမျိုးမှာ အသုံးပြုလို့ရပါတယ်။ ဥပမာ Chat app တွေ၊ Live Score ပြတဲ့ app တွေ၊ Stock app တွေ၊ Ride sharing app တွေ ရေးချင်တဲ့အခါဆို stream အကြောင်းက မသိမဖြစ်လို့ ပြောလို့ရပါတယ်။

ဒီတော့ ဘယ်လို code တွေရေးပြီး Dart ရဲ့ Stream တွေကို အသုံးပြုလို့ရမလဲ ကြည့်ရအောင်ပါ။

Stream

// Emit one value
Stream<int> stream1 = Stream.value(42);

// Emit nothing
Stream<String> stream2 = Stream.empty();

// Emit three values
Stream<int> stream3 = Stream.fromIterable([1, 2, 3]);

// Emit forever (until cancelled)
Stream<int> stream4 = Stream.periodic(Duration(seconds: 1), (count) => count);

ဒါကတော့ ထည့်ပေးမယ့် value တွေ သိပြီးတော့ stream အနေနဲ့ ပေးချင်တာဆိုရင် သုံးလို့ရပါတယ်။ Stream.value(...) ကတော့ value ကို တိုက်ရိုက် ထည့်ပေးလို့ရတာပဲ ဖြစ်ပါတယ်။​ Stream.fromIterable(...) ကတော့ သူ့ထဲမှာ ကြေငြာထားတာတွေကို တခုချင်း Stream ထဲကို ထည့်ပေးသွားမှာပါ။ Stream.periodic(...) ကတော့ သူ့ထဲမှာ value တွေကို ကြေငြာထားတဲ့ အချိန်ပိုင်းအလိုက် publish လုပ်ပေးနေမှာပဲ ဖြစ်ပါတယ်။ သူ့ကို သုံးရင်တော့ နည်းနည်း သတိထားဖို့လိုပါတယ်။ ဘာဖြစ်လို့လဲဆိုတော့ သူ့ကို မရပ်မချင်း publish လုပ်နေမှာ ဖြစ်လို့ပါ။ သူတို့ကို listen လုပ်လိုက်တာနဲ့ ကြေငြာထားတဲ့ value ကို စပြီးပို့ပေးသွားမှာမျိုးဖြစ်ပါတယ်။

StreamController

final controller = StreamController<String>();

controller.sink.add("Hello");
controller.sink.add("World");

Stream<String> myStream = controller.stream;

controller.close();

ဒါကတော့ ကိုယ်တိုင် value တွေကို အခြားနေရာတွေက ထည့်ပေးသွားချင်တာမျိုးဆို သုံးလို့ရပါတယ်။ StreamController ကိုအရင်ဆုံးကြေငြာပြီး သူ့ထဲကို data တွေ publish လုပ်ချင်တယ်ဆို controller.sink.add(...) ဆိုတာကို ခေါ်ပြီး data တွေ ထည့်ပေးသွားလို့ရပါတယ်။ နေရာအများကြီးကနေ stream ကို သုံးဖို့လိုတဲ့အခါ အခုလိုမျိုး stream controller တည်ဆောက်ပြီး data တွေ ပို့ပေးလို့ရပါတယ်။ သူ့ကို အသုံးပြုပြီး stream ထဲက data တွေ ရယူဖို့ဆိုရင်တော့ controller.stream ဆိုပြီး သုံးနိုင်ပါတယ်။

Subscription

StreamSubscription<String> subscription = myStream.listen(
  (data) {
    print("Received: $data");
  },
  onError: (error) {
    print("Error: $error");
  },
  onDone: () {
    print("Stream complete!");
  },
);

subscription.cancel();

Stream ထဲက data တွေကို ရယူဖို့ listen ဆိုပြီး ရေးလိုက်တာက subscribe လုပ်လိုက်တာနဲ့ အလားသဏ္ဍန်တူပါတယ်။ listen လို့ ရေးလိုက်တာနဲ့ stream ထဲကို ဝင်လာတဲ့ value တွေကို ပထမဆုံး callback နေရာမှာ ရယူနိုင်သွားပါလိမ့်မယ်။ stream ထဲကို data မဟုတ်ဘဲ error pass ပေးလို့လဲရပါတယ်။ ဒါဆိုရင်တော့ onError ဆိုတဲ့ named callback function ထဲကို ဝင်သွားပါလိမ့်မယ်။ နောက်ဆုံး stream ထဲက data တွေ ဝင်လာတာပြီးသွားတဲ့အခါ ဒါမှမဟုတ် stream ကို close လုပ်လိုက်တာမျိုးဆိုရင်တော့ onDone ဆိုတဲ့ named callback function ထဲကို ရောက်သွားမှာပဲ ဖြစ်ပါတယ်။

Stream Transformers

myStream
  .map((value) => value * 2)
  .where((value) => value > 10)
  .take(5)
  .listen((data) => print(data));

ဒါကိုတော့ stream ထဲက value တွေကို ဝင်လာတဲ့ အချိန်မှာ tramsform/filter/limit လုပ်ချင်တာမျိုးဆို သုံးလို့ရပါတယ်။ ရှေ့မှာ ပြောခဲ့တဲ့ method chaining ဆိုတာကို stream မှာလဲ သုံးလို့ရတာဖြစ်ပါတယ်။


အခု stream နဲ့ပတ်သက်တဲ့ အခြေခံတွေ ပြောပြီးပြီဆိုတော့ ဥပမာလေးတွေ ဆက်ကြည့်ကြည့်ရအောင်ပါ။

Example 1 - Basic Timer

void main() async {
  print('🚀 Starting countdown at ${DateTime.now()}');

  Stream<int> countdown = Stream.periodic(
    const Duration(seconds: 1),
    (count) => 10 - count,
  ).take(11);

  final subscription = countdown.listen(
    (seconds) {
      if (seconds > 0) {
        print('⏲️ $seconds');
      } else {
        print('BLAST OFF!!!!');
      }
    },
    onError: (error) {
      print('Error $error');
    },
    onDone: () {
      print('✅ Countdown complete');
    },
  );

  await Future.delayed(const Duration(seconds: 12));

  await subscription.cancel();
}

// Output:
// 🚀 Starting countdown at 2026-01-13 21:54:36.206237
// ⏲️ 10
// ⏲️ 9
// ⏲️ 8
// ⏲️ 7
// ⏲️ 6
// ⏲️ 5
// ⏲️ 4
// ⏲️ 3
// ⏲️ 2
// ⏲️ 1
// BLAST OFF!!!!
// ✅ Countdown complete

Example 2 - Chat Room Simulator

import 'dart:async';

class ChatRoom {
  // Private controller - only we can add messages
  final _messageController = StreamController<ChatMessage>();

  // Public stream - anyone can listen
  Stream<ChatMessage> get messages => _messageController.stream;

  // Send a message to the stream
  void sendMessage(String username, String text) {
    final message = ChatMessage(
      username: username,
      text: text,
      timestamp: DateTime.now(),
    );
    _messageController.sink.add(message);
  }

  // CRITICAL: Clean up resources
  void dispose() {
    _messageController.close();
  }
}

class ChatMessage {
  final String username;
  final String text;
  final DateTime timestamp;

  ChatMessage({
    required this.username,
    required this.text,
    required this.timestamp,
  });

  @override
  String toString() {
    final time =
        "${timestamp.hour}:${timestamp.minute.toString().padLeft(2, '0')}";
    return "[$time] $username: $text";
  }
}

void main() async {
  print("💬 Chat Room Started\n");

  final chatRoom = ChatRoom();

  // Subscribe to messages
  final subscription = chatRoom.messages.listen((message) => print(message));

  // Simulate conversation
  chatRoom.sendMessage("Alice", "Hey everyone! 👋");
  await Future.delayed(Duration(seconds: 1));

  chatRoom.sendMessage("Bob", "Hi Alice! How's Flutter going?");
  await Future.delayed(Duration(seconds: 1));

  chatRoom.sendMessage("Alice", "Amazing! Just learned Streams today 🔥");
  await Future.delayed(Duration(seconds: 1));

  chatRoom.sendMessage("Charlie", "Streams are powerful! 💪");

  // Cleanup
  await Future.delayed(Duration(seconds: 1));
  print("\n👋 Chat Room Closing...");
  await subscription.cancel();
  chatRoom.dispose();
}

// Output:
// 💬 Chat Room Started

// [14:30] Alice: Hey everyone! 👋
// [14:30] Bob: Hi Alice! How's Flutter going?
// [14:30] Alice: Amazing! Just learned Streams today 🔥
// [14:30] Charlie: Streams are powerful! 💪

// 👋 Chat Room Closing...

Example 3 - Stock Price Monitor

import 'dart:async';
import 'dart:math';

void main() async {
  print("📊 Stock Price Monitor Started\n");

  // Generate random stock prices every 500ms
  Stream<double> stockPrices = Stream.periodic(
    Duration(milliseconds: 500),
    (_) => 100 + Random().nextDouble() * 100, // $100-$200
  ).take(20);

  // Transform the stream with multiple operations
  final alertStream = stockPrices
      .map((price) => price.toStringAsFixed(2)) // Format to 2 decimals
      .map((priceStr) => double.parse(priceStr)) // Parse back
      .where((price) => price > 150.0) // Only prices above $150
      .map((price) => StockAlert(price: price, timestamp: DateTime.now()));

  // Listen to alerts
  int alertCount = 0;
  final subscription = alertStream.listen(
    (alert) {
      alertCount++;
      print(
        "🟢 ALERT #$alertCount: \$${alert.price.toStringAsFixed(2)} at ${alert.timestamp.toString().substring(11, 19)}",
      );
    },
    onDone: () {
      print("\n📈 Market closed. Total alerts: $alertCount");
    },
  );

  // Wait for stream to complete
  await Future.delayed(Duration(seconds: 12));
  await subscription.cancel();
}

class StockAlert {
  final double price;
  final DateTime timestamp;

  StockAlert({required this.price, required this.timestamp});
}

// Output
// 📊 Stock Price Monitor Started
//
// 🟢 ALERT #1: $177.83 at 22:22:40
// 🟢 ALERT #2: $168.75 at 22:22:41
// 🟢 ALERT #3: $189.18 at 22:22:42
// 🟢 ALERT #4: $153.36 at 22:22:43
// 🟢 ALERT #5: $153.62 at 22:22:43
// 🟢 ALERT #6: $192.32 at 22:22:45
// 🟢 ALERT #7: $156.02 at 22:22:45
// 🟢 ALERT #8: $179.79 at 22:22:46
// 🟢 ALERT #9: $197.65 at 22:22:47
// 🟢 ALERT #10: $174.05 at 22:22:47
// 🟢 ALERT #11: $162.34 at 22:22:49
// 🟢 ALERT #12: $185.20 at 22:22:49
//
// 📈 Market closed. Total alerts: 12

Single-Subscription vs Broadcast Streams

Stream တွေ သုံးတဲ့အခါ သတိပြုစရာရှိတာက ကြေငြာထားတဲ့ stream က ဘယ်လိုအမျိုးအစားလဲ ဆိုတာပါ။

final stream = Stream.periodic(Duration(seconds: 1), (i) => i).take(3);

stream.listen((data) => print("Listener 1: $data"));

stream.listen((data) => print("Listener 2: $data"));

// Output:
// Unhandled exception:
// Bad state: Stream has already been listened to.

ဒီမှာ ပြထားသလိုမျိုး ရေးပြီး run လိုက်တဲ့အခါ error ပြနေမှာပဲ ဖြစ်ပါတယ်။ ဘာဖြစ်လို့လဲဆိုတော့ stream တွေကို ဘာမှ ထပ်ဆောင်းမကြေငြာထားဘဲ အသုံးပြုတဲ့အခါ သူတို့ကို တနေရာကဘဲ အသုံးပြု (listen လုပ်) လို့ ရပါတယ်။ နောက်ထပ် တနေရာက ထပ်ပြီး listen လုပ်လိုက်တာနဲ့ error တက်လာမှာပဲ ဖြစ်ပါတယ်။

ဒီတော့ Stream ကို တနေရာထပ်ပိုပြီး အသုံးပြုစေချင်တဲ့အခါ broadcast လုပ်ပေးဖို့လိုပါတယ်။

final controller = StreamController<int>.broadcast();

controller.stream.listen((data) => print("Home Page: $data"));
controller.stream.listen((data) => print("Profile Page: $data"));
controller.stream.listen((data) => print("Settings Page: $data"));

controller.sink.add(42);

controller.close();

ဒီမှာကြည့်မယ်ဆို listen လုပ်ထားတာ နေရာအများကြီးက လုပ်ထားတာ ဖြစ်ပါတယ်။ boradcast() ဆိုပြီး ထည့်ရေးထားတဲ့အတွက် error တွေလဲ မတက်တော့ပါဘူး။

Stream<int> singleStream = Stream.periodic(Duration(seconds: 1), (i) => i);

Stream<int> broadcastStream = singleStream.asBroadcastStream();

broadcastStream.listen((data) => print("A: $data"));
broadcastStream.listen((data) => print("B: $data"));

တကယ်လို့ ရှိနေတဲ့ single subscription stream ကို implementation လုပ်ထားတဲ့နေရာမှာ broadcast() ဆိုပြီး ပြင်လို့မရခဲ့ရင် နားထောင်တာမတိုင်ခင် broadcast အဖြစ် ပြောင်းလိုက်လို့ရပါတယ်။

Stream<int> singleStream = Stream.periodic(Duration(seconds: 1), (i) => i);

Stream<int> broadcastStream = singleStream.asBroadcastStream();

broadcastStream.listen((data) => print("A: $data"));
broadcastStream.listen((data) => print("B: $data"));

တကယ်လို့ stream ကနေဝင်လာတဲ့ data တွေက processing တွေထပ်လုပ်ဖို့လိုတယ် တခုချင်းစီလဲ ကြာဖို့ရှိတယ်ဆိုရင်တော့ await for loop ကိုသုံးပြီး တခုချင်းစီ အဆင့်ဆင့် အလုပ်လုပ်ဆောင်နိုင်အောင် အသုံးပြုလို့ရပါတယ်။

void main() async {
  Stream<int> numbers = Stream.periodic(
    Duration(seconds: 1),
    (i) => i * i, // Emit squares: 0, 1, 4, 9, 16
  ).take(5);

  print("Processing numbers sequentially...\n");

  await for (var number in numbers) {
    print("Processing: $number");
    await Future.delayed(Duration(milliseconds: 500));
    print("✓ Done with $number");
  }

  print("All processing complete!");
}

// Output:
// Processing numbers sequentially...
// Processing: 0
// ✓ Done with 0
// Processing: 1
// ✓ Done with 1
// Processing: 4
// ✓ Done with 4
// Processing: 9
// ✓ Done with 9
// Processing: 16
// ✓ Done with 16
// All processing complete!

ဒီနေ့ Day 13 အတွက်ကတော့ ဒီလောက်ပဲ ဖြစ်ပါတယ်။ အဆုံးထိ ဖတ်ပေးတဲ့အတွက် အများကြီး ကျေးဇူးတင်ပါတယ်။ နားမလည်တာတွေ အဆင်မပြေတာတွေ ရှိခဲ့ရင်လဲ အောက်မှာပေးထားတဲ့ discord server ထဲမှာ လာရောက်ဆွေးနွေးနိုင်ပါတယ်။ နောက်နေ့တွေမှာလဲ ဆက်လက်ပြီး sharing လုပ်သွားပေးသွားမှာ ဖြစ်တဲ့အတွက် subscribe လုပ်ထားဖို့ ဖိတ်ခေါ်ပါတယ်။