Monday, May 12, 2008
ใช้ Queue ใน Python
คนที่เคยทำงานกับ threading ใน Python น่าจะเคยเจอปัญหาในการจัดการกับ lock มาบ้างไม่มากก็น้อย
ใน Python ก็มี module ตัวนึงชื่อว่า Queue มาช่วยจัดการปัญหานี้
วิธีการทำงานของมันจะคล้ายกับการทำงานแบบ multithreading ทั่วไป กล่าวคือมีตัวคิวเป็นตัวแจกงาน
และ worker วิ่งเข้ามารับงานไปเรื่อยๆ จนกว่าจะหมด
ข้อดีของ Queue ก็คือเราสามารถเลี่ยงปัญหา race condition ได้ เพราะ Queue จะจัดการพวก lock ต่างๆ ของมันเอง
ตัวอย่างรูปแบบการใช้งาน Queue แบบหนึ่ง
เริ่มต้นเลยต้องสร้าง Queue instance ขึ้นมาก่อน
จากนั้นเราจะต้องสร้าง worker function ขึ้นมาก่อน โดนสั่งให้ worker อยู่เฉยๆ รอ item ที่มาจากคิว มาดูตัวอย่างกัน
queobject ในที่นี้คือ Queue instance ที่เราจะส่งเข้าไปให้นั่นเอง
โปรดสังเกตว่ามี method ชื่อ get() กับ task_done() อยู่ในที่นี้
queobject.get() ทำหน้าที่รับข้อมูลจาก queue มา method นี้โดยค่าปริยายจะหยุดทำงาน จนกว่าจะมีข้อมูลมาจาก queue
พอทำงานเสร็จปุ๊บ เราก็จะบอกคิวว่า worker ตัวนี้ว่างแล้ว เอางานใหม่มา โดยใช้ method ชื่อ queobject.task_done()
อย่าลืมเชียวนะครับ!
หลังจากที่เรามีคิวและ worker เรียบร้อยแล้ว ก็ถึงเวลาสั่งให้ worker ทำงาน โดยใช้ threading.Thread
Worker 4 ตัวจะ start ขึ้นมาเพื่อรอข้อมูลจากคิว ใครที่ถนัดหน่อยสามารถใช้ module อื่นๆ เอาจำนวนของ cpu ของเครื่องมาใช้ก็ได้
ตอนนี้เราก็มีทุกอย่างพร้อมใช้งาน เราสามารถส่งข้อมูลเข้าไปในคิวได้แล้ว วิธีส่งก็ใช้ method ชื่อ put แบบนี้
ข้อสังเกต: ถ้าเอาโค้ดนี้ไปรันเลย เราจะไม่ได้อะไรออกมา เพราะว่า python จะไม่หยุดรอ (block) ให้ queue ว่าง หลังจากที่ put ข้อมูลเข้าไปแล้วจะถือว่า process นั้นเสร็จสิ้นทันที เราตั้งค่า setDaemon(True) ไว้ตอน start worker เพื่อที่จะให้ worker ทั้งหลายตายไปพร้อมกับตัว process กันปัญหา zombie thread
Queue instance มี method ที่ชื่อว่า join() เอาไว้สั่งให้ตัว program block ไว้จนกว่าทุกอย่างใน queue จะถูกดึงออกไปและได้รับ task_done() กลับมา แบบนี้
เป็นอันเสร็จสิ้น ลองเล่นได้เลยครับ
ใน Python ก็มี module ตัวนึงชื่อว่า Queue มาช่วยจัดการปัญหานี้
วิธีการทำงานของมันจะคล้ายกับการทำงานแบบ multithreading ทั่วไป กล่าวคือมีตัวคิวเป็นตัวแจกงาน
และ worker วิ่งเข้ามารับงานไปเรื่อยๆ จนกว่าจะหมด
ข้อดีของ Queue ก็คือเราสามารถเลี่ยงปัญหา race condition ได้ เพราะ Queue จะจัดการพวก lock ต่างๆ ของมันเอง
ตัวอย่างรูปแบบการใช้งาน Queue แบบหนึ่ง
---------
| Queue |
---------
/ | \
/ | \
/ | \
----- ----- -----
| W | | W | | W |
----- ----- -----
| | |
| | |
-----------------
| Results |
-----------------
เริ่มต้นเลยต้องสร้าง Queue instance ขึ้นมาก่อน
import threading
import Queue
cmdqueue = Queue.Queue()
จากนั้นเราจะต้องสร้าง worker function ขึ้นมาก่อน โดนสั่งให้ worker อยู่เฉยๆ รอ item ที่มาจากคิว มาดูตัวอย่างกัน
def workerThread(queobject):
while True:
vars = queobject.get()
print vars
queobject.task_done()
queobject ในที่นี้คือ Queue instance ที่เราจะส่งเข้าไปให้นั่นเอง
โปรดสังเกตว่ามี method ชื่อ get() กับ task_done() อยู่ในที่นี้
queobject.get() ทำหน้าที่รับข้อมูลจาก queue มา method นี้โดยค่าปริยายจะหยุดทำงาน จนกว่าจะมีข้อมูลมาจาก queue
พอทำงานเสร็จปุ๊บ เราก็จะบอกคิวว่า worker ตัวนี้ว่างแล้ว เอางานใหม่มา โดยใช้ method ชื่อ queobject.task_done()
อย่าลืมเชียวนะครับ!
หลังจากที่เรามีคิวและ worker เรียบร้อยแล้ว ก็ถึงเวลาสั่งให้ worker ทำงาน โดยใช้ threading.Thread
for i in range(4):
worker = threading.Thread(target=workerThread, args=(cmdqueue,))
worker.setDaemon(True)
worker.start()
Worker 4 ตัวจะ start ขึ้นมาเพื่อรอข้อมูลจากคิว ใครที่ถนัดหน่อยสามารถใช้ module อื่นๆ เอาจำนวนของ cpu ของเครื่องมาใช้ก็ได้
ตอนนี้เราก็มีทุกอย่างพร้อมใช้งาน เราสามารถส่งข้อมูลเข้าไปในคิวได้แล้ว วิธีส่งก็ใช้ method ชื่อ put แบบนี้
for i in range(2000):
data = {
'id': i,
}
cmdqueue.put(data)
ข้อสังเกต: ถ้าเอาโค้ดนี้ไปรันเลย เราจะไม่ได้อะไรออกมา เพราะว่า python จะไม่หยุดรอ (block) ให้ queue ว่าง หลังจากที่ put ข้อมูลเข้าไปแล้วจะถือว่า process นั้นเสร็จสิ้นทันที เราตั้งค่า setDaemon(True) ไว้ตอน start worker เพื่อที่จะให้ worker ทั้งหลายตายไปพร้อมกับตัว process กันปัญหา zombie thread
Queue instance มี method ที่ชื่อว่า join() เอาไว้สั่งให้ตัว program block ไว้จนกว่าทุกอย่างใน queue จะถูกดึงออกไปและได้รับ task_done() กลับมา แบบนี้
for i in range(2000):
data = {
'id': i,
}
cmdqueue.put(data)
cmdqueue.join()
เป็นอันเสร็จสิ้น ลองเล่นได้เลยครับ
Subscribe to Posts [Atom]