Concurrency

Concurrent objects or Processes
Asynchronous Procedures
Threads

Concurrent objects or Processes

Spark allows several computations to execute simultaneously, possibly interacting with each other. In programming parlance this is known as multi-processing, multi-threading or concurrency. Programming languages usually depend on the host operating system's capabilities to implement concurrency. Spark has built-in concurrency: it provides multi-threading without any help from the underlying operating system. This has some important consequences:

To write concurrent programs, we need to become familiar with three new procedures: spawn, send and receive.

(spawn proc): Creates a new process and evaluates the procedure proc in it. The return value of spawn is an integer that identifies the new process. spawn also accepts any number of optional arguments, which can be used to initialize the state of the new process.

(send pid message): Asynchronously sends message to the process identified by pid.

(receive pid): Receives a message from the message queue of the process identified by pid. Calls to receive usually make use of the pattern matching facility provided by the match library. receive usually follows the coding pattern given below:


(import (match))
(define (process-callback pid)
  (let ((msg (receive pid)))
    ;; Match the message against a set of patterns.
    (match msg
           ((pattern1) expressions1)
           ((pattern2) expressions2)
           (_ handle-unexpected-message))))

Let us study a small program where concurrent objects are put to good use. Here we define an object that can act as a background service for doing trigonometric calculations. It waits for messages in a loop. A message is packed as a list with the first element identifying the trigonometric function and the rest of the elements being that function's arguments. When a message arrives, it is unpacked using the match procedure and an appropriate trigonometric function is executed and the result is displayed. If it receives the special message 'exit, the concurrent object kills itself.


;; file: trig-server.ss

(import (match))

(define (trig-server pid)
  (let loop ((message (receive pid)))
    (cond
     ((not (eq? message 'exit))
      (match message
             (('sin a)
              (printf "sin of ~a is ~a~n" a (sin a)))
             (('cos a)
              (printf "cos of ~a is ~a~n" a (cos a)))
             (('tan a)
              (printf "tan of ~a is ~a~n" a (tan a)))
             (_
              (printf "Don't know the trig function ~a~n" message)))
      (loop (receive pid)))
     (else
      (display "Goodbye.")
      (newline)))))

To use the trig server, load trig-server.ss into Spark and spawn a process with the procedure trig-server as its callback.


> (load "trig-server.ss")
> (define pid (spawn trig-server))

Now we can send messages to the trigonometry object and have the results printed. As this object lives in its own parallel world, we can go on with other business as it does the computations.


> (send pid (list 'sin 3.4))
sin of 3.4 is -0.2555411020268312
> (send pid (list 'cos 2.1))
cos of 2.1 is -0.5048461045998575
> (send pid (list 'tan 3.5))
tan of 3.5 is 0.3745856401585947
> (send pid (list 'blah 7))
Don't know the trig function (blah 7)
> (send pid 'exit)
Goodbye.

Let us extend the trig-server with a complete implementation of inter-process communication. The client process should include its process id as the first element of the message. trig-server will compute the result and send it back to the client.


(import (match))

(define (trig-server pid)
  (let loop ((message (receive pid)))
    (cond
     ((not (eq? message 'exit))
      (match message
             ((sender-pid 'sin a)
              (send sender-pid (sin a)))
             ((sender-pid 'cos a)
              (send sender-pid (cos a)))
             ((sender-pid 'tan a)
              (send sender-pid (tan a)))
             (_
              (printf "Don't know the trig function ~a~n" message)))
      (loop (receive pid)))
     (else
      (display "Goodbye.")
      (newline)))))

Here is a client process that can interact with the trig-server. It is wrapped up in a procedure that takes the server process id, the trig function and the value to apply the trig function to. Internally we spawn a new process that will send the request to the server, get the response and print it.


(define (trig-client server-pid func val)
  (spawn
   (lambda (pid)
     (send server-pid (list pid func val))
     (printf "(~a ~a) = ~a" func val (receive pid))
     (flush-output))))

To try our new code, we need to load both trig-server.ss and trig-client.ss:


> (load "trig-server.ss")
> (load "trig-client.ss")
> (define server-pid (spawn trig-server))
> (trig-client server-pid 'sin 10)
2 ;; pid of the new client process
(sin 10) = -0.5440211108893699 ;; result
> (trig-client server-pid 'cos 1.2)
3
(cos 1.2) = 0.3623577544766736

Concurrent objects give us a powerful tool to model the world around us. Like objects in the real world, Spark processes live and execute independently. They interact with each other using messages. As they need not share memory, programs can be written without worrying about synchronization and locks. Spark's concurrent objects are an implementation of the Actor model and bring the experienced programmer to a previously unknown intimacy with pure Object Oriented Programming.

Fault tolerance

A system is fault tolerant if it can detect errors and repair itself. A Spark process implements fault tolerance with the help of watcher processes. A watcher process is notified by the system when a watched process dies or exits. The following code snippet introduces the procedure watch, which enables a process to keep an eye on another process:


(define (test-server pid)
  (let loop ((m (receive pid)))
    (if (not (eq? m 'exit))
        (loop (receive pid)))))

(define (watch-server server-pid)
  (spawn
   (lambda (pid)
     (watch server-pid pid)
     (printf "~a watching ~a~n" pid server-pid)
     (let loop ((m (receive pid)))
       (if (eq? (car m) 'dead)
           (begin
             (printf "Process ~a is dead. Restarting it ...~n" (cdr m))
             (watch-server (spawn test-server)))
           (loop (receive pid)))))))

The process represented by test-server can be brought under watch by passing its proc-id to watch-server. When it dies, it is restarted by the watcher. This is demonstrated in the following session:


> (watch-server (spawn test-server))
2
2 watching 1
> (send 1 'exit)
Process (1) is dead. Restarting it ...
4 watching 3
> (send 3 'exit)
Process (3) is dead. Restarting it ...
6 watching 5

Here are a few useful process utility procedures:

(kill proc-id): Forcefully kills the process identified by proc-id.

(alive? proc-id): Returns #t if the process identified by proc-id is alive.

(register proc-id name): Maps name to proc-id. After registering the name, it can be used instead of proc-id in send calls.

(unregister proc-id name): Removes the proc-id-name mapping.

(descriptor proc-id): Returns the thread descriptor object that the process uses internally.

(message-channel proc-id): Returns the asynchronous message channel object that the process uses internally.
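
The utilities listed above can be combined in a short script. The following is a minimal sketch: echo-server and the name "echo" are hypothetical, introduced only for illustration, and the return values of the calls are not shown because this page does not specify them.

```scheme
;; Hypothetical helper for this sketch: prints every message it receives
;; and never exits on its own, so it has to be killed explicitly.
(define (echo-server pid)
  (let loop ((msg (receive pid)))
    (printf "echo: ~a~n" msg)
    (loop (receive pid))))

(define echo-pid (spawn echo-server))

;; Map a name to the process, then address it by name instead of by pid.
(register echo-pid "echo")
(send "echo" 'hello)

;; The process blocks in receive between messages, so it is still alive here.
(alive? echo-pid)

;; Remove the name mapping and stop the process.
(unregister echo-pid "echo")
(kill echo-pid)
```

Because send is asynchronous, the kill may arrive before the message has been printed. A real program would normally let the process finish its work, for example by sending it an 'exit message as trig-server expects.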

Distributed computing

In distributed computing a program is split into parts that run simultaneously on multiple computers communicating over a network. Spark processes can be made to send and receive messages over the network. This does not require any extensive modification of existing code. A client process can send messages to a server process running on another machine using the send procedure itself. The server process on the other end of the network can use the same old receive procedure to get the request.

To actually send and receive messages over the network, we need one new procedure. It is called remoting! and it starts an internal TCP server that acts as a mediator between the distributed processes. The programmer need not worry about how the messages are packed, delivered and unpacked over the wire. All of that is taken care of by the send, receive and remoting! procedure calls. The trig-server we have written can serve clients in a networked environment, without any modification. Let us see how to start it as a server process:


> (load "trig-server.ss")
> (remoting!)
#<thread>
> (define pid (spawn trig-server))
> (register pid "trig-server")

Now clients can send messages to "trig-server" by appending the machine name or IP to its identifier. Here is a sample client that can interact with the remote trig-server:


> (define (trig-client pid)
  (let loop ((msg (receive pid)))
    (printf "~a~n" msg)
    (flush-output)
    (loop (receive pid))))
> (remoting! 1445)
#<thread>
> (define pid (spawn trig-client))
> (register pid "trig-client")
> (send "trig-server@localhost" (list "trig-client@localhost:1445" 'sin 3.4))
#t
-0.2555411020268312
> (send "trig-server@localhost" (list "trig-client@localhost:1445" 'cos 1.2))
#t
0.3623577544766736

Note the way we specify the remote process id appended with the machine name. We need to start the remoting service on the client as well, so as to get back the responses sent by trig-server. We start the client side remoting service on port 1445, because the default port 1383 is already bound by trig-server. If the server and client processes are actually running on two machines, we need not change the port. Thus components in a Spark program can be distributed across a network with very little additional programming effort. One other language that offers this convenience is Erlang, which inspired and influenced the design of Spark concurrent objects.
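
When the two processes really do run on separate machines, the client side might look like the sketch below. The machine names server-host and client-host are placeholders, not names from this page. The sketch also assumes that an address without a port refers to the default port, as "trig-server@localhost" does in the session above.

```scheme
;; Run on the client machine. "server-host" and "client-host" are
;; placeholder machine names; substitute real host names or IP addresses.
(define (trig-client pid)
  (let loop ((msg (receive pid)))
    (printf "~a~n" msg)
    (flush-output)
    (loop (receive pid))))

;; No port argument: the default port is free on this machine.
(remoting!)
(define pid (spawn trig-client))
(register pid "trig-client")

;; The reply address omits the port because this client listens
;; on the default port.
(send "trig-server@server-host"
      (list "trig-client@client-host" 'tan 3.5))
```

The server side is started exactly as shown earlier, with remoting!, spawn and register.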


Asynchronous procedures

Asynchronous procedures can be used to get a time-consuming computation done in the background. They are another way of creating processes, without the messaging infrastructure. The computation is defined as a procedure which is evaluated using async.

(async proc args-list callback-proc): proc is the procedure to be executed asynchronously. args-list is a list of arguments that will be passed to proc. This should be null if proc does not take any argument. callback-proc is a procedure that will be called once proc is done. callback-proc will receive as argument the value that proc evaluates to. callback-proc can be null.


> (define (long-computation a b)
   ;; We simulate the long computation by
   ;; making the current thread of execution sleep for 2 seconds.
   (sleep 2)
   (+ a b))
> (define (done-callback result)
   (printf "long computation done. result is ~a~n" result))
> (async long-computation (list 10 20) done-callback)
>
long computation done. result is 30
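
The two null cases described above can be sketched as follows. The procedure answer is a hypothetical example, not part of Spark.

```scheme
;; Hypothetical zero-argument job used only for this sketch.
(define (answer)
  (* 6 7))

;; answer takes no arguments, so args-list is null.
;; The callback receives the value that answer evaluates to.
(async answer null
       (lambda (result)
         (printf "answer is ~a~n" result)))

;; Fire and forget: no arguments and no callback.
(async answer null null)
```

The second call is useful when proc is run only for its side effects and nothing needs to happen once it is done.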


Threads

If you have programmed in languages like Java or Python, you might have used threads to simulate concurrency. Spark has threads too. But unlike the aforementioned languages, Spark's multi-threading capabilities are not dependent on the host operating system. Threads are part of the Spark runtime itself. A new thread is created with a call to the thread procedure. It takes a procedure as argument, which will be executed in the new thread. The return value is a thread descriptor object.


> (thread (lambda () (display "i am in a new thread!") (newline)))
#<thread>
i am in a new thread!
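
The thread descriptor is useful when the caller has to wait for the thread to finish. The sketch below uses thread-wait, which is an MzScheme procedure and is not introduced on this page. It is assumed to be available in Spark in the same way as thread.

```scheme
(define result #f)

;; thread returns a thread descriptor, which we keep so we can wait on it.
(define worker
  (thread (lambda ()
            (set! result (* 6 7)))))

;; Block until the worker thread has finished, then read its result.
(thread-wait worker)
(printf "result is ~a~n" result)
```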

See the MzScheme Library Reference (http://spark-scheme.wikispot.org/MzScheme_Library_Reference) for more information on threads.

Atomic operations

The preferred paradigm for Spark is Functional Programming, but Spark is not a pure functional language: nothing in the language prevents you from sharing state across processes. If two processes modify a shared resource at the same time, that might lead to data corruption and incorrect program behavior. Spark provides the special form atomic to prevent a piece of code from being executed by two processes at the same time. If the portion of code that writes to the shared state is atomic, the shared state is guaranteed not to be modified by two processes simultaneously.


> (define (modify-global-state)
   (atomic
     (begin
        ;; Modify global state here.
       )))

The atomic block in modify-global-state makes sure that all writes to the global state are serialized.
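
As a concrete illustration, here is a minimal sketch of a shared counter guarded by atomic. The names counter, increment-counter! and worker are hypothetical. The sketch assumes that atomic serializes threads created with thread in the same way as it serializes processes, and it uses the MzScheme procedure thread-wait to wait for both threads.

```scheme
(define counter 0)

(define (increment-counter!)
  (atomic
   (begin
     ;; The read and the write happen inside one atomic block.
     (set! counter (+ counter 1)))))

;; Each worker calls increment-counter! 1000 times.
(define (worker)
  (let loop ((i 0))
    (when (< i 1000)
      (increment-counter!)
      (loop (+ i 1)))))

;; Run two workers concurrently and wait for both to finish.
(define t1 (thread worker))
(define t2 (thread worker))
(thread-wait t1)
(thread-wait t2)
(printf "counter is ~a~n" counter)
```

If atomic behaves as described above, no increment is lost even though both workers update the same variable.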

Usually atomic makes use of a single, global synchronization object. New, special purpose synchronization objects can be created using named atomics. Code blocks can choose which synchronization object to use by specifying its name.


> (define (modify-global-state)
   (atomic "lock-name"
    (begin
        ;; Modify global state here.
       )))

All constant-time procedures and operations are thread-safe because they are atomic. For example, set! assigns to a variable as an atomic action with respect to all threads, so that no thread can see a "half-assigned" variable. Similarly, vector-set! assigns to a vector atomically. The hash-table-put! procedure is not atomic, but the table is protected by a lock. Port operations are generally not atomic, but they are thread-safe in the sense that a byte consumed by one thread from an input port will not be returned also to another thread, and procedures like port-commit-peeked and write-bytes-avail offer specific concurrency guarantees. See MzScheme Library Reference for more information.
