9.4.12 الگو Pipeline

9.4.12 الگو Pipeline

9.4.12.1 توضیحات #

الگوی خط لوله (Pipeline) یکی از حرفه‌ای‌ترین و پرکاربردترین الگوهای همزمانی در زبان Go است که نقش بسیار مهمی در طراحی سیستم‌های مقیاس‌پذیر، قابل نگهداری و با کارایی بالا دارد. هدف این الگو این است که یک کار بزرگ یا پردازش پیچیده را به چند مرحله (stage) کاملاً مستقل تقسیم کند، به طوری که هر مرحله بتواند همزمان با مراحل دیگر اجرا شود. در این ساختار، هر stage مسئولیت انجام یک بخش خاص از فرآیند را بر عهده دارد (مثلاً خواندن داده، پاک‌سازی، پردازش، ذخیره‌سازی و…) و معمولاً هر stage در یک goroutine مجزا اجرا می‌شود.

انتقال داده بین این مراحل، به صورت ایمن و concurrent از طریق channel انجام می‌گیرد. هر مرحله داده‌های پردازش‌شده خود را به کانال خروجی ارسال می‌کند و مرحله بعدی آن داده‌ها را از کانال ورودی دریافت می‌کند. این ساختار باعث می‌شود که مراحل مختلف pipeline بتوانند بدون وابستگی مستقیم به یکدیگر، به صورت موازی و کاملاً decoupled کار کنند؛ به عبارتی، هر stage می‌تواند با سرعت خود کار کند و کندی یا شتاب یک مرحله، روی کل پردازش اثر غیرخطی نخواهد داشت.

این الگو به طور گسترده در پردازش داده‌های حجیم، تحلیل داده‌های real-time، پیاده‌سازی ETL (استخراج، تبدیل و بارگذاری)، پردازش تصویر و صدا، یا هرجا که داده باید مرحله به مرحله و به شکل stream پردازش شود، کاربرد دارد. با استفاده از Pipeline، کد ساده‌تر، توسعه‌پذیرتر و خطایابی آن آسان‌تر می‌شود و همچنین بهره‌وری سیستم به حداکثر می‌رسد، چرا که همزمانی و توزیع بار به بهترین شکل انجام می‌شود.

9.4.12.2 دیاگرام #

flowchart LR A[Input Data] --> S1[Stage 1: Preprocessing] S1 --> S2[Stage 2: Processing] S2 --> S3[Stage 3: Postprocessing] S3 --> B[Output Data] subgraph Goroutine S1 S2 S3 end style S1 fill:#d9f5f9,stroke:#30b5c8,stroke-width:2px style S2 fill:#e2f7e9,stroke:#46c772,stroke-width:2px style S3 fill:#fdf2e8,stroke:#e8922d,stroke-width:2px

9.4.12.3 نمونه کد #

 1package main
 2
 3import (
 4	"fmt"
 5)
 6
 7// تولید داده اولیه (stage 1)
 8func gen(nums ...int) <-chan int {
 9	out := make(chan int, 2) // کانال بافر دار
10	go func() {
11		defer close(out)
12		for _, n := range nums {
13			out <- n
14		}
15	}()
16	return out
17}
18
19// مربع هر عدد (stage 2)
20func sq(in <-chan int) <-chan int {
21	out := make(chan int, 2)
22	go func() {
23		defer close(out)
24		for n := range in {
25			out <- n * n
26		}
27	}()
28	return out
29}
30
31// افزایش عدد به اندازه ۱۰ (stage 3)
32func addTen(in <-chan int) <-chan int {
33	out := make(chan int, 2)
34	go func() {
35		defer close(out)
36		for n := range in {
37			out <- n + 10
38		}
39	}()
40	return out
41}
42
43func main() {
44	// ساخت خط لوله با چندین مرحله
45	stage1 := gen(2, 3, 5, 7)
46	stage2 := sq(stage1)
47	stage3 := addTen(stage2)
48
49	// مصرف تمام خروجی pipeline
50	for v := range stage3 {
51		fmt.Println(v)
52	}
53}
1$ go run main.go
214
319
435
559

در این مثال، یک خط لوله (Pipeline) همزمانی واقعی و کاملاً idiomatic در زبان Go پیاده‌سازی شده است که چندین مرحله (stage) پردازشی را به صورت مستقل و موازی اجرا می‌کند. هر مرحله‌ی این pipeline یک تابع جداگانه است که داده‌های دریافتی از مرحله قبلی را از یک کانال (channel) می‌گیرد، عملیات مورد نظر خود را روی هر داده انجام می‌دهد و نتیجه را به کانال خروجی ارسال می‌کند. این مدل باعث می‌شود هر بخش از پردازش بدون وابستگی به سرعت بخش دیگر و به صورت ایمن و concurrent اجرا شود.

در کد، ابتدا با تابع gen داده‌های اولیه (در اینجا ۲، ۳، ۵ و ۷) تولید و وارد یک کانال می‌شوند. سپس این داده‌ها به مرحله دوم (sq) ارسال می‌شوند که کار هر goroutine در این مرحله گرفتن عدد و بازگرداندن مربع آن است. خروجی مرحله دوم وارد مرحله سوم (addTen) می‌شود که به هر عدد، مقدار ۱۰ اضافه می‌کند. هر مرحله در یک goroutine مجزا و روی یک کانال بافر‌دار اجرا می‌شود که باعث افزایش performance و decoupling کامل مراحل می‌شود.

در انتها، یک حلقه‌ی ساده روی خروجی مرحله آخر (stage3) قرار می‌گیرد و تمام نتایج به ترتیب مصرف و چاپ می‌شوند. استفاده از حلقه و range روی کانال، مصرف امن، بدون بلاک شدن و بدون نگرانی از goroutine leak را تضمین می‌کند، چون با بسته شدن کانال، حلقه به طور خودکار خارج می‌شود. این ساختار بسیار منعطف و قابل توسعه است؛ می‌توان به راحتی مراحل بیشتری به pipeline افزود یا منطق هر مرحله را تغییر داد، بدون اینکه بخش‌های دیگر برنامه نیاز به تغییر داشته باشند. چنین معماری، مناسب سیستم‌های پردازش داده، real-time، ETL و سناریوهای تحلیل موازی و مقیاس‌پذیر است.

یک مثال کاربردی دیگر

9.4.12.4 کاربردها #

  • مدیریت ترافیک و پردازش شبکه (Network Stream Processing):
    الگوی Pipeline به شما اجازه می‌دهد چندین اتصال شبکه (مثلاً درخواست‌های همزمان کاربران یا پیام‌های ورودی) را به صورت مرحله‌ای مدیریت کنید؛ به گونه‌ای که هر بسته یا پیام از مراحل مختلفی مانند خواندن، تجزیه (parse)، فیلتر (filter)، اعتبارسنجی (validation)، مسیریابی (routing)، و حتی رمزنگاری یا فشرده‌سازی عبور کند. این مدل به شدت در سرورهای proxy، load balancer و نرم‌افزارهای firewall کاربرد دارد.
  • محاسبات چند مرحله‌ای و زنجیره‌ای (Multi-stage Computation):
    Pipeline راهکاری عالی برای تقسیم محاسبات پیچیده به گام‌های ساده‌تر و مستقل است؛ به طوری که هر مرحله روی بخشی از داده یا نتیجه مرحله قبل کار کند. برای مثال: تولید داده خام → پاک‌سازی → تبدیل فرمت → محاسبات عددی یا آماری → تجمیع نهایی. این ساختار کارایی و خوانایی برنامه را افزایش و توسعه آن را آسان‌تر می‌کند.
  • پردازش و تحلیل گزارش‌ها (Log Processing & ETL):
    در سامانه‌های جمع‌آوری و تحلیل لاگ یا داده‌های گزارش، می‌توان داده‌ها را به صورت جریان پیوسته از مراحل مختلف عبور داد؛ مثلاً ابتدا فیلترکردن داده‌های نامربوط، سپس تجزیه (parse) رکوردها، enrich کردن با داده‌های خارجی (مانند geoip)، و در نهایت ذخیره‌سازی یا index برای جستجو. این روش پایه معماری‌های ETL، سامانه‌های لاگ توزیع‌شده (ELK، Loki، …)، و data lakeها است.
  • تجزیه و تحلیل داده‌های بزرگ (Big Data & Real-Time Analytics):
    در پروژه‌های داده‌محور، Pipeline ابزاری کلیدی برای اجرای زنجیره‌ای عملیات روی داده‌های حجیم است؛ مانند فیلترکردن داده‌ها، map/reduce، تبدیل ویژگی‌ها (feature engineering)، استخراج اطلاعات، و ساخت گزارش‌های لحظه‌ای یا ذخیره در بانک داده. این مدل به پردازش موازی، افزایش throughput و مقیاس‌پذیری سامانه کمک می‌کند.
  • پردازش فایل و تصویر:
    در بسیاری از سرویس‌های backend، فایل‌ها یا تصاویر آپلودشده می‌توانند به صورت pipeline پردازش شوند: خواندن فایل → decode → resize/crop → تبدیل فرمت → اعمال واترمارک → ذخیره‌سازی یا آپلود به سرویس دیگر.