Commit d8ef9ec6 authored by Michael Hanus 's avatar Michael Hanus
Browse files

Base packaged synched to libs of PAKCS 2.1.0

parent a8e67a81
------------------------------------------------------------------------------
--- Implementation of a Curry Port Name Server based on raw sockets.
--- It is used to implement the library Ports for distributed programming
--- with ports.
---
--- @author Michael Hanus
--- @version February 2017
--- @category web
------------------------------------------------------------------------------
module CPNS(registerPort,getPortInfo,unregisterPort,
cpnsStart,cpnsStop,cpnsShow,cpnsAlive,main) where
import Char
import Distribution(installDir)
import FilePath((</>))
import IO
import List(delete,intersperse)
import Profile
import ReadShowTerm
import Socket
import System
import Time
-- If we connect to a port with symbolic name pn, we first connect
-- to the CPNS of the host named by pn to get the physical socket
-- number of this port. In order to connect to CPNS from any
-- machine in the world, the CPNS demon always listens at the following
-- port:
-- (Note that this must be identical for all machines running
-- Distributed Curry! If this port is occupied by another process
-- on a host, you cannot run Distributed Curry on it.)
-- The standard port number of CPNS demon.
cpnsSocket :: Int
cpnsSocket = 8767
-- The time out before considering the server as unreachable:
cpnsTimeOut :: Int
cpnsTimeOut = 3000
--- Type of messages to be processed by the Curry Port Name Server.
---
--- @cons Register name pid sn pn ack
--- - assign the values pid, sn, and pn to name
--- (pid is the process number of the registered process
--- (should be 0 if it is unknown); the server returns True
--- if registration had no problems, otherwise False)
--- @cons GetRegister name - request for a registered port name;
--- the server returns the values (sn,pn) that are assigned to the
--- port name
--- @cons Unregister name - request to remove a registered port name
--- @cons ShowRegistry - show the current port registrations
--- @cons Ping - ping the CPNS demon for liveness check
--- @cons Terminate - terminate the CPNS demon
data CPNSMessage = Terminate
| Ping
| Register String Int Int Int
| GetRegister String
| Unregister String
| ShowRegistry
-- The lock file to coordinate the startup of the CPNS demon:
cpnsStartupLockfile :: String
cpnsStartupLockfile = "/tmp/CurryPNSD.lock"
--- Starts the "Curry Port Name Server" (CPNS) running on the local machine.
--- The CPNS is responsible to resolve symbolic names for ports
--- into physical socket numbers so that a port can be reached
--- under its symbolic name from any machine in the world.
cpnsStart :: IO ()
cpnsStart = catch startup
(\_ -> putStrLn "FAILURE occurred during startup!" >>
deleteStartupLockfile >>
return Nothing) >>=
maybe done (cpnsServer [])
where
deleteStartupLockfile = do
putStrLn $ "Removing startup lock file \""++cpnsStartupLockfile++"\"..."
system $ "rm -f "++cpnsStartupLockfile
done
startup = do
putStrLn $ "Starting Curry Port Name Server on port " ++
show cpnsSocket ++ "..."
socket <- listenOn cpnsSocket
deleteStartupLockfile
pid <- getPID
putStrLn $ "Curry Port Name Server is ready (PID: "++show pid++")."
return (Just socket)
--- The main loop of the CPNS demon
cpnsServer :: [(String,Int,Int,Int)] -> Socket -> IO ()
cpnsServer regs socket = do
(chost,stream) <- socketAccept socket
--putStrLn $ "Connection from "++chost
serveRequest chost stream
where
doIfLocalHost rhost action = do
hostname <- getHostname
if rhost `elem` ["localhost","localhost.localdomain",hostname]
|| take 8 rhost == "127.0.0."
then action
else do putStrLn $ "Illegal request received from host: " ++ rhost
cpnsServer regs socket
serveRequest rhost h = do
msg <- readMsgLine h
either
(\line -> do putStrLn $ "ERROR: Illegal message:\n" ++ line
cpnsServer regs socket )
(\m -> case m of
Terminate -> doIfLocalHost rhost $ do
hClose h
putStrLn "CPNS demon terminated."
Ping -> do
hPutStrLn h (showQTerm ())
hClose h
cpnsServer regs socket
Register pname pid sn pn -> doIfLocalHost rhost $ do
(ack, newregs) <- tryRegisterPortName regs pname pid sn pn
hPutStrLn h (showQTerm ack)
hClose h
cpnsServer newregs socket
GetRegister pname -> do
--putStrLn $ "Request for port name: " ++ pname
(newregs,pinfo) <- getRegisteredPortName regs pname
hPutStrLn h (showQTerm pinfo)
hClose h
cpnsServer newregs socket
Unregister pname -> doIfLocalHost rhost $ do
newregs <- unregisterPortName regs pname
hClose h
cpnsServer newregs socket
ShowRegistry -> doIfLocalHost rhost $ do
putStrLn "Currently registered port names:"
newregs <- showAndCleanRegs regs
hFlush stdout
hClose h
cpnsServer newregs socket )
msg
tryRegisterPortName :: [(String,Int,Int,Int)] -> String -> Int -> Int -> Int
-> IO (Bool, [(String, Int, Int, Int)])
tryRegisterPortName regs name pid sn pn = do
let nameregs = filter (\(n,_,_,_)->name==n) regs
ack <- if null nameregs
then return True
else let (_,pid',_,_) = head nameregs in
if pid'>0 && pid'/=pid
-- we allow registration from the same process
then doesProcessExists pid' >>= \pex -> return (not pex)
else return True
ctime <- getLocalTime
putStrLn $ "Register port \""++name++"\": pid "++show pid++
" / socket "++show sn++
" / number "++show pn ++ " at " ++ calendarTimeToString ctime
let newregs = (name,pid,sn,pn) : filter (\ (n,_,_,_)->name/=n) regs
printMemInfo newregs
hFlush stdout
return (ack, newregs)
-- Delete all registrations for a given port name:
unregisterPortName :: [(String,Int,Int,Int)] -> String
-> IO [(String,Int,Int,Int)]
unregisterPortName regs name = do
ctime <- getLocalTime
putStrLn $ "Unregister port \""++name++"\" at "++calendarTimeToString ctime
let newregs = filter (\ (n,_,_,_)->name/=n) regs
printMemInfo newregs
hFlush stdout
return newregs
-- Get the socket number for a registered port name
-- (or (0,0) if not registered).
-- In addition, a new registration list is returned where a registration
-- is deleted if the corresponding server process does not exist.
getRegisteredPortName :: [(String,Int,Int,Int)] -> String
-> IO ([(String,Int,Int,Int)],(Int,Int))
getRegisteredPortName regs pname =
let nameregs = filter (\(n,_,_,_)->pname==n) regs in
if null nameregs
then return (regs,(0,0))
else let (_,pid,sn,pn) = head nameregs in
if pid>0
then doesProcessExists pid >>= \pex ->
if pex
then return (regs,(sn,pn))
else --putStrLn ("WARNING: Process "++show pid++" not running!") >>
return (delete (head nameregs) regs, (0,0))
else return (regs,(sn,pn))
-- Show all registered ports and return a new registration list
-- where a registration is deleted if the corresponding server process
-- does not exist.
showAndCleanRegs :: [(String,Int,Int,Int)] -> IO [(String,Int,Int,Int)]
showAndCleanRegs regs = do
newreglist <- mapIO checkAndShow regs
return (concat newreglist)
where
checkAndShow reg@(n,pid,sn,pn) = do
pidexist <- doesProcessExists pid
if pidexist
then do putStrLn $ n++": pid "++show pid++
" / socket "++show sn++" / number "++show pn
return [reg]
else return []
-- Print status information of current CPNS demon process:
printMemInfo :: [(String,Int,Int,Int)] -> IO ()
printMemInfo regs = do
pinfos <- getProcessInfos
putStrLn ("NumRegs: " ++ show (length regs) ++ ", " ++ showMemInfo pinfos)
-- test whether a process with a given pid is running:
doesProcessExists :: Int -> IO Bool
doesProcessExists pid = do
status <- system("test -z \"`ps -p "++show pid++" | fgrep "++show pid++"`\"")
return (status>0)
-- Read a line from a stream and check syntactical correctness of message:
readMsgLine :: Handle -> IO (Either String a)
readMsgLine handle = do
line <- hGetLine handle
case readsQTerm line of
[(msg,rem)] -> return (if all isSpace rem then Right msg else Left line)
_ -> return (Left line)
-- Perform an action if the CPNS demon at a given host is alive:
doIfAlive :: String -> IO a -> IO a
doIfAlive host action = do
alive <- cpnsAlive cpnsTimeOut host
if not alive
then error $ "Curry port name server at host \""++host++
"\" is not reachable (timeout after "++show cpnsTimeOut++
" ms)!"
else action
sendToLocalCPNS :: CPNSMessage -> IO ()
sendToLocalCPNS msg = doIfAlive "localhost" $ do
h <- connectToSocket "localhost" cpnsSocket
hPutStrLn h (showQTerm msg)
hClose h
--- Shows all registered ports at the local CPNS demon (in its logfile).
cpnsShow :: IO ()
cpnsShow = sendToLocalCPNS ShowRegistry
--- Terminates the local CPNS demon
cpnsStop :: IO ()
cpnsStop = sendToLocalCPNS Terminate
--- Gets an answer from a Curry port name server on a host,
--- or reports an error.
cpnsTryGetAnswer :: String -> CPNSMessage -> IO _
cpnsTryGetAnswer host msg = catch tryGetAnswer connectError
where
tryGetAnswer = do
h <- connectToSocket host cpnsSocket
hPutStrLn h (showQTerm msg)
hFlush h
ready <- hWaitForInput h cpnsTimeOut
if ready
then do
answer <- readMsgLine h
hClose h
either (\line -> error $ "cpnsTryGetAnswer: Illegal answer: " ++ line)
return
answer
else failed
connectError _ = error $ "Curry port name server at host \""++host++
"\" is not reachable!"
--- Registers a symbolic port at the local host.
registerPort :: String -> Int -> Int -> IO ()
registerPort pname sn pn = do
startCPNSDIfNecessary
pid <- getPID
ack <- cpnsTryGetAnswer "localhost" (Register pname pid sn pn)
if ack then done
else putStrLn ("WARNING: Port name '"++pname++"' already registered!")
--- Gets the information about a symbolic port at some host.
getPortInfo :: String -> String -> IO (Int,Int)
getPortInfo pname host = cpnsTryGetAnswer host (GetRegister pname)
--- Unregisters a symbolic port at the local host.
unregisterPort :: String -> IO ()
unregisterPort pname = sendToLocalCPNS (Unregister pname)
--- Tests whether the CPNS demon at a host is alive.
cpnsAlive :: Int -> String -> IO Bool
cpnsAlive timeout host = catch tryPingCPNS (\_ -> return False)
where
tryPingCPNS = do
h <- connectToSocket host cpnsSocket
hPutStrLn h (showQTerm Ping)
hFlush h
answer <- hWaitForInput h timeout
hClose h
return answer
--- Starts the CPNS demon at localhost if it is not already running:
startCPNSDIfNecessary :: IO ()
startCPNSDIfNecessary = do
system $ installDir </> "currytools" </> "cpns" </> "start"
done
--- Main function for CPNS demon. Check arguments and execute command.
main :: IO ()
main = do
args <- getArgs
case args of
["start"] -> cpnsStart
["stop"] -> cpnsStop
["show"] -> cpnsShow
_ -> putStrLn $ "ERROR: Illegal arguments: " ++
concat (intersperse " " args) ++ "\n" ++
"Allowed arguments: start|stop|show"
{-
Test with PAKCS:
:fork cpnsStart
registerPort "xxx" 42 2
getPortInfo "xxx" "localhost"
cpnsStop
-}
This diff is collapsed.
------------------------------------------------------------------------------
--- Library to support network programming with sockets that are addressed
--- by symbolic names. In contrast to raw sockets (see library
--- <code>Socket</code>), this library uses the Curry Port Name Server
--- to provide sockets that are addressed by symbolic names
--- rather than numbers.
---
--- In standard applications, the server side uses the operations
--- <code>listenOn</code> and <code>socketAccept</code> to provide some service
--- on a named socket, and the client side uses the operation
--- <code>connectToSocket</code> to request a service.
---
--- @author Michael Hanus
--- @version February 2008
--- @category general
------------------------------------------------------------------------------
module NamedSocket(Socket,
listenOn, socketAccept, waitForSocketAccept,
connectToSocketRepeat, connectToSocketWait,
sClose, socketName, connectToSocket)
where
import System
import IO(Handle)
import qualified Socket
import CPNS
---------------------------------------------------------------------
-- Server side operations:
--- Abstract type for named sockets.
data Socket = NamedSocket String Socket.Socket
--- Creates a server side socket with a symbolic name.
listenOn :: String -> IO Socket
listenOn socketname = do
(port,socket) <- Socket.listenOnFresh
registerPort socketname port 0
return (NamedSocket socketname socket)
--- Returns a connection of a client to a socket.
--- The connection is returned as a pair consisting of a string identifying
--- the client (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- The handle is both readable and writable.
socketAccept :: Socket -> IO (String,Handle)
socketAccept (NamedSocket _ socket) = Socket.socketAccept socket
--- Waits until a connection of a client to a socket is available.
--- If no connection is available within the time limit, it returns Nothing,
--- otherwise the connection is returned as a pair consisting
--- of a string identifying the client
--- (the format of this string is implementation-dependent)
--- and a handle to a stream communication with the client.
--- @param socket - a socket
--- @param timeout - milliseconds to wait for input (< 0 : no time out)
waitForSocketAccept :: Socket -> Int -> IO (Maybe (String,Handle))
waitForSocketAccept (NamedSocket _ socket) = Socket.waitForSocketAccept socket
--- Closes a server socket.
sClose :: Socket -> IO ()
sClose (NamedSocket socketname socket) = do
Socket.sClose socket
unregisterPort socketname
--- Returns a the symbolic name of a named socket.
socketName :: Socket -> String
socketName (NamedSocket socketname _) = socketname
---------------------------------------------------------------------
-- Client side operations:
--- Waits for connection to a Unix socket with a symbolic name.
--- In contrast to <code>connectToSocket</code>, this action waits until
--- the socket has been registered with its symbolic name.
--- @param waittime - the time to wait before retrying (in milliseconds)
--- @param action - I/O action to be executed before each wait cycle
--- @param retries - number of retries before giving up (-1 = retry forever)
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return Nothing (if connection is not possible within the given limits)
--- or (Just h) where h is the handle of the connection
connectToSocketRepeat :: Int -> IO _ -> Int -> String -> IO (Maybe Handle)
connectToSocketRepeat waittime action retries nameAtHost = do
let (name,atHost) = break (=='@') nameAtHost
host = if atHost=="" then "localhost" else tail atHost
-- check whether remote CPNS demon is alive:
alive <- cpnsAlive waittime host
if not alive
then tryAgain
else do -- get remote socket/port numbers:
(snr,_) <- getPortInfo name host
if snr==0
then tryAgain
else Socket.connectToSocket host snr >>= return . Just
where
tryAgain = if retries==0 then return Nothing else do
action
sleep (ms2s waittime)
connectToSocketRepeat waittime action (decr retries) nameAtHost
ms2s n = let mn = n `div` 1000 in if mn==0 then 1 else mn
decr n = if n<0 then n else n-1
--- Waits for connection to a Unix socket with a symbolic name and
--- return the handle of the connection.
--- This action waits (possibly forever) until the socket with the symbolic
--- name is registered.
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return the handle of the connection (connected to the socket nameAtHost)
--- which is both readable and writable
connectToSocketWait :: String -> IO Handle
connectToSocketWait nameAtHost = do
Just hdl <- connectToSocketRepeat 1000 done (-1) nameAtHost
return hdl
--- Creates a new connection to an existing(!) Unix socket with a symbolic
--- name. If the symbolic name is not registered, an error is reported.
--- @param nameAtHost - the symbolic name of the socket
--- (must be either of the form "name@host" or "name"
--- where the latter is a shorthand for "name@localhost")
--- @return the handle of the stream (connected to the socket nameAtHost)
--- which is both readable and writable
connectToSocket :: String -> IO Handle
connectToSocket nameAtHost = do
let (name,atHost) = break (=='@') nameAtHost
host = if atHost=="" then "localhost" else tail atHost
-- get remote port number:
(snr,_) <- getPortInfo name host
if snr==0
then error ("connectToSocket: Socket \""++name++"@"++host++
"\" is not registered!")
else done
Socket.connectToSocket host snr
---------------------------------------------------------------------
------------------------------------------------------------------------------
--- A library to read and update files containing properties in the usual
--- equational syntax, i.e., a property is defined by a line of the form
--- `prop=value` where `prop` starts with a letter.
--- All other lines (e.g., blank lines or lines starting with `#` are
--- considered as comment lines and are ignored.
---
--- @author Michael Hanus
--- @version August 2006
--- @category general
------------------------------------------------------------------------------
module PropertyFile(readPropertyFile,updatePropertyFile) where
import Directory
import IOExts
import Char
--- Reads a property file and returns the list of properties.
--- Returns empty list if the property file does not exist.
readPropertyFile :: String -> IO [(String,String)]
readPropertyFile file = do
pfexists <- doesFileExist file
if pfexists
then do rcs <- readCompleteFile file -- to avoid open file handles
return $ splitEqs . filter (\l->not (null l) && isAlpha (head l))
. lines $ rcs
else return []
where
splitEqs [] = []
splitEqs (eq:eqs) = case break (=='=') eq of
(prop,_:val) -> (prop,val) : splitEqs eqs
_ -> splitEqs eqs
--- Update a property in a property file or add it, if it is not already
--- there.
--- @param file - the name of the property file
--- @param pname - the name of the property
--- @param pvalue - the new value of the property
updatePropertyFile :: String -> String -> String -> IO ()
updatePropertyFile file pname pval = do
props <- readPropertyFile file
if lookup pname props == Nothing
then appendFile file (pname++"="++pval++"\n")
else changePropertyInFile file pname pval
--- Change a property in a property file.
changePropertyInFile :: String -> String -> String -> IO ()
changePropertyInFile file pname pval = do
updateFile (\rcs -> unlines . map changeProp . lines $ rcs) file
where
changeProp l = let (s1,s2) = break (=='=') l
in if null l || not (isAlpha (head l)) || null s2
then l
else if s1==pname then s1++"="++pval else l
--- ----------------------------------------------------------------------------
--- Computing strongly connected components
---
--- Copyright (c) 2000 - 2003, Wolfgang Lux
--- See LICENSE for the full license.
---
--- The function `scc` computes the strongly connected components of a list
--- of entities in two steps. First, the list is topologically sorted
--- "downwards" using the *defines* relation.
--- Then the resulting list is sorted "upwards" using the *uses* relation
--- and partitioned into the connected components. Both relations
--- are computed within this module using the bound and free names of each
--- declaration.
---
--- In order to avoid useless recomputations, the code in the module first
--- decorates the declarations with their bound and free names and a
--- unique number. The latter is only used to provide a trivial ordering
--- so that the declarations can be used as set elements.
---
--- @author Wolfgang Lux
--- @category algorithm
--- ----------------------------------------------------------------------------
module SCC (scc) where
import SetRBT (emptySetRBT, elemRBT, insertRBT)
data Node a b = Node Int [b] [b] a
deriving Eq
cmpNode :: Node a b -> Node a b -> Bool
cmpNode n1 n2 = key n1 < key n2
key :: Node a b -> Int
key (Node k _ _ _) = k
bvs :: Node a b -> [b]
bvs (Node _ bs _ _) = bs
fvs :: Node a b -> [b]
fvs (Node _ _ fs _) = fs
node :: Node a b -> a
node (Node _ _ _ n) = n
--- Computes the strongly connected components of a list
--- of entities. To be flexible, we distinguish the nodes and
--- the entities defined in this node.
---
--- @param defines - maps each node to the entities defined in this node
--- @param uses - maps each node to the entities used in this node
--- @param nodes - the list of nodes which should be sorted into
--- strongly connected components
--- @return the strongly connected components of the list of nodes
scc :: (Eq a, Eq b) =>
(a -> [b]) -- ^ entities defined by node
-> (a -> [b]) -- ^ entities used by node
-> [a] -- ^ list of nodes
-> [[a]] -- ^ strongly connected components
scc bvs' fvs' = map (map node) . tsort' . tsort . zipWith wrap [0 ..]
where wrap i n = Node i (bvs' n) (fvs' n) n
tsort :: (Eq a, Eq b) => [Node a b] -> [Node a b]
tsort xs = snd (dfs xs (emptySetRBT cmpNode) [])
where
dfs [] marks stack = (marks, stack)
dfs (x : xs') marks stack
| x `elemRBT` marks = dfs xs' marks stack
| otherwise = dfs xs' marks' (x : stack')
where
(marks', stack') = dfs (defs x) (x `insertRBT` marks) stack
defs x1 = filter (any (`elem` fvs x1) . bvs) xs
tsort' :: (Eq a, Eq b) => [Node a b] -> [[Node a b]]
tsort' xs = snd (dfs xs (emptySetRBT cmpNode) [])
where
dfs [] marks stack = (marks, stack)
dfs (x : xs') marks stack
| x `elemRBT` marks = dfs xs' marks stack
| otherwise = dfs xs' marks' ((x : concat stack') : stack)
where
(marks', stack') = dfs (uses x) (x `insertRBT` marks) []
uses x1 = filter (any (`elem` bvs x1) . fvs) xs
------------------------------------------------------------------------------
--- Library to support network programming with sockets.
--- In standard applications, the server side uses the operations
--- <code>listenOn</code> and <code>socketAccept</code> to provide some service
--- on a socket, and the client side uses the operation
--- <code>connectToSocket</code> to request a service.
---
--- @author Michael Hanus
--- @version February 2008
--- @category general
------------------------------------------------------------------------------
module Socket(Socket, listenOn, listenOnFresh,
socketAccept, waitForSocketAccept, sClose, connectToSocket)
where
import System
import IO(Handle)
--- The abstract type of sockets.
external data Socket
---------------------------------------------------------------------
-- Server side operations:
--- Creates a server side socket bound to a given port number.
listenOn :: Int -> IO Socket
listenOn port = prim_listenOn $# port
prim_listenOn :: Int -> IO Socket
prim_listenOn external
--- Creates a server side socket bound to a free port.
--- The port number and the socket is returned.