Experimental features¶
EVP streams¶
Introduction¶
EVP streams are meant as a communication mechanism between module instances. Their design is closely inspired by the POSIX sockets interface, but streams are opinionated towards asynchronous communication. This means, among other things:
Write operations do not block the calling thread. Data shall be transferred asynchronously, probably from another thread.
Users do not have to read synchronously. Instead, a user-defined callback shall be triggered once input data is available.
Similarly to POSIX sockets, EVP streams do not assume a specific transport layer or network interface, but are extensible to any implementation that provides the semantics required by their public interface, described below:
EVP_RESULT EVP_streamInputOpen(struct EVP_client *h, const char *name, EVP_STREAM_READ_CALLBACK cb, void *userData, EVP_STREAM *stream);
EVP_RESULT EVP_streamOutputOpen(struct EVP_client *h, const char *name, EVP_STREAM *stream);
EVP_RESULT EVP_streamClose(struct EVP_client *h, EVP_STREAM stream);
EVP_RESULT EVP_streamWrite(struct EVP_client *h, EVP_STREAM stream, const void *buf, size_t n);
Where:
EVP_streamInputOpen() sets up a stream that must be configured with direction "in" and the name given by name in the deploymentManifest. cb refers to a user-defined callback that will be triggered by the implementation as soon as input data is available. userData is an opaque pointer that is passed to the callback pointed to by cb. The implementation will not attempt to read or modify the contents pointed to by userData. Therefore, userData can also be a null pointer.
EVP_streamOutputOpen() sets up a stream that must be configured with direction "out" and the name given by name in the deploymentManifest.
EVP_streamClose() releases the resources allocated by a stream opened with EVP_streamInputOpen() or EVP_streamOutputOpen(). For streams opened with EVP_streamOutputOpen(), EVP_streamClose() shall flush any pending outgoing messages before closing the stream.
EVP_streamWrite() must queue the request defined by the const void *, which is the user payload, and the size_t, which defines its size. This function must return immediately, and therefore is not required to send data over the network. Sending should be done asynchronously, for example, via a separate thread. This function can only be used with streams previously opened with EVP_streamOutputOpen().
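The "queue and return immediately" contract of EVP_streamWrite() can be pictured as a FIFO that copies the payload and defers transmission to someone else. The following is only a minimal sketch under that assumption, not the agent's implementation: the names write_queue, wq_msg, wq_push, wq_pop and wq_free are hypothetical, and locking is omitted for brevity (a queue shared with a sender thread would need it).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical pending message: owns a private copy of the payload. */
struct wq_msg {
    struct wq_msg *next;
    size_t n;
    unsigned char *buf;
};

/* Hypothetical FIFO of pending messages. */
struct write_queue {
    struct wq_msg *head, *tail;
};

/* Copies the payload and queues it; never touches the network.
 * Returns 0 on success, -1 on allocation failure. */
int wq_push(struct write_queue *q, const void *buf, size_t n)
{
    assert(q != NULL && (buf != NULL || n == 0));

    struct wq_msg *m = malloc(sizeof(*m));
    if (m == NULL)
        return -1;

    m->buf = malloc(n ? n : 1);
    if (m->buf == NULL) {
        free(m);
        return -1;
    }
    if (n)
        memcpy(m->buf, buf, n);
    m->n = n;
    m->next = NULL;

    if (q->tail != NULL)
        q->tail->next = m;
    else
        q->head = m;
    q->tail = m;
    return 0;
}

/* Detaches the oldest message, or returns NULL if the queue is empty.
 * A sender thread would call this, transmit, then call wq_free(). */
struct wq_msg *wq_pop(struct write_queue *q)
{
    struct wq_msg *m = q->head;

    if (m != NULL) {
        q->head = m->next;
        if (q->head == NULL)
            q->tail = NULL;
    }
    return m;
}

/* Releases a message previously returned by wq_pop(). */
void wq_free(struct wq_msg *m)
{
    if (m != NULL) {
        free(m->buf);
        free(m);
    }
}
```

Copying the payload in wq_push() is what lets the caller reuse or release its buffer as soon as the call returns, at the cost of one allocation per message.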
Another significant difference between POSIX sockets and EVP streams is their direction: while POSIX sockets can be bidirectional, EVP streams are only unidirectional. In other words, EVP streams can be either in or out, but never both. As a consequence, using EVP_streamWrite() on an in stream will return an error.
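The direction rule amounts to a guard at the top of the write path. The sketch below illustrates the idea with hypothetical stand-ins: enum result, enum direction, struct stream and stream_write are not SDK names, and the specific error code returned by the real EVP_streamWrite() is not stated here, so RES_INVAL is only a placeholder.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the SDK result codes. */
enum result { RES_OK, RES_INVAL };

enum direction { DIR_IN, DIR_OUT };

/* Hypothetical, minimal stream descriptor. */
struct stream {
    enum direction dir;
    size_t queued; /* bytes accepted so far */
};

/* Rejects writes on input streams before queuing anything. */
enum result stream_write(struct stream *s, const void *buf, size_t n)
{
    assert(s != NULL);
    if (s->dir != DIR_OUT || (buf == NULL && n != 0))
        return RES_INVAL;
    s->queued += n; /* a real implementation would queue the payload */
    return RES_OK;
}
```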
Bidirectional streams are currently not planned, but it should still be possible to implement them, since their absence might limit some use cases.
Use cases¶
EVP streams have been designed for real-world scenarios, such as:
A network of cameras that send information to a central server, without relying on an MQTT broker.
Usage¶
Two things are required from the user side:
Add streams to the deploymentManifest¶
If EVP streams are used, the instanceSpecs object from a deploymentManifest must include a streams object in it, with at least one stream definition.
The example below shows an output stream, called my-stream, of type example:
{
    "instanceSpecs": {
        "instance-name": {
            "streams": {
                "my-stream": {
                    "type": "example",
                    "direction": "out",
                    "parameters": {
                        // Type-specific parameters, if any.
                    }
                }
            }
        }
    }
}
The definition of the parameters object is entirely determined by the stream type, and the object may be omitted if the stream type does not require it.
The following stream types are supported:
null: a placeholder implementation only meant for testing purposes. null streams are always supported by the agent.
posix: only uses the POSIX C standard library.
Take into account that, if a module does not use any stream, the streams object must not exist.
Open the stream from the module instance¶
When a module instance has been started with a set of configured streams, a call to EVP_streamInputOpen() or EVP_streamOutputOpen() will look up the current deploymentManifest to check whether the stream was defined accordingly.
The example module below shows how to open an output stream called my-stream:
#include <evp/sdk.h> /* Assumed header name for the EVP SDK. */

int main(void)
{
    struct EVP_client *h = EVP_initialize();
    /* Pointer validation. */
    EVP_STREAM stream;
    EVP_RESULT result = EVP_streamOutputOpen(h, "my-stream", &stream);
    if (result != EVP_OK) {
        /* Handle error. */
    }
    /* Application code. */
}
If my-stream could not be found in the streams JSON object belonging to the calling module instance, EVP_INVAL will be returned. See the documentation for EVP_streamInputOpen() and EVP_streamOutputOpen() for a list of possible error values.
Once a stream has been opened successfully:
Input streams shall get their user-defined callback triggered once input data is available.
Output streams can call EVP_streamWrite().
High-level design¶
As stated above, one of the design goals behind EVP streams is extensibility. This means any protocol that provides a reliable, stream-oriented connection is eligible as a stream type, such as the TCP protocol.
File hierarchy¶
EVP streams are implemented with the following source files from the streams directory:
stream.h: provides the type definitions and function declarations used internally by the agent and evpmodulesdk.
stream.c: provides the protocol-agnostic code shared among all stream types.
However, this implementation must be complemented with:
SDK-specific implementation:
Private interfaces¶
stream.h declares the functions that must be implemented by the SDK implementation:
struct stream_impl *stream_from_stream(struct EVP_client *h, EVP_STREAM stream);
struct stream_impl *stream_from_name(struct EVP_client *h, const char *name);
EVP_RESULT stream_insert(struct EVP_client *h, struct stream_impl *si);
EVP_RESULT stream_remove(struct EVP_client *h, struct stream_impl *si);
int stream_insert_read_event(struct EVP_client *h, struct sdk_event_stream_read_available *ev);
where:
stream_from_stream: a thin wrapper over stream_impl_from_stream, required since the TAILQ containing the available streams is defined on struct EVP_client, whose definition in turn depends on the SDK implementation. This function creates the relationship between an EVP_STREAM (an opaque integer type) and struct stream_impl * (the internal data structure used across all files in this component).
stream_from_name: a thin wrapper over stream_impl_from_name, required since the TAILQ containing the available streams is defined on struct EVP_client, whose definition in turn depends on the SDK implementation.
Stream type-specific interfaces¶
An instance of the following data type must be defined for each stream type:
struct stream_ops {
    EVP_RESULT (*init)(struct stream_impl *);
    int (*close)(struct stream_impl *);
    int (*write)(const struct stream_impl *, const void *, size_t);
    int (*read)(struct stream_impl *, struct stream_read *sr);
    void (*free_msg)(void *);
    int (*atexit)(void);
};
where:
init performs the required type-specific initialization of a stream. The desired stream configuration is contained inside the cfg member of the struct stream_impl passed to this function. A status code as defined by EVP_RESULT must be returned. If it is not EVP_OK, the status code shall be propagated to the user.
close must deallocate any resources previously allocated by a call to init. Returns zero if successful, non-zero otherwise.
write must queue the request defined by the const void *, which is the user payload, and the size_t, which defines its size. This function must return immediately, and therefore is not required to send data over the network. Sending should be done asynchronously, for example, via a separate thread.
read is called from a separate thread by stream.c and must block until a message is received from a peer. When a message is received from a peer, read must fill the struct stream_read passed to it. See section “Filling a struct stream_read” for further reference.
free_msg defines how to release the resources as given by the free_args member of struct stream_read.
atexit defines a function handler that will be registered with the standard atexit(3) function. It can be a null pointer if no actions are required.
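To make the table above more concrete, the sketch below fills a struct stream_ops for an imaginary stream type that discards everything written to it. It is not the agent's null type. EVP_RESULT, EVP_OK and struct stream_impl are redeclared as simplified stand-ins so the example is self-contained, all discard_* names are hypothetical, and the non-zero return from read on failure is an assumption. Unlike a real read, discard_read does not block.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for SDK types, declared here only to keep the sketch
 * self-contained; the real definitions live in the SDK headers. */
typedef int EVP_RESULT;
#define EVP_OK 0
struct stream_impl { int unused; };
struct stream_read;

struct stream_ops {
    EVP_RESULT (*init)(struct stream_impl *);
    int (*close)(struct stream_impl *);
    int (*write)(const struct stream_impl *, const void *, size_t);
    int (*read)(struct stream_impl *, struct stream_read *sr);
    void (*free_msg)(void *);
    int (*atexit)(void);
};

static EVP_RESULT discard_init(struct stream_impl *si)
{
    (void)si; /* nothing to configure */
    return EVP_OK;
}

static int discard_close(struct stream_impl *si)
{
    (void)si; /* nothing was allocated by init */
    return 0;
}

static int discard_write(const struct stream_impl *si, const void *buf,
                         size_t n)
{
    assert(buf != NULL || n == 0);
    (void)si; /* the payload is silently dropped */
    return 0;
}

static int discard_read(struct stream_impl *si, struct stream_read *sr)
{
    (void)si;
    (void)sr;
    return -1; /* this sketch never produces input */
}

static void discard_free_msg(void *args)
{
    (void)args; /* no message resources to release */
}

const struct stream_ops discard_ops = {
    .init = discard_init,
    .close = discard_close,
    .write = discard_write,
    .read = discard_read,
    .free_msg = discard_free_msg,
    .atexit = NULL, /* no exit-time cleanup required */
};
```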
Input streams¶
Since EVP streams are meant to receive messages asynchronously from peers, a separate thread is required that can block until a message is received, without blocking the normal execution flow of the agent or SDK.
This thread is required for all stream types, so it is implemented by stream.c. The thread body is defined by the function in_thread. There, it triggers the type-specific read callback and, if successful, it shall call notify_read_available to create a new struct sdk_event_stream_read_available event.
However, implementations must fill a struct stream_read instance, which contains the following members:
struct stream_read {
    EVP_STREAM_PEER_ID id;
    const void *buf;
    size_t n;
    void *free_args;
};
where:
id is an opaque identifier that shall be propagated to user code, and is typically meant to identify a connection. Users must not make any assumptions about the meaning behind this value, as it is entirely implementation-defined. This member was added as a compromise to distinguish several peers on unencrypted connections. However, it is meant to be replaced with a strong authentication method once TLS support is added to EVP streams.
buf is the pointer where the input message is stored. This is defined as a read-only pointer since users are not meant to modify it. However, if buf needs to be released by the implementation, free_args can be used for this purpose.
n is the length of the input message.
free_args is an optional pointer that shall be passed to the free callback in struct sdk_event_stream_read_available, once the event has been processed by the user-defined callback. It can be a null pointer if no resources need to be released. It can also point to the same buffer pointed to by buf.
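The interplay between read, struct stream_read and free_args can be sketched as a loop in the spirit of in_thread. This is an illustration under simplifying assumptions rather than the code in stream.c: EVP_STREAM_PEER_ID is a stand-in typedef, fake_read, read_cb, read_loop and count_bytes are hypothetical names, and the user callback is invoked directly instead of going through the event queue.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for the SDK's opaque peer identifier type. */
typedef int EVP_STREAM_PEER_ID;

struct stream_read {
    EVP_STREAM_PEER_ID id;
    const void *buf;
    size_t n;
    void *free_args;
};

/* Hypothetical read operation: delivers canned messages until
 * *remaining reaches zero, then reports failure. */
static int fake_read(int *remaining, struct stream_read *sr)
{
    static const char msg[] = "hello";

    if (*remaining <= 0)
        return -1;
    (*remaining)--;

    char *copy = malloc(sizeof(msg));
    if (copy == NULL)
        return -1;
    memcpy(copy, msg, sizeof(msg));

    sr->id = 1;             /* implementation-defined peer identifier */
    sr->buf = copy;         /* read-only view handed to the user */
    sr->n = sizeof(msg) - 1;
    sr->free_args = copy;   /* same buffer, released after the callback */
    return 0;
}

/* Hypothetical user callback type; the real one is defined by the SDK. */
typedef void (*read_cb)(EVP_STREAM_PEER_ID id, const void *buf, size_t n,
                        void *user);

/* Example callback: accumulates the total payload size in *user. */
void count_bytes(EVP_STREAM_PEER_ID id, const void *buf, size_t n,
                 void *user)
{
    (void)id;
    (void)buf;
    *(size_t *)user += n;
}

/* Read, notify, release. Returns the number of messages delivered. */
int read_loop(int messages, read_cb cb, void *user)
{
    int delivered = 0;
    struct stream_read sr;

    assert(cb != NULL);
    while (fake_read(&messages, &sr) == 0) {
        cb(sr.id, sr.buf, sr.n, user); /* stands in for event dispatch */
        free(sr.free_args);            /* what free_msg would do here */
        delivered++;
    }
    return delivered;
}
```

Here free_args points to the same allocation as buf, which is one of the arrangements the member description allows.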
Filling a struct stream_read¶
When an input message is received from a peer, a struct stream_read must be filled by the implementation so as to generate a struct sdk_event_stream_read_available instance that can be appended to the struct EVP_client member events.
Inserting a struct sdk_event_stream_read_available¶
Once an input message is received, stream.c will call stream_insert_read_event in order to ask the SDK implementation how the event should be stored in the event queue.
Local SDK¶
Since EVP_client was already designed to be accessed from multiple threads in the case of local SDK, the implementation for stream_insert_read_event was relatively straightforward: the struct sdk_event_stream_read_available instance can be safely appended into the queue as long as the sdk_{un}lock function pair is called.
Remote SDK¶
A new challenge appeared when attempting to integrate this functionality: as opposed to local SDK, remote SDK has no mechanism to access a struct EVP_client instance in a thread-safe manner.
Adding mutexes in various places to ensure thread-safety was rejected, as it would have added a significant amount of complexity to the existing remote SDK implementation, which already suffers from unneeded complexity.
Therefore, it was preferred to take advantage of the call to poll(2) in client_io.c. Since this system call can monitor several file descriptors at once, it can be used to monitor events coming from in_thread.
This solution requires the use of unnamed AF_UNIX sockets, which can be created via the socketpair(2) function. Then, the new file descriptor is added to the array of struct pollfd entries passed to poll(2).
This required some more changes, though, as client_io.c always assumed that any incoming information from the only file descriptor consumed by poll(2) comes from the agent side. However, this assumption was no longer true, as EVP streams now introduce a new event source.
In other words, poll(2) would now return because of the following events:
A timeout;
Information coming from the agent side;
Information coming from the thread running in_thread.
To solve this, a callback table was set up to assign a specific function for each file descriptor:
static int (*const process[sizeof(fds) / sizeof(*fds)])(
    struct sdk_client *,
    const struct pollfd *) = {process_agent_in, process_stream_in};
Therefore, when stream_insert_read_event is called, the address of the struct sdk_event_stream_read_available instance passed to it is sent over the socket. Notice that only the struct address is required, as the ownership is always transferred to the event queue, which is then responsible for deallocating the resources.
Since the socket pair is shared among all streams, it is required to set up a mutex that ensures only full and ordered packets are sent, even if the packet size is in fact small (that is, sizeof (void *)).
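The mechanism described in this section (a socket pair, a pointer-sized packet and a mutex around the send) can be sketched with plain POSIX calls. This is a simplified illustration, not the code in client_io.c: struct event, send_event and recv_event are hypothetical names, and a single file descriptor is polled instead of the agent and stream descriptors together.

```c
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical event; only its address crosses the socket. */
struct event {
    int value;
};

static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;

/* Producer side: sends the address of a heap-allocated event.
 * The mutex keeps pointer-sized packets whole and ordered when
 * several stream threads share the same socket. */
int send_event(int fd, struct event *ev)
{
    int ret = 0;

    assert(ev != NULL);
    pthread_mutex_lock(&send_lock);
    if (write(fd, &ev, sizeof(ev)) != (ssize_t)sizeof(ev))
        ret = -1;
    pthread_mutex_unlock(&send_lock);
    return ret;
}

/* Consumer side: waits with poll(2), then takes ownership of the event.
 * Returns NULL on timeout or error. */
struct event *recv_event(int fd, int timeout_ms)
{
    struct pollfd pfd = {.fd = fd, .events = POLLIN};
    struct event *ev = NULL;

    if (poll(&pfd, 1, timeout_ms) <= 0 || !(pfd.revents & POLLIN))
        return NULL;
    if (read(fd, &ev, sizeof(ev)) != (ssize_t)sizeof(ev))
        return NULL;
    return ev; /* the caller now owns the event and must free it */
}
```

The descriptors would come from socketpair(AF_UNIX, SOCK_STREAM, 0, sv), with one end given to the producer threads and the other added to the polled set. Passing a raw address is only valid because both ends live in the same process.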
Multi-layered callback interface¶
As suggested above, the remote SDK implementation is a bit overly complex for several reasons. One of them is the abuse of opaque data types that end up obscuring or forbidding the access to internal data structures without any clear benefits, as the various interfaces within the remote SDK are never meant to be available to users in any case.
As a consequence of this, process_stream_in has no way to retrieve the struct EVP_client related to the stream, since its caller is only limited to a struct sdk_client instance. This forces EVP_initialize() to set up a callback/opaque-pointer pair that allows sdk.c to insert the new event into struct EVP_client member events, which must be assigned to its struct sdk_transport instance, which is the only data type available from struct sdk_client.
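The callback/opaque-pointer pair can be pictured as follows. The structures below are heavily simplified stand-ins that reuse the names from the text. The members on_stream_event, ctx, transport and nevents, as well as the functions insert_event and process_stream_in_sketch, are hypothetical and only illustrate the layering.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins; the real structures carry many more members. */
struct sdk_transport {
    int (*on_stream_event)(void *ctx, void *ev); /* hypothetical member */
    void *ctx;                                   /* hypothetical member */
};

struct sdk_client {
    struct sdk_transport *transport;
};

struct EVP_client {
    size_t nevents; /* stands in for the "events" queue */
};

/* Registered by the owner of struct EVP_client: the only place that
 * knows what the opaque pointer really is. */
int insert_event(void *ctx, void *ev)
{
    struct EVP_client *h = ctx;

    (void)ev; /* a real implementation would append ev to the queue */
    h->nevents++;
    return 0;
}

/* Lower layer: sees only struct sdk_client, yet can still deliver the
 * event through the callback/opaque-pointer pair. */
int process_stream_in_sketch(struct sdk_client *c, void *ev)
{
    assert(c != NULL && c->transport != NULL);
    if (c->transport->on_stream_event == NULL)
        return -1;
    return c->transport->on_stream_event(c->transport->ctx, ev);
}
```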
FAQ¶
Why not use POSIX sockets directly?¶
Networking in wasi-sdk is not supported. This means that code such as the example below would not build because of missing functionality, for example, with undefined reference to socket link-time errors:
#include <sys/socket.h>

int main(void)
{
    int fd = socket(/* arguments */);
    /* Application code. */
}
This means applications hosting a Wasm runtime, such as the agent running wasm-micro-runtime, must relay networking from module instances to the host.
Why asynchronous I/O?¶
While a thinner abstraction with identical semantics compared to POSIX sockets could have been possible, an interface always relying on asynchronous semantics was deemed more desirable and interesting to future users.