Показать сообщение отдельно

Perl.Многопоточность.Большие обьемы.
  #2  
Старый 12.04.2009, 18:49
[underwater]
Познающий
Регистрация: 29.03.2009
Сообщений: 87
С нами: 9010027

Репутация: 308
По умолчанию Perl.Многопоточность.Большие обьемы.

Код:
use POSIX ":sys_wait_h";
use IO::Handle;

# Количество процессов и кол-во задач на процесс
my $max_proc = 30;
my $max_per_proc = 30;

# Записываем строку запуска скрипта. Ниже будет видно зачем это.
my $exe = $0;

# Нихт зомби
$SIG{CHLD} = sub { while ((my $pid = waitpid(-1, WNOHANG)) > 0) {} };

# Создаём пайпы для обмена данными родитель-чайлд и чайлд-ридер, снимаем буферизацию
pipe(PARENT_RDR, CHILD_WTR);
pipe(CHILD_RDR,  PARENT_WTR);
CHILD_WTR->autoflush(1);
PARENT_WTR->autoflush(1);

# Записываем PID главного процесса, форкаем процесс-ридер
my $master_pid = $$;
my $reader_pid = fork();
die "Can't fork: $!" unless defined($reader_pid);

# Пошёл основной процесс.
if($reader_pid){
    open(F, $ARGV[0]) or die "Can't open file: $!\n";
    my %children;
    while(<F>){
        chomp;
        s/\r//;
        my $line = $_;
        $line = sprintf("%-128s", $line);
        syswrite(CHILD_WTR,$line,128);
        
        # Читаем из файла и пишем в пайп точное количество данных.
        # Это необходимо для того, чтобы задания правильно читались чайлдами.
        # Если использовать простые print/<>/read/write то чайлды будут читать огрызки заданий.
        
        # см. ниже =)
        $0 = "$exe";
        
        # Проверяем количество чайлдов и добавляем их, если нужно, до $max_proc
        while(checkChildren(\%children) < $max_proc){
            my $pid = fork();
            die "Can't fork: $!" unless defined($pid);
            if ($pid){
                $children{$pid} = 1;
            } else {
                close(CHILD_RDR);
                close(CHILD_WTR);
                
                # Вот собссно и сам чайлд
                
                processChild();
                exit;
            }
        }
    }
    
    # Файл кончился. Теперь пишем в пайп сигнал о выходе для всех чайлдов.
    while(checkChildren(\%children)){
        syswrite(CHILD_WTR,sprintf("%-128s", 'EXIT'),128);
    }
    
    # Убиваем ридера.
    kill(2,$reader_pid);
    close(F);
}else{

    # Здесь происходит запуск ридера
    close(PARENT_RDR);
    close(PARENT_WTR);
    readerProcess();
    exit;
}

sub processChild
{
    # см. ещё чуть ниже =)
    $0 = "$exe - CHILD";
    
    my $work = $max_per_proc;
    while($work){
        my $bytes = sysread(PARENT_RDR, my $line, 128);
        $line=~s/.*?\x00//;
        $line=~s/\s+?$//;
        $line=~s/\s+//g;
        exit if $line eq 'EXIT';
        next unless $line;
        # Получили задание. Оно будет гарантированно целым за счёт sysread/syswrite
        
        # Ну вот и добрались до странной конструкции =)
        # С её помощью при просмотре списка процессов мы увидим какой процесс чем занимается.
        # На работу системы не влияет, но категорически удобно =)
        $0 = "$exe - CHILD - $line";
       
        
        my $result;
        # Здесь обрабатываем нашу задачу и пишем в пайп ридеру
        # ...
        # ...
        
        
        $result = sprintf("%-128s", $result);
        syswrite(PARENT_WTR,$result,128);

        $0 = "$exe - CHILD";
        # ну это, думаю, понятно =)
        $work--;
    }
}

sub readerProcess
{

    # Это процесс ридера
    $SIG{INT} = sub {
        exit;
    };

    open(F,">result.txt");
    select F;
    $|=1;
    my $count;
    my $work = 1;
    
    # Читаем данные из пайпа, пишем в файл
    while($work){
        $count++;
        
        # Ридер рассказывает скока он уже обработал
        $0 = "$exe - READER - reading... ".$count;
        my $bytes = sysread(CHILD_RDR, my $data, 128);
        $data=~s/.*?\x00//;
        $data=~s/\s+?$//;
        $data=~s/\s+//g;
        last if $data eq 'EXIT';
        
        # Пишем
        print F "$data\n";
        
        # Можно проверить не сдох ли родительский процесс.
        # Если вдруг он загнулся (а такое может случиться), то выйти.
        last unless kill(0, $master_pid);
    }
    close(F);
}

sub checkChildren
{
    my $hash = shift;
    for my $pid(keys %{$hash}){
        if(!kill(0, $pid)){
            delete ${$hash}{$pid};
        }
    }
    return scalar keys %{$hash};
}
Варианты распараллеливания:

1.На каждую задачу выделяется один процесс. Вариант хорош тем, что каждое задание изолировано в отдельном процессе - не будет проблем с утечкой памяти и прочими заморочками (особенно когда юзаются сторонние модули).

2.Родить необходимое количество процессов и выдавать им задания.

3.Оптимальный вариант- принцип работы Апача. Основной процесс запускает заданное количество дочерних. Каждый дочерний выполняет установленное количество задач и умирает.
 
Ответить с цитированием