Einführung in Event:: [Vortrag]

(c) 2000 Marc Lehmann <pcg@goof.com>
1 April 2000


Table of Contents


Abstract

Wenn viele Jobs parallel ausgeführt werden sollen, eignet sich das bekannte fork-Paradigma von Unix nicht mehr: Die Interprozeßkommunikation und der Mehraufwand an Speicher und Ressourcen überwiegt dir Vorteile der einfacheren Programmstruktur bei weitem. Diese kurze Einführung in die Ereignis-gesteuerte Programmierung in Perl zeigt an einem konkreten Beispiel (News-Scanner), wie einfach sich selbst komplexe Strukturen in Perl realisieren lassen.


1. Event in der Praxis --- oder wie man 500 Newsserver gleichzeitig scannt.

1.1. Ereignis-gesteuerte Programmierung?

Zur Lösung paralleler ablaufender Prozesse sind heute drei Ansätze gebräuchlich:

Jeder dieser Ansätze hat verschiedene Vor- und Nachteile: Das fork-Modell ist sehr einfach zu programmieren und eignet sich besonders für einfache Probleme, die sozusagen in kleine "Stückzahlen" anfallen. Durch die Abschottung der Prozesse wird eine einfach Parallelisierung möglich, da die Prozesse (z.B.) auf unterschiedlichen Rechnern arbeiten können. Größter Nachteil ist die relative aufwendige Interprozeßkommunikation, die einen großen Overhead nach sich ziehen kann.

Threads werden vielfach als das Mittel der Wahl angesehen. Der größte Vorteil von Threads ist das Vorhandensein mehrere Ablauf-Instanzen, die getrennt blockieren können. Leider werden Threads in den meisten Fällen nur dazu mißbraucht, das Blockieren des gesamten Prozesses zu verhindern (z.B. wenn Daten nicht sofort zur Verfügung stehen), werden also effektiv nur als Krücke für asynchrone-EA verwendet. Diesen Vorteil erkauft man sich durch eine zwar schnelle aber dafür extrem komplizierte Synchronisation innerhalb der Threads. Threads sind in in den seltensten Fällen die richtige Wahl für ein Problem.

Ereignis-gesteuerte Programmierung beruht auf dem Callback-Prinzip: Eine zentrale Anlaufstelle innerhalb des Prozesses wartet auf Ereignisse (engl. "Events", also z.B. "Daten angekommen", "Zeit abgelaufen" etc...). Je nach Ereignis werden entsprechende Callback-Funktionen aufgerufen. Der Vorteil dieses Ansatzes ist eine übersichtliche Programmstruktur, eine extreme schnelle Kommunikation (nur ein Prozeß) und ein ressourcenschonendes Endprodukt. Auch dieser Ansatz hat seine Nachteile. Der größte ist, daß man bei vielen Problemen "Umdenken" muß, da sich Callbacks eben keine lineare Programmstruktur verwirklichen läßt (Closures können dabei jedoch helfen). Außerdem muß man sich bewußt sein, das ein blockierender Funktionsaufruf (z.B. read) das gesamte Programm anhält.

1.2. Das Problem...

...ist oberflächlich betrachtet, recht einfach: Eine (kleine) Menge von Usenet-Servern soll nach Newsgruppen abgesucht werden. Das kann auf faire Weise geschehen: man öffnet eine NNTP-Verbindung und schickt Requests. Dies läßt sich durch Pipelining (senden mehrere Befehle gleichzeitig) beschleunigen. Durch die Zeiten, die der News-Server benötigt um Artikel zu suchen, wird die Datenrate in der Praxis allerdings drastisch beschränkt.

Also die unfaire Weise: statt einer öffnet man 5, 10 oder gleich mehrere hundert Verbindungen zu einem (oder mehreren) Servern und verteilt so die Verbindungslatenz und die Antwortzeit.

1.3. Die Planung

Die (für mich) naheligende Idee, dies mit mehreren Scanprozessen zu implementieren, scheiterte an zwei Problemen:

Die Lösung (klar!) lag im Event-Modul. Da alle Verbindungen von einem Prozeß bearbeitet werden, gibt es keine Synchronisationsprobleme. Der Overhead pro Verbindung beschränkt sich ebenfalls auf einen Hash, und das umschalten von Prozessen entfällt ebenfalls (schneller).


2. Die Implementation

Die folgenden Abschnitte stellen die wichtigsten "Knotenpunkte" des Scanprogrammes vor. Jedesmal wird kurz das Problem erläutert und die Lösung mit Hilfe des Event-Moduls diskutiert.

2.1. Der "Scheduler"

Der komplizierteste Teil des Programmes ist der Scheduler: Er verteilt einzelne Jobs auf die Scanner, bzw. beendet das Programm, wenn alle Jobs abgearbeitet wurden. Es gibt nur zwei Typen von "Jobs":

Ein "Scanner" ist dabei kein Prozeß, sondern nur eine Instanz der Scanner-Klasse, in der im wesentlichen der Zustand einer Verbindung gespeichert wird (Server, Port, aktuelle Gruppe...). Für jede potentielle Verbindung wird ein solches Objekt erzeugt. Für hundert Verbindungen sieht das z.B. so aus:

new Scanner for 1..100;

Die Objekte reihen sich automatisch in die idle-Warteschlange ein.

Beim Programmstart werden alle Server- und Gruppen aus einer Datei gelesen und in die Job-Warteschlange eingefügt. Dann wird in die Hauptschleife gesprungen:

Scanner::loop();        # Hauptschleife
sub loop 
   while (@queue || @idle < $scanners) {
      runq;
      Event::loop;
   }

Dabei stehen die zu bearbeitenden Jobs in @queue und die verfügbaren Scanner-Objekte in @idle. Solange noch Jobs vorhanden sind (@queue != 0) und nicht alle ($scanners) Scanner idlen, wird runq aufgerufen und in die Hauptschleife von Event gesprungen.

runq (das steht für "run queue") nimmt Jobs aus der Warteschlange und teilt sie verfügbaren Scannern zu. Der Algorithmus ist sehr primitiv (FCFS) und könnte wesentlich verbessert werden. Wichtig ist, daß die Lastverteilung in diesen wenigen Zeilen stattfindet und sehr gut lokalisiert und damit sehr einfach änderbar ist.

sub runq 
   while (@queue && @idle) {
      my $c = pop @queue;
      my $s = pop @idle;
      $s->run(@$c);
   }
   Event::unloop_all unless @queue || @idle < $scanners;

Der Aufruf von unloop_all beendet alle Event-Schleifen, wenn alle Jobs abgearbeitet wurden.

2.2. Job Management & Rescheduling

Um neue Jobs in das System einzufügen, gibt die Funktion add_job:

sub add_job 
   push @queue, [@_];
   $reschedule->start if @idle;

Die wichtigste Teil ist der Aufruf von $reschedule->start: Wenn ein Scanner verfügbar ist (@idle nicht leer ist), muß der Scheduler aufgerufen werden. Da der Aufruf von add_job sehr häufig ist, und der Scheduler (loop) eine Rekursion bedeutet, wird er nicht direkt aufgerufen, sondern nur, wenn sonst keine Ereignisse anliegen. Dies wird mit einem idle-Event-Handler erreicht, der in der globalen Variable $reschedule steht:

my $reschedule = Event->idle(
   desc => "reschedule hook",
   max => 5,
   cb => sub 
      $_[0]->w->stop;
      Event::unloop;
   
);
$reschedule->stop;

Event->idle ist der Konstruktor, der einen Ereignis-Handler vom Typ "idle" erzeugt. Die einzelnen Attribute bedeuten:

Attribut Beschreibung
desc Eine Beschreibung, z.B. für das NetServer::ProcessTop-Modul.
max Zeit (in Sekunden) nach dem der Callback auf jeden Fall ausgeführt wird.
cb Die Callback-Funktion, die aufgerufen wird.

Übertragen auf den $rescheduler bedeutet dies, daß aus der Event-Schleife gesprungen wird, wenn gerade kein Datentransfer oder sonstige Aufgaben anliegen, oder nach fünf Sekunden, je nachdem, was früher eintrit. Diese Einschränkung verhindert, das ein schnell eintreffender Artikel den gesamten Prozeß "am Laufen hält" und damit verhindert, das freie (idle) Scanner nicht mit neuen Jobs versorgt werden.

Wenn der Callback angesprungen wird, bekommt er ein Ereignis-Objekt übergeben (unter X entspricht dies einem XEvent, bei Gtk ist es ein Gdk::Event). Als erstes sucht er über dieses Ereignis-Objekt (in $_[0]) den ursprünglichen Watcher ($_[0]->w, "w" steht für "watcher") und ruft die stop-Methode auf. Damit wird erreicht, daß der Callback nicht mehr aufgerufen wird, bis er das nächste mal gestartet wird (z.B. in add_job). $_[0]->w->stop ist übrigens das gleiche wie $rescheduler->stop, die Variable $rescheduler ist wegen my jedoch erst nach dem Aufruf des Konstruktors sichtbar.

Das zweite (und wichtigste) was der Callback unternimmt, ist, den eigentlichen Scheduler wieder anzuspringen loop. In loop wurde die Hauptschleife des Event-Moduls aufgerufen (Event::loop): unloop ist das Gegenstück dazu und springt aus dieser Schleife heraus, so daß der Scheduler neue Jobs verteilen kann.

2.2.1. Beendigung eines Jobs

Wenn ein Scanner-Objekt einen Job verarbeitet hat, muß es sich wieder in die @idle-Queue eintragen:

sub idle 
   my $self = shift;
   push @idle, $self;
   $reschedule->start;

der Aufbau gleicht add_job.

2.3. Die Jobschleife

Für die Abarbeitung der Jobs ist die Methode run zuständig. Sie hat mindestens drei Parameter: self (das Scanner-Objekt), host (der NNTP-Server, inkl. Port) und cmd (der Jobtyp).

Da das NNTP-Protokoll "stateful" ist, muß der aktuelle NNTP-Server und die aktuelle Gruppe gespeichert werden. Gilt der neue Job für denselben Rechner und dieselbe Gruppe (der Normalfall) passiert nichts, ansonsten wird die Verbindung zum NNTP-Server neu aufgebaut, bzw. die Gruppe gewechselt.

Das Aufbauen der NNTP-Verbindung ist ein Problem für den Event-Ansatz: ein connect-Aufruf blockiert den Prozeß, bis entweder die Verbindung steht oder ein Fehler passiert. Da ein solcher connect einige Sekunden benötigen kann (bei Netzwerkproblemen auch wesentlich länger), müssen sog. "non-blocking-calls" verwendet werden.

Das ist auch der Grund, weshalb das Programm auf Standardmodule wie IO::Socket oder Net::NNTP verzichten muß: Unterstützung für nicht-blockierende Aufrufe ist kaum oder überhaupt nicht vorhanden. Das Net::NNTP-Modul ist in dieser Hinsicht besonders schlecht, dnen man kann die entsprechende Methoden nicht einfach in einer Subklasse überschreiben.

Der schwierigste Teil war der Aufruf von connect, der ebenfalls nicht blockieren sollte:

if (socket $fd, PF_INET, SOCK_STREAM, getprotobyname 'tcp') 
   sub TCP_NODELAY(){1} sub SOL_TCP(){6}; # linux-2.2
   setsockopt $fd, SOL_TCP, TCP_NODELAY, 1;
   fcntl $fd, F_SETFL, O_NONBLOCK;
   connect $fd, sockaddr_in $port, inet_aton($ip);
   fcntl $fd, F_SETFL, 0;
 else 
   undef $fd;

Einige Konstanten (z.B. SOL_TCP) sind in Perl nicht einfach zu bekommen. Da das Script mehr ein Hack als eine professionelle Anwendung ist, wurden sie einfach hardcodiert.

Wenn der Server gewechselt wird, wechselt auch der Filehandle, so daß eine neuer Event-Watcher erzeugt werden muß:

($self->w = Event->io(fd => fileno $fd, poll => 'r'))->stop;

2.4. NNTP-Befehle

Das NNTP-Protokoll ist sehr einfach: Kommandos bestehen aus einer einzelnen Textzeile, Antworten aus einem Zifferncode und einer beschreibenden Textzeile. Artikel werden als Textblock übertragen, wobei die letzte Zeile einen einzelnen Punkt als Endekennung enthält.

Das Absetzen eines Befehls geschieht über die Methode rcb. Ihr werden zwei Argumente übergeben, das Kommando (ohne Zeilenende) und eine Callback-Funktion. Das Kommando wird an den NNTP-Server geschickt, die Callback-Funktion wird aufgerufen, wenn die erste Zeile der Antwort angekommen ist (mit dem Statuscode).

Dies wird erreicht, indem der Event-Watcher für die NNTP-Verbindung ($self-w>) gefüttert und gestartet wird:

sub rcb 
   my $self = shift;
   my $cmd = shift;
   my $cb = shift;
   if ($cmd) {
      $self->command($cmd);
   } else {
      $cmd = "<anonymous command>";
   }

   $self->{w}->desc($cmd);
   $self->{w}->cb(sub {
      $self->{w}->stop;
      $cb->($self);
   });
   $self->{w}->start;

Falls eine Befehl ($cmd) übergeben wurde, wird dieser über die Leitung gepustet ($self-command>) und als beschreibender Text verwendet. Mit desc wird diese Beschreibung gesetzt (hilfreich zum Debuggen oder Tollfühlen, wenn es hinterher funktioniert).

Dann wird der Callback (cb) gesetzt, der lediglich den Watcher stoppt (Befehle sind einmalige Angelegenheiten) und die eigentliche Callback-Funktion aufruft, und schließlich wird der Watcher gestartet.

2.5. Lesen der Antwort

Der schwierigste Teil des Skriptes ist das zeilenweise Lesen, das vom NNTP-Protokoll vorausgesetzt wird. Da Perl von sich aus (noch) keinerlei Support dafür anbietet (<> blockiert den Prozeß oder liefert keine ganzen Zeilen zurück), mußte das Zusammensetzen der Zeilen selbst implementiert werden.

Grundlage dafür ist die Methode refill, die alle Zeichen liest, die angekommen sind (ohne zu blockieren) und sie in einem Puffer ablegt:

sub refill 
   my $self = shift;
   my $wait = shift;
   my $fd = $self->{fd};
   fcntl $fd, F_SETFL, O_NONBLOCK;
   for(;;) {
      my $r = sysread $fd, $self->{buff}, 32768, length $self->{buff};
      if ($r>0) {
         last;
      } elsif (!defined $r && $! == EAGAIN) {
         last unless $wait;
         $self->{w}->cb(sub { $self->{w}->stop; Event::unloop });
         $self->{w}->start;
         Event::loop();
      } else {
         $self->{buff} = "500 I/O error: $!\015\012.\015\012";
         delete $self->{host};
         last;
      }
   }
   fcntl $fd, F_SETFL, 0;

Das Argument $wait bestimmt, ob auf jeden Fall gewartet werden soll, oder ob refill zurückkehren soll, auch wenn keine neuen Daten verfügbar sind. Letzteres ist außerst selten der Fall und wurde entsprechend ineffizient implementiert, indem ein "leerer" Watcher gestartet wird und dann auf dessen Unloop gewartet wird.

Als nächstes in der Hierarchy steht getline, das einfach die nächste Zeile liefert, notfalls durch Warten:

sub getline 
   my $self = shift;
   $self->refill(1) while $self->{buff} !~ s/^([^\015\012]*)\015\012//o;
   $1;

Sie ist sehr einfach: gibt es schon eine ganze Zeile im Puffer, dann schneide sie heraus und gib sie zurück. Nicht sehr effizient, aber einfach zu benutzen.

Sie wird benutzt von response, wo die Zeile in ihre beiden Kompomenten (Statuscode, Meldung) zerlegt wird, und die erste Ziffer des Statuscodes zurückgegeben wird (der für das weitere Vorgehen am entscheidensten ist).

sub response 
   my $self = shift;
   @{$self}{'code','message'} = split m/ /, $self->getline, 2;
   substr $self->{code}, 0, 1;

2.6. Scannen einer Gruppe

Um herauszufinden, welche Artikel seit dem letzten Mal neu hinzugekommen sind, wird die Statusmeldung ausgewertet, die der Server beim Wechsel in eine Gruppe liefert:

BEFEHL   GROUP comp.lang.perl.moderated
ANTWORT  211 125 4886 5010 comp.lang.perl.moderated group selected

211 ist der Statuscode für "O.K.", 125 ist die Zahl der Artikel, 4886 ist die erste und 5010 die letzte Artikelnummer.

Dies ist eine ideale Anwendung für rcb:

$self->rcb("GROUP $group", sub 
   if ($self->response == 2 && $self->{message} =~ /(\d+)\s+(\d+)\s+(\d+)/) {
      my($count, $first, $last, $name) = ($1, $2, $3, $3);
      if ($count) {
         $self->slog("selected group $group");
         $self->{group} = $group;
         $self->{first} = $first;
         $self->{last} = $last;
         $cb->($self);
         return;
      } else {
         $self->slog("SKIPPED empty group $group: ", substr($self->{message},0,-1));
      }
   } else {
      $self->slog("SKIPPED bogus group $group on ".$self->{host}[0].": ", substr($self->{message},0,-1));
   }
   $self->idle;
);

rcb bekommt zwei Argumente übergeben: "GROUP $group" ist das NNTP-Kommando zum Wechseln der (News-) Gruppe, das zweite Argument ist die Callback-Funktion, die die NNTP-Anwtort als Argument bekommt.

Die Verwendung einer Closure erlaubt es, Befehl (rcb) und die Reaktion (das sub ) direkt hintereinander zu schreiben, so, als wäre rcb ein "normaler", blockierender Aufruf zum Lesen einer Zeile, mit dem einzigen Unterschied, daß die Auswertung des Ergebnisses im einem eingerückten Block stattfindent. Anders gesagt, aus:

$response = $self->rcb("GROUP $group");

if ($response....) 

wird:

$self->rcb(GROUP $group", sub 

   if ($response....) {
   }
);

rcb kehrt jedoch sofort zurück (ein sub, daß rcb verwendet, kann deshalb nicht sofort ein Resultat an den Aufrufer zurückliefern.

Die Information über die Gruppe (first und last, wird aus der NNTP-Antwort genommen) wird später mit den Daten aus der SQL-Datenbank verglichen (das hat allerdings nichts mit Event zu tun):

sub group_scan 
   my $self = shift;
   my $group = $self->{group};
   my $todo = new Set::IntSpan $self->{first}."-".$self->{last};
   $todo = $todo->intersect($self->gs_done->complement);
   if ($todo->empty) {
      $self->slog("[no new articles in $group]");
   } else {
      $self->slog("scanning group $group: ", $todo->run_list);
      add_job($self->{host},'A',$group,$todo);
   }
   $self->idle;

Das Set::IntSpan-Modul wird dazu benutzt, um aus der Menge der vorhandenen Artikel die bereits gescannten (die von gs_done zurückgegeben werden) zu entfernen. Ist die resultierende Menge nicht leer, wird ein neuer Job ("hole alle diese Artikel") erzeugt.

2.7. Holen eines Artikels

Das Holen geschieht in zwei Stufen. Zuerst wird die Message-Id mit einem STAT-Befehl ausgewertet. Damit wird außerdem festgestellt, ob ein bestimmter Artikel überhaupt existiert.

$self->rcb("STAT ".$self->num, \&got_stat);

Ein Protokollbeispiel:

BEFEHL  STAT 5010
ANTWORT 223 5010 <85j7jc$68n@junior.apk.net> article retrieved - request text separately
BEFEHL  STAT 4977
ANTWORT 430 No such article: 4977

Der Callback got_stat wertet diese Information aus:

sub got_stat 
   my $self = shift;
   my $r = $self->response;
   $self->mark_article_done;

   ($self->{mid}) = $self->{message} =~ /<([^>]+)>/g;

   if ($r == 2) {
      my $aid = sql_fetch("select count(*) from art where mid=? limit 1", "".$self->{mid});
      $self->mark_article_present;
      if ($aid) {
         sql_exec("replace into lnk values (?,?)", $self->gid, $aid);
         $self->idle;
      } else {
         $busy{$self->{mid}}++;
         $stat_article++;
         $self->rcb_dot("ARTICLE ".$self->{num}, \&got_article);
      }
   } else {
      $self->idle;
   }

Existiert der Artikel nicht, ist der Job beendet und es wird in den idle-Modus gegangen. Wurde er schon einmal geholt (z.B. in einer anderen Gruppe) wird er nicht noch einmal geholt, sondern lediglich in die Gruppe "gelinkt" (Artikel können sehr groß werden).

Ansonsten wird ein ARTICLE-Befehl abgesetzt, mit dem der gesamte Artikel geholt wird.

BEFEHL  ARTICLE 5010
ANTWORT 220 5010 <85j7jc$68n@junior.apk.net> article retrieved - text follows
ANTWORT From: allbery@apk.net (Brandon S. Allbery KF8NH)
ANTWORT Newsgroups: comp.lang.perl.moderated
ANTWORT Subject: Re: Usefulness of Pseudo Hashes
ANTWORT Message-ID: <85j7jc$68n@junior.apk.net>
ANTWORT
ANTWORT Also sprach Alex Rhomberg <rhomberg@ife.ee.ethz.ch> (<384E39B8.D8635949@ife.ee.ethz.ch>):
ANTWORT +-----
ANTWORT | I wonder why pseudo hashes were invented
ANTWORT +--->8
ANTWORT
ANTWORT Sometimes you need an ordered list (so you can't use hashes) with keyed access
ANTWORT to the list (so lists/arrays are slow and a pain in the butt to use).  Pseudo
ANTWORT hashes are a better solution than the usual hack of maintaining duplicate
ANTWORT information in a hash and an array/list.
ANTWORT
ANTWORT --
ANTWORT brandon s. allbery      [os/2][linux][solaris][japh]     allbery@kf8nh.apk.net
ANTWORT system administrator         [WAY too many hats]           allbery@ece.cmu.edu
ANTWORT carnegie mellon / electrical and computer engineering                    KF8NH
ANTWORT                           Kiss my bits, Billy-boy.
ANTWORT .

Hierbei tritt das Problem auf, daß nach der Statuszeile ein Artikel folgt. Deshalb wird statt rcb die Methode rcb_dot benutzt (das steht für "read callback + data read until dot"):

sub rcb_dot 
   my $self = shift;
   my $cmd = shift;
   $self->{rcb_cb} = shift;
   delete $self->{body};
   $self->rcb($cmd, sub {
      if ($self->response == 2) {
         $self->{w}->cb([$self, 'rcb_cb']);
         $self->{w}->start;
         $self->rcb_cb;
      } else {
         $self->{rcb_cb}->($self);
      }
   });


sub rcb_cb 
   my $self = shift;
   $self->refill(0);
   if ($self->{buff} =~ s/^\.\015\012|^(.*?)\015\012\.\015\012//s) {
      $self->{body} .= $1;
      $self->{w}->stop;
      $self->{body} =~ s/\015\012/\n/g;
      $self->{rcb_cb}->($self, delete $self->{body});
   } elsif ($self->{buff} =~ s/^(.*\015\012)//s) {
      $self->{body} .= $1;
   }

Der komplizierteste Teil ist rcb_cb, in der die Artikeldaten akkumuliert werden, wozu furchtbare regexes benutzt wurden. Im Gegensatz zu vielen anderen Stellen wurden die Callbacks nicht durch Closures implementiert, da Event+Closures im allgemeinen ein großes Memory-Leak ist (soll ab Event-0.59 besser sein, aber man kann sichs nicht immer ausssuchen).

2.8. Updaten von SQL-Tabellen

Die Aufrufe mark_article_done und mark_article_present markieren einen Artikel in der Datenbank als bearbeitet bzw. vorhanden. Sie setzen einfach ein Element in der entsprechenden Set::IntSpan-Menge.

Diese Mengen werden in einer SQL-Tabelle gespeichert. Da sie relativ groß sind (einige Kilobytes), serh häufig geändert werden (bis zu 100 mal pro Sekunde) und der Zielrechner sehr langsam ist, sollten die Tabellen nicht bei jeder Änderung gespeichert werden. Dies wird mit einem idle-Watcher erreicht, der jedesmal gestartet wird, wenn sich die Daten ändern:

my $save_gs = Event->idle(
                     desc => "groupstatus saver",
                     max => 60,
                     cb => sub 
                        $_[0]->w->stop;
                        # zurückschreiben der Tabellen
                     
                 );
$save_gs->stop;

sub mark_article_done 
   my $self = shift;
   $gs{$self->hid,$self->gid}[0]->insert($self->{num});
   $save_gs->start;

Sollte der Draht so richtig dampfen, sorgt der Timeout von 60 Sekunden dafür, daß bei einem Absturz maximal die letzte Minute fehlt. In der Praxis wird er viel häufiger aufgerufen, nämlich dann, wenn alle einkommenden Verbindungen einmal bedient wurden und noch keine weiteren Daten angekommen sind.

2.9. Künstliche "Lastsimulation"

Da der Test-Server auf der lokalen Maschine lief, mußte künstlich Last erzeugt werden, um einigermaßen wirklichkeitsnahe Ergebnisse zu erhalten. Die größten Zeitfaktoren bei NNTP sind die Latenz zum Server (abhängig von der Entfernung) und die Bandbreite.

Um eine künstliche Latenz einzuführen, wird die command-Funktion leicht abgeändert:

sub command 
   my ($self, $cmd) = @_;
   Event->timer(after => rand, cb => sub {
      $_[0]->w->cancel;
      syswrite $self->{fd}, "$cmd\015\012";
   });

Statt das Kommando sofort zu verschicken, wird ein kurzer Timer gestartet. Die Verzögerung liegt zwischen 0 und 1 Sekunde (rand) und sorgt für eine Streuung. Ohne diese zufällige Verzögerung würde ein unerwünschtes Bearbietungsmuster entstehen, bei dem effektiv nur ein Scan-Vorgang gleichzeitig stattfindet.

Die obige Version von command schneidet in ihrer Kürze recht gut gegen die "normale" Version ab:

sub command 
   my ($self, $cmd) = @_;
   syswrite $self->{fd}, "$cmd\015\012";

2.10. NetServer::ProcessTop

Ein recht interessantes Modul ist NetServer::ProcessTop. Wird es benutzt, bindet es sich auf einen TCP-Port, den man per telnet ansprechen kann, um ein top-artiges Listing der Event-Watcher zu bekommen, Außerdem kann man die Watcher edieren.

Die Benutzung ist denkbar einfach:

eval 
   require NetServer::ProcessTop;
   NetServer::ProcessTop->new(7000);
;

Ein telnet localhost 7000 erzeugt dann dieses Bild:

get PID=3407 @ cerebro                                       | 14:26:46 [  60s]
10 events; load averages: 0.75, 0.73, 0.00; lag  0%

  EID PRI STATE   RAN  TIME   CPU TYPE DESCRIPTION                           P1
    0   7         912  0:00 26.6%  sys idle
    3   4 zomb    227  0:00 16.9%   io ARTICLE 273573
    6   4 zomb    236  0:00 16.6%   io ARTICLE 273572
    4   4 sleep   232  0:00 16.4%   io ARTICLE 273575
    5   4 sleep   221  0:00 16.0%   io ARTICLE 273574
    9   4 wait    117  0:00  7.3% idle groupstatus saver
   10   4 wait    180  0:00  0.3% idle reschedule hook
    2   3 sleep     1  0:00  0.0% time Event::Stats
    1   3 cpu       0  0:00  0.0%   io NetServer::ProcessTop::Client localhost
    7   3 sleep     0  0:00  0.0%   io NetServer::ProcessTop
    8   4 sleep     0  0:00  0.0%   io user input
    0  -1           0  0:00  0.0%  sys other processes

%

Weil das Modul aber ein potentielles Sicherheitsproblem sein kann, sollte es nur zum Debuggen/Erfreuen verwendet werden.


A. Der Quellcode

#!/usr/app/bin/perl

# this goody scans newsgroups on any number of servers
# using more than one connection

# (c)1999 Marc Alexander Lehmann <pcg@goof.com>

$::scanners = 4; # the number of scanner "processes" to use
$::max_data = 1e6 * 30;

package Scanner;

use Event;
use Socket;
use Fcntl;
use Errno qw(EAGAIN);
use Set::IntSpan;
BEGIN  eval "use Time::HiRes 'time'" 

use sex_lib;

my $scanners;
my @idle;
my @queue;

# statistics
my $stat_article =
my $stat_stat =
my $stat_bread = 0;
my $stat_start = time;

# cmds
# S group     scan the newsgroup
# A group anum        scan the article

# utliity methods and functions

sub slog 
   my $self = shift;
   printf "%2d [%2d,%2d]: ", $self->{identifier}, scalar@queue, scalar@idle;
   print @_, "\n";


my $reschedule = Event->idle(
   desc => "reschedule hook",
   max => 5,
   cb => sub 
      $_[0]->w->stop;
      Event::unloop;
   
);
$reschedule->stop;

sub runq 
   if ($stat_bread > $::max_data) {
      print "max data size exceeded, stopping...\n";
      @queue = ();
   }
   while (@queue && @idle) {
      my $c = pop @queue;
      my $s = pop @idle;
      #printf "JJJ%d @$c\n", $s->{identifier};#d#
      $s->run(@$c);
   }
   Event::unloop_all unless @queue || @idle < $scanners;


sub loop 
   while (@queue || @idle < $scanners) {
      #printf "main loop %d < %d\n", scalar @queue, scalar @idle;
      runq;
      Event::loop;
   }


sub idle 
   my $self = shift;
   push @idle, $self;
   $reschedule->start;


sub add_job 
   push @queue, [@_];
   #print "adding job [@_]\n";
   $reschedule->start if @idle;


sub shuffle_jobs 
   srand time;
   for my $i (0..$#queue) {
      my $j = $#queue - int rand $i;
      my $d = $queue[$i]; $queue[$i]=$queue[$j]; $queue[$j]=$d;
   }


# I/O handling -- Net::NNTP is too dumb and too slow

sub command 
   my ($self, $cmd) = @_;
   if (1) {
      syswrite $self->{fd}, "$cmd\015\012";
   } else {
      # simulate light load
      Event->timer(after => rand, cb => sub {
         $_[0]->w->cancel;
         syswrite $self->{fd}, "$cmd\015\012";
      });
   }


sub refill 
   my $self = shift;
   my $wait = shift;
   my $fd = $self->{fd};
   fcntl $fd, F_SETFL, O_NONBLOCK;
   for(;;) {
      my $r = sysread $fd, $self->{buff}, 32768, length $self->{buff};
      if ($r>0) {
         last;
      } elsif (!defined $r && $! == EAGAIN) {
         last unless $wait;
         $self->{w}->cb(sub { $self->{w}->stop; Event::unloop });
         $self->{w}->start;
         Event::loop();
      } else {
         $self->{buff} = "500 I/O error: $!\015\012.\015\012";
         delete $self->{host};
         last;
      }
   }
   fcntl $fd, F_SETFL, 0;


sub getline 
   my $self = shift;
   $self->refill(1) while $self->{buff} !~ s/^([^\015\012]*)\015\012//o;
   $1;


sub response 
   my $self = shift;
   @{$self}{'code','message'} = split m/ /, $self->getline, 2;
   substr $self->{code}, 0, 1;


sub rcb 
   my $self = shift;
   my $cmd = shift;
   my $cb = shift;
   if ($cmd) {
      $self->command($cmd);
   } else {
      $cmd = "<anonymous command>";
   }
   if ($self->{buff} =~ /\015\012/) {
      # cannot not happen normally, as this would indicate that
      # we already had a reply to a command we just send, before
      # we actually sent it.
      $cb->($self);
   } else {
      $self->{w}->desc($cmd);
      $self->{w}->cb(sub {
         $self->{w}->stop;
         $cb->($self);
      });
      $self->{w}->start;
   }


sub rcb_cb 
   my $self = shift;
   $self->refill(0);
   if ($self->{buff} =~ s/^\.\015\012|^(.*?)\015\012\.\015\012//s) {
      $self->{body} .= $1;
      $self->{w}->stop;
      $self->{body} =~ s/\015\012/\n/g;
      $self->{rcb_cb}->($self, delete $self->{body});
   } elsif ($self->{buff} =~ s/^(.*\015\012)//s) {
      $self->{body} .= $1;
   }


sub rcb_dot 
   my $self = shift;
   my $cmd = shift;
   $self->{rcb_cb} = shift;
   delete $self->{body};
   $self->rcb($cmd, sub {
      if ($self->response == 2) {
         $self->{w}->cb([$self, 'rcb_cb']);
         $self->{w}->start;
         $self->rcb_cb;
      } else {
         $self->{rcb_cb}->($self);
      }
   });


# main state machine and program logic

sub hid 
   my $self = shift;
   my $hid = $hid{$self->{host}[0]} || sql_fetch("select hid from host where name=?", $self->{host}[0]);
   unless (defined $hid) {
      sql_exec("insert into host (name) values (?)", $self->{host}[0]);
      $hid = sql_insertid;
   }
   $hid{$self->{host}[0]} = $hid;


sub gid 
   my $self = shift;
   my $gid = $gid{$self->{group}} || sql_fetch("select gid from grp where name=?", $self->{group});
   unless (defined $gid) {
      sql_exec("insert into grp (name) values (?)", $self->{group});
      $gid = sql_insertid;
   }
   $gid{$self->{group}} = $gid;


my %gs;
my %save_gs;
my $save_gs = Event->idle(
                     desc => "groupstatus saver",
                     max => 60,
                     cb => sub 
                        $_[0]->w->stop;
                        while (my($k,$self)=each %save_gs) {
                           my($hid,$gid)=split /\0/, $k;
                           my($d,$p)=@{$gs{$hid,$gid}};
                           #print "\nSGS $d,$p $hid,$gid\n";
                           sql_exec("replace into grpstat values (?,?,?,?)",
                                    $hid, $gid, $d->run_list, $p->run_list);
                        }
                        %save_gs = ();
                     
                 );
$save_gs->stop;

sub gs_done 
   my $self = shift;
   unless (exists $gs{$self->hid,$self->gid}) {
      my ($d, $p) =
         sql_fetch("select done,present from grpstat where hid=? and gid=?",
                   $self->hid, $self->gid);
      $d = new Set::IntSpan $d;
      $p = new Set::IntSpan $p;
      $gs{$self->hid,$self->gid} = [$d, $p];
   }
   $gs{$self->hid,$self->gid}[0];


sub mark_article_done 
   my $self = shift;
   $self->gs_done;
   $gs{$self->hid,$self->gid}[0]->insert($self->{num});
   $save_gs{$self->hid."\0".$self->gid} = 1;
   $save_gs->start;


sub mark_article_present 
   my $self = shift;
   $self->gs_done;
   $gs{$self->hid,$self->gid}[1]->insert($self->{num});
   $save_gs{$self->hid."\0".$self->gid} = 1;
   $save_gs->start;


sub group 
   my ($self, $group, $cb) = @_;
   if ($self->{group} eq $group) {
      $cb->($self);
   } else {
      $self->rcb("GROUP $group", sub {
         if ($self->response == 2 && $self->{message} =~ /(\d+)\s+(\d+)\s+(\d+)/) {
            my($count, $first, $last, $name) = ($1, $2, $3, $3);
            if ($count) {
               $self->slog("selected group $group");
               $self->{group} = $group;
               $self->{first} = $first;
               $self->{last} = $last;
               $cb->($self);
               return;
            } else {
               $self->slog("SKIPPED empty group $group: ", substr($self->{message},0,-1));
            }
         } else {
            $self->slog("SKIPPED bogus group $group on ".$self->{host}[0].": ", substr($self->{message},0,-1));
         }
         $self->idle;
      });
   }


my %busy;

# I had to serialize this into many different subs, otherwise
# we get biiiig memory leaks in anonymous subs

sub got_article 
   my $self = shift;
   if (@_) {
      $stat_bread += length $_[0];
      sql_exec("insert into art (mid,mtime) values (?,?)", $self->{mid}, time);
      my $aid = sql_insertid;
      my $fh = storage_create $aid;
      print $fh $_[0];
      sql_exec("replace into lnk values (?,?)", $self->gid, $aid);
      print "*";
   } else {
      $self->slog($self->{mid}.": STAT yes, but no ARTICLE??");
   }
   delete $busy{$self->{mid}};
   $self->idle;


sub got_stat 
   my $self = shift;
   my $r = $self->response;
   $self->mark_article_done;
   ($self->{mid}) = $self->{message} =~ /<([^>]+)>/g;

   if ($r == 2 && !exists $busy{$self->{mid}}) {
      my $aid = sql_fetch("select count(*) from art where mid=? limit 1", "".$self->{mid});
      $self->mark_article_present;
      if ($aid) {
         print "[-]";
         sql_exec("replace into lnk values (?,?)", $self->gid, $aid);
         $self->idle;
      } else {
         $busy{$self->{mid}}++;
         $stat_article++;
         $self->rcb_dot("ARTICLE ".$self->{num}, \&got_article);
      }
   } else {
      print "-";
      $self->idle;
   }


sub article_get 
   my $self = shift;
   $stat_stat++;
   $self->rcb("STAT ".$self->{num}, \&got_stat);


sub group_scan 
   my $self = shift;
   my $group = $self->{group};
   my $todo = new Set::IntSpan $self->{first}."-".$self->{last};
   $todo = $todo->intersect($self->gs_done->complement);
   if ($todo->empty) {
      $self->slog("[no new articles in $group]");
   } else {
      $self->slog("scanning group $group: ", $todo->run_list);
      add_job($self->{host},'A',$group,$todo);
   }
   $self->idle;


# start a single command action
sub run 
   my ($self,$host,$cmd,@args) = @_;
   if ($self->{host}[0] ne $host->[0]) {
      my ($hostname, $user, $pass) = @$host;
      delete $self->{group};
      eval { $self->{w}->cancel };
      my ($ip,$port) = $hostname =~ /^([^:]+)(?::(\d+))?/g;
      $port ||= 119;
      $self->slog("connecting to $ip ($port)");
      my $fd = $self->{fd} = local *HOST;

      if (socket $fd, PF_INET, SOCK_STREAM, getprotobyname 'tcp') {
         sub TCP_NODELAY(){1} sub SOL_TCP(){6}; # linux-2.2
         setsockopt $fd, SOL_TCP, TCP_NODELAY, 1;
         fcntl $fd, F_SETFL, O_NONBLOCK;
         connect $fd, sockaddr_in $port, inet_aton($ip);
         fcntl $fd, F_SETFL, 0;
      } else {
         undef $fd;
      }

      ($self->{w} = Event->io(fd => fileno $fd, poll => 'r'))->stop;
      if ($fd && $self->response == 2) {
         if ($user) {
            $self->command("AUTHINFO USER $user"); $self->response;
            $self->command("AUTHINFO PASS $pass"); $self->response;
         }
         $self->{host} = $host;
      } else {
         $self->slog("SKIPPED host $host->[0]: $!");
         delete $self->{host};
         $self->idle;
         return;
      }
   }
   if ($cmd eq 'S') {
      my ($group) = @args;
      $self->group($group, \&group_scan);
   } elsif ($cmd eq 'A') {
      my ($group,$todo) = @args;
      $self->{num} = min $todo;
      $todo->remove($self->{num});
      add_job($self->{host},'A',$group,$todo) unless $todo->empty;
      $self->group($group, \&article_get);
   } else {
      die "unknown command $cmd (@args)";
   }


sub new 
   my $class = shift;
   my @cmd = @_;
   my $self = bless {}, $class;
   $self->{identifier} = ++$scanners;
   $self->idle;
   $self;


sub statistics 
   my $time = time-$stat_start;
   print "$stat_stat STATs (~",
         int($stat_stat/$time)/10,
         "/s), $stat_article ARTICLEs, $stat_bread newsbytes read (~",
         int($stat_bread/($time*102.4))/10,"kb/s)\n";


if (-t STDIN) 
   Event->io(fd => 0, poll => "r", desc => "user input", cb => sub {
      my $cmd = <STDIN>;
      print "\nUSER: $cmd";
      statistics;
      if ($cmd =~ /quit/) {
         print "\07flushed queue\n";
         @queue = ();
      }
   });


package main;

# it does so many things, so I called it just "get"

use lib '.';

$|=1;

use Event;

eval 
   require NetServer::ProcessTop;
   NetServer::ProcessTop->new(7000);
;

print "starting $::scanners workers...\n";
new Scanner for 1..$::scanners;

print "feeding seed commands...\n";
my $host;
while (<>) 
   chomp;
   s/\s*#.*$//;
   next unless /\S/;
   if (/^server (\S+?)(?::(\S+?):(\S+?))?$/) {
      push @host, [$1, $2, $3];
   } else {
      for my $host (@host) {
         Scanner::add_job($host, 'S', $_);
      }
   }


#Scanner::shuffle_jobs;

print "looping...\n";
Scanner::loop();
print "done...\n";
Scanner::statistics;

A.1. Mehr!

Die folgenden Module/Programme/RFCs wurden für das Projekt verwendet.