
12.04.2009, 18:49
|
|
Познающий
Регистрация: 29.03.2009
Сообщений: 87
С нами:
9010027
Репутация:
308
|
|
Perl.Многопоточность.Большие обьемы.
Код:
use POSIX ":sys_wait_h";
use IO::Handle;
# Количество процессов и кол-во задач на процесс
my $max_proc = 30;
my $max_per_proc = 30;
# Записываем строку запуска скрипта. Ниже будет видно зачем это.
my $exe = $0;
# Нихт зомби
$SIG{CHLD} = sub { while ((my $pid = waitpid(-1, WNOHANG)) > 0) {} };
# Создаём пайпы для обмена данными родитель-чайлд и чайлд-ридер, снимаем буферизацию
pipe(PARENT_RDR, CHILD_WTR);
pipe(CHILD_RDR, PARENT_WTR);
CHILD_WTR->autoflush(1);
PARENT_WTR->autoflush(1);
# Записываем PID главного процесса, форкаем процесс-ридер
my $master_pid = $$;
my $reader_pid = fork();
die "Can't fork: $!" unless defined($reader_pid);
# Пошёл основной процесс.
if($reader_pid){
open(F, $ARGV[0]) or die "Can't open file: $!\n";
my %children;
while(<F>){
chomp;
s/\r//;
my $line = $_;
$line = sprintf("%-128s", $line);
syswrite(CHILD_WTR,$line,128);
# Читаем из файла и пишем в пайп точное количество данных.
# Это необходимо для того, чтобы задания правильно читались чайлдами.
# Если использовать простые print/<>/read/write то чайлды будут читать огрызки заданий.
# см. ниже =)
$0 = "$exe";
# Проверяем количество чайлдов и добавляем их, если нужно, до $max_proc
while(checkChildren(\%children) < $max_proc){
my $pid = fork();
die "Can't fork: $!" unless defined($pid);
if ($pid){
$children{$pid} = 1;
} else {
close(CHILD_RDR);
close(CHILD_WTR);
# Вот собссно и сам чайлд
processChild();
exit;
}
}
}
# Файл кончился. Теперь пишем в пайп сигнал о выходе для всех чайлдов.
while(checkChildren(\%children)){
syswrite(CHILD_WTR,sprintf("%-128s", 'EXIT'),128);
}
# Убиваем ридера.
kill(2,$reader_pid);
close(F);
}else{
# Здесь происходит запуск ридера
close(PARENT_RDR);
close(PARENT_WTR);
readerProcess();
exit;
}
sub processChild
{
# см. ещё чуть ниже =)
$0 = "$exe - CHILD";
my $work = $max_per_proc;
while($work){
my $bytes = sysread(PARENT_RDR, my $line, 128);
$line=~s/.*?\x00//;
$line=~s/\s+?$//;
$line=~s/\s+//g;
exit if $line eq 'EXIT';
next unless $line;
# Получили задание. Оно будет гарантированно целым за счёт sysread/syswrite
# Ну вот и добрались до странной конструкции =)
# С её помощью при просмотре списка процессов мы увидим какой процесс чем занимается.
# На работу системы не влияет, но категорически удобно =)
$0 = "$exe - CHILD - $line";
my $result;
# Здесь обрабатываем нашу задачу и пишем в пайп ридеру
# ...
# ...
$result = sprintf("%-128s", $result);
syswrite(PARENT_WTR,$result,128);
$0 = "$exe - CHILD";
# ну это, думаю, понятно =)
$work--;
}
}
sub readerProcess
{
# Это процесс ридера
$SIG{INT} = sub {
exit;
};
open(F,">result.txt");
select F;
$|=1;
my $count;
my $work = 1;
# Читаем данные из пайпа, пишем в файл
while($work){
$count++;
# Ридер рассказывает скока он уже обработал
$0 = "$exe - READER - reading... ".$count;
my $bytes = sysread(CHILD_RDR, my $data, 128);
$data=~s/.*?\x00//;
$data=~s/\s+?$//;
$data=~s/\s+//g;
last if $data eq 'EXIT';
# Пишем
print F "$data\n";
# Можно проверить не сдох ли родительский процесс.
# Если вдруг он загнулся (а такое может случиться), то выйти.
last unless kill(0, $master_pid);
}
close(F);
}
sub checkChildren
{
my $hash = shift;
for my $pid(keys %{$hash}){
if(!kill(0, $pid)){
delete ${$hash}{$pid};
}
}
return scalar keys %{$hash};
}
Варианты распараллеливания:
1.На каждую задачу выделяется один процесс. Вариант хорош тем, что каждое задание изолировано в отдельном процессе - не будет проблем с утечкой памяти и прочими заморочками (особенно когда юзаются сторонние модули).
2.Родить необходимое количество процессов и выдавать им задания.
3.Оптимальный вариант- принцип работы Апача. Основной процесс запускает заданное количество дочерних. Каждый дочерний выполняет установленное количество задач и умирает.
|
|
|