# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. package Bugzilla::JobQueue; use 5.10.1; use strict; use warnings; use Bugzilla::Constants; use Bugzilla::Error; use Bugzilla::Install::Util qw(install_string); use Bugzilla::Util qw(read_text); use File::Basename; use base qw(TheSchwartz); use fields qw(_worker_pidfile); use List::MoreUtils qw(any); use Parallel::ForkManager; # This maps job names for Bugzilla::JobQueue to the appropriate modules. # If you add new types of jobs, you should add a mapping here. use constant JOB_MAP => { send_mail => 'Bugzilla::Job::Mailer', bug_mail => 'Bugzilla::Job::BugMail', }; # Without a driver cache TheSchwartz opens a new database connection # for each email it sends. This cached connection doesn't persist # across requests. use constant DRIVER_CACHE_TIME => 0; # 5 minutes # To avoid memory leak/fragmentation, a worker process won't process more than # MAX_MESSAGES messages. use constant MAX_MESSAGES => 1000; sub job_map { if (!defined(Bugzilla->request_cache->{job_map})) { my $job_map = JOB_MAP; Bugzilla::Hook::process('job_map', {job_map => $job_map}); Bugzilla->request_cache->{job_map} = $job_map; } return Bugzilla->request_cache->{job_map}; } sub new { my $class = shift; if (!Bugzilla->feature('jobqueue')) { ThrowUserError('feature_disabled', {feature => 'jobqueue'}); } my $lc = Bugzilla->localconfig; # We need to use the main DB as TheSchwartz module is going # to write to it. my $self = $class->SUPER::new( databases => [{ dsn => Bugzilla->dbh_main->{private_bz_dsn}, user => $lc->{db_user}, pass => $lc->{db_pass}, prefix => 'ts_', }], driver_cache_expiration => DRIVER_CACHE_TIME, prioritize => 1, ## REDHAT EXTENSION BEGIN 1206448 batch_size => 1, ## REDHAT EXTENSION END 1206448 ); return $self; } # A way to get access to the underlying databases directly. sub bz_databases { my $self = shift; my @hashes = keys %{$self->{databases}}; return map { $self->driver_for($_) } @hashes; } # inserts a job into the queue to be processed and returns immediately sub insert { my $self = shift; my $job = shift; if (!ref($job)) { my $mapped_job = Bugzilla::JobQueue->job_map()->{$job}; ThrowCodeError('jobqueue_no_job_mapping', {job => $job}) if !$mapped_job; my $run_after = delete($_[0]->{run_after}); $job = new TheSchwartz::Job( funcname => $mapped_job, arg => $_[0], priority => $_[1] || 5, run_after => $run_after, ); } my $retval = $self->SUPER::insert($job); # XXX Need to get an error message here if insert fails, but # I don't see any way to do that in TheSchwartz. ThrowCodeError('jobqueue_insert_failed', {job => $job, errmsg => $@}) if !$retval; return $retval; } # To avoid memory leaks/fragmentation which tends to happen for long running # perl processes; check for jobs, and spawn a new process to empty the queue. sub subprocess_worker { my $self = shift; my $name = shift; # REDHAT EXTENSION START 1992413 # Create SMTP connection before forking and cache it to reduce connection count my $method = Bugzilla->params->{'mail_delivery_method'}; my $test_method = Bugzilla->params->{'mail_delivery_override'}; if ($method eq "Test" && $test_method ne '') { $method = $test_method; } if ($method eq "SMTP") { unless (Bugzilla->request_cache->{smtp}) { my ($host, $port) = split(/:/, Bugzilla->params->{'smtpserver'}, 2); Bugzilla->request_cache->{smtp} = Email::Sender::Transport::SMTP::Persistent->new( { host => $host, defined($port) ? (port => $port) : (), sasl_username => Bugzilla->params->{'smtp_username'}, sasl_password => Bugzilla->params->{'smtp_password'}, helo => Bugzilla->params->{'cookiedomain'}, ssl => Bugzilla->params->{'smtp_ssl'}, debug => Bugzilla->params->{'smtp_debug'} } ); } } # REDHAT EXTENSION END 1992413 # REDHAT EXTENSION START 1900483 my $pm = Parallel::ForkManager->new(Bugzilla->params->{'num_jobs'}); JOBS: while (1) { my $time = (time); my @jobs = $self->list_jobs({ funcname => $self->{all_abilities}, run_after => $time, grabbed_until => $time, limit => 1, }); if (@jobs) { Bugzilla->logger->debug("Spawning queue worker process"); $pm->start and next JOBS; Bugzilla->request_cache->{in_child} = 1; Bugzilla->check_dbh(); $self->work_once(); Bugzilla->logger->debug("Queue worker process completed"); $pm->finish; } else { Bugzilla->logger->debug("No jobs found"); sleep(1); } } $pm->wait_all_children; return; # REDHAT EXTENSION START 1900483 } sub kill_worker { my $self = Bugzilla->job_queue(); if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { my $worker_pid = read_text($self->{_worker_pidfile}); if ($worker_pid && kill(0, $worker_pid)) { Bugzilla->logger->debug("Stopping worker process"); system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; } } } sub set_pidfile { my ($self, $pidfile) = @_; $self->{_worker_pidfile} = bz_locations->{'datadir'} . '/worker-' . basename($pidfile); } # Clear the request cache at the start of each run. sub work_once { my $self = shift; my $transport = Bugzilla->request_cache->{smtp}; # REDHAT EXTENSION 1992413 Bugzilla->clear_request_cache(); Bugzilla->request_cache->{smtp} = $transport; # REDHAT EXTENSION 1992413 return $self->SUPER::work_once(@_); } # Never process more than MAX_MESSAGES in one batch, to avoid memory # leak/fragmentation issues. sub work_until_done { my $self = shift; my $count = 0; while ($count++ < MAX_MESSAGES) { $self->work_once or last; } } 1; __END__ =head1 NAME Bugzilla::JobQueue - Interface between Bugzilla and TheSchwartz. =head1 SYNOPSIS use Bugzilla; my $obj = Bugzilla->job_queue(); $obj->insert('send_mail', { msg => $message }); =head1 DESCRIPTION Certain tasks should be done asyncronously. The job queue system allows Bugzilla to use some sort of service to schedule jobs to happen asyncronously. =head2 Inserting a Job See the synopsis above for an easy to follow example on how to insert a job into the queue. Give it a name and some arguments and the job will be sent away to be done later. =head1 B =over =item insert =item bz_databases =item job_map =item set_pidfile =item kill_worker =back