Python: GIL, threading and I/O. Pt.2 (original) (raw)

February 7 2010, 12:16

Category:

Итак, встала задача каким-то образом обеспечить фоновое упреждающее чтение файлов.
Поразмыслив над проблемой, сделал для себя два вывода. Первое -- не надо трогать существующий работающий однопоточный код (тот самый цикл, бегущий по списку файлов). Второе -- решаемая задача красиво сводиться к общему универсальному решению, которое потом можно будет многократно использовать.

Что должно представлять из себя это общее решение? Вспомним, что существующий код (который я решил не трогать), это фактически процесс, завязанный на итерабельном объекте (списке с именами файлов). Т.е. наружный интерфейс для универсального решения понятен -- нечто итерабельное.
Каким должен быть внутренний интерфейс? В исходном решении мы "бежали" по списку файлов -- понятно, что теперь мы должны делать тоже самое, только в параллельном потоке.

Итак, наше решение это некий итерабельный объект, которому в конструктор дают нечто итерабельное, по которому он должен пробежаться в параллельно потоке и отдать как результат итераций по самому себе. Т.е. это обертка для чего-то итерабельного, и смысл обертки в том, что сам процесс перечисления будет происходить асинхронно в фоновом потоке.

Полученное решение я назвал IterInThread.

`Copy Source | Copy HTML
class IterInThread:""" Enumerate seq in background thread """def init(self, seq : Iterable, maxInQueue : int):def ThreadBody():def Add(last : bool, result):
with self.lock:
self.queue.append( (last, result) )
self.wakeupIter.notify_all()for i in seq:

push result

Add(False, i)# can process next item?
with self.lock:
if len(self.queue) >= maxInQueue:
self.wakeupThread.wait()# add 'last' item
Add(True, None)

self.lock = threading.Lock()
self.wakeupThread = threading.Condition(self.lock)
self.wakeupIter = threading.Condition(self.lock)

self.queue = []

array of (last : bool, result)

self.thread = threading.Thread(target =

ThreadBody)
self.thread.start()def iter(self):while True:
with self.lock:
if len(self.queue) == 0:
self.wakeupThread.notify_all()
self.wakeupIter.wait()
else:
result = self.queue[ 0]
self.queue = self.queue[1:]
if (result[ 0]): break
self.wakeupThread.notify_all()
yield result[1] `

Один нюанс. Вспомним, что в случае с исходным скриптом, который патчил файлы, мы фактически загружали файл целиком в память. В случае, когда мы пускаем этот процесс асинхронно в фоне, мы наверняка каким-то способом захотим ввести ограничение -- насколько фоновая итерация может "убежать вперед", т.к. память не резиновая и слишком большое упреждение нет смысла делать. Отсюда второй параметр конструктора -- maxInQueue.

Надеюсь, что в коде особо комментировать не чего. Созданый в конструкторе тред читает данные из итерабельного объекта в очередь, защищенную мютексом. Если в очереди стало слишком много элементов (больше чем maxInQueue) это означает, что мы ушли далеко вперед от главного потока, и фоновый поток ложится в ожидание, из которого его поднимают по факту извлечения элемента из очереди главным потоком.
Главный поток, по мере необходимости (через интерацию), забирает данные из очереди. Если данных нет, то он уходит в ожидание, из которого его должен вывести фоновый поток, после того, как в очередь будет положен очередной результат итерации.

Честно говоря, приводить весь код исходного скрипта для патчинга файлов не вижу смысла, лучше покажу более просто и наглядный пример: как можно применить класс IterInThread для подсчета CRC32 для файла.

`Copy Source | Copy HTML

return (crc32, rate)

def CalcCRC32(fileName : str) -> (int, float):
t = time.clock()with io.open(fileName, 'rb') as f:
data = f.read()

sizeMb = len(data) / (

1024 * 1024)
crc = binascii.crc32(data)

t = time.clock() - t

return (crc, sizeMb / t)def CalcCRC32Threaded(fileName : str) -> (int, float):

CLastDw = 0xffffffff
CBlockSize =

512 * 1024def LoadNext():
with io.open(fileName, 'rb') as f:
while True:
data = f.read(CBlockSize)
if len(data) > 0: yield data
if len(data) != CBlockSize: break

t = time.clock()

size =

0
crc = 0for block in IterInThread(LoadNext(), 2):
crc = binascii.crc32(block, crc)
size += len(block)

sizeMb = size / (

1024 * 1024)

crc = crc & CLastDw

t = time.clock() - t

return (crc, sizeMb / t)`

Функция CalcCRC32() работает в лоб -- читает файл целиком в память, после этого считает CRC32.
Ну а функция CalcCRC32Threaded() работает поблочно, пытаясь прочитать в фоне до 2-х блоков, размером в полмегабайта. У меня она работает ощутимо быстрее (разумеется, при условии, что файл, выбранный для подсчета CRC32, не находится в кэше).

И напоследок. Увидел, что GIL проблему в питоне пытаются решать не путем запуска нескольких потоков, а путем запуска нескольких процессов (т.е. нескольких экземпляров интерпретатора). Понятно, что обмен данными в этом случае надо делать несколько через жо... (читай -- через pickle) и работать это может не очень хорошо и эффективно. Инструментарий для всего этого безобразия находится в модуле multiprocessing, поиграться с этим всем добром пока не успел.

Часть первая -- http://cd-riper.livejournal.com/251840.html

Тут Канделаки показывает всё на что способна)

LJ Video