Merge branch 'release/next-vere' into king-natpmp

This commit is contained in:
Elliot Glaysher 2020-08-14 16:17:47 -04:00 committed by GitHub
commit 1b3a69e24d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 857 additions and 209 deletions

View File

@ -3,7 +3,7 @@ source $setup
tar -xf $src tar -xf $src
cd libsigsegv-$version cd libsigsegv-$version
patch -p1 << 'HEREDOC' patch -p1 << 'PATCH_I386'
--- a/src/fault-linux-i386.h 2020-06-25 23:46:02.099235491 +0000 --- a/src/fault-linux-i386.h 2020-06-25 23:46:02.099235491 +0000
+++ b/src/fault-linux-i386.h 2020-06-25 23:45:48.679156892 +0000 +++ b/src/fault-linux-i386.h 2020-06-25 23:45:48.679156892 +0000
@@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
@ -14,12 +14,32 @@ patch -p1 << 'HEREDOC'
#if defined __x86_64__ #if defined __x86_64__
/* 64 bit registers */ /* 64 bit registers */
HEREDOC PATCH_I386
patch -p1 << 'PATCH_ARM'
--- a/src/fault-linux-arm.h
+++ b/src/fault-linux-arm.h
@@ -17,6 +17,7 @@
#include "fault-posix-ucontext.h"
+#define HAVE_STACKVMA 0
#if defined(__aarch64__) || defined(__ARM_64BIT_STATE) || defined(__ARM_PCS_AAPCS64) /* 64-bit */
/* See glibc/sysdeps/unix/sysv/linux/aarch64/sys/ucontext.h.
PATCH_ARM
cd .. cd ..
mkdir build mkdir build
cd build cd build
# Hack
if [ $host = aarch64-linux-musleabi ]
then
sed -i 's/^CFG_FAULT=$/CFG_FAULT=fault-linux-arm.h/' \
../libsigsegv-$version/configure
fi
../libsigsegv-$version/configure \ ../libsigsegv-$version/configure \
--host=$host \ --host=$host \
--prefix=$out \ --prefix=$out \

View File

@ -8,3 +8,13 @@
#if defined __x86_64__ #if defined __x86_64__
/* 64 bit registers */ /* 64 bit registers */
--- a/src/fault-linux-arm.h
+++ b/src/fault-linux-arm.h
@@ -17,6 +17,7 @@
#include "fault-posix-ucontext.h"
+#define HAVE_STACKVMA 0
#if defined(__aarch64__) || defined(__ARM_64BIT_STATE) || defined(__ARM_PCS_AAPCS64) /* 64-bit */
/* See glibc/sysdeps/unix/sysv/linux/aarch64/sys/ucontext.h.

View File

@ -63,12 +63,6 @@ Polish:
changed too quickly. changed too quickly.
# Finding the Serf Executable
- [ ] Right now, `urbit-worker` is found by looking it up in the PATH. This
is wrong, but what is right?
# Take Advantage of New IPC Features # Take Advantage of New IPC Features
- [ ] Hook up `scry` to drivers. - [ ] Hook up `scry` to drivers.

View File

@ -14,7 +14,7 @@ import qualified Urbit.Vere.Serf as Serf
data PierConfig = PierConfig data PierConfig = PierConfig
{ _pcPierPath :: FilePath { _pcPierPath :: FilePath
, _pcDryRun :: Bool , _pcDryRun :: Bool
, _pcSerfExe :: Text , _pcSerfExe :: Maybe Text
, _pcSerfFlags :: [Serf.Flag] , _pcSerfFlags :: [Serf.Flag]
} deriving (Show) } deriving (Show)

View File

@ -146,7 +146,7 @@ toPierConfig pierPath o@(CLI.Opts{..}) = PierConfig { .. }
where where
_pcPierPath = pierPath _pcPierPath = pierPath
_pcDryRun = oDryRun || isJust oDryFrom _pcDryRun = oDryRun || isJust oDryFrom
_pcSerfExe = fromMaybe "urbit-worker" oSerfExe _pcSerfExe = oSerfExe
_pcSerfFlags = toSerfFlags o _pcSerfFlags = toSerfFlags o
toNetworkConfig :: CLI.Opts -> NetworkConfig toNetworkConfig :: CLI.Opts -> NetworkConfig

View File

@ -233,7 +233,7 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
NewtEfSend (_id, ()) dest (MkBytes bs) -> do NewtEfSend (_id, ()) dest (MkBytes bs) -> do
atomically (readTVar aTurfs) >>= \case atomically (readTVar aTurfs) >>= \case
Nothing -> pure () Nothing -> stderr "ames: send before turfs" >> pure ()
Just turfs -> sendPacket drv mode dest bs Just turfs -> sendPacket drv mode dest bs
sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e () sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e ()

View File

@ -18,6 +18,8 @@ import Network.Ethereum.Api.Types hiding (blockNumber)
import Network.Ethereum.Web3 import Network.Ethereum.Web3
import Network.HTTP.Client.TLS import Network.HTTP.Client.TLS
import Data.Solidity.Prim.Address (fromHexString)
import qualified Crypto.Hash.SHA256 as SHA256 import qualified Crypto.Hash.SHA256 as SHA256
import qualified Crypto.Hash.SHA512 as SHA512 import qualified Crypto.Hash.SHA512 as SHA512
import qualified Crypto.Sign.Ed25519 as Ed import qualified Crypto.Sign.Ed25519 as Ed
@ -25,14 +27,13 @@ import qualified Data.Binary as B
import qualified Data.ByteArray as BA import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C import qualified Data.ByteString.Char8 as C
import qualified Network.Ethereum.Ens as Ens -- import qualified Network.Ethereum.Ens as Ens
import qualified Network.HTTP.Client as C import qualified Network.HTTP.Client as C
import qualified Urbit.Azimuth as AZ import qualified Urbit.Azimuth as AZ
import qualified Urbit.Ob as Ob import qualified Urbit.Ob as Ob
-- During boot, use the infura provider -- During boot, use the infura provider
provider = HttpProvider provider = HttpProvider "http://eth-mainnet.urbit.org:8545"
"https://mainnet.infura.io/v3/196a7f37c7d54211b4a07904ec73ad87"
-- Conversion Utilities -------------------------------------------------------- -- Conversion Utilities --------------------------------------------------------
@ -224,8 +225,16 @@ dawnVent dSeed@(Seed ship life ring oaf) = do
block <- blockNumber block <- blockNumber
putStrLn ("boot: ethereum block #" ++ tshow block) putStrLn ("boot: ethereum block #" ++ tshow block)
putStrLn "boot: retrieving azimuth contract" -- TODO: Eventually look up the contract from ENS. Right now, our infura
azimuth <- withAccount () $ Ens.resolve "azimuth.eth" -- node is filtering everything except for a very small set of contracts,
-- so just hard code the address.
--
-- putStrLn "boot: retrieving azimuth contract"
-- azimuth <- withAccount () $ Ens.resolve "azimuth.eth"
let azimuthAddr = "0x223c067f8cf28ae173ee5cafea60ca44c335fecb"
let azimuth = case fromHexString azimuthAddr of
Left _ -> error "Impossible"
Right x -> x
immediateSponsor <- validateShipAndGetImmediateSponsor block azimuth dSeed immediateSponsor <- validateShipAndGetImmediateSponsor block azimuth dSeed
dSponsor <- getSponsorshipChain block azimuth immediateSponsor dSponsor <- getSponsorshipChain block azimuth immediateSponsor

View File

@ -175,8 +175,9 @@ startServ
-> Bool -> Bool
-> HttpServerConf -> HttpServerConf
-> (EvErr -> STM ()) -> (EvErr -> STM ())
-> (Text -> RIO e ())
-> RIO e Serv -> RIO e Serv
startServ who isFake conf plan = do startServ who isFake conf plan stderr = do
logDebug (displayShow ("EYRE", "startServ")) logDebug (displayShow ("EYRE", "startServ"))
multi <- view multiEyreApiL multi <- view multiEyreApiL
@ -271,6 +272,10 @@ startServ who isFake conf plan = do
fil = pierPath <> "/.http.ports" fil = pierPath <> "/.http.ports"
logDebug $ displayShow ("EYRE", "All Servers Started.", srvId, por, fil) logDebug $ displayShow ("EYRE", "All Servers Started.", srvId, por, fil)
for secPor $ \p ->
stderr ("http: secure web interface live on https://localhost:" <> tshow p)
stderr ("http: web interface live on http://localhost:" <> tshow insPor)
stderr ("http: loopback live on http://localhost:" <> tshow lopPor)
pure (Serv srvId conf lop ins mSec por fil vLive) pure (Serv srvId conf lop ins mSec por fil vLive)
@ -285,12 +290,15 @@ eyre'
:: (HasPierEnv e, HasMultiEyreApi e) :: (HasPierEnv e, HasMultiEyreApi e)
=> Ship => Ship
-> Bool -> Bool
-> (Text -> RIO e ())
-> RIO e ([Ev], RAcquire e (DriverApi HttpServerEf)) -> RIO e ([Ev], RAcquire e (DriverApi HttpServerEf))
eyre' who isFake = do
eyre' who isFake stderr = do
ventQ :: TQueue EvErr <- newTQueueIO ventQ :: TQueue EvErr <- newTQueueIO
env <- ask env <- ask
let (bornEvs, startDriver) = eyre env who (writeTQueue ventQ) isFake let (bornEvs, startDriver) =
eyre env who (writeTQueue ventQ) isFake stderr
let runDriver = do let runDriver = do
diOnEffect <- startDriver diOnEffect <- startDriver
@ -318,8 +326,9 @@ eyre
-> Ship -> Ship
-> (EvErr -> STM ()) -> (EvErr -> STM ())
-> Bool -> Bool
-> (Text -> RIO e ())
-> ([Ev], RAcquire e (HttpServerEf -> IO ())) -> ([Ev], RAcquire e (HttpServerEf -> IO ()))
eyre env who plan isFake = (initialEvents, runHttpServer) eyre env who plan isFake stderr = (initialEvents, runHttpServer)
where where
king = fromIntegral (env ^. kingIdL) king = fromIntegral (env ^. kingIdL)
multi = env ^. multiEyreApiL multi = env ^. multiEyreApiL
@ -343,7 +352,7 @@ eyre env who plan isFake = (initialEvents, runHttpServer)
restart :: Drv -> HttpServerConf -> RIO e Serv restart :: Drv -> HttpServerConf -> RIO e Serv
restart (Drv var) conf = do restart (Drv var) conf = do
logDebug "Restarting http server" logDebug "Restarting http server"
let startAct = startServ who isFake conf plan let startAct = startServ who isFake conf plan stderr
res <- fromEither =<< restartService var startAct kill res <- fromEither =<< restartService var startAct kill
logDebug "Done restating http server" logDebug "Done restating http server"
pure res pure res

View File

@ -25,6 +25,8 @@ import Urbit.King.App
import Urbit.Vere.Pier.Types import Urbit.Vere.Pier.Types
import Control.Monad.STM (retry) import Control.Monad.STM (retry)
import System.Environment (getExecutablePath)
import System.FilePath (splitFileName, (</>))
import System.Posix.Files (ownerModes, setFileMode) import System.Posix.Files (ownerModes, setFileMode)
import Urbit.EventLog.LMDB (EventLog) import Urbit.EventLog.LMDB (EventLog)
import Urbit.King.API (TermConn) import Urbit.King.API (TermConn)
@ -121,17 +123,25 @@ runSerf
-> RAcquire e Serf -> RAcquire e Serf
runSerf vSlog pax = do runSerf vSlog pax = do
env <- ask env <- ask
Serf.withSerf (config env) serfProg <- io getSerfProg
Serf.withSerf (config env serfProg)
where where
slog txt = atomically (readTVar vSlog) >>= (\f -> f txt) slog txt = atomically (readTVar vSlog) >>= (\f -> f txt)
config env = Serf.Config config env serfProg = Serf.Config
{ scSerf = env ^. pierConfigL . pcSerfExe . to unpack { scSerf = env ^. pierConfigL . pcSerfExe . to (maybe serfProg unpack)
, scPier = pax , scPier = pax
, scFlag = env ^. pierConfigL . pcSerfFlags , scFlag = env ^. pierConfigL . pcSerfFlags
, scSlog = \(pri, tank) -> printTank slog pri tank , scSlog = \(pri, tank) -> printTank slog pri tank
, scStdr = \txt -> slog (txt <> "\r\n") , scStdr = \txt -> slog (txt <> "\r\n")
, scDead = pure () -- TODO: What can be done? , scDead = pure () -- TODO: What can be done?
} }
getSerfProg :: IO FilePath
getSerfProg = do
(path, filename) <- splitFileName <$> getExecutablePath
pure $ case filename of
"urbit" -> path </> "urbit-worker"
"urbit-king" -> path </> "urbit-worker"
_ -> "urbit-worker"
-- Boot a new ship. ------------------------------------------------------------ -- Boot a new ship. ------------------------------------------------------------
@ -421,7 +431,7 @@ drivers env who isFake plan termSys stderr serfSIGINT = do
(behnBorn, runBehn) <- rio Behn.behn' (behnBorn, runBehn) <- rio Behn.behn'
(termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT) (termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT)
(amesBorn, runAmes) <- rio (Ames.ames' who isFake stderr) (amesBorn, runAmes) <- rio (Ames.ames' who isFake stderr)
(httpBorn, runEyre) <- rio (Eyre.eyre' who isFake) (httpBorn, runEyre) <- rio (Eyre.eyre' who isFake stderr)
(clayBorn, runClay) <- rio Clay.clay' (clayBorn, runClay) <- rio Clay.clay'
(irisBorn, runIris) <- rio Iris.client' (irisBorn, runIris) <- rio Iris.client'

6
pkg/urbit/configure vendored
View File

@ -30,7 +30,7 @@ if [ -n "${HOST-}" ]
then os=$(sed 's$^[^-]*-\([^-]*\)-.*$\1$' <<< "$HOST") then os=$(sed 's$^[^-]*-\([^-]*\)-.*$\1$' <<< "$HOST")
cpu=$(sed 's$-.*$$' <<< ${HOST}) cpu=$(sed 's$-.*$$' <<< ${HOST})
else os=$(uname -s) else os=$(uname -s)
cpu=$(uname -p) cpu=$(uname -m)
fi fi
case $(tr A-Z a-z <<< $cpu) in case $(tr A-Z a-z <<< $cpu) in
@ -46,6 +46,10 @@ case $(tr A-Z a-z <<< $cpu) in
x86_64) x86_64)
defmacro U3_OS_ENDIAN_little 1 defmacro U3_OS_ENDIAN_little 1
;; ;;
aarch64)
defmacro U3_OS_ENDIAN_little 1
defmacro U3_CPU_aarch64 1
;;
*) *)
echo "Unknown or unsupported CPU: '$cpu'" >&2 echo "Unknown or unsupported CPU: '$cpu'" >&2
exit 1 exit 1

View File

@ -26,6 +26,7 @@
# define c3__at c3_s2('a','t') # define c3__at c3_s2('a','t')
# define c3__atom c3_s4('a','t','o','m') # define c3__atom c3_s4('a','t','o','m')
# define c3__auth c3_s4('a','u','t','h') # define c3__auth c3_s4('a','u','t','h')
# define c3__ax c3_s2('a','x')
# define c3__axe c3_s3('a','x','e') # define c3__axe c3_s3('a','x','e')
# define c3__axil c3_s4('a','x','i','l') # define c3__axil c3_s4('a','x','i','l')
# define c3__axis c3_s4('a','x','i','s') # define c3__axis c3_s4('a','x','i','s')

View File

@ -381,6 +381,36 @@
*/ */
typedef void (*u3_peek_cb)(void*, u3_noun); typedef void (*u3_peek_cb)(void*, u3_noun);
/* u3_pico_type: kinds of proto-peek
*/
typedef enum {
u3_pico_full = 0,
u3_pico_mine = 1,
u3_pico_last = 2
} u3_pico_type;
/* u3_pico: proto-peek
*/
typedef struct _u3_pico {
struct _u3_pico* nex_u; // next in queue
void* ptr_v; // context
u3_peek_cb fun_f; // callback
u3_noun gan; // leakset
u3_pico_type typ_e; // type-tagged
union { //
u3_noun ful; // full: /care/beam
struct { // mine:
c3_m car_m; // care
u3_noun pax; // /desk/case/path
} min_u; //
struct { // last:
c3_m car_m; // care
u3_atom des; // desk
u3_noun pax; // /path
} las_u;
};
} u3_pico;
/* u3_peek: namespace read request /* u3_peek: namespace read request
*/ */
typedef struct _u3_peek { typedef struct _u3_peek {
@ -603,6 +633,10 @@
u3_play* pay_u; // recompute u3_play* pay_u; // recompute
u3_work* wok_u; // work u3_work* wok_u; // work
}; };
struct {
u3_pico* ent_u;
u3_pico* ext_u;
} pec_u;
// XX remove // XX remove
c3_s por_s; // UDP port c3_s por_s; // UDP port
u3_save* sav_u; // autosave u3_save* sav_u; // autosave
@ -759,6 +793,16 @@
void void
u3_ovum_free(u3_ovum *egg_u); u3_ovum_free(u3_ovum *egg_u);
/* u3_pico_init(): initialize a scry request struct
*/
u3_pico*
u3_pico_init();
/* u3_pico_free(): dispose a scry request struct
*/
void
u3_pico_free(u3_pico* pic_u);
/* u3_mcut_char(): measure/cut character. /* u3_mcut_char(): measure/cut character.
*/ */
c3_w c3_w
@ -958,35 +1002,10 @@
void void
u3_lord_play(u3_lord* god_u, u3_info fon_u); u3_lord_play(u3_lord* god_u, u3_info fon_u);
/* u3_lord_peek(): read namespace. /* u3_lord_peek(): read namespace, injecting what's missing.
*/ */
void void
u3_lord_peek(u3_lord* god_u, u3_lord_peek(u3_lord* god_u, u3_pico* pic_u);
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_mine(): read namespace, injecting ship.
*/
void
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_last(): read namespace, injecting ship and case.
*/
void
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/** Filesystem (new api). /** Filesystem (new api).
**/ **/
@ -1199,6 +1218,38 @@
void void
u3_newt_mojo_stop(u3_mojo* moj_u, u3_moor_bail bal_f); u3_newt_mojo_stop(u3_mojo* moj_u, u3_moor_bail bal_f);
/** Pier scries.
**/
/* u3_pier_peek(): read namespace.
*/
void
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_mine(): read namespace, injecting ship.
*/
void
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_last(): read namespace, injecting ship and case.
*/
void
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/** Pier control. /** Pier control.
**/ **/
/* u3_pier_exit(): trigger a gentle shutdown. /* u3_pier_exit(): trigger a gentle shutdown.

View File

@ -38,7 +38,7 @@ u3_lmdb_init(const c3_c* pax_c, size_t siz_i)
c3_w ret_w; c3_w ret_w;
if ( (ret_w = mdb_env_create(&env_u)) ) { if ( (ret_w = mdb_env_create(&env_u)) ) {
fprintf(stderr, "lmdb: init fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: init fail: %s\r\n", mdb_strerror(ret_w));
return 0; return 0;
} }
@ -61,7 +61,7 @@ u3_lmdb_init(const c3_c* pax_c, size_t siz_i)
} }
if ( (ret_w = mdb_env_open(env_u, pax_c, 0, 0664)) ) { if ( (ret_w = mdb_env_open(env_u, pax_c, 0, 0664)) ) {
fprintf(stderr, "lmdb: failed to open event log: %s\n", fprintf(stderr, "lmdb: failed to open event log: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
// XX dispose env_u // XX dispose env_u
// //
@ -93,7 +93,7 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
// XX why no MDB_RDONLY? // XX why no MDB_RDONLY?
// //
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) { if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
fprintf(stderr, "lmdb: gulf: txn_begin fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: gulf: txn_begin fail: %s\r\n", mdb_strerror(ret_w));
return c3n; return c3n;
} }
@ -103,7 +103,7 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY; c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) { if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
fprintf(stderr, "lmdb: gulf: dbi_open fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: gulf: dbi_open fail: %s\r\n", mdb_strerror(ret_w));
// XX confirm // XX confirm
// //
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
@ -120,7 +120,7 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
// creates a cursor to point to the last event // creates a cursor to point to the last event
// //
if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) { if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) {
fprintf(stderr, "lmdb: gulf: cursor_open fail: %s\n", fprintf(stderr, "lmdb: gulf: cursor_open fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
// XX confirm // XX confirm
// //
@ -140,7 +140,7 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
return c3y; return c3y;
} }
else if ( ret_w ) { else if ( ret_w ) {
fprintf(stderr, "lmdb: gulf: head fail: %s\n", fprintf(stderr, "lmdb: gulf: head fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
mdb_cursor_close(cur_u); mdb_cursor_close(cur_u);
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
@ -191,7 +191,7 @@ u3_lmdb_read(MDB_env* env_u,
// create a read-only transaction. // create a read-only transaction.
// //
if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) { if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) {
fprintf(stderr, "lmdb: read txn_begin fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: read txn_begin fail: %s\r\n", mdb_strerror(ret_w));
return c3n; return c3n;
} }
@ -201,7 +201,7 @@ u3_lmdb_read(MDB_env* env_u,
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY; c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) { if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
fprintf(stderr, "lmdb: read: dbi_open fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: read: dbi_open fail: %s\r\n", mdb_strerror(ret_w));
// XX confirm // XX confirm
// //
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
@ -220,7 +220,7 @@ u3_lmdb_read(MDB_env* env_u,
// creates a cursor to iterate over keys starting at [eve_d] // creates a cursor to iterate over keys starting at [eve_d]
// //
if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) { if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) {
fprintf(stderr, "lmdb: read: cursor_open fail: %s\n", fprintf(stderr, "lmdb: read: cursor_open fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
// XX confirm // XX confirm
// //
@ -312,7 +312,7 @@ u3_lmdb_save(MDB_env* env_u,
// create a write transaction // create a write transaction
// //
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) { if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
fprintf(stderr, "lmdb: write: txn_begin fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: write: txn_begin fail: %s\r\n", mdb_strerror(ret_w));
return c3n; return c3n;
} }
@ -322,7 +322,7 @@ u3_lmdb_save(MDB_env* env_u,
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY; c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) { if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
fprintf(stderr, "lmdb: write: dbi_open fail: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: write: dbi_open fail: %s\r\n", mdb_strerror(ret_w));
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return c3n; return c3n;
} }
@ -343,7 +343,7 @@ u3_lmdb_save(MDB_env* env_u,
MDB_val val_u = { .mv_size = siz_i[i_d], .mv_data = byt_p[i_d] }; MDB_val val_u = { .mv_size = siz_i[i_d], .mv_data = byt_p[i_d] };
if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, ops_w)) ) { if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, ops_w)) ) {
fprintf(stderr, "lmdb: write failed on event %" PRIu64 "\n", key_d); fprintf(stderr, "lmdb: write failed on event %" PRIu64 "\r\n", key_d);
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return c3n; return c3n;
} }
@ -354,7 +354,7 @@ u3_lmdb_save(MDB_env* env_u,
// commit transaction // commit transaction
// //
if ( (ret_w = mdb_txn_commit(txn_u)) ) { if ( (ret_w = mdb_txn_commit(txn_u)) ) {
fprintf(stderr, "lmdb: write failed: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: write failed: %s\r\n", mdb_strerror(ret_w));
return c3n; return c3n;
} }
@ -376,7 +376,7 @@ u3_lmdb_read_meta(MDB_env* env_u,
// create a read transaction // create a read transaction
// //
if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) { if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) {
fprintf(stderr, "lmdb: meta read: txn_begin fail: %s\n", fprintf(stderr, "lmdb: meta read: txn_begin fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
return read_f(ptr_v, 0, 0); return read_f(ptr_v, 0, 0);
} }
@ -384,7 +384,7 @@ u3_lmdb_read_meta(MDB_env* env_u,
// open the database in the transaction // open the database in the transaction
// //
if ( (ret_w = mdb_dbi_open(txn_u, "META", 0, &mdb_u)) ) { if ( (ret_w = mdb_dbi_open(txn_u, "META", 0, &mdb_u)) ) {
fprintf(stderr, "lmdb: meta read: dbi_open fail: %s\n", fprintf(stderr, "lmdb: meta read: dbi_open fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return read_f(ptr_v, 0, 0); return read_f(ptr_v, 0, 0);
@ -396,7 +396,7 @@ u3_lmdb_read_meta(MDB_env* env_u,
MDB_val val_u; MDB_val val_u;
if ( (ret_w = mdb_get(txn_u, mdb_u, &key_u, &val_u)) ) { if ( (ret_w = mdb_get(txn_u, mdb_u, &key_u, &val_u)) ) {
fprintf(stderr, "lmdb: read failed: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: read failed: %s\r\n", mdb_strerror(ret_w));
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return read_f(ptr_v, 0, 0); return read_f(ptr_v, 0, 0);
} }
@ -425,7 +425,7 @@ u3_lmdb_save_meta(MDB_env* env_u,
// create a write transaction // create a write transaction
// //
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) { if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
fprintf(stderr, "lmdb: meta write: txn_begin fail: %s\n", fprintf(stderr, "lmdb: meta write: txn_begin fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
return c3n; return c3n;
} }
@ -433,7 +433,7 @@ u3_lmdb_save_meta(MDB_env* env_u,
// opens the database in the transaction // opens the database in the transaction
// //
if ( (ret_w = mdb_dbi_open(txn_u, "META", MDB_CREATE, &mdb_u)) ) { if ( (ret_w = mdb_dbi_open(txn_u, "META", MDB_CREATE, &mdb_u)) ) {
fprintf(stderr, "lmdb: meta write: dbi_open fail: %s\n", fprintf(stderr, "lmdb: meta write: dbi_open fail: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return c3n; return c3n;
@ -446,7 +446,7 @@ u3_lmdb_save_meta(MDB_env* env_u,
MDB_val val_u = { .mv_size = val_i, .mv_data = val_p }; MDB_val val_u = { .mv_size = val_i, .mv_data = val_p };
if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, 0)) ) { if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, 0)) ) {
fprintf(stderr, "lmdb: write failed: %s\n", mdb_strerror(ret_w)); fprintf(stderr, "lmdb: write failed: %s\r\n", mdb_strerror(ret_w));
mdb_txn_abort(txn_u); mdb_txn_abort(txn_u);
return c3n; return c3n;
} }
@ -455,7 +455,7 @@ u3_lmdb_save_meta(MDB_env* env_u,
// commit txn // commit txn
// //
if ( (ret_w = mdb_txn_commit(txn_u)) ) { if ( (ret_w = mdb_txn_commit(txn_u)) ) {
fprintf(stderr, "lmdb: meta write: commit failed: %s\n", fprintf(stderr, "lmdb: meta write: commit failed: %s\r\n",
mdb_strerror(ret_w)); mdb_strerror(ret_w));
return c3n; return c3n;
} }

View File

@ -692,7 +692,7 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
// //
{ {
if ( 0 == (log_u->dir_u = u3_foil_folder(pax_c)) ) { if ( 0 == (log_u->dir_u = u3_foil_folder(pax_c)) ) {
fprintf(stderr, "disk: failed to load pier at %s", pax_c); fprintf(stderr, "disk: failed to load pier at %s\r\n", pax_c);
c3_free(log_u); c3_free(log_u);
return 0; return 0;
} }
@ -707,7 +707,7 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
strcat(urb_c, "/.urb"); strcat(urb_c, "/.urb");
if ( 0 == (log_u->urb_u = u3_foil_folder(urb_c)) ) { if ( 0 == (log_u->urb_u = u3_foil_folder(urb_c)) ) {
fprintf(stderr, "disk: failed to load /.urb in %s", pax_c); fprintf(stderr, "disk: failed to load /.urb in %s\r\n", pax_c);
c3_free(urb_c); c3_free(urb_c);
c3_free(log_u); c3_free(log_u);
return 0; return 0;
@ -740,7 +740,7 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
strcat(log_c, "/.urb/log"); strcat(log_c, "/.urb/log");
if ( 0 == (log_u->com_u = u3_foil_folder(log_c)) ) { if ( 0 == (log_u->com_u = u3_foil_folder(log_c)) ) {
fprintf(stderr, "disk: failed to load /.urb/log in %s", pax_c); fprintf(stderr, "disk: failed to load /.urb/log in %s\r\n", pax_c);
c3_free(log_c); c3_free(log_c);
c3_free(log_u); c3_free(log_u);
return 0; return 0;
@ -752,10 +752,14 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
// "[..] on 64-bit there is no penalty for making this huge (say 1TB)." // "[..] on 64-bit there is no penalty for making this huge (say 1TB)."
// //
{ {
#if defined(U3_CPU_aarch64) && defined(U3_OS_linux)
const size_t siz_i = 64424509440;
#else
const size_t siz_i = 1099511627776; const size_t siz_i = 1099511627776;
#endif
if ( 0 == (log_u->mdb_u = u3_lmdb_init(log_c, siz_i)) ) { if ( 0 == (log_u->mdb_u = u3_lmdb_init(log_c, siz_i)) ) {
fprintf(stderr, "disk: failed to initialize database"); fprintf(stderr, "disk: failed to initialize database\r\n");
c3_free(log_c); c3_free(log_c);
c3_free(log_u); c3_free(log_u);
return 0; return 0;
@ -772,7 +776,7 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
c3_d fir_d; c3_d fir_d;
if ( c3n == u3_lmdb_gulf(log_u->mdb_u, &fir_d, &log_u->dun_d) ) { if ( c3n == u3_lmdb_gulf(log_u->mdb_u, &fir_d, &log_u->dun_d) ) {
fprintf(stderr, "disk: failed to load latest event from database"); fprintf(stderr, "disk: failed to load latest event from database\r\n");
c3_free(log_u); c3_free(log_u);
return 0; return 0;
} }

View File

@ -30,21 +30,58 @@
*/ */
typedef struct _u3_ames { // packet network state typedef struct _u3_ames { // packet network state
u3_auto car_u; // driver u3_auto car_u; // driver
u3_pier* pir_u; // pier
union { // union { //
uv_udp_t wax_u; // uv_udp_t wax_u; //
uv_handle_t had_u; // uv_handle_t had_u; //
}; // }; //
c3_d who_d[2]; // identity
c3_o fak_o; // fake keys
c3_s por_s; // public IPv4 port
c3_c* dns_c; // domain XX multiple/fallback c3_c* dns_c; // domain XX multiple/fallback
c3_d dop_d; // drop count c3_d dop_d; // drop count
c3_d fal_d; // crash count c3_d fal_d; // crash count
c3_w imp_w[256]; // imperial IPs c3_w imp_w[256]; // imperial IPs
time_t imp_t[256]; // imperial IP timestamps time_t imp_t[256]; // imperial IP timestamps
c3_o imp_o[256]; // imperial print status c3_o imp_o[256]; // imperial print status
c3_o see_o; // can scry
c3_o fit_o; // filtering active
c3_y ver_y; // protocol version
c3_d vet_d; // version mismatches filtered
c3_d mut_d; // invalid mugs filtered
struct _u3_panc* pac_u; // packets pending forwards
c3_d foq_d; // forward queue size
c3_d fow_d; // forwarded count
c3_d fod_d; // forwards dropped count
} u3_ames; } u3_ames;
/* u3_head: ames packet header
*/
typedef struct _u3_head {
c3_y ver_y; // protocol version
c3_l mug_l; // truncated mug hash of u3_body
c3_y sac_y; // sender class
c3_y rac_y; // receiver class
c3_o enc_o; // encrypted?
} u3_head;
/* u3_body: ames packet body
*/
typedef struct _u3_body {
c3_d sen_d[2]; // sender
c3_d rec_d[2]; // receiver
c3_w con_w; // jam size
c3_y* con_y; // (jam [origin content])
} u3_body;
/* u3_panc: deconstructed incoming packet
*/
typedef struct _u3_panc {
u3_ames* sam_u; // ames backpointer
struct _u3_panc* pre_u; // previous packet
struct _u3_panc* nex_u; // next packet
u3_lane ore_u; // origin lane
u3_head hed_u; // header
u3_body bod_u; // body
} u3_panc;
/* _ames_alloc(): libuv buffer allocator. /* _ames_alloc(): libuv buffer allocator.
*/ */
static void static void
@ -70,6 +107,34 @@ _ames_pact_free(u3_pact* pac_u)
c3_free(pac_u); c3_free(pac_u);
} }
/* _ames_panc_free(): remove references, lose refcounts and free struct
*/
static void
_ames_panc_free(u3_panc* pac_u) {
if (0 != pac_u->nex_u) {
pac_u->nex_u->pre_u = pac_u->pre_u;
}
if (0 != pac_u->pre_u) {
pac_u->pre_u->nex_u = pac_u->nex_u;
} else {
c3_assert(pac_u == pac_u->sam_u->pac_u);
pac_u->sam_u->pac_u = pac_u->nex_u;
}
c3_free(pac_u->bod_u.con_y);
c3_free(pac_u);
}
/* _ca_mug_body(): truncated mug hash of bytes
*/
static c3_l
_ca_mug_body(c3_w len_w, c3_y* byt_y)
{
// mask off ((1 << 20) - 1)
//
return u3r_mug_bytes(byt_y, len_w) & 0xfffff;
}
/* _ames_send_cb(): send callback. /* _ames_send_cb(): send callback.
*/ */
static void static void
@ -232,6 +297,108 @@ u3_ames_encode_lane(u3_lane lan) {
return u3ke_jam(u3nt(c3__ipv4, u3i_words(1, &lan.pip_w), lan.por_s)); return u3ke_jam(u3nt(c3__ipv4, u3i_words(1, &lan.pip_w), lan.por_s));
} }
/* _ames_lane_from_sockaddr(): sockaddr_in to lane struct
*/
static u3_lane
_ames_lane_from_sockaddr(struct sockaddr_in* add_u)
{
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
return lan_u;
}
/* _ames_serialize_packet(): u3_panc to atom, updating the origin lane if dop_o
** (retains pac_u)
*/
static u3_noun
_ames_serialize_packet(u3_panc* pac_u, c3_o dop_o)
{
c3_y sen_y = 2 << pac_u->hed_u.sac_y;
c3_y rec_y = 2 << pac_u->hed_u.rac_y;
c3_o nal_o = c3n;
// update the body's lane, if desired
//
if (c3y == dop_o) {
// unpack (jam [(unit lane) body])
//
u3_noun lon, bod;
{
//NOTE we checked for cue safety in _ames_recv_cb
u3_noun old = u3ke_cue(u3i_bytes(pac_u->bod_u.con_w, pac_u->bod_u.con_y));
u3x_cell(old, &lon, &bod);
u3k(lon); u3k(bod);
u3z(old);
}
// only replace the lane if it was ~
//
//NOTE this sets an opaque lane even in the "sender is galaxy" case,
// but that doesn't matter: ames.hoon ignores origin in that case,
// always using the appropriate galaxy lane instead.
//
if (u3_nul == lon) {
u3z(lon);
lon = u3nt(u3_nul, c3n, u3_ames_encode_lane(pac_u->ore_u));
nal_o = c3y;
c3_free(pac_u->bod_u.con_y);
u3_noun jam = u3ke_jam(u3nc(lon, bod));
pac_u->bod_u.con_w = u3r_met(3, jam);
pac_u->bod_u.con_y = c3_malloc(pac_u->bod_u.con_w);
u3r_bytes(0, pac_u->bod_u.con_w, pac_u->bod_u.con_y, jam);
u3z(jam);
}
else {
u3z(lon); u3z(bod);
}
}
// serialize the packet
//
u3_noun pac;
{
// start with the body
//
u3_body* bod_u = &pac_u->bod_u;
c3_y* pac_y = c3_malloc(4 + sen_y + rec_y + bod_u->con_w);
{
u3_atom sen = u3i_chubs(2, bod_u->sen_d);
u3_atom rec = u3i_chubs(2, bod_u->rec_d);
u3r_bytes(0, sen_y, pac_y + 4, sen);
u3r_bytes(0, rec_y, pac_y + 4 + sen_y, rec);
u3z(sen); u3z(rec);
}
memcpy(pac_y + 4 + sen_y + rec_y, bod_u->con_y, bod_u->con_w);
// if we updated the origin lane, we need to update the mug too
//
if (c3y == nal_o) {
pac_u->hed_u.mug_l = _ca_mug_body(sen_y + rec_y + bod_u->con_w,
pac_y + 4);
}
// now we can serialize the head
//
u3_head* hed_u = &pac_u->hed_u;
c3_w hed_w = hed_u->ver_y
| (hed_u->mug_l << 3)
| (hed_u->sac_y << 23)
| (hed_u->rac_y << 25)
| (hed_u->enc_o << 27);
// XX assumes little-endian
//
memcpy(pac_y, &hed_w, 4);
pac = u3i_bytes(4 + sen_y + rec_y + bod_u->con_w, pac_y);
c3_free(pac_y);
}
return pac;
}
/* _ames_czar(): galaxy address resolution. /* _ames_czar(): galaxy address resolution.
*/ */
static void static void
@ -435,6 +602,77 @@ _ames_hear_bail(u3_ovum* egg_u, u3_noun lud)
u3_ovum_free(egg_u); u3_ovum_free(egg_u);
} }
/* _ames_put_packet(): add packet to queue, drop old packets on pressure
*/
static void
_ames_put_packet(u3_ames* sam_u,
u3_noun msg,
u3_lane lan_u)
{
u3_noun wir = u3nc(c3__ames, u3_nul);
u3_noun cad = u3nt(c3__hear, u3nc(c3n, u3_ames_encode_lane(lan_u)), msg);
u3_auto_peer(
u3_auto_plan(&sam_u->car_u,
u3_ovum_init(0, c3__a, wir, cad)),
0, 0, _ames_hear_bail);
_ames_cap_queue(sam_u);
}
/* _ames_forward(): forward pac_u onto the (list lane) las, then free pac_u
*/
static void
_ames_forward(u3_panc* pac_u, u3_noun las)
{
pac_u->sam_u->fow_d++;
if ( 0 == (pac_u->sam_u->fow_d % 10000) ) {
u3l_log("ames: forwarded %" PRIu64 " total\n", pac_u->sam_u->fow_d);
}
{
u3_noun los = las;
u3_noun pac = _ames_serialize_packet(pac_u, c3y);
while (u3_nul != las) {
_ames_ef_send(pac_u->sam_u, u3k(u3h(las)), u3k(pac));
las = u3t(las);
}
u3z(los); u3z(pac);
}
pac_u->sam_u->foq_d--;
_ames_panc_free(pac_u);
}
/* _ames_lane_scry_cb(): learn lane to forward packet on
*/
static void
_ames_lane_scry_cb(void* vod_p, u3_noun nun)
{
u3_panc* pac_u = vod_p;
u3_weak las = u3r_at(7, nun);
// if scry fails, remember we can't scry, and just inject the packet
//
if (u3_none == las) {
pac_u->sam_u->foq_d--;
u3l_log("ames: giving up scry\n");
pac_u->sam_u->see_o = c3n;
_ames_put_packet(pac_u->sam_u,
_ames_serialize_packet(pac_u, c3n),
pac_u->ore_u);
_ames_panc_free(pac_u);
}
// if there is a lane, forward the packet on it
//
else if (u3_nul != las) {
_ames_forward(pac_u, u3k(las));
}
// if there is no lane, drop the packet
u3z(nun);
}
/* _ames_recv_cb(): receive callback. /* _ames_recv_cb(): receive callback.
*/ */
static void static void
@ -445,39 +683,161 @@ _ames_recv_cb(uv_udp_t* wax_u,
unsigned flg_i) unsigned flg_i)
{ {
u3_ames* sam_u = wax_u->data; u3_ames* sam_u = wax_u->data;
c3_o pas_o = c3y;
c3_y* byt_y = (c3_y*)buf_u->base;
c3_y* bod_y = byt_y + 4;
u3_head hed_u;
// data present, and protocol version in header matches 0 // ensure a sane message size
// //
// XX inflexible, scry version out of ames if (4 >= nrd_i) {
pas_o = c3n;
}
// unpack the packet header
// //
if ( (0 < nrd_i) else {
&& (0 == (0x7 & *((c3_w*)buf_u->base))) ) c3_w hed_w = (byt_y[0] << 0)
{ | (byt_y[1] << 8)
u3_noun wir = u3nc(c3__ames, u3_nul); | (byt_y[2] << 16)
u3_noun cad; | (byt_y[3] << 24);
hed_u.ver_y = hed_w & 0x7;
hed_u.mug_l = (hed_w >> 3) & 0xfffff; //NOTE ((1 << 20) - 1)
hed_u.sac_y = (hed_w >> 23) & 0x3;
hed_u.rac_y = (hed_w >> 25) & 0x3;
hed_u.enc_o = (hed_w >> 27) & 0x1;
}
// ensure the protocol version matches ours
//
if ( c3y == pas_o
&& (c3y == sam_u->fit_o)
&& (sam_u->ver_y != hed_u.ver_y) )
{ {
pas_o = c3n;
sam_u->vet_d++;
if ( 0 == (sam_u->vet_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->vet_d);
}
}
// ensure the mug is valid
//
if ( c3y == pas_o
&& (hed_u.mug_l != _ca_mug_body(nrd_i - 4, bod_y)) )
{
pas_o = c3n;
sam_u->mut_d++;
if ( 0 == (sam_u->mut_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->mut_d);
}
}
// unpack the body
//
c3_y sen_y = 2 << hed_u.sac_y;
c3_y rec_y = 2 << hed_u.rac_y;
c3_d sen_d[2];
c3_d rec_d[2];
c3_w con_w = nrd_i - 4 - sen_y - rec_y;
c3_y* con_y = NULL;
if (c3y == pas_o) {
u3_noun sen = u3i_bytes(sen_y, bod_y);
u3_noun rec = u3i_bytes(rec_y, bod_y + sen_y);
u3r_chubs(0, 2, rec_d, rec);
u3r_chubs(0, 2, sen_d, sen);
u3z(sen); u3z(rec);
con_y = c3_malloc(con_w);
memcpy(con_y, bod_y + sen_y + rec_y, con_w);
// ensure the content is cue-able
//
u3_noun pro = u3m_soft(0, u3ke_cue, u3i_bytes(con_w, con_y));
pas_o = (u3_blip == u3h(pro)) ? c3y : c3n;
u3z(pro);
}
// if we can scry,
// and we are not the recipient,
// we might want to forward statelessly
//
if ( c3y == pas_o
&& c3y == sam_u->see_o
&& ( (rec_d[0] != sam_u->pir_u->who_d[0])
|| (rec_d[1] != sam_u->pir_u->who_d[1]) ) )
{
pas_o = c3n;
// if the queue is full, and we can't forward synchronously,
// just drop the packet
//
//TODO drop oldest item in forward queue in favor of this one.
// ames.c doesn't/shouldn't know about the shape of scry events,
// so can't pluck these out of the event queue like it does in
// _ames_cap_queue. as such, blocked on u3_lord_peek_cancel or w/e.
//
if ( (1000 < sam_u->foq_d)
&& !(rec_d[1] == 0 && (256 > rec_d[0])) )
{
c3_free(con_y);
sam_u->fod_d++;
if ( 0 == (sam_u->fod_d % 100000) ) {
u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->fod_d);
}
}
// otherwise, proceed with forwarding
//
else {
sam_u->foq_d++;
// store the packet details for later processing
//
u3_panc* pac_u = c3_calloc(sizeof(*pac_u));
pac_u->sam_u = sam_u;
pac_u->hed_u = hed_u;
pac_u->bod_u.sen_d[0] = sen_d[0];
pac_u->bod_u.sen_d[1] = sen_d[1];
pac_u->bod_u.rec_d[0] = rec_d[0];
pac_u->bod_u.rec_d[1] = rec_d[1];
pac_u->bod_u.con_w = con_w;
pac_u->bod_u.con_y = con_y;
pac_u->ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
if (0 != sam_u->pac_u) {
pac_u->nex_u = sam_u->pac_u;
sam_u->pac_u->pre_u = pac_u;
}
sam_u->pac_u = pac_u;
// if the recipient is a galaxy, their lane is always &+~gax
//
if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) {
_ames_forward(pac_u, u3nc(u3nc(c3y, (c3_y)rec_d[0]), u3_nul));
}
// otherwise, if there's space in the queue, scry the lane out of ames
//
else {
u3_noun pax = u3nq(u3i_string("peers"),
u3dc("scot", 'p', u3i_chubs(2, rec_d)),
u3i_string("forward-lane"),
u3_nul);
u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax,
u3_nul, pax, pac_u, _ames_lane_scry_cb);
}
}
}
// if we passed the filter, inject the packet
//
if (c3y == pas_o) {
c3_free(con_y);
u3_lane ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base); u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base);
u3_noun lan; _ames_put_packet(sam_u, msg, ore_u);
{
struct sockaddr_in* add_u = (struct sockaddr_in *)adr_u;
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
lan = u3_ames_encode_lane(lan_u);
}
cad = u3nt(c3__hear, u3nc(c3n, lan), msg);
}
u3_auto_peer(
u3_auto_plan(&sam_u->car_u,
u3_ovum_init(0, c3__a, wir, cad)),
0, 0, _ames_hear_bail);
_ames_cap_queue(sam_u);
} }
c3_free(buf_u->base); c3_free(buf_u->base);
@ -488,13 +848,13 @@ _ames_recv_cb(uv_udp_t* wax_u,
static void static void
_ames_io_start(u3_ames* sam_u) _ames_io_start(u3_ames* sam_u)
{ {
c3_s por_s = sam_u->por_s; c3_s por_s = sam_u->pir_u->por_s;
u3_noun who = u3i_chubs(2, sam_u->who_d); u3_noun who = u3i_chubs(2, sam_u->pir_u->who_d);
u3_noun rac = u3do("clan:title", u3k(who)); u3_noun rac = u3do("clan:title", u3k(who));
c3_i ret_i; c3_i ret_i;
if ( c3__czar == rac ) { if ( c3__czar == rac ) {
c3_y num_y = (c3_y)sam_u->who_d[0]; c3_y num_y = (c3_y)sam_u->pir_u->who_d[0];
c3_s zar_s = _ames_czar_port(num_y); c3_s zar_s = _ames_czar_port(num_y);
if ( 0 == por_s ) { if ( 0 == por_s ) {
@ -537,14 +897,14 @@ _ames_io_start(u3_ames* sam_u)
uv_udp_getsockname(&sam_u->wax_u, (struct sockaddr *)&add_u, &add_i); uv_udp_getsockname(&sam_u->wax_u, (struct sockaddr *)&add_u, &add_i);
c3_assert(add_u.sin_port); c3_assert(add_u.sin_port);
sam_u->por_s = ntohs(add_u.sin_port); sam_u->pir_u->por_s = ntohs(add_u.sin_port);
} }
if ( c3y == u3_Host.ops_u.net ) { if ( c3y == u3_Host.ops_u.net ) {
u3l_log("ames: live on %d\n", sam_u->por_s); u3l_log("ames: live on %d\n", sam_u->pir_u->por_s);
} }
else { else {
u3l_log("ames: live on %d (localhost only)\n", sam_u->por_s); u3l_log("ames: live on %d (localhost only)\n", sam_u->pir_u->por_s);
} }
uv_udp_recv_start(&sam_u->wax_u, _ames_alloc, _ames_recv_cb); uv_udp_recv_start(&sam_u->wax_u, _ames_alloc, _ames_recv_cb);
@ -574,7 +934,7 @@ _ames_ef_turf(u3_ames* sam_u, u3_noun tuf)
u3z(tuf); u3z(tuf);
} }
else if ( (c3n == sam_u->fak_o) && (0 == sam_u->dns_c) ) { else if ( (c3n == sam_u->pir_u->fak_o) && (0 == sam_u->dns_c) ) {
u3l_log("ames: turf: no domains\n"); u3l_log("ames: turf: no domains\n");
} }
@ -585,6 +945,32 @@ _ames_ef_turf(u3_ames* sam_u, u3_noun tuf)
} }
} }
/* _ames_prot_scry_cb(): receive protocol version
*/
static void
_ames_prot_scry_cb(void* vod_p, u3_noun nun)
{
u3_ames* sam_u = vod_p;
u3_weak ver = u3r_at(7, nun);
if (u3_none == ver) {
// assume protocol version 0
//
sam_u->ver_y = 0;
}
else if ( (c3n == u3a_is_cat(ver))
|| (7 < ver) ) {
u3m_p("ames: strange protocol", nun);
sam_u->ver_y = 0;
}
else {
sam_u->ver_y = ver;
}
sam_u->fit_o = c3y;
u3z(nun);
}
/* _ames_io_talk(): start receiving ames traffic. /* _ames_io_talk(): start receiving ames traffic.
*/ */
static void static void
@ -601,6 +987,12 @@ _ames_io_talk(u3_auto* car_u)
u3_auto_plan(car_u, u3_ovum_init(0, c3__a, wir, cad)); u3_auto_plan(car_u, u3_ovum_init(0, c3__a, wir, cad));
} }
// scry the protocol version out of arvo
//
u3_pier_peek_last(car_u->pir_u, u3_nul, c3__ax, u3_nul,
u3nt(u3i_string("protocol"), u3i_string("version"), u3_nul),
sam_u, _ames_prot_scry_cb);
} }
/* _ames_kick_newt(): apply packet network outputs. /* _ames_kick_newt(): apply packet network outputs.
@ -693,6 +1085,14 @@ static void
_ames_exit_cb(uv_handle_t* had_u) _ames_exit_cb(uv_handle_t* had_u)
{ {
u3_ames* sam_u = had_u->data; u3_ames* sam_u = had_u->data;
u3_panc* pac_u = sam_u->pac_u;
while (0 != pac_u) {
u3_panc* nex_u = pac_u->nex_u;
_ames_panc_free(pac_u);
pac_u = nex_u;
}
c3_free(sam_u); c3_free(sam_u);
} }
@ -712,6 +1112,11 @@ _ames_io_info(u3_auto* car_u)
{ {
u3_ames* sam_u = (u3_ames*)car_u; u3_ames* sam_u = (u3_ames*)car_u;
u3l_log(" dropped: %" PRIu64 "\n", sam_u->dop_d); u3l_log(" dropped: %" PRIu64 "\n", sam_u->dop_d);
u3l_log(" forwards dropped: %" PRIu64 "\n", sam_u->fod_d);
u3l_log(" forwards pending: %" PRIu64 "\n", sam_u->foq_d);
u3l_log(" forwarded: %" PRIu64 "\n", sam_u->fow_d);
u3l_log(" filtered (ver): %" PRIu64 "\n", sam_u->vet_d);
u3l_log(" filtered (mug): %" PRIu64 "\n", sam_u->mut_d);
u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d); u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d);
} }
@ -721,22 +1126,21 @@ u3_auto*
u3_ames_io_init(u3_pier* pir_u) u3_ames_io_init(u3_pier* pir_u)
{ {
u3_ames* sam_u = c3_calloc(sizeof(*sam_u)); u3_ames* sam_u = c3_calloc(sizeof(*sam_u));
sam_u->who_d[0] = pir_u->who_d[0]; sam_u->pir_u = pir_u;
sam_u->who_d[1] = pir_u->who_d[1];
sam_u->por_s = pir_u->por_s;
sam_u->fak_o = pir_u->fak_o;
sam_u->dop_d = 0; sam_u->dop_d = 0;
sam_u->see_o = c3y;
sam_u->fit_o = c3n;
sam_u->foq_d = 0;
c3_assert( !uv_udp_init(u3L, &sam_u->wax_u) ); c3_assert( !uv_udp_init(u3L, &sam_u->wax_u) );
sam_u->wax_u.data = sam_u; sam_u->wax_u.data = sam_u;
// Disable networking for fake ships // Disable networking for fake ships
// //
if ( c3y == sam_u->fak_o ) { if ( c3y == sam_u->pir_u->fak_o ) {
u3_Host.ops_u.net = c3n; u3_Host.ops_u.net = c3n;
} }
u3_auto* car_u = &sam_u->car_u; u3_auto* car_u = &sam_u->car_u;
car_u->nam_m = c3__ames; car_u->nam_m = c3__ames;
car_u->liv_o = c3n; car_u->liv_o = c3n;

View File

@ -349,12 +349,12 @@ _lord_plea_peek_bail(u3_lord* god_u, u3_peek* pek_u, u3_noun dud)
{ {
u3_pier_punt_goof("peek", dud); u3_pier_punt_goof("peek", dud);
pek_u->fun_f(pek_u->ptr_v, u3_nul);
u3z(pek_u->now); u3z(pek_u->now);
u3z(pek_u->gan); u3z(pek_u->gan);
u3z(pek_u->ful); u3z(pek_u->ful);
c3_free(pek_u); c3_free(pek_u);
_lord_bail(god_u);
} }
/* _lord_plea_peek_done(): hear serf %peek %done /* _lord_plea_peek_done(): hear serf %peek %done
@ -803,92 +803,52 @@ _lord_writ_plan(u3_lord* god_u, u3_writ* wit_u)
_lord_writ_send(god_u, wit_u); _lord_writ_send(god_u, wit_u);
} }
/* u3_lord_peek(): read namespace. /* u3_lord_peek(): read namespace, injecting what's missing.
*/ */
void void
u3_lord_peek(u3_lord* god_u, u3_lord_peek(u3_lord* god_u, u3_pico* pic_u)
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
{ {
u3_writ* wit_u = _lord_writ_new(god_u); u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek; wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u)); wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v; wit_u->pek_u->ptr_v = pic_u->ptr_v;
wit_u->pek_u->fun_f = fun_f; wit_u->pek_u->fun_f = pic_u->fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u); wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan; wit_u->pek_u->gan = u3k(pic_u->gan);
wit_u->pek_u->ful = ful;
// XX cache check // construct the full scry path
// //
switch ( pic_u->typ_e ) {
default: c3_assert(0);
_lord_writ_plan(god_u, wit_u); case u3_pico_full: {
} wit_u->pek_u->ful = u3k(pic_u->ful);
} break;
/* u3_lord_peek_mine(): read namespace, injecting ship (our). case u3_pico_mine: {
*/
void
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
{
// XX cache // XX cache
// //
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d)); u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
wit_u->pek_u->ful = u3nt(car_m, our, pax); wit_u->pek_u->ful = u3nt(pic_u->min_u.car_m, our, u3k(pic_u->min_u.pax));
} } break;
// XX cache check case u3_pico_last: {
//
_lord_writ_plan(god_u, wit_u);
}
/* u3_lord_peek_last(): read namespace, injecting ship (our) and case (now).
*/
void
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
{
// XX cache // XX cache
// //
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d)); u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now)); u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now));
wit_u->pek_u->ful = u3nc(car_m, u3nq(our, des, cas, pax)); wit_u->pek_u->ful = u3nc(pic_u->las_u.car_m,
u3nq(our,
u3k(pic_u->las_u.des),
cas,
u3k(pic_u->las_u.pax)));
} break;
} }
// NB, won't be cached, result shouldn't be // XX cache check, unless last
// //
_lord_writ_plan(god_u, wit_u); _lord_writ_plan(god_u, wit_u);
} }

View File

@ -26,6 +26,42 @@
#undef VERBOSE_PIER #undef VERBOSE_PIER
/* _pier_peek_plan(): add a u3_pico to the peek queue
*/
static void
_pier_peek_plan(u3_pier* pir_u, u3_pico* pic_u)
{
if (!pir_u->pec_u.ent_u) {
c3_assert( !pir_u->pec_u.ext_u );
pir_u->pec_u.ent_u = pir_u->pec_u.ext_u = pic_u;
}
else {
pir_u->pec_u.ent_u->nex_u = pic_u;
pir_u->pec_u.ent_u = pic_u;
}
u3_pier_spin(pir_u);
}
/* _pier_peek_next(): pop u3_pico off of peek queue
*/
static u3_pico*
_pier_peek_next(u3_pier* pir_u)
{
u3_pico* pic_u = pir_u->pec_u.ext_u;
if (pic_u) {
pir_u->pec_u.ext_u = pic_u->nex_u;
if (!pir_u->pec_u.ext_u) {
pir_u->pec_u.ent_u = 0;
}
pic_u->nex_u = 0;
}
return pic_u;
}
/* _pier_work_send(): send new events for processing /* _pier_work_send(): send new events for processing
*/ */
static void static void
@ -60,13 +96,32 @@ _pier_work_send(u3_work* wok_u)
{ {
u3_ovum* egg_u; u3_ovum* egg_u;
u3_noun ovo; u3_noun ovo;
u3_pico* pic_u;
while ( len_w-- && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) { while ( len_w && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) {
len_w--;
u3_lord_work(god_u, egg_u, ovo); u3_lord_work(god_u, egg_u, ovo);
// queue events depth first // queue events depth first
// //
car_u = egg_u->car_u; car_u = egg_u->car_u;
// interleave scry requests
//
if ( len_w && (pic_u = _pier_peek_next(pir_u)) )
{
len_w--;
u3_lord_peek(god_u, pic_u);
u3_pico_free(pic_u);
}
}
// if there's room left in the batch, fill it up with remaining scries
//
while ( len_w-- && (pic_u = _pier_peek_next(pir_u)) )
{
u3_lord_peek(god_u, pic_u);
u3_pico_free(pic_u);
} }
} }
} }
@ -353,6 +408,75 @@ u3_pier_spin(u3_pier* pir_u)
} }
} }
/* u3_pier_peek(): read namespace.
*/
void
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_full;
pic_u->ful = ful;
_pier_peek_plan(pir_u, pic_u);
}
/* u3_pier_peek_mine(): read namespace, injecting ship.
*/
void
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_mine;
pic_u->min_u.car_m = car_m;
pic_u->min_u.pax = pax;
_pier_peek_plan(pir_u, pic_u);
}
/* u3_pier_peek_last(): read namespace, injecting ship and case.
*/
void
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_last;
pic_u->las_u.car_m = car_m;
pic_u->las_u.des = des;
pic_u->las_u.pax = pax;
_pier_peek_plan(pir_u, pic_u);
}
/* _pier_work_init(): begin processing new events /* _pier_work_init(): begin processing new events
*/ */
static void static void
@ -1024,7 +1148,7 @@ _pier_on_lord_live(void* ptr_v)
// run the requested scry, jam to disk, then exit // run the requested scry, jam to disk, then exit
// //
u3l_log("pier: scry\n"); u3l_log("pier: scry\n");
u3_lord_peek_last(god_u, u3_nul, u3k(car), u3k(dek), u3k(pax), u3_pier_peek_last(pir_u, u3_nul, u3k(car), u3k(dek), u3k(pax),
pir_u, _pier_on_scry_done); pir_u, _pier_on_scry_done);
} }
u3z(pex); u3z(pex);
@ -1210,7 +1334,13 @@ _pier_init(c3_w wag_w, c3_c* pax_c)
u3_pier* u3_pier*
u3_pier_stay(c3_w wag_w, u3_noun pax) u3_pier_stay(c3_w wag_w, u3_noun pax)
{ {
u3_pier* pir_u = _pier_init(wag_w, u3r_string(pax)); u3_pier* pir_u;
if ( !(pir_u = _pier_init(wag_w, u3r_string(pax))) ) {
fprintf(stderr, "pier: stay: init fail\r\n");
u3_king_bail();
return 0;
}
if ( c3n == u3_disk_read_meta(pir_u->log_u, pir_u->who_d, if ( c3n == u3_disk_read_meta(pir_u->log_u, pir_u->who_d,
&pir_u->fak_o, &pir_u->lif_w) ) &pir_u->fak_o, &pir_u->lif_w) )
@ -1420,7 +1550,13 @@ u3_pier_boot(c3_w wag_w, // config flags
u3_noun pil, // type-of/path-to pill u3_noun pil, // type-of/path-to pill
u3_noun pax) // path to pier u3_noun pax) // path to pier
{ {
u3_pier* pir_u = _pier_init(wag_w, u3r_string(pax)); u3_pier* pir_u;
if ( !(pir_u = _pier_init(wag_w, u3r_string(pax))) ) {
fprintf(stderr, "pier: boot: init fail\r\n");
u3_king_bail();
return 0;
}
if ( c3n == _pier_boot_plan(pir_u, who, ven, pil) ) { if ( c3n == _pier_boot_plan(pir_u, who, ven, pil) ) {
fprintf(stderr, "pier: boot plan fail\r\n"); fprintf(stderr, "pier: boot plan fail\r\n");

View File

@ -168,6 +168,42 @@ u3_ovum_free(u3_ovum *egg_u)
c3_free(egg_u); c3_free(egg_u);
} }
/* u3_pico_init(): initialize a scry request struct
*/
u3_pico*
u3_pico_init()
{
u3_pico* pic_u = c3_calloc(sizeof(*pic_u));
return pic_u;
}
/* u3_pico_free(): dispose a scry request struct
*/
void
u3_pico_free(u3_pico* pic_u)
{
u3z(pic_u->gan);
switch ( pic_u->typ_e ) {
default: c3_assert(0);
case u3_pico_full: {
u3z(pic_u->ful);
} break;
case u3_pico_mine: {
u3z(pic_u->min_u.pax);
} break;
case u3_pico_last: {
u3z(pic_u->las_u.des);
u3z(pic_u->las_u.pax);
} break;
}
c3_free(pic_u);
}
/* u3_mcut_char(): measure/cut character. /* u3_mcut_char(): measure/cut character.
*/ */
c3_w c3_w