본문 바로가기

알쓸코잡

Race Condition과 Thread.Lock (Feat. 파이썬)

Race Condition과 Thread.Lock (Feat. 파이썬)

NCP에서 Fast API관련한 서버가 계속 OOM이 발생해서 원인을 파악하던 도중.. 갑자기 딴 길로 샜다.


Race Condition

동시성 프로그래밍에서 발생하는 상황으로, 두개 이상의 동시적인 연산(스레드나 프로세스에 의해 수행되는)이 자원에 접근하고 그 순서에 따라 결과가 달라지는 상황

예를 들어, 두 스레드가 동시에 같은 메모리 위치에 쓰기 작업을 수행하려고 할 때, 어느 스레드가 먼저 쓰기 작업을 수행하느냐에 따라 결과가 달라질 수 있음


따라서

동시성 프로그래밍을 할 때는 Race Condition을 방지하기 위한 동기화 메커니즘(뮤텍스, 세마포어)을 사용해서 한 번에 오직 하나의 연산만이 해당 자원에 접근할 수 있도록 제어해야 함

 

두 개의 스레드가 같은 변수 'counter'를 증가시키는데, race condition 발생함

import threading

# 공유 변수
counter = 0

def increase_counter():
    global counter
    for _ in range(1000000):
        counter += 1

# 두 개의 스레드 생성
thread1 = threading.Thread(target=increase_counter)
thread2 = threading.Thread(target=increase_counter)

# 스레드 시작
thread1.start()
thread2.start()

# 두 스레드가 종료되기를 기다림
thread1.join()
thread2.join()

print(f"예상되는 값: {2 * 1000000}, 실제 counter 값: {counter}")

 

결과

예상되는 값: 2000000, 실제 counter 값: 1853043

이 문제를 해결하기 위해선 동기화 메커니즘인 스레드 Lock을 사용해서 한 번에 하나의 스레드만 'counter'를 증가시킴

import threading

# 공유 변수
counter = 0
# 락 객체 생성
counter_lock = threading.Lock()

def increase_counter():
    global counter
    for _ in range(1000000):
        # 공유 변수에 접근하기 전에 락 획득
        with counter_lock:
            counter += 1

# 두 개의 스레드 생성
thread1 = threading.Thread(target=increase_counter)
thread2 = threading.Thread(target=increase_counter)

# 스레드 시작
thread1.start()
thread2.start()

# 두 스레드가 종료되기를 기다림
thread1.join()
thread2.join()

print(f"예상되는 값: {2 * 1000000}, 실제 counter 값: {counter}")

 

결과

예상되는 값: 2000000, 실제 counter 값: 2000000

위처럼, thread.Lock을 사용할 경우, 각 스레드는 해당 락을 획득하려고 시도한다

한 스레드가 락을 이미 획득한 경우, 다른 스레드는 락이 풀릴 때까지 대기해야 한다

그렇다면

동작순서가 Thread1 작업이 끝나야 Thread2 작업이 시작하는가?라고 묻는 다면 이것은 아니다.

import threading
import time

# 공유 변수
counter = 0
# 락 객체 생성
counter_lock = threading.Lock()

def increase_counter(thread_name):
    global counter
    for _ in range(100): 
        with counter_lock:
            print(f"{thread_name} has acquired the lock.")
            counter += 1
            time.sleep(1)

            print(f"{thread_name} has released the lock.")

# 두 개의 스레드 생성
thread1 = threading.Thread(target=increase_counter, args=("Thread 1",))
thread2 = threading.Thread(target=increase_counter, args=("Thread 2",))

# 스레드 시작
thread1.start()
thread2.start()

# 두 스레드가 종료되기를 기다림
thread1.join()
thread2.join()

print(f"Counter value: {counter}")

 

결과

 

이것을 시각화해서 확인하는 경우, 한 번에 하나의 작업만 하는 것을 더 직관적으로 확인할 수 있다

import threading
import time
import matplotlib.pyplot as plt

# 공유 변수
counter = 0
# 락 객체 생성
counter_lock = threading.Lock()

# 스레드 작업 결과 저장
results = {"Thread 1": [], "Thread 2": []}

def increase_counter(thread_name):
    global counter
    for _ in range(100):  
        with counter_lock:
            # 시작 시간 기록
            start_time = time.time()
            counter += 1
            time.sleep(0.1)  
            # 종료 시간 기록
            end_time = time.time()
            # 작업 결과 저장
            results[thread_name].append((start_time, end_time))

# 두 개의 스레드 생성
thread1 = threading.Thread(target=increase_counter, args=("Thread 1",))
thread2 = threading.Thread(target=increase_counter, args=("Thread 2",))

# 스레드 시작
thread1.start()
thread2.start()

# 두 스레드가 종료되기를 기다림
thread1.join()
thread2.join()

print(f"Counter value: {counter}")

# 시각화
fig, ax = plt.subplots()
colors = ['blue', 'orange']

for i, (color, (thread_name, timestamps)) in enumerate(zip(colors, results.items())):
    start_times, end_times = zip(*timestamps)
    ax.broken_barh([(start, end-start) for start, end in zip(start_times, end_times)], (i, 1), facecolors=color, label=thread_name)

ax.set_xlabel("Time")
ax.set_yticks(range(len(results)))
ax.set_yticklabels(list(results.keys()))
ax.legend()
plt.show()

 

결과


또한

GIL을 사용하더라고 I/O 바운드가 일어나면 GIL을 release 해서 다른 스레드에게 CPU를 재할당한다

I/O를 많이 사용하는 작업을 시간 지연(time.sleep)으로, CPU를 많이 사용하는 작업을 루프 작업으로 작성함

 

0.1초 지연
import threading
import time

def cpu_bound():
    print("\nCPU-bound 작업 시작...")
    for i in range(5 * 10**7):
        pass
    print("\nCPU-bound 작업 끝...")

def io_bound():
    print("\nI/O-bound 작업 시작...")
    time.sleep(0.1)  
    print("\nI/O-bound task 끝...")

t1 = threading.Thread(target=cpu_bound)
t2 = threading.Thread(target=io_bound)

t1.start()
t2.start()

t1.join()
t2.join()
결과

 

3초 지연 결과

 

동일한 프로세스 내의 여러 스레드가 동시에 실행되지 않는다.

하지만 "동시에 실행되지 않는다"는 것은 CPU가 계산하는 작업에만 해당된다.

 

스레드가 I/O 작업을 수행하고 대기하는 동안, 파이썬 인터프리터는 해당 스레드에서 GIL 해제하고 다른 스레드가 실행될 수 있도록 GIL을 할당함

 

이렇게 하면 하나의 스레드가 I/O 작업으로 인해 Blocked 상태일 때, 다른 스레드가 CPU를 사용하게 하는 것이다