Post

SWE #2: Concurrency (Part 2) - Chúc mừng 30/4 - 1/5

SWE #2: Concurrency (Part 2) - Chúc mừng 30/4 - 1/5

Summarry

Tiếp tục với phần trước, ta sẽ tìm hiểu các cách mà Golang sử dụng để xử lý việc Race-Condition

Race condition

Race Condition là việc mà 2 hay nhiều thread cùng vào và xử lý một shared resource (biến, mảng, file, …). Điều này dẫn đến việc các thread có thể đọc và ghi dữ liệu không đồng bộ với nhau. Điều này dẫn đến việc dữ liệu bị sai lệch và không chính xác.

Cùng xem đoạn code dưới đây:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
	"fmt"
	"time"
)
func main() {
	cnt := 0

	for i := 0; i <= 5; i++ {
		go func() {
			for j := 1; j <= 1000; j++ {
				cnt++
			}
		}()
	}

	time.Sleep(3 * time.Second);
	
	fmt.Println(cnt);
}

Khi chạy đoạn code trên, ta sẽ thấy rằng giá trị của biến cnt không phải là 6000 mà là một số khác. Điều này xảy ra do việc các goroutine cùng vào và xử lý biến cnt cùng một lúc.

Ví dụ khi cnt = 0, goroutine 1 vào và đọc biến cnt. Khi này cnt đang là 0. Trong lúc đó, goroutine 2 cũng vào và đọc biến cnt = 0. Khi goroutine ` tăng cnt lên 1 đáng nhẽ lúc này cnt sẽ là 1 nhưng do goroutine 2 cũng đang tăng cnt lên 1 nên giá trị của biến cnt sẽ là 0 thay vì 1. Điều này dẫn đến việc giá trị của biến cnt không chính xác.

Mutex

Mutex là một trong những cách để xử lý race condition. Mutex là viết tắt của Mutual Exclusion. Mutex là một cơ chế đồng bộ hóa cho phép chỉ một goroutine được truy cập vào một shared resource tại một thời điểm. Điều này giúp tránh việc các goroutine cùng vào và xử lý một shared resource cùng một lúc.

Package sync trong Golang cung cấp cho ta 3 phương thức để xử lý mutex:

  • Lock(): Để khóa mutex. Khi mutex đang không bị khóa, phương thức này sẽ khóa mutex lại sau đó tiếp tục thực thi các lệnh bên dưới. Nếu muxtex đang bị khóa, nó sẽ bị block ở đây đến khi được Unlock. A += 0.05; // Điều chỉnh tốc độ quay dọc

  • Unlock(): Nếu mutex đang bị khóa bởi chính goroutine này: Mutex sẽ mở khóa, cho phép goroutine khác tiếp quản. Nếu mutex chưa bị khóa hoặc bị khóa bởi goroutines khác: Gây ra runtime panic.

  • TryLock(): Nếu mutex chưa bị khóa: TryLock() sẽ khóa mutex và trả về True. Nếu mutex đang bị khóa TryLock() sẽ không bị block và trả về false

Áp dụng vào ví dụ trên ta sẽ làm như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"fmt"
	"time"
        "sync"
)
func main() {
	cnt := 0

    var mu sync.Mutex

	for i := 0; i <= 5; i++ {
		go func() {
                    for j := 1; j <= 1000; j++ {
                        mu.Lock()
                        cnt++
                        mu.Unlock()
                    }
                }()
	}

	time.Sleep(3 * time.Second);
	
	fmt.Println(cnt);
}

Khi vào thao tác với biến cnt, ta sẽ khóa mutex lại không cho phép các goroutines khác tham gia vào quá trình này. Và khi đã thao tác xong ta mở khóa cho các goroutines khác vào tiếp tục làm việc.

Channel

Đồng bộ hóa với mutex là một cách tiếp cận đơn giản. Bây giờ ta sẽ dụng một unbuffered channel để thực hiện việc đồng bộ hóa. Channel cung cấp cho chúng ta 2 phương thức là gửi và nhận thông qua dấu <-

Gửi: Gửi thông tin qua channel và nó sẽ bị block đến khi có một goroutines khác nhận

Nhận: Nhận thông tin và nó sẽ bị block đến khi có một goroutines gửi vào channel.

Áp dụng vào bài toán trên

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	"fmt"
)
func main() {

	cnt := make(chan int, 1)

	for i := 0; i <= 5; i++ {
		go func() {
			for j := 1; j <= 1000; j++ {
				cnt <- 1
			}
		}()
	}	
	
	sum := 0
	
	for i := 0; i < 6000; i++ {
		sum += <- cnt
	}

	fmt.Println(sum)
}

Có một lưu ý là nếu cnt đã không còn nhận được thông tin nào mà sum cứ đòi nhận thì sẽ gây ra tình trạng deadlock. Ở đây vì mình biết đáp án là 6000 trước nên mình để chạy 6000 lần nhận. Còn trong thực tế khi đã chạy xong ta phải close lại channel.

Wait group

Có một cách đơn giản hơn nữa là sử dụng sync.waitGroup để chờ một tập các sự kiện. 3 phương thức mà ta thường hay sử dụng với waitGroup là:

  • Add: Thêm một gouroutines vào tập
  • Done: Hoàn thành một goroutines
  • Wait: Block và chờ đến khi tất cả goroutines trong tập hoàn thành.

Áp dụng vào ví dụ trên kết hợp cùng với channel ta sẽ có đoạn code hoàn chỉnh như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
	"fmt"
	"sync"
)

func main() {

	cnt := make(chan int, 1)

	var wg sync.WaitGroup

	for i := 0; i <= 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 1; j <= 1000; j++ {
				cnt <- 1
			}
		}()
	}

	go func ()  {
		wg.Wait()
		close(cnt)
	}()

	sum := 0
	
	for x := range cnt {
		sum += x
	}

	fmt.Println(sum)
}

Producer Consumer

Concurrency

Một ví dụ phổ biến nhất về lập trình Concurrency là mô hình Producer Cosumer giúp tăng tốc độ xử lý chung của chương trình bằng cách cần bằng sưc mạnh của các thread “sản xuất) (produce) và tiêu thụ (consume).

Producer tạo ra một số dữ liêu và sau đó đưa vào một hàng đợi. Cùng lúc đó consumer cũng lấy dữ liệu này ra để xử lý. Điều này làm cho produce và consume trở thành hai quá trình bất đồng bộ. Khi không có dữ liệu trong hàng đợi, consumer sẽ chờ ở trạng thái “đói”, còn khi dữ liệu trong hàng đợi bị đầy, producer phải đối diện với vấn đề mất mát dữ liệu khi CPU phải loại bỏ bớt dữ liệu trong đó để nạp thêm.

Trong Golang, ta có thể thực hiện cơ chế này khá đơn giản như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Producer(factor int, out chan <- int) {
    for i := 0; ; i++ {
        out <- i * factor
    }
}

func Consumer(in <- chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch := make(chan int, 64)

    go Producer(3, ch)

    go Producer(5, ch)

    go Consumer(ch)

    time.Sleep(5 * time.Second)
}

Mô hình Publish Subscribe

Mô hình Publish-Subscribe hay pub/sub. Trong mô hình này, producer trờ thành publisher và consumer trở thành subscriber. Đồng thời, producer:consumer là mối quan hệ M:N.

Concurrency

This post is licensed under CC BY 4.0 by the author.