diff options
Diffstat (limited to 'Bugzilla/JobQueue.pm')
-rw-r--r-- | Bugzilla/JobQueue.pm | 210 |
1 files changed, 106 insertions, 104 deletions
diff --git a/Bugzilla/JobQueue.pm b/Bugzilla/JobQueue.pm index 6ff85d84f..e48182007 100644 --- a/Bugzilla/JobQueue.pm +++ b/Bugzilla/JobQueue.pm @@ -21,153 +21,155 @@ use fields qw(_worker_pidfile); # This maps job names for Bugzilla::JobQueue to the appropriate modules. # If you add new types of jobs, you should add a mapping here. -use constant JOB_MAP => { - send_mail => 'Bugzilla::Job::Mailer', - bug_mail => 'Bugzilla::Job::BugMail', -}; +use constant JOB_MAP => + {send_mail => 'Bugzilla::Job::Mailer', bug_mail => 'Bugzilla::Job::BugMail',}; # Without a driver cache TheSchwartz opens a new database connection # for each email it sends. This cached connection doesn't persist # across requests. -use constant DRIVER_CACHE_TIME => 300; # 5 minutes +use constant DRIVER_CACHE_TIME => 300; # 5 minutes # To avoid memory leak/fragmentation, a worker process won't process more than # MAX_MESSAGES messages. use constant MAX_MESSAGES => 1000; sub job_map { - if (!defined(Bugzilla->request_cache->{job_map})) { - my $job_map = JOB_MAP; - Bugzilla::Hook::process('job_map', { job_map => $job_map }); - Bugzilla->request_cache->{job_map} = $job_map; - } - - return Bugzilla->request_cache->{job_map}; + if (!defined(Bugzilla->request_cache->{job_map})) { + my $job_map = JOB_MAP; + Bugzilla::Hook::process('job_map', {job_map => $job_map}); + Bugzilla->request_cache->{job_map} = $job_map; + } + + return Bugzilla->request_cache->{job_map}; } sub new { - my $class = shift; - - if (!Bugzilla->feature('jobqueue')) { - ThrowUserError('feature_disabled', { feature => 'jobqueue' }); - } - - my $lc = Bugzilla->localconfig; - # We need to use the main DB as TheSchwartz module is going - # to write to it. - my $self = $class->SUPER::new( - databases => [{ - dsn => Bugzilla->dbh_main->{private_bz_dsn}, - user => $lc->{db_user}, - pass => $lc->{db_pass}, - prefix => 'ts_', - }], - driver_cache_expiration => DRIVER_CACHE_TIME, - prioritize => 1, - ); - - return $self; + my $class = shift; + + if (!Bugzilla->feature('jobqueue')) { + ThrowUserError('feature_disabled', {feature => 'jobqueue'}); + } + + my $lc = Bugzilla->localconfig; + + # We need to use the main DB as TheSchwartz module is going + # to write to it. + my $self = $class->SUPER::new( + databases => [{ + dsn => Bugzilla->dbh_main->{private_bz_dsn}, + user => $lc->{db_user}, + pass => $lc->{db_pass}, + prefix => 'ts_', + }], + driver_cache_expiration => DRIVER_CACHE_TIME, + prioritize => 1, + ); + + return $self; } # A way to get access to the underlying databases directly. sub bz_databases { - my $self = shift; - my @hashes = keys %{ $self->{databases} }; - return map { $self->driver_for($_) } @hashes; + my $self = shift; + my @hashes = keys %{$self->{databases}}; + return map { $self->driver_for($_) } @hashes; } # inserts a job into the queue to be processed and returns immediately sub insert { - my $self = shift; - my $job = shift; - - if (!ref($job)) { - my $mapped_job = Bugzilla::JobQueue->job_map()->{$job}; - ThrowCodeError('jobqueue_no_job_mapping', { job => $job }) - if !$mapped_job; - - $job = new TheSchwartz::Job( - funcname => $mapped_job, - arg => $_[0], - priority => $_[1] || 5 - ); - } - - my $retval = $self->SUPER::insert($job); - # XXX Need to get an error message here if insert fails, but - # I don't see any way to do that in TheSchwartz. - ThrowCodeError('jobqueue_insert_failed', { job => $job, errmsg => $@ }) - if !$retval; - - return $retval; + my $self = shift; + my $job = shift; + + if (!ref($job)) { + my $mapped_job = Bugzilla::JobQueue->job_map()->{$job}; + ThrowCodeError('jobqueue_no_job_mapping', {job => $job}) if !$mapped_job; + + $job = new TheSchwartz::Job( + funcname => $mapped_job, + arg => $_[0], + priority => $_[1] || 5 + ); + } + + my $retval = $self->SUPER::insert($job); + + # XXX Need to get an error message here if insert fails, but + # I don't see any way to do that in TheSchwartz. + ThrowCodeError('jobqueue_insert_failed', {job => $job, errmsg => $@}) + if !$retval; + + return $retval; } # To avoid memory leaks/fragmentation which tends to happen for long running # perl processes; check for jobs, and spawn a new process to empty the queue. sub subprocess_worker { - my $self = shift; - - my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass"; - - while (1) { - my $time = (time); - my @jobs = $self->list_jobs({ - funcname => $self->{all_abilities}, - run_after => $time, - grabbed_until => $time, - limit => 1, - }); - if (@jobs) { - $self->debug("Spawning queue worker process"); - # Run the worker as a daemon - system $command; - # And poll the PID to detect when the working has finished. - # We do this instead of system() to allow for the INT signal to - # interrup us and trigger kill_worker(). - my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet'); - if ($pid) { - sleep(3) while(kill(0, $pid)); - } - $self->debug("Queue worker process completed"); - } else { - $self->debug("No jobs found"); - } - sleep(5); + my $self = shift; + + my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass"; + + while (1) { + my $time = (time); + my @jobs = $self->list_jobs({ + funcname => $self->{all_abilities}, + run_after => $time, + grabbed_until => $time, + limit => 1, + }); + if (@jobs) { + $self->debug("Spawning queue worker process"); + + # Run the worker as a daemon + system $command; + + # And poll the PID to detect when the working has finished. + # We do this instead of system() to allow for the INT signal to + # interrup us and trigger kill_worker(). + my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet'); + if ($pid) { + sleep(3) while (kill(0, $pid)); + } + $self->debug("Queue worker process completed"); } + else { + $self->debug("No jobs found"); + } + sleep(5); + } } sub kill_worker { - my $self = Bugzilla->job_queue(); - if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { - my $worker_pid = read_text($self->{_worker_pidfile}); - if ($worker_pid && kill(0, $worker_pid)) { - $self->debug("Stopping worker process"); - system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; - } + my $self = Bugzilla->job_queue(); + if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { + my $worker_pid = read_text($self->{_worker_pidfile}); + if ($worker_pid && kill(0, $worker_pid)) { + $self->debug("Stopping worker process"); + system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; } + } } sub set_pidfile { - my ($self, $pidfile) = @_; - $self->{_worker_pidfile} = bz_locations->{'datadir'} . - '/worker-' . basename($pidfile); + my ($self, $pidfile) = @_; + $self->{_worker_pidfile} + = bz_locations->{'datadir'} . '/worker-' . basename($pidfile); } # Clear the request cache at the start of each run. sub work_once { - my $self = shift; - Bugzilla->clear_request_cache(); - return $self->SUPER::work_once(@_); + my $self = shift; + Bugzilla->clear_request_cache(); + return $self->SUPER::work_once(@_); } # Never process more than MAX_MESSAGES in one batch, to avoid memory # leak/fragmentation issues. sub work_until_done { - my $self = shift; - my $count = 0; - while ($count++ < MAX_MESSAGES) { - $self->work_once or last; - } + my $self = shift; + my $count = 0; + while ($count++ < MAX_MESSAGES) { + $self->work_once or last; + } } 1; |