Improve metadata decoding in Streams and allow for custom streams.

This commit is contained in:
Dillon Kearns 2024-04-17 09:11:14 -07:00
parent edd9a31e6d
commit b9a9c41d4d
5 changed files with 230 additions and 59 deletions

File diff suppressed because one or more lines are too long

View File

@ -5,11 +5,14 @@ export async function hello(input, { cwd, env }) {
}
export async function upperCaseStream() {
return new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
},
});
return {
metadata: () => "Hi! I'm metadata from upperCaseStream!",
stream: new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
},
}),
};
}
export async function customReadStream() {
@ -20,3 +23,12 @@ export async function customReadStream() {
},
});
}
// Custom writable-stream port: the returned stream forwards piped data to
// stdout, and `metadata` supplies the value decoded on the Elm side.
export async function customWrite(input) {
  const metadata = () => "Hi! I'm metadata from customWriteStream!";
  return { stream: process.stdout, metadata };
}

View File

@ -160,6 +160,32 @@ b =
|> try
|> expectError "HTTP FatalError message"
"BadStatus: 404 Not Found"
, Stream.fromString "This is input..."
|> Stream.pipe
(Stream.customTransformWithMeta
"upperCaseStream"
Encode.null
(Decode.string |> Decode.map Ok)
)
|> Stream.read
|> try
|> test "duplex meta"
(Expect.equal
{ metadata = "Hi! I'm metadata from upperCaseStream!"
, body = "THIS IS INPUT..."
}
)
, Stream.fromString "This is input to writeStream!\n"
|> Stream.pipe
(Stream.customWriteWithMeta
"customWrite"
Encode.null
(Decode.string |> Decode.map Ok)
)
|> Stream.readMetadata
|> try
|> test "writeStream meta"
(Expect.equal "Hi! I'm metadata from customWriteStream!")
]

View File

@ -616,21 +616,21 @@ function runStream(req, portsFile) {
resolve(
jsonResponse(req, {
body: await consumers.json(lastStream),
metadata: await metadataResponse,
metadata: await tryCallingFunction(metadataResponse),
})
);
} else if (kind === "text") {
resolve(
jsonResponse(req, {
body: await consumers.text(lastStream),
metadata: await metadataResponse,
metadata: await tryCallingFunction(metadataResponse),
})
);
} else if (kind === "none") {
resolve(
jsonResponse(req, {
body: null,
metadata: await metadataResponse,
metadata: await tryCallingFunction(metadataResponse),
})
);
} else if (kind === "command") {
@ -646,6 +646,21 @@ function runStream(req, portsFile) {
});
}
/**
 * Resolve a metadata source to its value.
 *
 * Accepts a promise/thenable (awaited), a function — possibly async —
 * (called and awaited), or a plain value (returned as-is).
 *
 * Fixes a bug in the original: a truthy value that was neither thenable
 * nor callable (e.g. a plain metadata object or string) fell through both
 * inner branches and resolved to `undefined`, silently dropping metadata.
 *
 * @param {*} func - promise, function, or plain metadata value
 * @returns {Promise<*>} the resolved metadata value
 */
async function tryCallingFunction(func) {
  if (func == null) {
    return func;
  }
  // Thenable check first, matching the original's precedence.
  if (typeof func.then === "function") {
    return await func;
  }
  if (typeof func === "function") {
    return await func();
  }
  // Plain value: pass it through unchanged.
  return func;
}
/**
*
* @param {import('node:stream').Stream} lastStream
@ -678,9 +693,9 @@ async function pipePartToStream(
quiet,
env,
});
if (validateStream.isDuplexStream(newLocal)) {
lastStream.pipe(newLocal);
return { stream: newLocal };
if (validateStream.isDuplexStream(newLocal.stream)) {
lastStream.pipe(newLocal.stream);
return newLocal;
} else {
throw `Expected '${part.portName}' to be a duplex stream!`;
}
@ -690,10 +705,18 @@ async function pipePartToStream(
stream: await portsFile[part.portName](part.input, { cwd, quiet, env }),
};
} else if (part.name === "customWrite") {
return {
metadata: null,
stream: await portsFile[part.portName](part.input, { cwd, quiet, env }),
};
const newLocal = await portsFile[part.portName](part.input, {
cwd,
quiet,
env,
});
if (!validateStream.isWritableStream(newLocal.stream)) {
console.error("Expected a writable stream!");
resolve({ error: "Expected a writable stream!" });
} else {
lastStream && lastStream.pipe(newLocal.stream);
}
return newLocal;
} else if (part.name === "gzip") {
return { metadata: null, stream: lastStream.pipe(zlib.createGzip()) };
} else if (part.name === "unzip") {
@ -717,12 +740,14 @@ async function pipePartToStream(
retry: part.retries,
timeout: part.timeoutInMs,
});
let metadata = {
headers: Object.fromEntries(response.headers.entries()),
statusCode: response.status,
// bodyKind,
url: response.url,
statusText: response.statusText,
let metadata = () => {
return {
headers: Object.fromEntries(response.headers.entries()),
statusCode: response.status,
// bodyKind,
url: response.url,
statusText: response.statusText,
};
};
return { metadata, stream: response.body };
} else if (part.name === "command") {
@ -777,7 +802,7 @@ async function pipePartToStream(
return { metadata: null, stream: newStream };
}
} else if (part.name === "fromString") {
return { stream: Readable.from([part.string]) };
return { stream: Readable.from([part.string]), metadata: null };
} else {
// console.error(`Unknown stream part: ${part.name}!`);
// process.exit(1);

View File

@ -2,12 +2,13 @@ module BackendTask.Stream exposing
( Stream
, fileRead, fileWrite, fromString, http, httpWithInput, pipe, stdin, stdout, stderr, gzip, unzip
, command
, read, readJson, run
, read, readJson, readMetadata, run
, Error(..)
, commandWithOptions
, StderrOutput(..)
, CommandOptions, defaultCommandOptions, allowNon0Status, withOutput, withTimeout
, customRead, customWrite, customDuplex
, customReadWithMeta, customTransformWithMeta, customWriteWithMeta
)
{-| A `Stream` represents a flow of data through a pipeline.
@ -56,7 +57,7 @@ End example
## Running Streams
@docs read, readJson, run
@docs read, readJson, readMetadata, run
@docs Error
@ -74,6 +75,11 @@ End example
@docs customRead, customWrite, customDuplex
### With Metadata Decoders
@docs customReadWithMeta, customTransformWithMeta, customWriteWithMeta
-}
import BackendTask exposing (BackendTask)
@ -169,6 +175,48 @@ customWrite name input =
]
{-| Like `customRead`, but also decodes metadata from the custom stream.
The stream definition registered under the given name may return a
`metadata` value alongside its stream; the supplied `Decoder` turns that
value into either a decoded `metadata` or a recoverable error.
-}
customReadWithMeta :
String
-> Encode.Value
-> Decoder (Result { fatal : FatalError, recoverable : error } metadata)
-> Stream error metadata { read : (), write : Never }
customReadWithMeta name input decoder =
single ( "", decoder )
"customRead"
[ ( "portName", Encode.string name )
, ( "input", input )
]
{-| Like `customWrite`, but also decodes metadata from the custom stream.
The stream definition registered under the given name may return a
`metadata` value alongside its writable stream; the supplied `Decoder`
turns that value into either a decoded `metadata` or a recoverable error.
-}
customWriteWithMeta :
String
-> Encode.Value
-> Decoder (Result { fatal : FatalError, recoverable : error } metadata)
-> Stream error metadata { read : Never, write : () }
customWriteWithMeta name input decoder =
single ( "", decoder )
"customWrite"
[ ( "portName", Encode.string name )
, ( "input", input )
]
{-| Like `customDuplex`, but also decodes metadata from the custom stream.
The duplex (transform) stream definition registered under the given name
may return a `metadata` value alongside its stream; the supplied `Decoder`
turns that value into either a decoded `metadata` or a recoverable error.
-}
customTransformWithMeta :
String
-> Encode.Value
-> Decoder (Result { fatal : FatalError, recoverable : error } metadata)
-> Stream error metadata { read : (), write : () }
customTransformWithMeta name input decoder =
single ( "", decoder )
"customDuplex"
[ ( "portName", Encode.string name )
, ( "input", input )
]
{-| -}
customDuplex : String -> Encode.Value -> Stream () () { read : (), write : () }
customDuplex name input =
@ -333,43 +381,103 @@ read ((Stream ( _, decoder ) _) as stream) =
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "text")
, expect =
BackendTask.Http.expectJson
(Decode.oneOf
[ Decode.field "error" Decode.string
|> Decode.andThen
(\error ->
Decode.succeed
(Err
(FatalError.recoverable
{ title = "Stream Error"
, body = error
}
(StreamError error)
)
)
)
, decodeLog (Decode.field "metadata" decoder)
|> Decode.andThen
(\result ->
case result of
Ok metadata ->
Decode.map
(\body ->
Ok
{ metadata = metadata
, body = body
}
(decodeLog
(Decode.oneOf
[ Decode.field "error" Decode.string
|> Decode.andThen
(\error ->
Decode.succeed
(Err
(FatalError.recoverable
{ title = "Stream Error"
, body = error
}
(StreamError error)
)
(Decode.field "body" Decode.string)
Err error ->
Decode.field "body" Decode.string
|> Decode.maybe
|> Decode.map
)
)
, decodeLog (Decode.field "metadata" decoder)
|> Decode.andThen
(\result ->
case result of
Ok metadata ->
Decode.map
(\body ->
error |> mapRecoverable body |> Err
Ok
{ metadata = metadata
, body = body
}
)
(Decode.field "body" Decode.string)
Err error ->
Decode.field "body" Decode.string
|> Decode.maybe
|> Decode.map
(\body ->
error |> mapRecoverable body |> Err
)
)
, Decode.succeed
(Err
(FatalError.recoverable
{ title = "Stream Error", body = "No metadata" }
(StreamError "No metadata")
)
)
]
]
)
)
}
|> BackendTask.andThen BackendTask.fromResult
{-| Run the stream pipeline and resolve with only its decoded metadata,
discarding the body (the pipeline is executed with the `"none"` body kind).
Fails with a recoverable `StreamError` when the response carries an
`error` field, when the metadata decoder fails, or when no metadata is
present at all.
-}
readMetadata :
Stream error metadata { read : read, write : write }
-> BackendTask { fatal : FatalError, recoverable : Error error String } metadata
readMetadata ((Stream ( _, decoder ) _) as stream) =
BackendTask.Internal.Request.request
{ name = "stream"
-- TODO pass in `decoderName` to pipelineEncoder
, body = BackendTask.Http.jsonBody (pipelineEncoder stream "none")
, expect =
BackendTask.Http.expectJson
(decodeLog
(Decode.oneOf
[ Decode.field "error" Decode.string
|> Decode.andThen
(\error ->
Decode.succeed
(Err
(FatalError.recoverable
{ title = "Stream Error"
, body = error
}
(StreamError error)
)
)
)
, decodeLog (Decode.field "metadata" decoder)
|> Decode.map
(\result ->
case result of
Ok metadata ->
Ok metadata
Err error ->
error |> mapRecoverable Nothing |> Err
)
, Decode.succeed
(Err
(FatalError.recoverable
{ title = "Stream Error", body = "No metadata" }
(StreamError "No metadata")
)
)
]
)
)
}
|> BackendTask.andThen BackendTask.fromResult
@ -382,7 +490,7 @@ decodeLog decoder =
(\_ ->
--let
-- _ =
-- Debug.log (Encode.encode 2 value) ()
-- Debug.log "VALUE" (Encode.encode 2 value)
--in
decoder
)