Channels¶
Channels are mechanisms used in the agent to share and synchronize threads. They are basically a queue that supports two primitives:
chan_send
.
chan_recv
.
Both of them are based on a chan_msg
that is the unit transmitted in the queue.
These functions return the number of messages sent/received,
0 or 1,
with 0 usually being an error in the case of chan_send
.
A chan_msg
is:
struct chan_msg {
void (*fn)(struct chan_msg *);
void *param;
void *resp;
};
The field fn
defines the function
that will attend the message on the receiver side
while param
is a pointer to the input parameters
and resp
is a pointer to the output response.
The field resp
also defines the behaviour of the channel,
if it is NULL
then it is considered an asynchronous message
but if it is different than NULL
then
it is a synchronous message.
There are several versions the receiving function:
chan_recv
: It blocks the thread until a message is received.
chan_tryrecv
: It attends a pending message if any and otherwise it just returns without blocking.
chan_timedrecv
: It blocks the thread until a message is received or a timeout expires.
It is important to notice that in both cases,
synchronous and asynchronous messages,
the message passed to chan_send
is copied when
it is stored in the channel queue,
so the sender can use local variables for chan_msg
even in the case of asynchronous messages.
Beware that it only applies to the chan_msg
struct itself
and not to the memory pointed to by params
and resp
.
For asynchronous messages, when resp
is NULL
,
care must be taken in allocating params
in the heap or statically, because these data should be
available to the agent running on another thread.
Semantic of synchronous messages¶
A synchronous channel follows the semantic of CSP,
where a communication in the channel implies a rendezvous
between chan_send
and chan_recv
.
The thread calling chan_send
will block
until the other endpoint thread calls chan_recv
and it finishes the execution of the receiving function indicated in the message.
This rendezvous implies a synchronization between both threads
and it means
that the receiving thread can access the parameters in mutual exclusion.
This can be summarized by the Go lemma Share Memory By Communicating:
Don't communicate by sharing memory; share memory by communicating.
Channels allow passing references to data structures between threads. If this is considered passing around ownership of the data (the ability to read and write it), they become a powerful and expressive synchronization mechanism.
Semantic of asynchronous messages¶
When a message is sent using chan_send
but
the field resp
is NULL
then the message is just enqueued and sends returns immediately.
It means
that the life of parameters passed by params
must be longer that
the function calling chan_send
,
usually implying dynamic memory because
as the sender doesn't known when
the receiver will attend the message
only the receiver can free the memory allocated by the sender.
Example of a synchronous message¶
struct sum_params {
int *ary;
int n;
};
void
add(struct chan_msg *msg)
{
int i, n, *out = msg->resp;
struct sum_params *p = msg->params;
for (i = n = 0; i < p->n; i++)
n += p->ary[n];
*out = n;
}
void
sender(struct chan *ch, int *array, int size)
{
struct chan_msg msg;
struct sum_params par;
int sum;
par.ary = array
par.n = siz;
msg.fn = add;
msg.params = ∥
msg.resp = ∑
if (chan_send(&msg) != 1) {
perror("sender");
return;
}
printf("sum value=%d\n", ret);
}
void
receiver(struct chan *ch)
{
for (;;)
chan_recv(ch);
}
The thread calling sender
will block
until the thread calling receiver
finishes the execution of add
and it means
that receiver
accesses the integer array in mutual exclusion
without race conditions.
Example of an asynchronous message¶
struct sum_params {
int *ary;
int n;
};
void
add(struct chan_msg *msg)
{
int i, n;
struct sum_params *p = msg->params;
for (i = n = 0; i < p->n; i++)
n += p->ary[n];
printf("sum value=%d\n", n);
free(p);
}
void
sender(struct chan *ch, int *array, int n)
{
struct chan_msg msg;
struct sum_params *par;
par = malloc(sizeof(struct sum_params));
if (!par) {
free(ary);
perror("sender");
}
par->ary = array
par->n = n;
msg.fn = add;
msg.params = ∥
msg.resp = NULL;
if (chan_send(&msg) != 1) {
perror("sender");
return;
}
}
void
receiver(struct chan *ch)
{
for (;;)
chan_recv(ch);
}
The thread calling sender
returns from chan_send
as soon
the request is stored in the internal queue of the channel
and it means
that it cannot free the memory pointed to by par
because it does not known when
the receiver thread will use it.
In the same way,
the thread calling sender
does not know
when it can use the memory pointed by array without race conditions.
External mechanism are needed to ensure the mutual exclusion
of the sender thread has to assume that
the full ownership of the memory pointed to by array
was transferred to the thread calling chan_recv
until the end of life of that memory.