Madhu Reddy wrote:
> Hi,
> I have following sorting program...
> basically it will split the large files into small
> file and creates thread..each thread will sort files
> after that merge back all sorted files...
>
> this program works fine on single CPU machine...
> same program giving problem on 8 CPU machine...
>
Hi Madhu,
Is this NT Server? If not, your problem may lie in an OS incompatibility, since
workstation versions of NT will not support more than four CPUs.
It is clear from the output that the problem occurs here on line 47 [marked in
reformatted script]:
push(@threads, new threads(\&sort_it,$_));
It could be that the primary thread goes on to:
$_->join for(@threads);
my_print("Sorting completed, merging started\n");
before the first sort_it thread returns, or that the first thread moves on before the
others finish. I don't know the fine points of Perl threads, but I don't see anything
explicitly telling the application to wait for all threads to complete. The problem
might arise when the application goes on to this line [#113]
my @files = sort {$ref->{$a} <=> $ref->{$b}} keys %{$ref};
Since the next lines are never executed:
my $merged_to = $files[0];
my_print("merging : $files[0]\n");
The more I think about it, the more I think you need some explicit wait command when
you launch the threads or before moving on to the
$_->join for(@threads);
statement.
Joseph
A somewhat easier-to-read copy of your application follows [thanks to the smart-tabbing
options of Programmers File Editor]:
BTW, you do NOT need to put the actual path in on Win32. It gets association
information from the Registry. Use the standard 'nix path for compatibility:
#!/usr/bin/perl -w
#!C:\perl_5.8\bin\perl -w  # Perl translates slashes automatically. Avoid backslashes.
use strict;
use threads;
use threads::shared;

# ---------------------------------------------------------------------------
# Phase 1: split the input file into chunk files.
#
# The input is read line by line; every time more than $max_chunk_lines lines
# have accumulated they are written to the next "tmp_<n>.txt" file.  The chunk
# file names are collected in @tmp_files.
# ---------------------------------------------------------------------------

# Number of chunk files written so far; also used to build each chunk's name.
my $counter = 0;
# Names of the chunk files, in creation order.
my @tmp_files;
# Lines read from the input that have not been written to a chunk file yet.
my @buffer;
# Shared between threads: chunk file name => sort key of that chunk's first
# sorted line.  Filled in by sort_it, read by merge_it.
my %bounds :shared = ();
# A chunk is flushed as soon as it holds MORE than this many lines.
my $max_chunk_lines = 1000000;
# Called with a leading & so that this call does not depend on my_time's
# prototype being known yet (the sub is defined near the end of the file).
my $tm1 = &my_time();
my $file = 'D:\Madhu\Tmp\abi_feeder.dat';

open(my $in, '<', $file) || die "Cannot open $file: $!";
while (my $line = <$in>) {
    push(@buffer, $line);
    if (@buffer > $max_chunk_lines) {
        push(@tmp_files, _write_chunk(++$counter, \@buffer));
        @buffer = ();
    }
}
close($in) || die "Cannot close $file: $!";

# Whatever is left over becomes the last (shorter) chunk.
if (@buffer) {
    push(@tmp_files, _write_chunk(++$counter, \@buffer));
    @buffer = ();
}

# _write_chunk($number, \@lines)
# Writes the given lines to "tmp_<number>.txt" and returns that file name.
# Dies if the file cannot be created or completely written; the close is
# checked because buffered write errors only show up there.
sub _write_chunk {
    my ($number, $lines) = @_;
    my $tmp = "tmp_$number.txt";
    open(my $out, '>', $tmp) || die "Cannot create $tmp: $!";
    print {$out} @$lines or die "Cannot write to $tmp: $!";
    close($out) || die "Cannot finish writing $tmp: $!";
    return $tmp;
}
# ---------------------------------------------------------------------------
# Phase 2: sort every chunk file in its own thread, merge the sorted chunks,
# and print a start/end time report.
# ---------------------------------------------------------------------------

# Start one sorter thread per chunk file.
my @threads = ();
for my $chunk_file (@tmp_files) {
    # threads->create returns undef when the thread cannot be started; fail
    # here with a clear message instead of later on an undefined ->join.
    my $thread = threads->create(\&sort_it, $chunk_file)
        or die "Cannot create sorter thread for $chunk_file\n";
    push(@threads, $thread);
}
# join blocks until the joined thread has finished, so once this loop is done
# every sorter thread has ended.
$_->join for (@threads);
my_print("Sorting completed, merging started\n");

my $n_keys = scalar(keys %bounds);
my_print("no of keys : $n_keys\n");

# sort_it stores one entry in %bounds per chunk it processed and merge_it only
# merges the files listed there.  Fewer entries than chunk files means a
# sorter thread ended early; merging anyway would silently drop that chunk.
if ($n_keys != @tmp_files) {
    die "Only $n_keys of " . scalar(@tmp_files)
      . " chunks were sorted; not merging\n";
}

merge_it(\%bounds);
my_print("merge completed\n");
# &-call for the same reason as at the top of the script: my_time is defined
# further down.
my $tm2 = &my_time();
print "\n\n----------------Report ------------\n";
print "---------------- Sort Start : $tm1\n";
print "---------------- Sort End : $tm2\n";
print "-----------------------------------\n";
# sort_it($chunk)
#
# Thread entry point.  Reads the chunk file $chunk into memory, sorts its
# lines numerically on the 10 characters starting at offset 10, and writes
# the sorted lines back to the same file.  Afterwards the key of the first
# sorted line is stored in the shared hash %bounds under the chunk's name.
#
# Dies (ending only the calling thread) if the file cannot be read or
# completely rewritten.  %bounds is updated only after the sorted file has
# been written and closed successfully, so a chunk listed there is complete.
sub sort_it {
    my $chunk = shift;
    my $tid = threads->self->tid();
    my_print("thread $tid Sorting chunk : $chunk\n");

    # Lexical handles: nothing here depends on a global bareword handle.
    open(my $in, '<', $chunk) || die "thread $tid: cannot read $chunk: $!";
    my @buf = <$in>;
    close($in);

    open(my $out, '>', $chunk) || die "thread $tid: cannot rewrite $chunk: $!";
    my $have_first = 0;
    my $first_key;
    for my $line (sort { substr($a, 10, 10) <=> substr($b, 10, 10) } @buf) {
        if (!$have_first) {
            # The first line in sorted order carries the chunk's smallest key.
            $first_key  = substr($line, 10, 10);
            $have_first = 1;
        }
        print {$out} $line or die "thread $tid: cannot write to $chunk: $!";
    }
    # Buffered write errors surface at close, so it has to be checked.
    close($out) || die "thread $tid: cannot finish writing $chunk: $!";

    if ($have_first) {
        # The lock is held until the end of this block.
        lock(%bounds);
        $bounds{$chunk} = $first_key;
    }
    my_print("thread $tid Sorting chunk : $chunk COMPLETED\n");
}
# merge_it(\%bounds)
#
# Merges sorted chunk files into one sorted file.
#
# $ref maps each chunk file name to the sort key of its first line.  The
# files are taken in ascending order of that key and merged pairwise: the
# running result and the next chunk are merged into "merged_tmp<i>.txt", both
# inputs are deleted, and the new file becomes the running result.  Lines are
# compared numerically on the 10 characters starting at offset 10; when both
# current lines have equal keys, both are written (running result first).
#
# Returns the name of the file holding the final result: the last
# "merged_tmp<i>.txt", or the single chunk file when there is only one.
# Returns nothing when the hash is empty.  Dies if a file cannot be opened or
# written; a failed delete of an input file only produces a warning.
sub merge_it {
    my $ref = shift;
    my @files = sort { $ref->{$a} <=> $ref->{$b} } keys %{$ref};
    # Nothing to merge; avoids reading $files[0] when the list is empty.
    return unless @files;

    my $merged_to = $files[0];
    my_print("merging : $files[0]\n");
    for (my $i = 1; $i < @files; $i++) {
        # The original wrote "dir $!" here, so a failed open never produced
        # the intended error message.
        open(my $first, '<', $merged_to)
            || die "Cannot open $merged_to: $!";
        open(my $second, '<', $files[$i])
            || die "Cannot open $files[$i]: $!";
        my_print("merging : $files[$i]\n");
        my $merged_tmp = "merged_tmp$i.txt";
        open(my $merged, '>', $merged_tmp)
            || die "Cannot create $merged_tmp: $!";

        my $line1 = <$first>;
        my $line2 = <$second>;

        # Two-way merge while both inputs still have a current line.
        while (defined($line1) && defined($line2)) {
            my $value1 = substr($line1, 10, 10);
            my $value2 = substr($line2, 10, 10);
            if ($value1 == $value2) {
                print {$merged} $line1, $line2
                    or die "Cannot write to $merged_tmp: $!";
                $line1 = <$first>;
                $line2 = <$second>;
            }
            elsif ($value1 > $value2) {
                print {$merged} $line2
                    or die "Cannot write to $merged_tmp: $!";
                $line2 = <$second>;
            }
            else {
                print {$merged} $line1
                    or die "Cannot write to $merged_tmp: $!";
                $line1 = <$first>;
            }
        }

        # At most one input still has lines; copy them through unchanged.
        while (defined($line1)) {
            print {$merged} $line1 or die "Cannot write to $merged_tmp: $!";
            $line1 = <$first>;
        }
        while (defined($line2)) {
            print {$merged} $line2 or die "Cannot write to $merged_tmp: $!";
            $line2 = <$second>;
        }

        close($first);
        close($second);
        # Buffered write errors surface at close; the inputs are deleted
        # below, so the output must be known to be complete first.
        close($merged) || die "Cannot finish writing $merged_tmp: $!";

        unlink($merged_to)  or warn "Cannot delete $merged_to: $!\n";
        unlink($files[$i])  or warn "Cannot delete $files[$i]: $!\n";
        $merged_to = $merged_tmp;
    }
    return $merged_to;
}
# my_print(@text)
# Writes one log entry to standard output: the current time as returned by
# my_time, a space, the arguments interpolated as "@_", and a trailing space.
sub my_print
{
    my $entry = my_time() . " @_ ";
    print $entry;
}
# my_time()
# Returns the current local time as a string, i.e. localtime evaluated in
# scalar context (for example "Thu Sep  9 12:34:56 2004").
#
# Declared without a prototype: the subs and statements that call my_time
# appear earlier in the file than this definition, and a prototype that is
# seen only after such a call makes perl -w warn "called too early to check
# prototype".  The sub takes no arguments either way.
sub my_time
{
    return scalar localtime(time);
}
--
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]