Commit 0505b6da authored by Michael Hanus's avatar Michael Hanus
Browse files

Libraries CPNS, Socket and NamedSocket removed (now available in packages)

parent 4a44f1da
------------------------------------------------------------------------------
--- Implementation of a Curry Port Name Server based on raw sockets.
--- It is used to implement the library Ports for distributed programming
--- with ports.
---
--- @author Michael Hanus
--- @version February 2017
--- @category web
------------------------------------------------------------------------------
module CPNS(registerPort,getPortInfo,unregisterPort,
cpnsStart,cpnsStop,cpnsShow,cpnsAlive,main) where
import Char
import Distribution(installDir)
import FilePath((</>))
import IO
import List(delete,intersperse)
import Profile
import ReadShowTerm
import Socket
import System
import Time
-- If we connect to a port with symbolic name pn, we first connect
-- to the CPNS of the host named by pn to get the physical socket
-- number of this port. In order to connect to CPNS from any
-- machine in the world, the CPNS demon always listens at the following
-- port:
-- (Note that this must be identical for all machines running
-- Distributed Curry! If this port is occupied by another process
-- on a host, you cannot run Distributed Curry on it.)
-- The standard port number of CPNS demon.
cpnsSocket :: Int
cpnsSocket = 8767
-- The time out before considering the server as unreachable:
cpnsTimeOut :: Int
cpnsTimeOut = 3000
--- Type of messages to be processed by the Curry Port Name Server.
---
--- @cons Register name pid sn pn ack
--- - assign the values pid, sn, and pn to name
--- (pid is the process number of the registered process
--- (should be 0 if it is unknown); the server returns True
--- if registration had no problems, otherwise False)
--- @cons GetRegister name - request for a registered port name;
--- the server returns the values (sn,pn) that are assigned to the
--- port name
--- @cons Unregister name - request to remove a registered port name
--- @cons ShowRegistry - show the current port registrations
--- @cons Ping - ping the CPNS demon for liveness check
--- @cons Terminate - terminate the CPNS demon
data CPNSMessage = Terminate
| Ping
| Register String Int Int Int
| GetRegister String
| Unregister String
| ShowRegistry
-- The lock file to coordinate the startup of the CPNS demon:
cpnsStartupLockfile :: String
cpnsStartupLockfile = "/tmp/CurryPNSD.lock"
--- Starts the "Curry Port Name Server" (CPNS) running on the local machine.
--- The CPNS is responsible to resolve symbolic names for ports
--- into physical socket numbers so that a port can be reached
--- under its symbolic name from any machine in the world.
cpnsStart :: IO ()
cpnsStart = catch startup
(\_ -> putStrLn "FAILURE occurred during startup!" >>
deleteStartupLockfile >>
return Nothing) >>=
maybe done (cpnsServer [])
where
deleteStartupLockfile = do
putStrLn $ "Removing startup lock file \""++cpnsStartupLockfile++"\"..."
system $ "rm -f "++cpnsStartupLockfile
done
startup = do
putStrLn $ "Starting Curry Port Name Server on port " ++
show cpnsSocket ++ "..."
socket <- listenOn cpnsSocket
deleteStartupLockfile
pid <- getPID
putStrLn $ "Curry Port Name Server is ready (PID: "++show pid++")."
return (Just socket)
--- The main loop of the CPNS demon
cpnsServer :: [(String,Int,Int,Int)] -> Socket -> IO ()
cpnsServer regs socket = do
(chost,stream) <- socketAccept socket
--putStrLn $ "Connection from "++chost
serveRequest chost stream
where
doIfLocalHost rhost action = do
hostname <- getHostname
if rhost `elem` ["localhost","localhost.localdomain",hostname]
|| take 8 rhost == "127.0.0."
then action
else do putStrLn $ "Illegal request received from host: " ++ rhost
cpnsServer regs socket
serveRequest rhost h = do
msg <- readMsgLine h
either
(\line -> do putStrLn $ "ERROR: Illegal message:\n" ++ line
cpnsServer regs socket )
(\m -> case m of
Terminate -> doIfLocalHost rhost $ do
hClose h
putStrLn "CPNS demon terminated."
Ping -> do
hPutStrLn h (showQTerm ())
hClose h
cpnsServer regs socket
Register pname pid sn pn -> doIfLocalHost rhost $ do
(ack, newregs) <- tryRegisterPortName regs pname pid sn pn
hPutStrLn h (showQTerm ack)
hClose h
cpnsServer newregs socket
GetRegister pname -> do
--putStrLn $ "Request for port name: " ++ pname
(newregs,pinfo) <- getRegisteredPortName regs pname
hPutStrLn h (showQTerm pinfo)
hClose h
cpnsServer newregs socket
Unregister pname -> doIfLocalHost rhost $ do
newregs <- unregisterPortName regs pname
hClose h
cpnsServer newregs socket
ShowRegistry -> doIfLocalHost rhost $ do
putStrLn "Currently registered port names:"
newregs <- showAndCleanRegs regs
hFlush stdout
hClose h
cpnsServer newregs socket )
msg
tryRegisterPortName :: [(String,Int,Int,Int)] -> String -> Int -> Int -> Int
-> IO (Bool, [(String, Int, Int, Int)])
tryRegisterPortName regs name pid sn pn = do
let nameregs = filter (\(n,_,_,_)->name==n) regs
ack <- if null nameregs
then return True
else let (_,pid',_,_) = head nameregs in
if pid'>0 && pid'/=pid
-- we allow registration from the same process
then doesProcessExists pid' >>= \pex -> return (not pex)
else return True
ctime <- getLocalTime
putStrLn $ "Register port \""++name++"\": pid "++show pid++
" / socket "++show sn++
" / number "++show pn ++ " at " ++ calendarTimeToString ctime
let newregs = (name,pid,sn,pn) : filter (\ (n,_,_,_)->name/=n) regs
printMemInfo newregs
hFlush stdout
return (ack, newregs)
-- Delete all registrations for a given port name:
unregisterPortName :: [(String,Int,Int,Int)] -> String
-> IO [(String,Int,Int,Int)]
unregisterPortName regs name = do
ctime <- getLocalTime
putStrLn $ "Unregister port \""++name++"\" at "++calendarTimeToString ctime
let newregs = filter (\ (n,_,_,_)->name/=n) regs
printMemInfo newregs
hFlush stdout
return newregs
-- Get the socket number for a registered port name
-- (or (0,0) if not registered).
-- In addition, a new registration list is returned where a registration
-- is deleted if the corresponding server process does not exist.
getRegisteredPortName :: [(String,Int,Int,Int)] -> String
-> IO ([(String,Int,Int,Int)],(Int,Int))
getRegisteredPortName regs pname =
let nameregs = filter (\(n,_,_,_)->pname==n) regs in
if null nameregs
then return (regs,(0,0))
else let (_,pid,sn,pn) = head nameregs in
if pid>0
then doesProcessExists pid >>= \pex ->
if pex
then return (regs,(sn,pn))
else --putStrLn ("WARNING: Process "++show pid++" not running!") >>
return (delete (head nameregs) regs, (0,0))
else return (regs,(sn,pn))
-- Show all registered ports and return a new registration list
-- where a registration is deleted if the corresponding server process
-- does not exist.
showAndCleanRegs :: [(String,Int,Int,Int)] -> IO [(String,Int,Int,Int)]
showAndCleanRegs regs = do
newreglist <- mapIO checkAndShow regs
return (concat newreglist)
where
checkAndShow reg@(n,pid,sn,pn) = do
pidexist <- doesProcessExists pid
if pidexist
then do putStrLn $ n++": pid "++show pid++
" / socket "++show sn++" / number "++show pn
return [reg]
else return []
-- Print status information of current CPNS demon process:
printMemInfo :: [(String,Int,Int,Int)] -> IO ()
printMemInfo regs = do
pinfos <- getProcessInfos
putStrLn ("NumRegs: " ++ show (length regs) ++ ", " ++ showMemInfo pinfos)
-- test whether a process with a given pid is running:
doesProcessExists :: Int -> IO Bool
doesProcessExists pid = do
status <- system("test -z \"`ps -p "++show pid++" | fgrep "++show pid++"`\"")
return (status>0)
-- Read a line from a stream and check syntactical correctness of message:
readMsgLine :: Handle -> IO (Either String a)
readMsgLine handle = do
line <- hGetLine handle
case readsQTerm line of
[(msg,rem)] -> return (if all isSpace rem then Right msg else Left line)
_ -> return (Left line)
-- Perform an action if the CPNS demon at a given host is alive:
doIfAlive :: String -> IO a -> IO a
doIfAlive host action = do
alive <- cpnsAlive cpnsTimeOut host
if not alive
then error $ "Curry port name server at host \""++host++
"\" is not reachable (timeout after "++show cpnsTimeOut++
" ms)!"
else action
sendToLocalCPNS :: CPNSMessage -> IO ()
sendToLocalCPNS msg = doIfAlive "localhost" $ do
h <- connectToSocket "localhost" cpnsSocket
hPutStrLn h (showQTerm msg)
hClose h
--- Shows all registered ports at the local CPNS demon (in its logfile).
cpnsShow :: IO ()
cpnsShow = sendToLocalCPNS ShowRegistry
--- Terminates the local CPNS demon
cpnsStop :: IO ()
cpnsStop = sendToLocalCPNS Terminate
--- Gets an answer from a Curry port name server on a host,
--- or reports an error.
cpnsTryGetAnswer :: String -> CPNSMessage -> IO _
cpnsTryGetAnswer host msg = catch tryGetAnswer connectError
where
tryGetAnswer = do
h <- connectToSocket host cpnsSocket
hPutStrLn h (showQTerm msg)
hFlush h
ready <- hWaitForInput h cpnsTimeOut
if ready
then do
answer <- readMsgLine h
hClose h
either (\line -> error $ "cpnsTryGetAnswer: Illegal answer: " ++ line)
return
answer
else failed
connectError _ = error $ "Curry port name server at host \""++host++
"\" is not reachable!"
--- Registers a symbolic port at the local host.
registerPort :: String -> Int -> Int -> IO ()
registerPort pname sn pn = do
startCPNSDIfNecessary
pid <- getPID
ack <- cpnsTryGetAnswer "localhost" (Register pname pid sn pn)
if ack then done
else putStrLn ("WARNING: Port name '"++pname++"' already registered!")
--- Gets the information about a symbolic port at some host.
getPortInfo :: String -> String -> IO (Int,Int)
getPortInfo pname host = cpnsTryGetAnswer host (GetRegister pname)
--- Unregisters a symbolic port at the local host.
unregisterPort :: String -> IO ()
unregisterPort pname = sendToLocalCPNS (Unregister pname)
--- Tests whether the CPNS demon at a host is alive.
cpnsAlive :: Int -> String -> IO Bool
cpnsAlive timeout host = catch tryPingCPNS (\_ -> return False)
where
tryPingCPNS = do
h <- connectToSocket host cpnsSocket
hPutStrLn h (showQTerm Ping)
hFlush h
answer <- hWaitForInput h timeout
hClose h
return answer
--- Starts the CPNS demon at localhost if it is not already running:
startCPNSDIfNecessary :: IO ()
startCPNSDIfNecessary = do
system $ installDir </> "currytools" </> "cpns" </> "start"
done
--- Main function for CPNS demon. Check arguments and execute command.
main :: IO ()
main = do
args <- getArgs
case args of
["start"] -> cpnsStart
["stop"] -> cpnsStop
["show"] -> cpnsShow
_ -> putStrLn $ "ERROR: Illegal arguments: " ++
concat (intersperse " " args) ++ "\n" ++
"Allowed arguments: start|stop|show"
{-
Test with PAKCS:
:fork cpnsStart
registerPort "xxx" 42 2
getPortInfo "xxx" "localhost"
cpnsStop
-}
------------------------------------------------------------------------------
--- Library to support network programming with sockets that are addressed
--- by symbolic names. In contrast to raw sockets (see library
--- <code>Socket</code>), this library uses the Curry Port Name Server
--- to provide sockets that are addressed by symbolic names
--- rather than numbers.
---
--- In standard applications, the server side uses the operations
--- <code>listenOn</code> and <code>socketAccept</code> to provide some service
--- on a named socket, and the client side uses the operation
--- <code>connectToSocket</code> to request a service.
---
--- @author Michael Hanus
--- @version February 2008
--- @category general
------------------------------------------------------------------------------
module NamedSocket(Socket,
listenOn, socketAccept, waitForSocketAccept,
connectToSocketRepeat, connectToSocketWait,
sClose, socketName, connectToSocket)
where
import System
import IO(Handle)
import qualified Socket
import CPNS
---------------------------------------------------------------------
-- Server side operations:
--- Abstract type for named sockets.
data Socket = NamedSocket String Socket.Socket
--- Creates a server side socket with a symbolic name.
listenOn :: String -> IO Socket
listenOn socketname = do
(port,socket) <- Socket.listenOnFresh
registerPort socketname port 0
return (NamedSocket socketname socket)
--- Returns a connection of a client to a socket.
--- The connection is returned as a pair consisting of a string identifying
--- the client (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- The handle is both readable and writable.
socketAccept :: Socket -> IO (String,Handle)
socketAccept (NamedSocket _ socket) = Socket.socketAccept socket
--- Waits until a connection of a client to a socket is available.
--- If no connection is available within the time limit, it returns Nothing,
--- otherwise the connection is returned as a pair consisting
--- of a string identifying the client
--- (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- @param socket - a socket
--- @param timeout - milliseconds to wait for input (< 0 : no time out)
waitForSocketAccept :: Socket -> Int -> IO (Maybe (String,Handle))
waitForSocketAccept (NamedSocket _ socket) = Socket.waitForSocketAccept socket
--- Closes a server socket.
sClose :: Socket -> IO ()
sClose (NamedSocket socketname socket) = do
Socket.sClose socket
unregisterPort socketname
--- Returns a the symbolic name of a named socket.
socketName :: Socket -> String
socketName (NamedSocket socketname _) = socketname
---------------------------------------------------------------------
-- Client side operations:
--- Waits for connection to a Unix socket with a symbolic name.
--- In contrast to <code>connectToSocket</code>, this action waits until
--- the socket has been registered with its symbolic name.
--- @param waittime - the time to wait before retrying (in milliseconds)
--- @param action - I/O action to be executed before each wait cycle
--- @param retries - number of retries before giving up (-1 = retry forever)
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return Nothing (if connection is not possible within the given limits)
--- or (Just h) where h is the handle of the connection
connectToSocketRepeat :: Int -> IO _ -> Int -> String -> IO (Maybe Handle)
connectToSocketRepeat waittime action retries nameAtHost = do
let (name,atHost) = break (=='@') nameAtHost
host = if atHost=="" then "localhost" else tail atHost
-- check whether remote CPNS demon is alive:
alive <- cpnsAlive waittime host
if not alive
then tryAgain
else do -- get remote socket/port numbers:
(snr,_) <- getPortInfo name host
if snr==0
then tryAgain
else Socket.connectToSocket host snr >>= return . Just
where
tryAgain = if retries==0 then return Nothing else do
action
sleep (ms2s waittime)
connectToSocketRepeat waittime action (decr retries) nameAtHost
ms2s n = let mn = n `div` 1000 in if mn==0 then 1 else mn
decr n = if n<0 then n else n-1
--- Waits for connection to a Unix socket with a symbolic name and
--- return the handle of the connection.
--- This action waits (possibly forever) until the socket with the symbolic
--- name is registered.
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return the handle of the connection (connected to the socket nameAtHost)
--- which is both readable and writable
connectToSocketWait :: String -> IO Handle
connectToSocketWait nameAtHost = do
Just hdl <- connectToSocketRepeat 1000 done (-1) nameAtHost
return hdl
--- Creates a new connection to an existing(!) Unix socket with a symbolic
--- name. If the symbolic name is not registered, an error is reported.
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return the handle of the stream (connected to the socket nameAtHost)
--- which is both readable and writable
connectToSocket :: String -> IO Handle
connectToSocket nameAtHost = do
let (name,atHost) = break (=='@') nameAtHost
host = if atHost=="" then "localhost" else tail atHost
-- get remote port number:
(snr,_) <- getPortInfo name host
if snr==0
then error ("connectToSocket: Socket \""++name++"@"++host++
"\" is not registered!")
else done
Socket.connectToSocket host snr
---------------------------------------------------------------------
------------------------------------------------------------------------------
--- Library to support network programming with sockets.
--- In standard applications, the server side uses the operations
--- <code>listenOn</code> and <code>socketAccept</code> to provide some service
--- on a socket, and the client side uses the operation
--- <code>connectToSocket</code> to request a service.
---
--- @author Michael Hanus
--- @version February 2008
--- @category general
------------------------------------------------------------------------------
module Socket(Socket, listenOn, listenOnFresh,
socketAccept, waitForSocketAccept, sClose, connectToSocket)
where
import System
import IO(Handle)
--- The abstract type of sockets.
external data Socket
---------------------------------------------------------------------
-- Server side operations:
--- Creates a server side socket bound to a given port number.
listenOn :: Int -> IO Socket
listenOn port = prim_listenOn $# port
prim_listenOn :: Int -> IO Socket
prim_listenOn external
--- Creates a server side socket bound to a free port.
--- The port number and the socket is returned.
listenOnFresh :: IO (Int,Socket)
listenOnFresh external
--- Returns a connection of a client to a socket.
--- The connection is returned as a pair consisting of a string identifying
--- the client (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- The handle is both readable and writable.
socketAccept :: Socket -> IO (String,Handle)
socketAccept s = prim_socketAccept $## s
prim_socketAccept :: Socket -> IO (String,Handle)
prim_socketAccept external
--- Waits until a connection of a client to a socket is available.
--- If no connection is available within the time limit, it returns Nothing,
--- otherwise the connection is returned as a pair consisting
--- of a string identifying the client
--- (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- @param socket - a socket
--- @param timeout - milliseconds to wait for input (< 0 : no time out)
waitForSocketAccept :: Socket -> Int -> IO (Maybe (String,Handle))
waitForSocketAccept s timeout = (prim_waitForSocketAccept $## s) $# timeout
prim_waitForSocketAccept :: Socket -> Int -> IO (Maybe (String,Handle))
prim_waitForSocketAccept external
--- Closes a server socket.
sClose :: Socket -> IO ()
sClose s = prim_sClose $## s
prim_sClose :: Socket -> IO ()
prim_sClose external
---------------------------------------------------------------------
-- Client side operations:
--- Creates a new connection to a Unix socket.
--- @param host - the host name of the connection
--- @param port - the port number of the connection
--- @return the handle of the stream (connected to the port port@host)
--- which is both readable and writable
connectToSocket :: String -> Int -> IO Handle
connectToSocket host port = (prim_connectToSocket $## host) $# port
prim_connectToSocket :: String -> Int -> IO Handle
prim_connectToSocket external
---------------------------------------------------------------------
{-# LANGUAGE MultiParamTypeClasses #-}
import Control.Concurrent
import Control.Monad (when)
import Network
import Network.Socket hiding (sClose)
type C_Socket = PrimData Socket
instance ConvertCurryHaskell Curry_Prelude.C_Int PortID where
toCurry (PortNumber i) = toCurry (toInteger i)
fromCurry i = PortNumber (fromInteger (fromCurry i))
external_d_C_prim_listenOn :: Curry_Prelude.C_Int -> Cover -> ConstStore -> Curry_Prelude.C_IO C_Socket
external_d_C_prim_listenOn i _ _ = toCurry listenOn i
external_d_C_listenOnFresh :: Cover -> ConstStore -> Curry_Prelude.C_IO (Curry_Prelude.OP_Tuple2 Curry_Prelude.C_Int C_Socket)
external_d_C_listenOnFresh _ _ = toCurry listenOnFreshPort
where
listenOnFreshPort :: IO (PortID,Socket)
listenOnFreshPort = do
s <- listenOn (PortNumber aNY_PORT)
p <- Network.socketPort s
return (p,s)
external_d_C_prim_socketAccept :: C_Socket
-> Cover -> ConstStore -> Curry_Prelude.C_IO (Curry_Prelude.OP_Tuple2 Curry_Prelude.C_String Curry_IO.C_Handle)
external_d_C_prim_socketAccept socket _ _ =
toCurry (\s -> Network.accept s >>= \ (h,s,_) -> return (s,OneHandle h)) socket
external_d_C_prim_waitForSocketAccept :: C_Socket -> Curry_Prelude.C_Int
-> Cover -> ConstStore -> Curry_Prelude.C_IO (Curry_Prelude.C_Maybe (Curry_Prelude.OP_Tuple2 (Curry_Prelude.OP_List Curry_Prelude.C_Char) Curry_IO.C_Handle))
external_d_C_prim_waitForSocketAccept s i _ _ = toCurry wait s i
wait :: Socket -> Int -> IO (Maybe (String, CurryHandle))
wait s t =
if t < 0
then Network.accept s >>= \ (h, s, _) -> return (Just (s, OneHandle h))
else do
mv <- newEmptyMVar
tacc <- forkIO (Network.accept s >>= \ (h, s, _) ->
putMVar mv (Just (s, OneHandle h)))
ttim <- forkIO (delay ((fromIntegral t :: Integer) * 1000)
>> putMVar mv Nothing)
res <- takeMVar mv
maybe (killThread tacc) (\_ -> killThread ttim) res
return res
-- Like 'threadDelay', but not bounded by an 'Int'
delay :: Integer -> IO ()
delay time = do
let maxWait = min time $ toInteger (maxBound :: Int)
threadDelay $ fromInteger maxWait
when (maxWait /= time) $ delay (time - maxWait)
external_d_C_prim_sClose :: C_Socket -> Cover -> ConstStore -> Curry_Prelude.C_IO Curry_Prelude.OP_Unit
external_d_C_prim_sClose s _ _ = toCurry sClose s
external_d_C_prim_connectToSocket :: Curry_Prelude.C_String -> Curry_Prelude.C_Int
-> Cover -> ConstStore -> Curry_Prelude.C_IO Curry_IO.C_Handle
external_d_C_prim_connectToSocket str i _ _ =
toCurry (\ s i -> connectTo s i >>= return . OneHandle) str i
<?xml version="1.0" standalone="no"?>
<!DOCTYPE primitives SYSTEM "http://www.informatik.uni-kiel.de/~pakcs/primitives.dtd">
<primitives>
<primitive name="prim_listenOn" arity="1">
<library>prim_socket</library>
<entry>prim_listenOn</entry>
</primitive>
<primitive name="listenOnFresh" arity="0">
<library>prim_socket</library>
<entry>prim_listenOnFresh</entry>
</primitive>
<primitive name="prim_socketAccept" arity="1">
<library>prim_socket</library>
<entry>prim_socketAccept</entry>
</primitive>
<primitive name="prim_waitForSocketAccept" arity="2">
<library>prim_socket</library>
<entry>prim_waitForSocketAccept</entry>
</primitive>
<primitive name="prim_sClose" arity="1">
<library>prim_socket</library>
<entry>prim_sClose</entry>
</primitive>
<primitive name="prim_connectToSocket" arity="2">
<library>prim_socket</library>
<entry>prim_connectToSocket</entry>
</primitive>
</primitives>
Markdown is supported