Get rid of horrible ticker() stuff and replace with AddTimer calls
git-svn-id: https://svn.perl.org/qpsmtpd/branches/high_perf@447 958fd67b-6ff1-0310-b445-bb7760255be9
This commit is contained in:
parent
be6b0e203c
commit
6047477c11
@ -113,6 +113,7 @@ sub enable_read {
|
|||||||
my Danga::Client $self = shift;
|
my Danga::Client $self = shift;
|
||||||
$self->{disable_read}--;
|
$self->{disable_read}--;
|
||||||
if ($self->{disable_read} <= 0) {
|
if ($self->{disable_read} <= 0) {
|
||||||
|
warn("read back on\n");
|
||||||
$self->{disable_read} = 0;
|
$self->{disable_read} = 0;
|
||||||
$self->watch_read(1);
|
$self->watch_read(1);
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,8 @@ sub new {
|
|||||||
|
|
||||||
$self->watch_read(1);
|
$self->watch_read(1);
|
||||||
|
|
||||||
|
$self->AddTimer(5, sub { $self->_do_cleanup });
|
||||||
|
|
||||||
return $self;
|
return $self;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,12 +103,6 @@ sub query_txt {
|
|||||||
$self->_query($asker, $host, 'TXT', $now) || return;
|
$self->_query($asker, $host, 'TXT', $now) || return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# run cleanup every 5 seconds
|
|
||||||
if ($now - 5 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
$self->_do_cleanup($now);
|
|
||||||
}
|
|
||||||
|
|
||||||
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
||||||
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
||||||
|
|
||||||
@ -125,12 +121,6 @@ sub query_mx {
|
|||||||
$self->_query($asker, $host, 'MX', $now) || return;
|
$self->_query($asker, $host, 'MX', $now) || return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# run cleanup every 5 seconds
|
|
||||||
if ($now - 5 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
$self->_do_cleanup($now);
|
|
||||||
}
|
|
||||||
|
|
||||||
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
||||||
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
||||||
|
|
||||||
@ -149,31 +139,17 @@ sub query {
|
|||||||
$self->_query($asker, $host, 'A', $now) || return;
|
$self->_query($asker, $host, 'A', $now) || return;
|
||||||
}
|
}
|
||||||
|
|
||||||
# run cleanup every 5 seconds
|
|
||||||
if ($now - 5 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
$self->_do_cleanup($now);
|
|
||||||
}
|
|
||||||
|
|
||||||
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
#print "+Pending queries: " . keys(%{$self->{id_to_asker}}) .
|
||||||
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
# " / Cache Size: " . keys(%{$self->{cache}}) . "\n";
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub ticker {
|
|
||||||
my Danga::DNS::Resolver $self = shift;
|
|
||||||
my $now = time;
|
|
||||||
# run cleanup every 5 seconds
|
|
||||||
if ($now - 5 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
$self->_do_cleanup($now);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sub _do_cleanup {
|
sub _do_cleanup {
|
||||||
my Danga::DNS::Resolver $self = shift;
|
my Danga::DNS::Resolver $self = shift;
|
||||||
my $now = shift;
|
my $now = time;
|
||||||
|
|
||||||
|
$self->AddTimer(5, sub { $self->_do_cleanup });
|
||||||
|
|
||||||
my $idle = $self->max_idle_time;
|
my $idle = $self->max_idle_time;
|
||||||
|
|
||||||
|
@ -201,17 +201,6 @@ sub KQueueEventLoop {
|
|||||||
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
|
my $timeout = @Timers ? ($Timers[0][0] - $now) : 1;
|
||||||
my @ret = $KQueue->kevent($timeout * 1000);
|
my @ret = $KQueue->kevent($timeout * 1000);
|
||||||
|
|
||||||
if (!@ret) {
|
|
||||||
foreach my $fd ( keys %DescriptorMap ) {
|
|
||||||
my Danga::Socket $sock = $DescriptorMap{$fd};
|
|
||||||
if ($sock->can('ticker')) {
|
|
||||||
$sock->ticker;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
my @objs;
|
|
||||||
|
|
||||||
foreach my $kev (@ret) {
|
foreach my $kev (@ret) {
|
||||||
my ($fd, $filter, $flags, $fflags) = @$kev;
|
my ($fd, $filter, $flags, $fflags) = @$kev;
|
||||||
|
|
||||||
@ -222,20 +211,16 @@ sub KQueueEventLoop {
|
|||||||
if (my $code = $OtherFds{$fd}) {
|
if (my $code = $OtherFds{$fd}) {
|
||||||
$code->($filter);
|
$code->($filter);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
print STDERR "kevent() returned fd $fd for which we have no mapping. removing.\n";
|
||||||
|
POSIX::close($fd); # close deletes the kevent entry
|
||||||
|
}
|
||||||
next;
|
next;
|
||||||
}
|
}
|
||||||
|
|
||||||
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
|
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
|
||||||
$fd, ref($pob), $flags, time);
|
$fd, ref($pob), $flags, time);
|
||||||
|
|
||||||
push @objs, [$pob, $filter, $flags, $fflags];
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO - prioritize the objects
|
|
||||||
|
|
||||||
foreach (@objs) {
|
|
||||||
my ($pob, $filter, $flags, $fflags) = @$_;
|
|
||||||
|
|
||||||
$pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
|
$pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
|
||||||
$pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
|
$pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
|
||||||
if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
|
if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
|
||||||
@ -277,17 +262,6 @@ sub EpollEventLoop {
|
|||||||
my $i;
|
my $i;
|
||||||
my $evcount = epoll_wait($Epoll, 1000, $timeout * 1000, \@events);
|
my $evcount = epoll_wait($Epoll, 1000, $timeout * 1000, \@events);
|
||||||
|
|
||||||
if (!$evcount) {
|
|
||||||
foreach my $fd ( keys %DescriptorMap ) {
|
|
||||||
my Danga::Socket $sock = $DescriptorMap{$fd};
|
|
||||||
if ($sock->can('ticker')) {
|
|
||||||
$sock->ticker;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next;
|
|
||||||
}
|
|
||||||
|
|
||||||
my @objs;
|
|
||||||
EVENT:
|
EVENT:
|
||||||
for ($i=0; $i<$evcount; $i++) {
|
for ($i=0; $i<$evcount; $i++) {
|
||||||
my $ev = $events[$i];
|
my $ev = $events[$i];
|
||||||
@ -306,17 +280,18 @@ sub EpollEventLoop {
|
|||||||
if (my $code = $OtherFds{$ev->[0]}) {
|
if (my $code = $OtherFds{$ev->[0]}) {
|
||||||
$code->($state);
|
$code->($state);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
my $fd = $ev->[0];
|
||||||
|
print STDERR "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n";
|
||||||
|
POSIX::close($fd);
|
||||||
|
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0);
|
||||||
|
}
|
||||||
next;
|
next;
|
||||||
}
|
}
|
||||||
|
|
||||||
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
|
DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
|
||||||
$ev->[0], ref($pob), $ev->[1], time);
|
$ev->[0], ref($pob), $ev->[1], time);
|
||||||
|
|
||||||
push @objs, [$pob, $state];
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (@objs) {
|
|
||||||
my ($pob, $state) = @$_;
|
|
||||||
$pob->event_read if $state & EPOLLIN && ! $pob->{closed};
|
$pob->event_read if $state & EPOLLIN && ! $pob->{closed};
|
||||||
$pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
|
$pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
|
||||||
$pob->event_err if $state & EPOLLERR && ! $pob->{closed};
|
$pob->event_err if $state & EPOLLERR && ! $pob->{closed};
|
||||||
@ -361,15 +336,6 @@ sub PollEventLoop {
|
|||||||
return 0 unless @poll;
|
return 0 unless @poll;
|
||||||
|
|
||||||
my $count = IO::Poll::_poll($timeout * 1000, @poll);
|
my $count = IO::Poll::_poll($timeout * 1000, @poll);
|
||||||
if (!$count) {
|
|
||||||
foreach my $fd ( keys %DescriptorMap ) {
|
|
||||||
my Danga::Socket $sock = $DescriptorMap{$fd};
|
|
||||||
if ($sock->can('ticker')) {
|
|
||||||
$sock->ticker;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next;
|
|
||||||
}
|
|
||||||
|
|
||||||
# Fetch handles with read events
|
# Fetch handles with read events
|
||||||
while (@poll) {
|
while (@poll) {
|
||||||
|
@ -7,6 +7,8 @@ use fields qw(alive_time create_time);
|
|||||||
|
|
||||||
our $last_cleanup = 0;
|
our $last_cleanup = 0;
|
||||||
|
|
||||||
|
Danga::Socket->AddTimer(15, \&_do_cleanup);
|
||||||
|
|
||||||
sub new {
|
sub new {
|
||||||
my Danga::TimeoutSocket $self = shift;
|
my Danga::TimeoutSocket $self = shift;
|
||||||
my $sock = shift;
|
my $sock = shift;
|
||||||
@ -16,31 +18,18 @@ sub new {
|
|||||||
my $now = time;
|
my $now = time;
|
||||||
$self->{alive_time} = $self->{create_time} = $now;
|
$self->{alive_time} = $self->{create_time} = $now;
|
||||||
|
|
||||||
if ($now - 15 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
_do_cleanup($now);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $self;
|
return $self;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub ticker {
|
|
||||||
my Danga::TimeoutSocket $self = shift;
|
|
||||||
|
|
||||||
my $now = time;
|
|
||||||
|
|
||||||
if ($now - 15 > $last_cleanup) {
|
|
||||||
$last_cleanup = $now;
|
|
||||||
_do_cleanup($now);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# overload these in a subclass
|
# overload these in a subclass
|
||||||
sub max_idle_time { 0 }
|
sub max_idle_time { 0 }
|
||||||
sub max_connect_time { 0 }
|
sub max_connect_time { 0 }
|
||||||
|
|
||||||
sub _do_cleanup {
|
sub _do_cleanup {
|
||||||
my $now = shift;
|
my $now = time;
|
||||||
|
|
||||||
|
Danga::Socket->AddTimer(15, \&_do_cleanup);
|
||||||
|
|
||||||
my $sf = __PACKAGE__->get_sock_ref;
|
my $sf = __PACKAGE__->get_sock_ref;
|
||||||
|
|
||||||
my %max_age; # classname -> max age (0 means forever)
|
my %max_age; # classname -> max age (0 means forever)
|
||||||
|
Loading…
Reference in New Issue
Block a user