Concurrent programming - Message queues (3)

ArticleCategory:

SoftwareDevelopment

AuthorImage:

[Leonardo]

TranslationInfo:

original in en: Leonardo Giordani

en to de: J�rgen Pohl

AboutTheAuthor:

Vor kurzem erhielt ich mein Diplom von der Fakult�t f�r Telekommunikations - Engineering am Politecnico in Milano. Mein Hauptinteresse liegt im Programmieren (Assembler und C/C++). Seit 1999 arbeite ich fast ausschliesslich mit Linux/Unix.

Abstract:

Das ist der letzte Artikel in der Serie �ber concurrent programming: wir werden den zweiten und letzten Layer unseres Protokolls implementieren, damit erzeugen wir Funktionen, die das Benutzerverhalten auf der Basis des ersten Layers, den wir im vorangegangenen Artikel entwickelten, realisiert.

Es ist sicher auch eine gute Idee, einige der fr�her erschienenen Artikel dieser Serie gelesen zu haben:

ArticleIllustration:

[run in paralell]

ArticleBody:

Protokollausf�hrung - Layer 2 - Allgemeines

Das Programm ipcdemo wurde als einfache Ausf�hrung einer Vermittlungstelle zwischen Benutzern, die versuchen einander Messages zu senden, entwickelt. Um diese Simulation etwas realistischer zu machen, f�gte ich das Konzept "Service" ein: die Hauptaufgabe einer Servicemessage (Signalisierung) ist nicht die Daten�bermittlung von Benutzer zu Benutzer, sondern die Information �ber deren Steuerung. Die Service-Message wird von den Benutzern an die Vermittlungstelle geschickt, um mitzuteilen, dass sie noch vorhanden sind, wie sie erreicht werden k�nnen (�bermitteln der IPC- Queue -ID) und dass sie abbrechen. Zwei weitere Service wurden eingef�hrt: Abbrechen und Timing. Der erste wird von der Vermittlungstelle benutzt, um dem Benutzer mitzuteilen, dass er abbrechen sollte. Der zweite versucht die Reaktionszeit des Benutzers zu messen. Mehr dar�ber sp�ter in den Abschnitten �ber Benutzer und Switch.

Layer 2 enth�lt High-Level-Funktionen zum Senden und Empfangen von Meldungen, um Services anzufordern oder auf Anforderungen anzusprechen, sowie Initialisierungsinformationen: diese Funktionen wurden aus Layer 1-Funktionen aufgebaut, sie sind daher recht einfach zu verstehen. Bitte beachte, dass ich in layer2.h einige Aliase f�r Meldungstypen ( User-Message oder Service-Message) und verschiedene Services (darunter zwei vom Benutzer festzulegende Services zum Experimentieren) deklariere.

Die ipcdemo ist nur ein Beispielcode: er ist nicht optimiert, und wie wir sehen, benutzte ich viele globale Variablen, der Leser sollte sich aber nur auf die IPC-Details und nicht auf den Code konzentrieren. Falls jemand jedoch auf etwas echt Seltsames st��t, bitte an mich schreiben und wir k�nnen das diskutieren.

Implementation der User-Prozesse

Der User-Prozess ist schlicht ein Childprozess der Vermittlungstelle (oder besser, des Parentprozesses, der als Vermittlungstelle handelt). Das bedeutet, der User-Prozess hat alle Variablen initialisiert, genau wie der Switch: er kennt z.B. die id der Switch-Queue, weil sie vom switch selber in einer lokale Variablen, vor der Verzweigung (forking operation), gespeichert wurde.

Wenn der User-Prozess seine Arbeit beginnt, sollte er als erstes eine Queue erstellen und der Vermittlungstelle mitteilen, wie diese zu erreichen ist. Um das zu bewerkstelligen, schickt der User-Prozess zwei Servicemeldungen: SERV_BIRTH und SERV_QID.

/* Initialize queue  */
qid = init_queue(i);
      
/* Let the switch know we are alive */
child_send_birth(i, sw);
      
/* Let the switch know how to reach us */
child_send_qid(i, qid, sw);
Dann wird er in der Hauptschleife wirksam: hier schickt der User-Prozess Meldungen, pr�ft auf eintreffende Meldungen von anderen Usern und ob die Vermittlungstelle einen Service anforderte.

Die Entscheidung, Meldungen zu senden, wird auf Basis der Wahrscheinlichkeit getroffen: die Funktion myrand() liefert eine Zufallszahl, die auf das auszuf�hrende Argument eingestellt ist, in diesem Fall 100. Eine Meldung wird nur gesandt, wenn diese Zahl kleiner als die spezifizierte Wahrscheinlichkeitszahl ist. Da der User-Prozess 1 Sekunde Pause zwischen zwei Schleifendurchl�ufen macht, folgt daraus, dass der User-Prozess so viele Meldungen schickt, wie die wahrscheinliche Anzahl der Send-Meldungen jede 100 Sekunden, das ist wirklich zu wenig... Wir m�ssen also darauf achten, die Wahrscheinlichkeitszahl nicht zu niedrig zu setzen oder unsere Simulation wird ewig dauern.

if(myrand(100) < send_prob){
  dest = 0;

  /* Do not send messages to the switch, to you, */
  /* and to the same receiver of the previous message */
  while((dest == 0) || (dest == i) || (dest == olddest)){
    dest = myrand(childs + 1);
  }
  olddest = dest;

  printf("%d -- U %d -- Message to user %d\n", (int) time(NULL), i, dest);
  child_send_msg(i, dest, 0, sw);
}
Die Meldungen der anderen User sind tats�chlich zur Vermittlungstelle und von dieser an uns gesandt worden; sie sind mit Typ TYPE_CONN (as CONNECTION) gekennzeichnet.
/* Check the incoming box for simple messages */
if(child_get_msg(TYPE_CONN, &in)){
  msg_sender = get_sender(&in);
  msg_data = get_data(&in);
  printf("%d -- U %d -- Message from user %d: %d\n", 
         (int) time(NULL), i, msg_sender, msg_data);
}
Falls die Vermittlungstelle einen Service anfordert, benutzen wir eine Meldung vom Typ TYPE_SERV. Die Servicemeldung m�ssen wir beantworten. Im Fall der Anschlussbeendigung senden wir eine Best�tigung und die Vermittlungsstelle markiert uns als nicht erreichbar. Danach m�ssen wir alle �briggebliebenen Meldungen lesen ( aus H�flichkeit, wir k�nnten auch das �berspringen), die Queue entfernen und uns von der Simulation verabschieden. Die Anforderung des Zeit-Service, die wir zur Vermittlungstelle schicken, ist eine Meldung mit der aktuellen Zeit: die Vermittlungstelle subtrahiert diese von der Zeit, die sie f�r das Verbleiben der Meldung in der Queue protokolliert hat. Wie wir feststellen, f�hren wir auch QoS (Quality of Service) durch - die Simulation ist wahrscheinlich besser als das wirkliche Telefonsystem....
/* Check if the switch requested a service */
if(child_get_msg(TYPE_SERV, &in)){
  msg_service = get_service(&in);

  switch(msg_service){
  case SERV_TERM:
    /* Sorry, we have to terminate */
    /* Send an acknowledgement to the switch */
    child_send_death(i, getpid(), sw);
        
    /* Read the last messages we have in the queue */
    while(child_get_msg(TYPE_CONN, &in)){
      msg_sender = get_sender(&in);
      msg_data = get_data(&in);
      printf("%d -- U %d -- Message from user %d: %d\n", 
             (int) time(NULL), i, msg_sender, msg_data);
    }
        
    /* Remove the queue */
    close_queue(qid);    
    printf("%d -- U %d -- Termination\n", (int) time(NULL), i);
    exit(0);
    break;
  case SERV_TIME:
    /* We have to time our work */
    child_send_time(i, sw);
    printf("%d -- U %d -- Timing\n", (int) time(NULL), i);
    break;
  }
}

Implementation des Vermittlungstellen-Prozesses

Der Parent-Prozess besteht aus zwei Teilen: vor und nach der Erstellung der Child-Prozesse. Im ersten Teil muss er ein Array initiieren, um die Queue-.I.D. seiner Child-Prozesse zu speichern und seine eigene Queue aufzubauen: das ist sicherlich nicht der wahre Weg, um etwas dieser Art einzuf�hren, jedoch w�rde die Einf�hrung von dynamischen Listen in diesem Zusammenhang ausserhalb des Rahmens diese Artikels und damit nicht n�tzlich sein. W�nschten wir eine Entwicklung f�r eine Vielzahl von Verbindungen, sollten wir nicht vergessen, dynamische Strukturen und Speicherallozierung einzusetzen. Die Queue-Identifikatoren werden am Anfang mit dem Wert der Vermittlungstellen-Queue-I.D. initialisiert, d.h. der User ist noch nicht voll da: beendet ein User den Anschluss, wird die Queue-I.D. wieder auf den urspr�nglichen Wert gesetzt.

Im zweiten Teil wirkt der Parentprozess als Vermittlungstelle, der wie der User-Prozess eine Schleife durchl�uft, bis alle User-Anschl�sse beendet sind. Die Vermittlungstelle pr�ft auf eintreffende Meldungen von den Usern und leitet sie zu ihrem Ziel.

/* Check if some user has connected */
if(switch_get_msg(TYPE_CONN, &in)){

  msg_receiver = get_receiver(&in);
  msg_sender = get_sender(&in);
  msg_data = get_data(&in);
      
  /* If the destination is alive */
  if(queues[msg_receiver] != sw){
    
    /* Send a messge to the destination (follow-up the received message) */
    switch_send_msg(msg_sender, msg_data, queues[msg_receiver]);
    
    printf("%d -- S -- Sender: %d -- Destination: %d\n", 
           (int) time(NULL), msg_sender, msg_receiver);
  }
  else{
    /* The destination is not alive */
    printf("%d -- S -- Unreachable destination (Sender: %d - Destination: %d)\n", 
          (int) time(NULL), msg_sender, msg_receiver);
  }
Sendet ein User jedoch eine Meldung durch die Vermittlungsstelle, kann das Gegenstand einer Serviceanforderung auf Wahrscheinlichkeitsbasis sein (Vorgang wie vorgehend beschrieben); im ersten Fall zwingen wir den User abzubrechen, in zweiten Fall beginnen wir einen Timingablauf: Wir protokollieren die aktuelle Zeit und kennzeichnen den User, um zu verhindern, dass wir einen User behandeln, der sich schon in diesem Prozessablauf befindet. Erhalten wir keine Meldungen, ist es m�glich, dass alle User abgeh�ngt sind: in diesem Fall warten wir, bis alle Child-Prozesse wirklich beendet sind (der letzte User k�nnte noch nach verbleibenden Meldungen in seiner Queue pr�fen), entfernen unsere Queue und brechen ab.
  /* Randomly request a service to the sender of the last message */
  if((myrand(100)  < death_prob) && (queues[msg_sender] != sw)){
    switch(myrand(2))
      {
      case 0:
    /* The user must terminate */
    printf("%d -- S -- User %d chosen for termination\n", 
          (int) time(NULL), msg_sender);
    switch_send_term(i, queues[msg_sender]);
    break;
      case 1:
    /* Check if we are already timing that user */
    if(!timing[msg_sender][0]){
      timing[msg_sender][0] = 1;
      timing[msg_sender][1] = (int) time(NULL);
      printf("%d -- S -- User %d chosen for timing...\n", 
            timing[msg_sender][1], msg_sender);
      switch_send_time(queues[msg_sender]);
    }
    break;
      }
  }
}
else{
  if(deadproc == childs){
    /* All childs have been terminated, just wait for the last to complete its last jobs */
    waitpid(pid, &status, 0);

    /* Remove the switch queue */
    remove_queue(sw);

    /* Terminate the program */
    exit(0);
  }
}
Dann schauen wir nach Service-Meldungen: wir k�nnen Meldungen �ber unseren Start, unsere Beendigung, unsere User-I.D. und Auskunft �ber unseren Time-Service erhalten.
if(switch_get_msg(TYPE_SERV, &in)){
  msg_service = get_service(&in);
  msg_sender = get_sender(&in);

  switch(msg_service)
    {
    case SERV_BIRTH:
      /* A new user has connected */
      printf("%d -- S -- Activation of user %d\n", (int) time(NULL), msg_sender);
      break;

    case SERV_DEATH:
      /* The user is terminating */
      printf("%d -- S -- User %d is terminating\n", (int) time(NULL), msg_sender);

      /* Remove its queue from the list */
      queues[msg_sender] = sw;

      /* Remember how many users are dead */
      deadproc++;
    break;

    case SERV_QID:
      /* The user is sending us its queue id */
      msg_data = get_data(&in);
      printf("%d -- S -- Got queue id of user %d: %d\n", 
            (int) time(NULL), msg_sender, msg_data);
      queues[msg_sender] = msg_data;
      break;

    case SERV_TIME:
      msg_data = get_data(&in);

      /* Timing informations */
      timing[msg_sender][1] = msg_data - timing[msg_sender][1];

      printf("%d -- S -- Timing of user %d: %d seconds\n", 
            (int) time(NULL), msg_sender, timing[msg_sender][1]);
      /* The user is no more under time control */
      timing[msg_sender][0] = 0;
      break;
    }
}

Schlussfolgerungen

Wir sind am Ende unserer kleinen Artikelserie �ber concurring programming: nicht alle M�glichkeiten wurden ausgesch�pft, aber wir haben jetzt einen guten Einblick, was hinter dem Begriff IPC steckt und welche Aufgaben man damit l�sen kann. Ich empfehle das kleine Programm, das von mir f�r diesen Artikel entwickelt wurde, zu erweitern. Wie schon erw�hnt, es ist schwierig, Multi-Prozess-Programme zu debuggen, dieses k�nnte jedoch eine sch�ne M�glichkeit sein, um unsere Kenntnisse �ber Debug-Programme zu erweitern (vergessen wir nicht, dass gdb unser bester Freund beim Programmieren ist): in der Liste am Ende dieses Artikels finden wir einige n�tzliche Programme zum Gebrauch beim Programmieren allgemein.

Ein kurzer Hinweis zu Experimenten mit IPC. Programme funktionieren nicht immer wie gew�nscht (das hier beschriebene Programm wurde vielfach ausgef�hrt...), aber wenn wir Prozesse verzweigen, werden nicht alle durch Anwendung von Ctrl-C gekillt. Ich habe das kill-Programm bisher nicht erw�hnt, aber ich vermute, wir verstehen einiges �ber Prozesse und die Man Pages. Unsere Prozesse lassen jedoch noch etwas anderes nach dem kill-Kommando zur�ck: IPC-Strukturen. In unserem obigen Beispiel wird das Killen der laufenden Prozesse nicht die Meldungs-Queue deallozieren. Um den gesamten, f�r unser Experiment allozierten Kernelspeicher zu entleeren, k�nnen wir die Programme ipcs und ipcrm benutzen: ipcs gibt eine Liste aller aktiven allozierten IPC-Resourrcen ( nicht nur unsere, sondern alle Programme, Vorsicht ist also geboten), w�hrend ipcrm uns erlaubt, einige zu entfernen. F�hren wir ipcrm ohne Argumente aus, erhalten wir alle ben�tigten Informationen: die empfohlenen Werte f�r die ersten Experimente sind "5 70 70".

Um das Projekt zu extrahieren, f�hren wir "tar xvzf ipcdemo-0.1.tar.gz" aus. Zum Installieren des ipcdemo-Programms f�hren wir "make" im aktuellen Verzeichnis des Projekts aus. "make clean" beseitigt die Sicherungsdateien und "make cleanall" entfernt Objektdateien.

Schlussbemerkung

Ich m�chte mich f�r die Verz�gerung bei der Ver�ffentlichung diese Artikels entschuldigen, Softwareentwicklung ist gl�cklicherweise nicht unser einziger Lebensinhalt... Wie stets warte ich auf eure Kommentare zu diesem Artikel und auf Vorschl�ge f�r zuk�nftige Themen: wie w�r's denn mit Threads?

Empfohlene Programme, Sites und Lesbares

Empfohlene B�cher finden wir in den vorangegangenen Artikeln, dieses Mal gebe ich euch einige Internetadressen �ber Programmieren, Debuggen und n�tzlichen Lesestoff.

Debugger (wie schon erw�hnt) sind der beste Freund des Developers, mindestens w�hrend der Entwicklung: lernt den Gebrauch von gdb ehe ihr euch mit ddd abgebt, graphischer Kram ist sch�n, aber nicht unbedingt notwendig.

Habt ihr die aufdringliche "Segmentation fault"- Meldung erhalten und ihr wundert euch, wo ihr fehlerhaften Code geschrieben habt? Zus�tzlich zum Lesen der Speicherauszugdatei mit gdb kann das Programm mit valgrind ausgef�hrt werden, wir sollten auch sein Speichersimulations-Framework nutzen.

Wie wir bemerkten, macht das Schreiben von IPC in C-Sprache Spass, ist aber komplex. Python ist die L�sung: es bietet vollst�ndige Unterst�tzung f�r Verzweigung (forking) und anderes, ausserdem ist es erweiterbar in C. Schaut's euch mal an, es lohnt sich.

Download