Using Additional Concurrency Mechanisms

Rexx has additional concurrency mechanisms that can add full concurrency so that more than one method of a given scope can run in an object at a time:

SETUNGUARDED Method and UNGUARDED Option

The SETUNGUARDED method of the Method class and the UNGUARDED option of the ::METHOD directive control locking of an object's scope when a method is invoked. Both let a method run even if another method is active on the same object.

Use the SETUNGUARDED method or UNGUARDED option only for methods that do not need exclusive use of their object variable pool, that is, methods whose execution can interleave with another method's execution without affecting the object's integrity. Otherwise, concurrent methods can produce unexpected results.

To use the SETUNGUARDED method for a method you have created with the NEW method of the Method class, you specify:

methodname~SETUNGUARDED

(See SETUNGUARDED for details about SETUNGUARDED.)

Alternately, you can define a method with the ::METHOD directive, specifying the UNGUARDED option:

::METHOD methodname UNGUARDED

GUARD ON and GUARD OFF

You might not be able to use the SETUNGUARDED method or UNGUARDED option in all cases. A method might need exclusive use of its object variables, then allow methods on other activities to run, and perhaps later need exclusive use again. You can use GUARD ON and GUARD OFF to alternate between exclusive use of an object's scope and allowing other activities to use the scope.

By default, a method must wait until a currently running method is finished before it begins. GUARD OFF lets another method (running on a different activity) that needs exclusive use of the same object variables become active on the same object. See GUARD for more information.

Guarded Methods

Concurrency requires the activities of concurrently running methods to be synchronized. Critical data must be safeguarded so diverse methods on other activities do not perform concurrent updates. Guarded methods satisfy both these needs.

A guarded method combines the UNGUARDED option of the ::METHOD directive or the SETUNGUARDED method of the Method class with the GUARD instruction.

The UNGUARDED option and the SETUNGUARDED method both provide unconditional concurrency. Including a GUARD instruction in a method makes concurrency conditional:

GUARD ON WHEN expression

If the expression on the GUARD instruction evaluates to 1 (true), the method continues to run. If the expression on the GUARD instruction evaluates to 0 (false), the method does not continue running. GUARD reevaluates the expression whenever the value of an exposed object variable changes. When the expression evaluates to 1, the method resumes running. You can use GUARD to block running any method when proceeding is not safe. (See GUARD for details about GUARD.)

Note: It is important to ensure that you use an expression that can be fulfilled. If the condition expression cannot be met, GUARD ON WHEN puts the program in a continuous wait condition. This can occur in particular when several activities run concurrently. In this case, a second activity can make the condition expression invalid before GUARD ON WHEN can use it.

To avoid this, ensure that the GUARD ON WHEN statement is executed before the condition is set to true. Keep in mind that the sequence of running activities is not determined by the calling sequence, so it is important to use a logic that is independent of the activity sequence.

Additional Examples

The following example uses REPLY in a method for a write-back cache.

/* Method Write_Back                                 */
use arg data           /* Save data to be written    */
reply 0                /* Tell the sender all was OK */
self~disk_write(data)  /* Now write the data         */

The REPLY instruction returns control to the point at which method Write_Back was called, returning the result 0. The caller of method Write_Back continues processing from this point; meanwhile, method Write_Back also continues processing.

The following example uses a message object. It reads a line asynchronously into the variable nextline:

mymsg = infile~start("READLINE") /* Gets message object to carry    */
/* message to INFILE                                                */
/* do other work */
nextline=mymsg~result            /* Gets result from message object */

This creates a message object that waits for the read to finish while the sender continues with other work. When the line is read, the mymsg message object obtains the result and holds it until the sender requests it.

Semaphores and monitors (bounded buffers) synchronize concurrency processes. Giving readers and writers concurrent access is a typical concurrency problem. The following sections show how to use guarded methods to code semaphore and monitor mechanisms and to provide concurrency for readers and writers.

Semaphores

A semaphore is a mechanism that controls access to resources, for example, preventing simultaneous access. Synchronization often uses semaphores. Here is an example of a semaphore class:

Figure 12-5. Example of a Rexx Semaphore Class

/*******************************************************************************/
/*  A Rexx Semaphore Class.                                                    */
/*                                                                             */
/* This file implements a semaphore class in Rexx.  The class is defined to    */
/* the Global Rexx Environment.  The following methods are defined for         */
/* this class:                                                                 */
/*   init - Initializes a new semaphore.  Accepts the following positional     */
/*          parameters:                                                        */
/*                             'name' - global name for this semaphore         */
/*                                if named default to set name in              */
/*                                the class semDirectory                       */
/*                             noshare -  do not define named semaphore        */
/*                                in class semDirectory                        */
/*                             Initial state (0 or 1)                          */
/*   setInitialState - Allow for subclass to have some post-initialization,    */
/*     and do setup based on initial state of semaphore                        */
/*   Waiting  - Is the number of objects waiting on this semaphore.            */
/*   Shared   - Is this semaphore shared (Global).                             */
/*   Named    - Is this semaphore named.                                       */
/*   Name     - Is the name of a named semaphore.                              */
/*   setSem   - Sets the semaphore and returns previous state.                 */
/*   resetSem - Sets state to unSet.                                           */
/*   querySem - Returns current state of semaphore.                            */
/*                                                                             */
/* SemaphoreMeta - Is the metaclass for the semaphore classes.  This class is  */
/*   set up so that when a namedSemaphore is shared, it maintains these        */
/*   named/shared semaphores as part of its state.  These semaphores are       */
/*   maintained in a directory, and an UNKNOWN method is installed on the      */
/*   class to forward unknown messages to the directory.  In this way the      */
/*   class can function as a class and "like" a directory, so [] syntax can    */
/*   be used to retrieve a semaphore from the class.                           */
/*                                                                             */
/*                                                                             */
/* The following are in the subclass EventSemaphore.                           */
/*                                                                             */
/*   Post  - Posts this semaphore.                                             */
/*   Query - Queries the number of posts since the last reset.                 */
/*   Reset - Resets the semaphore.                                             */
/*   Wait  - Waits on this semaphore.                                          */
/*                                                                             */
/*                                                                             */
/* The following are in the subclass MutexSemaphore                            */
/*                                                                             */
/*   requestMutex - Gets exclusive use of semaphore.                           */
/*   releaseMutex - Releases to allow someone else to use semaphore.           */
/*        NOTE: Currently anyone can issue a release (need not be the owner).  */
/*******************************************************************************/
/* ============================================================================ */
/* ===         Start of Semaphore class.                                  ===== */
/* ============================================================================ */
::class SemaphoreMeta subclass class
::method init
  expose semDict
                                            /* Be sure to initialize parent     */
  .message~new(self, .array~of("INIT", super), "a", arg(1,"a"))~send
  semDict = .directory~new

::method unknown
  expose semDict
  use arg msgName, args
                                            /* Forward all unknown messages     */
                                            /* to the semaphore dictionary      */
  .message~new(semDict, msgName, "a", args)~send
  if var("RESULT") then
    return result
  else
    return


::class Semaphore subclass object metaclass SemaphoreMeta

::method init
  expose sem waits shared name
  use arg semname, shr, state

  waits = 0                                 /* No one waiting                   */
  name = ""                                 /* Assume unnamed                   */
  shared = 0                                /* Assume not shared                */
  sem = 0                                   /* Default to not posted            */

  if state = 1 Then                         /* Should initial state be set?     */
    sem = 1
                                            /* Was a name specified?            */
  if VAR("SEMNAME") & semname \= "" Then Do
    name = semname                          /* Yes, so set the name             */


    if shr \= "NOSHARE" Then Do             /* Do we want to share this sem?    */
      shared = 1                            /* Yes, mark it shared              */
                                            /* Shared add to semDict            */
      self~class[name] = self
    End

  End
  self~setInitialState(sem)                 /* Initialize initial state         */

::method setInitialState
                                            /* This method intended to be       */
nop                                         /* overridden by subclasses         */
::method setSem
expose sem
oldState = sem
sem = 1                                     /* Set new state to 1               */
return oldState

::method resetSem
expose sem
sem = 0
return 0

::method querySem
expose sem
return sem


::method shared
expose shared
return shared                               /* Return true 1 or false 0         */

::method named
expose name
                                            /* Does semaphore have a name?      */
if name = "" Then return 0                  /* No, not named                    */
Else return 1                               /* Yes, it is named                 */

::method name
expose name
return name                                 /* Return name or ""                */

::method incWaits
expose waits
waits = waits + 1                           /* One more object waiting          */

::method decWaits
expose Waits
waits = waits - 1                           /* One object less waiting          */

::method Waiting
expose Waits
return waits                                /* Return number of objects waiting */
/* ========================================================================== */
/* ===         Start of EventSemaphore class.                             === */
/* ========================================================================== */

::class EventSemaphore subclass Semaphore public
::method setInitialState
  expose posted posts
  use arg posted

  if posted  then posts = 1
  else posts = 0
::method post
  expose posts posted

  self~setSem                        /* Set semaphore state            */
  posted = 1                         /* Mark as posted                 */
  reply
  posts = posts + 1                  /* Increase the number of posts   */

::method wait
  expose posted

  self~incWaits                      /* Increment number waiting       */
  guard off
  guard on when posted               /* Now wait until posted          */
  reply                              /* Return to caller               */
  self~decWaits                      /* Cleanup, 1 less waiting        */

::method reset
  expose posts posted

  posted = self~resetSem             /* Reset semaphore                */
  reply                              /* Do an early reply              */
  posts = 0                          /* Reset number of posts          */

::method query
  expose posts
                                     /* Return number of times         */
  return posts                       /* Semaphore has been posted      */
/* ========================================================================== */
/* ===         Start of MutexSemaphore class.                             === */
/* ========================================================================== */

::class MutexSemaphore subclass Semaphore public

::method setInitialState
  expose owned
  use arg owned


::method requestMutex
  expose Owned

  Do forever                                /* Do until we get the semaphore  */
    owned = self~setSem
    if Owned = 0                            /* Was semaphore already set?     */
      Then leave                            /* Wasn't owned; we now have it   */
    else Do
      self~incWaits
      guard off                             /* Turn off guard status to let   */
                                            /* others come in                 */
      guard on when \Owned                  /* Wait until not owned and get   */
                                            /* guard                          */
      self~decWaits                         /* One less waiting for MUTEX     */
    End
                                            /* Go up and see if we can get it */
  End


::method releaseMutex
  expose owned
  owned = self~resetSem                     /* Reset semaphore                */

Note: There are functions available that use system semaphores. See SysCreateEventSem, and SysCreateMutexSem.

Monitors (Bounded Buffer)

A monitor object consists of a number of client methods, WAIT and SIGNAL methods for client methods to use, and one or more condition variables. Guarded methods provide the functionality of monitors. Do not confuse this with the Monitor class (see The Monitor Class).

::method init
/* Initialize the bounded buffer */
expose size in out n
use arg size
in = 1
out = 1
n = 0

::method append unguarded
/* Add to the bounded buffer if not full */
expose n size b. in
guard on when n < size
use arg b.in
in = in//size+1
n = n+1

::method take
/* Remove from the bounded buffer if not empty */
expose n b. out size
guard on when n > 0
reply b.out
out = out//size+1
n = n-1

Readers and Writers

The concurrency problem of the readers and writers requires that writers exclude writers and readers, whereas readers exclude only writers. The UNGUARDED option is required to allow several concurrent readers.

::method init
expose readers writers
readers = 0
writers = 0

::method read unguarded
/* Read if no one is writing */
expose writers readers
guard on when writers = 0
readers = readers + 1
guard off

/* Read the data */
say "Reading (writers:" writers", readers:" readers")."
guard on
readers = readers - 1

::method write unguarded
/* Write if no-one is writing or reading */
expose writers readers
guard on when writers + readers = 0
writers = writers + 1

/* Write the data */
say "Writing (writers:" writers", readers:" readers")."
writers = writers - 1