# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $

package Danga::Client;
use base 'Danga::TimeoutSocket';
use fields qw(
    line
    pause_count
    read_bytes
    data_bytes
    callback
    get_chunks
    reader_object
    );
use Time::HiRes ();

use bytes;

# 30 seconds max timeout!
sub max_idle_time       { 30 }
sub max_connect_time    { 1200 }

sub new {
    my Danga::Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->reset_for_next_message;
    return $self;
}

sub reset_for_next_message {
    my Danga::Client $self = shift;
    $self->{line} = '';
    $self->{pause_count} = 0;
    $self->{read_bytes} = 0;
    $self->{callback} = undef;
    $self->{reader_object} = undef;
    $self->{data_bytes} = '';
    $self->{get_chunks} = 0;
    return $self;
}

sub get_bytes {
    my Danga::Client $self = shift;
    my ($bytes, $callback) = @_;
    if ($self->{callback}) {
        die "get_bytes/get_chunks currently in progress!";
    }
    $self->{read_bytes} = $bytes;
    $self->{data_bytes} = $self->{line};
    $self->{read_bytes} -= length($self->{data_bytes});
    $self->{line} = '';
    if ($self->{read_bytes} <= 0) {
        if ($self->{read_bytes} < 0) {
            $self->{line} = substr($self->{data_bytes},
                                   $self->{read_bytes}, # negative offset
                                   0 - $self->{read_bytes}, # to end of str
                                   ""); # truncate that substr
        }
        $callback->($self->{data_bytes});
        return;
    }
    $self->{callback} = $callback;
}

sub process_chunk {
    my Danga::Client $self = shift;
    my $callback = shift;

    my $last_crlf = rindex($self->{line}, "\r\n");

    if ($last_crlf != -1) {
        if ($last_crlf + 2 == length($self->{line})) {
            my $data = $self->{line};
            $self->{line} = '';
            $callback->($data);
        }
        else {
            my $data = substr($self->{line}, 0, $last_crlf + 2);
            $self->{line} = substr($self->{line}, $last_crlf + 2);
            $callback->($data);
        }
    }
}

sub get_chunks {
    my Danga::Client $self = shift;
    my ($bytes, $callback) = @_;
    if ($self->{callback}) {
        die "get_bytes/get_chunks currently in progress!";
    }
    $self->{read_bytes} = $bytes;
    $self->process_chunk($callback) if length($self->{line});
    $self->{callback} = $callback;
    $self->{get_chunks} = 1;
}

sub end_get_chunks {
    my Danga::Client $self = shift;
    my $remaining = shift;
    $self->{callback} = undef;
    $self->{get_chunks} = 0;
    if (defined($remaining)) {
        $self->process_read_buf(\$remaining);
    }
}

sub set_reader_object {
    my Danga::Client $self = shift;
    $self->{reader_object} = shift;
}

sub event_read {
    my Danga::Client $self = shift;
    if (my $obj = $self->{reader_object}) {
        $self->{reader_object} = undef;
        $obj->event_read($self);
    }
    elsif ($self->{callback}) {
        $self->{alive_time} = time;
        if ($self->{get_chunks}) {
            my $bref = $self->read($self->{read_bytes});
            return $self->close($!) unless defined $bref;
            $self->{line} .= $$bref;
            $self->process_chunk($self->{callback}) if length($self->{line});
            return;
        }
        if ($self->{read_bytes} > 0) {
            my $bref = $self->read($self->{read_bytes});
            return $self->close($!) unless defined $bref;
            $self->{read_bytes} -= length($$bref);
            $self->{data_bytes} .= $$bref;
        }
        if ($self->{read_bytes} <= 0) {
            # print "Erk, read too much!\n" if $self->{read_bytes} < 0;
            my $cb = $self->{callback};
            $self->{callback} = undef;
            $cb->($self->{data_bytes});
        }
    }
    else {
        my $bref = $self->read(8192);
        return $self->close($!) unless defined $bref;
        $self->process_read_buf($bref);
    }
}

sub process_read_buf {
    my Danga::Client $self = shift;
    my $bref = shift;
    $self->{line} .= $$bref;
    return if $self->{pause_count} || $self->{closed};
    
    if ($self->{line} =~ s/^(.*?\n)//) {
        my $line = $1;
        $self->{alive_time} = time;
        my $resp = $self->process_line($line);
        if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) }
        $self->write($resp) if $resp;
        # $self->watch_read(0) if $self->{pause_count};
        return if $self->{pause_count} || $self->{closed};
        # read more in a timer, to give other clients a look in
        $self->AddTimer(0, sub {
            if (length($self->{line}) && !$self->paused) {
                $self->process_read_buf(\""); # " for bad syntax highlighters
            }
        });
    }
}

sub has_data {
    my Danga::Client $self = shift;
    return length($self->{line}) ? 1 : 0;
}

sub clear_data {
    my Danga::Client $self = shift;
    $self->{line} = '';
}

sub paused {
    my Danga::Client $self = shift;
    return 1 if $self->{pause_count};
    return 1 if $self->{closed};
    return 0;
}

sub pause_read {
    my Danga::Client $self = shift;
    $self->{pause_count}++;
    # $self->watch_read(0);
}

sub continue_read {
    my Danga::Client $self = shift;
    $self->{pause_count}--;
    if ($self->{pause_count} <= 0) {
        $self->{pause_count} = 0;
        $self->AddTimer(0, sub {
            if (length($self->{line}) && !$self->paused) {
                $self->process_read_buf(\""); # " for bad syntax highlighters
            }
        });
    }
}

sub process_line {
    my Danga::Client $self = shift;
    return '';
}

sub close {
    my Danga::Client $self = shift;
    print "closing @_\n" if $::DEBUG;
    $self->SUPER::close(@_);
}

sub event_err { my Danga::Client $self = shift; $self->close("Error") }
sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") }

1;