Experimental features¶
EVP streams¶
Introduction¶
EVP streams are meant as a communication mechanism between module instances. Their design is closely inspired by the POSIX sockets interface, but streams are opinionated towards asynchronous communication. This means, among other things:
Write operations do not block the calling thread. Data shall be transferred asynchronously, typically from another thread.
Users do not have to read synchronously. Instead, a user-defined callback shall be triggered once input data is available.
Like POSIX sockets, EVP streams do not assume a specific transport layer or network interface, but are extensible to any implementation that provides the semantics required by their public interface, described below:
EVP_RESULT EVP_streamInputOpen(struct EVP_client *h, const char *name, EVP_STREAM_READ_CALLBACK cb, void *userData, EVP_STREAM *stream);
EVP_RESULT EVP_streamOutputOpen(struct EVP_client *h, const char *name, EVP_STREAM *stream);
EVP_RESULT EVP_streamClose(struct EVP_client *h, EVP_STREAM stream);
EVP_RESULT EVP_streamWrite(struct EVP_client *h, EVP_STREAM stream, const void *buf, size_t n);
Where:
EVP_streamInputOpen() sets up a stream that must be configured in the deploymentManifest with direction in and the name given by name. cb refers to a user-defined callback that the implementation triggers as soon as input data is available. userData is an opaque pointer that is passed to the callback pointed to by cb. The implementation does not attempt to read or modify the contents pointed to by userData, so userData can also be a null pointer. On success, the new stream handle is written to *stream.
EVP_streamOutputOpen() sets up a stream that must be configured in the deploymentManifest with direction out and the name given by name. On success, the new stream handle is written to *stream.
EVP_streamClose() releases the resources allocated by a stream opened with EVP_streamInputOpen() or EVP_streamOutputOpen(). For streams opened with EVP_streamOutputOpen(), EVP_streamClose() shall flush any pending outgoing messages before closing the stream.
EVP_streamWrite() must queue the request defined by buf, the user payload, and n, its size in bytes. This function must return immediately and is therefore not required to send data over the network; this should instead be done asynchronously, for example from a separate thread. This function can only be used with streams previously opened with EVP_streamOutputOpen().
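The contract above (a non-blocking write plus a close that flushes pending messages) can be sketched in C with a mutex-protected FIFO drained by a worker thread. This is a hypothetical illustration, not the agent's code: every sketch_* name is invented here, and the worker only counts bytes where a real stream type would send them to a peer.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical queued message holding a private copy of the payload. */
struct sketch_msg {
    struct sketch_msg *next;
    size_t n;
    unsigned char data[];
};

/* Hypothetical output stream: FIFO of pending messages plus a worker. */
struct sketch_out_stream {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    struct sketch_msg *head, *tail;
    bool closing;
    size_t bytes_sent; /* Stands in for data actually sent to a peer. */
    pthread_t worker;
};

/* Worker thread: drains the FIFO; exits once closing and empty. */
static void *sketch_worker(void *arg)
{
    struct sketch_out_stream *s = arg;

    pthread_mutex_lock(&s->lock);
    for (;;) {
        while (s->head == NULL && !s->closing)
            pthread_cond_wait(&s->cond, &s->lock);
        if (s->head == NULL)
            break; /* Closing and nothing left to flush. */

        struct sketch_msg *m = s->head;
        s->head = m->next;
        if (s->head == NULL)
            s->tail = NULL;
        pthread_mutex_unlock(&s->lock);

        /* A real stream type would send m->data to the peer here. */
        s->bytes_sent += m->n;
        free(m);

        pthread_mutex_lock(&s->lock);
    }
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

static int sketch_open(struct sketch_out_stream *s)
{
    assert(s != NULL);
    memset(s, 0, sizeof *s);
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->cond, NULL);
    return pthread_create(&s->worker, NULL, sketch_worker, s);
}

/* Non-blocking write: copy the payload, enqueue it, return at once. */
static int sketch_write(struct sketch_out_stream *s, const void *buf, size_t n)
{
    struct sketch_msg *m = malloc(sizeof *m + n);

    if (m == NULL)
        return -1;
    m->next = NULL;
    m->n = n;
    memcpy(m->data, buf, n);

    pthread_mutex_lock(&s->lock);
    if (s->tail != NULL)
        s->tail->next = m;
    else
        s->head = m;
    s->tail = m;
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->lock);
    return 0;
}

/* Close: ask the worker to flush pending messages, then clean up. */
static void sketch_close(struct sketch_out_stream *s)
{
    pthread_mutex_lock(&s->lock);
    s->closing = true;
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->lock);
    pthread_join(s->worker, NULL);
    pthread_cond_destroy(&s->cond);
    pthread_mutex_destroy(&s->lock);
}
```

Because the payload is copied before sketch_write() returns, the caller may reuse its buffer immediately; whether the real implementation copies the payload is an assumption of this sketch.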
Another significant difference between POSIX sockets and EVP streams is
their direction: while POSIX sockets can be bidirectional, EVP streams are
only unidirectional. In other words, EVP streams can be either in or
out, but never both. As a consequence, using EVP_streamWrite() on an
in stream will return an error.
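A minimal sketch of this direction rule, assuming each stream records the direction it was configured with. The sketch_* types and the SKETCH_INVAL result are hypothetical; the text above only states that an error is returned, not which EVP_RESULT value.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for EVP_RESULT values and stream directions. */
enum sketch_result { SKETCH_OK, SKETCH_INVAL };
enum sketch_direction { SKETCH_IN, SKETCH_OUT };

struct sketch_stream {
    enum sketch_direction direction; /* Fixed when the stream is opened. */
};

/* Writing is only allowed on streams configured with direction "out". */
static enum sketch_result sketch_stream_write(const struct sketch_stream *s,
                                              const void *buf, size_t n)
{
    assert(s != NULL);
    if (s->direction != SKETCH_OUT)
        return SKETCH_INVAL;
    (void)buf;
    (void)n; /* A real implementation would queue the payload here. */
    return SKETCH_OK;
}
```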
Bidirectional streams are not currently planned, but the design should still allow implementing them later, since their absence might limit some use cases.
Use cases¶
EVP streams have been designed for real-world scenarios, such as:
A network of cameras that send information to a central server, without relying on an MQTT broker.
Usage¶
Two steps are required on the user side:
Add streams to the deploymentManifest¶
If EVP streams are used, the entry for the module instance in the instanceSpecs
object of a deploymentManifest must include a streams object with at least one
stream definition.
The example below shows an output stream, called my-stream, of type example:
{
    "instanceSpecs": {
        "instance-name": {
            "streams": {
                "my-stream": {
                    "type": "example",
                    "direction": "out",
                    "parameters": {}
                }
            }
        }
    }
}
The contents of the parameters object are entirely type-specific, and the
object may be omitted if the stream type does not require it.
The following stream types are supported:
null: a placeholder implementation meant only for testing purposes. null streams are always supported by the agent.
posix: uses only the POSIX C standard library.
Note that if a module does not use any streams, the streams object must be
omitted.
Open the stream from the module instance¶
When a module instance has been started with a set of configured streams, a
call to EVP_streamInputOpen() or EVP_streamOutputOpen() looks up the
current deploymentManifest to check whether a stream with the requested name
and direction was defined.
The example module below shows how to open an output stream called
my-stream:
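#include <stdlib.h>
#include "evp/sdk.h" /* Assumed header path for the EVP module SDK. */

int main(void)
{
    struct EVP_client *h = EVP_initialize();
    if (h == NULL) {
        return EXIT_FAILURE; /* Handle error. */
    }

    EVP_STREAM stream;
    EVP_RESULT result = EVP_streamOutputOpen(h, "my-stream", &stream);
    if (result != EVP_OK) {
        return EXIT_FAILURE; /* Handle error. */
    }

    /* Application code, e.g. calls to EVP_streamWrite(). */
    EVP_streamClose(h, stream);
    return EXIT_SUCCESS;
}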
If my-stream cannot be found in the streams JSON object belonging to
the calling module instance, EVP_INVAL is returned. See the
documentation for EVP_streamInputOpen() and EVP_streamOutputOpen() for a list of possible error values.
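The lookup performed by the open functions can be sketched as a linear search over the instance's configured streams. Assumptions: the sketch_* names are hypothetical, the streams object has already been parsed into an array, and a direction mismatch is rejected with the same result as a missing name, which the text above does not specify.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for EVP_OK/EVP_INVAL and stream directions. */
enum sketch_result { SKETCH_OK, SKETCH_INVAL };
enum sketch_direction { SKETCH_IN, SKETCH_OUT };

/* One parsed entry of the instance's "streams" object. */
struct sketch_stream_cfg {
    const char *name;
    const char *type;
    enum sketch_direction direction;
};

/* Look a stream up by name and check the requested direction. */
static enum sketch_result sketch_lookup(const struct sketch_stream_cfg *cfg,
                                        size_t n_cfg, const char *name,
                                        enum sketch_direction want,
                                        size_t *index)
{
    assert(name != NULL && index != NULL);
    for (size_t i = 0; i < n_cfg; i++) {
        if (strcmp(cfg[i].name, name) != 0)
            continue;
        if (cfg[i].direction != want)
            return SKETCH_INVAL; /* Assumption: same error as a missing name. */
        *index = i;
        return SKETCH_OK;
    }
    return SKETCH_INVAL; /* Not defined in the "streams" object. */
}
```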
Once a stream has been opened successfully:
Input streams get their user-defined callback triggered once input data is available.
Output streams can be written to with EVP_streamWrite().
High-level design¶
As stated above, one of the design goals behind EVP streams is extensibility. This means that any protocol providing a reliable, stream-oriented connection, such as TCP, is eligible as a stream type.
File hierarchy¶
EVP streams are implemented with the following source files from the streams directory:
stream.h: provides the type definitions and function declarations used internally by the agent and evpmodulesdk.
stream.c: provides the protocol-agnostic code shared among all stream types.
However, this implementation must be complemented with:
SDK-specific implementation:
Private interfaces¶
stream.h declares the functions that each SDK implementation must
provide:
struct stream_impl *stream_from_stream(struct EVP_client *h, EVP_STREAM stream);
struct stream_impl *stream_from_name(struct EVP_client *h, const char *name);
EVP_RESULT stream_insert(struct EVP_client *h, struct stream_impl *si);
EVP_RESULT stream_remove(struct EVP_client *h, struct stream_impl *si);
int stream_insert_read_event(struct EVP_client *h, struct sdk_event_stream_read_available *ev);
where:
stream_from_stream: a thin wrapper over stream_impl_from_stream, required because the TAILQ containing the available streams is defined in struct EVP_client, whose definition in turn depends on the SDK implementation. This function maps an EVP_STREAM (an opaque integer type) to a struct stream_impl * (the internal data structure used across all files in this component).
stream_from_name: a thin wrapper over stream_impl_from_name, required for the same reason: the TAILQ containing the available streams is defined in struct EVP_client, whose definition depends on the SDK implementation.
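A possible shape for these wrappers, using the BSD-style <sys/queue.h> TAILQ macros (available on glibc and the BSDs). The sketch_* structures are simplified, hypothetical stand-ins for struct stream_impl and struct EVP_client, and having the insert function assign the opaque handle is likewise an assumption made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/queue.h>

/* Hypothetical stand-in for EVP_STREAM: an opaque integer handle. */
typedef unsigned long sketch_handle;

/* Simplified stand-in for struct stream_impl. */
struct sketch_stream_impl {
    sketch_handle stream;
    const char *name;
    TAILQ_ENTRY(sketch_stream_impl) q;
};

TAILQ_HEAD(sketch_stream_list, sketch_stream_impl);

/* Stand-in for struct EVP_client: only the list of open streams. */
struct sketch_client {
    struct sketch_stream_list streams;
    sketch_handle next_handle;
};

/* Generic helpers that only need the list (cf. stream_impl_from_*). */
static struct sketch_stream_impl *
sketch_impl_from_stream(struct sketch_stream_list *l, sketch_handle stream)
{
    struct sketch_stream_impl *si;

    TAILQ_FOREACH(si, l, q)
        if (si->stream == stream)
            return si;
    return NULL;
}

static struct sketch_stream_impl *
sketch_impl_from_name(struct sketch_stream_list *l, const char *name)
{
    struct sketch_stream_impl *si;

    TAILQ_FOREACH(si, l, q)
        if (strcmp(si->name, name) == 0)
            return si;
    return NULL;
}

/* Thin wrappers that know where the client keeps its list. */
static struct sketch_stream_impl *
sketch_from_stream(struct sketch_client *h, sketch_handle stream)
{
    return sketch_impl_from_stream(&h->streams, stream);
}

static struct sketch_stream_impl *
sketch_from_name(struct sketch_client *h, const char *name)
{
    return sketch_impl_from_name(&h->streams, name);
}

/* Assumption: insertion assigns the handle that maps back to si. */
static void sketch_insert(struct sketch_client *h, struct sketch_stream_impl *si)
{
    assert(h != NULL && si != NULL);
    si->stream = ++h->next_handle;
    TAILQ_INSERT_TAIL(&h->streams, si, q);
}
```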
Stream type-specific interfaces¶
An instance of the following data type must be defined for each stream type:
struct stream_ops {
    EVP_RESULT (*init)(struct stream_impl *);
    int (*close)(struct stream_impl *);
    int (*write)(const struct stream_impl *, const void *, size_t);
    int (*read)(struct stream_impl *, struct stream_read *sr);
    void (*free_msg)(void *);
    int (*atexit)(void);
};
where:
init performs the type-specific initialization of a stream. The desired stream configuration is contained in the cfg member of the struct stream_impl passed to this function. It must return a status code as defined by EVP_RESULT; if it is not EVP_OK, the status code shall be propagated to the user.
close must deallocate any resources previously allocated by a call to init. It returns zero if successful, non-zero otherwise.
write must queue the request defined by the const void *, which is the user payload, and the size_t, which defines its size. This function must return immediately and is therefore not required to send data over the network; this should be done asynchronously, for example from a separate thread.
read is called by stream.c from a separate thread and must block until a message is received from a peer. When a message is received, read must fill the struct stream_read passed to it. See “Filling a struct stream_read” for further reference.
free_msg defines how to release the resources referenced by the free_args member of struct stream_read.
atexit defines a handler that is registered with the standard atexit(3) function. It can be a null pointer if no actions are required.
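To illustrate how a stream type plugs into this table, here is a hypothetical placeholder type in the spirit of null. The sketch_* and placeholder_* names are invented, the EVP_RESULT stand-in has only two values, and making read fail immediately instead of blocking is an assumption suited to a type that never receives data.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for EVP_RESULT. */
typedef enum { SKETCH_OK, SKETCH_ERROR } sketch_result;

/* Only pointers to these are used, so they can stay incomplete here. */
struct stream_impl;
struct stream_read;

/* Same shape as struct stream_ops, with sketch_result for EVP_RESULT. */
struct sketch_stream_ops {
    sketch_result (*init)(struct stream_impl *);
    int (*close)(struct stream_impl *);
    int (*write)(const struct stream_impl *, const void *, size_t);
    int (*read)(struct stream_impl *, struct stream_read *sr);
    void (*free_msg)(void *);
    int (*atexit)(void);
};

static sketch_result placeholder_init(struct stream_impl *si)
{
    (void)si;
    return SKETCH_OK; /* Nothing to set up. */
}

static int placeholder_close(struct stream_impl *si)
{
    (void)si;
    return 0; /* Nothing was allocated by init. */
}

/* Discards the payload and returns immediately, as write must. */
static int placeholder_write(const struct stream_impl *si, const void *buf, size_t n)
{
    (void)si;
    (void)buf;
    (void)n;
    return 0;
}

/* Assumption: never receives data, so it reports failure at once. */
static int placeholder_read(struct stream_impl *si, struct stream_read *sr)
{
    (void)si;
    (void)sr;
    return -1;
}

static const struct sketch_stream_ops placeholder_ops = {
    .init = placeholder_init,
    .close = placeholder_close,
    .write = placeholder_write,
    .read = placeholder_read,
    .free_msg = free,
    .atexit = NULL, /* No process-wide cleanup needed. */
};

/* Protocol-agnostic code only ever calls through the table. */
static int sketch_stream_write(const struct sketch_stream_ops *ops,
                               const struct stream_impl *si,
                               const void *buf, size_t n)
{
    assert(ops != NULL && ops->write != NULL);
    return ops->write(si, buf, n);
}
```

Keeping all type-specific behaviour behind function pointers is what lets stream.c stay protocol-agnostic: adding a type means adding one more table like placeholder_ops.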
Input streams¶
Since EVP streams are meant to receive messages asynchronously from peers, a separate thread is required that can block until a message is received, without blocking the normal execution flow of the agent or SDK.
This thread is required for all stream types, so it is implemented by
stream.c. The thread body is defined by the function in_thread, which calls
the type-specific read callback and, if it succeeds, calls
notify_read_available to create a new
struct sdk_event_stream_read_available event.
For this purpose, each stream type implementation must fill a
struct stream_read instance, which contains the following members:
struct stream_read {
    EVP_STREAM_PEER_ID id;
    const void *buf;
    size_t n;
    void *free_args;
};
where:
id is an opaque identifier that shall be propagated to user code, and is typically meant to identify a connection. Users must not make any assumptions about the meaning of this value, as it is entirely implementation-defined. This member was added as a compromise to distinguish between several peers on unencrypted connections; it is meant to be replaced with a strong authentication method once TLS support is added to EVP streams.
buf points to the buffer where the input message is stored. It is defined as a pointer to const since users are not meant to modify it. If buf needs to be released by the implementation, free_args can be used for this purpose.
n is the length of the input message, in bytes.
free_args is an optional pointer that shall be passed to the free callback in struct sdk_event_stream_read_available once the event has been processed by the user-defined callback. It can be a null pointer if no resources need to be released. It can also point to the same buffer pointed to by buf.
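The reader thread and the filling of struct stream_read can be sketched together with POSIX threads and a pipe standing in for a peer connection. Assumptions: sketch_fd_read plays the role of a type-specific read, sketch_notify stands in for notify_read_available, the peer identifier is a plain integer, and messages are at most 256 bytes. Since buf is heap-allocated, free_args points to the same buffer so that it can later be released with free().

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

typedef unsigned long sketch_peer_id; /* Stand-in for EVP_STREAM_PEER_ID. */

struct stream_read {
    sketch_peer_id id;
    const void *buf;
    size_t n;
    void *free_args;
};

/* Hypothetical type-specific read: blocks until data arrives on fd. */
static int sketch_fd_read(int fd, struct stream_read *sr)
{
    char tmp[256];
    ssize_t r = read(fd, tmp, sizeof tmp);

    if (r <= 0)
        return -1; /* EOF or error: the reader thread stops. */

    void *copy = malloc((size_t)r);

    if (copy == NULL)
        return -1;
    memcpy(copy, tmp, (size_t)r);
    *sr = (struct stream_read){
        .id = (sketch_peer_id)fd, /* Opaque to user code. */
        .buf = copy,
        .n = (size_t)r,
        .free_args = copy, /* Same buffer, released later with free(). */
    };
    return 0;
}

/* Hypothetical event sink standing in for notify_read_available(). */
struct sketch_sink {
    pthread_mutex_t lock;
    struct stream_read events[8];
    size_t count;
};

static void sketch_notify(struct sketch_sink *s, const struct stream_read *sr)
{
    pthread_mutex_lock(&s->lock);
    if (s->count < sizeof s->events / sizeof *s->events)
        s->events[s->count++] = *sr;
    else
        free(sr->free_args); /* Sketch only: drop events on overflow. */
    pthread_mutex_unlock(&s->lock);
}

struct sketch_in_args {
    int fd;
    struct sketch_sink *sink;
};

/* Reader thread body, in the spirit of in_thread. */
static void *sketch_in_thread(void *arg)
{
    struct sketch_in_args *a = arg;
    struct stream_read sr;

    assert(a != NULL && a->sink != NULL);
    while (sketch_fd_read(a->fd, &sr) == 0)
        sketch_notify(a->sink, &sr);
    return NULL;
}
```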
Filling a struct stream_read¶
When an input message is received from a peer, the implementation must fill a
struct stream_read, from which a struct sdk_event_stream_read_available
instance is generated and appended to the events member of
struct EVP_client.
Inserting a struct sdk_event_stream_read_available¶
Once an input message is received, stream.c calls stream_insert_read_event
to let the SDK implementation decide how the event is stored in the event
queue.
Local SDK¶
Since struct EVP_client was already designed to be accessed from multiple
threads in the local SDK, implementing stream_insert_read_event was
relatively straightforward: the struct sdk_event_stream_read_available
instance can be safely appended to the queue as long as this is done between
calls to the sdk_lock and sdk_unlock function pair.
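For the local SDK, the insertion can be sketched as an append to a TAILQ guarded by a mutex. The sketch_lock/sketch_unlock pair only mimics sdk_lock and sdk_unlock; its signature and all sketch_* types are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/queue.h>

/* Simplified stand-in for struct sdk_event_stream_read_available. */
struct sketch_event {
    const void *buf;
    size_t n;
    TAILQ_ENTRY(sketch_event) q;
};

TAILQ_HEAD(sketch_event_queue, sketch_event);

/* Stand-in for struct EVP_client in the local SDK. */
struct sketch_client {
    pthread_mutex_t mutex;
    struct sketch_event_queue events;
};

static void sketch_lock(struct sketch_client *h) { pthread_mutex_lock(&h->mutex); }
static void sketch_unlock(struct sketch_client *h) { pthread_mutex_unlock(&h->mutex); }

/* Called from the reader thread; the queue takes ownership of ev. */
static int sketch_insert_read_event(struct sketch_client *h, struct sketch_event *ev)
{
    assert(h != NULL && ev != NULL);
    sketch_lock(h);
    TAILQ_INSERT_TAIL(&h->events, ev, q);
    sketch_unlock(h);
    return 0;
}

/* Called from the module's thread: pop the oldest event, or NULL. */
static struct sketch_event *sketch_take_event(struct sketch_client *h)
{
    sketch_lock(h);
    struct sketch_event *ev = TAILQ_FIRST(&h->events);
    if (ev != NULL)
        TAILQ_REMOVE(&h->events, ev, q);
    sketch_unlock(h);
    return ev;
}
```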
Remote SDK¶
A new challenge appeared when attempting to integrate this functionality:
unlike the local SDK, the remote SDK has no mechanism to access a
struct EVP_client instance in a thread-safe manner.
Adding mutexes in various places to ensure thread safety was ruled out, as it would have added a significant amount of complexity to the existing remote SDK implementation, which is already more complex than necessary.
Instead, the chosen approach takes advantage of the existing call to poll(2)
in client_io.c.
Since this system call can monitor several file descriptors at once, it can
be used to monitor events coming from in_thread.
This solution requires unnamed AF_UNIX sockets, which can be created with the
socketpair(2) function. One of the two resulting file descriptors is then
added to the array of struct pollfd passed to poll(2), while in_thread writes
to the other one.
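Setting up the extra descriptor can be sketched with socketpair(2) and poll(2). Choosing SOCK_STREAM is an assumption (the text only requires unnamed AF_UNIX sockets), and the sketch_* helpers are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* sv[1] is written by the reader thread; sv[0] is watched by poll(2). */
static int sketch_make_event_pair(int sv[2])
{
    return socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
}

/* Wait on the agent descriptor and the stream event descriptor at once. */
static int sketch_wait(int agent_fd, int stream_fd, int timeout_ms,
                       struct pollfd fds[2])
{
    fds[0] = (struct pollfd){ .fd = agent_fd, .events = POLLIN };
    fds[1] = (struct pollfd){ .fd = stream_fd, .events = POLLIN };
    return poll(fds, 2, timeout_ms); /* 0 means the timeout expired. */
}

/* Release both ends of a pair created by sketch_make_event_pair(). */
static void sketch_close_pair(int sv[2])
{
    close(sv[0]);
    close(sv[1]);
}
```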
This required further changes, though, since client_io.c always assumed that
any data arriving on the only file descriptor monitored by poll(2) came from
the agent side. This assumption no longer holds, as EVP streams introduce a
new event source.
In other words, poll(2) can now return because of any of the following events:
A timeout;
Information coming from the agent side;
Information coming from the thread running in_thread.
To solve this, a callback table was set up to assign a specific function for each file descriptor:
static int (*const process[sizeof(fds) / sizeof(*fds)])(
struct sdk_client *,
const struct pollfd *) = {process_agent_in, process_stream_in};
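The dispatch loop around such a table might look as follows. The handlers only count calls, and sketch_client is a hypothetical stand-in for struct sdk_client; the point is that entry i of the table handles fds[i].

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>

/* Hypothetical stand-in for struct sdk_client. */
struct sketch_client {
    int agent_msgs;
    int stream_msgs;
};

static int sketch_process_agent_in(struct sketch_client *c, const struct pollfd *p)
{
    (void)p;
    c->agent_msgs++; /* Would parse a message from the agent here. */
    return 0;
}

static int sketch_process_stream_in(struct sketch_client *c, const struct pollfd *p)
{
    (void)p;
    c->stream_msgs++; /* Would read an event pointer from the socket pair. */
    return 0;
}

/* Call the handler of every descriptor that poll(2) reported as ready. */
static int sketch_dispatch(struct sketch_client *c, const struct pollfd fds[2])
{
    static int (*const process[2])(struct sketch_client *,
                                   const struct pollfd *) = {
        sketch_process_agent_in, sketch_process_stream_in};

    for (size_t i = 0; i < 2; i++) {
        if (!(fds[i].revents & POLLIN))
            continue;
        int ret = process[i](c, &fds[i]);
        if (ret != 0)
            return ret;
    }
    return 0;
}
```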
Therefore, when stream_insert_read_event is called, the address of the
struct sdk_event_stream_read_available instance passed to it is sent over
the socket. Only the address needs to be sent, since the reader thread and the
poll(2) loop share the same address space, and ownership is always
transferred to the event queue, which is then responsible for deallocating
the resources.
Since the socket pair is shared among all streams, a mutex is required to
ensure that only complete, ordered packets are sent, even though each packet is
small (sizeof (void *) bytes).
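A sketch of this channel, in which writer threads send the raw pointer value under the shared mutex and the poll(2) loop reassembles it. This only works because both ends live in the same process; the sketch_* names are hypothetical, and treating a short write as an error is a simplification.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Shared by every stream: one socket pair plus a mutex for writers. */
struct sketch_event_channel {
    int sv[2];
    pthread_mutex_t lock;
};

static int sketch_channel_init(struct sketch_event_channel *ch)
{
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, ch->sv) != 0)
        return -1;
    return pthread_mutex_init(&ch->lock, NULL);
}

/* Writer side (reader threads): send only the event's address. */
static int sketch_send_event(struct sketch_event_channel *ch, void *ev)
{
    pthread_mutex_lock(&ch->lock);
    ssize_t w = write(ch->sv[1], &ev, sizeof ev);
    pthread_mutex_unlock(&ch->lock);
    return w == (ssize_t)sizeof ev ? 0 : -1;
}

/* Reader side (poll loop): rebuild the pointer and take ownership. */
static void *sketch_recv_event(struct sketch_event_channel *ch)
{
    void *ev;
    size_t got = 0;

    while (got < sizeof ev) {
        ssize_t r = read(ch->sv[0], (char *)&ev + got, sizeof ev - got);
        if (r <= 0)
            return NULL;
        got += (size_t)r;
    }
    return ev;
}
```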
Multi-layered callback interface¶
As mentioned above, the remote SDK implementation is more complex than necessary, for several reasons. One of them is the overuse of opaque data types, which obscure or prevent access to internal data structures without any clear benefit, since the internal interfaces of the remote SDK are never meant to be exposed to users anyway.
As a consequence, process_stream_in cannot retrieve the struct EVP_client
associated with the stream, since its caller only has access to a
struct sdk_client instance. This forces EVP_initialize() to set up a
callback/opaque-pointer pair that allows sdk.c to insert the new event into
the events member of struct EVP_client. This pair must be stored in the
struct sdk_transport instance, which is the only data type reachable from
struct sdk_client.
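The indirection can be sketched as a callback/opaque-pointer pair stored in the transport, so that code holding only the SDK client can still reach the EVP client. All sketch_* types are hypothetical, reduced versions of the structures named above.

```c
#include <assert.h>
#include <stddef.h>

/* Reduced stand-in for struct sdk_transport. */
struct sketch_transport {
    void (*on_stream_event)(void *user, void *ev);
    void *user; /* Opaque here: really points to the EVP client. */
};

/* Reduced stand-in for struct sdk_client: only the transport is reachable. */
struct sketch_sdk_client {
    struct sketch_transport *transport;
};

/* Reduced stand-in for struct EVP_client. */
struct sketch_evp_client {
    void *last_event;
    int n_events;
    struct sketch_transport transport;
};

/* sdk.c side: knows the EVP client layout, so it can queue the event. */
static void sketch_queue_event(void *user, void *ev)
{
    struct sketch_evp_client *h = user;

    h->last_event = ev;
    h->n_events++;
}

/* EVP_initialize() side: wire up the callback/opaque-pointer pair. */
static void sketch_initialize(struct sketch_evp_client *h,
                              struct sketch_sdk_client *c)
{
    h->last_event = NULL;
    h->n_events = 0;
    h->transport.on_stream_event = sketch_queue_event;
    h->transport.user = h;
    c->transport = &h->transport;
}

/* client_io.c side: only sees the SDK client, yet can deliver the event. */
static int sketch_process_stream_in(struct sketch_sdk_client *c, void *ev)
{
    assert(c != NULL && c->transport != NULL);
    c->transport->on_stream_event(c->transport->user, ev);
    return 0;
}
```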
FAQ¶
Why not use POSIX sockets directly?¶
Networking is not supported in wasi-sdk. This means that code such as the
example below fails to build because of missing functionality, for example
with "undefined reference to socket" link-time errors:
#include <sys/socket.h>

int main(void)
{
    /* Fails to link with wasi-sdk: socket() is not provided. */
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    /* Application code. */
    (void)fd;
    return 0;
}
This means applications hosting a Wasm runtime, such as the agent running wasm-micro-runtime, must relay networking from module instances to the host.
Why asynchronous I/O?¶
A thinner abstraction with the same semantics as POSIX sockets would have been possible, but an interface that always relies on asynchronous semantics was considered more desirable and more useful to future users.