Nicolas Wu edited this page Jan 4, 2012 · 27 revisions

This repository contains work towards a new implementation of a distributed computing interface, in which processes communicate with one another through explicit message passing rather than shared memory. The modules follow the model described in Towards Cloud Haskell, where Haskell is used to provide computation across nodes that share data only through message passing.

We start with a high-level description of how this library works, before going into specific details.

Infrastructure

One goal of this project is to separate the transport layer from the process layer, so that the transport backend is entirely independent: it is envisaged that this interface might later be used by models other than the Cloud Haskell paradigm, and that applications built using Cloud Haskell might be easily configured to work with different backend transports.

The following diagram shows dependencies between the various modules that are envisaged, where arrows represent explicit module dependencies.

+------------------------------------------------------------+
|                        Application                         |
+------------------------------------------------------------+
             |                               |
             V                               V
+-------------------------+   +------------------------------+
|      Cloud Haskell      |<--|    Cloud Haskell Backend     |
+-------------------------+   +------------------------------+
             |           ______/             |
             V           V                   V
+-------------------------+   +------------------------------+
|   Transport Interface   |<--|   Transport Implementation   |
+-------------------------+   +------------------------------+
                                             |
                                             V
                              +------------------------------+
                              | Haskell/C Transport Library  |
                              +------------------------------+

In this diagram, the various nodes roughly correspond to specific modules:

Cloud Haskell                : Control.Distributed.Process
Cloud Haskell Backend        : Control.Distributed.Process.TCP
Transport Interface          : Network.Transport
Transport Implementation     : Network.Transport.TCP

An application is built using the primitives of the Cloud Haskell layer. These come from the Control.Distributed.Process module, which offers abstractions such as nodes and processes.

The application also depends on a Cloud Haskell Backend, which provides functions to initialise the transport layer using whatever topology is appropriate to the application. Here this is provided by Control.Distributed.Process.TCP, but it could be exchanged for a backend built on another transport protocol.

Both the Cloud Haskell interface and its backend make use of the Transport Interface, provided by the Network.Transport module. This also serves as the interface for the Network.Transport.TCP module, which provides a specific implementation of the transport and may, for example, be based on some external library written in Haskell or C.

Transports

Abstracting over the transport layer allows different protocols for message passing, including:

  • TCP/IP
  • UDP
  • MPI
  • CCI
  • ZeroMQ
  • SSH
  • MVars
  • Unix pipes

Each of these transports would provide its own implementation of the Network.Transport interface and a means of creating new connections for use within Control.Distributed.Process. This separation means that transports might be used for purposes other than Cloud Haskell.

Repositories, Packages, and Modules

The modules described above are found in the distributed-process repository, and are split into two packages: distributed-process and network-transport. These packages provide the basic interfaces and a few standard implementations:

distributed-process:
  Control.Distributed.Process
  Control.Distributed.Process.TCP
  Control.Distributed.Process.UDP
  Control.Distributed.Process.MVar

network-transport:
  Network.Transport
  Network.Transport.TCP
  Network.Transport.UDP
  Network.Transport.MVar

Additional packages would include more exotic transports:

distributed-process-mpi:
  Control.Distributed.Process.MPI
  Network.Transport.MPI

distributed-process-cci:
  Control.Distributed.Process.CCI
  Network.Transport.CCI

Transport Interface

This section provides an overview of the transport interface, and explains the key functions and their use. Detailed documentation can be found in the Network.Transport module.

A Transport provides the means to create a connection between two endpoints, and abstracts over the underlying transport mechanism, whether it be TCP, UDP, MPI, or some other means of communicating over a network. A new transport is created using a mkTransport function imported from the appropriate driver module. For example, Network.Transport.TCP exports the following function:

mkTransport :: TCPConfig -> IO Transport

This takes a parameter that configures a TCP connection and produces a transport that can be used on the node.

Once a transport has been created, a new connection between endpoints is established using the newConnection function:

newConnection :: Transport -> IO (SendAddr, ReceiveEnd)

This produces a SendAddr that can be serialized and passed around to clients who might wish to send messages to the corresponding ReceiveEnd. The serialization and deserialization of a SendAddr is pure:

serialize   :: SendAddr -> ByteString
deserialize :: ByteString -> SendAddr
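
As an illustration of what such a pure round trip might look like, the following sketch uses a hypothetical ToyAddr (a host and port pair) in place of the library's SendAddr, whose real representation is not described here. The names ToyAddr, serializeToy, deserializeToy, and roundTrips are assumptions made for this example only, and the "host:port" encoding is just one possible choice.

```haskell
import qualified Data.ByteString.Char8 as BS

-- Hypothetical stand-in for SendAddr: a host name and a port number.
data ToyAddr = ToyAddr { toyHost :: String, toyPort :: Int }
  deriving (Eq, Show)

-- Encode the address as "host:port".
serializeToy :: ToyAddr -> BS.ByteString
serializeToy (ToyAddr h p) = BS.pack (h ++ ":" ++ show p)

-- Decode "host:port", splitting at the first ':'. Like the pure
-- deserialize signature above, this has no way to report failure, so
-- it is partial: malformed input raises an exception when forced.
deserializeToy :: BS.ByteString -> ToyAddr
deserializeToy bs =
  let (h, rest) = break (== ':') (BS.unpack bs)
  in ToyAddr h (read (drop 1 rest))

-- The property that senders rely on: decoding undoes encoding
-- (for host names that do not themselves contain ':').
roundTrips :: ToyAddr -> Bool
roundTrips addr = deserializeToy (serializeToy addr) == addr
```

In GHCi, roundTrips (ToyAddr "127.0.0.1" 8080) evaluates to True. A deserializer that returned a Maybe would make the failure case explicit, at the cost of departing from the signature shown above.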

Once a SendAddr is obtained, it can be used to create a SendEnd by using the connect function:

connect :: SendAddr -> IO SendEnd

This function establishes the connection between the resulting SendEnd and the ReceiveEnd.

Since the work of establishing a connection has been done by the connect function, the implementation of a send function is free to be lightweight. In order to support vectored messages, we send [ByteString]:

send :: SendEnd -> [ByteString] -> IO ()

The message will then be sent to the appropriate ReceiveEnd and can be retrieved using the receive function:

receive :: ReceiveEnd -> IO [ByteString]
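
To make the flow from newConnection through connect, send, and receive concrete, here is a minimal in-memory sketch built on Control.Concurrent.Chan. It is not the library's implementation: every Mem-prefixed name is hypothetical and merely mirrors the shape of the functions above. A Chan only exists within one process, so this model has no meaningful serialize or deserialize.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import qualified Data.ByteString.Char8 as BS

-- Hypothetical in-memory counterparts of SendAddr, SendEnd, ReceiveEnd.
newtype MemSendAddr   = MemSendAddr (Chan [BS.ByteString])
newtype MemSendEnd    = MemSendEnd (Chan [BS.ByteString])
newtype MemReceiveEnd = MemReceiveEnd (Chan [BS.ByteString])

-- Counterpart of newConnection: one channel backs both ends.
memNewConnection :: IO (MemSendAddr, MemReceiveEnd)
memNewConnection = do
  chan <- newChan
  return (MemSendAddr chan, MemReceiveEnd chan)

-- Counterpart of connect: a real transport would pay its setup cost
-- here, once, so that each later send can stay cheap.
memConnect :: MemSendAddr -> IO MemSendEnd
memConnect (MemSendAddr chan) = return (MemSendEnd chan)

-- Counterpart of send: the message travels as a list of chunks.
memSend :: MemSendEnd -> [BS.ByteString] -> IO ()
memSend (MemSendEnd chan) = writeChan chan

-- Counterpart of receive: blocks until a message is available.
memReceive :: MemReceiveEnd -> IO [BS.ByteString]
memReceive (MemReceiveEnd chan) = readChan chan

-- Send a header and a payload as two chunks, then receive them back.
memDemo :: IO [BS.ByteString]
memDemo = do
  (addr, receiveEnd) <- memNewConnection
  sendEnd <- memConnect addr
  memSend sendEnd [BS.pack "header:", BS.pack "payload"]
  memReceive receiveEnd
```

The two-chunk message in memDemo shows the point of the [ByteString] argument: a sender can attach a header to an existing payload without first copying both into one buffer. The receiver gets the chunks back as a list and can call BS.concat if it needs a single string.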

Example

The following code is the first example in DemoTransport.hs, which contains further examples.

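-- Imports needed by this example. The ByteString flavour behind `BS`
-- is an assumption; DemoTransport.hs has the authoritative import list.
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever)
import qualified Data.ByteString.Char8 as BS
import Debug.Trace (trace)
import Network.Transport
import Network.Transport.TCP

demo0 :: IO ()
demo0 = do
  -- First, we initialise the transport
  trans <- mkTransport $ TCPConfig undefined "127.0.0.1" "8080"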

  -- A `SendAddr` and `ReceiveEnd` are then created
  (sendAddr, receiveEnd) <- newConnection trans

  -- Next, we fork a thread that will listen to any messages
  _ <- forkIO $ logServer "logServer" receiveEnd
  threadDelay 100000

  -- After a small delay, we create a new `SendEnd`, and transmit some
  -- messages:
  sendEnd <- connect sendAddr
  mapM_ (\n -> send sendEnd [BS.pack ("hello " ++ show n)]) [1 .. 10]
  threadDelay 100000

-- The `logServer` simply receives messages and traces them out:
logServer :: String -> ReceiveEnd -> IO ()
logServer name receiveEnd = forever $ do
  x <- receive receiveEnd
  trace (name ++ ": " ++ show x) $ return ()
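
The demo above relies on threadDelay to give the listener time to start and to drain its messages. As a possible alternative, the sketch below has the main thread wait on an MVar until the listener has seen every message. It uses a plain Chan of String in place of a real transport so that it runs without the library; collector and demoSync are hypothetical names, and nothing here is taken from DemoTransport.hs.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM)

-- Read exactly n messages from the channel, then pass them to `done`.
collector :: Int -> Chan String -> ([String] -> IO ()) -> IO ()
collector n chan done = replicateM n (readChan chan) >>= done

-- Variant of demo0 over a Chan: the main thread blocks on an MVar
-- until the collector has received all ten messages.
demoSync :: IO [String]
demoSync = do
  chan   <- newChan
  result <- newEmptyMVar
  _ <- forkIO $ collector 10 chan (putMVar result)
  mapM_ (\n -> writeChan chan ("hello " ++ show n)) [1 .. 10 :: Int]
  takeMVar result
```

Because takeMVar blocks until the collector fills the MVar, demoSync returns only after all ten messages have arrived, regardless of how the threads are scheduled. With a networked transport the listener would also need to know how many messages to expect, or receive an explicit end-of-stream message.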